diff piecrust/pipelines/page.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
  alternatives to pickle.
- Make sources have the basic requirement to be able to find a content
  item from an item spec (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents 45ad976712ec
children 1857dbd4580f
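
For context: the diff below leans on two helpers newly imported from piecrust.pipelines.base, whose bodies are not part of this file. Here is a minimal sketch of what they plausibly look like, inferred from how this file calls them; the dict layout and the findContentFromSpec method name are assumptions, not confirmed by this changeset.

# Minimal sketch (not the actual piecrust.pipelines.base source) of the two
# helpers this file now imports. The point of the commit is that jobs are
# plain dicts carrying only an item spec and a few scalar options, so very
# little data crosses the process boundary and pickle can be swapped out
# for alternatives (msgpack, json) when profiling serialization.


def create_job(pipeline, item_spec, **kwargs):
    # No Page or ContentItem objects in the job: just the source name,
    # the spec (a path-like string), and optional flags such as
    # pass_num or force_bake.
    job = {'job_spec': (pipeline.source.name, item_spec)}
    job.update(kwargs)
    return job


def content_item_from_job(pipeline, job):
    # The worker re-resolves the content item locally from the spec.
    # The findContentFromSpec name is an assumption derived from the
    # commit message's "find a content item from an item spec" point.
    return pipeline.source.findContentFromSpec(job['job_spec'][1])
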
--- a/piecrust/pipelines/page.py	Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/pipelines/page.py	Sun Nov 19 14:29:17 2017 -0800
@@ -1,7 +1,11 @@
+import time
 import logging
-from piecrust.pipelines.base import ContentPipeline
+from piecrust.pipelines.base import (
+    ContentPipeline, create_job, content_item_from_job)
 from piecrust.pipelines._pagebaker import PageBaker, get_output_path
-from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
+from piecrust.pipelines._pagerecords import (
+    PagePipelineRecordEntry,
+    add_page_job_result, merge_job_result_into_record_entry)
 from piecrust.sources.base import AbortedSourceUseError
 
 
@@ -11,6 +15,7 @@
 class PagePipeline(ContentPipeline):
     PIPELINE_NAME = 'page'
     RECORD_ENTRY_CLASS = PagePipelineRecordEntry
+    PASS_NUM = [0, 1]
 
     def __init__(self, source, ppctx):
         super().__init__(source, ppctx)
@@ -19,7 +24,7 @@
         self._draft_setting = self.app.config['baker/no_bake_setting']
 
     def initialize(self):
-        stats = self.app.env.stats
+        stats = self._stats
         stats.registerCounter('SourceUseAbortions', raise_if_registered=False)
         stats.registerManifest('SourceUseAbortions', raise_if_registered=False)
 
@@ -28,29 +33,69 @@
                                     force=self.ctx.force)
         self._pagebaker.startWriterQueue()
 
+    def loadAllContents(self):
+        # Here we load all the pages in the source, making sure they all
+        # have a valid cache for their configuration and contents.
+        # We also create the record entries while we're at it.
+        source = self.source
+        page_fac = self.app.getPage
+        record_fac = self.createRecordEntry
+        for item in source.getAllContents():
+            page = page_fac(source, item)
+
+            cur_entry = record_fac(item.spec)
+            cur_entry.config = page.config.getAll()
+            cur_entry.route_params = item.metadata['route_params']
+            cur_entry.timestamp = page.datetime.timestamp()
+
+            if page.config.get(self._draft_setting):
+                cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
+
+            yield cur_entry
+
     def createJobs(self, ctx):
-        used_paths = {}
-        for rec in ctx.record_histories.current.records:
-            src_name = rec.name.split('@')[0]
-            for e in rec.getEntries():
-                paths = e.getAllOutputPaths()
-                if paths is not None:
-                    for p in paths:
-                        used_paths[p] = (src_name, e)
+        if ctx.pass_num == 0:
+            return self._createFirstPassJobs(ctx)
+        return self._createSecondPassJobs(ctx)
 
+    def _createFirstPassJobs(self, ctx):
         jobs = []
+
         app = self.app
-        route = self.source.route
         out_dir = self.ctx.out_dir
+        uri_getter = self.source.route.getUri
         pretty_urls = app.config.get('site/pretty_urls')
-        record = ctx.record_histories.current.getRecord(self.record_name)
+
+        used_paths = _get_used_paths_from_records(
+            ctx.record_histories.current.records)
+        history = ctx.record_histories.getHistory(ctx.record_name).copy()
+        history.build()
+
+        record = ctx.current_record
+        record.user_data['dirty_source_names'] = set()
+
+        for prev, cur in history.diffs:
+            # Ignore pages that disappeared since last bake.
+            if cur is None:
+                continue
 
-        for item in self.source.getAllContents():
-            route_params = item.metadata['route_params']
-            uri = route.getUri(route_params)
+            # Skip draft pages.
+            if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT:
+                continue
+
+            # Skip pages that are known to use other sources... we'll
+            # schedule them in the second pass.
+            if prev and prev.getAllUsedSourceNames():
+                continue
+
+            # Check if this item has been overridden by a previous pipeline
+            # run... for instance, we could be the pipeline for a "theme pages"
+            # source, and some of our pages have been overridden by a user
+            # page that writes out to the same URL.
+            uri = uri_getter(cur.route_params)
             path = get_output_path(app, out_dir, uri, pretty_urls)
+
             override = used_paths.get(path)
-
             if override is not None:
                 override_source_name, override_entry = override
                 override_source = app.getSource(override_source_name)
@@ -59,49 +104,82 @@
                     logger.error(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (item.spec, path, override_entry.item_spec))
+                        (cur.item_spec, path, override_entry.item_spec))
                 else:
                     logger.debug(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (item.spec, path, override_entry.item_spec))
+                        (cur.item_spec, path, override_entry.item_spec))
 
-                entry = PagePipelineRecordEntry()
-                entry.item_spec = item.spec
-                entry.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
-                record.addEntry(entry)
-
+                cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
                 continue
 
-            jobs.append(self.createJob(item))
+            # Nope, all good, let's create a job for this item.
+            jobs.append(create_job(self, cur.item_spec))
 
         if len(jobs) > 0:
             return jobs
         return None
 
-    def mergeRecordEntry(self, record_entry, ctx):
-        existing = ctx.record.getEntry(record_entry.item_spec)
-        existing.flags |= record_entry.flags
-        existing.errors += record_entry.errors
-        existing.subs += record_entry.subs
+    def _createSecondPassJobs(self, ctx):
+        # Get the list of all sources that had anything baked.
+        dirty_source_names = set()
+        all_records = ctx.record_histories.current.records
+        for rec in all_records:
+            rec_dsn = rec.user_data.get('dirty_source_names')
+            if rec_dsn:
+                dirty_source_names |= rec_dsn
+
+        # Now look at what we baked for our own source on the first pass.
+        # For anything that wasn't baked (i.e. it was considered 'up to
+        # date'), we look at the records from last time, and if they say
+        # that some page was using a source that is "dirty", we force-bake it.
+        #
+        # The common example for this is a blog index page which hasn't been
+        # touched, but needs to be re-baked because someone added or edited
+        # a post.
+        jobs = []
+        pass_num = ctx.pass_num
+        history = ctx.record_histories.getHistory(ctx.record_name).copy()
+        history.build()
+        for prev, cur in history.diffs:
+            if cur and cur.was_any_sub_baked:
+                continue
+            if prev and any(map(
+                    lambda usn: usn in dirty_source_names,
+                    prev.getAllUsedSourceNames())):
+                jobs.append(create_job(self, prev.item_spec,
+                                       pass_num=pass_num,
+                                       force_bake=True))
+        if len(jobs) > 0:
+            return jobs
+        return None
+
+    def handleJobResult(self, result, ctx):
+        existing = ctx.record_entry
+        merge_job_result_into_record_entry(existing, result)
+        if existing.was_any_sub_baked:
+            ctx.record.user_data['dirty_source_names'].add(self.source.name)
 
     def run(self, job, ctx, result):
-        step_num = job.step_num
-        if step_num == 0:
-            self._loadPage(job.content_item, ctx, result)
-        elif step_num == 1:
-            self._renderOrPostpone(job.content_item, ctx, result)
-        elif step_num == 2:
-            self._renderAlways(job.content_item, ctx, result)
+        pass_num = job.get('pass_num', 0)
+        step_num = job.get('step_num', 0)
+        if pass_num == 0:
+            if step_num == 0:
+                self._renderOrPostpone(job, ctx, result)
+            elif step_num == 1:
+                self._renderAlways(job, ctx, result)
+        elif pass_num == 1:
+            self._renderAlways(job, ctx, result)
 
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
             if prev and not cur:
                 for sub in prev.subs:
-                    yield (sub.out_path, 'previous source file was removed')
+                    yield (sub['out_path'], 'previous source file was removed')
             elif prev and cur:
-                prev_out_paths = [o.out_path for o in prev.subs]
-                cur_out_paths = [o.out_path for o in cur.subs]
+                prev_out_paths = [o['out_path'] for o in prev.subs]
+                cur_out_paths = [o['out_path'] for o in cur.subs]
                 diff = set(prev_out_paths) - set(cur_out_paths)
                 for p in diff:
                     yield (p, 'source file changed outputs')
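
The second-pass scheduling above is easier to follow with a toy example. Below is a small sketch, using hypothetical stand-in record entries, of how an untouched blog index page ends up force-baked on pass 1 because the 'posts' source was dirty on pass 0.

# Toy illustration of the pass-1 decision in _createSecondPassJobs above.
# All objects are hypothetical stand-ins for real record entries.

dirty_source_names = {'posts'}  # e.g. a post was added or edited on pass 0


class FakeEntry:
    def __init__(self, spec, used_sources, baked):
        self.item_spec = spec
        self.was_any_sub_baked = baked
        self._used_sources = used_sources

    def getAllUsedSourceNames(self):
        return self._used_sources


# The blog index itself didn't change, so pass 0 considered it up to date...
prev = FakeEntry('pages/blog.html', used_sources=['posts'], baked=False)
cur = FakeEntry('pages/blog.html', used_sources=[], baked=False)

# ...but last bake's record says it read from 'posts', which is dirty,
# so a force_bake job gets scheduled for pass 1.
needs_rebake = (not cur.was_any_sub_baked and
                any(usn in dirty_source_names
                    for usn in prev.getAllUsedSourceNames()))
assert needs_rebake
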
@@ -112,43 +190,54 @@
     def shutdown(self):
         self._pagebaker.stopWriterQueue()
 
-    def _loadPage(self, content_item, ctx, result):
-        logger.debug("Loading page: %s" % content_item.spec)
-        page = self.app.getPage(self.source, content_item)
-        record_entry = result.record_entry
-        record_entry.config = page.config.getAll()
-        record_entry.timestamp = page.datetime.timestamp()
-
-        if not page.config.get(self._draft_setting):
-            result.next_step_job = self.createJob(content_item)
-        else:
-            record_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
-
-    def _renderOrPostpone(self, content_item, ctx, result):
+    def _renderOrPostpone(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page
         # is using some other sources, in which case we abort and we'll try
         # again on the second pass.
+        content_item = content_item_from_job(self, job)
         logger.debug("Conditional render for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
+        if page.config.get(self._draft_setting):
+            return
+
         prev_entry = ctx.previous_entry
-        cur_entry = result.record_entry
-        self.app.env.abort_source_use = True
+
+        env = self.app.env
+        env.abort_source_use = True
+        add_page_job_result(result)
         try:
-            self._pagebaker.bake(page, prev_entry, cur_entry)
+            rdr_subs = self._pagebaker.bake(page, prev_entry)
+            result['subs'] = rdr_subs
         except AbortedSourceUseError:
             logger.debug("Page was aborted for using source: %s" %
                          content_item.spec)
-            self.app.env.stats.stepCounter("SourceUseAbortions")
-            self.app.env.stats.addManifestEntry("SourceUseAbortions",
-                                                content_item.spec)
-            result.next_step_job = self.createJob(content_item)
+            result['flags'] |= \
+                PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
+            env.stats.stepCounter("SourceUseAbortions")
+            env.stats.addManifestEntry("SourceUseAbortions", content_item.spec)
+            result['next_step_job'] = create_job(self, content_item.spec)
         finally:
-            self.app.env.abort_source_use = False
+            env.abort_source_use = False
 
-    def _renderAlways(self, content_item, ctx, result):
+    def _renderAlways(self, job, ctx, result):
+        content_item = content_item_from_job(self, job)
         logger.debug("Full render for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         prev_entry = ctx.previous_entry
-        cur_entry = result.record_entry
-        self._pagebaker.bake(page, prev_entry, cur_entry)
+        rdr_subs = self._pagebaker.bake(page, prev_entry,
+                                        force=job.get('force_bake'))
+
+        add_page_job_result(result)
+        result['subs'] = rdr_subs
+
+
+def _get_used_paths_from_records(records):
+    used_paths = {}
+    for rec in records:
+        src_name = rec.name.split('@')[0]
+        for e in rec.getEntries():
+            paths = e.getAllOutputPaths()
+            if paths is not None:
+                for p in paths:
+                    used_paths[p] = (src_name, e)
+    return used_paths
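
To make the override detection concrete, here is a hedged usage sketch of _get_used_paths_from_records with stand-in record objects; the 'sourcename@pipelinename' record-name convention is an assumption based on the split('@')[0] above.

# Hypothetical usage sketch for _get_used_paths_from_records. The record
# classes below are stand-ins; only the record-name convention and the
# getEntries()/getAllOutputPaths() calls are taken from the code above.
from piecrust.pipelines.page import _get_used_paths_from_records


class FakeRecordEntry:
    def __init__(self, spec, out_paths):
        self.item_spec = spec
        self._out_paths = out_paths

    def getAllOutputPaths(self):
        return self._out_paths


class FakeRecord:
    # Assumes record names look like 'sourcename@pipelinename'.
    def __init__(self, name, entries):
        self.name = name
        self._entries = entries

    def getEntries(self):
        return self._entries


user_rec = FakeRecord('pages@page', [
    FakeRecordEntry('pages/about.md', ['/out/about.html'])])

used_paths = _get_used_paths_from_records([user_rec])

# A theme page that would bake to '/out/about.html' is now detectable as
# overridden: the path maps back to the 'pages' source and its entry.
src_name, entry = used_paths['/out/about.html']
assert src_name == 'pages'
assert entry.item_spec == 'pages/about.md'
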