Mercurial > piecrust2
diff piecrust/baking/worker.py @ 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item
from an item spec (path).
- Make Hoedown the default Markdown formatter.
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Sun, 19 Nov 2017 14:29:17 -0800 |
parents | bf65a1a6992a |
children | 2e5c5d33d62c |
line wrap: on
line diff
--- a/piecrust/baking/worker.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/baking/worker.py Sun Nov 19 14:29:17 2017 -0800 @@ -1,7 +1,7 @@ import time import logging from piecrust.pipelines.base import ( - PipelineManager, PipelineJobRunContext, PipelineJobResult, + PipelineManager, PipelineJobRunContext, get_pipeline_name_for_source) from piecrust.pipelines.records import ( MultiRecordHistory, MultiRecord, load_records) @@ -27,7 +27,8 @@ def __init__(self, ctx): self.ctx = ctx self.app = None - self.record_histories = None + self.stats = None + self.previous_records = None self._work_start_time = time.perf_counter() self._sources = {} self._ppctx = None @@ -44,22 +45,20 @@ stats = app.env.stats stats.registerTimer("BakeWorker_%d_Total" % self.wid) stats.registerTimer("BakeWorkerInit") - self.timerScope = stats.timerScope self.app = app + self.stats = stats # Load previous record if self.ctx.previous_records_path: previous_records = load_records(self.ctx.previous_records_path) else: previous_records = MultiRecord() - current_records = MultiRecord() - self.record_histories = MultiRecordHistory( - previous_records, current_records) + self.previous_records = previous_records # Create the pipelines. self.ppmngr = PipelineManager( - app, self.ctx.out_dir, self.record_histories, + app, self.ctx.out_dir, worker_id=self.wid, force=self.ctx.force) ok_pp = self.ctx.allowed_pipelines nok_pp = self.ctx.forbidden_pipelines @@ -78,24 +77,22 @@ stats.stepTimerSince("BakeWorkerInit", self._work_start_time) def process(self, job): - item = job.content_item - logger.debug("Received job: %s@%s" % (job.source_name, item.spec)) - - ppinfo = self.ppmngr.getPipeline(job.source_name) - pp = ppinfo.pipeline + source_name, item_spec = job['job_spec'] + logger.debug("Received job: %s@%s" % (source_name, item_spec)) - with self.timerScope("PipelineJobs_%s" % pp.PIPELINE_NAME): - runctx = PipelineJobRunContext(job, pp.record_name, - self.record_histories) + # Run the job! 
+ job_start = time.perf_counter() + pp = self.ppmngr.getPipeline(source_name) + runctx = PipelineJobRunContext(job, pp.record_name, + self.previous_records) + ppres = { + 'item_spec': item_spec + } + pp.run(job, runctx, ppres) - ppres = PipelineJobResult() - # For subsequent pass jobs, there will be a record entry given. - # For first pass jobs, there's none so we get the pipeline to - # create it. - ppres.record_entry = job.data.get('record_entry') - if ppres.record_entry is None: - ppres.record_entry = pp.createRecordEntry(job, runctx) - pp.run(job, runctx, ppres) + # Log time spent in this pipeline. + self.stats.stepTimerSince("PipelineJobs_%s" % pp.PIPELINE_NAME, + job_start) return ppres