Mercurial > piecrust2
comparison piecrust/baking/worker.py @ 852:4850f8c21b6e
core: Start of the big refactor for PieCrust 3.0.
* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are 2 built-in pipelines,
one for assets and one for pages. The asset pipeline is vaguely functional,
but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each
content item. This should allow for better parallelization.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 17 May 2017 00:11:48 -0700 |
parents | c3cb2f9df882 |
children | f070a4fc033c |
comparison legend: equal | deleted | inserted | replaced
851:2c7e57d80bba | 852:4850f8c21b6e |
---|---|
1 import time | 1 import time |
2 import logging | 2 import logging |
3 from piecrust.app import PieCrust, apply_variant_and_values | 3 from piecrust.pipelines.base import PipelineContext, PipelineResult |
4 from piecrust.baking.records import BakeRecord, _get_transition_key | 4 from piecrust.pipelines.records import ( |
5 from piecrust.baking.single import PageBaker, BakingError | 5 MultiRecordHistory, MultiRecord, Record, load_records) |
6 from piecrust.environment import AbortedSourceUseError | 6 from piecrust.sources.base import ContentItem |
7 from piecrust.rendering import ( | |
8 QualifiedPage, PageRenderingContext, render_page_segments) | |
9 from piecrust.routing import create_route_metadata | |
10 from piecrust.sources.base import PageFactory | |
11 from piecrust.workerpool import IWorker | 7 from piecrust.workerpool import IWorker |
12 | 8 |
13 | 9 |
14 logger = logging.getLogger(__name__) | 10 logger = logging.getLogger(__name__) |
15 | 11 |
16 | 12 |
17 class BakeWorkerContext(object): | 13 class BakeWorkerContext(object): |
18 def __init__(self, appfactory, out_dir, *, | 14 def __init__(self, appfactory, out_dir, *, |
19 force=False, previous_record_path=None): | 15 force=False, previous_records_path=None, |
16 allowed_pipelines=None): | |
20 self.appfactory = appfactory | 17 self.appfactory = appfactory |
21 self.out_dir = out_dir | 18 self.out_dir = out_dir |
22 self.force = force | 19 self.force = force |
23 self.previous_record_path = previous_record_path | 20 self.previous_records_path = previous_records_path |
24 self.app = None | 21 self.allowed_pipelines = allowed_pipelines |
25 self.previous_record = None | |
26 self.previous_record_index = None | |
27 | 22 |
28 | 23 |
29 class BakeWorker(IWorker): | 24 class BakeWorker(IWorker): |
30 def __init__(self, ctx): | 25 def __init__(self, ctx): |
31 self.ctx = ctx | 26 self.ctx = ctx |
32 self.work_start_time = time.perf_counter() | 27 self.app = None |
28 self.record_history = None | |
29 self._work_start_time = time.perf_counter() | |
30 self._sources = {} | |
31 self._ppctx = None | |
33 | 32 |
34 def initialize(self): | 33 def initialize(self): |
35 # Create the app local to this worker. | 34 # Create the app local to this worker. |
36 app = self.ctx.appfactory.create() | 35 app = self.ctx.appfactory.create() |
37 app.config.set('baker/is_baking', True) | 36 app.config.set('baker/is_baking', True) |
38 app.config.set('baker/worker_id', self.wid) | 37 app.config.set('baker/worker_id', self.wid) |
39 app.env.base_asset_url_format = '%uri%' | 38 app.config.set('site/base_asset_url_format', '%uri') |
39 | |
40 app.env.fs_cache_only_for_main_page = True | 40 app.env.fs_cache_only_for_main_page = True |
41 app.env.registerTimer("BakeWorker_%d_Total" % self.wid) | 41 |
42 app.env.registerTimer("BakeWorkerInit") | 42 stats = app.env.stats |
43 app.env.registerTimer("JobReceive") | 43 stats.registerTimer("BakeWorker_%d_Total" % self.wid) |
44 app.env.registerCounter("SourceUseAbortions") | 44 stats.registerTimer("BakeWorkerInit") |
45 app.env.registerManifest("LoadJobs") | 45 stats.registerTimer("JobReceive") |
46 app.env.registerManifest("RenderJobs") | 46 stats.registerTimer('LoadJob', raise_if_registered=False) |
47 app.env.registerManifest("BakeJobs") | 47 stats.registerTimer('RenderFirstSubJob', |
48 self.ctx.app = app | 48 raise_if_registered=False) |
49 stats.registerTimer('BakeJob', raise_if_registered=False) | |
50 | |
51 stats.registerCounter("SourceUseAbortions") | |
52 | |
53 stats.registerManifest("LoadJobs") | |
54 stats.registerManifest("RenderJobs") | |
55 stats.registerManifest("BakeJobs") | |
56 | |
57 self.app = app | |
49 | 58 |
50 # Load previous record | 59 # Load previous record |
51 if self.ctx.previous_record_path: | 60 if self.ctx.previous_records_path: |
52 self.ctx.previous_record = BakeRecord.load( | 61 previous_records = load_records(self.ctx.previous_records_path) |
53 self.ctx.previous_record_path) | 62 else: |
54 self.ctx.previous_record_index = {} | 63 previous_records = MultiRecord() |
55 for e in self.ctx.previous_record.entries: | 64 current_records = MultiRecord() |
56 key = _get_transition_key(e.path, e.extra_key) | 65 self.record_history = MultiRecordHistory( |
57 self.ctx.previous_record_index[key] = e | 66 previous_records, current_records) |
58 | 67 |
59 # Create the job handlers. | 68 # Cache sources and create pipelines. |
60 job_handlers = { | 69 ppclasses = {} |
61 JOB_LOAD: LoadJobHandler(self.ctx), | 70 for ppclass in app.plugin_loader.getPipelines(): |
62 JOB_RENDER_FIRST: RenderFirstSubJobHandler(self.ctx), | 71 ppclasses[ppclass.PIPELINE_NAME] = ppclass |
63 JOB_BAKE: BakeJobHandler(self.ctx)} | |
64 for jt, jh in job_handlers.items(): | |
65 app.env.registerTimer(type(jh).__name__) | |
66 self.job_handlers = job_handlers | |
67 | 72 |
68 app.env.stepTimerSince("BakeWorkerInit", self.work_start_time) | 73 self._ppctx = PipelineContext(self.ctx.out_dir, self.record_history, |
74 worker_id=self.wid, | |
75 force=self.ctx.force) | |
76 for src in app.sources: | |
77 ppname = src.config['pipeline'] | |
78 if (self.ctx.allowed_pipelines is not None and | |
79 ppname not in self.ctx.allowed_pipelines): | |
80 continue | |
81 | |
82 pp = ppclasses[ppname](src) | |
83 pp.initialize(self._ppctx) | |
84 self._sources[src.name] = (src, pp) | |
85 | |
86 stats.stepTimerSince("BakeWorkerInit", self._work_start_time) | |
69 | 87 |
70 def process(self, job): | 88 def process(self, job): |
71 handler = self.job_handlers[job['type']] | 89 logger.debug("Received job: %s@%s" % (job.source_name, job.item_spec)) |
72 with self.ctx.app.env.timerScope(type(handler).__name__): | 90 src, pp = self._sources[job.source_name] |
73 return handler.handleJob(job['job']) | 91 item = ContentItem(job.item_spec, job.item_metadata) |
74 | 92 |
75 def getReport(self, pool_reports): | 93 record_class = pp.RECORD_CLASS or Record |
76 self.ctx.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid, | 94 ppres = PipelineResult(record_class()) |
77 self.work_start_time) | 95 ppres.record.item_spec = job.item_spec |
78 data = self.ctx.app.env.getStats() | 96 pp.run(item, self._ppctx, ppres) |
79 data.timers.update(pool_reports) | 97 return ppres |
80 return { | 98 |
81 'type': 'stats', | 99 def getStats(self): |
82 'data': data} | 100 stats = self.app.env.stats |
101 stats.stepTimerSince("BakeWorker_%d_Total" % self.wid, | |
102 self._work_start_time) | |
103 return stats | |
83 | 104 |
84 def shutdown(self): | 105 def shutdown(self): |
85 for jh in self.job_handlers.values(): | 106 for src, pp in self._sources.values(): |
86 jh.shutdown() | 107 pp.shutdown(self._ppctx) |
87 | 108 |
88 | 109 |
89 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3) | 110 class BakeJob: |
111 def __init__(self, source_name, item_spec, item_metadata): | |
112 self.source_name = source_name | |
113 self.item_spec = item_spec | |
114 self.item_metadata = item_metadata | |
90 | 115 |
91 | 116 |
92 class JobHandler(object): | 117 class JobHandler: |
93 def __init__(self, ctx): | 118 def __init__(self, ctx): |
94 self.ctx = ctx | 119 self.ctx = ctx |
95 | 120 |
96 @property | 121 @property |
97 def app(self): | 122 def app(self): |
109 while ex is not None: | 134 while ex is not None: |
110 errors.append(str(ex)) | 135 errors.append(str(ex)) |
111 ex = ex.__cause__ | 136 ex = ex.__cause__ |
112 return errors | 137 return errors |
113 | 138 |
114 | |
115 def save_factory(fac): | |
116 return { | |
117 'source_name': fac.source.name, | |
118 'rel_path': fac.rel_path, | |
119 'metadata': fac.metadata} | |
120 | |
121 | |
122 def load_factory(app, info): | |
123 source = app.getSource(info['source_name']) | |
124 return PageFactory(source, info['rel_path'], info['metadata']) | |
125 | |
126 | |
127 class LoadJobHandler(JobHandler): | |
128 def handleJob(self, job): | |
129 # Just make sure the page has been cached. | |
130 fac = load_factory(self.app, job) | |
131 logger.debug("Loading page: %s" % fac.ref_spec) | |
132 self.app.env.addManifestEntry('LoadJobs', fac.ref_spec) | |
133 result = { | |
134 'source_name': fac.source.name, | |
135 'path': fac.path, | |
136 'config': None, | |
137 'timestamp': None, | |
138 'errors': None} | |
139 try: | |
140 page = fac.buildPage() | |
141 page._load() | |
142 result['config'] = page.config.getAll() | |
143 result['timestamp'] = page.datetime.timestamp() | |
144 except Exception as ex: | |
145 logger.debug("Got loading error. Sending it to master.") | |
146 result['errors'] = _get_errors(ex) | |
147 if self.ctx.app.debug: | |
148 logger.exception(ex) | |
149 return result | |
150 | |
151 | |
152 class RenderFirstSubJobHandler(JobHandler): | |
153 def handleJob(self, job): | |
154 # Render the segments for the first sub-page of this page. | |
155 fac = load_factory(self.app, job['factory_info']) | |
156 self.app.env.addManifestEntry('RenderJobs', fac.ref_spec) | |
157 | |
158 route_index = job['route_index'] | |
159 route = self.app.routes[route_index] | |
160 | |
161 page = fac.buildPage() | |
162 route_metadata = create_route_metadata(page) | |
163 qp = QualifiedPage(page, route, route_metadata) | |
164 ctx = PageRenderingContext(qp) | |
165 self.app.env.abort_source_use = True | |
166 | |
167 result = { | |
168 'path': fac.path, | |
169 'aborted': False, | |
170 'errors': None} | |
171 logger.debug("Preparing page: %s" % fac.ref_spec) | |
172 try: | |
173 render_page_segments(ctx) | |
174 except AbortedSourceUseError: | |
175 logger.debug("Page %s was aborted." % fac.ref_spec) | |
176 self.app.env.stepCounter("SourceUseAbortions") | |
177 result['aborted'] = True | |
178 except Exception as ex: | |
179 logger.debug("Got rendering error. Sending it to master.") | |
180 result['errors'] = _get_errors(ex) | |
181 if self.ctx.app.debug: | |
182 logger.exception(ex) | |
183 finally: | |
184 self.app.env.abort_source_use = False | |
185 return result | |
186 | |
187 | |
188 class BakeJobHandler(JobHandler): | |
189 def __init__(self, ctx): | |
190 super(BakeJobHandler, self).__init__(ctx) | |
191 self.page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force) | |
192 | |
193 def shutdown(self): | |
194 self.page_baker.shutdown() | |
195 | |
196 def handleJob(self, job): | |
197 # Actually bake the page and all its sub-pages to the output folder. | |
198 fac = load_factory(self.app, job['factory_info']) | |
199 self.app.env.addManifestEntry('BakeJobs', fac.ref_spec) | |
200 | |
201 route_index = job['route_index'] | |
202 route_metadata = job['route_metadata'] | |
203 route = self.app.routes[route_index] | |
204 | |
205 gen_name = job['generator_name'] | |
206 gen_key = job['generator_record_key'] | |
207 dirty_source_names = job['dirty_source_names'] | |
208 | |
209 page = fac.buildPage() | |
210 qp = QualifiedPage(page, route, route_metadata) | |
211 | |
212 result = { | |
213 'path': fac.path, | |
214 'generator_name': gen_name, | |
215 'generator_record_key': gen_key, | |
216 'sub_entries': None, | |
217 'errors': None} | |
218 | |
219 if job.get('needs_config', False): | |
220 result['config'] = page.config.getAll() | |
221 | |
222 previous_entry = None | |
223 if self.ctx.previous_record_index is not None: | |
224 key = _get_transition_key(fac.path, gen_key) | |
225 previous_entry = self.ctx.previous_record_index.get(key) | |
226 | |
227 logger.debug("Baking page: %s" % fac.ref_spec) | |
228 logger.debug("With route metadata: %s" % route_metadata) | |
229 try: | |
230 sub_entries = self.page_baker.bake( | |
231 qp, previous_entry, dirty_source_names, gen_name) | |
232 result['sub_entries'] = sub_entries | |
233 | |
234 except Exception as ex: | |
235 logger.debug("Got baking error. Sending it to master.") | |
236 result['errors'] = _get_errors(ex) | |
237 if self.ctx.app.debug: | |
238 logger.exception(ex) | |
239 | |
240 return result | |
241 |