changeset 453:8351a77e13f5

bake: Don't pass the previous record entries to the workers. Workers now load the previous record on their own and find the previous entry in their own copy.
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 07 Jul 2015 20:19:54 -0700
parents 55026b7bb1bf
children 96d363e2da4b
files piecrust/baking/baker.py piecrust/baking/worker.py piecrust/workerpool.py
diffstat 3 files changed, 55 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/piecrust/baking/baker.py	Tue Jul 07 19:58:07 2015 -0700
+++ b/piecrust/baking/baker.py	Tue Jul 07 20:19:54 2015 -0700
@@ -72,7 +72,9 @@
 
         # Figure out if we need to clean the cache because important things
         # have changed.
-        self._handleCacheValidity(record)
+        is_cache_valid = self._handleCacheValidity(record)
+        if not is_cache_valid:
+            previous_record_path = None
 
         # Pre-create all caches.
         for cache_name in ['app', 'baker', 'pages', 'renders']:
@@ -88,7 +90,7 @@
             srclist.append(source)
 
         # Create the worker processes.
-        pool = self._createWorkerPool()
+        pool = self._createWorkerPool(previous_record_path)
 
         # Bake the realms.
         realm_list = [REALM_USER, REALM_THEME]
@@ -179,11 +181,13 @@
             logger.info(format_timed(
                     start_time,
                     "cleaned cache (reason: %s)" % reason))
+            return False
         else:
             record.incremental_count += 1
             logger.debug(format_timed(
                     start_time, "cache is assumed valid",
                     colored=False))
+            return True
 
     def _bakeRealm(self, record, pool, realm, srclist):
         start_time = time.perf_counter()
@@ -511,7 +515,6 @@
                         'factory_info': save_factory(fac),
                         'taxonomy_info': tax_info,
                         'route_metadata': route_metadata,
-                        'prev_entry': prev_entry,
                         'dirty_source_names': record.dirty_source_names
                         }
                 }
@@ -535,12 +538,13 @@
         for e in errors:
             logger.error("  " + e)
 
-    def _createWorkerPool(self):
+    def _createWorkerPool(self, previous_record_path):
         from piecrust.workerpool import WorkerPool
         from piecrust.baking.worker import BakeWorkerContext, BakeWorker
 
         ctx = BakeWorkerContext(
                 self.app.root_dir, self.app.cache.base_dir, self.out_dir,
+                previous_record_path=previous_record_path,
                 force=self.force, debug=self.app.debug)
         pool = WorkerPool(
                 worker_class=BakeWorker,
--- a/piecrust/baking/worker.py	Tue Jul 07 19:58:07 2015 -0700
+++ b/piecrust/baking/worker.py	Tue Jul 07 20:19:54 2015 -0700
@@ -1,6 +1,7 @@
 import time
 import logging
 from piecrust.app import PieCrust
+from piecrust.baking.records import BakeRecord, _get_transition_key
 from piecrust.baking.single import PageBaker, BakingError
 from piecrust.rendering import (
         QualifiedPage, PageRenderingContext, render_page_segments)
@@ -14,12 +15,17 @@
 
 class BakeWorkerContext(object):
     def __init__(self, root_dir, sub_cache_dir, out_dir,
+                 previous_record_path=None,
                  force=False, debug=False):
         self.root_dir = root_dir
         self.sub_cache_dir = sub_cache_dir
         self.out_dir = out_dir
+        self.previous_record_path = previous_record_path
         self.force = force
         self.debug = debug
+        self.app = None
+        self.previous_record = None
+        self.previous_record_index = None
 
 
 class BakeWorker(IWorker):
@@ -35,13 +41,22 @@
         app.env.registerTimer("BakeWorker_%d_Total" % self.wid)
         app.env.registerTimer("BakeWorkerInit")
         app.env.registerTimer("JobReceive")
-        self.app = app
+        self.ctx.app = app
+
+        # Load previous record
+        if self.ctx.previous_record_path:
+            self.ctx.previous_record = BakeRecord.load(
+                    self.ctx.previous_record_path)
+            self.ctx.previous_record_index = {}
+            for e in self.ctx.previous_record.entries:
+                key = _get_transition_key(e.path, e.taxonomy_info)
+                self.ctx.previous_record_index[key] = e
 
         # Create the job handlers.
         job_handlers = {
-                JOB_LOAD: LoadJobHandler(app, self.ctx),
-                JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
-                JOB_BAKE: BakeJobHandler(app, self.ctx)}
+                JOB_LOAD: LoadJobHandler(self.ctx),
+                JOB_RENDER_FIRST: RenderFirstSubJobHandler(self.ctx),
+                JOB_BAKE: BakeJobHandler(self.ctx)}
         for jt, jh in job_handlers.items():
             app.env.registerTimer(type(jh).__name__)
         self.job_handlers = job_handlers
@@ -50,25 +65,28 @@
 
     def process(self, job):
         handler = self.job_handlers[job['type']]
-        with self.app.env.timerScope(type(handler).__name__):
+        with self.ctx.app.env.timerScope(type(handler).__name__):
             return handler.handleJob(job['job'])
 
     def getReport(self):
-        self.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
-                                    self.work_start_time)
+        self.ctx.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
+                                        self.work_start_time)
         return {
                 'type': 'timers',
-                'data': self.app.env._timers}
+                'data': self.ctx.app.env._timers}
 
 
 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3)
 
 
 class JobHandler(object):
-    def __init__(self, app, ctx):
-        self.app = app
+    def __init__(self, ctx):
         self.ctx = ctx
 
+    @property
+    def app(self):
+        return self.ctx.app
+
     def handleJob(self, job):
         raise NotImplementedError()
 
@@ -145,9 +163,9 @@
 
 
 class BakeJobHandler(JobHandler):
-    def __init__(self, app, ctx):
-        super(BakeJobHandler, self).__init__(app, ctx)
-        self.page_baker = PageBaker(app, ctx.out_dir, ctx.force)
+    def __init__(self, ctx):
+        super(BakeJobHandler, self).__init__(ctx)
+        self.page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force)
 
     def handleJob(self, job):
         # Actually bake the page and all its sub-pages to the output folder.
@@ -171,8 +189,13 @@
                 'taxonomy_info': tax_info,
                 'sub_entries': None,
                 'errors': None}
-        previous_entry = job['prev_entry']
         dirty_source_names = job['dirty_source_names']
+
+        previous_entry = None
+        if self.ctx.previous_record_index is not None:
+            key = _get_transition_key(fac.path, tax_info)
+            previous_entry = self.ctx.previous_record_index.get(key)
+
         logger.debug("Baking page: %s" % fac.ref_spec)
         try:
             sub_entries = self.page_baker.bake(
--- a/piecrust/workerpool.py	Tue Jul 07 19:58:07 2015 -0700
+++ b/piecrust/workerpool.py	Tue Jul 07 20:19:54 2015 -0700
@@ -50,7 +50,13 @@
 
     w = params.worker_class(*params.initargs)
     w.wid = wid
-    w.initialize()
+    try:
+        w.initialize()
+    except Exception as ex:
+        logger.error("Worker failed to initialize:")
+        logger.exception(ex)
+        params.outqueue.put(None)
+        return
 
     get = params.inqueue.get
     put = params.outqueue.put
@@ -152,6 +158,9 @@
         if self._listener is not None:
             raise Exception("A previous job queue has not finished yet.")
 
+        if any([not p.is_alive() for p in self._pool]):
+            raise Exception("Some workers have prematurely exited.")
+
         if handler is not None:
             self.setHandler(handler)