comparison piecrust/baking/baker.py @ 447:aefe70229fdd

bake: Commonize worker pool code between HTML and asset baking. The `workerpool` package now defines a generic-ish worker pool. It's similar to the pool provided by Python's multiprocessing framework, but with a simpler use case (only one way to queue jobs) and with support for workers sending a final "report" back to the master process, which we use here to collect timing information. The rest of the changes mostly remove duplicated code that's no longer needed.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 05 Jul 2015 00:09:41 -0700
parents 21e26ed867b6
children 838f3964f400
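
The changeset below only touches the baker side of this refactor. From the call sites in the diff one can infer the rough shape of the new pool: `pool.queueJobs(jobs, handler=...)` returns a waitable result, and `pool.close()` shuts the workers down and hands back one final report per worker. The outline below just spells out that inference as a sketch; the class names, signatures, and docstrings are assumptions, not the actual `piecrust.workerpool` code.

    # Hypothetical outline of the pool interface the baker relies on below.
    # Inferred from call sites only; the real piecrust.workerpool module may differ.

    class AsyncResult:
        def wait(self):
            """Block until every job queued with this result has been handled."""
            raise NotImplementedError


    class WorkerPool:
        def __init__(self, worker_class, initargs=()):
            # Each worker process would be built as worker_class(*initargs).
            self.worker_class = worker_class
            self.initargs = initargs

        def queueJobs(self, jobs, handler=None):
            """Send jobs to the workers. `handler(result)` is invoked in the
            master process for each result. Returns an AsyncResult."""
            raise NotImplementedError

        def close(self):
            """Shut the workers down and return their final reports (used for
            timing data in this changeset), one entry per worker."""
            raise NotImplementedError

The full comparison against the previous revision follows.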
--- piecrust/baking/baker.py	446:4cdf6c2157a0
+++ piecrust/baking/baker.py	447:aefe70229fdd
@@ -101,21 +101,17 @@
                 self._bakeRealm(record, pool, realm, srclist)
 
         # Bake taxonomies.
         self._bakeTaxonomies(record, pool)
 
-        # All done with the workers.
-        self._terminateWorkerPool(pool)
-
-        # Get the timing information from the workers.
+        # All done with the workers. Close the pool and get timing reports.
+        reports = pool.close()
         record.current.timers = {}
-        for i in range(len(pool.workers)):
-            try:
-                timers = pool.results.get(True, 0.1)
-            except queue.Empty:
-                logger.error("Didn't get timing information from all workers.")
-                break
+        for i in range(len(reports)):
+            timers = reports[i]
+            if timers is None:
+                continue
 
             worker_name = 'BakeWorker_%d' % i
             record.current.timers[worker_name] = {}
             for name, val in timers['data'].items():
                 main_val = record.current.timers.setdefault(name, 0)
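For the loop above to work, each worker's final report must be either None or a mapping with a 'data' dictionary of timer names to accumulated seconds; that shape is implied by `timers['data'].items()` and the None check, it is not spelled out in this file. A purely illustrative example of folding such reports into a timers dictionary (the report values are made up):

    # Illustrative only: reports shaped the way the loop above expects them.
    reports = [
        {'data': {'LoadJob': 0.8, 'BakeJob': 4.2}},   # worker 0
        None,                                          # worker 1 sent nothing
    ]

    timers = {}
    for i, report in enumerate(reports):
        if report is None:
            continue
        worker_name = 'BakeWorker_%d' % i
        timers[worker_name] = dict(report['data'])
        for name, val in report['data'].items():
            # Keep a per-category total across workers as well.
            timers[name] = timers.setdefault(name, 0) + val

    print(timers)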
@@ -212,45 +208,47 @@
                 start_time,
                 "baked %d %s pages." %
                 (page_count, REALM_NAMES[realm].lower())))
 
     def _loadRealmPages(self, record, pool, factories):
+        def _handler(res):
+            # Create the record entry for this page.
+            record_entry = BakeRecordEntry(res.source_name, res.path)
+            record_entry.config = res.config
+            if res.errors:
+                record_entry.errors += res.errors
+                record.current.success = False
+                self._logErrors(res.path, res.errors)
+            record.addEntry(record_entry)
+
         logger.debug("Loading %d realm pages..." % len(factories))
         with format_timed_scope(logger,
                                 "loaded %d pages" % len(factories),
                                 level=logging.DEBUG, colored=False,
                                 timer_env=self.app.env,
                                 timer_category='LoadJob'):
-            for fac in factories:
-                job = BakeWorkerJob(
-                        JOB_LOAD,
-                        LoadJobPayload(fac))
-                pool.queue.put_nowait(job)
-
-        def _handler(res):
-            # Create the record entry for this page.
-            record_entry = BakeRecordEntry(res.source_name, res.path)
-            record_entry.config = res.config
-            if res.errors:
-                record_entry.errors += res.errors
-                record.current.success = False
-                self._logErrors(res.path, res.errors)
-            record.addEntry(record_entry)
-
-        self._waitOnWorkerPool(
-                pool,
-                expected_result_count=len(factories),
-                result_handler=_handler)
+            jobs = [
+                    BakeWorkerJob(JOB_LOAD, LoadJobPayload(fac))
+                    for fac in factories]
+            ar = pool.queueJobs(jobs, handler=_handler)
+            ar.wait()
 
     def _renderRealmPages(self, record, pool, factories):
+        def _handler(res):
+            entry = record.getCurrentEntry(res.path)
+            if res.errors:
+                entry.errors += res.errors
+                record.current.success = False
+                self._logErrors(res.path, res.errors)
+
         logger.debug("Rendering %d realm pages..." % len(factories))
         with format_timed_scope(logger,
                                 "prepared %d pages" % len(factories),
                                 level=logging.DEBUG, colored=False,
                                 timer_env=self.app.env,
                                 timer_category='RenderFirstSubJob'):
-            expected_result_count = 0
+            jobs = []
             for fac in factories:
                 record_entry = record.getCurrentEntry(fac.path)
                 if record_entry.errors:
                     logger.debug("Ignoring %s because it had previous "
                                  "errors." % fac.ref_spec)
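Every baking phase below now follows the same shape as `_loadRealmPages` above: build a list of `BakeWorkerJob` objects, submit them with `pool.queueJobs()` together with a phase-specific `_handler`, and wait. Because the handler runs in the master process, it can safely fold results into the bake record. A condensed, hypothetical helper showing that shared pattern (the name `run_phase` and its parameters are made up for illustration):

    # Hypothetical sketch of the pattern shared by the load/render/bake phases.
    def run_phase(pool, record, factories, make_job, apply_result):
        def _handler(res):
            # Runs in the master process, so it may mutate the record directly.
            apply_result(record, res)

        # Build all the jobs first, skipping factories that produced no job.
        jobs = [j for j in (make_job(fac) for fac in factories)
                if j is not None]
        ar = pool.queueJobs(jobs, handler=_handler)
        ar.wait()
        return len(jobs)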
@@ -276,53 +274,42 @@
 
                 # All good, queue the job.
                 job = BakeWorkerJob(
                         JOB_RENDER_FIRST,
                         RenderFirstSubJobPayload(fac))
-                pool.queue.put_nowait(job)
-                expected_result_count += 1
-
-        def _handler(res):
-            entry = record.getCurrentEntry(res.path)
-            if res.errors:
-                entry.errors += res.errors
-                record.current.success = False
-                self._logErrors(res.path, res.errors)
-
-        self._waitOnWorkerPool(
-                pool,
-                expected_result_count=expected_result_count,
-                result_handler=_handler)
+                jobs.append(job)
+
+            ar = pool.queueJobs(jobs, handler=_handler)
+            ar.wait()
 
     def _bakeRealmPages(self, record, pool, realm, factories):
+        def _handler(res):
+            entry = record.getCurrentEntry(res.path, res.taxonomy_info)
+            entry.subs = res.sub_entries
+            if res.errors:
+                entry.errors += res.errors
+                self._logErrors(res.path, res.errors)
+            if entry.has_any_error:
+                record.current.success = False
+            if entry.was_any_sub_baked:
+                record.current.baked_count[realm] += 1
+                record.dirty_source_names.add(entry.source_name)
+
         logger.debug("Baking %d realm pages..." % len(factories))
         with format_timed_scope(logger,
                                 "baked %d pages" % len(factories),
                                 level=logging.DEBUG, colored=False,
                                 timer_env=self.app.env,
                                 timer_category='BakeJob'):
-            expected_result_count = 0
+            jobs = []
             for fac in factories:
-                if self._queueBakeJob(record, pool, fac):
-                    expected_result_count += 1
-
-        def _handler(res):
-            entry = record.getCurrentEntry(res.path, res.taxonomy_info)
-            entry.subs = res.sub_entries
-            if res.errors:
-                entry.errors += res.errors
-                self._logErrors(res.path, res.errors)
-            if entry.has_any_error:
-                record.current.success = False
-            if entry.was_any_sub_baked:
-                record.current.baked_count[realm] += 1
-                record.dirty_source_names.add(entry.source_name)
-
-        self._waitOnWorkerPool(
-                pool,
-                expected_result_count=expected_result_count,
-                result_handler=_handler)
+                job = self._makeBakeJob(record, fac)
+                if job is not None:
+                    jobs.append(job)
+
+            ar = pool.queueJobs(jobs, handler=_handler)
+            ar.wait()
 
     def _bakeTaxonomies(self, record, pool):
         logger.debug("Baking taxonomy pages...")
         with format_timed_scope(logger, 'built taxonomy buckets',
                                 level=logging.DEBUG, colored=False):
@@ -398,12 +385,20 @@
                     tt_info.dirty_terms.add(terms)
 
         return buckets
 
     def _bakeTaxonomyBuckets(self, record, pool, buckets):
+        def _handler(res):
+            entry = record.getCurrentEntry(res.path, res.taxonomy_info)
+            entry.subs = res.sub_entries
+            if res.errors:
+                entry.errors += res.errors
+            if entry.has_any_error:
+                record.current.success = False
+
         # Start baking those terms.
-        expected_result_count = 0
+        jobs = []
         for source_name, source_taxonomies in buckets.items():
             for tax_name, tt_info in source_taxonomies.items():
                 terms = tt_info.dirty_terms
                 if len(terms) == 0:
                     continue
@@ -433,25 +428,16 @@
 
                     cur_entry = BakeRecordEntry(
                             fac.source.name, fac.path, tax_info)
                     record.addEntry(cur_entry)
 
-                    if self._queueBakeJob(record, pool, fac, tax_info):
-                        expected_result_count += 1
-
-        def _handler(res):
-            entry = record.getCurrentEntry(res.path, res.taxonomy_info)
-            entry.subs = res.sub_entries
-            if res.errors:
-                entry.errors += res.errors
-            if entry.has_any_error:
-                record.current.success = False
-
-        self._waitOnWorkerPool(
-                pool,
-                expected_result_count=expected_result_count,
-                result_handler=_handler)
+                    job = self._makeBakeJob(record, fac, tax_info)
+                    if job is not None:
+                        jobs.append(job)
+
+        ar = pool.queueJobs(jobs, handler=_handler)
+        ar.wait()
 
         # Now we create bake entries for all the terms that were *not* dirty.
         # This is because otherwise, on the next incremental bake, we wouldn't
         # find any entry for those things, and figure that we need to delete
         # their outputs.
@@ -468,24 +454,24 @@
                 record.collapseEntry(prev_entry)
             else:
                 logger.debug("Taxonomy term '%s:%s' isn't used anymore." %
                              (ti.taxonomy_name, ti.term))
 
-        return expected_result_count
+        return len(jobs)
 
-    def _queueBakeJob(self, record, pool, fac, tax_info=None):
+    def _makeBakeJob(self, record, fac, tax_info=None):
         # Get the previous (if any) and current entry for this page.
         pair = record.getPreviousAndCurrentEntries(fac.path, tax_info)
         assert pair is not None
         prev_entry, cur_entry = pair
         assert cur_entry is not None
 
         # Ignore if there were errors in the previous passes.
         if cur_entry.errors:
             logger.debug("Ignoring %s because it had previous "
                          "errors." % fac.ref_spec)
-            return False
+            return None
 
         # Build the route metadata and find the appropriate route.
         page = fac.buildPage()
         route_metadata = create_route_metadata(page)
         if tax_info is not None:
@@ -513,19 +499,18 @@
                     "Page '%s' maps to URL '%s' but is overriden "
                     "by page '%s'." %
                     (fac.ref_spec, uri, override_entry.path))
             logger.error(cur_entry.errors[-1])
             cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN
-            return False
+            return None
 
         job = BakeWorkerJob(
                 JOB_BAKE,
                 BakeJobPayload(fac, route_metadata, prev_entry,
                                record.dirty_source_names,
                                tax_info))
-        pool.queue.put_nowait(job)
-        return True
+        return job
 
     def _handleDeletetions(self, record):
         logger.debug("Handling deletions...")
         for path, reason in record.getDeletions():
             logger.debug("Removing '%s': %s" % (path, reason))
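The `_queueBakeJob` to `_makeBakeJob` change above is the same refactor applied at the single-page level: instead of pushing onto the pool's queue and returning True/False, the method now just builds the job (or returns None), and the callers batch everything through `queueJobs()`. A tiny, self-contained illustration of that "build first, submit once" shape (toy data, not PieCrust types):

    # Toy illustration of switching from "queue as you go" to
    # "build the job list, then submit it in one call".
    def make_job(item):
        if item.get('skip'):
            return None        # mirrors _makeBakeJob() returning None
        return ('JOB_BAKE', item['name'])

    items = [{'name': 'a'}, {'name': 'b', 'skip': True}, {'name': 'c'}]
    jobs = [j for j in (make_job(it) for it in items) if j is not None]
    assert jobs == [('JOB_BAKE', 'a'), ('JOB_BAKE', 'c')]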
@@ -542,82 +527,20 @@
         logger.error("Errors found in %s:" % rel_path)
         for e in errors:
             logger.error(" " + e)
 
     def _createWorkerPool(self):
-        import sys
-        from piecrust.baking.worker import BakeWorkerContext, worker_func
+        from piecrust.workerpool import WorkerPool
+        from piecrust.baking.worker import BakeWorkerContext, BakeWorker
 
-        main_module = sys.modules['__main__']
-        is_profiling = os.path.basename(main_module.__file__) in [
-                'profile.py', 'cProfile.py']
-
-        pool = _WorkerPool()
-        for i in range(self.num_workers):
-            ctx = BakeWorkerContext(
-                    self.app.root_dir, self.app.cache.base_dir, self.out_dir,
-                    pool.queue, pool.results, pool.abort_event,
-                    force=self.force, debug=self.app.debug,
-                    is_profiling=is_profiling)
-            w = multiprocessing.Process(
-                    name='BakeWorker_%d' % i,
-                    target=worker_func, args=(i, ctx))
-            w.start()
-            pool.workers.append(w)
+        ctx = BakeWorkerContext(
+                self.app.root_dir, self.app.cache.base_dir, self.out_dir,
+                force=self.force, debug=self.app.debug)
+        pool = WorkerPool(
+                worker_class=BakeWorker,
+                initargs=(ctx,))
         return pool
-
-    def _terminateWorkerPool(self, pool):
-        pool.abort_event.set()
-        for w in pool.workers:
-            w.join()
-
-    def _waitOnWorkerPool(self, pool,
-                          expected_result_count=-1, result_handler=None):
-        assert result_handler is None or expected_result_count >= 0
-        abort_with_exception = None
-        try:
-            if result_handler is None:
-                pool.queue.join()
-            else:
-                got_count = 0
-                while got_count < expected_result_count:
-                    try:
-                        res = pool.results.get(True, 10)
-                    except queue.Empty:
-                        logger.error(
-                                "Got %d results, expected %d, and timed-out "
-                                "for 10 seconds. A worker might be stuck?" %
-                                (got_count, expected_result_count))
-                        abort_with_exception = Exception("Worker time-out.")
-                        break
-
-                    if isinstance(res, dict) and res.get('type') == 'error':
-                        abort_with_exception = Exception(
-                                'Worker critical error:\n' +
-                                '\n'.join(res['messages']))
-                        break
-
-                    got_count += 1
-                    result_handler(res)
-        except KeyboardInterrupt as kiex:
-            logger.warning("Bake aborted by user... "
-                           "waiting for workers to stop.")
-            abort_with_exception = kiex
-
-        if abort_with_exception:
-            pool.abort_event.set()
-            for w in pool.workers:
-                w.join(2)
-            raise abort_with_exception
-
-
-class _WorkerPool(object):
-    def __init__(self):
-        self.queue = multiprocessing.JoinableQueue()
-        self.results = multiprocessing.Queue()
-        self.abort_event = multiprocessing.Event()
-        self.workers = []
 
 
 class _TaxonomyTermsInfo(object):
     def __init__(self):
         self.dirty_terms = set()