piecrust2: diff piecrust/baking/worker.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
keep aborting in `getContents`, but rely on their pipeline to generate
pages for baking.
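
The second bullet is the interesting one: a "generator" source (taxonomies, archives, and the like) has no content files of its own to enumerate, so it can keep bailing out of `getContents` while its paired pipeline fabricates the bake jobs. Below is a minimal sketch of that division of labor; the class and method names are made up for illustration and are not PieCrust's actual API.

```python
# Illustrative sketch only: GeneratorSource, GeneratorPipeline and
# createJobs() are hypothetical names, not PieCrust's real API.

class GeneratorSource:
    """A source that owns no content files: enumerating it aborts,
    and its paired pipeline fabricates the bake jobs instead."""

    def getContents(self):
        # Nothing on disk to enumerate for generated content.
        raise NotImplementedError("content is produced by the pipeline")


class GeneratorPipeline:
    """With pipelines creating their own jobs, a generator pipeline
    can emit one job per generated page (e.g. per taxonomy term)."""

    def __init__(self, terms):
        self.terms = terms

    def createJobs(self):
        # One job (a plain dict here) per page to bake.
        return [{'term': t, 'uri': '/tag/%s' % t} for t in self.terms]


if __name__ == '__main__':
    pp = GeneratorPipeline(['python', 'baking'])
    for job in pp.createJobs():
        print(job)
```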
| author   | Ludovic Chabant <ludovic@chabant.com> |
|----------|---------------------------------------|
| date     | Sun, 04 Jun 2017 23:34:28 -0700       |
| parents  | f070a4fc033c                          |
| children | 8d25f76fce98                          |
```diff
--- a/piecrust/baking/worker.py  Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/baking/worker.py  Sun Jun 04 23:34:28 2017 -0700
@@ -1,9 +1,10 @@
 import time
 import logging
-from piecrust.pipelines.base import PipelineContext, PipelineResult
+from piecrust.pipelines.base import (
+    PipelineManager, PipelineJobRunContext, PipelineJobResult,
+    get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
-    MultiRecordHistory, MultiRecord, RecordEntry, load_records)
-from piecrust.sources.base import ContentItem
+    MultiRecordHistory, MultiRecord, load_records)
 from piecrust.workerpool import IWorker
 
 
@@ -25,7 +26,7 @@
     def __init__(self, ctx):
         self.ctx = ctx
         self.app = None
-        self.record_history = None
+        self.record_histories = None
         self._work_start_time = time.perf_counter()
         self._sources = {}
         self._ppctx = None
@@ -51,41 +52,39 @@
         else:
             previous_records = MultiRecord()
         current_records = MultiRecord()
-        self.record_history = MultiRecordHistory(
+        self.record_histories = MultiRecordHistory(
             previous_records, current_records)
 
-        # Cache sources and create pipelines.
-        ppclasses = {}
-        for ppclass in app.plugin_loader.getPipelines():
-            ppclasses[ppclass.PIPELINE_NAME] = ppclass
-
-        self._ppctx = PipelineContext(self.ctx.out_dir, self.record_history,
-                                      worker_id=self.wid,
-                                      force=self.ctx.force)
+        # Create the pipelines.
+        self.ppmngr = PipelineManager(
+            app, self.ctx.out_dir, self.record_histories,
+            worker_id=self.wid, force=self.ctx.force)
         for src in app.sources:
-            ppname = src.config['pipeline']
+            pname = get_pipeline_name_for_source(src)
             if (self.ctx.allowed_pipelines is not None and
-                    ppname not in self.ctx.allowed_pipelines):
+                    pname not in self.ctx.allowed_pipelines):
                 continue
 
-            pp = ppclasses[ppname](src)
-            pp.initialize(self._ppctx)
-            self._sources[src.name] = (src, pp)
+            self.ppmngr.createPipeline(src)
 
         stats.stepTimerSince("BakeWorkerInit", self._work_start_time)
 
     def process(self, job):
-        logger.debug("Received job: %s@%s" % (job.source_name, job.item_spec))
-        src, pp = self._sources[job.source_name]
-        item = ContentItem(job.item_spec, job.item_metadata)
+        item = job.content_item
+        logger.debug("Received job: %s@%s" % (job.source_name, item.spec))
+
+        ppinfo = self.ppmngr.getPipeline(job.source_name)
+        pp = ppinfo.pipeline
 
-        entry_class = pp.RECORD_ENTRY_CLASS or RecordEntry
-        ppres = PipelineResult()
-        ppres.pipeline_name = pp.PIPELINE_NAME
-        ppres.record_entry = entry_class()
-        ppres.record_entry.item_spec = job.item_spec
+        ppres = PipelineJobResult()
+        # For subsequent pass jobs, there will be a record entry given. For
+        # first pass jobs, there's none so we get the pipeline to create it.
+        ppres.record_entry = job.data.get('record_entry')
+        if ppres.record_entry is None:
+            ppres.record_entry = pp.createRecordEntry(job)
 
-        pp.run(item, self._ppctx, ppres)
+        runctx = PipelineJobRunContext(job, pp, self.record_histories)
+        pp.run(job, runctx, ppres)
         return ppres
 
     def getStats(self):
@@ -98,10 +97,3 @@
         for src, pp in self._sources.values():
             pp.shutdown(self._ppctx)
 
-
-class BakeJob:
-    def __init__(self, source_name, item_spec, item_metadata):
-        self.source_name = source_name
-        self.item_spec = item_spec
-        self.item_metadata = item_metadata
-
```
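
The subtlest part of the new `process()` is the record-entry handling: a first-pass job arrives with no entry in `job.data`, so the pipeline creates one via `createRecordEntry()`, while subsequent-pass jobs carry the entry created earlier. Here is a standalone sketch of just that branch; only the control flow mirrors the diff above, and the `Fake*` classes are simplified stand-ins, not the real `PipelineJobResult` or pipeline types.

```python
# Standalone sketch of the record-entry branch in process(); the Fake*
# classes are stand-ins, only the control flow mirrors the diff above.

class FakeRecordEntry:
    def __init__(self):
        self.flags = 0

class FakePipeline:
    def createRecordEntry(self, job):
        # First-pass jobs get a fresh entry from the pipeline.
        return FakeRecordEntry()

def resolve_record_entry(job_data, pp):
    # Subsequent-pass jobs already carry the entry created earlier.
    entry = job_data.get('record_entry')
    if entry is None:
        entry = pp.createRecordEntry(job_data)
    return entry

# First pass: no entry in the job payload, so the pipeline creates one.
e1 = resolve_record_entry({}, FakePipeline())
# Second pass: the same entry is threaded back through the job data.
e2 = resolve_record_entry({'record_entry': e1}, FakePipeline())
assert e1 is e2
```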