diff piecrust/processing/base.py @ 120:133845647083

Better error management and removal support in baking/processing.

* Baker and processor pipeline now store errors in their records.
* They also support deleting output files that are no longer valid.
* The basic transitional record class implements more boilerplate code.
* The processor pipeline is run from the `bake` command directly.
* New unit tests.
* Unit test mocking now mocks `os.remove` too.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 09 Nov 2014 14:46:23 -0800
parents 6827dcc9d3fb
children e5cba2622d26
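
The record entries and flags mentioned in the commit message above come from the new piecrust.processing.records module, which is not part of this diff. The following is only a rough sketch of what that entry class likely looks like, inferred from how the diff below uses FLAG_PROCESSED, FLAG_OVERRIDEN, record_entry.errors and record_entry.rel_outputs; field defaults and flag values are assumptions.

    # Hypothetical sketch of piecrust/processing/records.py (not shown in
    # this diff); everything beyond the attribute names used in the diff
    # is an assumption.
    FLAG_NONE = 0
    FLAG_PROCESSED = 2**0
    FLAG_OVERRIDEN = 2**1

    class ProcessorPipelineRecordEntry(object):
        def __init__(self, base_dir, rel_input):
            self.base_dir = base_dir
            self.rel_input = rel_input
            self.rel_outputs = []   # relative paths of generated output files
            self.flags = FLAG_NONE  # FLAG_PROCESSED / FLAG_OVERRIDEN
            self.errors = []        # error strings captured during processing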
--- a/piecrust/processing/base.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/piecrust/processing/base.py	Sun Nov 09 14:46:23 2014 -0800
@@ -3,12 +3,15 @@
 import shutil
 import os.path
 import logging
+import hashlib
 import threading
 from queue import Queue, Empty
 from piecrust.chefutil import format_timed
+from piecrust.processing.records import (
+        ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
+        FLAG_PROCESSED, FLAG_OVERRIDEN)
 from piecrust.processing.tree import (ProcessingTreeBuilder,
-        ProcessingTreeRunner, STATE_DIRTY, print_node)
-from piecrust.records import Record
+        ProcessingTreeRunner, ProcessingTreeError, STATE_DIRTY, print_node)
 
 
 logger = logging.getLogger(__name__)
@@ -96,41 +99,6 @@
         raise NotImplementedError()
 
 
-class ProcessorPipelineRecord(Record):
-    VERSION = 1
-
-    def __init__(self):
-        super(ProcessorPipelineRecord, self).__init__()
-
-    def addEntry(self, item):
-        self.entries.append(item)
-
-    def hasOverrideEntry(self, rel_path):
-        return self.findEntry(rel_path) is not None
-
-    def findEntry(self, rel_path):
-        rel_path = rel_path.lower()
-        for entry in self.entries:
-            for out_path in entry.rel_outputs:
-                if out_path.lower() == rel_path:
-                    return entry
-        return None
-
-
-class ProcessorPipelineRecordEntry(object):
-    def __init__(self, base_dir, rel_input, is_processed=False,
-            is_overridden=False):
-        self.base_dir = base_dir
-        self.rel_input = rel_input
-        self.rel_outputs = []
-        self.is_processed = is_processed
-        self.is_overridden = is_overridden
-
-    @property
-    def path(self):
-        return os.path.join(self.base_dir, self.rel_input)
-
-
 class ProcessingContext(object):
     def __init__(self, base_dir, job_queue, record=None):
         self.base_dir = base_dir
@@ -177,15 +145,27 @@
         # patching the processors with some new ones.
         self.processors.sort(key=lambda p: p.priority)
 
+        # Create the pipeline record.
+        record = TransitionalProcessorPipelineRecord()
+        record_cache = self.app.cache.getCache('baker')
+        record_name = (
+                'assets_' +
+                hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
+                '.record')
+        if not self.force and record_cache.has(record_name):
+            t = time.clock()
+            record.loadPrevious(record_cache.getCachePath(record_name))
+            logger.debug(format_timed(t, 'loaded previous bake record',
+                    colored=False))
+
         # Create the workers.
         pool = []
         queue = Queue()
         abort = threading.Event()
         pipeline_lock = threading.Lock()
-        record = ProcessorPipelineRecord()
         for i in range(self.num_workers):
-            ctx = ProcessingWorkerContext(self, record, queue, abort,
-                    pipeline_lock)
+            ctx = ProcessingWorkerContext(self, record,
+                                          queue, abort, pipeline_lock)
             worker = ProcessingWorker(i, ctx)
             worker.start()
             pool.append(worker)
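
For reference, the record name computed in the hunk above gives each output directory its own record file in the 'baker' cache, so bakes targeting different output paths do not clobber each other's state. A minimal standalone reproduction of that naming scheme (the out_dir value is just an example, not taken from the diff):

    import hashlib

    out_dir = '/path/to/_counter'  # example output directory
    record_name = (
            'assets_' +
            hashlib.md5(out_dir.encode('utf8')).hexdigest() +
            '.record')
    # e.g. 'assets_5eb63bbbe01eeed093cb22bb8f5acdc3.record'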
@@ -222,10 +202,24 @@
         if abort.is_set():
             raise Exception("Worker pool was aborted.")
 
+        # Handle deletions.
+        for path, reason in record.getDeletions():
+            logger.debug("Removing '%s': %s" % (path, reason))
+            os.remove(path)
+            logger.info('[delete] %s' % path)
+
         # Invoke post-processors.
         for proc in self.processors:
             proc.onPipelineEnd(self)
 
+        # Save the process record.
+        t = time.clock()
+        record.current.process_time = time.time()
+        record.current.out_dir = self.out_dir
+        record.collapseRecords()
+        record.saveCurrent(record_cache.getCachePath(record_name))
+        logger.debug(format_timed(t, 'saved bake record', colored=False))
+
         return record
 
     def processDirectory(self, ctx, start_dir):
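
The deletion handling above relies on record.getDeletions(), which lives in the new records module and is not shown in this diff. A plausible, purely hypothetical sketch of how a transitional record could yield (path, reason) pairs by comparing the previous bake's outputs with the current one (helper and attribute names are assumptions; assumes `import os.path`):

    # Hypothetical sketch of TransitionalProcessorPipelineRecord.getDeletions;
    # the real implementation is in piecrust/processing/records.py.
    def getDeletions(self):
        for prev in self.previous.entries:
            cur = self.current.findEntry(prev.rel_input)  # assumed helper
            if cur is None:
                # Source file is gone: all of its outputs are stale.
                for o in prev.rel_outputs:
                    yield (os.path.join(self.previous.out_dir, o),
                           'previous source file was removed')
            else:
                # Source still exists, but some outputs are no longer produced.
                for o in set(prev.rel_outputs) - set(cur.rel_outputs):
                    yield (os.path.join(self.previous.out_dir, o),
                           'previous output is no longer generated')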
@@ -246,8 +240,8 @@
 
 
 class ProcessingWorkerContext(object):
-    def __init__(self, pipeline, record, work_queue, abort_event,
-            pipeline_lock):
+    def __init__(self, pipeline, record,
+            work_queue, abort_event, pipeline_lock):
         self.pipeline = pipeline
         self.record = record
         self.work_queue = work_queue
@@ -291,38 +285,49 @@
         record = self.ctx.record
 
         rel_path = os.path.relpath(job.path, job.base_dir)
+        previous_entry = record.getPreviousEntry(rel_path)
+        record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
+        record.addEntry(record_entry)
 
         # Figure out if a previously processed file is overriding this one.
         # This can happen if a theme file (processed via a mount point)
         # is overridden in the user's website.
-        if record.hasOverrideEntry(rel_path):
-            record.addEntry(ProcessorPipelineRecordEntry(
-                    job.base_dir, rel_path,
-                    is_processed=False, is_overridden=True))
+        if record.current.hasOverrideEntry(rel_path):
+            record_entry.flags |= FLAG_OVERRIDEN
             logger.info(format_timed(start_time,
                     '%s [not baked, overridden]' % rel_path))
             return
 
-        builder = ProcessingTreeBuilder(pipeline.processors)
-        tree_root = builder.build(rel_path)
+        try:
+            builder = ProcessingTreeBuilder(pipeline.processors)
+            tree_root = builder.build(rel_path)
+        except ProcessingTreeError as ex:
+            record_entry.errors.append(str(ex))
+            logger.error("Error processing %s: %s" % (rel_path, ex))
+            return
+
         print_node(tree_root, recursive=True)
         leaves = tree_root.getLeaves()
-        fi = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
-        fi.rel_outputs = [l.path for l in leaves]
-        record.addEntry(fi)
+        record_entry.rel_outputs = [l.path for l in leaves]
 
-        force = pipeline.force
+        force = (pipeline.force or previous_entry is None or
+                 not previous_entry.was_processed_successfully)
         if not force:
             force = re_matchany(rel_path, pipeline.force_patterns)
 
         if force:
             tree_root.setState(STATE_DIRTY, True)
 
-        runner = ProcessingTreeRunner(job.base_dir, pipeline.tmp_dir,
-                pipeline.out_dir, self.ctx.pipeline_lock)
-        if runner.processSubTree(tree_root):
-            fi.is_processed = True
-            logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+        try:
+            runner = ProcessingTreeRunner(
+                    job.base_dir, pipeline.tmp_dir,
+                    pipeline.out_dir, self.ctx.pipeline_lock)
+            if runner.processSubTree(tree_root):
+                record_entry.flags |= FLAG_PROCESSED
+                logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+        except ProcessingTreeError as ex:
+            record_entry.errors.append(str(ex))
+            logger.error("Error processing %s: %s" % (rel_path, ex))
 
 
 def make_re(patterns):
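
The force-rebuild check in the worker hunk above reads previous_entry.was_processed_successfully, another member of the records module not included in this diff. A hedged guess at that property, consistent with how the flags and errors are set in this file:

    # Hypothetical reading of was_processed_successfully; the actual
    # definition is in piecrust/processing/records.py.
    @property
    def was_processed_successfully(self):
        return (self.flags & FLAG_PROCESSED) != 0 and len(self.errors) == 0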