Mercurial > piecrust2
comparison piecrust/baking/baker.py @ 85:3471ffa059b2
Add a `BakeScheduler` to handle build dependencies. Add unit-tests.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 03 Sep 2014 17:27:50 -0700 |
parents | 2fec3ee1298f |
children | e88e330eb8dc |
comparison
equal
deleted
inserted
replaced
84:b3ce11b2cf36 | 85:3471ffa059b2 |
---|---|
4 import shutil | 4 import shutil |
5 import hashlib | 5 import hashlib |
6 import logging | 6 import logging |
7 import threading | 7 import threading |
8 import urllib.request, urllib.error, urllib.parse | 8 import urllib.request, urllib.error, urllib.parse |
9 from queue import Queue, Empty | |
10 from piecrust.baking.records import TransitionalBakeRecord, BakeRecordPageEntry | 9 from piecrust.baking.records import TransitionalBakeRecord, BakeRecordPageEntry |
11 from piecrust.chefutil import format_timed, log_friendly_exception | 10 from piecrust.chefutil import format_timed, log_friendly_exception |
12 from piecrust.data.filters import (PaginationFilter, HasFilterClause, | 11 from piecrust.data.filters import (PaginationFilter, HasFilterClause, |
13 IsFilterClause, AndBooleanClause) | 12 IsFilterClause, AndBooleanClause) |
14 from piecrust.processing.base import ProcessorPipeline | 13 from piecrust.processing.base import ProcessorPipeline |
25 copy_assets=True): | 24 copy_assets=True): |
26 self.app = app | 25 self.app = app |
27 self.out_dir = out_dir | 26 self.out_dir = out_dir |
28 self.force = force | 27 self.force = force |
29 self.record = record | 28 self.record = record |
30 self.force = force | |
31 self.copy_assets = copy_assets | 29 self.copy_assets = copy_assets |
32 self.site_root = app.config.get('site/root') | 30 self.site_root = app.config.get('site/root') |
33 self.pretty_urls = app.config.get('site/pretty_urls') | 31 self.pretty_urls = app.config.get('site/pretty_urls') |
34 self.pagination_suffix = app.config.get('site/pagination_suffix') | 32 self.pagination_suffix = app.config.get('site/pagination_suffix') |
35 | 33 |
134 has_more_subs = True | 132 has_more_subs = True |
135 page = factory.buildPage() | 133 page = factory.buildPage() |
136 cur_record_entry = BakeRecordPageEntry(page) | 134 cur_record_entry = BakeRecordPageEntry(page) |
137 cur_record_entry.taxonomy_name = taxonomy_name | 135 cur_record_entry.taxonomy_name = taxonomy_name |
138 cur_record_entry.taxonomy_term = taxonomy_term | 136 cur_record_entry.taxonomy_term = taxonomy_term |
139 prev_record_entry = self.record.getPreviousEntry(page, taxonomy_name, | 137 prev_record_entry = self.record.getPreviousEntry( |
140 taxonomy_term) | 138 factory.source.name, factory.rel_path, |
139 taxonomy_name, taxonomy_term) | |
141 | 140 |
142 logger.debug("Baking '%s'..." % uri) | 141 logger.debug("Baking '%s'..." % uri) |
143 while has_more_subs: | 142 while has_more_subs: |
144 sub_uri = self.getOutputUri(uri, cur_sub) | 143 sub_uri = self.getOutputUri(uri, cur_sub) |
145 out_path = self.getOutputPath(sub_uri) | 144 out_path = self.getOutputPath(sub_uri) |
349 max_time = max(max_time, os.path.getmtime(full_fn)) | 348 max_time = max(max_time, os.path.getmtime(full_fn)) |
350 if max_time >= record.previous.bake_time: | 349 if max_time >= record.previous.bake_time: |
351 reason = "templates modified" | 350 reason = "templates modified" |
352 | 351 |
353 if reason is not None: | 352 if reason is not None: |
353 # We have to bake everything from scratch. | |
354 cache_dir = self.app.cache.getCacheDir('baker') | 354 cache_dir = self.app.cache.getCacheDir('baker') |
355 if os.path.isdir(cache_dir): | 355 if os.path.isdir(cache_dir): |
356 logger.debug("Cleaning baker cache: %s" % cache_dir) | 356 logger.debug("Cleaning baker cache: %s" % cache_dir) |
357 shutil.rmtree(cache_dir) | 357 shutil.rmtree(cache_dir) |
358 self.force = True | 358 self.force = True |
380 if route is None: | 380 if route is None: |
381 logger.error("Can't get route for page: %s" % fac.ref_spec) | 381 logger.error("Can't get route for page: %s" % fac.ref_spec) |
382 continue | 382 continue |
383 | 383 |
384 logger.debug("Queuing: %s" % fac.ref_spec) | 384 logger.debug("Queuing: %s" % fac.ref_spec) |
385 queue.put_nowait(BakeWorkerJob(fac, route)) | 385 queue.addJob(BakeWorkerJob(fac, route)) |
386 | 386 |
387 self._waitOnWorkerPool(pool, abort) | 387 self._waitOnWorkerPool(pool, abort) |
388 | 388 |
389 def _bakeTaxonomies(self, record): | 389 def _bakeTaxonomies(self, record): |
390 logger.debug("Baking taxonomies") | 390 logger.debug("Baking taxonomies") |
467 for term in terms: | 467 for term in terms: |
468 fac = PageFactory(tax_page_source, tax_page_rel_path, | 468 fac = PageFactory(tax_page_source, tax_page_rel_path, |
469 {tax.term_name: term}) | 469 {tax.term_name: term}) |
470 logger.debug("Queuing: %s [%s, %s]" % | 470 logger.debug("Queuing: %s [%s, %s]" % |
471 (fac.ref_spec, tax_name, term)) | 471 (fac.ref_spec, tax_name, term)) |
472 queue.put_nowait( | 472 queue.addJob( |
473 BakeWorkerJob(fac, route, tax_name, term)) | 473 BakeWorkerJob(fac, route, tax_name, term)) |
474 | 474 |
475 self._waitOnWorkerPool(pool, abort) | 475 self._waitOnWorkerPool(pool, abort) |
476 | 476 |
477 def _bakeAssets(self, record): | 477 def _bakeAssets(self, record): |
485 num_workers=self.num_workers) | 485 num_workers=self.num_workers) |
486 proc.run() | 486 proc.run() |
487 | 487 |
def _createWorkerPool(self, record, pool_size=4):
    """Build the pool of bake workers and their shared plumbing.

    Workers pull jobs from a shared `BakeScheduler` (which orders jobs
    according to the previous bake record's dependencies) and watch a
    shared abort event.

    Returns a `(pool, queue, abort)` tuple. Workers are only created
    here, not started -- `_waitOnWorkerPool` starts and joins them.
    """
    queue = BakeScheduler(record)
    abort = threading.Event()
    pool = []
    for wid in range(pool_size):
        ctx = BakeWorkerContext(self.app, self.out_dir, self.force,
                                record, queue, abort)
        pool.append(BakeWorker(wid, ctx))
    return pool, queue, abort
499 | 498 |
def _waitOnWorkerPool(self, pool, abort):
    """Start every worker in `pool` and block until they all exit.

    If the shared `abort` event was set by a worker, gather the
    workers' stored exceptions, log them (full tracebacks in debug
    mode, friendly messages otherwise) and raise to stop the bake.
    """
    for worker in pool:
        worker.start()
    for worker in pool:
        worker.join()

    if not abort.is_set():
        return

    excs = [w.abort_exception for w in pool
            if w.abort_exception is not None]
    logger.error("Baking was aborted due to %s error(s):" % len(excs))
    if self.app.debug:
        for e in excs:
            logger.exception(e)
    else:
        for e in excs:
            log_friendly_exception(logger, e)
    raise Exception("Baking was aborted due to errors.")
515 | |
516 | |
class BakeScheduler(object):
    """Dependency-aware job queue for the bake workers.

    Jobs are handed out in FIFO order, except that a job whose page
    (according to the previous bake record) used content from a source
    that still has pending or in-flight jobs is deferred until those
    jobs are done. `getNextJob` blocks (up to `timeout`) waiting either
    for a new job to be added or for an active job to finish.
    """
    # Sentinels returned by `_doGetNextJob`: identity-compared, never
    # handed out to callers.
    _EMPTY = object()   # no jobs queued at all
    _WAIT = object()    # jobs queued, but none ready (deps in flight)

    def __init__(self, record, jobs=None):
        # `record` is a transitional bake record; its previous entries
        # provide each page's `used_source_names` dependencies.
        self.record = record
        self.jobs = list(jobs) if jobs is not None else []
        self._active_jobs = []
        self._lock = threading.Lock()
        self._added_event = threading.Event()
        self._done_event = threading.Event()

    def addJob(self, job):
        """Queue a job and wake any worker waiting for new work."""
        logger.debug("Adding job '%s:%s' to scheduler." % (
                job.factory.source.name, job.factory.rel_path))
        with self._lock:
            self.jobs.append(job)
        self._added_event.set()

    def onJobFinished(self, job):
        """Mark `job` done and wake any worker waiting on dependencies."""
        logger.debug("Removing job '%s:%s' from scheduler." % (
                job.factory.source.name, job.factory.rel_path))
        with self._lock:
            self._active_jobs.remove(job)
        self._done_event.set()

    def getNextJob(self, timeout=None):
        """Return the next ready job, or `None`.

        With `timeout=None` this never blocks: it returns `None` as
        soon as no job is immediately ready. With a timeout, it waits
        up to `timeout` seconds for the relevant event (job added, or
        active job finished) before giving up.
        """
        while True:
            # Clear BEFORE inspecting the queue: anything added or
            # finished after `_doGetNextJob` looks will re-set the
            # event, so no wakeup can be missed. Re-clearing on every
            # iteration also prevents a stale event from making the
            # waits below return immediately and busy-spin.
            self._added_event.clear()
            self._done_event.clear()
            job = self._doGetNextJob()
            if job is not self._EMPTY and job is not self._WAIT:
                return job
            if timeout is None:
                return None
            if job is self._EMPTY:
                logger.debug("Waiting for a new job to be added...")
                signalled = self._added_event.wait(timeout)
            else:
                logger.debug("Waiting for a job to be finished...")
                signalled = self._done_event.wait(timeout)
            if not signalled:
                logger.debug("Timed-out. No job found.")
                return None

    def _doGetNextJob(self):
        """Pop the first ready job, or return a sentinel.

        Returns `_EMPTY` when no jobs are queued, `_WAIT` when jobs
        exist but all are blocked on in-flight dependencies.
        """
        with self._lock:
            if not self.jobs:
                return self._EMPTY

            job = self.jobs.pop(0)
            first_job = job
            while not self._isJobReady(job):
                logger.debug("Job '%s:%s' isn't ready yet." % (
                        job.factory.source.name, job.factory.rel_path))
                # Rotate the blocked job to the back and try the next.
                self.jobs.append(job)
                job = self.jobs.pop(0)
                # Identity check: we cycled through every queued job
                # without finding a ready one.
                if job is first_job:
                    return self._WAIT

            logger.debug("Job '%s:%s' is ready to go, moving to active "
                    "queue." % (job.factory.source.name, job.factory.rel_path))
            self._active_jobs.append(job)
            return job

    def _isJobReady(self, job):
        """Return whether `job`'s dependencies are all baked.

        A job is ready when no source it used in the previous bake has
        pending or active jobs. Must be called with `_lock` held.
        """
        e = self.record.getPreviousEntry(job.factory.source.name,
                job.factory.rel_path)
        if not e:
            # Never baked before: no known dependencies to wait for.
            return True
        busy_sources = {j.factory.source.name for j in self.jobs}
        busy_sources.update(j.factory.source.name for j in self._active_jobs)
        return not any(sn in busy_sources for sn in e.used_source_names)
514 | 595 |
515 | 596 |
516 class BakeWorkerContext(object): | 597 class BakeWorkerContext(object): |
517 def __init__(self, app, out_dir, force, record, work_queue, | 598 def __init__(self, app, out_dir, force, record, work_queue, |
518 abort_event): | 599 abort_event): |
545 self._page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force, | 626 self._page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force, |
546 ctx.record) | 627 ctx.record) |
547 | 628 |
548 def run(self): | 629 def run(self): |
549 while(not self.ctx.abort_event.is_set()): | 630 while(not self.ctx.abort_event.is_set()): |
550 try: | 631 job = self.ctx.work_queue.getNextJob() |
551 job = self.ctx.work_queue.get(True, 0.1) | 632 if job is None: |
552 except Empty: | |
553 logger.debug("[%d] No more work... shutting down." % self.wid) | 633 logger.debug("[%d] No more work... shutting down." % self.wid) |
554 break | 634 break |
555 | 635 |
556 try: | 636 try: |
557 self._unsafeRun(job) | 637 self._unsafeRun(job) |
558 logger.debug("[%d] Done with page." % self.wid) | 638 logger.debug("[%d] Done with page." % self.wid) |
559 self.ctx.work_queue.task_done() | 639 self.ctx.work_queue.onJobFinished(job) |
560 except Exception as ex: | 640 except Exception as ex: |
561 self.ctx.abort_event.set() | 641 self.ctx.abort_event.set() |
562 self.abort_exception = ex | 642 self.abort_exception = ex |
563 logger.debug("[%d] Critical error, aborting." % self.wid) | 643 logger.debug("[%d] Critical error, aborting." % self.wid) |
564 if self.ctx.app.debug: | 644 if self.ctx.app.debug: |