Mercurial > piecrust2
diff piecrust/pipelines/base.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
keep aborting in `getContents`, but rely on their pipeline to generate
pages for baking.
author: Ludovic Chabant <ludovic@chabant.com>
date: Sun, 04 Jun 2017 23:34:28 -0700
parents: f070a4fc033c
children: 448710d84121
line wrap: on
line diff
--- a/piecrust/pipelines/base.py Sun May 21 00:06:59 2017 -0700 +++ b/piecrust/pipelines/base.py Sun Jun 04 23:34:28 2017 -0700 @@ -1,5 +1,7 @@ import os.path import logging +from werkzeug.utils import cached_property +from piecrust.configuration import ConfigurationError logger = logging.getLogger(__name__) @@ -8,10 +10,9 @@ class PipelineContext: """ The context for running a content pipeline. """ - def __init__(self, out_dir, record_history, *, + def __init__(self, out_dir, *, worker_id=-1, force=None): self.out_dir = out_dir - self.record_history = record_history self.worker_id = worker_id self.force = force @@ -30,28 +31,73 @@ """ return self.worker_id < 0 - @property - def current_record(self): - return self.record_history.current + +class PipelineJob: + """ Base class for a pipline baking job. + """ + def __init__(self, pipeline, content_item): + self.source_name = pipeline.source.name + self.record_name = pipeline.record_name + self.content_item = content_item + self.data = {} -class PipelineResult: +class PipelineJobRunContext: + """ Context for running pipeline baking jobs. + """ + def __init__(self, job, pipeline, record_histories): + self.record_histories = record_histories + self._job_item_spec = job.content_item.spec + self._record_name = pipeline.record_name + + @cached_property + def previous_record(self): + return self.record_histories.getPreviousRecord(self._record_name) + + @cached_property + def previous_entry(self): + return self.previous_record.getEntry(self._job_item_spec) + + +class PipelineJobResult: """ Result of running a pipeline on a content item. """ def __init__(self): - self.pipeline_name = None self.record_entry = None + self.next_pass_job = None + + +class PipelineMergeRecordContext: + """ The context for merging a record entry for a second or higher pass + into the bake record. 
+ """ + def __init__(self, record, job, pass_num): + self.record = record + self.job = job + self.pass_num = pass_num + + +class PipelineDeletionContext: + def __init__(self, record_history): + self.record_history = record_history + + +class PipelineCollapseRecordContext: + def __init__(self, record_history): + self.record_history = record_history class ContentPipeline: """ A pipeline that processes content from a `ContentSource`. """ PIPELINE_NAME = None - PIPELINE_PASSES = 1 RECORD_ENTRY_CLASS = None + PASS_NUM = 0 - def __init__(self, source): + def __init__(self, source, ctx): self.source = source + self.ctx = ctx + self.record_name = '%s@%s' % (source.name, self.PIPELINE_NAME) app = source.app tmp_dir = app.cache_dir @@ -64,10 +110,27 @@ def app(self): return self.source.app - def initialize(self, ctx): + def initialize(self): pass - def run(self, content_item, ctx, result): + def createJobs(self): + return [ + self.createJob(item) + for item in self.source.getAllContents()] + + def createJob(self, content_item): + return PipelineJob(self, content_item) + + def createRecordEntry(self, job): + entry_class = self.RECORD_ENTRY_CLASS + record_entry = entry_class() + record_entry.item_spec = job.content_item.spec + return record_entry + + def mergeRecordEntry(self, record_entry, ctx): + raise NotImplementedError() + + def run(self, job, ctx, result): raise NotImplementedError() def getDeletions(self, ctx): @@ -76,5 +139,100 @@ def collapseRecords(self, ctx): pass - def shutdown(self, ctx): + def shutdown(self): pass + + +def get_pipeline_name_for_source(source): + pname = source.config['pipeline'] + if not pname: + pname = source.DEFAULT_PIPELINE_NAME + if not pname: + raise ConfigurationError( + "Source '%s' doesn't specify any pipeline." 
% source.name) + return pname + + +class PipelineManager: + def __init__(self, app, out_dir, record_histories, *, + worker_id=-1, force=False): + self.app = app + self.record_histories = record_histories + self.out_dir = out_dir + self.worker_id = worker_id + self.force = force + + self._pipeline_classes = {} + for pclass in app.plugin_loader.getPipelines(): + self._pipeline_classes[pclass.PIPELINE_NAME] = pclass + + self._pipelines = {} + + def getPipeline(self, source_name): + return self._pipelines[source_name] + + def getPipelines(self): + return self._pipelines.values() + + def createPipeline(self, source): + if source.name in self._pipelines: + raise ValueError("Pipeline for source '%s' was already created." % + source.name) + + pname = get_pipeline_name_for_source(source) + ppctx = PipelineContext(self.out_dir, + worker_id=self.worker_id, force=self.force) + pp = self._pipeline_classes[pname](source, ppctx) + pp.initialize() + + record_history = self.record_histories.getHistory(pp.record_name) + + info = _PipelineInfo(pp, record_history) + self._pipelines[source.name] = info + return info + + def buildHistoryDiffs(self): + for ppinfo in self.getPipelines(): + ppinfo.record_history.build() + + def deleteStaleOutputs(self): + for ppinfo in self.getPipelines(): + ctx = PipelineDeletionContext(ppinfo.record_history) + to_delete = ppinfo.pipeline.getDeletions(ctx) + current_record = ppinfo.record_history.current + if to_delete is not None: + for path, reason in to_delete: + logger.debug("Removing '%s': %s" % (path, reason)) + current_record.deleted_out_paths.append(path) + try: + os.remove(path) + except FileNotFoundError: + pass + logger.info('[delete] %s' % path) + + def collapseRecords(self): + for ppinfo in self.getPipelines(): + ctx = PipelineCollapseRecordContext(ppinfo.record_history) + ppinfo.pipeline.collapseRecords(ctx) + + def shutdownPipelines(self): + for ppinfo in self.getPipelines(): + ppinfo.pipeline.shutdown() + + self._pipelines = {} + + +class 
_PipelineInfo: + def __init__(self, pipeline, record_history): + self.pipeline = pipeline + self.record_history = record_history + self.userdata = None + + @property + def source(self): + return self.pipeline.source + + @property + def pipeline_name(self): + return self.pipeline.PIPELINE_NAME +