diff piecrust/baking/worker.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects so it is easier to test with alternatives to pickle.
- Require sources, at a minimum, to be able to find a content item from an item spec (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents bf65a1a6992a
children 2e5c5d33d62c
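The core of the change is visible in process() below: jobs and results are now plain dicts and tuples instead of pipeline job/result objects. A minimal sketch of the shapes implied by this diff (any field beyond 'job_spec' and 'item_spec' is an assumption), showing why plain built-ins round-trip through any serializer, not just pickle:

    import json

    # Shapes implied by the diff; the concrete values are hypothetical.
    job = {
        'job_spec': ('pages', 'pages/foo.md'),  # (source_name, item_spec)
    }
    result = {
        'item_spec': 'pages/foo.md',
    }

    # Plain dicts and tuples survive a swap of serializers; note that
    # JSON turns the tuple into a list on the way back.
    assert json.loads(json.dumps(job))['job_spec'] == ['pages', 'pages/foo.md']
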
--- a/piecrust/baking/worker.py	Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/baking/worker.py	Sun Nov 19 14:29:17 2017 -0800
@@ -1,7 +1,7 @@
 import time
 import logging
 from piecrust.pipelines.base import (
-    PipelineManager, PipelineJobRunContext, PipelineJobResult,
+    PipelineManager, PipelineJobRunContext,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
     MultiRecordHistory, MultiRecord, load_records)
@@ -27,7 +27,8 @@
     def __init__(self, ctx):
         self.ctx = ctx
         self.app = None
-        self.record_histories = None
+        self.stats = None
+        self.previous_records = None
         self._work_start_time = time.perf_counter()
         self._sources = {}
         self._ppctx = None
@@ -44,22 +45,20 @@
         stats = app.env.stats
         stats.registerTimer("BakeWorker_%d_Total" % self.wid)
         stats.registerTimer("BakeWorkerInit")
-        self.timerScope = stats.timerScope
 
         self.app = app
+        self.stats = stats
 
         # Load previous record
         if self.ctx.previous_records_path:
             previous_records = load_records(self.ctx.previous_records_path)
         else:
             previous_records = MultiRecord()
-        current_records = MultiRecord()
-        self.record_histories = MultiRecordHistory(
-            previous_records, current_records)
+        self.previous_records = previous_records
 
         # Create the pipelines.
         self.ppmngr = PipelineManager(
-            app, self.ctx.out_dir, self.record_histories,
+            app, self.ctx.out_dir,
             worker_id=self.wid, force=self.ctx.force)
         ok_pp = self.ctx.allowed_pipelines
         nok_pp = self.ctx.forbidden_pipelines
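
The hunk above also shows the worker keeping only the previous bake's records rather than wrapping them in a MultiRecordHistory around an empty current MultiRecord. A hedged sketch of that fallback as a hypothetical helper, using the load_records/MultiRecord imports already present at the top of the file:

    from piecrust.pipelines.records import MultiRecord, load_records

    def load_previous_records(path):
        # No previous records on a fresh bake: start from an empty set.
        if path:
            return load_records(path)
        return MultiRecord()
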
@@ -78,24 +77,22 @@
         stats.stepTimerSince("BakeWorkerInit", self._work_start_time)
 
     def process(self, job):
-        item = job.content_item
-        logger.debug("Received job: %s@%s" % (job.source_name, item.spec))
-
-        ppinfo = self.ppmngr.getPipeline(job.source_name)
-        pp = ppinfo.pipeline
+        source_name, item_spec = job['job_spec']
+        logger.debug("Received job: %s@%s" % (source_name, item_spec))
 
-        with self.timerScope("PipelineJobs_%s" % pp.PIPELINE_NAME):
-            runctx = PipelineJobRunContext(job, pp.record_name,
-                                           self.record_histories)
+        # Run the job!
+        job_start = time.perf_counter()
+        pp = self.ppmngr.getPipeline(source_name)
+        runctx = PipelineJobRunContext(job, pp.record_name,
+                                       self.previous_records)
+        ppres = {
+            'item_spec': item_spec
+        }
+        pp.run(job, runctx, ppres)
 
-            ppres = PipelineJobResult()
-            # For subsequent pass jobs, there will be a record entry given.
-            # For first pass jobs, there's none so we get the pipeline to
-            # create it.
-            ppres.record_entry = job.data.get('record_entry')
-            if ppres.record_entry is None:
-                ppres.record_entry = pp.createRecordEntry(job, runctx)
-            pp.run(job, runctx, ppres)
+        # Log time spent in this pipeline.
+        self.stats.stepTimerSince("PipelineJobs_%s" % pp.PIPELINE_NAME,
+                                  job_start)
 
         return ppres
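
With the timerScope context manager removed, per-pipeline timing becomes explicit perf_counter() bookkeeping around pp.run(). A minimal, self-contained sketch of that pattern; the Stats class here is a hypothetical stand-in for app.env.stats, and time.sleep() stands in for the pipeline run:

    import time

    class Stats:
        # Hypothetical stand-in for piecrust's app.env.stats.
        def __init__(self):
            self.timers = {}

        def stepTimerSince(self, name, start):
            # Accumulate elapsed time under the given timer name.
            elapsed = time.perf_counter() - start
            self.timers[name] = self.timers.get(name, 0.0) + elapsed

    stats = Stats()
    job_start = time.perf_counter()
    time.sleep(0.01)  # stands in for pp.run(job, runctx, ppres)
    stats.stepTimerSince('PipelineJobs_page', job_start)
    print(stats.timers)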