piecrust2: view piecrust/baking/worker.py @ 1051:971b4d67e82a
serve: Fix problems with assets disappearing between servings.
When an asset file changes, its source's pipeline is re-run. But that produced
a bake record containing only that pipeline's output, so the other pipelines'
outputs were incorrectly considered empty and their files were removed as
strays. Now we copy over the bake records for the pipelines we don't run.
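The gist of that fix can be sketched with plain dictionaries standing in for piecrust's multi-record objects (the names below are illustrative only, not piecrust's actual API):

```python
# Illustrative sketch only: plain dicts stand in for piecrust's
# multi-record objects, keyed by pipeline record name.
def carry_over_previous_records(previous, current, pipelines_run):
    """Copy records of pipelines that were not re-run into the new
    bake record, so their outputs are not mistaken for empty and
    their files are not cleaned up as strays."""
    for record_name, record in previous.items():
        if record_name not in pipelines_run:
            current[record_name] = record
    return current

# Example: only the 'asset' pipeline was re-run, so the 'page'
# record is carried over from the previous bake.
previous = {'asset': ['img/logo.png'], 'page': ['index.html']}
current = {'asset': ['img/logo.png']}
carry_over_previous_records(previous, current, pipelines_run={'asset'})
assert current['page'] == ['index.html']
```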
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Fri, 26 Jan 2018 18:05:02 -0800 |
| parents | bd544b65cfad |
| children | |
```python
import time
import logging
from piecrust.pipelines.base import (
    PipelineManager, PipelineJobRunContext,
    get_pipeline_name_for_source)
from piecrust.pipelines.records import (
    MultiRecordHistory, MultiRecord, load_records)
from piecrust.workerpool import IWorker


logger = logging.getLogger(__name__)


class BakeWorkerContext(object):
    def __init__(self, appfactory, out_dir, *,
                 force=False, previous_records_path=None,
                 allowed_pipelines=None, forbidden_pipelines=None):
        self.appfactory = appfactory
        self.out_dir = out_dir
        self.force = force
        self.previous_records_path = previous_records_path
        self.allowed_pipelines = allowed_pipelines
        self.forbidden_pipelines = forbidden_pipelines


class BakeWorker(IWorker):
    def __init__(self, ctx):
        self.ctx = ctx
        self.app = None
        self.stats = None
        self.previous_records = None
        self._work_start_time = time.perf_counter()
        self._sources = {}
        self._ppctx = None

    def initialize(self):
        # Create the app local to this worker.
        app = self.ctx.appfactory.create()
        app.config.set('baker/is_baking', True)
        app.config.set('baker/worker_id', self.wid)
        app.config.set('site/asset_url_format', '%page_uri%/%filename%')

        app.env.fs_cache_only_for_main_page = True

        stats = app.env.stats
        stats.registerTimer("Worker_%d_Total" % self.wid)
        stats.registerTimer("Worker_%d_Init" % self.wid)

        self.app = app
        self.stats = stats

        # Load previous record
        if self.ctx.previous_records_path:
            previous_records = load_records(self.ctx.previous_records_path)
        else:
            previous_records = MultiRecord()
        self.previous_records = previous_records

        # Create the pipelines.
        self.ppmngr = PipelineManager(
            app, self.ctx.out_dir,
            worker_id=self.wid, force=self.ctx.force)
        ok_pp = self.ctx.allowed_pipelines
        nok_pp = self.ctx.forbidden_pipelines
        for src in app.sources:
            pname = get_pipeline_name_for_source(src)
            if ok_pp is not None and pname not in ok_pp:
                continue
            if nok_pp is not None and pname in nok_pp:
                continue

            self.ppmngr.createPipeline(src)
            stats.registerTimer("PipelineJobs_%s" % pname,
                                raise_if_registered=False)

        stats.stepTimerSince("Worker_%d_Init" % self.wid,
                             self._work_start_time)

    def process(self, job):
        source_name, item_spec = job['job_spec']
        logger.debug("Received job: %s@%s" % (source_name, item_spec))

        # Run the job!
        job_start = time.perf_counter()
        pp = self.ppmngr.getPipeline(source_name)
        runctx = PipelineJobRunContext(job, pp.record_name,
                                       self.previous_records)
        ppres = {
            'item_spec': item_spec
        }
        pp.run(job, runctx, ppres)

        # Log time spent in this pipeline.
        self.stats.stepTimerSince("PipelineJobs_%s" % pp.PIPELINE_NAME,
                                  job_start)

        return ppres

    def getStats(self):
        stats = self.app.env.stats
        stats.stepTimerSince("Worker_%d_Total" % self.wid,
                             self._work_start_time)
        return stats

    def shutdown(self):
        for src, pp in self._sources.values():
            pp.shutdown(self._ppctx)
```
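For context, `BakeWorker` implements the `IWorker` interface and is normally driven by piecrust's worker pool. A rough sketch of that call sequence follows; the `appfactory` argument, the manual `wid` assignment, and the job contents are illustrative assumptions, not piecrust's real setup code.

```python
# Rough sketch of how the pool drives a BakeWorker; the appfactory
# object and the job contents are illustrative assumptions.
from piecrust.baking.worker import BakeWorker, BakeWorkerContext

def run_one_job(appfactory, out_dir):
    ctx = BakeWorkerContext(appfactory, out_dir, force=False)
    worker = BakeWorker(ctx)
    worker.wid = 0                  # normally assigned by the worker pool
    worker.initialize()             # creates the app and its pipelines
    # A job carries the source name and the item to bake.
    job = {'job_spec': ('pages', 'pages/foo.md')}
    result = worker.process(job)    # runs the pipeline for that source
    worker.shutdown()
    return result
```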