changeset 1136:5f97b5b59dfe

bake: Optimize cache handling for the baking process.

- Get rid of the 2-level pipeline runs... handle a single set of passes.
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does.
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't need to be re-baked.
- Simplify some of the code.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 23 Apr 2018 21:47:49 -0700
parents 6350ee084273
children 10fd55b9ccfb
files piecrust/baking/baker.py piecrust/pipelines/_pagebaker.py piecrust/pipelines/_pagerecords.py piecrust/pipelines/base.py piecrust/pipelines/page.py piecrust/sources/blogarchives.py piecrust/sources/taxonomy.py
diffstat 7 files changed, 179 insertions(+), 286 deletions(-)
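For orientation before the per-file diffs: this changeset drops the old two-level pass/step scheduling and bakes each realm as one fixed sequence of passes, with one batch of jobs per source per pass. A hedged summary of the page pipeline's new passes (the dict below is illustrative only, not code from the repository):

    PAGE_PIPELINE_PASSES = {
        0: 'load',    # parse page configs, flag drafts and modified sources
        1: 'render',  # render content segments into the render cache
        2: 'layout',  # render layouts and write the output files
    }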
--- a/piecrust/baking/baker.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/baking/baker.py	Mon Apr 23 21:47:49 2018 -0700
@@ -6,11 +6,10 @@
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
 from piecrust.pipelines.base import (
-    PipelineJobCreateContext, PipelineJobResultHandleContext,
-    PipelineJobValidateContext, PipelineManager,
+    PipelineJobCreateContext, PipelineJobResultHandleContext, PipelineManager,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
-    MultiRecordHistory, MultiRecord, RecordEntry,
+    MultiRecordHistory, MultiRecord,
     load_records)
 from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES
 
@@ -234,18 +233,12 @@
 
     def _bakeRealm(self, pool, ppmngr, record_histories,
                    pp_pass_num, realm, pplist):
-        # Start with the first step, where we iterate on the content sources'
-        # items and run jobs on those.
+        start_time = time.perf_counter()
+
+        job_count = 0
+        job_descs = {}
+        realm_name = REALM_NAMES[realm].lower()
         pool.userdata.cur_pass = pp_pass_num
-        pool.userdata.cur_step = 0
-        next_step_jobs = {}
-        pool.userdata.next_step_jobs = next_step_jobs
-
-        start_time = time.perf_counter()
-        job_count = 0
-        stats = self.app.env.stats
-        realm_name = REALM_NAMES[realm].lower()
-        participating_source_names = []
 
         for ppinfo in pplist:
             src = ppinfo.source
@@ -253,19 +246,19 @@
             jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
                                              record_histories)
 
-            next_step_jobs[src.name] = []
-            jobs = pp.createJobs(jcctx)
+            jobs, job_desc = pp.createJobs(jcctx)
             if jobs is not None:
                 new_job_count = len(jobs)
                 job_count += new_job_count
                 pool.queueJobs(jobs)
-                participating_source_names.append(src.name)
+                if job_desc:
+                    job_descs.setdefault(job_desc, []).append(src.name)
             else:
                 new_job_count = 0
 
             logger.debug(
                 "Queued %d jobs for source '%s' using pipeline '%s' "
-                "(%s, step 0)." %
+                "(%s)." %
                 (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))
 
         if job_count == 0:
@@ -276,50 +269,9 @@
 
         logger.info(format_timed(
             start_time, "%d jobs completed (%s)." %
-            (job_count, ', '.join(participating_source_names))))
-
-        # Now let's see if any job created a follow-up job. Let's keep
-        # processing those jobs as long as they create new ones.
-        pool.userdata.cur_step = 1
-        while True:
-            # Make a copy of out next step jobs and reset the list, so
-            # the first jobs to be processed don't mess it up as we're
-            # still iterating on it.
-            next_step_jobs = pool.userdata.next_step_jobs
-            pool.userdata.next_step_jobs = {}
-
-            start_time = time.perf_counter()
-            job_count = 0
-            participating_source_names = []
-
-            for sn, jobs in next_step_jobs.items():
-                if jobs:
-                    logger.debug(
-                        "Queuing jobs for source '%s' (%s, step %d)." %
-                        (sn, realm_name, pool.userdata.cur_step))
-
-                    pp = ppmngr.getPipeline(sn)
-                    valctx = PipelineJobValidateContext(
-                        pp_pass_num, pool.userdata.cur_step,
-                        pp.record_name, record_histories)
-                    pp.validateNextStepJobs(jobs, valctx)
-
-                    job_count += len(jobs)
-                    pool.userdata.next_step_jobs[sn] = []
-                    pool.queueJobs(jobs)
-                    participating_source_names.append(sn)
-
-            if job_count == 0:
-                break
-
-            pool.wait()
-
-            logger.info(format_timed(
-                start_time,
-                "%d jobs completed (%s)." %
-                (job_count, ', '.join(participating_source_names))))
-
-            pool.userdata.cur_step += 1
+            (job_count, ', '.join(
+                ['%s %s' % (d, ', '.join(sn))
+                 for d, sn in job_descs.items()]))))
 
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
@@ -358,22 +310,13 @@
 
     def _handleWorkerResult(self, job, res, userdata):
         cur_pass = userdata.cur_pass
-        cur_step = userdata.cur_step
         source_name, item_spec = job['job_spec']
 
-        # See if there's a next step to take.
-        npj = res.get('next_step_job')
-        if npj is not None:
-            npj['pass_num'] = cur_pass
-            npj['step_num'] = cur_step + 1
-            userdata.next_step_jobs[source_name].append(npj)
-
         # Make the pipeline do custom handling to update the record entry.
         ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
         pipeline = ppinfo.pipeline
         record = ppinfo.current_record
-        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass,
-                                                 cur_step)
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass)
         pipeline.handleJobResult(res, ppmrctx)
 
         # Set the overall success flags if there was an error.
@@ -412,8 +355,6 @@
         self.ppmngr = ppmngr
         self.records = ppmngr.record_histories.current
         self.cur_pass = 0
-        self.cur_step = 0
-        self.next_step_jobs = {}
 
 
 def _get_pipeline_infos_by_pass_and_realm(pp_infos):
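The summary line logged above is now assembled from per-batch descriptions collected while queuing jobs. A minimal, self-contained sketch of that formatting (the sample data is made up):

    job_descs = {'load': ['pages', 'posts'], 'taxonomize': ['tags']}
    job_count = 42
    summary = ', '.join(
        '%s %s' % (desc, ', '.join(sources))
        for desc, sources in job_descs.items())
    print("%d jobs completed (%s)." % (job_count, summary))
    # -> 42 jobs completed (load pages, posts, taxonomize tags).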
--- a/piecrust/pipelines/_pagebaker.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/_pagebaker.py	Mon Apr 23 21:47:49 2018 -0700
@@ -250,7 +250,7 @@
     if not prev_sub_entry:
         # No previous record, so most probably was never baked. Bake it.
         cur_sub_entry['flags'] |= \
-            SubPageFlags.FLAG_FORCED_BY_NO_PREVIOUS
+            SubPageFlags.FLAG_FORCED_BY_NO_RECORD
         return STATUS_BAKE
 
     return STATUS_CLEAN
--- a/piecrust/pipelines/_pagerecords.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/_pagerecords.py	Mon Apr 23 21:47:49 2018 -0700
@@ -1,4 +1,3 @@
-import copy
 from piecrust.pipelines.records import RecordEntry, get_flag_descriptions
 
 
@@ -7,10 +6,11 @@
     FLAG_BAKED = 2**0
     FLAG_FORCED_BY_SOURCE = 2**1
     FLAG_FORCED_BY_NO_PREVIOUS = 2**2
-    FLAG_FORCED_BY_PREVIOUS_ERRORS = 2**3
-    FLAG_FORCED_BY_GENERAL_FORCE = 2**4
-    FLAG_RENDER_CACHE_INVALIDATED = 2**5
-    FLAG_COLLAPSED_FROM_LAST_RUN = 2**6
+    FLAG_FORCED_BY_NO_RECORD = 2**3
+    FLAG_FORCED_BY_PREVIOUS_ERRORS = 2**4
+    FLAG_FORCED_BY_GENERAL_FORCE = 2**5
+    FLAG_RENDER_CACHE_INVALIDATED = 2**6
+    FLAG_COLLAPSED_FROM_LAST_RUN = 2**7
 
 
 def create_subpage_job_result(out_uri, out_path):
@@ -23,23 +23,10 @@
     }
 
 
-def was_subpage_clean(sub):
-    return ((sub['flags'] & SubPageFlags.FLAG_BAKED) == 0 and
-            len(sub['errors']) == 0)
-
-
-def was_subpage_baked(sub):
-    return (sub['flags'] & SubPageFlags.FLAG_BAKED) != 0
-
-
-def was_subpage_baked_successfully(sub):
-    return was_subpage_baked(sub) and len(sub['errors']) == 0
-
-
 class PagePipelineRecordEntry(RecordEntry):
     FLAG_NONE = 0
-    FLAG_NEW = 2**0
-    FLAG_SOURCE_MODIFIED = 2**1
+    FLAG_SOURCE_MODIFIED = 2**0
+    FLAG_SEGMENTS_RENDERED = 2**1
     FLAG_OVERRIDEN = 2**2
     FLAG_COLLAPSED_FROM_LAST_RUN = 2**3
     FLAG_IS_DRAFT = 2**4
@@ -54,25 +41,10 @@
         self.subs = []
 
     @property
-    def was_touched(self):
-        return (self.flags & self.FLAG_SOURCE_MODIFIED) != 0
-
-    @property
-    def was_overriden(self):
-        return (self.flags & self.FLAG_OVERRIDEN) != 0
-
-    @property
     def num_subs(self):
         return len(self.subs)
 
     @property
-    def was_any_sub_baked(self):
-        for o in self.subs:
-            if was_subpage_baked(o):
-                return True
-        return False
-
-    @property
     def has_any_error(self):
         if len(self.errors) > 0:
             return True
@@ -81,6 +53,9 @@
                 return True
         return False
 
+    def hasFlag(self, flag):
+        return (self.flags & flag) != 0
+
     def getSub(self, page_num):
         return self.subs[page_num - 1]
 
@@ -118,33 +93,21 @@
         return d
 
 
-def add_page_job_result(result):
-    result.update({
-        'flags': PagePipelineRecordEntry.FLAG_NONE,
-        'errors': [],
-        'subs': []
-    })
-
-
-def merge_job_result_into_record_entry(record_entry, result):
-    record_entry.flags |= result['flags']
-    record_entry.errors += result['errors']
-    record_entry.subs += result['subs']
-
-
 flag_descriptions = {
-    PagePipelineRecordEntry.FLAG_NEW: 'new',
     PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED: 'touched',
+    PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED: 'rendered segments',
     PagePipelineRecordEntry.FLAG_OVERRIDEN: 'overriden',
     PagePipelineRecordEntry.FLAG_COLLAPSED_FROM_LAST_RUN: 'from last run',
     PagePipelineRecordEntry.FLAG_IS_DRAFT: 'draft',
-    PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE: 'aborted for source use'}
+    PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE: ('aborted for '
+                                                          'source use')}
 
 
 sub_flag_descriptions = {
     SubPageFlags.FLAG_BAKED: 'baked',
     SubPageFlags.FLAG_FORCED_BY_SOURCE: 'forced by source',
     SubPageFlags.FLAG_FORCED_BY_NO_PREVIOUS: 'forced b/c new',
+    SubPageFlags.FLAG_FORCED_BY_NO_RECORD: 'forced b/c no record',
     SubPageFlags.FLAG_FORCED_BY_PREVIOUS_ERRORS: 'forced by errors',
     SubPageFlags.FLAG_FORCED_BY_GENERAL_FORCE: 'manually forced',
     SubPageFlags.FLAG_RENDER_CACHE_INVALIDATED: 'cache invalidated',
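The new hasFlag() helper is a plain bitwise test against the power-of-two flag values defined above. A self-contained illustration (the class below is a stand-in, not PieCrust code):

    class _Entry:
        FLAG_NONE = 0
        FLAG_SOURCE_MODIFIED = 2**0
        FLAG_SEGMENTS_RENDERED = 2**1

        def __init__(self):
            self.flags = self.FLAG_NONE

        def hasFlag(self, flag):
            # Same test as PagePipelineRecordEntry.hasFlag.
            return (self.flags & flag) != 0

    e = _Entry()
    e.flags |= _Entry.FLAG_SEGMENTS_RENDERED
    assert e.hasFlag(_Entry.FLAG_SEGMENTS_RENDERED)
    assert not e.hasFlag(_Entry.FLAG_SOURCE_MODIFIED)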
--- a/piecrust/pipelines/base.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/base.py	Mon Apr 23 21:47:49 2018 -0700
@@ -2,7 +2,6 @@
 import logging
 from werkzeug.utils import cached_property
 from piecrust.configuration import ConfigurationError
-from piecrust.sources.base import ContentItem
 
 
 logger = logging.getLogger(__name__)
@@ -58,19 +57,6 @@
         self.pass_num = pass_num
 
 
-class PipelineJobValidateContext(_PipelineMasterProcessJobContextBase):
-    """ Context for validating jobs on subsequent step runs (i.e. validating
-        the list of jobs to run starting with the second step).
-
-        This is run on the master process, so it can access both the
-        previous and current records.
-    """
-    def __init__(self, pass_num, step_num, record_name, record_histories):
-        super().__init__(record_name, record_histories)
-        self.pass_num = pass_num
-        self.step_num = step_num
-
-
 class PipelineJobRunContext:
     """ Context for running pipeline baking jobs.
 
@@ -103,11 +89,10 @@
         This is run on the master process, so it can access the current
         record.
     """
-    def __init__(self, record, job, pass_num, step_num):
+    def __init__(self, record, job, pass_num):
         self.record = record
         self.job = job
         self.pass_num = pass_num
-        self.step_num = step_num
 
     @cached_property
     def record_entry(self):
@@ -160,7 +145,7 @@
     def createJobs(self, ctx):
         return [
             create_job(self, item.spec)
-            for item in self.source.getAllContents()]
+            for item in self.source.getAllContents()], None
 
     def createRecordEntry(self, item_spec):
         entry_class = self.RECORD_ENTRY_CLASS
@@ -171,9 +156,6 @@
     def handleJobResult(self, result, ctx):
         raise NotImplementedError()
 
-    def validateNextStepJobs(self, jobs, ctx):
-        pass
-
     def run(self, job, ctx, result):
         raise NotImplementedError()
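With the step machinery removed, createJobs() now returns a (jobs, description) pair: the jobs to queue (or None when there is nothing to do) plus an optional short label used in the baker's summary log. A hedged sketch of the contract, with illustrative names only:

    def create_jobs_like(item_specs, pass_num):
        # One job dict per content item, in the spirit of create_job().
        jobs = [{'job_spec': ('my_source', spec), 'pass_num': pass_num}
                for spec in item_specs]
        if jobs:
            return jobs, 'load'
        return None, None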
 
--- a/piecrust/pipelines/page.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/page.py	Mon Apr 23 21:47:49 2018 -0700
@@ -1,12 +1,11 @@
 import copy
-import time
 import logging
 from piecrust.pipelines.base import (
     ContentPipeline, create_job, content_item_from_job)
 from piecrust.pipelines._pagebaker import PageBaker, get_output_path
 from piecrust.pipelines._pagerecords import (
-    PagePipelineRecordEntry,
-    add_page_job_result, merge_job_result_into_record_entry)
+    PagePipelineRecordEntry, SubPageFlags)
+from piecrust.rendering import RenderingContext, render_page_segments
 from piecrust.sources.base import AbortedSourceUseError
 
 
@@ -38,11 +37,11 @@
         pass_num = ctx.pass_num
         if pass_num == 0:
             ctx.current_record.user_data['dirty_source_names'] = set()
-            return self._createLoadJobs(ctx)
+            return self._createLoadJobs(ctx), "load"
         if pass_num == 1:
-            return self._createSecondPassJobs(ctx)
+            return self._createSegmentJobs(ctx), "render"
         if pass_num == 2:
-            return self._createThirdPassJobs(ctx)
+            return self._createLayoutJobs(ctx), "layout"
         raise Exception("Unexpected pipeline pass: %d" % pass_num)
 
     def _createLoadJobs(self, ctx):
@@ -55,20 +54,21 @@
             return jobs
         return None
 
-    def _createSecondPassJobs(self, ctx):
+    def _createSegmentJobs(self, ctx):
         jobs = []
 
         app = self.app
+        pass_num = ctx.pass_num
         out_dir = self.ctx.out_dir
         uri_getter = self.source.route.getUri
         pretty_urls = app.config.get('site/pretty_urls')
 
-        used_paths = _get_used_paths_from_records(
-            ctx.record_histories.current.records)
         history = ctx.record_histories.getHistory(ctx.record_name).copy()
         history.build()
 
-        pass_num = ctx.pass_num
+        cur_rec_used_paths = {}
+        history.current.user_data['used_paths'] = cur_rec_used_paths
+        all_records = ctx.record_histories.current.records
 
         for prev, cur in history.diffs:
             # Ignore pages that disappeared since last bake.
@@ -76,18 +76,24 @@
                 continue
 
             # Skip draft pages.
-            if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT:
+            if cur.hasFlag(PagePipelineRecordEntry.FLAG_IS_DRAFT):
+                continue
+
+            # Skip pages that haven't changed since last bake.
+            if (prev and not cur.hasFlag(
+                    PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED)):
                 continue
 
-            # For pages that are known to use other sources, we make a dummy
-            # job that will effectively get directly passed on to the next
-            # step.
+            # For pages that are known to use other sources in their own
+            # content segments (we don't care about the layout yet), we
+            # postpone them to the next pipeline pass immediately, because they
+            # might need populated render caches for those sources' pages.
             if prev:
-                usn1, usn2 = prev.getAllUsedSourceNames()
-                if usn1 or usn2:
-                    jobs.append(create_job(self, cur.item_spec,
-                                           pass_num=pass_num,
-                                           uses_sources=True))
+                usn1, _ = prev.getAllUsedSourceNames()
+                if usn1:
+                    logger.debug("Postponing: %s" % cur.item_spec)
+                    cur.flags |= \
+                        PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
                     continue
 
             # Check if this item has been overriden by a previous pipeline
@@ -95,27 +101,30 @@
             # source, and some of our pages have been overriden by a user
             # page that writes out to the same URL.
             uri = uri_getter(cur.route_params)
-            path = get_output_path(app, out_dir, uri, pretty_urls)
-            override = used_paths.get(path)
+            out_path = get_output_path(app, out_dir, uri, pretty_urls)
+            override = _find_used_path_spec(all_records, out_path)
             if override is not None:
-                override_source_name, override_entry = override
+                override_source_name, override_entry_spec = override
                 override_source = app.getSource(override_source_name)
                 if override_source.config['realm'] == \
                         self.source.config['realm']:
                     logger.error(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (cur.item_spec, path, override_entry.item_spec))
+                        (cur.item_spec, out_path, override_entry_spec))
                 else:
                     logger.debug(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (cur.item_spec, path, override_entry.item_spec))
+                        (cur.item_spec, out_path, override_entry_spec))
 
                 cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
                 continue
 
             # Nope, all good, let's create a job for this item.
+            cur.flags |= PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED
+            cur_rec_used_paths[out_path] = cur.item_spec
+
             jobs.append(create_job(self, cur.item_spec,
                                    pass_num=pass_num))
 
@@ -123,7 +132,7 @@
             return jobs
         return None
 
-    def _createThirdPassJobs(self, ctx):
+    def _createLayoutJobs(self, ctx):
         # Get the list of all sources that had anything baked.
         dirty_source_names = set()
         all_records = ctx.record_histories.current.records
@@ -132,41 +141,59 @@
             if rec_dsn:
                 dirty_source_names |= rec_dsn
 
-        # Now look at the stuff we bake for our own source on the first pass.
-        # For anything that wasn't baked (i.e. it was considered 'up to date')
-        # we look at the records from last time, and if they say that some
-        # page was using a source that is "dirty", then we force bake it.
-        #
-        # The common example for this is a blog index page which hasn't been
-        # touched, but needs to be re-baked because someone added or edited
-        # a post.
         jobs = []
         pass_num = ctx.pass_num
         history = ctx.record_histories.getHistory(ctx.record_name).copy()
         history.build()
         for prev, cur in history.diffs:
-            if not cur:
+            if not cur or cur.hasFlag(PagePipelineRecordEntry.FLAG_OVERRIDEN):
                 continue
-            if cur.was_any_sub_baked:
-                continue
+
+            do_bake = False
+            force_segments = False
+            force_layout = False
+
+            # Make sure we bake the layout for pages that got their segments
+            # re-rendered.
+            if cur.hasFlag(PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
+                do_bake = True
+
+            # Now look at the stuff we baked for our own source on the second
+            # pass.  For anything that wasn't baked (i.e. it was considered 'up
+            # to date') we look at the records from last time, and if they say
+            # that some page was using a source that is "dirty", then we force
+            # bake it.
+            #
+            # The common example for this is a blog index page which hasn't
+            # been touched, but needs to be re-baked because someone added or
+            # edited a post.
             if prev:
                 usn1, usn2 = prev.getAllUsedSourceNames()
                 force_segments = any(map(lambda u: u in dirty_source_names,
                                      usn1))
                 force_layout = any(map(lambda u: u in dirty_source_names,
                                    usn2))
+
                 if force_segments or force_layout:
-                    jobs.append(create_job(self, prev.item_spec,
-                                           pass_num=pass_num,
-                                           force_segments=force_segments,
-                                           force_layout=force_layout))
-                else:
+                    # Yep, we need to force-rebake some aspect of this page.
+                    do_bake = True
+
+                elif not do_bake:
                     # This page uses other sources, but no source was dirty
                     # this time around (it was a null build, maybe). We
                     # don't have any work to do, but we need to carry over
                     # any information we have, otherwise the post bake step
                     # will think we need to delete last bake's outputs.
                     cur.subs = copy.deepcopy(prev.subs)
+                    for cur_sub in cur.subs:
+                        cur_sub['flags'] = \
+                            SubPageFlags.FLAG_COLLAPSED_FROM_LAST_RUN
+
+            if do_bake:
+                jobs.append(create_job(self, cur.item_spec,
+                                       pass_num=pass_num,
+                                       force_segments=force_segments,
+                                       force_layout=force_layout))
 
         if len(jobs) > 0:
             return jobs
@@ -174,7 +201,6 @@
 
     def handleJobResult(self, result, ctx):
         pass_num = ctx.pass_num
-        step_num = ctx.step_num
 
         if pass_num == 0:
             # Just went through a "load page" job. Let's create a record
@@ -188,35 +214,37 @@
 
             # If this page was modified, flag its entire source as "dirty",
             # so any pages using that source can be re-baked.
-            if (new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED):
+            if new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED:
                 ctx.record.user_data['dirty_source_names'].add(
                     self.source.name)
+
+            # If this page is new
+
+        elif pass_num == 1:
+            # Just went through the "render segments" job.
+            existing = ctx.record_entry
+            existing.flags |= result.get('flags',
+                                         PagePipelineRecordEntry.FLAG_NONE)
+
         else:
             # Update the entry with the new information.
             existing = ctx.record_entry
-            if not result.get('postponed', False):
-                merge_job_result_into_record_entry(existing, result)
+            existing.flags |= result.get('flags',
+                                         PagePipelineRecordEntry.FLAG_NONE)
+            existing.errors += result.get('errors', [])
+            existing.subs += result.get('subs', [])
 
     def run(self, job, ctx, result):
         pass_num = job.get('pass_num', 0)
-        step_num = job.get('step_num', 0)
 
         if pass_num == 0:
-            if step_num == 0:
-                return self._loadPage(job, ctx, result)
+            return self._loadPage(job, ctx, result)
 
         elif pass_num == 1:
-            if step_num == 0:
-                return self._renderOrPostpone(job, ctx, result)
-            elif step_num == 1:
-                return self._renderAlways(job, ctx, result)
+            return self._renderSegments(job, ctx, result)
 
-        elif pass_num == 2:
-            if step_num == 0:
-                return self._renderAlways(job, ctx, result)
-
-        raise Exception("Unexpected pipeline pass/step: %d/%d" %
-                        (pass_num, step_num))
+        elif pass_num >= 2:
+            return self._renderLayout(job, ctx, result)
 
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
@@ -250,62 +278,50 @@
         if page.config.get(self._draft_setting):
             result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT
 
-    def _renderOrPostpone(self, job, ctx, result):
-        # See if we should immediately kick this job off to the next step.
-        if job.get('uses_sources', False):
-            result['postponed'] = True
-            result['next_step_job'] = create_job(self, job['job_spec'][1])
-            return
-
+    def _renderSegments(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page
         # is using some other sources, in which case we abort and we'll try
         # again on the second pass.
         content_item = content_item_from_job(self, job)
-        logger.debug("Conditional render for: %s" % content_item.spec)
+        logger.debug("Render segments for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         if page.config.get(self._draft_setting):
             raise Exception("Shouldn't have a draft page in a render job!")
 
-        prev_entry = ctx.previous_entry
-
         env = self.app.env
         env.abort_source_use = True
-        add_page_job_result(result)
         try:
-            rdr_subs = self._pagebaker.bake(page, prev_entry)
-            result['subs'] = rdr_subs
+            rdr_ctx = RenderingContext(page)
+            render_page_segments(rdr_ctx)
         except AbortedSourceUseError:
             logger.debug("Page was aborted for using source: %s" %
                          content_item.spec)
-            result['flags'] |= \
+            result['flags'] = \
                 PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
             env.stats.stepCounter("SourceUseAbortions")
             env.stats.addManifestEntry("SourceUseAbortions", content_item.spec)
-            result['next_step_job'] = create_job(self, content_item.spec)
         finally:
             env.abort_source_use = False
 
-    def _renderAlways(self, job, ctx, result):
+    def _renderLayout(self, job, ctx, result):
         content_item = content_item_from_job(self, job)
-        logger.debug("Full render for: %s" % content_item.spec)
+        logger.debug("Render layout for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         prev_entry = ctx.previous_entry
         rdr_subs = self._pagebaker.bake(
             page, prev_entry,
             force_segments=job.get('force_segments'),
             force_layout=job.get('force_layout'))
-
-        add_page_job_result(result)
         result['subs'] = rdr_subs
 
-def _get_used_paths_from_records(records):
-    used_paths = {}
+
+def _find_used_path_spec(records, path):
     for rec in records:
-        src_name = rec.name.split('@')[0]
-        for e in rec.getEntries():
-            paths = e.getAllOutputPaths()
-            if paths is not None:
-                for p in paths:
-                    used_paths[p] = (src_name, e)
-    return used_paths
+        up = rec.user_data.get('used_paths')
+        if up is not None:
+            entry_spec = up.get(path)
+            if entry_spec is not None:
+                src_name = rec.name.split('@')[0]
+                return (src_name, entry_spec)
+    return None
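The layout pass above decides, per page, whether any baking is still needed. A simplified, self-contained model of that decision (the argument shapes are assumptions, not the actual record API):

    def layout_pass_decision(segments_rendered, prev_used_sources,
                             dirty_source_names):
        # Bake if the page's segments were just re-rendered, or if the
        # previous bake says it pulls content from a now-dirty source.
        force_segments = force_layout = False
        do_bake = segments_rendered
        if prev_used_sources is not None:
            used_in_segments, used_in_layout = prev_used_sources
            force_segments = any(u in dirty_source_names
                                 for u in used_in_segments)
            force_layout = any(u in dirty_source_names
                               for u in used_in_layout)
            do_bake = do_bake or force_segments or force_layout
        return do_bake, force_segments, force_layout

    # An untouched blog index whose 'posts' source got new content:
    print(layout_pass_decision(False, (['posts'], []), {'posts'}))
    # -> (True, True, False)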
--- a/piecrust/sources/blogarchives.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/sources/blogarchives.py	Mon Apr 23 21:47:49 2018 -0700
@@ -7,12 +7,10 @@
     PageIterator, HardCodedFilterIterator, DateSortIterator)
 from piecrust.page import Page
 from piecrust.pipelines._pagebaker import PageBaker
-from piecrust.pipelines._pagerecords import (
-    PagePipelineRecordEntry,
-    add_page_job_result, merge_job_result_into_record_entry)
+from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
 from piecrust.pipelines.base import (
     ContentPipeline,
-    create_job, get_record_name_for_source, content_item_from_job)
+    create_job, get_record_name_for_source)
 from piecrust.routing import RouteParameter
 from piecrust.sources.base import ContentItem
 from piecrust.sources.generator import GeneratorSourceBase
@@ -193,8 +191,8 @@
             current_record.addEntry(entry)
 
         if len(jobs) > 0:
-            return jobs
-        return None
+            return jobs, "archive"
+        return None, None
 
     def run(self, job, ctx, result):
         year = job['year']
@@ -206,13 +204,12 @@
         prev_entry = ctx.previous_entry
         rdr_subs = self._pagebaker.bake(page, prev_entry)
 
-        add_page_job_result(result)
         result['subs'] = rdr_subs
         result['year'] = page.source_metadata['year']
 
     def handleJobResult(self, result, ctx):
         existing = ctx.record_entry
-        merge_job_result_into_record_entry(existing, result)
+        existing.subs = result['subs']
         existing.year = result['year']
 
     def postJobRun(self, ctx):
@@ -243,7 +240,8 @@
         for cur_entry in cur_rec.getEntries():
             dt = datetime.datetime.fromtimestamp(cur_entry.timestamp)
             all_years.add(dt.year)
-            if cur_entry.was_any_sub_baked:
+            if cur_entry.hasFlag(
+                    PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
                 dirty_years.add(dt.year)
 
         self._all_years = all_years
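The archive analyzer above now keys its dirty-year detection off FLAG_SEGMENTS_RENDERED instead of the removed was_any_sub_baked property. A minimal sketch of that bookkeeping (the (timestamp, flags) entry shape is an assumed simplification):

    import datetime

    FLAG_SEGMENTS_RENDERED = 2**1

    def collect_archive_years(entries):
        all_years, dirty_years = set(), set()
        for timestamp, flags in entries:
            year = datetime.datetime.fromtimestamp(timestamp).year
            all_years.add(year)
            if flags & FLAG_SEGMENTS_RENDERED:
                dirty_years.add(year)
        return all_years, dirty_years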
--- a/piecrust/sources/taxonomy.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/sources/taxonomy.py	Mon Apr 23 21:47:49 2018 -0700
@@ -7,13 +7,9 @@
     PaginationFilter, SettingFilterClause)
 from piecrust.page import Page
 from piecrust.pipelines._pagebaker import PageBaker
-from piecrust.pipelines._pagerecords import (
-    PagePipelineRecordEntry,
-    add_page_job_result, merge_job_result_into_record_entry)
+from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
 from piecrust.pipelines.base import (
-    ContentPipeline, get_record_name_for_source,
-    create_job, content_item_from_job)
-from piecrust.pipelines.records import RecordHistory
+    ContentPipeline, get_record_name_for_source, create_job)
 from piecrust.routing import RouteParameter
 from piecrust.sources.base import ContentItem
 from piecrust.sources.generator import GeneratorSourceBase
@@ -307,8 +303,8 @@
             current_record.addEntry(entry)
 
         if len(jobs) > 0:
-            return jobs
-        return None
+            return jobs, "taxonomize"
+        return None, None
 
     def run(self, job, ctx, result):
         term = job['term']
@@ -324,13 +320,12 @@
         prev_entry = ctx.previous_entry
         rdr_subs = self._pagebaker.bake(page, prev_entry)
 
-        add_page_job_result(result)
         result['subs'] = rdr_subs
         result['term'] = page.source_metadata['term']
 
     def handleJobResult(self, result, ctx):
         existing = ctx.record_entry
-        merge_job_result_into_record_entry(existing, result)
+        existing.subs = result['subs']
         existing.term = result['term']
 
     def postJobRun(self, ctx):
@@ -362,7 +357,6 @@
         self.pipeline = pipeline
         self.record_histories = record_histories
         self._all_terms = {}
-        self._single_dirty_slugified_terms = set()
         self._all_dirty_slugified_terms = None
 
     @property
@@ -381,49 +375,48 @@
     def analyze(self):
         # Build the list of terms for our taxonomy, and figure out which ones
         # are 'dirty' for the current bake.
-        #
-        # Remember all terms used.
         source = self.pipeline.inner_source
         taxonomy = self.pipeline.taxonomy
         slugifier = self.pipeline.slugifier
 
+        tax_is_mult = taxonomy.is_multiple
+        tax_setting_name = taxonomy.setting_name
+
+        # First, go over all of our source's pages seen during this bake.
+        # Gather all the taxonomy terms they have, and also keep track of
+        # the ones used by the pages that were actually rendered (instead of
+        # those that were up-to-date and skipped).
+        single_dirty_slugified_terms = set()
+        current_records = self.record_histories.current
         record_name = get_record_name_for_source(source)
-        current_records = self.record_histories.current
         cur_rec = current_records.getRecord(record_name)
         for cur_entry in cur_rec.getEntries():
-            if not cur_entry.was_overriden:
-                cur_terms = cur_entry.config.get(taxonomy.setting_name)
-                if cur_terms:
-                    if not taxonomy.is_multiple:
-                        self._addTerm(
-                            slugifier, cur_entry.item_spec, cur_terms)
-                    else:
-                        self._addTerms(
-                            slugifier, cur_entry.item_spec, cur_terms)
+            if cur_entry.hasFlag(PagePipelineRecordEntry.FLAG_OVERRIDEN):
+                continue
+
+            cur_terms = cur_entry.config.get(tax_setting_name)
+            if not cur_terms:
+                continue
 
-        # Re-bake all taxonomy terms that include new or changed pages, by
-        # marking them as 'dirty'.
-        history = self.record_histories.getHistory(record_name).copy()
-        history.build()
-        for prev_entry, cur_entry in history.diffs:
-            entries = [cur_entry]
-            if prev_entry:
-                entries.append(prev_entry)
+            if not tax_is_mult:
+                self._addTerm(
+                    slugifier, cur_entry.item_spec, cur_terms)
+            else:
+                self._addTerms(
+                    slugifier, cur_entry.item_spec, cur_terms)
 
-            for e in entries:
-                if e and e.was_any_sub_baked:
-                    entry_terms = e.config.get(taxonomy.setting_name)
-                    if entry_terms:
-                        if not taxonomy.is_multiple:
-                            self._single_dirty_slugified_terms.add(
-                                slugifier.slugify(entry_terms))
-                        else:
-                            self._single_dirty_slugified_terms.update(
-                                (slugifier.slugify(t)
-                                 for t in entry_terms))
+            if cur_entry.hasFlag(
+                    PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
+                if not tax_is_mult:
+                    single_dirty_slugified_terms.add(
+                        slugifier.slugify(cur_terms))
+                else:
+                    single_dirty_slugified_terms.update(
+                        (slugifier.slugify(t)
+                         for t in cur_terms))
 
         self._all_dirty_slugified_terms = list(
-            self._single_dirty_slugified_terms)
+            single_dirty_slugified_terms)
         logger.debug("Gathered %d dirty taxonomy terms",
                      len(self._all_dirty_slugified_terms))
 
@@ -438,7 +431,7 @@
         # by any page in the website (anywhere someone can ask for an URL
         # to the combination page), it means we check all the records, not
         # just the record for our source.
-        if taxonomy.is_multiple:
+        if tax_is_mult:
             known_combinations = set()
             for rec in current_records.records:
                 # Cheap way to test if a record contains entries that
@@ -456,7 +449,7 @@
 
             dcc = 0
             for terms in known_combinations:
-                if not self._single_dirty_slugified_terms.isdisjoint(
+                if not single_dirty_slugified_terms.isdisjoint(
                         set(terms)):
                     self._all_dirty_slugified_terms.append(
                         taxonomy.separator.join(terms))