diff piecrust/pipelines/page.py @ 1015:fa489c5e829e

bake: Load pages in parallel again.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 29 Nov 2017 20:37:57 -0800
parents 1857dbd4580f
children 3c6e6e7b9639
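
In brief: page loading moves out of the serial loadAllContents() hook and into
pass 0 of the pipeline, so pages load on worker processes like everything else;
the old two baking passes become passes 1 and 2, and within pass 0 a load job
chains into rendering through its 'next_step_job' result. A minimal sketch of
that pass/step protocol follows; PASS_NUM, createJobs, run, handleJobResult and
'next_step_job' all appear in the diff below, while the driver itself is
invented for illustration (PieCrust runs jobs on a worker pool, not serially):

    def run_pipeline(pipeline, ctx):
        # One iteration per pass declared in PASS_NUM ([0, 1, 2] below).
        for pass_num in pipeline.PASS_NUM:
            ctx.pass_num = pass_num
            jobs = pipeline.createJobs(ctx) or []
            step_num = 0
            while jobs:
                ctx.step_num = step_num
                next_jobs = []
                for job in jobs:
                    # run() reads the pass/step from the job dict itself, so
                    # stamp them in (an assumption about the job manager).
                    job['pass_num'] = pass_num
                    job['step_num'] = step_num
                    result = {}
                    pipeline.run(job, ctx, result)          # worker side
                    pipeline.handleJobResult(result, ctx)   # main-process side
                    # A result may queue a follow-up job for the next step of
                    # the same pass; _loadPage uses this to trigger rendering
                    # for non-draft pages.
                    if 'next_step_job' in result:
                        next_jobs.append(result['next_step_job'])
                jobs = next_jobs
                step_num += 1
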
--- a/piecrust/pipelines/page.py	Tue Nov 28 21:28:15 2017 -0800
+++ b/piecrust/pipelines/page.py	Wed Nov 29 20:37:57 2017 -0800
@@ -16,7 +16,7 @@
 class PagePipeline(ContentPipeline):
     PIPELINE_NAME = 'page'
     RECORD_ENTRY_CLASS = PagePipelineRecordEntry
-    PASS_NUM = [0, 1]
+    PASS_NUM = [0, 1, 2]
 
     def __init__(self, source, ppctx):
         super().__init__(source, ppctx)
@@ -34,34 +34,24 @@
                                     force=self.ctx.force)
         self._pagebaker.startWriterQueue()
 
-    def loadAllContents(self):
+    def createJobs(self, ctx):
+        if ctx.pass_num == 0:
+            return self._createLoadJobs(ctx)
+        if ctx.pass_num == 1:
+            return self._createSecondPassJobs(ctx)
+        return self._createThirdPassJobs(ctx)
+
+    def _createLoadJobs(self, ctx):
         # Here we load all the pages in the source, making sure they all
         # have a valid cache for their configuration and contents.
-        # We also create the record entries while we're at it.
-        source = self.source
-        page_fac = self.app.getPage
-        record_fac = self.createRecordEntry
-        for item in source.getAllContents():
-            page = page_fac(source, item)
-
-            cur_entry = record_fac(item.spec)
-            cur_entry.config = page.config.getAll()
-            cur_entry.route_params = item.metadata['route_params']
-            cur_entry.timestamp = page.datetime.timestamp()
+        jobs = []
+        for item in self.source.getAllContents():
+            jobs.append(create_job(self, item.spec))
+        if len(jobs) > 0:
+            return jobs
+        return None
 
-            if page.was_modified:
-                cur_entry.flags |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
-            if page.config.get(self._draft_setting):
-                cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
-
-            yield cur_entry
-
-    def createJobs(self, ctx):
-        if ctx.pass_num == 0:
-            return self._createFirstPassJobs(ctx)
-        return self._createSecondPassJobs(ctx)
-
-    def _createFirstPassJobs(self, ctx):
+    def _createSecondPassJobs(self, ctx):
         jobs = []
 
         app = self.app
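
Note: the load pass just fans out one job per content item in the source. For
reference, a plausible shape for what create_job() packs into those job dicts,
a guess at the helper's contract rather than a copy of piecrust's actual
implementation:

    def create_job(pipeline, item_spec, **kwargs):
        # Hypothetical: a job is assumed to be a plain picklable dict naming
        # the pipeline's source and the content item, plus optional extras.
        job = {'job_spec': (pipeline.source.name, item_spec)}
        job.update(kwargs)
        return job
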
@@ -125,7 +115,7 @@
             return jobs
         return None
 
-    def _createSecondPassJobs(self, ctx):
+    def _createThirdPassJobs(self, ctx):
         # Get the list of all sources that had anything baked.
         dirty_source_names = set()
         all_records = ctx.record_histories.current.records
@@ -171,21 +161,37 @@
         return None
 
     def handleJobResult(self, result, ctx):
-        existing = ctx.record_entry
-        merge_job_result_into_record_entry(existing, result)
-        if existing.was_any_sub_baked:
-            ctx.record.user_data['dirty_source_names'].add(self.source.name)
+        step_num = ctx.step_num
+
+        if step_num == 0:
+            new_entry = self.createRecordEntry(result['item_spec'])
+            new_entry.config = result['config']
+            new_entry.route_params = result['route_params']
+            new_entry.timestamp = result['timestamp']
+            ctx.record.addEntry(new_entry)
+        else:
+            existing = ctx.record_entry
+            merge_job_result_into_record_entry(existing, result)
+
+            if existing.was_any_sub_baked:
+                ctx.record.user_data['dirty_source_names'].add(self.source.name)
 
     def run(self, job, ctx, result):
         pass_num = job.get('pass_num', 0)
         step_num = job.get('step_num', 0)
         if pass_num == 0:
             if step_num == 0:
-                self._renderOrPostpone(job, ctx, result)
+                self._loadPage(job, ctx, result)
             elif step_num == 1:
+                self._renderOrPostpone(job, ctx, result)
+            elif step_num == 2:
                 self._renderAlways(job, ctx, result)
+            else:
+                raise Exception("Unexpected pipeline step: %d" % step_num)
         elif pass_num == 1:
             self._renderAlways(job, ctx, result)
+        else:
+            raise Exception("Unexpected pipeline pass: %d" % pass_num)
 
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
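
Note: handleJobResult() now branches on the step. A step-0 result (produced by
_loadPage below) creates the record entry that loadAllContents() used to build
inline; later steps merge into the existing entry. The split exists because
results travel back from worker processes and must pickle cheaply, so the load
job ships plain values (config, route params, timestamp, flags) rather than the
Page object itself. A toy illustration of that constraint, standard library
only:

    import multiprocessing as mp

    def load(item_spec):
        # Worker side: return only picklable primitives, as _loadPage does.
        return {'item_spec': item_spec, 'timestamp': 0.0, 'flags': 0}

    if __name__ == '__main__':
        with mp.Pool(2) as pool:
            for result in pool.map(load, ['pages/a.md', 'pages/b.md']):
                # Main-process side: record entries get created here.
                print(result['item_spec'])
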
@@ -205,6 +211,25 @@
     def shutdown(self):
         self._pagebaker.stopWriterQueue()
 
+    def _loadPage(self, job, ctx, result):
+        content_item = content_item_from_job(self, job)
+        page = self.app.getPage(self.source, content_item)
+
+        trigger_next_job = True
+        result['flags'] = PagePipelineRecordEntry.FLAG_NONE
+        result['config'] = page.config.getAll()
+        result['route_params'] = content_item.metadata['route_params']
+        result['timestamp'] = page.datetime.timestamp()
+
+        if page.was_modified:
+            result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
+        if page.config.get(self._draft_setting):
+            result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT
+            trigger_next_job = False
+
+        if trigger_next_job:
+            result['next_step_job'] = create_job(self, content_item.spec)
+
     def _renderOrPostpone(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page
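
Note: a draft page still gets its pass-0 record entry, but _loadPage emits no
'next_step_job' for it, so it never reaches the render steps. A condensed
illustration of that gate, with a stand-in flag value (the real flags live on
PagePipelineRecordEntry):

    FLAG_IS_DRAFT = 2 ** 1   # stand-in value, for illustration only

    def load_result(item_spec, is_draft):
        result = {'item_spec': item_spec, 'flags': 0}
        if is_draft:
            result['flags'] |= FLAG_IS_DRAFT   # recorded in the bake record...
        else:
            # ...but only non-drafts get a follow-up render job.
            result['next_step_job'] = {'job_spec': item_spec}
        return result

    assert 'next_step_job' not in load_result('pages/draft.md', True)
    assert 'next_step_job' in load_result('pages/foo.md', False)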