piecrust2: comparison of piecrust/baking/baker.py @ 447:aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
The `workerpool` package now defines a generic-ish worker pool. It's similar
to the pool in Python's `multiprocessing` framework, but with a simpler
use-case (only one way to queue jobs) and support for workers to send a final
"report" to the master process, which we use to get timing information here.
The rest of the changes basically remove a whole bunch of duplicated code
that's not needed anymore.
author | Ludovic Chabant <ludovic@chabant.com>
date | Sun, 05 Jul 2015 00:09:41 -0700
parents | 21e26ed867b6
children | 838f3964f400
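For orientation, the pattern the new code converges on (visible in the right-hand column of the comparison below) is: build a list of `BakeWorkerJob` objects, queue them all at once with a per-result handler, wait for the batch, and close the pool at the very end to collect one report per worker. The sketch below is assembled from the call sites in this diff only; the exact `piecrust.workerpool` signatures beyond those call sites, the import location of `BakeWorkerJob`/`JOB_LOAD`/`LoadJobPayload`, and the `load_pages` helper itself are assumptions for illustration, not code from this changeset.

```python
from piecrust.workerpool import WorkerPool
from piecrust.baking.worker import (
        BakeWorkerContext, BakeWorker, BakeWorkerJob, JOB_LOAD,
        LoadJobPayload)


def load_pages(app, out_dir, factories, *, force=False):
    # One pool for the whole bake; each worker process is initialized
    # with the same context (paths, force/debug flags).
    ctx = BakeWorkerContext(
            app.root_dir, app.cache.base_dir, out_dir,
            force=force, debug=app.debug)
    pool = WorkerPool(worker_class=BakeWorker, initargs=(ctx,))

    def _handler(res):
        # Runs in the master process for every result a worker sends back;
        # the baker uses this to fill in the bake record and log errors.
        if res.errors:
            print("Errors in %s: %s" % (res.path, res.errors))

    # Queue the whole batch at once and wait for it to complete.
    jobs = [BakeWorkerJob(JOB_LOAD, LoadJobPayload(fac))
            for fac in factories]
    ar = pool.queueJobs(jobs, handler=_handler)
    ar.wait()

    # Closing the pool makes each worker send its final report -- the
    # timing data that ends up in record.current.timers.
    return pool.close()
```

In the baker itself the same pool instance is reused across the load, render, and bake phases, and `close()` is only called once at the very end, as the first hunk below shows.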
446:4cdf6c2157a0 (old) | 447:aefe70229fdd (new) |
---|---|
101 self._bakeRealm(record, pool, realm, srclist) | 101 self._bakeRealm(record, pool, realm, srclist) |
102 | 102 |
103 # Bake taxonomies. | 103 # Bake taxonomies. |
104 self._bakeTaxonomies(record, pool) | 104 self._bakeTaxonomies(record, pool) |
105 | 105 |
106 # All done with the workers. | 106 # All done with the workers. Close the pool and get timing reports. |
107 self._terminateWorkerPool(pool) | 107 reports = pool.close() |
108 | |
109 # Get the timing information from the workers. | |
110 record.current.timers = {} | 108 record.current.timers = {} |
111 for i in range(len(pool.workers)): | 109 for i in range(len(reports)): |
112 try: | 110 timers = reports[i] |
113 timers = pool.results.get(True, 0.1) | 111 if timers is None: |
114 except queue.Empty: | 112 continue |
115 logger.error("Didn't get timing information from all workers.") | |
116 break | |
117 | 113 |
118 worker_name = 'BakeWorker_%d' % i | 114 worker_name = 'BakeWorker_%d' % i |
119 record.current.timers[worker_name] = {} | 115 record.current.timers[worker_name] = {} |
120 for name, val in timers['data'].items(): | 116 for name, val in timers['data'].items(): |
121 main_val = record.current.timers.setdefault(name, 0) | 117 main_val = record.current.timers.setdefault(name, 0) |
212 start_time, | 208 start_time, |
213 "baked %d %s pages." % | 209 "baked %d %s pages." % |
214 (page_count, REALM_NAMES[realm].lower()))) | 210 (page_count, REALM_NAMES[realm].lower()))) |
215 | 211 |
216 def _loadRealmPages(self, record, pool, factories): | 212 def _loadRealmPages(self, record, pool, factories): |
213 def _handler(res): | |
214 # Create the record entry for this page. | |
215 record_entry = BakeRecordEntry(res.source_name, res.path) | |
216 record_entry.config = res.config | |
217 if res.errors: | |
218 record_entry.errors += res.errors | |
219 record.current.success = False | |
220 self._logErrors(res.path, res.errors) | |
221 record.addEntry(record_entry) | |
222 | |
217 logger.debug("Loading %d realm pages..." % len(factories)) | 223 logger.debug("Loading %d realm pages..." % len(factories)) |
218 with format_timed_scope(logger, | 224 with format_timed_scope(logger, |
219 "loaded %d pages" % len(factories), | 225 "loaded %d pages" % len(factories), |
220 level=logging.DEBUG, colored=False, | 226 level=logging.DEBUG, colored=False, |
221 timer_env=self.app.env, | 227 timer_env=self.app.env, |
222 timer_category='LoadJob'): | 228 timer_category='LoadJob'): |
223 for fac in factories: | 229 jobs = [ |
224 job = BakeWorkerJob( | 230 BakeWorkerJob(JOB_LOAD, LoadJobPayload(fac)) |
225 JOB_LOAD, | 231 for fac in factories] |
226 LoadJobPayload(fac)) | 232 ar = pool.queueJobs(jobs, handler=_handler) |
227 pool.queue.put_nowait(job) | 233 ar.wait() |
228 | |
229 def _handler(res): | |
230 # Create the record entry for this page. | |
231 record_entry = BakeRecordEntry(res.source_name, res.path) | |
232 record_entry.config = res.config | |
233 if res.errors: | |
234 record_entry.errors += res.errors | |
235 record.current.success = False | |
236 self._logErrors(res.path, res.errors) | |
237 record.addEntry(record_entry) | |
238 | |
239 self._waitOnWorkerPool( | |
240 pool, | |
241 expected_result_count=len(factories), | |
242 result_handler=_handler) | |
243 | 234 |
244 def _renderRealmPages(self, record, pool, factories): | 235 def _renderRealmPages(self, record, pool, factories): |
236 def _handler(res): | |
237 entry = record.getCurrentEntry(res.path) | |
238 if res.errors: | |
239 entry.errors += res.errors | |
240 record.current.success = False | |
241 self._logErrors(res.path, res.errors) | |
242 | |
245 logger.debug("Rendering %d realm pages..." % len(factories)) | 243 logger.debug("Rendering %d realm pages..." % len(factories)) |
246 with format_timed_scope(logger, | 244 with format_timed_scope(logger, |
247 "prepared %d pages" % len(factories), | 245 "prepared %d pages" % len(factories), |
248 level=logging.DEBUG, colored=False, | 246 level=logging.DEBUG, colored=False, |
249 timer_env=self.app.env, | 247 timer_env=self.app.env, |
250 timer_category='RenderFirstSubJob'): | 248 timer_category='RenderFirstSubJob'): |
251 expected_result_count = 0 | 249 jobs = [] |
252 for fac in factories: | 250 for fac in factories: |
253 record_entry = record.getCurrentEntry(fac.path) | 251 record_entry = record.getCurrentEntry(fac.path) |
254 if record_entry.errors: | 252 if record_entry.errors: |
255 logger.debug("Ignoring %s because it had previous " | 253 logger.debug("Ignoring %s because it had previous " |
256 "errors." % fac.ref_spec) | 254 "errors." % fac.ref_spec) |
276 | 274 |
277 # All good, queue the job. | 275 # All good, queue the job. |
278 job = BakeWorkerJob( | 276 job = BakeWorkerJob( |
279 JOB_RENDER_FIRST, | 277 JOB_RENDER_FIRST, |
280 RenderFirstSubJobPayload(fac)) | 278 RenderFirstSubJobPayload(fac)) |
281 pool.queue.put_nowait(job) | 279 jobs.append(job) |
282 expected_result_count += 1 | 280 |
283 | 281 ar = pool.queueJobs(jobs, handler=_handler) |
284 def _handler(res): | 282 ar.wait() |
285 entry = record.getCurrentEntry(res.path) | |
286 if res.errors: | |
287 entry.errors += res.errors | |
288 record.current.success = False | |
289 self._logErrors(res.path, res.errors) | |
290 | |
291 self._waitOnWorkerPool( | |
292 pool, | |
293 expected_result_count=expected_result_count, | |
294 result_handler=_handler) | |
295 | 283 |
296 def _bakeRealmPages(self, record, pool, realm, factories): | 284 def _bakeRealmPages(self, record, pool, realm, factories): |
285 def _handler(res): | |
286 entry = record.getCurrentEntry(res.path, res.taxonomy_info) | |
287 entry.subs = res.sub_entries | |
288 if res.errors: | |
289 entry.errors += res.errors | |
290 self._logErrors(res.path, res.errors) | |
291 if entry.has_any_error: | |
292 record.current.success = False | |
293 if entry.was_any_sub_baked: | |
294 record.current.baked_count[realm] += 1 | |
295 record.dirty_source_names.add(entry.source_name) | |
296 | |
297 logger.debug("Baking %d realm pages..." % len(factories)) | 297 logger.debug("Baking %d realm pages..." % len(factories)) |
298 with format_timed_scope(logger, | 298 with format_timed_scope(logger, |
299 "baked %d pages" % len(factories), | 299 "baked %d pages" % len(factories), |
300 level=logging.DEBUG, colored=False, | 300 level=logging.DEBUG, colored=False, |
301 timer_env=self.app.env, | 301 timer_env=self.app.env, |
302 timer_category='BakeJob'): | 302 timer_category='BakeJob'): |
303 expected_result_count = 0 | 303 jobs = [] |
304 for fac in factories: | 304 for fac in factories: |
305 if self._queueBakeJob(record, pool, fac): | 305 job = self._makeBakeJob(record, fac) |
306 expected_result_count += 1 | 306 if job is not None: |
307 | 307 jobs.append(job) |
308 def _handler(res): | 308 |
309 entry = record.getCurrentEntry(res.path, res.taxonomy_info) | 309 ar = pool.queueJobs(jobs, handler=_handler) |
310 entry.subs = res.sub_entries | 310 ar.wait() |
311 if res.errors: | |
312 entry.errors += res.errors | |
313 self._logErrors(res.path, res.errors) | |
314 if entry.has_any_error: | |
315 record.current.success = False | |
316 if entry.was_any_sub_baked: | |
317 record.current.baked_count[realm] += 1 | |
318 record.dirty_source_names.add(entry.source_name) | |
319 | |
320 self._waitOnWorkerPool( | |
321 pool, | |
322 expected_result_count=expected_result_count, | |
323 result_handler=_handler) | |
324 | 311 |
325 def _bakeTaxonomies(self, record, pool): | 312 def _bakeTaxonomies(self, record, pool): |
326 logger.debug("Baking taxonomy pages...") | 313 logger.debug("Baking taxonomy pages...") |
327 with format_timed_scope(logger, 'built taxonomy buckets', | 314 with format_timed_scope(logger, 'built taxonomy buckets', |
328 level=logging.DEBUG, colored=False): | 315 level=logging.DEBUG, colored=False): |
398 tt_info.dirty_terms.add(terms) | 385 tt_info.dirty_terms.add(terms) |
399 | 386 |
400 return buckets | 387 return buckets |
401 | 388 |
402 def _bakeTaxonomyBuckets(self, record, pool, buckets): | 389 def _bakeTaxonomyBuckets(self, record, pool, buckets): |
390 def _handler(res): | |
391 entry = record.getCurrentEntry(res.path, res.taxonomy_info) | |
392 entry.subs = res.sub_entries | |
393 if res.errors: | |
394 entry.errors += res.errors | |
395 if entry.has_any_error: | |
396 record.current.success = False | |
397 | |
403 # Start baking those terms. | 398 # Start baking those terms. |
404 expected_result_count = 0 | 399 jobs = [] |
405 for source_name, source_taxonomies in buckets.items(): | 400 for source_name, source_taxonomies in buckets.items(): |
406 for tax_name, tt_info in source_taxonomies.items(): | 401 for tax_name, tt_info in source_taxonomies.items(): |
407 terms = tt_info.dirty_terms | 402 terms = tt_info.dirty_terms |
408 if len(terms) == 0: | 403 if len(terms) == 0: |
409 continue | 404 continue |
433 | 428 |
434 cur_entry = BakeRecordEntry( | 429 cur_entry = BakeRecordEntry( |
435 fac.source.name, fac.path, tax_info) | 430 fac.source.name, fac.path, tax_info) |
436 record.addEntry(cur_entry) | 431 record.addEntry(cur_entry) |
437 | 432 |
438 if self._queueBakeJob(record, pool, fac, tax_info): | 433 job = self._makeBakeJob(record, fac, tax_info) |
439 expected_result_count += 1 | 434 if job is not None: |
440 | 435 jobs.append(job) |
441 def _handler(res): | 436 |
442 entry = record.getCurrentEntry(res.path, res.taxonomy_info) | 437 ar = pool.queueJobs(jobs, handler=_handler) |
443 entry.subs = res.sub_entries | 438 ar.wait() |
444 if res.errors: | |
445 entry.errors += res.errors | |
446 if entry.has_any_error: | |
447 record.current.success = False | |
448 | |
449 self._waitOnWorkerPool( | |
450 pool, | |
451 expected_result_count=expected_result_count, | |
452 result_handler=_handler) | |
453 | 439 |
454 # Now we create bake entries for all the terms that were *not* dirty. | 440 # Now we create bake entries for all the terms that were *not* dirty. |
455 # This is because otherwise, on the next incremental bake, we wouldn't | 441 # This is because otherwise, on the next incremental bake, we wouldn't |
456 # find any entry for those things, and figure that we need to delete | 442 # find any entry for those things, and figure that we need to delete |
457 # their outputs. | 443 # their outputs. |
468 record.collapseEntry(prev_entry) | 454 record.collapseEntry(prev_entry) |
469 else: | 455 else: |
470 logger.debug("Taxonomy term '%s:%s' isn't used anymore." % | 456 logger.debug("Taxonomy term '%s:%s' isn't used anymore." % |
471 (ti.taxonomy_name, ti.term)) | 457 (ti.taxonomy_name, ti.term)) |
472 | 458 |
473 return expected_result_count | 459 return len(jobs) |
474 | 460 |
475 def _queueBakeJob(self, record, pool, fac, tax_info=None): | 461 def _makeBakeJob(self, record, fac, tax_info=None): |
476 # Get the previous (if any) and current entry for this page. | 462 # Get the previous (if any) and current entry for this page. |
477 pair = record.getPreviousAndCurrentEntries(fac.path, tax_info) | 463 pair = record.getPreviousAndCurrentEntries(fac.path, tax_info) |
478 assert pair is not None | 464 assert pair is not None |
479 prev_entry, cur_entry = pair | 465 prev_entry, cur_entry = pair |
480 assert cur_entry is not None | 466 assert cur_entry is not None |
481 | 467 |
482 # Ignore if there were errors in the previous passes. | 468 # Ignore if there were errors in the previous passes. |
483 if cur_entry.errors: | 469 if cur_entry.errors: |
484 logger.debug("Ignoring %s because it had previous " | 470 logger.debug("Ignoring %s because it had previous " |
485 "errors." % fac.ref_spec) | 471 "errors." % fac.ref_spec) |
486 return False | 472 return None |
487 | 473 |
488 # Build the route metadata and find the appropriate route. | 474 # Build the route metadata and find the appropriate route. |
489 page = fac.buildPage() | 475 page = fac.buildPage() |
490 route_metadata = create_route_metadata(page) | 476 route_metadata = create_route_metadata(page) |
491 if tax_info is not None: | 477 if tax_info is not None: |
513 "Page '%s' maps to URL '%s' but is overriden " | 499 "Page '%s' maps to URL '%s' but is overriden " |
514 "by page '%s'." % | 500 "by page '%s'." % |
515 (fac.ref_spec, uri, override_entry.path)) | 501 (fac.ref_spec, uri, override_entry.path)) |
516 logger.error(cur_entry.errors[-1]) | 502 logger.error(cur_entry.errors[-1]) |
517 cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN | 503 cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN |
518 return False | 504 return None |
519 | 505 |
520 job = BakeWorkerJob( | 506 job = BakeWorkerJob( |
521 JOB_BAKE, | 507 JOB_BAKE, |
522 BakeJobPayload(fac, route_metadata, prev_entry, | 508 BakeJobPayload(fac, route_metadata, prev_entry, |
523 record.dirty_source_names, | 509 record.dirty_source_names, |
524 tax_info)) | 510 tax_info)) |
525 pool.queue.put_nowait(job) | 511 return job |
526 return True | |
527 | 512 |
528 def _handleDeletetions(self, record): | 513 def _handleDeletetions(self, record): |
529 logger.debug("Handling deletions...") | 514 logger.debug("Handling deletions...") |
530 for path, reason in record.getDeletions(): | 515 for path, reason in record.getDeletions(): |
531 logger.debug("Removing '%s': %s" % (path, reason)) | 516 logger.debug("Removing '%s': %s" % (path, reason)) |
542 logger.error("Errors found in %s:" % rel_path) | 527 logger.error("Errors found in %s:" % rel_path) |
543 for e in errors: | 528 for e in errors: |
544 logger.error(" " + e) | 529 logger.error(" " + e) |
545 | 530 |
546 def _createWorkerPool(self): | 531 def _createWorkerPool(self): |
547 import sys | 532 from piecrust.workerpool import WorkerPool |
548 from piecrust.baking.worker import BakeWorkerContext, worker_func | 533 from piecrust.baking.worker import BakeWorkerContext, BakeWorker |
549 | 534 |
550 main_module = sys.modules['__main__'] | 535 ctx = BakeWorkerContext( |
551 is_profiling = os.path.basename(main_module.__file__) in [ | 536 self.app.root_dir, self.app.cache.base_dir, self.out_dir, |
552 'profile.py', 'cProfile.py'] | 537 force=self.force, debug=self.app.debug) |
553 | 538 pool = WorkerPool( |
554 pool = _WorkerPool() | 539 worker_class=BakeWorker, |
555 for i in range(self.num_workers): | 540 initargs=(ctx,)) |
556 ctx = BakeWorkerContext( | |
557 self.app.root_dir, self.app.cache.base_dir, self.out_dir, | |
558 pool.queue, pool.results, pool.abort_event, | |
559 force=self.force, debug=self.app.debug, | |
560 is_profiling=is_profiling) | |
561 w = multiprocessing.Process( | |
562 name='BakeWorker_%d' % i, | |
563 target=worker_func, args=(i, ctx)) | |
564 w.start() | |
565 pool.workers.append(w) | |
566 return pool | 541 return pool |
567 | |
568 def _terminateWorkerPool(self, pool): | |
569 pool.abort_event.set() | |
570 for w in pool.workers: | |
571 w.join() | |
572 | |
573 def _waitOnWorkerPool(self, pool, | |
574 expected_result_count=-1, result_handler=None): | |
575 assert result_handler is None or expected_result_count >= 0 | |
576 abort_with_exception = None | |
577 try: | |
578 if result_handler is None: | |
579 pool.queue.join() | |
580 else: | |
581 got_count = 0 | |
582 while got_count < expected_result_count: | |
583 try: | |
584 res = pool.results.get(True, 10) | |
585 except queue.Empty: | |
586 logger.error( | |
587 "Got %d results, expected %d, and timed-out " | |
588 "for 10 seconds. A worker might be stuck?" % | |
589 (got_count, expected_result_count)) | |
590 abort_with_exception = Exception("Worker time-out.") | |
591 break | |
592 | |
593 if isinstance(res, dict) and res.get('type') == 'error': | |
594 abort_with_exception = Exception( | |
595 'Worker critical error:\n' + | |
596 '\n'.join(res['messages'])) | |
597 break | |
598 | |
599 got_count += 1 | |
600 result_handler(res) | |
601 except KeyboardInterrupt as kiex: | |
602 logger.warning("Bake aborted by user... " | |
603 "waiting for workers to stop.") | |
604 abort_with_exception = kiex | |
605 | |
606 if abort_with_exception: | |
607 pool.abort_event.set() | |
608 for w in pool.workers: | |
609 w.join(2) | |
610 raise abort_with_exception | |
611 | |
612 | |
613 class _WorkerPool(object): | |
614 def __init__(self): | |
615 self.queue = multiprocessing.JoinableQueue() | |
616 self.results = multiprocessing.Queue() | |
617 self.abort_event = multiprocessing.Event() | |
618 self.workers = [] | |
619 | 542 |
620 | 543 |
621 class _TaxonomyTermsInfo(object): | 544 class _TaxonomyTermsInfo(object): |
622 def __init__(self): | 545 def __init__(self): |
623 self.dirty_terms = set() | 546 self.dirty_terms = set() |
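The `workerpool` module itself is not shown on this page, but the "final report" mechanism the commit message describes can be illustrated with the same standard-library primitives the deleted `_WorkerPool` class used. This is a self-contained sketch of the idea only, not PieCrust's actual implementation; `_worker_main` and `run_pool` are made-up names.

```python
import multiprocessing


def _worker_main(wid, job_queue, result_queue):
    # Process jobs until the sentinel arrives, then send a final "report"
    # (simple counters here, standing in for the baker's timing data).
    report = {'jobs_done': 0}
    while True:
        job = job_queue.get()
        if job is None:
            break
        result_queue.put(('result', wid, job * job))
        report['jobs_done'] += 1
    result_queue.put(('report', wid, report))


def run_pool(jobs, num_workers=2):
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(
                   target=_worker_main, args=(i, job_queue, result_queue))
               for i in range(num_workers)]
    for w in workers:
        w.start()

    for job in jobs:
        job_queue.put(job)
    for _ in workers:
        job_queue.put(None)  # one sentinel per worker: "no more jobs"

    # Every job produces one result and every worker one report.
    results, reports = [], [None] * num_workers
    for _ in range(len(jobs) + num_workers):
        kind, wid, payload = result_queue.get()
        if kind == 'result':
            results.append(payload)
        else:
            reports[wid] = payload

    for w in workers:
        w.join()
    return results, reports


if __name__ == '__main__':
    results, reports = run_pool(list(range(10)))
    print(sorted(results))
    print(reports)  # e.g. [{'jobs_done': 6}, {'jobs_done': 4}]
```

The handler callback and the `wait()`-style result object seen in the diff would layer on top of something like this; the report-on-close flow is the part the old baker code had to hand-roll for each phase.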