comparison piecrust/baking/baker.py @ 85:3471ffa059b2

Add a `BakeScheduler` to handle build dependencies. Add unit-tests.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 03 Sep 2014 17:27:50 -0700
parents 2fec3ee1298f
children e88e330eb8dc
comparison
equal deleted inserted replaced
84:b3ce11b2cf36 85:3471ffa059b2
4 import shutil 4 import shutil
5 import hashlib 5 import hashlib
6 import logging 6 import logging
7 import threading 7 import threading
8 import urllib.request, urllib.error, urllib.parse 8 import urllib.request, urllib.error, urllib.parse
9 from queue import Queue, Empty
10 from piecrust.baking.records import TransitionalBakeRecord, BakeRecordPageEntry 9 from piecrust.baking.records import TransitionalBakeRecord, BakeRecordPageEntry
11 from piecrust.chefutil import format_timed, log_friendly_exception 10 from piecrust.chefutil import format_timed, log_friendly_exception
12 from piecrust.data.filters import (PaginationFilter, HasFilterClause, 11 from piecrust.data.filters import (PaginationFilter, HasFilterClause,
13 IsFilterClause, AndBooleanClause) 12 IsFilterClause, AndBooleanClause)
14 from piecrust.processing.base import ProcessorPipeline 13 from piecrust.processing.base import ProcessorPipeline
25 copy_assets=True): 24 copy_assets=True):
26 self.app = app 25 self.app = app
27 self.out_dir = out_dir 26 self.out_dir = out_dir
28 self.force = force 27 self.force = force
29 self.record = record 28 self.record = record
30 self.force = force
31 self.copy_assets = copy_assets 29 self.copy_assets = copy_assets
32 self.site_root = app.config.get('site/root') 30 self.site_root = app.config.get('site/root')
33 self.pretty_urls = app.config.get('site/pretty_urls') 31 self.pretty_urls = app.config.get('site/pretty_urls')
34 self.pagination_suffix = app.config.get('site/pagination_suffix') 32 self.pagination_suffix = app.config.get('site/pagination_suffix')
35 33
134 has_more_subs = True 132 has_more_subs = True
135 page = factory.buildPage() 133 page = factory.buildPage()
136 cur_record_entry = BakeRecordPageEntry(page) 134 cur_record_entry = BakeRecordPageEntry(page)
137 cur_record_entry.taxonomy_name = taxonomy_name 135 cur_record_entry.taxonomy_name = taxonomy_name
138 cur_record_entry.taxonomy_term = taxonomy_term 136 cur_record_entry.taxonomy_term = taxonomy_term
139 prev_record_entry = self.record.getPreviousEntry(page, taxonomy_name, 137 prev_record_entry = self.record.getPreviousEntry(
140 taxonomy_term) 138 factory.source.name, factory.rel_path,
139 taxonomy_name, taxonomy_term)
141 140
142 logger.debug("Baking '%s'..." % uri) 141 logger.debug("Baking '%s'..." % uri)
143 while has_more_subs: 142 while has_more_subs:
144 sub_uri = self.getOutputUri(uri, cur_sub) 143 sub_uri = self.getOutputUri(uri, cur_sub)
145 out_path = self.getOutputPath(sub_uri) 144 out_path = self.getOutputPath(sub_uri)
349 max_time = max(max_time, os.path.getmtime(full_fn)) 348 max_time = max(max_time, os.path.getmtime(full_fn))
350 if max_time >= record.previous.bake_time: 349 if max_time >= record.previous.bake_time:
351 reason = "templates modified" 350 reason = "templates modified"
352 351
353 if reason is not None: 352 if reason is not None:
353 # We have to bake everything from scratch.
354 cache_dir = self.app.cache.getCacheDir('baker') 354 cache_dir = self.app.cache.getCacheDir('baker')
355 if os.path.isdir(cache_dir): 355 if os.path.isdir(cache_dir):
356 logger.debug("Cleaning baker cache: %s" % cache_dir) 356 logger.debug("Cleaning baker cache: %s" % cache_dir)
357 shutil.rmtree(cache_dir) 357 shutil.rmtree(cache_dir)
358 self.force = True 358 self.force = True
380 if route is None: 380 if route is None:
381 logger.error("Can't get route for page: %s" % fac.ref_spec) 381 logger.error("Can't get route for page: %s" % fac.ref_spec)
382 continue 382 continue
383 383
384 logger.debug("Queuing: %s" % fac.ref_spec) 384 logger.debug("Queuing: %s" % fac.ref_spec)
385 queue.put_nowait(BakeWorkerJob(fac, route)) 385 queue.addJob(BakeWorkerJob(fac, route))
386 386
387 self._waitOnWorkerPool(pool, abort) 387 self._waitOnWorkerPool(pool, abort)
388 388
389 def _bakeTaxonomies(self, record): 389 def _bakeTaxonomies(self, record):
390 logger.debug("Baking taxonomies") 390 logger.debug("Baking taxonomies")
467 for term in terms: 467 for term in terms:
468 fac = PageFactory(tax_page_source, tax_page_rel_path, 468 fac = PageFactory(tax_page_source, tax_page_rel_path,
469 {tax.term_name: term}) 469 {tax.term_name: term})
470 logger.debug("Queuing: %s [%s, %s]" % 470 logger.debug("Queuing: %s [%s, %s]" %
471 (fac.ref_spec, tax_name, term)) 471 (fac.ref_spec, tax_name, term))
472 queue.put_nowait( 472 queue.addJob(
473 BakeWorkerJob(fac, route, tax_name, term)) 473 BakeWorkerJob(fac, route, tax_name, term))
474 474
475 self._waitOnWorkerPool(pool, abort) 475 self._waitOnWorkerPool(pool, abort)
476 476
477 def _bakeAssets(self, record): 477 def _bakeAssets(self, record):
485 num_workers=self.num_workers) 485 num_workers=self.num_workers)
486 proc.run() 486 proc.run()
487 487
def _createWorkerPool(self, record, pool_size=4):
    """Build the shared scheduler, the abort flag and the bake workers.

    Every worker gets its own `BakeWorkerContext`, but all contexts share
    the same `BakeScheduler` (built from `record`) and the same abort
    event. The workers are only constructed here; this method does not
    start them.

    Returns a `(workers, scheduler, abort_event)` tuple.
    """
    scheduler = BakeScheduler(record)
    abort_flag = threading.Event()
    workers = [
        BakeWorker(
            wid,
            BakeWorkerContext(self.app, self.out_dir, self.force,
                              record, scheduler, abort_flag))
        for wid in range(pool_size)]
    return workers, scheduler, abort_flag
499 498
def _waitOnWorkerPool(self, pool, abort):
    """Start every worker in `pool`, wait for them all, report failures.

    `pool` is a list of worker threads exposing `start()`, `join()` and
    an `abort_exception` attribute; `abort` is the event the workers set
    when one of them fails.

    Raises `Exception` if `abort` is set once all the workers have been
    joined, after logging each worker's recorded exception.
    """
    for w in pool:
        w.start()
    for w in pool:
        w.join()
    if not abort.is_set():
        return

    excs = [w.abort_exception for w in pool
            if w.abort_exception is not None]
    logger.error("Baking was aborted due to %s error(s):" % len(excs))
    if self.app.debug:
        for e in excs:
            # `logger.exception()` reads `sys.exc_info()`, which is empty
            # here because we are not inside an `except` block. Pass the
            # exception's own traceback so debug output actually shows
            # where the worker failed.
            logger.error(e, exc_info=(type(e), e, e.__traceback__))
    else:
        for e in excs:
            log_friendly_exception(logger, e)
    raise Exception("Baking was aborted due to errors.")
515
516
class BakeScheduler(object):
    """Thread-safe job queue that hands out bake jobs in dependency order.

    A job is held back while any *other* source listed in its previous
    bake record entry (`used_source_names`) still has jobs pending or
    being baked. Jobs without a previous entry are always ready.

    Jobs are expected to expose `factory.source.name` and
    `factory.rel_path`; `record` must provide
    `getPreviousEntry(source_name, rel_path)`.
    """
    # Internal sentinels returned by `_doGetNextJob()`.
    _EMPTY = object()   # There are no pending jobs at all.
    _WAIT = object()    # There are pending jobs, but none is ready yet.

    def __init__(self, record, jobs=None):
        self.record = record
        self.jobs = list(jobs) if jobs is not None else []
        self._active_jobs = []
        self._lock = threading.Lock()
        # Notified whenever `jobs` or `_active_jobs` changes. It shares
        # `_lock`, so checking the queue and going to sleep is one atomic
        # step and a wake-up can't be missed between the two.
        self._changed = threading.Condition(self._lock)

    def addJob(self, job):
        """Append `job` to the pending queue and wake up waiting workers."""
        logger.debug("Adding job '%s:%s' to scheduler.",
                     job.factory.source.name, job.factory.rel_path)
        with self._changed:
            self.jobs.append(job)
            self._changed.notify_all()

    def onJobFinished(self, job):
        """Mark `job` (previously returned by `getNextJob`) as done.

        Raises `ValueError` if `job` is not currently active.
        """
        logger.debug("Removing job '%s:%s' from scheduler.",
                     job.factory.source.name, job.factory.rel_path)
        with self._changed:
            self._active_jobs.remove(job)
            self._changed.notify_all()

    def getNextJob(self, timeout=None):
        """Return the next ready job, or `None` if there isn't one.

        With `timeout=None` this never blocks: it returns `None` right
        away when the queue is empty or when every pending job is still
        waiting on a dependency. Otherwise, `timeout` is the number of
        seconds to wait for the scheduler's state to change (a job being
        added or finished) before giving up and returning `None`; the
        wait restarts after each change that didn't make a job ready.
        """
        with self._changed:
            while True:
                job = self._popReadyJob()
                if job is not self._EMPTY and job is not self._WAIT:
                    return job
                if timeout is None:
                    return None
                if job is self._EMPTY:
                    logger.debug("Waiting for a new job to be added...")
                else:
                    logger.debug("Waiting for a job to be finished...")
                if not self._changed.wait(timeout):
                    logger.debug("Timed-out. No job found.")
                    return None

    def _doGetNextJob(self):
        """Return a ready job, or the `_EMPTY` / `_WAIT` sentinel."""
        with self._lock:
            return self._popReadyJob()

    def _popReadyJob(self):
        """Move the first ready job to the active list and return it.

        Must be called with `_lock` held. Jobs that are not ready are
        rotated to the back of the queue; if a full pass finds nothing,
        the queue is back in its original order and `_WAIT` is returned,
        so no job is ever dropped.
        """
        if not self.jobs:
            return self._EMPTY

        for _ in range(len(self.jobs)):
            # The candidate is taken out of `jobs` while it's checked so
            # it never counts as one of its own pending dependencies.
            job = self.jobs.pop(0)
            if self._isJobReady(job):
                logger.debug("Job '%s:%s' is ready to go, moving to active "
                             "queue.", job.factory.source.name,
                             job.factory.rel_path)
                self._active_jobs.append(job)
                return job
            logger.debug("Job '%s:%s' isn't ready yet.",
                         job.factory.source.name, job.factory.rel_path)
            self.jobs.append(job)

        # None of the jobs are ready... we need to wait.
        return self._WAIT

    def _isJobReady(self, job):
        """Tell whether all the sources `job` depended on are fully baked."""
        own_source = job.factory.source.name
        e = self.record.getPreviousEntry(own_source, job.factory.rel_path)
        if not e:
            return True
        for sn in e.used_source_names:
            if sn == own_source:
                # Pages from the same source can't wait on each other,
                # otherwise none of them would ever become ready.
                continue
            if any(j.factory.source.name == sn for j in self.jobs):
                return False
            if any(j.factory.source.name == sn for j in self._active_jobs):
                return False
        return True
514 595
515 596
516 class BakeWorkerContext(object): 597 class BakeWorkerContext(object):
517 def __init__(self, app, out_dir, force, record, work_queue, 598 def __init__(self, app, out_dir, force, record, work_queue,
518 abort_event): 599 abort_event):
545 self._page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force, 626 self._page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force,
546 ctx.record) 627 ctx.record)
547 628
548 def run(self): 629 def run(self):
549 while(not self.ctx.abort_event.is_set()): 630 while(not self.ctx.abort_event.is_set()):
550 try: 631 job = self.ctx.work_queue.getNextJob()
551 job = self.ctx.work_queue.get(True, 0.1) 632 if job is None:
552 except Empty:
553 logger.debug("[%d] No more work... shutting down." % self.wid) 633 logger.debug("[%d] No more work... shutting down." % self.wid)
554 break 634 break
555 635
556 try: 636 try:
557 self._unsafeRun(job) 637 self._unsafeRun(job)
558 logger.debug("[%d] Done with page." % self.wid) 638 logger.debug("[%d] Done with page." % self.wid)
559 self.ctx.work_queue.task_done() 639 self.ctx.work_queue.onJobFinished(job)
560 except Exception as ex: 640 except Exception as ex:
561 self.ctx.abort_event.set() 641 self.ctx.abort_event.set()
562 self.abort_exception = ex 642 self.abort_exception = ex
563 logger.debug("[%d] Critical error, aborting." % self.wid) 643 logger.debug("[%d] Critical error, aborting." % self.wid)
564 if self.ctx.app.debug: 644 if self.ctx.app.debug: