piecrust2: comparison of piecrust/baking/worker.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can keep aborting in `getContents`, but rely on their pipeline to generate pages for baking.
author    Ludovic Chabant <ludovic@chabant.com>
date      Sun, 04 Jun 2017 23:34:28 -0700
parents   f070a4fc033c
children  8d25f76fce98
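The second bullet is the interesting one: the baker no longer enumerates source contents itself, so a generator source can legitimately return nothing from `getContents` while its pipeline still produces jobs for the pages it generates. Here is a minimal sketch of that flow; `createJobs` is a made-up method name not confirmed by this diff, while the `getPipeline(...)` accessor and its `.pipeline` attribute do appear in the code below.

```python
# Hedged sketch: `createJobs` is hypothetical; only `getPipeline()` and
# its `.pipeline` attribute are confirmed by the diff below.

def collect_jobs(app, ppmngr):
    """Ask each source's pipeline for the jobs to bake, instead of
    building jobs directly from what the source's `getContents` returns."""
    jobs = []
    for src in app.sources:
        pp = ppmngr.getPipeline(src.name).pipeline
        # A generator source may have aborted in `getContents`, but its
        # pipeline can still emit jobs for the pages it will generate.
        jobs.extend(pp.createJobs())
    return jobs
```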
Changes between 853:f070a4fc033c and 854:08e02c2a2a1a, shown as a unified diff:

```diff
--- piecrust/baking/worker.py  853:f070a4fc033c
+++ piecrust/baking/worker.py  854:08e02c2a2a1a
@@ -1,11 +1,12 @@
 import time
 import logging
-from piecrust.pipelines.base import PipelineContext, PipelineResult
+from piecrust.pipelines.base import (
+    PipelineManager, PipelineJobRunContext, PipelineJobResult,
+    get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
-    MultiRecordHistory, MultiRecord, RecordEntry, load_records)
-from piecrust.sources.base import ContentItem
+    MultiRecordHistory, MultiRecord, load_records)
 from piecrust.workerpool import IWorker
 
 
 logger = logging.getLogger(__name__)
 
@@ -23,11 +24,11 @@
 
 class BakeWorker(IWorker):
     def __init__(self, ctx):
         self.ctx = ctx
         self.app = None
-        self.record_history = None
+        self.record_histories = None
         self._work_start_time = time.perf_counter()
         self._sources = {}
         self._ppctx = None
 
     def initialize(self):
@@ -49,28 +50,22 @@
         if self.ctx.previous_records_path:
             previous_records = load_records(self.ctx.previous_records_path)
         else:
             previous_records = MultiRecord()
         current_records = MultiRecord()
-        self.record_history = MultiRecordHistory(
+        self.record_histories = MultiRecordHistory(
             previous_records, current_records)
 
-        # Cache sources and create pipelines.
-        ppclasses = {}
-        for ppclass in app.plugin_loader.getPipelines():
-            ppclasses[ppclass.PIPELINE_NAME] = ppclass
-
-        self._ppctx = PipelineContext(self.ctx.out_dir, self.record_history,
-                                      worker_id=self.wid,
-                                      force=self.ctx.force)
+        # Create the pipelines.
+        self.ppmngr = PipelineManager(
+            app, self.ctx.out_dir, self.record_histories,
+            worker_id=self.wid, force=self.ctx.force)
         for src in app.sources:
-            ppname = src.config['pipeline']
+            pname = get_pipeline_name_for_source(src)
             if (self.ctx.allowed_pipelines is not None and
-                    ppname not in self.ctx.allowed_pipelines):
+                    pname not in self.ctx.allowed_pipelines):
                 continue
 
-            pp = ppclasses[ppname](src)
-            pp.initialize(self._ppctx)
-            self._sources[src.name] = (src, pp)
+            self.ppmngr.createPipeline(src)
 
         stats.stepTimerSince("BakeWorkerInit", self._work_start_time)
 
```
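The per-worker pipeline bookkeeping (the `ppclasses` dict and the shared `PipelineContext`) moves into `PipelineManager`, whose implementation is not part of this changeset view. The following is a minimal sketch inferred only from the constructor arguments and the `createPipeline`/`getPipeline` calls visible above; `PipelineInfoSketch` is a made-up name for whatever `getPipeline` actually returns.

```python
from piecrust.pipelines.base import get_pipeline_name_for_source


class PipelineInfoSketch:
    # Stand-in for the object returned by getPipeline(); the diff only
    # shows that it exposes a `.pipeline` attribute.
    def __init__(self, source, pipeline):
        self.source = source
        self.pipeline = pipeline


class PipelineManagerSketch:
    def __init__(self, app, out_dir, record_histories,
                 *, worker_id=-1, force=False):
        self.app = app
        self.out_dir = out_dir
        self.record_histories = record_histories
        self.worker_id = worker_id
        self.force = force
        self._pipelines = {}

    def createPipeline(self, source):
        # Same lookup the old worker did inline: map the source's pipeline
        # name to a registered pipeline class and instantiate it.
        ppclasses = {p.PIPELINE_NAME: p
                     for p in self.app.plugin_loader.getPipelines()}
        pp = ppclasses[get_pipeline_name_for_source(source)](source)
        info = PipelineInfoSketch(source, pp)
        self._pipelines[source.name] = info
        return info

    def getPipeline(self, source_name):
        return self._pipelines[source_name]
```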
```diff
@@ -77,17 +72,21 @@
     def process(self, job):
-        logger.debug("Received job: %s@%s" % (job.source_name, job.item_spec))
-        src, pp = self._sources[job.source_name]
-        item = ContentItem(job.item_spec, job.item_metadata)
+        item = job.content_item
+        logger.debug("Received job: %s@%s" % (job.source_name, item.spec))
 
-        entry_class = pp.RECORD_ENTRY_CLASS or RecordEntry
-        ppres = PipelineResult()
-        ppres.pipeline_name = pp.PIPELINE_NAME
-        ppres.record_entry = entry_class()
-        ppres.record_entry.item_spec = job.item_spec
+        ppinfo = self.ppmngr.getPipeline(job.source_name)
+        pp = ppinfo.pipeline
 
-        pp.run(item, self._ppctx, ppres)
+        ppres = PipelineJobResult()
+        # For subsequent pass jobs, there will be a record entry given. For
+        # first pass jobs, there's none so we get the pipeline to create it.
+        ppres.record_entry = job.data.get('record_entry')
+        if ppres.record_entry is None:
+            ppres.record_entry = pp.createRecordEntry(job)
+
+        runctx = PipelineJobRunContext(job, pp, self.record_histories)
+        pp.run(job, runctx, ppres)
         return ppres
 
     def getStats(self):
         stats = self.app.env.stats
         stats.stepTimerSince("BakeWorker_%d_Total" % self.wid,
```
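The new `process()` also encodes a multi-pass protocol in `job.data`: a first-pass job carries no record entry, so the pipeline creates one via `createRecordEntry`, while subsequent-pass jobs ship the entry from the earlier pass back in. A tiny self-contained illustration of that branch; the `Fake*` types here are stand-ins, not piecrust API.

```python
class FakeRecordEntry:
    pass


class FakePipeline:
    def createRecordEntry(self, job):
        return FakeRecordEntry()


def resolve_record_entry(pp, job_data):
    # Mirrors `ppres.record_entry = job.data.get('record_entry')` followed
    # by the fallback to `pp.createRecordEntry(job)` in process() above.
    entry = job_data.get('record_entry')
    if entry is None:
        entry = pp.createRecordEntry(job_data)
    return entry


pp = FakePipeline()
entry = resolve_record_entry(pp, {})                    # first pass: created
later = resolve_record_entry(pp, {'record_entry': entry})  # later pass: reused
assert later is entry
```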
```diff
@@ -96,12 +95,5 @@
 
     def shutdown(self):
         for src, pp in self._sources.values():
             pp.shutdown(self._ppctx)
 
-
-class BakeJob:
-    def __init__(self, source_name, item_spec, item_metadata):
-        self.source_name = source_name
-        self.item_spec = item_spec
-        self.item_metadata = item_metadata
-
```
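With pipelines creating their own jobs, the worker-side `BakeJob` class (a bare `(source_name, item_spec, item_metadata)` triple) becomes redundant: jobs now arrive carrying a full `ContentItem` plus a per-pass `data` dict. A sketch of the shape `process()` now expects; `PipelineJobSketch` is a made-up name, and only the attribute names come from the code above.

```python
from piecrust.sources.base import ContentItem


class PipelineJobSketch:
    def __init__(self, source_name, content_item, data=None):
        self.source_name = source_name
        self.content_item = content_item  # replaces item_spec/item_metadata
        self.data = data or {}            # per-pass payload, e.g. 'record_entry'


# Illustrative values; ContentItem(spec, metadata) matches the old worker's
# `ContentItem(job.item_spec, job.item_metadata)` construction.
job = PipelineJobSketch('pages', ContentItem('pages/foo.md', {'slug': 'foo'}))
```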