diff piecrust/pipelines/base.py @ 854:08e02c2a2a1a

core: Keep refactoring, this time to prepare for generator sources.

- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
  keep aborting in `getContents`, but rely on their pipeline to generate
  pages for baking (see the sketch below).
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jun 2017 23:34:28 -0700
parents f070a4fc033c
children 448710d84121
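
For context, a minimal sketch of how a pipeline subclass would plug into the job-based API this changeset introduces. `BlogPostEntry`, `BlogPostPipeline`, and the output-path logic are hypothetical; only the hooks they override (`createJobs` via the base class, `createRecordEntry`, `run`) and `PipelineJobRunContext` come from the diff below.

    # Hypothetical pipeline built on the new job-based API. The record
    # entry class and the output path scheme are made up for this sketch.
    import os.path
    from piecrust.pipelines.base import ContentPipeline


    class BlogPostEntry:
        # RECORD_ENTRY_CLASS must be instantiable with no arguments,
        # since createRecordEntry calls entry_class() and then assigns
        # item_spec on the instance.
        def __init__(self):
            self.item_spec = None
            self.out_path = None


    class BlogPostPipeline(ContentPipeline):
        PIPELINE_NAME = 'blog_posts'
        RECORD_ENTRY_CLASS = BlogPostEntry

        def run(self, job, ctx, result):
            # `job` was created by this pipeline's own createJobs(), so
            # a generator source can keep aborting in getContents() and
            # still have its pages baked. `ctx.previous_entry` holds the
            # cached entry from the last bake, usable for incremental
            # skipping.
            entry = self.createRecordEntry(job)
            entry.out_path = os.path.join(
                self.ctx.out_dir, job.content_item.spec + '.html')
            result.record_entry = entry
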
--- a/piecrust/pipelines/base.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/pipelines/base.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,5 +1,7 @@
 import os.path
 import logging
+from werkzeug.utils import cached_property
+from piecrust.configuration import ConfigurationError
 
 
 logger = logging.getLogger(__name__)
@@ -8,10 +10,9 @@
 class PipelineContext:
     """ The context for running a content pipeline.
     """
-    def __init__(self, out_dir, record_history, *,
+    def __init__(self, out_dir, *,
                  worker_id=-1, force=None):
         self.out_dir = out_dir
-        self.record_history = record_history
         self.worker_id = worker_id
         self.force = force
 
@@ -30,28 +31,73 @@
         """
         return self.worker_id < 0
 
-    @property
-    def current_record(self):
-        return self.record_history.current
+
+class PipelineJob:
+    """ Base class for a pipline baking job.
+    """
+    def __init__(self, pipeline, content_item):
+        self.source_name = pipeline.source.name
+        self.record_name = pipeline.record_name
+        self.content_item = content_item
+        self.data = {}
 
 
-class PipelineResult:
+class PipelineJobRunContext:
+    """ Context for running pipeline baking jobs.
+    """
+    def __init__(self, job, pipeline, record_histories):
+        self.record_histories = record_histories
+        self._job_item_spec = job.content_item.spec
+        self._record_name = pipeline.record_name
+
+    @cached_property
+    def previous_record(self):
+        return self.record_histories.getPreviousRecord(self._record_name)
+
+    @cached_property
+    def previous_entry(self):
+        return self.previous_record.getEntry(self._job_item_spec)
+
+
+class PipelineJobResult:
     """ Result of running a pipeline on a content item.
     """
     def __init__(self):
-        self.pipeline_name = None
         self.record_entry = None
+        self.next_pass_job = None
+
+
+class PipelineMergeRecordContext:
+    """ The context for merging a record entry for a second or higher pass
+        into the bake record.
+    """
+    def __init__(self, record, job, pass_num):
+        self.record = record
+        self.job = job
+        self.pass_num = pass_num
+
+
+class PipelineDeletionContext:
+    def __init__(self, record_history):
+        self.record_history = record_history
+
+
+class PipelineCollapseRecordContext:
+    def __init__(self, record_history):
+        self.record_history = record_history
 
 
 class ContentPipeline:
     """ A pipeline that processes content from a `ContentSource`.
     """
     PIPELINE_NAME = None
-    PIPELINE_PASSES = 1
     RECORD_ENTRY_CLASS = None
+    PASS_NUM = 0
 
-    def __init__(self, source):
+    def __init__(self, source, ctx):
         self.source = source
+        self.ctx = ctx
+        self.record_name = '%s@%s' % (source.name, self.PIPELINE_NAME)
 
         app = source.app
         tmp_dir = app.cache_dir
@@ -64,10 +110,27 @@
     def app(self):
         return self.source.app
 
-    def initialize(self, ctx):
+    def initialize(self):
         pass
 
-    def run(self, content_item, ctx, result):
+    def createJobs(self):
+        return [
+            self.createJob(item)
+            for item in self.source.getAllContents()]
+
+    def createJob(self, content_item):
+        return PipelineJob(self, content_item)
+
+    def createRecordEntry(self, job):
+        entry_class = self.RECORD_ENTRY_CLASS
+        record_entry = entry_class()
+        record_entry.item_spec = job.content_item.spec
+        return record_entry
+
+    def mergeRecordEntry(self, record_entry, ctx):
+        raise NotImplementedError()
+
+    def run(self, job, ctx, result):
         raise NotImplementedError()
 
     def getDeletions(self, ctx):
@@ -76,5 +139,100 @@
     def collapseRecords(self, ctx):
         pass
 
-    def shutdown(self, ctx):
+    def shutdown(self):
         pass
+
+
+def get_pipeline_name_for_source(source):
+    pname = source.config['pipeline']
+    if not pname:
+        pname = source.DEFAULT_PIPELINE_NAME
+    if not pname:
+        raise ConfigurationError(
+            "Source '%s' doesn't specify any pipeline." % source.name)
+    return pname
+
+
+class PipelineManager:
+    def __init__(self, app, out_dir, record_histories, *,
+                 worker_id=-1, force=False):
+        self.app = app
+        self.record_histories = record_histories
+        self.out_dir = out_dir
+        self.worker_id = worker_id
+        self.force = force
+
+        self._pipeline_classes = {}
+        for pclass in app.plugin_loader.getPipelines():
+            self._pipeline_classes[pclass.PIPELINE_NAME] = pclass
+
+        self._pipelines = {}
+
+    def getPipeline(self, source_name):
+        return self._pipelines[source_name]
+
+    def getPipelines(self):
+        return self._pipelines.values()
+
+    def createPipeline(self, source):
+        if source.name in self._pipelines:
+            raise ValueError("Pipeline for source '%s' was already created." %
+                             source.name)
+
+        pname = get_pipeline_name_for_source(source)
+        ppctx = PipelineContext(self.out_dir,
+                                worker_id=self.worker_id, force=self.force)
+        pp = self._pipeline_classes[pname](source, ppctx)
+        pp.initialize()
+
+        record_history = self.record_histories.getHistory(pp.record_name)
+
+        info = _PipelineInfo(pp, record_history)
+        self._pipelines[source.name] = info
+        return info
+
+    def buildHistoryDiffs(self):
+        for ppinfo in self.getPipelines():
+            ppinfo.record_history.build()
+
+    def deleteStaleOutputs(self):
+        for ppinfo in self.getPipelines():
+            ctx = PipelineDeletionContext(ppinfo.record_history)
+            to_delete = ppinfo.pipeline.getDeletions(ctx)
+            current_record = ppinfo.record_history.current
+            if to_delete is not None:
+                for path, reason in to_delete:
+                    logger.debug("Removing '%s': %s" % (path, reason))
+                    current_record.deleted_out_paths.append(path)
+                    try:
+                        os.remove(path)
+                    except FileNotFoundError:
+                        pass
+                    logger.info('[delete] %s' % path)
+
+    def collapseRecords(self):
+        for ppinfo in self.getPipelines():
+            ctx = PipelineCollapseRecordContext(ppinfo.record_history)
+            ppinfo.pipeline.collapseRecords(ctx)
+
+    def shutdownPipelines(self):
+        for ppinfo in self.getPipelines():
+            ppinfo.pipeline.shutdown()
+
+        self._pipelines = {}
+
+
+class _PipelineInfo:
+    def __init__(self, pipeline, record_history):
+        self.pipeline = pipeline
+        self.record_history = record_history
+        self.userdata = None
+
+    @property
+    def source(self):
+        return self.pipeline.source
+
+    @property
+    def pipeline_name(self):
+        return self.pipeline.PIPELINE_NAME
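
To show how the pieces fit together, a hedged sketch of a bake driver built on `PipelineManager`. The `app.sources` attribute and the omitted record bookkeeping are assumptions about the surrounding baker code; the manager and pipeline calls themselves are the ones defined above.

    # Sketch of a single-process bake loop; record merging and
    # multi-pass jobs (next_pass_job) are elided.
    from piecrust.pipelines.base import (
        PipelineJobResult, PipelineJobRunContext, PipelineManager)


    def bake(app, out_dir, record_histories):
        pm = PipelineManager(app, out_dir, record_histories, force=False)
        for source in app.sources:
            pm.createPipeline(source)
        pm.buildHistoryDiffs()

        for ppinfo in pm.getPipelines():
            pp = ppinfo.pipeline
            for job in pp.createJobs():
                ctx = PipelineJobRunContext(job, pp, record_histories)
                result = PipelineJobResult()
                pp.run(job, ctx, result)
                # The record entry in `result` would be merged into the
                # current record here.

        pm.deleteStaleOutputs()
        pm.collapseRecords()
        pm.shutdownPipelines()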
+