diff piecrust/baking/baker.py @ 1136:5f97b5b59dfe

bake: Optimize cache handling for the baking process.

- Get rid of the 2-level pipeline runs... handle a single set of passes.
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does.
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't
  need to be re-baked.
- Simplify some of the code.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 23 Apr 2018 21:47:49 -0700
parents 971b4d67e82a
children
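The heart of the change is visible in _bakeRealm() below: the old two-step loop (queue jobs, collect "next step" follow-up jobs, validate and re-queue them until none are produced) collapses into a single queue-and-wait per pass, and createJobs() now returns a (jobs, job_desc) pair so the completion log can say what each batch did. A minimal sketch of that control flow, not the actual PieCrust implementation; the pool and pipeline objects here are hypothetical stand-ins, and only the (jobs, job_desc) contract and the single wait per pass mirror the diff:

    import time

    class _StubPool:
        # Hypothetical worker pool; queueing and waiting are no-ops here.
        def queueJobs(self, jobs):
            pass
        def wait(self):
            pass

    class _StubPipeline:
        # Hypothetical pipeline whose createJobs() follows the new
        # contract: return the jobs plus a short batch description.
        def __init__(self, source_name, job_desc, job_count):
            self.source_name = source_name
            self._job_desc = job_desc
            self._job_count = job_count

        def createJobs(self, pass_num):
            return [{'pass_num': pass_num}] * self._job_count, self._job_desc

    def bake_realm(pool, pipelines, pass_num):
        start_time = time.perf_counter()
        job_count = 0
        job_descs = {}  # batch description -> source names, for the log line

        for pp in pipelines:
            jobs, job_desc = pp.createJobs(pass_num)
            if jobs:
                job_count += len(jobs)
                pool.queueJobs(jobs)
                if job_desc:
                    job_descs.setdefault(job_desc, []).append(pp.source_name)

        if job_count == 0:
            return

        # Single queue-and-wait per pass: no step counter, no follow-up
        # jobs to validate and re-queue.
        pool.wait()
        print('%d jobs completed (%s) in %.2fs.' % (
            job_count,
            ', '.join('%s %s' % (desc, ', '.join(names))
                      for desc, names in job_descs.items()),
            time.perf_counter() - start_time))

    bake_realm(_StubPool(),
               [_StubPipeline('pages', 'baked', 3),
                _StubPipeline('posts', 'rendered', 2)],
               pass_num=0)
    # -> 5 jobs completed (baked pages, rendered posts) in 0.00s.
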
--- a/piecrust/baking/baker.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/baking/baker.py	Mon Apr 23 21:47:49 2018 -0700
@@ -6,11 +6,10 @@
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
 from piecrust.pipelines.base import (
-    PipelineJobCreateContext, PipelineJobResultHandleContext,
-    PipelineJobValidateContext, PipelineManager,
+    PipelineJobCreateContext, PipelineJobResultHandleContext, PipelineManager,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
-    MultiRecordHistory, MultiRecord, RecordEntry,
+    MultiRecordHistory, MultiRecord,
     load_records)
 from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES
 
@@ -234,18 +233,12 @@
 
     def _bakeRealm(self, pool, ppmngr, record_histories,
                    pp_pass_num, realm, pplist):
-        # Start with the first step, where we iterate on the content sources'
-        # items and run jobs on those.
+        start_time = time.perf_counter()
+
+        job_count = 0
+        job_descs = {}
+        realm_name = REALM_NAMES[realm].lower()
         pool.userdata.cur_pass = pp_pass_num
-        pool.userdata.cur_step = 0
-        next_step_jobs = {}
-        pool.userdata.next_step_jobs = next_step_jobs
-
-        start_time = time.perf_counter()
-        job_count = 0
-        stats = self.app.env.stats
-        realm_name = REALM_NAMES[realm].lower()
-        participating_source_names = []
 
         for ppinfo in pplist:
             src = ppinfo.source
@@ -253,19 +246,19 @@
             jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
                                              record_histories)
 
-            next_step_jobs[src.name] = []
-            jobs = pp.createJobs(jcctx)
+            jobs, job_desc = pp.createJobs(jcctx)
             if jobs is not None:
                 new_job_count = len(jobs)
                 job_count += new_job_count
                 pool.queueJobs(jobs)
-                participating_source_names.append(src.name)
+                if job_desc:
+                    job_descs.setdefault(job_desc, []).append(src.name)
             else:
                 new_job_count = 0
 
             logger.debug(
                 "Queued %d jobs for source '%s' using pipeline '%s' "
-                "(%s, step 0)." %
+                "(%s)." %
                 (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))
 
         if job_count == 0:
@@ -276,50 +269,9 @@
 
         logger.info(format_timed(
             start_time, "%d jobs completed (%s)." %
-            (job_count, ', '.join(participating_source_names))))
-
-        # Now let's see if any job created a follow-up job. Let's keep
-        # processing those jobs as long as they create new ones.
-        pool.userdata.cur_step = 1
-        while True:
-            # Make a copy of out next step jobs and reset the list, so
-            # the first jobs to be processed don't mess it up as we're
-            # still iterating on it.
-            next_step_jobs = pool.userdata.next_step_jobs
-            pool.userdata.next_step_jobs = {}
-
-            start_time = time.perf_counter()
-            job_count = 0
-            participating_source_names = []
-
-            for sn, jobs in next_step_jobs.items():
-                if jobs:
-                    logger.debug(
-                        "Queuing jobs for source '%s' (%s, step %d)." %
-                        (sn, realm_name, pool.userdata.cur_step))
-
-                    pp = ppmngr.getPipeline(sn)
-                    valctx = PipelineJobValidateContext(
-                        pp_pass_num, pool.userdata.cur_step,
-                        pp.record_name, record_histories)
-                    pp.validateNextStepJobs(jobs, valctx)
-
-                    job_count += len(jobs)
-                    pool.userdata.next_step_jobs[sn] = []
-                    pool.queueJobs(jobs)
-                    participating_source_names.append(sn)
-
-            if job_count == 0:
-                break
-
-            pool.wait()
-
-            logger.info(format_timed(
-                start_time,
-                "%d jobs completed (%s)." %
-                (job_count, ', '.join(participating_source_names))))
-
-            pool.userdata.cur_step += 1
+            (job_count, ', '.join(
+                ['%s %s' % (d, ', '.join(sn))
+                 for d, sn in job_descs.items()]))))
 
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
@@ -358,22 +310,13 @@
 
     def _handleWorkerResult(self, job, res, userdata):
         cur_pass = userdata.cur_pass
-        cur_step = userdata.cur_step
         source_name, item_spec = job['job_spec']
 
-        # See if there's a next step to take.
-        npj = res.get('next_step_job')
-        if npj is not None:
-            npj['pass_num'] = cur_pass
-            npj['step_num'] = cur_step + 1
-            userdata.next_step_jobs[source_name].append(npj)
-
         # Make the pipeline do custom handling to update the record entry.
         ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
         pipeline = ppinfo.pipeline
         record = ppinfo.current_record
-        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass,
-                                                 cur_step)
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass)
         pipeline.handleJobResult(res, ppmrctx)
 
         # Set the overall success flags if there was an error.
@@ -412,8 +355,6 @@
         self.ppmngr = ppmngr
         self.records = ppmngr.record_histories.current
         self.cur_pass = 0
-        self.cur_step = 0
-        self.next_step_jobs = {}
 
 
 def _get_pipeline_infos_by_pass_and_realm(pp_infos):
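The result-handling side shrinks accordingly: worker results no longer carry a 'next_step_job' entry, and PipelineJobResultHandleContext loses its step argument, as the _handleWorkerResult() hunk above shows. A minimal sketch of the simplified handler under those assumptions; the userdata and pipeline-manager objects are hypothetical stand-ins, and this dataclass only approximates the real context class from piecrust.pipelines.base:

    from dataclasses import dataclass

    @dataclass
    class PipelineJobResultHandleContext:
        record: object
        job: dict
        pass_num: int  # no more step_num: each pass runs one batch of jobs

    def handle_worker_result(job, res, userdata):
        source_name, item_spec = job['job_spec']
        ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
        # The pipeline does its own record updating; with the step loop
        # gone, the context only needs the record, the job, and the pass.
        ctx = PipelineJobResultHandleContext(
            ppinfo.current_record, job, userdata.cur_pass)
        ppinfo.pipeline.handleJobResult(res, ctx)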