diff piecrust/pipelines/base.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item from an item spec (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents d6d35b2efd04
children fa489c5e829e
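The core of this change is visible in create_job() and content_item_from_job() in the diff below: a bake job is now a plain dict built around a 'job_spec' tuple of (source name, item spec), plus whatever extra keys a pipeline wants, instead of a PipelineJob object carrying a full ContentItem. A rough sketch of what a worker now receives; the source name, path and extra key are made-up values for illustration:

# Rough sketch of the new job shape; 'pages', the path and 'pass_num' are
# made-up example values, not data mandated by this change.
job = {
    'job_spec': ('pages', 'pages/2017/hello-world.md'),  # (source name, item spec)
    'pass_num': 0,                                        # pipeline-specific extra key
}
# The worker rebuilds the heavier content item on its side:
#   item = pipeline.source.findContentFromSpec(job['job_spec'][1])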
--- a/piecrust/pipelines/base.py	Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/pipelines/base.py	Sun Nov 19 14:29:17 2017 -0800
@@ -2,6 +2,7 @@
 import logging
 from werkzeug.utils import cached_property
 from piecrust.configuration import ConfigurationError
+from piecrust.sources.base import ContentItem
 
 
 logger = logging.getLogger(__name__)
@@ -32,69 +33,87 @@
         return self.worker_id < 0
 
 
-class PipelineJob:
-    """ Base class for a pipeline baking job.
-    """
-    def __init__(self, pipeline, content_item):
-        self.source_name = pipeline.source.name
-        self.record_name = pipeline.record_name
-        self.content_item = content_item
-        self.step_num = 0
-        self.data = {}
+class _PipelineMasterProcessJobContextBase:
+    def __init__(self, record_name, record_histories):
+        self.record_name = record_name
+        self.record_histories = record_histories
+
+    @property
+    def previous_record(self):
+        return self.record_histories.getPreviousRecord(self.record_name)
+
+    @property
+    def current_record(self):
+        return self.record_histories.getCurrentRecord(self.record_name)
 
 
-class PipelineJobCreateContext:
-    """ Context for create pipeline baking jobs.
+class PipelineJobCreateContext(_PipelineMasterProcessJobContextBase):
+    """ Context for creating pipeline baking jobs.
+
+        This is run on the master process, so it can access both the
+        previous and current records.
     """
-    def __init__(self, step_num, record_histories):
+    def __init__(self, pass_num, record_name, record_histories):
+        super().__init__(record_name, record_histories)
+        self.pass_num = pass_num
+
+
+class PipelineJobValidateContext(_PipelineMasterProcessJobContextBase):
+    """ Context for validating jobs on subsequent step runs (i.e. validating
+        the list of jobs to run starting with the second step).
+
+        This is run on the master process, so it can access both the
+        previous and current records.
+    """
+    def __init__(self, pass_num, step_num, record_name, record_histories):
+        super().__init__(record_name, record_histories)
+        self.pass_num = pass_num
         self.step_num = step_num
-        self.record_histories = record_histories
 
 
 class PipelineJobRunContext:
     """ Context for running pipeline baking jobs.
+
+        This is run on the worker processes, so it can only access the
+        previous records.
     """
-    def __init__(self, job, record_name, record_histories):
+    def __init__(self, job, record_name, previous_records):
         self.job = job
         self.record_name = record_name
-        self.record_histories = record_histories
+        self.previous_records = previous_records
 
-    @property
-    def content_item(self):
-        return self.job.content_item
+    @cached_property
+    def record_entry_spec(self):
+        return self.job.get('record_entry_spec',
+                            self.job['job_spec'][1])
 
     @cached_property
     def previous_record(self):
-        return self.record_histories.getPreviousRecord(self.record_name)
-
-    @cached_property
-    def record_entry_spec(self):
-        content_item = self.content_item
-        return content_item.metadata.get('record_entry_spec',
-                                         content_item.spec)
+        return self.previous_records.getRecord(self.record_name)
 
     @cached_property
     def previous_entry(self):
         return self.previous_record.getEntry(self.record_entry_spec)
 
 
-class PipelineJobResult:
-    """ Result of running a pipeline on a content item.
-    """
-    def __init__(self):
-        self.record_entry = None
-        self.next_step_job = None
+class PipelineJobResultHandleContext:
+    """ The context for handling the result from a job that ran in a
+        worker process.
 
-
-class PipelineMergeRecordContext:
-    """ The context for merging a record entry for a second or higher pass
-        into the bake record.
+        This is run on the master process, so it can access the current
+        record.
     """
     def __init__(self, record, job, step_num):
         self.record = record
         self.job = job
         self.step_num = step_num
 
+    @cached_property
+    def record_entry(self):
+        record_entry_spec = self.job.get('record_entry_spec',
+                                         self.job['job_spec'][1])
+        return self.record.getEntry(record_entry_spec)
+
 
 class PipelinePostJobRunContext:
     def __init__(self, record_history):
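The docstrings above spell out the new split: creating jobs, validating next-step jobs and handling results all happen on the master process with full record histories, while PipelineJobRunContext runs in worker processes and only sees the previous records. Condensed into a hypothetical helper, the worker-side lookup chain is just this (an illustrative rewrite of the cached properties above, not new behaviour):

def previous_entry_for(job, previous_records, record_name):
    # How a worker resolves the previous bake entry for a job: the record
    # entry spec falls back to the item spec stored in the job itself.
    entry_spec = job.get('record_entry_spec', job['job_spec'][1])
    previous_record = previous_records.getRecord(record_name)
    return previous_record.getEntry(entry_spec)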
@@ -137,23 +156,26 @@
     def initialize(self):
         pass
 
+    def loadAllContents(self):
+        return None
+
     def createJobs(self, ctx):
         return [
-            self.createJob(item)
+            create_job(self, item.spec)
             for item in self.source.getAllContents()]
 
-    def createJob(self, content_item):
-        return PipelineJob(self, content_item)
-
-    def createRecordEntry(self, job, ctx):
+    def createRecordEntry(self, item_spec):
         entry_class = self.RECORD_ENTRY_CLASS
         record_entry = entry_class()
-        record_entry.item_spec = ctx.record_entry_spec
+        record_entry.item_spec = item_spec
         return record_entry
 
-    def mergeRecordEntry(self, record_entry, ctx):
+    def handleJobResult(self, result, ctx):
         raise NotImplementedError()
 
+    def validateNextStepJobs(self, jobs, ctx):
+        pass
+
     def run(self, job, ctx, result):
         raise NotImplementedError()
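Under the reworked base class, a concrete pipeline implements run() for the worker side and handleJobResult() for the master side. A hypothetical minimal subclass might look like the sketch below; MyRecordEntry, the result keys, the placeholder helper and record.addEntry() are assumptions for illustration, while run(), handleJobResult(), createRecordEntry() and content_item_from_job() come from this file.

# Hypothetical minimal pipeline under the new API (illustration only).
class CopyPipeline(ContentPipeline):
    PIPELINE_NAME = 'copy'
    RECORD_ENTRY_CLASS = MyRecordEntry  # placeholder record entry class

    def run(self, job, ctx, result):
        # Worker process: re-load the content item from the job spec and put
        # only simple, serializable data into the result.
        item = content_item_from_job(self, job)
        result['item_spec'] = item.spec
        result['out_paths'] = copy_item_somewhere(item)  # placeholder helper

    def handleJobResult(self, result, ctx):
        # Master process: turn the worker's result into a record entry.
        entry = self.createRecordEntry(result['item_spec'])
        entry.out_paths = result['out_paths']
        ctx.record.addEntry(entry)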
 
@@ -170,6 +192,18 @@
         pass
 
 
+def create_job(pipeline, item_spec, **kwargs):
+    job = {
+        'job_spec': (pipeline.source.name, item_spec)
+    }
+    job.update(kwargs)
+    return job
+
+
+def content_item_from_job(pipeline, job):
+    return pipeline.source.findContentFromSpec(job['job_spec'][1])
+
+
 def get_record_name_for_source(source):
     ppname = get_pipeline_name_for_source(source)
     return '%s@%s' % (source.name, ppname)
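content_item_from_job() is where the commit's new source requirement surfaces: since jobs only carry an item spec, every content source must be able to rebuild a ContentItem from that spec alone. A hedged sketch of what that obligation might look like for a simple source; the class, the metadata and the make_slug() helper are made up, only findContentFromSpec() and ContentItem are referenced by this file:

# Illustrative only: a made-up source honouring the new requirement.
class SimpleFileSource(ContentSource):
    def findContentFromSpec(self, spec):
        # `spec` is job['job_spec'][1], typically a path inside the source.
        return ContentItem(spec, {'slug': make_slug(spec)})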
@@ -186,8 +220,8 @@
 
 
 class PipelineManager:
-    def __init__(self, app, out_dir, record_histories, *,
-                 worker_id=-1, force=False):
+    def __init__(self, app, out_dir, *,
+                 record_histories=None, worker_id=-1, force=False):
         self.app = app
         self.record_histories = record_histories
         self.out_dir = out_dir
@@ -201,9 +235,12 @@
         self._pipelines = {}
 
     def getPipeline(self, source_name):
+        return self.getPipelineInfo(source_name).pipeline
+
+    def getPipelineInfo(self, source_name):
         return self._pipelines[source_name]
 
-    def getPipelines(self):
+    def getPipelineInfos(self):
         return self._pipelines.values()
 
     def createPipeline(self, source):
@@ -217,22 +254,24 @@
         pp = self._pipeline_classes[pname](source, ppctx)
         pp.initialize()
 
-        record_history = self.record_histories.getHistory(pp.record_name)
+        record_history = None
+        if self.record_histories:
+            record_history = self.record_histories.getHistory(pp.record_name)
 
         info = _PipelineInfo(pp, record_history)
         self._pipelines[source.name] = info
         return info
 
     def postJobRun(self):
-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ppinfo.record_history.build()
 
-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ctx = PipelinePostJobRunContext(ppinfo.record_history)
             ppinfo.pipeline.postJobRun(ctx)
 
     def deleteStaleOutputs(self):
-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ctx = PipelineDeletionContext(ppinfo.record_history)
             to_delete = ppinfo.pipeline.getDeletions(ctx)
             current_record = ppinfo.record_history.current
@@ -247,12 +286,12 @@
                     logger.info('[delete] %s' % path)
 
     def collapseRecords(self):
-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ctx = PipelineCollapseRecordContext(ppinfo.record_history)
             ppinfo.pipeline.collapseRecords(ctx)
 
     def shutdownPipelines(self):
-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ppinfo.pipeline.shutdown()
 
         self._pipelines = {}
@@ -269,6 +308,12 @@
         return self.pipeline.source
 
     @property
+    def current_record(self):
+        if self.record_history is not None:
+            return self.record_history.current
+        raise Exception("The current record is not available.")
+
+    @property
     def pipeline_name(self):
         return self.pipeline.PIPELINE_NAME
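The new _PipelineInfo.current_record property completes the picture: master-side code can reach the in-progress record through the pipeline info, while on workers, where there is no record history, it raises instead of silently returning nothing. A rough usage sketch; the source name and item spec are made up:

# Rough usage sketch on the master process; 'pages' is a made-up source name.
ppinfo = ppmngr.getPipelineInfo('pages')
entry = ppinfo.current_record.getEntry(some_item_spec)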