changeset 1015:fa489c5e829e

bake: Load pages in parallel again.
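
Instead of having each pipeline load its source contents on the main
process through `loadAllContents()`, page loading is now a real job
(pass 0, step 0) that runs in the worker processes. Each worker sends
the page's configuration, route parameters and timestamp back to the
master process, which creates the record entries from those results.
Template caches are now populated synchronously on the master before
the worker pool is created, which removes the `__special__` job
plumbing from the worker. The current pass number is also threaded
through `_handleWorkerResult` into `PipelineJobResultHandleContext` so
pipelines can tell load results apart from render results.

A minimal sketch of the resulting load-then-render flow (illustrative
only; the names below are not PieCrust's actual API):

    import multiprocessing

    def load_page(spec):
        # Pass 0, step 0: load the page and report its metadata back
        # to the master. A draft page would skip asking for a render.
        result = {'item_spec': spec, 'config': {}, 'timestamp': 0.0}
        result['next_step_job'] = {'item_spec': spec, 'step_num': 1}
        return result

    def render_page(job):
        # Pass 0, step 1: render the page that was loaded in step 0.
        return 'rendered %s' % job['item_spec']

    if __name__ == '__main__':
        specs = ['pages/foo.md', 'pages/bar.md']
        with multiprocessing.Pool(2) as pool:
            # The master collects the load results, then queues the
            # follow-up jobs they requested via 'next_step_job'.
            results = pool.map(load_page, specs)
            next_jobs = [r['next_step_job'] for r in results
                         if 'next_step_job' in r]
            print(pool.map(render_page, next_jobs))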
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 29 Nov 2017 20:37:57 -0800
parents 071f30aa04bb
children c4cfbbeed72e
files piecrust/baking/baker.py piecrust/baking/worker.py piecrust/pipelines/base.py piecrust/pipelines/page.py
diffstat 4 files changed, 74 insertions(+), 85 deletions(-)
--- a/piecrust/baking/baker.py	Tue Nov 28 21:28:15 2017 -0800
+++ b/piecrust/baking/baker.py	Wed Nov 29 20:37:57 2017 -0800
@@ -90,19 +90,17 @@
         # Create the pipelines.
         ppmngr = self._createPipelineManager(record_histories)
 
-        # Create the worker processes.
-        pool_userdata = _PoolUserData(self, ppmngr)
-        pool = self._createWorkerPool(records_path, pool_userdata)
-
         # Done with all the setup, let's start the actual work.
         logger.info(format_timed(start_time, "setup baker"))
 
         # Load all sources, pre-cache templates.
         load_start_time = time.perf_counter()
-        self._startPopulateTemplateCaches(pool)
-        self._loadSources(ppmngr)
-        self._endPopulateTemplateCache(pool)
-        logger.info(format_timed(load_start_time, "loaded site content"))
+        self._populateTemplateCaches()
+        logger.info(format_timed(load_start_time, "cache templates"))
+
+        # Create the worker processes.
+        pool_userdata = _PoolUserData(self, ppmngr)
+        pool = self._createWorkerPool(records_path, pool_userdata)
 
         # Bake the realms.
         self._bakeRealms(pool, ppmngr, record_histories)
@@ -208,37 +206,9 @@
                             "out. There's nothing to do.")
         return ppmngr
 
-    def _loadSources(self, ppmngr):
-        for ppinfo in ppmngr.getPipelineInfos():
-            rec = ppinfo.record_history.current
-            rec_entries = ppinfo.pipeline.loadAllContents()
-            if rec_entries is not None:
-                for e in rec_entries:
-                    rec.addEntry(e)
-
-    def _startPopulateTemplateCaches(self, pool):
-        # If we can, cache templates in a worker process, so we can load
-        # the sources' pages in the main process in the meantime.
-        # But if we don't have any workers, well, we'll have to make do
-        # in the `_endPopulateTemplateCache` method.
-        if pool.pool_size == 0:
-            return
-
-        pool._callback = None
-        pool._error_callback = None
-        job = {'job_spec': ('__special__', 'populate_template_cache')}
-        pool.queueJobs([job])
-
-    def _endPopulateTemplateCache(self, pool):
-        if pool.pool_size == 0:
-            # No workers... load the templates synchronously.
-            for eng in self.app.plugin_loader.getTemplateEngines():
-                eng.populateCache()
-        else:
-            # Wait for the job to finish.
-            pool.wait()
-            pool._callback = self._handleWorkerResult
-            pool._error_callback = self._handleWorkerError
+    def _populateTemplateCaches(self):
+        for eng in self.app.plugin_loader.getTemplateEngines():
+            eng.populateCache()
 
     def _bakeRealms(self, pool, ppmngr, record_histories):
         # Bake the realms -- user first, theme second, so that a user item
@@ -261,6 +231,7 @@
                    pp_pass_num, realm, pplist):
         # Start with the first step, where we iterate on the content sources'
         # items and run jobs on those.
+        pool.userdata.cur_pass = pp_pass_num
         pool.userdata.cur_step = 0
         next_step_jobs = {}
         pool.userdata.next_step_jobs = next_step_jobs
@@ -381,6 +352,7 @@
         return pool
 
     def _handleWorkerResult(self, job, res, userdata):
+        cur_pass = userdata.cur_pass
         cur_step = userdata.cur_step
         source_name, item_spec = job['job_spec']
 
@@ -394,7 +366,8 @@
         ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
         pipeline = ppinfo.pipeline
         record = ppinfo.current_record
-        ppmrctx = PipelineJobResultHandleContext(record, job, cur_step)
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass,
+                                                 cur_step)
         pipeline.handleJobResult(res, ppmrctx)
 
         # Set the overall success flags if there was an error.
@@ -430,6 +403,7 @@
         self.baker = baker
         self.ppmngr = ppmngr
         self.records = ppmngr.record_histories.current
+        self.cur_pass = 0
         self.cur_step = 0
         self.next_step_jobs = {}
 
--- a/piecrust/baking/worker.py	Tue Nov 28 21:28:15 2017 -0800
+++ b/piecrust/baking/worker.py	Wed Nov 29 20:37:57 2017 -0800
@@ -80,15 +80,6 @@
         source_name, item_spec = job['job_spec']
         logger.debug("Received job: %s@%s" % (source_name, item_spec))
 
-        # Check for special jobs.
-        if source_name == '__special__':
-            if item_spec == 'populate_template_cache':
-                for eng in self.app.plugin_loader.getTemplateEngines():
-                    eng.populateCache()
-            else:
-                raise Exception("Unknown special job: %s" % item_spec)
-            return {}
-
         # Run the job!
         job_start = time.perf_counter()
         pp = self.ppmngr.getPipeline(source_name)
--- a/piecrust/pipelines/base.py	Tue Nov 28 21:28:15 2017 -0800
+++ b/piecrust/pipelines/base.py	Wed Nov 29 20:37:57 2017 -0800
@@ -103,9 +103,10 @@
         This is run on the master process, so it can access the current
         record.
     """
-    def __init__(self, record, job, step_num):
+    def __init__(self, record, job, pass_num, step_num):
         self.record = record
         self.job = job
+        self.pass_num = pass_num
         self.step_num = step_num
 
     @cached_property
@@ -156,9 +157,6 @@
     def initialize(self):
         pass
 
-    def loadAllContents(self):
-        return None
-
     def createJobs(self, ctx):
         return [
             create_job(self, item.spec)
--- a/piecrust/pipelines/page.py	Tue Nov 28 21:28:15 2017 -0800
+++ b/piecrust/pipelines/page.py	Wed Nov 29 20:37:57 2017 -0800
@@ -16,7 +16,7 @@
 class PagePipeline(ContentPipeline):
     PIPELINE_NAME = 'page'
     RECORD_ENTRY_CLASS = PagePipelineRecordEntry
-    PASS_NUM = [0, 1]
+    PASS_NUM = [0, 1, 2]
 
     def __init__(self, source, ppctx):
         super().__init__(source, ppctx)
@@ -34,34 +34,24 @@
                                     force=self.ctx.force)
         self._pagebaker.startWriterQueue()
 
-    def loadAllContents(self):
+    def createJobs(self, ctx):
+        if ctx.pass_num == 0:
+            return self._createLoadJobs(ctx)
+        if ctx.pass_num == 1:
+            return self._createSecondPassJobs(ctx)
+        return self._createThirdPassJobs(ctx)
+
+    def _createLoadJobs(self, ctx):
         # Here we load all the pages in the source, making sure they all
         # have a valid cache for their configuration and contents.
-        # We also create the record entries while we're at it.
-        source = self.source
-        page_fac = self.app.getPage
-        record_fac = self.createRecordEntry
-        for item in source.getAllContents():
-            page = page_fac(source, item)
-
-            cur_entry = record_fac(item.spec)
-            cur_entry.config = page.config.getAll()
-            cur_entry.route_params = item.metadata['route_params']
-            cur_entry.timestamp = page.datetime.timestamp()
+        jobs = []
+        for item in self.source.getAllContents():
+            jobs.append(create_job(self, item.spec))
+        if len(jobs) > 0:
+            return jobs
+        return None
 
-            if page.was_modified:
-                cur_entry.flags |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
-            if page.config.get(self._draft_setting):
-                cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
-
-            yield cur_entry
-
-    def createJobs(self, ctx):
-        if ctx.pass_num == 0:
-            return self._createFirstPassJobs(ctx)
-        return self._createSecondPassJobs(ctx)
-
-    def _createFirstPassJobs(self, ctx):
+    def _createSecondPassJobs(self, ctx):
         jobs = []
 
         app = self.app
@@ -125,7 +115,7 @@
             return jobs
         return None
 
-    def _createSecondPassJobs(self, ctx):
+    def _createThirdPassJobs(self, ctx):
         # Get the list of all sources that had anything baked.
         dirty_source_names = set()
         all_records = ctx.record_histories.current.records
@@ -171,21 +161,39 @@
         return None
 
     def handleJobResult(self, result, ctx):
-        existing = ctx.record_entry
-        merge_job_result_into_record_entry(existing, result)
-        if existing.was_any_sub_baked:
-            ctx.record.user_data['dirty_source_names'].add(self.source.name)
+        pass_num = ctx.pass_num
+        step_num = ctx.step_num
+
+        if pass_num == 0 and step_num == 0:
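+            # A pass-0 load job just finished: create its record entry.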
+            new_entry = self.createRecordEntry(result['item_spec'])
+            new_entry.config = result['config']
+            new_entry.route_params = result['route_params']
+            new_entry.timestamp = result['timestamp']
+            ctx.record.addEntry(new_entry)
+        else:
+            existing = ctx.record_entry
+            merge_job_result_into_record_entry(existing, result)
+
+            if existing.was_any_sub_baked:
+                ctx.record.user_data['dirty_source_names'].add(self.source.name)
 
     def run(self, job, ctx, result):
         pass_num = job.get('pass_num', 0)
         step_num = job.get('step_num', 0)
         if pass_num == 0:
             if step_num == 0:
-                self._renderOrPostpone(job, ctx, result)
+                self._loadPage(job, ctx, result)
             elif step_num == 1:
+                self._renderOrPostpone(job, ctx, result)
+            elif step_num == 2:
                 self._renderAlways(job, ctx, result)
+            else:
+                raise Exception("Unexpected pipeline step: %d" % step_num)
         elif pass_num == 1:
             self._renderAlways(job, ctx, result)
+        else:
+            raise Exception("Unexpected pipeline pass: %d" % pass_num)
 
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
@@ -205,6 +212,29 @@
     def shutdown(self):
         self._pagebaker.stopWriterQueue()
 
+    def _loadPage(self, job, ctx, result):
+        content_item = content_item_from_job(self, job)
+        page = self.app.getPage(self.source, content_item)
+
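+        # Everything stored in `result` goes back to the master process,
+        # which turns it into a record entry in `handleJobResult`.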
+        trigger_next_job = True
+        result['flags'] = PagePipelineRecordEntry.FLAG_NONE
+        result['config'] = page.config.getAll()
+        result['route_params'] = content_item.metadata['route_params']
+        result['timestamp'] = page.datetime.timestamp()
+
+        if page.was_modified:
+            result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
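+        # Draft pages get a record entry but are never rendered.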
+        if page.config.get(self._draft_setting):
+            result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT
+            trigger_next_job = False
+
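+        # Non-drafts get a follow-up job: the render step (pass 0, step 1).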
+        if trigger_next_job:
+            result['next_step_job'] = create_job(self, content_item.spec)
+
     def _renderOrPostpone(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page