piecrust2
diff piecrust/processing/base.py @ 120:133845647083
Better error management and removal support in baking/processing.
* Baker and processor pipeline now store errors in their records.
* They also support deleting output files that are no longer valid.
* The base transitional record class now implements more of the shared boilerplate (see the sketch after this list).
* The processor pipeline is run from the `bake` command directly.
* New unit tests.
* Unit test mocking now mocks `os.remove` too.
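The transitional record class referenced above lives in `piecrust/processing/records.py` and is not part of this diff. As a rough guide to the pattern, here is a minimal sketch, assuming simplified classes and pickle-based persistence; only the method names used by the diff (`loadPrevious`, `saveCurrent`, `addEntry`, `getPreviousEntry`, `getDeletions`, `hasOverrideEntry`) are taken from the actual code, and `collapseRecords`, which carries data over for files skipped in this bake, is omitted for brevity.

```python
import pickle


class Record(object):
    """Sketch of one bake's record: a list of per-file entries."""
    def __init__(self):
        self.entries = []
        self.out_dir = None
        self.process_time = None

    def hasOverrideEntry(self, rel_path):
        # An entry "overrides" a path when one of its outputs already
        # claimed that path (e.g. a site file shadowing a theme file).
        rel_path = rel_path.lower()
        return any(out.lower() == rel_path
                   for e in self.entries for out in e.rel_outputs)


class TransitionalRecord(object):
    """Pairs the previous bake's record with the one being built, so
    the pipeline can compare them. A sketch only; the real class
    differs in detail."""
    def __init__(self):
        self.previous = Record()
        self.current = Record()

    def loadPrevious(self, path):
        with open(path, 'rb') as fp:
            self.previous = pickle.load(fp)

    def saveCurrent(self, path):
        with open(path, 'wb') as fp:
            pickle.dump(self.current, fp)

    def addEntry(self, entry):
        self.current.entries.append(entry)

    def getPreviousEntry(self, rel_path):
        for e in self.previous.entries:
            if e.rel_input == rel_path:
                return e
        return None

    def getDeletions(self):
        # Outputs recorded in the previous bake but not produced this
        # time are stale files that can be removed from the output.
        old = {o for e in self.previous.entries for o in e.rel_outputs}
        new = {o for e in self.current.entries for o in e.rel_outputs}
        for path in old - new:
            yield path, 'no longer produced'
```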
| author   | Ludovic Chabant <ludovic@chabant.com> |
|----------|---------------------------------------|
| date     | Sun, 09 Nov 2014 14:46:23 -0800       |
| parents  | 6827dcc9d3fb                          |
| children | e5cba2622d26                          |
--- a/piecrust/processing/base.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/piecrust/processing/base.py	Sun Nov 09 14:46:23 2014 -0800
@@ -3,12 +3,15 @@
 import shutil
 import os.path
 import logging
+import hashlib
 import threading
 from queue import Queue, Empty
 from piecrust.chefutil import format_timed
+from piecrust.processing.records import (
+        ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
+        FLAG_PROCESSED, FLAG_OVERRIDEN)
 from piecrust.processing.tree import (ProcessingTreeBuilder,
-        ProcessingTreeRunner, STATE_DIRTY, print_node)
-from piecrust.records import Record
+        ProcessingTreeRunner, ProcessingTreeError, STATE_DIRTY, print_node)
 
 
 logger = logging.getLogger(__name__)
@@ -96,41 +99,6 @@
         raise NotImplementedError()
 
 
-class ProcessorPipelineRecord(Record):
-    VERSION = 1
-
-    def __init__(self):
-        super(ProcessorPipelineRecord, self).__init__()
-
-    def addEntry(self, item):
-        self.entries.append(item)
-
-    def hasOverrideEntry(self, rel_path):
-        return self.findEntry(rel_path) is not None
-
-    def findEntry(self, rel_path):
-        rel_path = rel_path.lower()
-        for entry in self.entries:
-            for out_path in entry.rel_outputs:
-                if out_path.lower() == rel_path:
-                    return entry
-        return None
-
-
-class ProcessorPipelineRecordEntry(object):
-    def __init__(self, base_dir, rel_input, is_processed=False,
-            is_overridden=False):
-        self.base_dir = base_dir
-        self.rel_input = rel_input
-        self.rel_outputs = []
-        self.is_processed = is_processed
-        self.is_overridden = is_overridden
-
-    @property
-    def path(self):
-        return os.path.join(self.base_dir, self.rel_input)
-
-
 class ProcessingContext(object):
     def __init__(self, base_dir, job_queue, record=None):
         self.base_dir = base_dir
@@ -177,15 +145,27 @@
         # patching the processors with some new ones.
         self.processors.sort(key=lambda p: p.priority)
 
+        # Create the pipeline record.
+        record = TransitionalProcessorPipelineRecord()
+        record_cache = self.app.cache.getCache('baker')
+        record_name = (
+                'assets_' +
+                hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
+                '.record')
+        if not self.force and record_cache.has(record_name):
+            t = time.clock()
+            record.loadPrevious(record_cache.getCachePath(record_name))
+            logger.debug(format_timed(t, 'loaded previous bake record',
+                    colored=False))
+
         # Create the workers.
         pool = []
         queue = Queue()
         abort = threading.Event()
         pipeline_lock = threading.Lock()
-        record = ProcessorPipelineRecord()
         for i in range(self.num_workers):
-            ctx = ProcessingWorkerContext(self, record, queue, abort,
-                    pipeline_lock)
+            ctx = ProcessingWorkerContext(self, record,
+                    queue, abort, pipeline_lock)
             worker = ProcessingWorker(i, ctx)
             worker.start()
             pool.append(worker)
@@ -222,10 +202,24 @@
         if abort.is_set():
             raise Exception("Worker pool was aborted.")
 
+        # Handle deletions.
+        for path, reason in record.getDeletions():
+            logger.debug("Removing '%s': %s" % (path, reason))
+            os.remove(path)
+            logger.info('[delete] %s' % path)
+
         # Invoke post-processors.
         for proc in self.processors:
             proc.onPipelineEnd(self)
 
+        # Save the process record.
+        t = time.clock()
+        record.current.process_time = time.time()
+        record.current.out_dir = self.out_dir
+        record.collapseRecords()
+        record.saveCurrent(record_cache.getCachePath(record_name))
+        logger.debug(format_timed(t, 'saved bake record', colored=False))
+
         return record
 
     def processDirectory(self, ctx, start_dir):
@@ -246,8 +240,8 @@
 
 
 class ProcessingWorkerContext(object):
-    def __init__(self, pipeline, record, work_queue, abort_event,
-            pipeline_lock):
+    def __init__(self, pipeline, record,
+            work_queue, abort_event, pipeline_lock):
         self.pipeline = pipeline
         self.record = record
         self.work_queue = work_queue
@@ -291,38 +285,49 @@
         record = self.ctx.record
 
         rel_path = os.path.relpath(job.path, job.base_dir)
+        previous_entry = record.getPreviousEntry(rel_path)
+        record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
+        record.addEntry(record_entry)
 
         # Figure out if a previously processed file is overriding this one.
         # This can happen if a theme file (processed via a mount point)
         # is overridden in the user's website.
-        if record.hasOverrideEntry(rel_path):
-            record.addEntry(ProcessorPipelineRecordEntry(
-                    job.base_dir, rel_path,
-                    is_processed=False, is_overridden=True))
+        if record.current.hasOverrideEntry(rel_path):
+            record_entry.flags |= FLAG_OVERRIDEN
             logger.info(format_timed(start_time,
                     '%s [not baked, overridden]' % rel_path))
            return
 
-        builder = ProcessingTreeBuilder(pipeline.processors)
-        tree_root = builder.build(rel_path)
+        try:
+            builder = ProcessingTreeBuilder(pipeline.processors)
+            tree_root = builder.build(rel_path)
+        except ProcessingTreeError as ex:
+            record_entry.errors.append(str(ex))
+            logger.error("Error processing %s: %s" % (rel_path, ex))
+            return
+
         print_node(tree_root, recursive=True)
         leaves = tree_root.getLeaves()
-        fi = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
-        fi.rel_outputs = [l.path for l in leaves]
-        record.addEntry(fi)
+        record_entry.rel_outputs = [l.path for l in leaves]
 
-        force = pipeline.force
+        force = (pipeline.force or previous_entry is None or
+                not previous_entry.was_processed_successfully)
         if not force:
            force = re_matchany(rel_path, pipeline.force_patterns)
 
         if force:
            tree_root.setState(STATE_DIRTY, True)
 
-        runner = ProcessingTreeRunner(job.base_dir, pipeline.tmp_dir,
-                pipeline.out_dir, self.ctx.pipeline_lock)
-        if runner.processSubTree(tree_root):
-            fi.is_processed = True
-        logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+        try:
+            runner = ProcessingTreeRunner(
+                    job.base_dir, pipeline.tmp_dir,
+                    pipeline.out_dir, self.ctx.pipeline_lock)
+            if runner.processSubTree(tree_root):
+                record_entry.flags |= FLAG_PROCESSED
+            logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+        except ProcessingTreeError as ex:
+            record_entry.errors.append(str(ex))
+            logger.error("Error processing %s: %s" % (rel_path, ex))
 
 
 def make_re(patterns):
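For reference, the new record entries replace the old `is_processed`/`is_overridden` booleans with a bit-flag field plus an error list. Below is a possible shape for the entry class, inferred only from the attributes the diff touches (`flags`, `errors`, `rel_outputs`, `was_processed_successfully`); the concrete flag values are assumptions:

```python
import os.path

FLAG_NONE = 0
FLAG_PROCESSED = 2**0   # assumed values; spelling of FLAG_OVERRIDEN
FLAG_OVERRIDEN = 2**1   # matches the import in the diff above


class ProcessorPipelineRecordEntry(object):
    def __init__(self, base_dir, rel_input):
        self.base_dir = base_dir
        self.rel_input = rel_input
        self.rel_outputs = []
        self.flags = FLAG_NONE
        self.errors = []

    @property
    def path(self):
        return os.path.join(self.base_dir, self.rel_input)

    @property
    def was_processed_successfully(self):
        # Used by the worker to decide whether a file must be rebuilt:
        # a file that was skipped or recorded errors last time is dirty.
        return (self.flags & FLAG_PROCESSED) != 0 and not self.errors
```

Catching `ProcessingTreeError` into `record_entry.errors` instead of letting it propagate means one bad asset no longer aborts the whole pipeline run, while the `was_processed_successfully` check in the force condition ensures a failed file is re-processed on the next bake.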