piecrust2: comparison of piecrust/baking/baker.py @ 415:0e9a94b7fdfa
bake: Improve bake record information.
* Store things in the bake record that require less interaction between the
master process and the workers. For instance, don't store the paginator
object in the render pass info -- instead, just store whether pagination
was used, and whether it had more items.
* Simplify information passing between workers and bake passes by saving the
rendering info to the JSON cache (see the sketch after this list). This means
the "render first sub" job doesn't have to return anything except errors now.
* Add more performance counter info.
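
For illustration, here is a minimal sketch of the leaner render-pass result
the first two bullets describe: the worker writes the full rendering info to
a JSON cache and hands back only plain, picklable flags and errors. All
names here (`RenderResult`, `render_first_sub`, the cache file name) are
illustrative stand-ins, not PieCrust's actual API.

```python
import json
import os
import tempfile


class RenderResult:
    """What the worker sends back to the master: plain flags and errors."""
    def __init__(self, path):
        self.path = path
        self.used_pagination = False
        self.pagination_has_more = False
        self.errors = []


def render_first_sub(path, paginator, cache_dir):
    res = RenderResult(path)
    # Keep only booleans instead of the paginator object itself.
    res.used_pagination = paginator is not None
    res.pagination_has_more = bool(paginator and paginator.get('has_more'))
    # Persist the full rendering info to a JSON cache so later bake passes
    # can re-load it instead of receiving it through the results queue.
    with open(os.path.join(cache_dir, 'render_info.json'), 'w') as fp:
        json.dump({'path': path,
                   'used_pagination': res.used_pagination,
                   'pagination_has_more': res.pagination_has_more}, fp)
    return res


if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as d:
        r = render_first_sub('/pages/foo.md', {'has_more': True}, d)
        print(r.used_pagination, r.pagination_has_more)  # True True
```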
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Sat, 20 Jun 2015 19:23:16 -0700 |
| parents | e7b865f8f335 |
| children | 4a43d7015b75 |
414:c4b3a7fd2f87 (old) | 415:0e9a94b7fdfa (new) |
---|---|
4 import queue | 4 import queue |
5 import hashlib | 5 import hashlib |
6 import logging | 6 import logging |
7 import multiprocessing | 7 import multiprocessing |
8 from piecrust.baking.records import ( | 8 from piecrust.baking.records import ( |
9 BakeRecordEntry, TransitionalBakeRecord, TaxonomyInfo, FirstRenderInfo) | 9 BakeRecordEntry, TransitionalBakeRecord, TaxonomyInfo) |
10 from piecrust.baking.worker import ( | 10 from piecrust.baking.worker import ( |
11 BakeWorkerJob, LoadJobPayload, RenderFirstSubJobPayload, | 11 BakeWorkerJob, LoadJobPayload, RenderFirstSubJobPayload, |
12 BakeJobPayload, | 12 BakeJobPayload, |
13 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE) | 13 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE) |
14 from piecrust.chefutil import ( | 14 from piecrust.chefutil import ( |
37 for src in self.app.sources: | 37 for src in self.app.sources: |
38 tax_page_ref = tax.getPageRef(src) | 38 tax_page_ref = tax.getPageRef(src) |
39 for path in tax_page_ref.possible_paths: | 39 for path in tax_page_ref.possible_paths: |
40 self.taxonomy_pages.append(path) | 40 self.taxonomy_pages.append(path) |
41 logger.debug(" - %s" % path) | 41 logger.debug(" - %s" % path) |
42 | |
43 # Register some timers. | |
44 self.app.env.registerTimer('LoadJob', raise_if_registered=False) | |
45 self.app.env.registerTimer('RenderFirstSubJob', | |
46 raise_if_registered=False) | |
47 self.app.env.registerTimer('BakeJob', raise_if_registered=False) | |
42 | 48 |
43 def bake(self): | 49 def bake(self): |
44 logger.debug(" Bake Output: %s" % self.out_dir) | 50 logger.debug(" Bake Output: %s" % self.out_dir) |
45 logger.debug(" Root URL: %s" % self.app.config.get('site/root')) | 51 logger.debug(" Root URL: %s" % self.app.config.get('site/root')) |
46 | 52 |
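
The new hunk above registers per-job timers with `raise_if_registered=False`
so a repeated registration doesn't fail. A minimal sketch of what such an
idempotent registry could look like, assuming a simple dict-backed
environment (`stepTimer` is an assumed accumulator method, not necessarily
PieCrust's real one):

```python
class Environment:
    """Dict-backed stand-in for the app environment's timer registry."""
    def __init__(self):
        self._timers = {}

    def registerTimer(self, category, raise_if_registered=True):
        if category in self._timers:
            if raise_if_registered:
                raise Exception(
                        "Timer '%s' is already registered." % category)
            return  # already there; idempotent no-op
        self._timers[category] = 0

    def stepTimer(self, category, value):
        # Assumed accumulator used when a timed scope closes.
        self._timers[category] += value


env = Environment()
env.registerTimer('LoadJob', raise_if_registered=False)
env.registerTimer('LoadJob', raise_if_registered=False)  # no exception
env.stepTimer('LoadJob', 0.25)
print(env._timers)  # {'LoadJob': 0.25}
```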
106 timers = pool.results.get(True, 0.1) | 112 timers = pool.results.get(True, 0.1) |
107 except queue.Empty: | 113 except queue.Empty: |
108 logger.error("Didn't get timing information from all workers.") | 114 logger.error("Didn't get timing information from all workers.") |
109 break | 115 break |
110 | 116 |
111 for name, val in timers.items(): | 117 for name, val in timers['data'].items(): |
112 main_val = record.current.timers.setdefault(name, 0) | 118 main_val = record.current.timers.setdefault(name, 0) |
113 record.current.timers[name] = main_val + val | 119 record.current.timers[name] = main_val + val |
114 | 120 |
115 # Delete files from the output. | 121 # Delete files from the output. |
116 self._handleDeletetions(record) | 122 self._handleDeletetions(record) |
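
Worker timer reports are now unwrapped through `timers['data']` before being
summed into the record. Here is that merge logic extracted into a runnable
sketch; the `'type'` key in the report envelope is an assumption, since the
diff only reads `'data'`:

```python
def merge_worker_timers(record_timers, worker_reports):
    # Each report is assumed to look like
    # {'type': 'timers', 'data': {category: seconds}}.
    for report in worker_reports:
        for name, val in report['data'].items():
            main_val = record_timers.setdefault(name, 0)
            record_timers[name] = main_val + val
    return record_timers


print(merge_worker_timers({}, [
    {'type': 'timers', 'data': {'BakeJob': 1.5}},
    {'type': 'timers', 'data': {'BakeJob': 0.5, 'LoadJob': 2.0}},
]))
# {'BakeJob': 2.0, 'LoadJob': 2.0}
```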
183 colored=False)) | 189 colored=False)) |
184 | 190 |
185 def _bakeRealm(self, record, pool, realm, srclist): | 191 def _bakeRealm(self, record, pool, realm, srclist): |
186 start_time = time.perf_counter() | 192 start_time = time.perf_counter() |
187 try: | 193 try: |
194 record.current.baked_count[realm] = 0 | |
195 | |
188 all_factories = [] | 196 all_factories = [] |
189 for source in srclist: | 197 for source in srclist: |
190 factories = source.getPageFactories() | 198 factories = source.getPageFactories() |
191 all_factories += [f for f in factories | 199 all_factories += [f for f in factories |
192 if f.path not in self.taxonomy_pages] | 200 if f.path not in self.taxonomy_pages] |
193 | 201 |
194 self._loadRealmPages(record, pool, all_factories) | 202 self._loadRealmPages(record, pool, all_factories) |
195 self._renderRealmPages(record, pool, all_factories) | 203 self._renderRealmPages(record, pool, all_factories) |
196 self._bakeRealmPages(record, pool, all_factories) | 204 self._bakeRealmPages(record, pool, realm, all_factories) |
197 finally: | 205 finally: |
198 page_count = len(all_factories) | 206 page_count = record.current.baked_count[realm] |
199 logger.info(format_timed( | 207 logger.info(format_timed( |
200 start_time, | 208 start_time, |
201 "baked %d %s pages" % | 209 "baked %d %s pages." % |
202 (page_count, REALM_NAMES[realm].lower()))) | 210 (page_count, REALM_NAMES[realm].lower()))) |
203 | 211 |
204 def _loadRealmPages(self, record, pool, factories): | 212 def _loadRealmPages(self, record, pool, factories): |
213 logger.debug("Loading %d realm pages..." % len(factories)) | |
205 with format_timed_scope(logger, | 214 with format_timed_scope(logger, |
206 "loaded %d pages" % len(factories), | 215 "loaded %d pages" % len(factories), |
207 level=logging.DEBUG, colored=False): | 216 level=logging.DEBUG, colored=False, |
217 timer_env=self.app.env, | |
218 timer_category='LoadJob'): | |
208 for fac in factories: | 219 for fac in factories: |
209 job = BakeWorkerJob( | 220 job = BakeWorkerJob( |
210 JOB_LOAD, | 221 JOB_LOAD, |
211 LoadJobPayload(fac)) | 222 LoadJobPayload(fac)) |
212 pool.queue.put_nowait(job) | 223 pool.queue.put_nowait(job) |
216 record_entry = BakeRecordEntry(res.source_name, res.path) | 227 record_entry = BakeRecordEntry(res.source_name, res.path) |
217 record_entry.config = res.config | 228 record_entry.config = res.config |
218 if res.errors: | 229 if res.errors: |
219 record_entry.errors += res.errors | 230 record_entry.errors += res.errors |
220 record.current.success = False | 231 record.current.success = False |
232 self._logErrors(res.path, res.errors) | |
221 record.addEntry(record_entry) | 233 record.addEntry(record_entry) |
222 | 234 |
223 self._waitOnWorkerPool( | 235 self._waitOnWorkerPool( |
224 pool, | 236 pool, |
225 expected_result_count=len(factories), | 237 expected_result_count=len(factories), |
226 result_handler=_handler) | 238 result_handler=_handler) |
227 | 239 |
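
`_loadRealmPages` now hands `timer_env` and `timer_category` to
`format_timed_scope`, so the scope both logs its elapsed time and feeds a
performance counter. A simplified, self-contained sketch of a helper with
that shape; the real `piecrust.chefutil` version also takes a `colored`
flag and differs in detail:

```python
import contextlib
import logging
import time

logging.basicConfig(level=logging.DEBUG)


@contextlib.contextmanager
def format_timed_scope(logger, message, level=logging.INFO,
                       timer_env=None, timer_category=None):
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.log(level, "%s in %.2fs" % (message, elapsed))
        if timer_env is not None and timer_category is not None:
            # Assumed counter API; mirrors the registerTimer sketch earlier.
            timer_env.stepTimer(timer_category, elapsed)


class _Env:
    # Tiny stand-in for the app environment's counter API.
    def stepTimer(self, category, value):
        print("%s += %.3fs" % (category, value))


with format_timed_scope(logging.getLogger(__name__), "loaded 12 pages",
                        level=logging.DEBUG,
                        timer_env=_Env(), timer_category='LoadJob'):
    time.sleep(0.01)
```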
228 def _renderRealmPages(self, record, pool, factories): | 240 def _renderRealmPages(self, record, pool, factories): |
241 logger.debug("Rendering %d realm pages..." % len(factories)) | |
229 with format_timed_scope(logger, | 242 with format_timed_scope(logger, |
230 "prepared %d pages" % len(factories), | 243 "prepared %d pages" % len(factories), |
231 level=logging.DEBUG, colored=False): | 244 level=logging.DEBUG, colored=False, |
245 timer_env=self.app.env, | |
246 timer_category='RenderFirstSubJob'): | |
232 expected_result_count = 0 | 247 expected_result_count = 0 |
233 for fac in factories: | 248 for fac in factories: |
234 record_entry = record.getCurrentEntry(fac.path) | 249 record_entry = record.getCurrentEntry(fac.path) |
235 if record_entry.errors: | 250 if record_entry.errors: |
236 logger.debug("Ignoring %s because it had previous " | 251 logger.debug("Ignoring %s because it had previous " |
262 pool.queue.put_nowait(job) | 277 pool.queue.put_nowait(job) |
263 expected_result_count += 1 | 278 expected_result_count += 1 |
264 | 279 |
265 def _handler(res): | 280 def _handler(res): |
266 entry = record.getCurrentEntry(res.path) | 281 entry = record.getCurrentEntry(res.path) |
267 | |
268 entry.first_render_info = FirstRenderInfo() | |
269 entry.first_render_info.used_assets = res.used_assets | |
270 entry.first_render_info.used_pagination = \ | |
271 res.used_pagination | |
272 entry.first_render_info.pagination_has_more = \ | |
273 res.pagination_has_more | |
274 | |
275 if res.errors: | 282 if res.errors: |
276 entry.errors += res.errors | 283 entry.errors += res.errors |
277 record.current.success = False | 284 record.current.success = False |
285 self._logErrors(res.path, res.errors) | |
278 | 286 |
279 self._waitOnWorkerPool( | 287 self._waitOnWorkerPool( |
280 pool, | 288 pool, |
281 expected_result_count=expected_result_count, | 289 expected_result_count=expected_result_count, |
282 result_handler=_handler) | 290 result_handler=_handler) |
283 | 291 |
284 def _bakeRealmPages(self, record, pool, factories): | 292 def _bakeRealmPages(self, record, pool, realm, factories): |
293 logger.debug("Baking %d realm pages..." % len(factories)) | |
285 with format_timed_scope(logger, | 294 with format_timed_scope(logger, |
286 "baked %d pages" % len(factories), | 295 "baked %d pages" % len(factories), |
287 level=logging.DEBUG, colored=False): | 296 level=logging.DEBUG, colored=False, |
297 timer_env=self.app.env, | |
298 timer_category='BakeJob'): | |
288 expected_result_count = 0 | 299 expected_result_count = 0 |
289 for fac in factories: | 300 for fac in factories: |
290 if self._queueBakeJob(record, pool, fac): | 301 if self._queueBakeJob(record, pool, fac): |
291 expected_result_count += 1 | 302 expected_result_count += 1 |
292 | 303 |
293 def _handler(res): | 304 def _handler(res): |
294 entry = record.getCurrentEntry(res.path, res.taxonomy_info) | 305 entry = record.getCurrentEntry(res.path, res.taxonomy_info) |
295 entry.bake_info = res.bake_info | 306 entry.subs = res.sub_entries |
296 if res.errors: | 307 if res.errors: |
297 entry.errors += res.errors | 308 entry.errors += res.errors |
309 self._logErrors(res.path, res.errors) | |
298 if entry.has_any_error: | 310 if entry.has_any_error: |
299 record.current.success = False | 311 record.current.success = False |
312 if entry.was_any_sub_baked: | |
313 record.current.baked_count[realm] += 1 | |
314 record.dirty_source_names.add(entry.source_name) | |
300 | 315 |
301 self._waitOnWorkerPool( | 316 self._waitOnWorkerPool( |
302 pool, | 317 pool, |
303 expected_result_count=expected_result_count, | 318 expected_result_count=expected_result_count, |
304 result_handler=_handler) | 319 result_handler=_handler) |
305 | 320 |
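
The bake handler now assigns `res.sub_entries` to `entry.subs` and counts a
page as baked only if `entry.was_any_sub_baked`. A hypothetical minimal
shape for those record classes, just to show the aggregation (the real
`BakeRecordEntry` and its sub-entries carry far more state):

```python
class SubPageEntry:
    def __init__(self, was_baked=False, errors=None):
        self.was_baked = was_baked
        self.errors = errors or []


class BakeRecordEntry:
    def __init__(self):
        self.subs = []       # filled from res.sub_entries
        self.errors = []

    @property
    def was_any_sub_baked(self):
        return any(s.was_baked for s in self.subs)

    @property
    def has_any_error(self):
        return bool(self.errors) or any(s.errors for s in self.subs)


entry = BakeRecordEntry()
entry.subs = [SubPageEntry(), SubPageEntry(was_baked=True)]
print(entry.was_any_sub_baked)  # True
```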
306 def _bakeTaxonomies(self, record, pool): | 321 def _bakeTaxonomies(self, record, pool): |
322 logger.debug("Baking taxonomy pages...") | |
307 with format_timed_scope(logger, 'built taxonomy buckets', | 323 with format_timed_scope(logger, 'built taxonomy buckets', |
308 level=logging.DEBUG, colored=False): | 324 level=logging.DEBUG, colored=False): |
309 buckets = self._buildTaxonomyBuckets(record) | 325 buckets = self._buildTaxonomyBuckets(record) |
310 | 326 |
311 start_time = time.perf_counter() | 327 start_time = time.perf_counter() |
418 if self._queueBakeJob(record, pool, fac, tax_info): | 434 if self._queueBakeJob(record, pool, fac, tax_info): |
419 expected_result_count += 1 | 435 expected_result_count += 1 |
420 | 436 |
421 def _handler(res): | 437 def _handler(res): |
422 entry = record.getCurrentEntry(res.path, res.taxonomy_info) | 438 entry = record.getCurrentEntry(res.path, res.taxonomy_info) |
423 entry.bake_info = res.bake_info | 439 entry.subs = res.sub_entries |
424 if res.errors: | 440 if res.errors: |
425 entry.errors += res.errors | 441 entry.errors += res.errors |
426 if entry.has_any_error: | 442 if entry.has_any_error: |
427 record.current.success = False | 443 record.current.success = False |
428 | 444 |
498 return False | 514 return False |
499 | 515 |
500 job = BakeWorkerJob( | 516 job = BakeWorkerJob( |
501 JOB_BAKE, | 517 JOB_BAKE, |
502 BakeJobPayload(fac, route_metadata, prev_entry, | 518 BakeJobPayload(fac, route_metadata, prev_entry, |
503 cur_entry.first_render_info, | |
504 record.dirty_source_names, | 519 record.dirty_source_names, |
505 tax_info)) | 520 tax_info)) |
506 pool.queue.put_nowait(job) | 521 pool.queue.put_nowait(job) |
507 return True | 522 return True |
508 | 523 |
509 def _handleDeletetions(self, record): | 524 def _handleDeletetions(self, record): |
525 logger.debug("Handling deletions...") | |
510 for path, reason in record.getDeletions(): | 526 for path, reason in record.getDeletions(): |
511 logger.debug("Removing '%s': %s" % (path, reason)) | 527 logger.debug("Removing '%s': %s" % (path, reason)) |
512 try: | 528 try: |
513 os.remove(path) | 529 os.remove(path) |
514 logger.info('[delete] %s' % path) | 530 logger.info('[delete] %s' % path) |
515 except OSError: | 531 except OSError: |
516 # Not a big deal if that file had already been removed | 532 # Not a big deal if that file had already been removed |
517 # by the user. | 533 # by the user. |
518 pass | 534 pass |
519 | 535 |
536 def _logErrors(self, path, errors): | |
537 rel_path = os.path.relpath(path, self.app.root_dir) | |
538 logger.error("Errors found in %s:" % rel_path) | |
539 for e in errors: | |
540 logger.error(" " + e) | |
541 | |
520 def _createWorkerPool(self): | 542 def _createWorkerPool(self): |
521 from piecrust.baking.worker import BakeWorkerContext, worker_func | 543 from piecrust.baking.worker import BakeWorkerContext, worker_func |
522 | 544 |
523 pool = _WorkerPool() | 545 pool = _WorkerPool() |
524 for i in range(self.num_workers): | 546 for i in range(self.num_workers): |
525 ctx = BakeWorkerContext( | 547 ctx = BakeWorkerContext( |
526 self.app.root_dir, self.out_dir, | 548 self.app.root_dir, self.out_dir, |
527 pool.queue, pool.results, pool.abort_event, | 549 pool.queue, pool.results, pool.abort_event, |
528 force=self.force, debug=self.app.debug) | 550 force=self.force, debug=self.app.debug) |
529 w = multiprocessing.Process( | 551 w = multiprocessing.Process( |
552 name='Worker_%d' % i, | |
530 target=worker_func, args=(i, ctx)) | 553 target=worker_func, args=(i, ctx)) |
531 w.start() | 554 w.start() |
532 pool.workers.append(w) | 555 pool.workers.append(w) |
533 return pool | 556 return pool |
534 | 557 |
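
Worker processes are now created with explicit names, which makes logs and
process listings easier to read. A stripped-down sketch of the same pool
pattern, with stand-ins for `BakeWorkerContext` and `worker_func` (real
workers pull `BakeWorkerJob` objects; this one just echoes items):

```python
import multiprocessing


def worker_func(wid, ctx):
    # Stand-in worker: pull items until the None sentinel arrives.
    for item in iter(ctx['queue'].get, None):
        ctx['results'].put((wid, item))


def create_worker_pool(num_workers):
    ctx = {'queue': multiprocessing.Queue(),
           'results': multiprocessing.Queue()}
    workers = []
    for i in range(num_workers):
        w = multiprocessing.Process(
            name='Worker_%d' % i,          # named, as in the diff above
            target=worker_func, args=(i, ctx))
        w.start()
        workers.append(w)
    return ctx, workers


if __name__ == '__main__':               # required for 'spawn' start methods
    ctx, workers = create_worker_pool(2)
    ctx['queue'].put('job')
    print(ctx['results'].get())
    for w in workers:
        ctx['queue'].put(None)           # one sentinel per worker
    for w in workers:
        w.join()
```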
555 "for 10 seconds. A worker might be stuck?" % | 578 "for 10 seconds. A worker might be stuck?" % |
556 (got_count, expected_result_count)) | 579 (got_count, expected_result_count)) |
557 abort_with_exception = Exception("Worker time-out.") | 580 abort_with_exception = Exception("Worker time-out.") |
558 break | 581 break |
559 | 582 |
583 if isinstance(res, dict) and res.get('type') == 'error': | |
584 abort_with_exception = Exception( | |
585 'Worker critical error:\n' + | |
586 '\n'.join(res['messages'])) | |
587 break | |
588 | |
560 got_count += 1 | 589 got_count += 1 |
561 result_handler(res) | 590 result_handler(res) |
562 except KeyboardInterrupt as kiex: | 591 except KeyboardInterrupt as kiex: |
563 logger.warning("Bake aborted by user... " | 592 logger.warning("Bake aborted by user... " |
564 "waiting for workers to stop.") | 593 "waiting for workers to stop.") |