diff piecrust/baking/baker.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
  alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item
  from an item spec (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents 45ad976712ec
children 09dc0240f08a
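The second bullet of the commit message is what the sketch below illustrates:
once the data crossing the process boundary is made of plain dicts, tuples,
and strings, it survives any serializer that handles builtins, not just
pickle. This is a minimal sketch, not PieCrust code; only the 'job_spec'
shape is taken from the hunks below, the literal values are made up.

    import json
    import pickle

    # A job as a plain dict of builtins; the ('source_name', 'item_spec')
    # pair mirrors job['job_spec'] in the diff below.
    job = {
        'job_spec': ('pages', 'pages/foo.md'),
        'step_num': 0,
    }

    # Round-trips through pickle...
    assert pickle.loads(pickle.dumps(job)) == job

    # ...and, with a trivial tuple-to-list adaptation, through a pickle
    # alternative such as json -- feasible only because the payload is made
    # of simple types.
    wire = json.dumps({'job_spec': list(job['job_spec']), 'step_num': 0})
    print(json.loads(wire))
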
--- a/piecrust/baking/baker.py	Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/baking/baker.py	Sun Nov 19 14:29:17 2017 -0800
@@ -6,7 +6,8 @@
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
 from piecrust.pipelines.base import (
-    PipelineJobCreateContext, PipelineMergeRecordContext, PipelineManager,
+    PipelineJobCreateContext, PipelineJobResultHandleContext,
+    PipelineJobValidateContext, PipelineManager,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
     MultiRecordHistory, MultiRecord, RecordEntry,
@@ -42,6 +43,8 @@
 
     def bake(self):
         start_time = time.perf_counter()
+
+        # Setup baker.
         logger.debug("  Bake Output: %s" % self.out_dir)
         logger.debug("  Root URL: %s" % self.app.config.get('site/root'))
 
@@ -50,7 +53,9 @@
         self.app.config.set('site/asset_url_format', '%page_uri%/%filename%')
 
         stats = self.app.env.stats
-        stats.registerTimer('WorkerTaskPut')
+        stats.registerTimer('LoadSourceContents', raise_if_registered=False)
+        stats.registerTimer('MasterTaskPut_1', raise_if_registered=False)
+        stats.registerTimer('MasterTaskPut_2+', raise_if_registered=False)
 
         # Make sure the output directory exists.
         if not os.path.isdir(self.out_dir):
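For reference, the stats calls this hunk relies on, mimicked by a stub. The
real ExecutionStats lives in piecrust.environment (see the import above);
only the two methods and the raise_if_registered keyword actually used in
the hunks are reproduced, and their internals here are guesses.

    import time

    class StubStats:
        # Stand-in mimicking the surface used above; not PieCrust code.
        def __init__(self):
            self.timers = {}

        def registerTimer(self, name, *, raise_if_registered=True):
            if name in self.timers:
                if raise_if_registered:
                    raise Exception("Timer already registered: %s" % name)
                return  # tolerate re-registration, as the bake code asks
            self.timers[name] = 0.0

        def stepTimer(self, name, delta):
            self.timers[name] += delta

    stats = StubStats()
    stats.registerTimer('LoadSourceContents', raise_if_registered=False)
    stats.registerTimer('LoadSourceContents', raise_if_registered=False)  # no-op
    start = time.perf_counter()
    stats.stepTimer('LoadSourceContents', time.perf_counter() - start)
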
@@ -90,6 +95,12 @@
         pool_userdata = _PoolUserData(self, ppmngr)
         pool = self._createWorkerPool(records_path, pool_userdata)
 
+        # Done with all the setup, let's start the actual work.
+        logger.info(format_timed(start_time, "setup baker"))
+
+        # Load all sources.
+        self._loadSources(ppmngr)
+
         # Bake the realms.
         self._bakeRealms(pool, ppmngr, record_histories)
 
@@ -149,8 +160,9 @@
             self.force = True
             current_records.incremental_count = 0
             previous_records = MultiRecord()
-            logger.info(format_timed(
-                start_time, "cleaned cache (reason: %s)" % reason))
+            logger.debug(format_timed(
+                start_time, "cleaned cache (reason: %s)" % reason,
+                colored=False))
             return False
         else:
             current_records.incremental_count += 1
@@ -167,7 +179,8 @@
         # Also, create and initialize each pipeline for each source.
         has_any_pp = False
         ppmngr = PipelineManager(
-            self.app, self.out_dir, record_histories)
+            self.app, self.out_dir,
+            record_histories=record_histories)
         ok_pp = self.allowed_pipelines
         nok_pp = self.forbidden_pipelines
         ok_src = self.allowed_sources
@@ -192,13 +205,28 @@
                             "out. There's nothing to do.")
         return ppmngr
 
+    def _loadSources(self, ppmngr):
+        start_time = time.perf_counter()
+
+        for ppinfo in ppmngr.getPipelineInfos():
+            rec = ppinfo.record_history.current
+            rec_entries = ppinfo.pipeline.loadAllContents()
+            if rec_entries is not None:
+                for e in rec_entries:
+                    rec.addEntry(e)
+
+        stats = self.app.env.stats
+        stats.stepTimer('LoadSourceContents',
+                        time.perf_counter() - start_time)
+        logger.info(format_timed(start_time, "loaded site content"))
+
     def _bakeRealms(self, pool, ppmngr, record_histories):
         # Bake the realms -- user first, theme second, so that a user item
         # can override a theme item.
         # Do this for as many times as we have pipeline passes left to do.
         realm_list = [REALM_USER, REALM_THEME]
         pp_by_pass_and_realm = _get_pipeline_infos_by_pass_and_realm(
-            ppmngr.getPipelines())
+            ppmngr.getPipelineInfos())
 
         for pp_pass_num in sorted(pp_by_pass_and_realm.keys()):
             logger.debug("Pipelines pass %d" % pp_pass_num)
@@ -206,10 +234,11 @@
             for realm in realm_list:
                 pplist = pp_by_realm.get(realm)
                 if pplist is not None:
-                    self._bakeRealm(
-                        pool, record_histories, pp_pass_num, realm, pplist)
+                    self._bakeRealm(pool, ppmngr, record_histories,
+                                    pp_pass_num, realm, pplist)
 
-    def _bakeRealm(self, pool, record_histories, pp_pass_num, realm, pplist):
+    def _bakeRealm(self, pool, ppmngr, record_histories,
+                   pp_pass_num, realm, pplist):
         # Start with the first step, where we iterate on the content sources'
         # items and run jobs on those.
         pool.userdata.cur_step = 0
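The nested structure iterated above is {pass_num: {realm: [pipeline info,
...]}}. A sketch of the grouping with made-up stand-ins; attribute names
like pass_num and realm are assumptions, not PieCrust's actual fields:

    from collections import defaultdict, namedtuple

    REALM_USER, REALM_THEME = 0, 1  # stand-ins for PieCrust's constants
    PPInfo = namedtuple('PPInfo', 'pass_num realm name')

    def group_by_pass_and_realm(ppinfos):
        grouped = defaultdict(lambda: defaultdict(list))
        for ppinfo in ppinfos:
            grouped[ppinfo.pass_num][ppinfo.realm].append(ppinfo)
        return grouped

    infos = [PPInfo(0, REALM_THEME, 'theme_pages'),
             PPInfo(0, REALM_USER, 'pages')]
    by_pass = group_by_pass_and_realm(infos)
    for pp_pass_num in sorted(by_pass):
        for realm in (REALM_USER, REALM_THEME):  # user first, so it overrides
            for ppinfo in by_pass[pp_pass_num].get(realm, []):
                print(pp_pass_num, realm, ppinfo.name)
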
@@ -218,15 +247,16 @@
 
         start_time = time.perf_counter()
         job_count = 0
+        stats = self.app.env.stats
         realm_name = REALM_NAMES[realm].lower()
-        stats = self.app.env.stats
 
         for ppinfo in pplist:
             src = ppinfo.source
             pp = ppinfo.pipeline
+            jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
+                                             record_histories)
 
             next_step_jobs[src.name] = []
-            jcctx = PipelineJobCreateContext(pp_pass_num, record_histories)
             jobs = pp.createJobs(jcctx)
             if jobs is not None:
                 new_job_count = len(jobs)
@@ -240,7 +270,7 @@
                 "(%s, step 0)." %
                 (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))
 
-        stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time)
+        stats.stepTimer('MasterTaskPut_1', time.perf_counter() - start_time)
 
         if job_count == 0:
             logger.debug("No jobs queued! Bailing out of this bake pass.")
@@ -270,11 +300,18 @@
                     logger.debug(
                         "Queuing jobs for source '%s' (%s, step %d)." %
                         (sn, realm_name, pool.userdata.cur_step))
+
+                    pp = ppmngr.getPipeline(sn)
+                    valctx = PipelineJobValidateContext(
+                        pp_pass_num, pool.userdata.cur_step,
+                        pp.record_name, record_histories)
+                    pp.validateNextStepJobs(jobs, valctx)
+
                     job_count += len(jobs)
                     pool.userdata.next_step_jobs[sn] = []
                     pool.queueJobs(jobs)
 
-            stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time)
+            stats.stepTimer('MasterTaskPut_2+', time.perf_counter() - start_time)
 
             if job_count == 0:
                 break
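Condensed to its skeleton, the step loop above works like this. This is a
synchronous sketch: in the real baker, results arrive asynchronously through
_handleWorkerResult, and run_job here is a stand-in for the worker pool.

    def bake_steps(initial_jobs, run_job):
        cur_step = 0
        jobs = initial_jobs
        while jobs:
            next_jobs = []
            for job in jobs:
                res = run_job(job)
                # Mirrors _handleWorkerResult below: a result may carry a
                # follow-up job, re-queued with a bumped step number.
                npj = res.get('next_step_job')
                if npj is not None:
                    npj['step_num'] = cur_step + 1
                    next_jobs.append(npj)
            cur_step += 1
            jobs = next_jobs

    bake_steps([{'job_spec': ('pages', 'pages/foo.md'), 'step_num': 0}],
               run_job=lambda job: {})  # a worker that never adds a next step
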
@@ -293,6 +330,12 @@
         for e in errors:
             logger.error("  " + e)
 
+    def _logWorkerException(self, item_spec, exc_data):
+        logger.error("Errors found in %s:" % item_spec)
+        logger.error(exc_data['value'])
+        if self.app.debug:
+            logger.error(exc_data['traceback'])
+
     def _createWorkerPool(self, previous_records_path, pool_userdata):
         from piecrust.workerpool import WorkerPool
         from piecrust.baking.worker import BakeWorkerContext, BakeWorker
@@ -319,47 +362,45 @@
 
     def _handleWorkerResult(self, job, res, userdata):
         cur_step = userdata.cur_step
-        record = userdata.records.getRecord(job.record_name)
+        source_name, item_spec = job['job_spec']
+
+        # See if there's a next step to take.
+        npj = res.get('next_step_job')
+        if npj is not None:
+            npj['step_num'] = cur_step + 1
+            userdata.next_step_jobs[source_name].append(npj)
 
-        if cur_step == 0:
-            record.addEntry(res.record_entry)
-        else:
-            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
-            ppmrctx = PipelineMergeRecordContext(record, job, cur_step)
-            ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx)
+        # Make the pipeline do custom handling to update the record entry.
+        ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
+        pipeline = ppinfo.pipeline
+        record = ppinfo.current_record
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_step)
+        pipeline.handleJobResult(res, ppmrctx)
 
-        npj = res.next_step_job
-        if npj is not None:
-            npj.step_num = cur_step + 1
-            userdata.next_step_jobs[job.source_name].append(npj)
-
-        if not res.record_entry.success:
+        # Set the overall success flags if there was an error.
+        record_entry = ppmrctx.record_entry
+        if not record_entry.success:
             record.success = False
             userdata.records.success = False
-            self._logErrors(job.content_item.spec, res.record_entry.errors)
+            self._logErrors(job['item_spec'], record_entry.errors)
 
     def _handleWorkerError(self, job, exc_data, userdata):
-        cur_step = userdata.cur_step
-        record = userdata.records.getRecord(job.record_name)
-
-        record_entry_spec = job.content_item.metadata.get(
-            'record_entry_spec', job.content_item.spec)
-
-        if cur_step == 0:
-            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
-            entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry
-            e = entry_class()
-            e.item_spec = record_entry_spec
-            e.errors.append(str(exc_data))
-            record.addEntry(e)
-        else:
-            e = record.getEntry(record_entry_spec)
-            e.errors.append(str(exc_data))
-
+        # Set the overall success flag.
+        source_name, item_spec = job['job_spec']
+        ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
+        pipeline = ppinfo.pipeline
+        record = ppinfo.current_record
         record.success = False
         userdata.records.success = False
 
-        self._logErrors(job.content_item.spec, e.errors)
+        # Add those errors to the record, if possible.
+        record_entry_spec = job.get('record_entry_spec', item_spec)
+        e = record.getEntry(record_entry_spec)
+        if e:
+            e.errors.append(exc_data['value'])
+            self._logWorkerException(item_spec, exc_data)
+
+        # Log debug stuff.
         if self.app.debug:
             logger.error(exc_data.traceback)
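
Read together, the handlers above pin down the simple shapes that now cross
the master/worker boundary. This summary is a reading of the diff, not an
official schema; the literal values are made up:

    import traceback

    job = {
        'job_spec': ('pages', 'pages/foo.md'),  # (source_name, item_spec)
        'step_num': 0,
        # optional: 'record_entry_spec', overriding item_spec for record lookups
    }

    result = {
        # optional: a follow-up job for the next bake step
        'next_step_job': {'job_spec': job['job_spec']},
    }

    try:
        raise RuntimeError("boom")
    except RuntimeError as ex:
        exc_data = {'value': str(ex), 'traceback': traceback.format_exc()}

    assert 'next_step_job' in result and set(exc_data) == {'value', 'traceback'}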