comparison piecrust/baking/worker.py @ 854:08e02c2a2a1a

core: Keep refactoring, this time to prepare for generator sources. - Make a few APIs simpler. - Content pipelines create their own jobs, so that generator sources can keep aborting in `getContents`, but rely on their pipeline to generate pages for baking.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jun 2017 23:34:28 -0700
parents f070a4fc033c
children 8d25f76fce98
comparison
equal deleted inserted replaced
853:f070a4fc033c 854:08e02c2a2a1a
1 import time 1 import time
2 import logging 2 import logging
3 from piecrust.pipelines.base import PipelineContext, PipelineResult 3 from piecrust.pipelines.base import (
4 PipelineManager, PipelineJobRunContext, PipelineJobResult,
5 get_pipeline_name_for_source)
4 from piecrust.pipelines.records import ( 6 from piecrust.pipelines.records import (
5 MultiRecordHistory, MultiRecord, RecordEntry, load_records) 7 MultiRecordHistory, MultiRecord, load_records)
6 from piecrust.sources.base import ContentItem
7 from piecrust.workerpool import IWorker 8 from piecrust.workerpool import IWorker
8 9
9 10
10 logger = logging.getLogger(__name__) 11 logger = logging.getLogger(__name__)
11 12
23 24
24 class BakeWorker(IWorker): 25 class BakeWorker(IWorker):
25 def __init__(self, ctx): 26 def __init__(self, ctx):
26 self.ctx = ctx 27 self.ctx = ctx
27 self.app = None 28 self.app = None
28 self.record_history = None 29 self.record_histories = None
29 self._work_start_time = time.perf_counter() 30 self._work_start_time = time.perf_counter()
30 self._sources = {} 31 self._sources = {}
31 self._ppctx = None 32 self._ppctx = None
32 33
33 def initialize(self): 34 def initialize(self):
49 if self.ctx.previous_records_path: 50 if self.ctx.previous_records_path:
50 previous_records = load_records(self.ctx.previous_records_path) 51 previous_records = load_records(self.ctx.previous_records_path)
51 else: 52 else:
52 previous_records = MultiRecord() 53 previous_records = MultiRecord()
53 current_records = MultiRecord() 54 current_records = MultiRecord()
54 self.record_history = MultiRecordHistory( 55 self.record_histories = MultiRecordHistory(
55 previous_records, current_records) 56 previous_records, current_records)
56 57
57 # Cache sources and create pipelines. 58 # Create the pipelines.
58 ppclasses = {} 59 self.ppmngr = PipelineManager(
59 for ppclass in app.plugin_loader.getPipelines(): 60 app, self.ctx.out_dir, self.record_histories,
60 ppclasses[ppclass.PIPELINE_NAME] = ppclass 61 worker_id=self.wid, force=self.ctx.force)
61
62 self._ppctx = PipelineContext(self.ctx.out_dir, self.record_history,
63 worker_id=self.wid,
64 force=self.ctx.force)
65 for src in app.sources: 62 for src in app.sources:
66 ppname = src.config['pipeline'] 63 pname = get_pipeline_name_for_source(src)
67 if (self.ctx.allowed_pipelines is not None and 64 if (self.ctx.allowed_pipelines is not None and
68 ppname not in self.ctx.allowed_pipelines): 65 pname not in self.ctx.allowed_pipelines):
69 continue 66 continue
70 67
71 pp = ppclasses[ppname](src) 68 self.ppmngr.createPipeline(src)
72 pp.initialize(self._ppctx)
73 self._sources[src.name] = (src, pp)
74 69
75 stats.stepTimerSince("BakeWorkerInit", self._work_start_time) 70 stats.stepTimerSince("BakeWorkerInit", self._work_start_time)
76 71
77 def process(self, job): 72 def process(self, job):
78 logger.debug("Received job: %s@%s" % (job.source_name, job.item_spec)) 73 item = job.content_item
79 src, pp = self._sources[job.source_name] 74 logger.debug("Received job: %s@%s" % (job.source_name, item.spec))
80 item = ContentItem(job.item_spec, job.item_metadata)
81 75
82 entry_class = pp.RECORD_ENTRY_CLASS or RecordEntry 76 ppinfo = self.ppmngr.getPipeline(job.source_name)
83 ppres = PipelineResult() 77 pp = ppinfo.pipeline
84 ppres.pipeline_name = pp.PIPELINE_NAME
85 ppres.record_entry = entry_class()
86 ppres.record_entry.item_spec = job.item_spec
87 78
88 pp.run(item, self._ppctx, ppres) 79 ppres = PipelineJobResult()
80 # For subsequent pass jobs, there will be a record entry given. For
81 # first pass jobs, there's none so we get the pipeline to create it.
82 ppres.record_entry = job.data.get('record_entry')
83 if ppres.record_entry is None:
84 ppres.record_entry = pp.createRecordEntry(job)
85
86 runctx = PipelineJobRunContext(job, pp, self.record_histories)
87 pp.run(job, runctx, ppres)
89 return ppres 88 return ppres
90 89
91 def getStats(self): 90 def getStats(self):
92 stats = self.app.env.stats 91 stats = self.app.env.stats
93 stats.stepTimerSince("BakeWorker_%d_Total" % self.wid, 92 stats.stepTimerSince("BakeWorker_%d_Total" % self.wid,
96 95
97 def shutdown(self): 96 def shutdown(self):
98 for src, pp in self._sources.values(): 97 for src, pp in self._sources.values():
99 pp.shutdown(self._ppctx) 98 pp.shutdown(self._ppctx)
100 99
101
102 class BakeJob:
103 def __init__(self, source_name, item_spec, item_metadata):
104 self.source_name = source_name
105 self.item_spec = item_spec
106 self.item_metadata = item_metadata
107