diff piecrust/pipelines/page.py @ 1136:5f97b5b59dfe

bake: Optimize cache handling for the baking process.

- Get rid of the 2-level pipeline runs... handle a single set of passes.
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does.
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't need to be re-baked.
- Simplify some of the code.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 23 Apr 2018 21:47:49 -0700
parents 3bcb2d446397
children
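
For orientation before the diff (which covers only piecrust/pipelines/page.py): the bake now runs one flat sequence of three passes per page pipeline (load, render segments, layout), each returning a labeled job batch. A minimal sketch of how a driver might consume the new (jobs, label) return value of createJobs() — the pool object and its queueJobs() method are hypothetical, only createJobs() and the labels come from this commit:

    # Hypothetical driver loop; createJobs() and the "load" / "render" /
    # "layout" labels are from this commit, the worker pool is assumed
    # for illustration.
    for pass_num in range(3):
        ctx.pass_num = pass_num
        jobs, label = pipeline.createJobs(ctx)
        if not jobs:
            continue  # this pass has nothing to (re-)bake
        logger.info("Queuing %d '%s' jobs" % (len(jobs), label))
        pool.queueJobs(jobs)
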
--- a/piecrust/pipelines/page.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/page.py	Mon Apr 23 21:47:49 2018 -0700
@@ -1,12 +1,11 @@
 import copy
-import time
 import logging
 from piecrust.pipelines.base import (
     ContentPipeline, create_job, content_item_from_job)
 from piecrust.pipelines._pagebaker import PageBaker, get_output_path
 from piecrust.pipelines._pagerecords import (
-    PagePipelineRecordEntry,
-    add_page_job_result, merge_job_result_into_record_entry)
+    PagePipelineRecordEntry, SubPageFlags)
+from piecrust.rendering import RenderingContext, render_page_segments
 from piecrust.sources.base import AbortedSourceUseError
 
 
@@ -38,11 +37,11 @@
         pass_num = ctx.pass_num
         if pass_num == 0:
             ctx.current_record.user_data['dirty_source_names'] = set()
-            return self._createLoadJobs(ctx)
+            return self._createLoadJobs(ctx), "load"
         if pass_num == 1:
-            return self._createSecondPassJobs(ctx)
+            return self._createSegmentJobs(ctx), "render"
         if pass_num == 2:
-            return self._createThirdPassJobs(ctx)
+            return self._createLayoutJobs(ctx), "layout"
         raise Exception("Unexpected pipeline pass: %d" % pass_num)
 
     def _createLoadJobs(self, ctx):
@@ -55,20 +54,21 @@
             return jobs
         return None
 
-    def _createSecondPassJobs(self, ctx):
+    def _createSegmentJobs(self, ctx):
         jobs = []
 
         app = self.app
+        pass_num = ctx.pass_num
         out_dir = self.ctx.out_dir
         uri_getter = self.source.route.getUri
         pretty_urls = app.config.get('site/pretty_urls')
 
-        used_paths = _get_used_paths_from_records(
-            ctx.record_histories.current.records)
         history = ctx.record_histories.getHistory(ctx.record_name).copy()
         history.build()
 
-        pass_num = ctx.pass_num
+        cur_rec_used_paths = {}
+        history.current.user_data['used_paths'] = cur_rec_used_paths
+        all_records = ctx.record_histories.current.records
 
         for prev, cur in history.diffs:
             # Ignore pages that disappeared since last bake.
@@ -76,18 +76,24 @@
                 continue
 
             # Skip draft pages.
-            if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT:
+            if cur.hasFlag(PagePipelineRecordEntry.FLAG_IS_DRAFT):
+                continue
+
+            # Skip pages that haven't changed since last bake.
+            if (prev and not cur.hasFlag(
+                    PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED)):
                 continue
 
-            # For pages that are known to use other sources, we make a dummy
-            # job that will effectively get directly passed on to the next
-            # step.
+            # For pages that are known to use other sources in their own
+            # content segments (we don't care about the layout yet), we
+            # postpone them to the next pipeline pass immediately, because they
+            # might need populated render caches for those sources' pages.
             if prev:
-                usn1, usn2 = prev.getAllUsedSourceNames()
-                if usn1 or usn2:
-                    jobs.append(create_job(self, cur.item_spec,
-                                           pass_num=pass_num,
-                                           uses_sources=True))
+                usn1, _ = prev.getAllUsedSourceNames()
+                if usn1:
+                    logger.debug("Postponing: %s" % cur.item_spec)
+                    cur.flags |= \
+                        PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
                     continue
 
             # Check if this item has been overriden by a previous pipeline
@@ -95,27 +101,30 @@
             # source, and some of our pages have been overriden by a user
             # page that writes out to the same URL.
             uri = uri_getter(cur.route_params)
-            path = get_output_path(app, out_dir, uri, pretty_urls)
-            override = used_paths.get(path)
+            out_path = get_output_path(app, out_dir, uri, pretty_urls)
+            override = _find_used_path_spec(all_records, out_path)
             if override is not None:
-                override_source_name, override_entry = override
+                override_source_name, override_entry_spec = override
                 override_source = app.getSource(override_source_name)
                 if override_source.config['realm'] == \
                         self.source.config['realm']:
                     logger.error(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (cur.item_spec, path, override_entry.item_spec))
+                        (cur.item_spec, out_path, override_entry_spec))
                 else:
                     logger.debug(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (cur.item_spec, path, override_entry.item_spec))
+                        (cur.item_spec, out_path, override_entry_spec))
 
                 cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
                 continue
 
             # Nope, all good, let's create a job for this item.
+            cur.flags |= PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED
+            cur_rec_used_paths[out_path] = cur.item_spec
+
             jobs.append(create_job(self, cur.item_spec,
                                    pass_num=pass_num))
 
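
Aside on the hunk above: the override check swaps a precomputed map of every record's output paths for an on-demand lookup. Each run now publishes the paths it claims under user_data['used_paths'] on its own record, and later pipelines scan earlier records through _find_used_path_spec() (defined at the bottom of this diff). A minimal sketch of that data flow, with illustrative paths and item specs:

    # During its segment pass, one pipeline claims an output path:
    cur_rec_used_paths['/out/blog/index.html'] = 'pages/blog.md'

    # Later, another pipeline computes its own output path and asks
    # whether an earlier record already claimed it:
    override = _find_used_path_spec(all_records, out_path)
    if override is not None:
        src_name, item_spec = override  # e.g. ('pages', 'pages/blog.md')
        # same realm: error; different realm: a theme page was overridden
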
@@ -123,7 +132,7 @@
             return jobs
         return None
 
-    def _createThirdPassJobs(self, ctx):
+    def _createLayoutJobs(self, ctx):
         # Get the list of all sources that had anything baked.
         dirty_source_names = set()
         all_records = ctx.record_histories.current.records
@@ -132,41 +141,59 @@
             if rec_dsn:
                 dirty_source_names |= rec_dsn
 
-        # Now look at the stuff we bake for our own source on the first pass.
-        # For anything that wasn't baked (i.e. it was considered 'up to date')
-        # we look at the records from last time, and if they say that some
-        # page was using a source that is "dirty", then we force bake it.
-        #
-        # The common example for this is a blog index page which hasn't been
-        # touched, but needs to be re-baked because someone added or edited
-        # a post.
         jobs = []
         pass_num = ctx.pass_num
         history = ctx.record_histories.getHistory(ctx.record_name).copy()
         history.build()
         for prev, cur in history.diffs:
-            if not cur:
+            if not cur or cur.hasFlag(PagePipelineRecordEntry.FLAG_OVERRIDEN):
                 continue
-            if cur.was_any_sub_baked:
-                continue
+
+            do_bake = False
+            force_segments = False
+            force_layout = False
+
+            # Make sure we bake the layout for pages that got their segments
+            # re-rendered.
+            if cur.hasFlag(PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
+                do_bake = True
+
+            # Now look at the stuff we baked for our own source on the second
+            # pass.  For anything that wasn't baked (i.e. it was considered 'up
+            # to date') we look at the records from last time, and if they say
+            # that some page was using a source that is "dirty", then we force
+            # bake it.
+            #
+            # The common example for this is a blog index page which hasn't
+            # been touched, but needs to be re-baked because someone added or
+            # edited a post.
             if prev:
                 usn1, usn2 = prev.getAllUsedSourceNames()
                 force_segments = any(map(lambda u: u in dirty_source_names,
                                      usn1))
                 force_layout = any(map(lambda u: u in dirty_source_names,
                                    usn2))
+
                 if force_segments or force_layout:
-                    jobs.append(create_job(self, prev.item_spec,
-                                           pass_num=pass_num,
-                                           force_segments=force_segments,
-                                           force_layout=force_layout))
-                else:
+                    # Yep, we need to force-rebake some aspect of this page.
+                    do_bake = True
+
+                elif not do_bake:
                     # This page uses other sources, but no source was dirty
                     # this time around (it was a null build, maybe). We
                     # don't have any work to do, but we need to carry over
                     # any information we have, otherwise the post bake step
                     # will think we need to delete last bake's outputs.
                     cur.subs = copy.deepcopy(prev.subs)
+                    for cur_sub in cur.subs:
+                        cur_sub['flags'] = \
+                            SubPageFlags.FLAG_COLLAPSED_FROM_LAST_RUN
+
+            if do_bake:
+                jobs.append(create_job(self, cur.item_spec,
+                                       pass_num=pass_num,
+                                       force_segments=force_segments,
+                                       force_layout=force_layout))
 
         if len(jobs) > 0:
             return jobs
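
To make the blog-index example above concrete: getAllUsedSourceNames() returns, per its use in this diff, two collections — sources referenced while rendering segments, and sources referenced by the layout — and either one intersecting the dirty set forces the matching part of the rebake:

    # Sketch of the force-rebake decision; the two-tuple split is taken
    # from the code above, the 'posts' source name is illustrative.
    usn_segments, usn_layout = prev.getAllUsedSourceNames()
    force_segments = any(u in dirty_source_names for u in usn_segments)
    force_layout = any(u in dirty_source_names for u in usn_layout)
    # An untouched blog index whose layout lists the 'posts' source gets
    # force_layout=True as soon as any post is added or edited.
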
@@ -174,7 +201,6 @@
 
     def handleJobResult(self, result, ctx):
         pass_num = ctx.pass_num
-        step_num = ctx.step_num
 
         if pass_num == 0:
             # Just went through a "load page" job. Let's create a record
@@ -188,35 +214,37 @@
 
             # If this page was modified, flag its entire source as "dirty",
             # so any pages using that source can be re-baked.
-            if (new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED):
+            if new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED:
                 ctx.record.user_data['dirty_source_names'].add(
                     self.source.name)
+
+        elif pass_num == 1:
+            # Just went through the "render segments" job.
+            existing = ctx.record_entry
+            existing.flags |= result.get('flags',
+                                         PagePipelineRecordEntry.FLAG_NONE)
+
         else:
             # Update the entry with the new information.
             existing = ctx.record_entry
-            if not result.get('postponed', False):
-                merge_job_result_into_record_entry(existing, result)
+            existing.flags |= result.get('flags',
+                                         PagePipelineRecordEntry.FLAG_NONE)
+            existing.errors += result.get('errors', [])
+            existing.subs += result.get('subs', [])
 
     def run(self, job, ctx, result):
         pass_num = job.get('pass_num', 0)
-        step_num = job.get('step_num', 0)
 
         if pass_num == 0:
-            if step_num == 0:
-                return self._loadPage(job, ctx, result)
+            return self._loadPage(job, ctx, result)
 
         elif pass_num == 1:
-            if step_num == 0:
-                return self._renderOrPostpone(job, ctx, result)
-            elif step_num == 1:
-                return self._renderAlways(job, ctx, result)
+            return self._renderSegments(job, ctx, result)
 
-        elif pass_num == 2:
-            if step_num == 0:
-                return self._renderAlways(job, ctx, result)
-
-        raise Exception("Unexpected pipeline pass/step: %d/%d" %
-                        (pass_num, step_num))
+        elif pass_num >= 2:
+            return self._renderLayout(job, ctx, result)
 
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
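
Worker results are plain dicts merged into the pipeline record. From the merging code above, a layout job's result has roughly this shape (the keys are the ones read by handleJobResult(); the values here are illustrative):

    # Illustrative result dict produced by a layout job.
    result = {
        'flags': PagePipelineRecordEntry.FLAG_NONE,  # bake flags to OR in
        'errors': [],        # error strings collected during the bake
        'subs': rdr_subs,    # one record per baked sub-page
    }
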
@@ -250,62 +278,50 @@
         if page.config.get(self._draft_setting):
             result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT
 
-    def _renderOrPostpone(self, job, ctx, result):
-        # See if we should immediately kick this job off to the next step.
-        if job.get('uses_sources', False):
-            result['postponed'] = True
-            result['next_step_job'] = create_job(self, job['job_spec'][1])
-            return
-
+    def _renderSegments(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page
         # is using some other sources, in which case we abort and we'll try
         # again on the layout pass.
         content_item = content_item_from_job(self, job)
-        logger.debug("Conditional render for: %s" % content_item.spec)
+        logger.debug("Render segments for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         if page.config.get(self._draft_setting):
             raise Exception("Shouldn't have a draft page in a render job!")
 
-        prev_entry = ctx.previous_entry
-
         env = self.app.env
         env.abort_source_use = True
-        add_page_job_result(result)
         try:
-            rdr_subs = self._pagebaker.bake(page, prev_entry)
-            result['subs'] = rdr_subs
+            rdr_ctx = RenderingContext(page)
+            render_page_segments(rdr_ctx)
         except AbortedSourceUseError:
             logger.debug("Page was aborted for using source: %s" %
                          content_item.spec)
-            result['flags'] |= \
+            result['flags'] = \
                 PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
             env.stats.stepCounter("SourceUseAbortions")
             env.stats.addManifestEntry("SourceUseAbortions", content_item.spec)
-            result['next_step_job'] = create_job(self, content_item.spec)
         finally:
             env.abort_source_use = False
 
-    def _renderAlways(self, job, ctx, result):
+    def _renderLayout(self, job, ctx, result):
         content_item = content_item_from_job(self, job)
-        logger.debug("Full render for: %s" % content_item.spec)
+        logger.debug("Render layout for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         prev_entry = ctx.previous_entry
         rdr_subs = self._pagebaker.bake(
             page, prev_entry,
             force_segments=job.get('force_segments'),
             force_layout=job.get('force_layout'))
-
-        add_page_job_result(result)
         result['subs'] = rdr_subs
 
-def _get_used_paths_from_records(records):
-    used_paths = {}
+
+def _find_used_path_spec(records, path):
     for rec in records:
-        src_name = rec.name.split('@')[0]
-        for e in rec.getEntries():
-            paths = e.getAllOutputPaths()
-            if paths is not None:
-                for p in paths:
-                    used_paths[p] = (src_name, e)
-    return used_paths
+        up = rec.user_data.get('used_paths')
+        if up is not None:
+            entry_spec = up.get(path)
+            if entry_spec is not None:
+                src_name = rec.name.split('@')[0]
+                return (src_name, entry_spec)
+    return None
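
A closing note on the segment pass: it is purely a cache-warming step. The rendered segments land in the render cache and the job result carries only flags, which lets the layout pass reuse the cached segments instead of re-rendering them. Restating the guard pattern, under the assumption (implied above) that piecrust's source-access code checks env.abort_source_use and raises AbortedSourceUseError:

    env.abort_source_use = True      # trip on any cross-source access
    try:
        render_page_segments(RenderingContext(page))  # warms the cache
    except AbortedSourceUseError:
        # The page reads from another source; it gets picked up again on
        # the layout pass, after that source's own segments are rendered.
        result['flags'] = PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
    finally:
        env.abort_source_use = False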