changeset 120:133845647083

Better error management and removal support in baking/processing.

* Baker and processor pipeline now store errors in their records.
* They also support deleting output files that are no longer valid.
* The new base transitional record class implements the shared boilerplate.
* The processor pipeline is run from the `bake` command directly.
* New unit tests.
* Unit test mocking now mocks `os.remove` too.
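The deletion support falls out of the transition pairs kept by the records: each entry key maps to a (previous bake, current bake) pair. A minimal self-contained sketch of the idea, with hypothetical entries (not code from this changeset):

    import types

    # Hypothetical record entries: `out_paths` is what each source file
    # produced during the previous and current bakes.
    prev_foo = types.SimpleNamespace(out_paths=['_counter/foo.html'])
    prev_bar = types.SimpleNamespace(out_paths=['_counter/bar.html',
                                                '_counter/bar/2.html'])
    cur_bar = types.SimpleNamespace(out_paths=['_counter/bar.html'],
                                    was_baked_successfully=True)

    # The transitional record keys each entry and pairs it up as
    # (previous bake, current bake).
    transitions = {
            'pages:foo.md': (prev_foo, None),     # source file was removed
            'pages:bar.md': (prev_bar, cur_bar),  # source was re-baked
            }

    for prev, cur in transitions.values():
        if prev and not cur:
            stale = prev.out_paths                # remove all old outputs
        elif prev and cur and cur.was_baked_successfully:
            stale = set(prev.out_paths) - set(cur.out_paths)
        # `stale` is what the records' `getDeletions()` yields for removal.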
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 09 Nov 2014 14:46:23 -0800
parents 0811f92cbdc7
children 300eb1c2cb14
files piecrust/baking/baker.py piecrust/baking/records.py piecrust/chefutil.py piecrust/commands/builtin/baking.py piecrust/processing/base.py piecrust/processing/records.py piecrust/processing/tree.py piecrust/records.py tests/mockutil.py tests/test_baking_baker.py tests/test_processing_base.py
diffstat 11 files changed, 509 insertions(+), 180 deletions(-)
--- a/piecrust/baking/baker.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/piecrust/baking/baker.py	Sun Nov 09 14:46:23 2014 -0800
@@ -12,7 +12,6 @@
 from piecrust.chefutil import format_timed, log_friendly_exception
 from piecrust.data.filters import (PaginationFilter, HasFilterClause,
         IsFilterClause, AndBooleanClause)
-from piecrust.processing.base import ProcessorPipeline
 from piecrust.rendering import (PageRenderingContext, render_page,
         PASS_FORMATTING, PASS_RENDERING)
 from piecrust.sources.base import (PageFactory,
@@ -144,7 +143,7 @@
         # see if any of those got baked, or are going to be baked for some
         # reason. If so, we need to bake this one too.
         # (this happens for instance with the main page of a blog).
-        if prev_record_entry:
+        if prev_record_entry and prev_record_entry.was_baked_successfully:
             invalidated_render_passes = set()
             used_src_names = list(prev_record_entry.used_source_names)
             for src_name, rdr_pass in used_src_names:
@@ -266,13 +265,13 @@
 
 class Baker(object):
     def __init__(self, app, out_dir=None, force=False, portable=False,
-            no_assets=False):
+            no_assets=False, num_workers=4):
         self.app = app
         self.out_dir = out_dir or os.path.join(app.root_dir, '_counter')
         self.force = force
         self.portable = portable
         self.no_assets = no_assets
-        self.num_workers = app.config.get('baker/workers') or 4
+        self.num_workers = num_workers
 
         # Remember what taxonomy pages we should skip
         # (we'll bake them repeatedly later with each taxonomy term)
@@ -301,7 +300,9 @@
         # Load/create the bake record.
         record = TransitionalBakeRecord()
         record_cache = self.app.cache.getCache('baker')
-        record_name = (hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
+        record_name = (
+                'pages_' +
+                hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
                 '.record')
         if not self.force and record_cache.has(record_name):
             t = time.clock()
@@ -331,9 +332,8 @@
         # Bake taxonomies.
         self._bakeTaxonomies(record)
 
-        # Bake the assets.
-        if not self.no_assets:
-            self._bakeAssets(record)
+        # Delete files from the output.
+        self._handleDeletions(record)
 
         # Save the bake record.
         t = time.clock()
@@ -345,8 +345,7 @@
 
         # All done.
         self.app.config.set('baker/is_baking', False)
-        logger.info('-------------------------');
-        logger.info(format_timed(start_time, 'done baking'));
+        logger.debug(format_timed(start_time, 'done baking'))
 
     def _handleCacheValidity(self, record):
         start_time = time.clock()
@@ -407,13 +406,16 @@
                             (source.name, fac.ref_spec))
                     continue
 
+                entry = BakeRecordPageEntry(fac)
+                record.addEntry(entry)
+
                 route = self.app.getRoute(source.name, fac.metadata)
                 if route is None:
-                    logger.error("Can't get route for page: %s" % fac.ref_spec)
+                    entry.errors.append("Can't get route for page: %s" %
+                            fac.ref_spec)
+                    logger.error(entry.errors[-1])
                     continue
 
-                entry = BakeRecordPageEntry(fac)
-                record.addEntry(entry)
                 queue.addJob(BakeWorkerJob(fac, route, entry))
 
         self._waitOnWorkerPool(pool, abort)
@@ -439,9 +441,11 @@
                 changed_terms = None
                 # Re-bake all taxonomy pages that include new or changed
                 # pages.
-                if not prev_entry and cur_entry and cur_entry.was_baked:
+                if (not prev_entry and cur_entry and
+                        cur_entry.was_baked_successfully):
                     changed_terms = cur_entry.config.get(tax.name)
-                elif prev_entry and cur_entry and cur_entry.was_baked:
+                elif (prev_entry and cur_entry and
+                        cur_entry.was_baked_successfully):
                     changed_terms = []
                     prev_terms = prev_entry.config.get(tax.name)
                     cur_terms = cur_entry.config.get(tax.name)
@@ -508,16 +512,11 @@
 
         self._waitOnWorkerPool(pool, abort)
 
-    def _bakeAssets(self, record):
-        mounts = self.app.assets_dirs
-        baker_params = self.app.config.get('baker') or {}
-        skip_patterns = baker_params.get('skip_patterns')
-        force_patterns = baker_params.get('force_patterns')
-        proc = ProcessorPipeline(
-                self.app, mounts, self.out_dir, force=self.force,
-                skip_patterns=skip_patterns, force_patterns=force_patterns,
-                num_workers=self.num_workers)
-        proc.run()
+    def _handleDeletions(self, record):
+        for path, reason in record.getDeletions():
+            logger.debug("Removing '%s': %s" % (path, reason))
+            os.remove(path)
+            logger.info('[delete] %s' % path)
 
     def _createWorkerPool(self, record, pool_size=4):
         pool = []
@@ -697,11 +696,17 @@
         start_time = time.clock()
 
         entry = job.record_entry
-        self._page_baker.bake(job.factory, job.route, entry,
-                taxonomy_name=job.taxonomy_name,
-                taxonomy_term=job.taxonomy_term)
+        try:
+            self._page_baker.bake(job.factory, job.route, entry,
+                    taxonomy_name=job.taxonomy_name,
+                    taxonomy_term=job.taxonomy_term)
+        except BakingError as ex:
+            logger.debug("Got baking error. Adding it to the record.")
+            while ex:
+                entry.errors.append(str(ex))
+                ex = ex.__cause__
 
-        if entry.was_baked:
+        if entry.was_baked_successfully:
             uri = entry.out_uris[0]
             friendly_uri = uri if uri != '' else '[main page]'
             friendly_count = ''
@@ -709,4 +714,7 @@
                 friendly_count = ' (%d pages)' % entry.num_subs
             logger.info(format_timed(start_time, '[%d] %s%s' %
                     (self.wid, friendly_uri, friendly_count)))
+        elif entry.errors:
+            for e in entry.errors:
+                logger.error(e)
 
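The worker's new error handling flattens a chained BakingError into plain strings on the record entry. A self-contained sketch of the pattern, with a stand-in exception class (names here are illustrative, not piecrust's own):

    class BakingError(Exception):
        # Stand-in for piecrust's BakingError.
        pass


    def flatten_errors(ex):
        # Mirrors the worker's loop: walk the `raise ... from ...` chain
        # and keep plain strings, so the bake record stays picklable.
        errors = []
        while ex:
            errors.append(str(ex))
            ex = ex.__cause__
        return errors


    try:
        try:
            raise ValueError('template not found')
        except ValueError as inner:
            raise BakingError('error baking foo.md') from inner
    except BakingError as ex:
        assert flatten_errors(ex) == ['error baking foo.md',
                                      'template not found']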
--- a/piecrust/baking/records.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/piecrust/baking/records.py	Sun Nov 09 14:46:23 2014 -0800
@@ -1,16 +1,12 @@
 import os.path
 import logging
-from piecrust import APP_VERSION
 from piecrust.sources.base import PageSource
-from piecrust.records import Record
+from piecrust.records import Record, TransitionalRecord
 
 
 logger = logging.getLogger(__name__)
 
 
-RECORD_VERSION = 6
-
-
 def _get_transition_key(source_name, rel_path, taxonomy_name=None,
         taxonomy_term=None):
     key = '%s:%s' % (source_name, rel_path)
@@ -24,21 +20,12 @@
 
 
 class BakeRecord(Record):
+    RECORD_VERSION = 8
+
     def __init__(self):
         super(BakeRecord, self).__init__()
         self.out_dir = None
         self.bake_time = None
-        self.app_version = APP_VERSION
-        self.record_version = RECORD_VERSION
-
-    def hasLatestVersion(self):
-        return (self.app_version == APP_VERSION and
-                self.record_version == RECORD_VERSION)
-
-    def __setstate__(self, state):
-        state.setdefault('app_version', -1)
-        state.setdefault('record_version', -1)
-        super(BakeRecord, self).__setstate__(state)
 
 
 FLAG_NONE = 0
@@ -57,6 +44,7 @@
 
         self.flags = FLAG_NONE
         self.config = None
+        self.errors = []
         self.out_uris = []
         self.out_paths = []
         self.used_source_names = set()
@@ -64,59 +52,36 @@
 
     @property
     def was_baked(self):
-        return len(self.out_paths) > 0
+        return len(self.out_paths) > 0 or len(self.errors) > 0
+
+    @property
+    def was_baked_successfully(self):
+        return len(self.out_paths) > 0 and len(self.errors) == 0
 
     @property
     def num_subs(self):
         return len(self.out_paths)
 
-    @property
-    def transition_key(self):
-        return _get_transition_key(self.source_name, self.rel_path,
-                self.taxonomy_name, self.taxonomy_term)
-
     def __getstate__(self):
         state = self.__dict__.copy()
         del state['path_mtime']
         return state
 
 
-class TransitionalBakeRecord(object):
-    DELETION_MISSING = 1
-    DELETION_CHANGED = 2
-
+class TransitionalBakeRecord(TransitionalRecord):
     def __init__(self, previous_path=None):
-        self.previous = BakeRecord()
-        self.current = BakeRecord()
-        self.transitions = {}
-        self.incremental_count = 0
-        if previous_path:
-            self.loadPrevious(previous_path)
-        self.current.entry_added += self._onCurrentEntryAdded
-
-    def loadPrevious(self, previous_path):
-        try:
-            self.previous = BakeRecord.load(previous_path)
-        except Exception as ex:
-            logger.debug("Error loading previous record: %s" % ex)
-            logger.debug("Will reset to an empty one.")
-            self.previous = BakeRecord()
-            return
-
-        for e in self.previous.entries:
-            self.transitions[e.transition_key] = (e, None)
-
-    def clearPrevious(self):
-        self.previous = BakeRecord()
-
-    def saveCurrent(self, current_path):
-        self.current.save(current_path)
+        super(TransitionalBakeRecord, self).__init__(BakeRecord,
+                                                     previous_path)
 
     def addEntry(self, entry):
         if (self.previous.bake_time and
                 entry.path_mtime >= self.previous.bake_time):
             entry.flags |= FLAG_SOURCE_MODIFIED
-        self.current.addEntry(entry)
+        super(TransitionalBakeRecord, self).addEntry(entry)
+
+    def getTransitionKey(self, entry):
+        return _get_transition_key(entry.source_name, entry.rel_path,
+                                   entry.taxonomy_name, entry.taxonomy_term)
 
     def getOverrideEntry(self, factory, uri):
         for pair in self.transitions.values():
@@ -148,10 +113,7 @@
                 if e.source_name == source_name]
 
     def collapseRecords(self):
-        for pair in self.transitions.values():
-            prev = pair[0]
-            cur = pair[1]
-
+        for prev, cur in self.transitions.values():
             if prev and cur and not cur.was_baked:
                 # This page wasn't baked, so the information from last
                 # time is still valid (we didn't get any information
@@ -161,20 +123,17 @@
                     cur.config = prev.config.copy()
                 cur.out_uris = list(prev.out_uris)
                 cur.out_paths = list(prev.out_paths)
+                cur.errors = list(prev.errors)
                 cur.used_source_names = set(prev.used_source_names)
                 cur.used_taxonomy_terms = set(prev.used_taxonomy_terms)
 
-    def _onCurrentEntryAdded(self, entry):
-        key = entry.transition_key
-        te = self.transitions.get(key)
-        if te is None:
-            logger.debug("Adding new record entry: %s" % key)
-            self.transitions[key] = (None, entry)
-            return
+    def getDeletions(self):
+        for prev, cur in self.transitions.values():
+            if prev and not cur:
+                for p in prev.out_paths:
+                    yield (p, 'previous source file was removed')
+            elif prev and cur and cur.was_baked_successfully:
+                diff = set(prev.out_paths) - set(cur.out_paths)
+                for p in diff:
+                    yield (p, 'source file changed outputs')
 
-        if te[1] is not None:
-            raise Exception("A current entry already exists for: %s" %
-                    key)
-        logger.debug("Setting current record entry: %s" % key)
-        self.transitions[key] = (te[0], entry)
-
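Note the distinction the two new properties draw: a page that errored out still counts as "baked" (an attempt was made, so the incremental logic must not treat it as untouched and still valid), but only an error-free page with outputs counts as baked successfully. A small sketch under those definitions, using a hypothetical stand-in class:

    class EntrySketch(object):
        # Stand-in with the same two properties as BakeRecordPageEntry.
        def __init__(self, out_paths=(), errors=()):
            self.out_paths = list(out_paths)
            self.errors = list(errors)

        @property
        def was_baked(self):
            return len(self.out_paths) > 0 or len(self.errors) > 0

        @property
        def was_baked_successfully(self):
            return len(self.out_paths) > 0 and len(self.errors) == 0


    ok = EntrySketch(out_paths=['_counter/foo.html'])
    failed = EntrySketch(errors=["Can't get route for page: foo.md"])
    assert ok.was_baked and ok.was_baked_successfully
    assert failed.was_baked and not failed.was_baked_successfully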
--- a/piecrust/chefutil.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/piecrust/chefutil.py	Sun Nov 09 14:46:23 2014 -0800
@@ -2,12 +2,13 @@
 from colorama import Fore
 
 
-def format_timed(start_time, message, colored=True):
+def format_timed(start_time, message, indent_level=0, colored=True):
     end_time = time.clock()
+    indent = indent_level * '  '
     time_str = '%8.1f ms' % ((end_time - start_time) * 1000.0)
     if colored:
         return '[%s%s%s] %s' % (Fore.GREEN, time_str, Fore.RESET, message)
-    return '[%s] %s' % (time_str, message)
+    return '%s[%s] %s' % (indent, time_str, message)
 
 
 def log_friendly_exception(logger, ex):
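For reference, here is roughly what the new indent_level argument produces on the non-colored path (values illustrative):

    # format_timed(t, 'Computed node dirtiness: clean',
    #              indent_level=2, colored=False)
    # => '    [     1.2 ms] Computed node dirtiness: clean'
    # Two levels of two-space indent, then the timing and the message.
    # Note that the colored branch above does not apply the indent.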
--- a/piecrust/commands/builtin/baking.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/piecrust/commands/builtin/baking.py	Sun Nov 09 14:46:23 2014 -0800
@@ -5,7 +5,9 @@
+import time
 import datetime
 from piecrust.baking.baker import Baker
 from piecrust.baking.records import BakeRecord
+from piecrust.chefutil import format_timed
 from piecrust.commands.base import ChefCommand
+from piecrust.processing.base import ProcessorPipeline
 
 
 logger = logging.getLogger(__name__)
@@ -32,19 +34,22 @@
                 action='store_true')
 
     def run(self, ctx):
-        baker = Baker(
-                ctx.app,
-                out_dir=ctx.args.output,
-                force=ctx.args.force,
-                portable=ctx.args.portable,
-                no_assets=ctx.args.no_assets)
         if ctx.args.portable:
             # Disable pretty URLs because there's likely not going to be
             # a web server to handle serving default documents.
             ctx.app.config.set('site/pretty_urls', False)
 
         try:
-            baker.bake()
+            start_time = time.clock()
+
+            # Bake the site sources.
+            self._bakeSources(ctx)
+
+            # Bake the assets.
+            if not ctx.args.no_assets:
+                self._bakeAssets(ctx)
+
+            # All done.
+            logger.info('-------------------------')
+            logger.info(format_timed(start_time, 'done baking'))
             return 0
         except Exception as ex:
             if ctx.app.debug:
@@ -53,6 +58,32 @@
                 logger.error(str(ex))
             return 1
 
+    def _bakeSources(self, ctx):
+        num_workers = ctx.app.config.get('baker/workers') or 4
+        baker = Baker(
+                ctx.app,
+                out_dir=ctx.args.output,
+                force=ctx.args.force,
+                portable=ctx.args.portable,
+                no_assets=ctx.args.no_assets,
+                num_workers=num_workers)
+        baker.bake()
+
+    def _bakeAssets(self, ctx):
+        mounts = ctx.app.assets_dirs
+        baker_params = ctx.app.config.get('baker') or {}
+        skip_patterns = baker_params.get('skip_patterns')
+        force_patterns = baker_params.get('force_patterns')
+        num_workers = ctx.app.config.get('baker/workers') or 4
+        proc = ProcessorPipeline(
+                ctx.app, mounts, ctx.args.output,
+                force=ctx.args.force,
+                skip_patterns=skip_patterns,
+                force_patterns=force_patterns,
+                num_workers=num_workers)
+        proc.run()
+
 
 class ShowRecordCommand(ChefCommand):
     def __init__(self):
@@ -99,4 +130,6 @@
             logging.info("   out URLs:  %s" % entry.out_uris)
             logging.info("   out paths: %s" % entry.out_paths)
             logging.info("   used srcs: %s" % entry.used_source_names)
+            if entry.errors:
+                logging.error("   errors: %s" % entry.errors)
 
--- a/piecrust/processing/base.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/piecrust/processing/base.py	Sun Nov 09 14:46:23 2014 -0800
@@ -3,12 +3,15 @@
 import shutil
 import os.path
 import logging
+import hashlib
 import threading
 from queue import Queue, Empty
 from piecrust.chefutil import format_timed
+from piecrust.processing.records import (
+        ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
+        FLAG_PROCESSED, FLAG_OVERRIDEN)
 from piecrust.processing.tree import (ProcessingTreeBuilder,
-        ProcessingTreeRunner, STATE_DIRTY, print_node)
-from piecrust.records import Record
+        ProcessingTreeRunner, ProcessingTreeError, STATE_DIRTY, print_node)
 
 
 logger = logging.getLogger(__name__)
@@ -96,41 +99,6 @@
         raise NotImplementedError()
 
 
-class ProcessorPipelineRecord(Record):
-    VERSION = 1
-
-    def __init__(self):
-        super(ProcessorPipelineRecord, self).__init__()
-
-    def addEntry(self, item):
-        self.entries.append(item)
-
-    def hasOverrideEntry(self, rel_path):
-        return self.findEntry(rel_path) is not None
-
-    def findEntry(self, rel_path):
-        rel_path = rel_path.lower()
-        for entry in self.entries:
-            for out_path in entry.rel_outputs:
-                if out_path.lower() == rel_path:
-                    return entry
-        return None
-
-
-class ProcessorPipelineRecordEntry(object):
-    def __init__(self, base_dir, rel_input, is_processed=False,
-            is_overridden=False):
-        self.base_dir = base_dir
-        self.rel_input = rel_input
-        self.rel_outputs = []
-        self.is_processed = is_processed
-        self.is_overridden = is_overridden
-
-    @property
-    def path(self):
-        return os.path.join(self.base_dir, self.rel_input)
-
-
 class ProcessingContext(object):
     def __init__(self, base_dir, job_queue, record=None):
         self.base_dir = base_dir
@@ -177,15 +145,27 @@
         # patching the processors with some new ones.
         self.processors.sort(key=lambda p: p.priority)
 
+        # Create the pipeline record.
+        record = TransitionalProcessorPipelineRecord()
+        record_cache = self.app.cache.getCache('baker')
+        record_name = (
+                'assets_' +
+                hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
+                '.record')
+        if not self.force and record_cache.has(record_name):
+            t = time.clock()
+            record.loadPrevious(record_cache.getCachePath(record_name))
+            logger.debug(format_timed(t, 'loaded previous bake record',
+                    colored=False))
+
         # Create the workers.
         pool = []
         queue = Queue()
         abort = threading.Event()
         pipeline_lock = threading.Lock()
-        record = ProcessorPipelineRecord()
         for i in range(self.num_workers):
-            ctx = ProcessingWorkerContext(self, record, queue, abort,
-                    pipeline_lock)
+            ctx = ProcessingWorkerContext(self, record,
+                                          queue, abort, pipeline_lock)
             worker = ProcessingWorker(i, ctx)
             worker.start()
             pool.append(worker)
@@ -222,10 +202,24 @@
         if abort.is_set():
             raise Exception("Worker pool was aborted.")
 
+        # Handle deletions.
+        for path, reason in record.getDeletions():
+            logger.debug("Removing '%s': %s" % (path, reason))
+            os.remove(path)
+            logger.info('[delete] %s' % path)
+
         # Invoke post-processors.
         for proc in self.processors:
             proc.onPipelineEnd(self)
 
+        # Save the process record.
+        t = time.clock()
+        record.current.process_time = time.time()
+        record.current.out_dir = self.out_dir
+        record.collapseRecords()
+        record.saveCurrent(record_cache.getCachePath(record_name))
+        logger.debug(format_timed(t, 'saved bake record', colored=False))
+
         return record
 
     def processDirectory(self, ctx, start_dir):
@@ -246,8 +240,8 @@
 
 
 class ProcessingWorkerContext(object):
-    def __init__(self, pipeline, record, work_queue, abort_event,
-            pipeline_lock):
+    def __init__(self, pipeline, record,
+            work_queue, abort_event, pipeline_lock):
         self.pipeline = pipeline
         self.record = record
         self.work_queue = work_queue
@@ -291,38 +285,49 @@
         record = self.ctx.record
 
         rel_path = os.path.relpath(job.path, job.base_dir)
+        previous_entry = record.getPreviousEntry(rel_path)
+        record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
+        record.addEntry(record_entry)
 
         # Figure out if a previously processed file is overriding this one.
         # This can happen if a theme file (processed via a mount point)
         # is overridden in the user's website.
-        if record.hasOverrideEntry(rel_path):
-            record.addEntry(ProcessorPipelineRecordEntry(
-                    job.base_dir, rel_path,
-                    is_processed=False, is_overridden=True))
+        if record.current.hasOverrideEntry(rel_path):
+            record_entry.flags |= FLAG_OVERRIDEN
             logger.info(format_timed(start_time,
                     '%s [not baked, overridden]' % rel_path))
             return
 
-        builder = ProcessingTreeBuilder(pipeline.processors)
-        tree_root = builder.build(rel_path)
+        try:
+            builder = ProcessingTreeBuilder(pipeline.processors)
+            tree_root = builder.build(rel_path)
+        except ProcessingTreeError as ex:
+            record_entry.errors.append(str(ex))
+            logger.error("Error processing %s: %s" % (rel_path, ex))
+            return
+
         print_node(tree_root, recursive=True)
         leaves = tree_root.getLeaves()
-        fi = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
-        fi.rel_outputs = [l.path for l in leaves]
-        record.addEntry(fi)
+        record_entry.rel_outputs = [l.path for l in leaves]
 
-        force = pipeline.force
+        force = (pipeline.force or previous_entry is None or
+                 not previous_entry.was_processed_successfully)
         if not force:
             force = re_matchany(rel_path, pipeline.force_patterns)
 
         if force:
             tree_root.setState(STATE_DIRTY, True)
 
-        runner = ProcessingTreeRunner(job.base_dir, pipeline.tmp_dir,
-                pipeline.out_dir, self.ctx.pipeline_lock)
-        if runner.processSubTree(tree_root):
-            fi.is_processed = True
-            logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+        try:
+            runner = ProcessingTreeRunner(
+                    job.base_dir, pipeline.tmp_dir,
+                    pipeline.out_dir, self.ctx.pipeline_lock)
+            if runner.processSubTree(tree_root):
+                record_entry.flags |= FLAG_PROCESSED
+                logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+        except ProcessingTreeError as ex:
+            record_entry.errors.append(str(ex))
+            logger.error("Error processing %s: %s" % (rel_path, ex))
 
 
 def make_re(patterns):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/processing/records.py	Sun Nov 09 14:46:23 2014 -0800
@@ -0,0 +1,86 @@
+import os.path
+from piecrust.records import Record, TransitionalRecord
+
+
+class ProcessorPipelineRecord(Record):
+    RECORD_VERSION = 2
+
+    def __init__(self):
+        super(ProcessorPipelineRecord, self).__init__()
+        self.out_dir = None
+        self.process_time = None
+
+    def hasOverrideEntry(self, rel_path):
+        return self.findEntry(rel_path) is not None
+
+    def findEntry(self, rel_path):
+        rel_path = rel_path.lower()
+        for entry in self.entries:
+            for out_path in entry.rel_outputs:
+                if out_path.lower() == rel_path:
+                    return entry
+        return None
+
+
+FLAG_NONE = 0
+FLAG_PROCESSED = 2**0
+FLAG_OVERRIDEN = 2**1
+
+
+class ProcessorPipelineRecordEntry(object):
+    def __init__(self, base_dir, rel_input):
+        self.base_dir = base_dir
+        self.rel_input = rel_input
+
+        self.flags = FLAG_NONE
+        self.rel_outputs = []
+        self.errors = []
+
+    @property
+    def path(self):
+        return os.path.join(self.base_dir, self.rel_input)
+
+    @property
+    def was_processed(self):
+        return bool(self.flags & FLAG_PROCESSED)
+
+    @property
+    def was_processed_successfully(self):
+        return self.was_processed and not self.errors
+
+
+class TransitionalProcessorPipelineRecord(TransitionalRecord):
+    def __init__(self, previous_path=None):
+        super(TransitionalProcessorPipelineRecord, self).__init__(
+                ProcessorPipelineRecord, previous_path)
+
+    def getTransitionKey(self, entry):
+        return entry.rel_input
+
+    def getPreviousEntry(self, rel_path):
+        pair = self.transitions.get(rel_path)
+        if pair is not None:
+            return pair[0]
+        return None
+
+    def collapseRecords(self):
+        for prev, cur in self.transitions.values():
+            if prev and cur and not cur.was_processed:
+                # This asset wasn't processed, so the information from
+                # last time is still valid.
+                cur.flags = prev.flags
+                cur.rel_outputs = list(prev.rel_outputs)
+                cur.errors = list(prev.errors)
+
+    def getDeletions(self):
+        for prev, cur in self.transitions.values():
+            if prev and not cur:
+                for p in prev.rel_outputs:
+                    abs_p = os.path.join(self.previous.out_dir, p)
+                    yield (abs_p, 'previous asset was removed')
+            elif prev and cur and cur.was_processed_successfully:
+                diff = set(prev.rel_outputs) - set(cur.rel_outputs)
+                for p in diff:
+                    abs_p = os.path.join(self.previous.out_dir, p)
+                    yield (abs_p, 'asset changed outputs')
+
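The override check runs against the current record's outputs, and the match is case-insensitive. A small usage sketch (the paths are hypothetical):

    from piecrust.processing.records import (ProcessorPipelineRecord,
            ProcessorPipelineRecordEntry)

    record = ProcessorPipelineRecord()
    site_css = ProcessorPipelineRecordEntry('/site/assets', 'css/main.css')
    site_css.rel_outputs = ['css/main.css']
    record.addEntry(site_css)

    # When a theme copy of the same asset comes through the pipeline
    # later, it is already accounted for, so the worker flags it
    # FLAG_OVERRIDEN instead of processing it again.
    assert record.hasOverrideEntry('CSS/Main.css')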
--- a/piecrust/processing/tree.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/piecrust/processing/tree.py	Sun Nov 09 14:46:23 2014 -0800
@@ -2,6 +2,7 @@
 import time
 import os.path
 import logging
+from piecrust.chefutil import format_timed
 
 
 logger = logging.getLogger(__name__)
@@ -150,7 +151,8 @@
                             start_time, "(bypassing structured processing)"))
                 return True
             except Exception as e:
-                raise Exception("Error processing: %s" % node.path) from e
+                raise ProcessingTreeError("Error processing: %s" %
+                        node.path) from e
 
         # All outputs of a node must go to the same directory, so we can get
         # the output directory off of the first output.
@@ -239,7 +241,9 @@
                 node.setState(STATE_CLEAN, False)
 
         state = "dirty" if node.state == STATE_DIRTY else "clean"
-        logger.debug(format_timed(start_time, "Computed node dirtyness: %s" % state, node.level))
+        logger.debug(format_timed(start_time,
+                                  "Computed node dirtiness: %s" % state,
+                                  indent_level=node.level))
 
     def _getNodeBaseDir(self, node):
         if node.level == 0:
@@ -267,10 +271,3 @@
         for o in node.outputs:
             print_node(o, None, True)
 
-
-def format_timed(start_time, message, indent_level=0):
-    end_time = time.clock()
-    indent = indent_level * '  '
-    build_time = '{0:8.1f} ms'.format((end_time - start_time) / 1000.0)
-    return "%s[%s] %s" % (indent, build_time, message)
-
--- a/piecrust/records.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/piecrust/records.py	Sun Nov 09 14:46:23 2014 -0800
@@ -2,6 +2,7 @@
 import os.path
 import pickle
 import logging
+from piecrust import APP_VERSION
 from piecrust.events import Event
 
 
@@ -12,6 +13,12 @@
     def __init__(self):
         self.entries = []
         self.entry_added = Event()
+        self.app_version = APP_VERSION
+        self.record_version = self.__class__.RECORD_VERSION
+
+    def hasLatestVersion(self):
+        return (self.app_version == APP_VERSION and
+                self.record_version == self.__class__.RECORD_VERSION)
 
     def addEntry(self, entry):
         self.entries.append(entry)
@@ -31,9 +38,8 @@
         return odict
 
     def __setstate__(self, state):
-        for k, v in state.items():
-            setattr(self, k, v)
-        self.entry_added = Event()
+        state['entry_added'] = Event()
+        self.__dict__.update(state)
 
     @staticmethod
     def load(path):
@@ -41,3 +47,65 @@
         with open(path, 'rb') as fp:
             return pickle.load(fp)
 
+
+class TransitionalRecord(object):
+    def __init__(self, record_class, previous_path=None):
+        self._record_class = record_class
+        self.transitions = {}
+        self.incremental_count = 0
+        self.current = record_class()
+        if previous_path:
+            self.loadPrevious(previous_path)
+        else:
+            self.previous = record_class()
+        self.current.entry_added += self._onCurrentEntryAdded
+
+    def loadPrevious(self, previous_path):
+        try:
+            self.previous = self._record_class.load(previous_path)
+        except Exception as ex:
+            logger.debug("Error loading previous record: %s" % ex)
+            logger.debug("Will reset to an empty one.")
+            self.previous = self._record_class()
+            return
+
+        if (self.previous.record_version !=
+                self._record_class.RECORD_VERSION):
+            logger.debug("Previous record has old version %d." %
+                    self.previous.record_version)
+            logger.debug("Will reset to an empty one.")
+            self.previous = self._record_class()
+            return
+
+        for e in self.previous.entries:
+            key = self.getTransitionKey(e)
+            self.transitions[key] = (e, None)
+
+    def clearPrevious(self):
+        self.previous = self._record_class()
+
+    def saveCurrent(self, current_path):
+        self.current.save(current_path)
+
+    def addEntry(self, entry):
+        self.current.addEntry(entry)
+
+    def getTransitionKey(self, entry):
+        raise NotImplementedError()
+
+    def _onCurrentEntryAdded(self, entry):
+        key = self.getTransitionKey(entry)
+        te = self.transitions.get(key)
+        if te is None:
+            logger.debug("Adding new record entry: %s" % key)
+            self.transitions[key] = (None, entry)
+            return
+
+        if te[1] is not None:
+            raise Exception("A current entry already exists for: %s" %
+                    key)
+        logger.debug("Setting current record entry: %s" % key)
+        self.transitions[key] = (te[0], entry)
+
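The base class leaves two things to subclasses: which Record class to instantiate, and how to key entries across bakes. A minimal hypothetical subclass showing that contract:

    class MyRecord(Record):
        RECORD_VERSION = 1  # bump to invalidate previously saved records


    class MyRecordEntry(object):
        def __init__(self, name):
            self.name = name


    class TransitionalMyRecord(TransitionalRecord):
        def __init__(self, previous_path=None):
            super(TransitionalMyRecord, self).__init__(MyRecord,
                                                       previous_path)

        def getTransitionKey(self, entry):
            # Any value that identifies the same entry from bake to bake.
            return entry.name


    rec = TransitionalMyRecord()
    rec.addEntry(MyRecordEntry('foo'))  # paired with last bake's 'foo', if any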
--- a/tests/mockutil.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/tests/mockutil.py	Sun Nov 09 14:46:23 2014 -0800
@@ -46,6 +46,7 @@
 
     def __exit__(self, exc_type, exc_value, exc_tb):
         self._entry.contents = self._stream.getvalue()
+        self._entry.metadata['mtime'] = time.time()
         self._stream.close()
 
 
@@ -197,12 +198,13 @@
         self._endMock()
 
     def _startMock(self):
+        # TODO: sadly, there seems to be no way to replace `open` everywhere?
         self._createMock('__main__.open', open, self._open, create=True)
-        # TODO: WTF, apparently the previous one doesn't really work?
         self._createMock('piecrust.records.open', open, self._open, create=True)
         self._createMock('codecs.open', codecs.open, self._codecsOpen)
         self._createMock('os.listdir', os.listdir, self._listdir)
         self._createMock('os.makedirs', os.makedirs, self._makedirs)
+        self._createMock('os.remove', os.remove, self._remove)
         self._createMock('os.path.isdir', os.path.isdir, self._isdir)
         self._createMock('os.path.isfile', os.path.isfile, self._isfile)
         self._createMock('os.path.islink', os.path.islink, self._islink)
@@ -272,6 +274,10 @@
             raise Exception("Shouldn't create directory: %s" % path)
         self._fs._createDir(path)
 
+    def _remove(self, path):
+        path = os.path.normpath(path)
+        self._fs._deleteEntry(path)
+
     def _isdir(self, path):
         path = os.path.normpath(path)
         if path.startswith(resources_path):
--- a/tests/test_baking_baker.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/tests/test_baking_baker.py	Sun Nov 09 14:46:23 2014 -0800
@@ -1,6 +1,7 @@
 import os.path
 import pytest
 from piecrust.baking.baker import PageBaker, Baker
+from piecrust.baking.records import BakeRecord
 from .mockutil import get_mock_app, mock_fs, mock_fs_scope
 
 
@@ -71,3 +72,47 @@
                 '2010': {'01': {'01': {'post1.html': 'post one'}}},
                 'index.html': 'something'}
 
+def test_removed():
+    fs = (mock_fs()
+            .withPage('pages/foo.md', {'layout': 'none', 'format': 'none'}, 'a foo page')
+            .withPage('pages/_index.md', {'layout': 'none', 'format': 'none'}, "something"))
+    with mock_fs_scope(fs):
+        app = fs.getApp()
+        baker = Baker(app)
+        baker.bake()
+        structure = fs.getStructure('kitchen/_counter')
+        assert structure == {
+                'foo.html': 'a foo page',
+                'index.html': 'something'}
+
+        os.remove(fs.path('kitchen/pages/foo.md'))
+        app = fs.getApp()
+        baker = Baker(app)
+        baker.bake()
+        structure = fs.getStructure('kitchen/_counter')
+        assert structure == {
+                'index.html': 'something'}
+
+def test_record_version_change():
+    fs = (mock_fs()
+            .withPage('pages/foo.md', {'layout': 'none', 'format': 'none'}, 'a foo page'))
+    with mock_fs_scope(fs):
+        app = fs.getApp()
+        baker = Baker(app)
+        baker.bake()
+        mtime = os.path.getmtime(fs.path('kitchen/_counter/foo.html'))
+
+        app = fs.getApp()
+        baker = Baker(app)
+        baker.bake()
+        assert mtime == os.path.getmtime(fs.path('kitchen/_counter/foo.html'))
+
+        BakeRecord.RECORD_VERSION += 1
+        try:
+            app = fs.getApp()
+            baker = Baker(app)
+            baker.bake()
+            assert mtime < os.path.getmtime(fs.path('kitchen/_counter/foo.html'))
+        finally:
+            BakeRecord.RECORD_VERSION -= 1
+
--- a/tests/test_processing_base.py	Wed Oct 29 08:19:58 2014 -0700
+++ b/tests/test_processing_base.py	Sun Nov 09 14:46:23 2014 -0800
@@ -1,11 +1,41 @@
+import time
 import os.path
+import shutil
 import pytest
-from piecrust.processing.base import ProcessorPipeline
+from piecrust.processing.base import (ProcessorPipeline, SimpleFileProcessor)
+from piecrust.processing.records import ProcessorPipelineRecord
 from .mockutil import mock_fs, mock_fs_scope
 
 
-def _get_pipeline(fs, **kwargs):
-    app = fs.getApp(cache=False)
+class FooProcessor(SimpleFileProcessor):
+    def __init__(self, exts=None, open_func=None):
+        exts = exts or ('foo', 'foo')
+        super(FooProcessor, self).__init__({exts[0]: exts[1]})
+        self.PROCESSOR_NAME = exts[0]
+        self.open_func = open_func or open
+
+    def _doProcess(self, in_path, out_path):
+        with self.open_func(in_path, 'r') as f:
+            text = f.read()
+        with self.open_func(out_path, 'w') as f:
+            f.write("%s: %s" % (self.PROCESSOR_NAME.upper(), text))
+        return True
+
+
+class NoopProcessor(SimpleFileProcessor):
+    def __init__(self, exts):
+        super(NoopProcessor, self).__init__({exts[0]: exts[1]})
+        self.PROCESSOR_NAME = exts[0]
+        self.processed = []
+
+    def _doProcess(self, in_path, out_path):
+        self.processed.append(in_path)
+        shutil.copyfile(in_path, out_path)
+        return True
+
+
+def _get_pipeline(fs, cache=True, **kwargs):
+    app = fs.getApp(cache=cache)
     mounts = [os.path.join(app.root_dir, 'assets')]
     return ProcessorPipeline(app, mounts, fs.path('counter'),
             num_workers=1, **kwargs)
@@ -36,6 +66,97 @@
         assert expected == fs.getStructure('counter')
 
 
+def test_one_level_dirtiness():
+    fs = (mock_fs()
+            .withFile('kitchen/assets/blah.foo', 'A test file.'))
+    with mock_fs_scope(fs):
+        pp = _get_pipeline(fs)
+        pp.filterProcessors(['copy'])
+        pp.run()
+        expected = {'blah.foo': 'A test file.'}
+        assert expected == fs.getStructure('counter')
+        mtime = os.path.getmtime(fs.path('/counter/blah.foo'))
+        assert abs(time.time() - mtime) <= 2
+
+        pp.run()
+        assert expected == fs.getStructure('counter')
+        assert mtime == os.path.getmtime(fs.path('/counter/blah.foo'))
+
+        fs.withFile('kitchen/assets/blah.foo', 'A new test file.')
+        pp.run()
+        expected = {'blah.foo': 'A new test file.'}
+        assert expected == fs.getStructure('counter')
+        assert mtime < os.path.getmtime(fs.path('/counter/blah.foo'))
+
+
+def test_two_levels_dirtiness():
+    fs = (mock_fs()
+            .withFile('kitchen/assets/blah.foo', 'A test file.'))
+    with mock_fs_scope(fs) as scope:
+        pp = _get_pipeline(fs)
+        pp.processors.append(FooProcessor(('foo', 'bar'), scope._open))
+        pp.filterProcessors(['foo', 'copy'])
+        pp.run()
+        expected = {'blah.bar': 'FOO: A test file.'}
+        assert expected == fs.getStructure('counter')
+        mtime = os.path.getmtime(fs.path('/counter/blah.bar'))
+        assert abs(time.time() - mtime) <= 2
+
+        pp.run()
+        assert expected == fs.getStructure('counter')
+        assert mtime == os.path.getmtime(fs.path('/counter/blah.bar'))
+
+        fs.withFile('kitchen/assets/blah.foo', 'A new test file.')
+        pp.run()
+        expected = {'blah.bar': 'FOO: A new test file.'}
+        assert expected == fs.getStructure('counter')
+        assert mtime < os.path.getmtime(fs.path('/counter/blah.bar'))
+
+
+def test_removed():
+    fs = (mock_fs()
+            .withFile('kitchen/assets/blah1.foo', 'A test file.')
+            .withFile('kitchen/assets/blah2.foo', 'Ooops'))
+    with mock_fs_scope(fs):
+        expected = {
+                'blah1.foo': 'A test file.',
+                'blah2.foo': 'Ooops'}
+        assert expected == fs.getStructure('kitchen/assets')
+        pp = _get_pipeline(fs)
+        pp.filterProcessors(['copy'])
+        pp.run()
+        assert expected == fs.getStructure('counter')
+
+        os.remove(fs.path('/kitchen/assets/blah2.foo'))
+        expected = {
+                'blah1.foo': 'A test file.'}
+        assert expected == fs.getStructure('kitchen/assets')
+        pp.run()
+        assert expected == fs.getStructure('counter')
+
+
+def test_record_version_change():
+    fs = (mock_fs()
+            .withFile('kitchen/assets/blah.foo', 'A test file.'))
+    with mock_fs_scope(fs):
+        pp = _get_pipeline(fs)
+        noop = NoopProcessor(('foo', 'foo'))
+        pp.processors.append(noop)
+        pp.filterProcessors(['foo', 'copy'])
+        pp.run()
+        assert 1 == len(noop.processed)
+
+        pp.run()
+        assert 1 == len(noop.processed)
+
+        ProcessorPipelineRecord.RECORD_VERSION += 1
+        try:
+            pp.run()
+            assert 2 == len(noop.processed)
+        finally:
+            ProcessorPipelineRecord.RECORD_VERSION -= 1
+
+
 @pytest.mark.parametrize('patterns, expected', [
         (['_'],
             {'something.html': 'A test file.'}),