# HG changeset patch # User Ludovic Chabant # Date 1436325594 25200 # Node ID 8351a77e13f53da76cc397de31e6163d0c938467 # Parent 55026b7bb1bfde5b93083d09d9870d33484b3039 bake: Don't pass the previous record entries to the workers. Workers now load the previous record on their own and find the previous entry in their own copy. diff -r 55026b7bb1bf -r 8351a77e13f5 piecrust/baking/baker.py --- a/piecrust/baking/baker.py Tue Jul 07 19:58:07 2015 -0700 +++ b/piecrust/baking/baker.py Tue Jul 07 20:19:54 2015 -0700 @@ -72,7 +72,9 @@ # Figure out if we need to clean the cache because important things # have changed. - self._handleCacheValidity(record) + is_cache_valid = self._handleCacheValidity(record) + if not is_cache_valid: + previous_record_path = None # Pre-create all caches. for cache_name in ['app', 'baker', 'pages', 'renders']: @@ -88,7 +90,7 @@ srclist.append(source) # Create the worker processes. - pool = self._createWorkerPool() + pool = self._createWorkerPool(previous_record_path) # Bake the realms. 
realm_list = [REALM_USER, REALM_THEME] @@ -179,11 +181,13 @@ logger.info(format_timed( start_time, "cleaned cache (reason: %s)" % reason)) + return False else: record.incremental_count += 1 logger.debug(format_timed( start_time, "cache is assumed valid", colored=False)) + return True def _bakeRealm(self, record, pool, realm, srclist): start_time = time.perf_counter() @@ -511,7 +515,6 @@ 'factory_info': save_factory(fac), 'taxonomy_info': tax_info, 'route_metadata': route_metadata, - 'prev_entry': prev_entry, 'dirty_source_names': record.dirty_source_names } } @@ -535,12 +538,13 @@ for e in errors: logger.error(" " + e) - def _createWorkerPool(self): + def _createWorkerPool(self, previous_record_path): from piecrust.workerpool import WorkerPool from piecrust.baking.worker import BakeWorkerContext, BakeWorker ctx = BakeWorkerContext( self.app.root_dir, self.app.cache.base_dir, self.out_dir, + previous_record_path=previous_record_path, force=self.force, debug=self.app.debug) pool = WorkerPool( worker_class=BakeWorker, diff -r 55026b7bb1bf -r 8351a77e13f5 piecrust/baking/worker.py --- a/piecrust/baking/worker.py Tue Jul 07 19:58:07 2015 -0700 +++ b/piecrust/baking/worker.py Tue Jul 07 20:19:54 2015 -0700 @@ -1,6 +1,7 @@ import time import logging from piecrust.app import PieCrust +from piecrust.baking.records import BakeRecord, _get_transition_key from piecrust.baking.single import PageBaker, BakingError from piecrust.rendering import ( QualifiedPage, PageRenderingContext, render_page_segments) @@ -14,12 +15,17 @@ class BakeWorkerContext(object): def __init__(self, root_dir, sub_cache_dir, out_dir, + previous_record_path=None, force=False, debug=False): self.root_dir = root_dir self.sub_cache_dir = sub_cache_dir self.out_dir = out_dir + self.previous_record_path = previous_record_path self.force = force self.debug = debug + self.app = None + self.previous_record = None + self.previous_record_index = None class BakeWorker(IWorker): @@ -35,13 +41,22 @@ 
app.env.registerTimer("BakeWorker_%d_Total" % self.wid) app.env.registerTimer("BakeWorkerInit") app.env.registerTimer("JobReceive") - self.app = app + self.ctx.app = app + + # Load previous record + if self.ctx.previous_record_path: + self.ctx.previous_record = BakeRecord.load( + self.ctx.previous_record_path) + self.ctx.previous_record_index = {} + for e in self.ctx.previous_record.entries: + key = _get_transition_key(e.path, e.taxonomy_info) + self.ctx.previous_record_index[key] = e # Create the job handlers. job_handlers = { - JOB_LOAD: LoadJobHandler(app, self.ctx), - JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx), - JOB_BAKE: BakeJobHandler(app, self.ctx)} + JOB_LOAD: LoadJobHandler(self.ctx), + JOB_RENDER_FIRST: RenderFirstSubJobHandler(self.ctx), + JOB_BAKE: BakeJobHandler(self.ctx)} for jt, jh in job_handlers.items(): app.env.registerTimer(type(jh).__name__) self.job_handlers = job_handlers @@ -50,25 +65,28 @@ def process(self, job): handler = self.job_handlers[job['type']] - with self.app.env.timerScope(type(handler).__name__): + with self.ctx.app.env.timerScope(type(handler).__name__): return handler.handleJob(job['job']) def getReport(self): - self.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid, - self.work_start_time) + self.ctx.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid, + self.work_start_time) return { 'type': 'timers', - 'data': self.app.env._timers} + 'data': self.ctx.app.env._timers} JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3) class JobHandler(object): - def __init__(self, app, ctx): - self.app = app + def __init__(self, ctx): self.ctx = ctx + @property + def app(self): + return self.ctx.app + def handleJob(self, job): raise NotImplementedError() @@ -145,9 +163,9 @@ class BakeJobHandler(JobHandler): - def __init__(self, app, ctx): - super(BakeJobHandler, self).__init__(app, ctx) - self.page_baker = PageBaker(app, ctx.out_dir, ctx.force) + def __init__(self, ctx): + super(BakeJobHandler, self).__init__(ctx) + 
self.page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force) def handleJob(self, job): # Actually bake the page and all its sub-pages to the output folder. @@ -171,8 +189,13 @@ 'taxonomy_info': tax_info, 'sub_entries': None, 'errors': None} - previous_entry = job['prev_entry'] dirty_source_names = job['dirty_source_names'] + + previous_entry = None + if self.ctx.previous_record_index is not None: + key = _get_transition_key(fac.path, tax_info) + previous_entry = self.ctx.previous_record_index.get(key) + logger.debug("Baking page: %s" % fac.ref_spec) try: sub_entries = self.page_baker.bake( diff -r 55026b7bb1bf -r 8351a77e13f5 piecrust/workerpool.py --- a/piecrust/workerpool.py Tue Jul 07 19:58:07 2015 -0700 +++ b/piecrust/workerpool.py Tue Jul 07 20:19:54 2015 -0700 @@ -50,7 +50,13 @@ w = params.worker_class(*params.initargs) w.wid = wid - w.initialize() + try: + w.initialize() + except Exception as ex: + logger.error("Worker failed to initialize:") + logger.exception(ex) + params.outqueue.put(None) + return get = params.inqueue.get put = params.outqueue.put @@ -152,6 +158,9 @@ if self._listener is not None: raise Exception("A previous job queue has not finished yet.") + if any([not p.is_alive() for p in self._pool]): + raise Exception("Some workers have prematurely exited.") + if handler is not None: self.setHandler(handler)