changeset 414:c4b3a7fd2f87

bake: Make pipeline processing multi-process.

Not many changes here, as it's pretty straightforward, but there's an API change for processors so they know whether they're being initialized/disposed from the main process or from one of the worker processes. This makes it possible to separate global setup that has side-effects (e.g. creating a directory) from purely in-memory setup.
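To make the API change concrete, here is a minimal, illustrative sketch of a processor using the new PipelineContext passed to onPipelineStart/onPipelineEnd. The 'example' processor name, ignore pattern and map directory are hypothetical, and the usual matches()/process() methods are omitted for brevity; only the ctx attributes shown (worker_id, is_first_worker, is_pipeline_process, tmp_dir, record, addIgnorePatterns) come from this change:

    import os
    import os.path
    import logging
    from piecrust.processing.base import Processor

    logger = logging.getLogger(__name__)


    class ExampleProcessor(Processor):
        PROCESSOR_NAME = 'example'  # hypothetical, for illustration only

        def onPipelineStart(self, ctx):
            # In-memory setup is cheap and safe to repeat in every process.
            self._map_dir = os.path.join(ctx.tmp_dir, 'example')

            # Setup with side-effects should happen only once; the first
            # worker (worker_id == 0) takes care of it.
            if ctx.is_first_worker and not os.path.isdir(self._map_dir):
                os.makedirs(self._map_dir)

            # Only patterns registered in the main pipeline process
            # (worker_id < 0) affect which files get queued.
            if ctx.is_pipeline_process:
                ctx.addIgnorePatterns(['_*.example'])

        def onPipelineEnd(self, ctx):
            # The bake record is only attached in the main process.
            if ctx.is_pipeline_process and ctx.record is not None:
                logger.debug("Processed %d assets." %
                             ctx.record.processed_count)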
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jun 2015 19:20:30 -0700
parents eacf0a3afd0c
children 0e9a94b7fdfa
files piecrust/processing/base.py piecrust/processing/compass.py piecrust/processing/less.py piecrust/processing/pipeline.py piecrust/processing/records.py piecrust/processing/sass.py piecrust/processing/tree.py piecrust/processing/worker.py tests/test_processing_base.py
diffstat 9 files changed, 659 insertions(+), 439 deletions(-)
--- a/piecrust/processing/base.py	Sat Jun 20 19:16:38 2015 -0700
+++ b/piecrust/processing/base.py	Sat Jun 20 19:20:30 2015 -0700
@@ -1,35 +1,36 @@
-import re
-import time
 import shutil
 import os.path
 import logging
-import hashlib
-import threading
-from queue import Queue, Empty
-from piecrust.chefutil import format_timed
-from piecrust.processing.records import (
-        ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
-        FLAG_PREPARED, FLAG_PROCESSED, FLAG_OVERRIDEN,
-        FLAG_BYPASSED_STRUCTURED_PROCESSING)
-from piecrust.processing.tree import (
-        ProcessingTreeBuilder, ProcessingTreeRunner,
-        ProcessingTreeError, ProcessorError,
-        STATE_DIRTY,
-        print_node, get_node_name_tree)
 
 
 logger = logging.getLogger(__name__)
 
 
-re_ansicolors = re.compile('\033\\[\d+m')
-
-
 PRIORITY_FIRST = -1
 PRIORITY_NORMAL = 0
 PRIORITY_LAST = 1
 
 
-split_processor_names_re = re.compile(r'[ ,]+')
+class PipelineContext(object):
+    def __init__(self, worker_id, app, out_dir, tmp_dir, force=None):
+        self.worker_id = worker_id
+        self.app = app
+        self.out_dir = out_dir
+        self.tmp_dir = tmp_dir
+        self.force = force
+        self.record = None
+        self._additional_ignore_patterns = []
+
+    @property
+    def is_first_worker(self):
+        return self.worker_id == 0
+
+    @property
+    def is_pipeline_process(self):
+        return self.worker_id < 0
+
+    def addIgnorePatterns(self, patterns):
+        self._additional_ignore_patterns += patterns
 
 
 class Processor(object):
@@ -43,10 +44,10 @@
     def initialize(self, app):
         self.app = app
 
-    def onPipelineStart(self, pipeline):
+    def onPipelineStart(self, ctx):
         pass
 
-    def onPipelineEnd(self, pipeline):
+    def onPipelineEnd(self, ctx):
         pass
 
     def matches(self, path):
@@ -117,341 +118,3 @@
         return self.stderr_data
 
 
-class ProcessingContext(object):
-    def __init__(self, base_dir, mount_info, job_queue, record=None):
-        self.base_dir = base_dir
-        self.mount_info = mount_info
-        self.job_queue = job_queue
-        self.record = record
-
-
-class ProcessorPipeline(object):
-    def __init__(self, app, out_dir, force=False):
-        assert app and out_dir
-        self.app = app
-        self.out_dir = out_dir
-        self.force = force
-
-        tmp_dir = app.sub_cache_dir
-        if not tmp_dir:
-            import tempfile
-            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
-        self.tmp_dir = os.path.join(tmp_dir, 'proc')
-
-        baker_params = app.config.get('baker') or {}
-
-        assets_dirs = baker_params.get('assets_dirs', app.assets_dirs)
-        self.mounts = make_mount_infos(assets_dirs, self.app.root_dir)
-
-        self.num_workers = baker_params.get('workers', 4)
-
-        ignores = baker_params.get('ignore', [])
-        ignores += [
-                '_cache', '_counter',
-                'theme_info.yml',
-                '.DS_Store', 'Thumbs.db',
-                '.git*', '.hg*', '.svn']
-        self.skip_patterns = make_re(ignores)
-        self.force_patterns = make_re(baker_params.get('force', []))
-
-        self.processors = app.plugin_loader.getProcessors()
-
-    def addSkipPatterns(self, patterns):
-        self.skip_patterns += make_re(patterns)
-
-    def filterProcessors(self, authorized_names):
-        self.processors = self.getFilteredProcessors(authorized_names)
-
-    def getFilteredProcessors(self, authorized_names):
-        if not authorized_names or authorized_names == 'all':
-            return self.processors
-
-        if isinstance(authorized_names, str):
-            authorized_names = split_processor_names_re.split(authorized_names)
-
-        procs = []
-        has_star = 'all' in authorized_names
-        for p in self.processors:
-            for name in authorized_names:
-                if name == p.PROCESSOR_NAME:
-                    procs.append(p)
-                    break
-                if name == ('-%s' % p.PROCESSOR_NAME):
-                    break
-            else:
-                if has_star:
-                    procs.append(p)
-        return procs
-
-    def run(self, src_dir_or_file=None, *,
-            delete=True, previous_record=None, save_record=True):
-        # Invoke pre-processors.
-        for proc in self.processors:
-            proc.onPipelineStart(self)
-
-        # Sort our processors again in case the pre-process step involved
-        # patching the processors with some new ones.
-        self.processors.sort(key=lambda p: p.priority)
-
-        # Create the pipeline record.
-        record = TransitionalProcessorPipelineRecord()
-        record_cache = self.app.cache.getCache('proc')
-        record_name = (
-                hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
-                '.record')
-        if previous_record:
-            record.setPrevious(previous_record)
-        elif not self.force and record_cache.has(record_name):
-            t = time.clock()
-            record.loadPrevious(record_cache.getCachePath(record_name))
-            logger.debug(format_timed(t, 'loaded previous bake record',
-                         colored=False))
-        logger.debug("Got %d entries in process record." %
-                len(record.previous.entries))
-
-        # Create the workers.
-        pool = []
-        queue = Queue()
-        abort = threading.Event()
-        pipeline_lock = threading.Lock()
-        for i in range(self.num_workers):
-            ctx = ProcessingWorkerContext(self, record,
-                                          queue, abort, pipeline_lock)
-            worker = ProcessingWorker(i, ctx)
-            worker.start()
-            pool.append(worker)
-
-        if src_dir_or_file is not None:
-            # Process only the given path.
-            # Find out what mount point this is in.
-            for name, info in self.mounts.items():
-                path = info['path']
-                if src_dir_or_file[:len(path)] == path:
-                    base_dir = path
-                    mount_info = info
-                    break
-            else:
-                known_roots = [i['path'] for i in self.mounts.values()]
-                raise Exception("Input path '%s' is not part of any known "
-                                "mount point: %s" %
-                                (src_dir_or_file, known_roots))
-
-            ctx = ProcessingContext(base_dir, mount_info, queue, record)
-            logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file)
-            if os.path.isdir(src_dir_or_file):
-                self.processDirectory(ctx, src_dir_or_file)
-            elif os.path.isfile(src_dir_or_file):
-                self.processFile(ctx, src_dir_or_file)
-
-        else:
-            # Process everything.
-            for name, info in self.mounts.items():
-                path = info['path']
-                ctx = ProcessingContext(path, info, queue, record)
-                logger.debug("Initiating processing pipeline on: %s" % path)
-                self.processDirectory(ctx, path)
-
-        # Wait on all workers.
-        record.current.success = True
-        for w in pool:
-            w.join()
-            record.current.success &= w.success
-        if abort.is_set():
-            raise Exception("Worker pool was aborted.")
-
-        # Handle deletions.
-        if delete:
-            for path, reason in record.getDeletions():
-                logger.debug("Removing '%s': %s" % (path, reason))
-                try:
-                    os.remove(path)
-                except FileNotFoundError:
-                    pass
-                logger.info('[delete] %s' % path)
-
-        # Invoke post-processors.
-        for proc in self.processors:
-            proc.onPipelineEnd(self)
-
-        # Finalize the process record.
-        record.current.process_time = time.time()
-        record.current.out_dir = self.out_dir
-        record.collapseRecords()
-
-        # Save the process record.
-        if save_record:
-            t = time.clock()
-            record.saveCurrent(record_cache.getCachePath(record_name))
-            logger.debug(format_timed(t, 'saved bake record', colored=False))
-
-        return record.detach()
-
-    def processDirectory(self, ctx, start_dir):
-        for dirpath, dirnames, filenames in os.walk(start_dir):
-            rel_dirpath = os.path.relpath(dirpath, start_dir)
-            dirnames[:] = [d for d in dirnames
-                    if not re_matchany(d, self.skip_patterns, rel_dirpath)]
-
-            for filename in filenames:
-                if re_matchany(filename, self.skip_patterns, rel_dirpath):
-                    continue
-                self.processFile(ctx, os.path.join(dirpath, filename))
-
-    def processFile(self, ctx, path):
-        logger.debug("Queuing: %s" % path)
-        job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path)
-        ctx.job_queue.put_nowait(job)
-
-
-class ProcessingWorkerContext(object):
-    def __init__(self, pipeline, record,
-            work_queue, abort_event, pipeline_lock):
-        self.pipeline = pipeline
-        self.record = record
-        self.work_queue = work_queue
-        self.abort_event = abort_event
-        self.pipeline_lock = pipeline_lock
-
-
-class ProcessingWorkerJob(object):
-    def __init__(self, base_dir, mount_info, path):
-        self.base_dir = base_dir
-        self.mount_info = mount_info
-        self.path = path
-
-
-class ProcessingWorker(threading.Thread):
-    def __init__(self, wid, ctx):
-        super(ProcessingWorker, self).__init__()
-        self.wid = wid
-        self.ctx = ctx
-        self.success = True
-
-    def run(self):
-        while(not self.ctx.abort_event.is_set()):
-            try:
-                job = self.ctx.work_queue.get(True, 0.1)
-            except Empty:
-                logger.debug("[%d] No more work... shutting down." % self.wid)
-                break
-
-            try:
-                success = self._unsafeRun(job)
-                logger.debug("[%d] Done with file." % self.wid)
-                self.ctx.work_queue.task_done()
-                self.success &= success
-            except Exception as ex:
-                self.ctx.abort_event.set()
-                self.success = False
-                logger.error("[%d] Critical error, aborting." % self.wid)
-                logger.exception(ex)
-                break
-
-    def _unsafeRun(self, job):
-        start_time = time.clock()
-        pipeline = self.ctx.pipeline
-        record = self.ctx.record
-
-        rel_path = os.path.relpath(job.path, job.base_dir)
-        previous_entry = record.getPreviousEntry(rel_path)
-
-        record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
-        record.addEntry(record_entry)
-
-        # Figure out if a previously processed file is overriding this one.
-        # This can happen if a theme file (processed via a mount point)
-        # is overridden in the user's website.
-        if record.current.hasOverrideEntry(rel_path):
-            record_entry.flags |= FLAG_OVERRIDEN
-            logger.info(format_timed(start_time,
-                    '%s [not baked, overridden]' % rel_path))
-            return True
-
-        processors = pipeline.getFilteredProcessors(
-                job.mount_info['processors'])
-        try:
-            builder = ProcessingTreeBuilder(processors)
-            tree_root = builder.build(rel_path)
-            record_entry.flags |= FLAG_PREPARED
-        except ProcessingTreeError as ex:
-            msg = str(ex)
-            logger.error("Error preparing %s:\n%s" % (rel_path, msg))
-            while ex:
-                record_entry.errors.append(str(ex))
-                ex = ex.__cause__
-            return False
-
-        print_node(tree_root, recursive=True)
-        leaves = tree_root.getLeaves()
-        record_entry.rel_outputs = [l.path for l in leaves]
-        record_entry.proc_tree = get_node_name_tree(tree_root)
-        if tree_root.getProcessor().is_bypassing_structured_processing:
-            record_entry.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING
-
-        force = (pipeline.force or previous_entry is None or
-                 not previous_entry.was_processed_successfully)
-        if not force:
-            force = re_matchany(rel_path, pipeline.force_patterns)
-
-        if force:
-            tree_root.setState(STATE_DIRTY, True)
-
-        try:
-            runner = ProcessingTreeRunner(
-                    job.base_dir, pipeline.tmp_dir,
-                    pipeline.out_dir, self.ctx.pipeline_lock)
-            if runner.processSubTree(tree_root):
-                record_entry.flags |= FLAG_PROCESSED
-                logger.info(format_timed(
-                    start_time, "[%d] %s" % (self.wid, rel_path)))
-            return True
-        except ProcessingTreeError as ex:
-            msg = str(ex)
-            if isinstance(ex, ProcessorError):
-                msg = str(ex.__cause__)
-            logger.error("Error processing %s:\n%s" % (rel_path, msg))
-            while ex:
-                msg = re_ansicolors.sub('', str(ex))
-                record_entry.errors.append(msg)
-                ex = ex.__cause__
-            return False
-
-
-def make_mount_infos(mounts, root_dir):
-    if isinstance(mounts, list):
-        mounts = {m: {} for m in mounts}
-
-    for name, info in mounts.items():
-        if not isinstance(info, dict):
-            raise Exception("Asset directory info for '%s' is not a "
-                            "dictionary." % name)
-        info.setdefault('processors', 'all -uglifyjs -cleancss')
-        info['path'] = os.path.join(root_dir, name)
-
-    return mounts
-
-
-def make_re(patterns):
-    re_patterns = []
-    for pat in patterns:
-        if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2:
-            re_patterns.append(pat[1:-1])
-        else:
-            escaped_pat = (re.escape(pat)
-                    .replace(r'\*', r'[^/\\]*')
-                    .replace(r'\?', r'[^/\\]'))
-            re_patterns.append(escaped_pat)
-    return [re.compile(p) for p in re_patterns]
-
-
-def re_matchany(filename, patterns, dirname=None):
-    if dirname and dirname != '.':
-        filename = os.path.join(dirname, filename)
-
-    # skip patterns use a forward slash regardless of the platform.
-    filename = filename.replace('\\', '/')
-    for pattern in patterns:
-        if pattern.search(filename):
-            return True
-    return False
-
--- a/piecrust/processing/compass.py	Sat Jun 20 19:16:38 2015 -0700
+++ b/piecrust/processing/compass.py	Sat Jun 20 19:20:30 2015 -0700
@@ -1,7 +1,6 @@
 import os
 import os.path
 import logging
-import platform
 import subprocess
 from piecrust.processing.base import Processor, PRIORITY_FIRST
 from piecrust.uriutil import multi_replace
--- a/piecrust/processing/less.py	Sat Jun 20 19:16:38 2015 -0700
+++ b/piecrust/processing/less.py	Sat Jun 20 19:20:30 2015 -0700
@@ -24,7 +24,8 @@
 
     def onPipelineStart(self, pipeline):
         self._map_dir = os.path.join(pipeline.tmp_dir, 'less')
-        if not os.path.isdir(self._map_dir):
+        if (pipeline.is_first_worker and
+                not os.path.isdir(self._map_dir)):
             os.makedirs(self._map_dir)
 
     def getDependencies(self, path):
@@ -42,6 +43,7 @@
             # Get the sources, but make all paths absolute.
             sources = dep_map.get('sources')
             path_dir = os.path.dirname(path)
+
             def _makeAbs(p):
                 return os.path.join(path_dir, p)
             deps = list(map(_makeAbs, sources))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/processing/pipeline.py	Sat Jun 20 19:20:30 2015 -0700
@@ -0,0 +1,351 @@
+import os
+import os.path
+import re
+import time
+import queue
+import hashlib
+import logging
+import multiprocessing
+from piecrust.chefutil import format_timed, format_timed_scope
+from piecrust.processing.base import PipelineContext
+from piecrust.processing.records import (
+        ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
+        FLAG_PROCESSED)
+from piecrust.processing.worker import (
+        ProcessingWorkerContext, ProcessingWorkerJob,
+        worker_func, get_filtered_processors)
+
+
+logger = logging.getLogger(__name__)
+
+
+class _ProcessingContext(object):
+    def __init__(self, pool, record, base_dir, mount_info):
+        self.pool = pool
+        self.record = record
+        self.base_dir = base_dir
+        self.mount_info = mount_info
+
+
+class ProcessorPipeline(object):
+    def __init__(self, app, out_dir, force=False):
+        assert app and out_dir
+        self.app = app
+        self.out_dir = out_dir
+        self.force = force
+
+        tmp_dir = app.sub_cache_dir
+        if not tmp_dir:
+            import tempfile
+            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
+        self.tmp_dir = os.path.join(tmp_dir, 'proc')
+
+        baker_params = app.config.get('baker') or {}
+
+        assets_dirs = baker_params.get('assets_dirs', app.assets_dirs)
+        self.mounts = make_mount_infos(assets_dirs, self.app.root_dir)
+
+        self.num_workers = baker_params.get(
+                'workers', multiprocessing.cpu_count())
+
+        ignores = baker_params.get('ignore', [])
+        ignores += [
+                '_cache', '_counter',
+                'theme_info.yml',
+                '.DS_Store', 'Thumbs.db',
+                '.git*', '.hg*', '.svn']
+        self.ignore_patterns = make_re(ignores)
+        self.force_patterns = make_re(baker_params.get('force', []))
+
+        # Those things are mostly for unit-testing.
+        self.enabled_processors = None
+        self.additional_processors = None
+
+    def addIgnorePatterns(self, patterns):
+        self.ignore_patterns += make_re(patterns)
+
+    def run(self, src_dir_or_file=None, *,
+            delete=True, previous_record=None, save_record=True):
+        start_time = time.perf_counter()
+
+        # Get the list of processors for this run.
+        processors = self.app.plugin_loader.getProcessors()
+        if self.enabled_processors is not None:
+            logger.debug("Filtering processors to: %s" %
+                         self.enabled_processors)
+            processors = get_filtered_processors(processors,
+                                                 self.enabled_processors)
+        if self.additional_processors is not None:
+            logger.debug("Adding %s additional processors." %
+                         len(self.additional_processors))
+            for proc in self.additional_processors:
+                self.app.env.registerTimer(proc.__class__.__name__,
+                                           raise_if_registered=False)
+                proc.initialize(self.app)
+                processors.append(proc)
+
+        # Invoke pre-processors.
+        pipeline_ctx = PipelineContext(-1, self.app, self.out_dir,
+                                       self.tmp_dir, self.force)
+        for proc in processors:
+            proc.onPipelineStart(pipeline_ctx)
+
+        # Pre-processors can define additional ignore patterns.
+        self.ignore_patterns += make_re(
+                pipeline_ctx._additional_ignore_patterns)
+
+        # Create the worker pool.
+        pool = _WorkerPool()
+
+        # Create the pipeline record.
+        record = TransitionalProcessorPipelineRecord()
+        record_cache = self.app.cache.getCache('proc')
+        record_name = (
+                hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
+                '.record')
+        if previous_record:
+            record.setPrevious(previous_record)
+        elif not self.force and record_cache.has(record_name):
+            with format_timed_scope(logger, 'loaded previous bake record',
+                                    level=logging.DEBUG, colored=False):
+                record.loadPrevious(record_cache.getCachePath(record_name))
+        logger.debug("Got %d entries in process record." %
+                     len(record.previous.entries))
+        record.current.success = True
+        record.current.processed_count = 0
+
+        # Work!
+        def _handler(res):
+            entry = record.getCurrentEntry(res.path)
+            assert entry is not None
+            entry.flags |= res.flags
+            entry.proc_tree = res.proc_tree
+            entry.rel_outputs = res.rel_outputs
+            if res.errors:
+                entry.errors += res.errors
+                record.current.success = False
+            if entry.flags & FLAG_PROCESSED:
+                record.current.processed_count += 1
+
+        pool = self._createWorkerPool()
+        expected_result_count = self._process(src_dir_or_file, pool, record)
+        self._waitOnWorkerPool(pool, expected_result_count, _handler)
+        self._terminateWorkerPool(pool)
+
+        # Get timing information from the workers.
+        record.current.timers = {}
+        for _ in range(len(pool.workers)):
+            try:
+                timers = pool.results.get(True, 0.1)
+            except queue.Empty:
+                logger.error("Didn't get timing information from all workers.")
+                break
+
+            for name, val in timers['data'].items():
+                main_val = record.current.timers.setdefault(name, 0)
+                record.current.timers[name] = main_val + val
+
+        # Invoke post-processors.
+        pipeline_ctx.record = record.current
+        for proc in processors:
+            proc.onPipelineEnd(pipeline_ctx)
+
+        # Handle deletions.
+        if delete:
+            for path, reason in record.getDeletions():
+                logger.debug("Removing '%s': %s" % (path, reason))
+                try:
+                    os.remove(path)
+                except FileNotFoundError:
+                    pass
+                logger.info('[delete] %s' % path)
+
+        # Finalize the process record.
+        record.current.process_time = time.time()
+        record.current.out_dir = self.out_dir
+        record.collapseRecords()
+
+        # Save the process record.
+        if save_record:
+            with format_timed_scope(logger, 'saved bake record',
+                                    level=logging.DEBUG, colored=False):
+                record.saveCurrent(record_cache.getCachePath(record_name))
+
+        logger.info(format_timed(
+                start_time,
+                "processed %d assets." % record.current.processed_count))
+
+        return record.detach()
+
+    def _process(self, src_dir_or_file, pool, record):
+        expected_result_count = 0
+
+        if src_dir_or_file is not None:
+            # Process only the given path.
+            # Find out what mount point this is in.
+            for name, info in self.mounts.items():
+                path = info['path']
+                if src_dir_or_file[:len(path)] == path:
+                    base_dir = path
+                    mount_info = info
+                    break
+            else:
+                known_roots = [i['path'] for i in self.mounts.values()]
+                raise Exception("Input path '%s' is not part of any known "
+                                "mount point: %s" %
+                                (src_dir_or_file, known_roots))
+
+            ctx = _ProcessingContext(pool, record, base_dir, mount_info)
+            logger.debug("Initiating processing pipeline on: %s" %
+                         src_dir_or_file)
+            if os.path.isdir(src_dir_or_file):
+                expected_result_count = self._processDirectory(
+                        ctx, src_dir_or_file)
+            elif os.path.isfile(src_dir_or_file):
+                self._processFile(ctx, src_dir_or_file)
+                expected_result_count = 1
+
+        else:
+            # Process everything.
+            for name, info in self.mounts.items():
+                path = info['path']
+                ctx = _ProcessingContext(pool, record, path, info)
+                logger.debug("Initiating processing pipeline on: %s" % path)
+                expected_result_count = self._processDirectory(ctx, path)
+
+        return expected_result_count
+
+    def _processDirectory(self, ctx, start_dir):
+        queued_count = 0
+        for dirpath, dirnames, filenames in os.walk(start_dir):
+            rel_dirpath = os.path.relpath(dirpath, start_dir)
+            dirnames[:] = [d for d in dirnames
+                           if not re_matchany(
+                               d, self.ignore_patterns, rel_dirpath)]
+
+            for filename in filenames:
+                if re_matchany(filename, self.ignore_patterns, rel_dirpath):
+                    continue
+                self._processFile(ctx, os.path.join(dirpath, filename))
+                queued_count += 1
+        return queued_count
+
+    def _processFile(self, ctx, path):
+        # TODO: handle overrides between mount-points.
+
+        entry = ProcessorPipelineRecordEntry(path)
+        ctx.record.addEntry(entry)
+
+        previous_entry = ctx.record.getPreviousEntry(path)
+        force_this = (self.force or previous_entry is None or
+                      not previous_entry.was_processed_successfully)
+
+        job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path,
+                                  force=force_this)
+
+        logger.debug("Queuing: %s" % path)
+        ctx.pool.queue.put_nowait(job)
+
+    def _createWorkerPool(self):
+        pool = _WorkerPool()
+        for i in range(self.num_workers):
+            ctx = ProcessingWorkerContext(
+                    self.app.root_dir, self.out_dir, self.tmp_dir,
+                    pool.queue, pool.results, pool.abort_event,
+                    self.force, self.app.debug)
+            ctx.enabled_processors = self.enabled_processors
+            ctx.additional_processors = self.additional_processors
+            w = multiprocessing.Process(
+                    name='Worker_%d' % i,
+                    target=worker_func, args=(i, ctx))
+            w.start()
+            pool.workers.append(w)
+        return pool
+
+    def _waitOnWorkerPool(self, pool, expected_result_count, result_handler):
+        abort_with_exception = None
+        try:
+            got_count = 0
+            while got_count < expected_result_count:
+                try:
+                    res = pool.results.get(True, 10)
+                except queue.Empty:
+                    logger.error(
+                            "Got %d results, expected %d, and timed-out "
+                            "for 10 seconds. A worker might be stuck?" %
+                            (got_count, expected_result_count))
+                    abort_with_exception = Exception("Worker time-out.")
+                    break
+
+                if isinstance(res, dict) and res.get('type') == 'error':
+                    abort_with_exception = Exception(
+                            'Worker critical error:\n' +
+                            '\n'.join(res['messages']))
+                    break
+
+                got_count += 1
+                result_handler(res)
+        except KeyboardInterrupt as kiex:
+            logger.warning("Bake aborted by user... "
+                           "waiting for workers to stop.")
+            abort_with_exception = kiex
+
+        if abort_with_exception:
+            pool.abort_event.set()
+            for w in pool.workers:
+                w.join(2)
+            raise abort_with_exception
+
+    def _terminateWorkerPool(self, pool):
+        pool.abort_event.set()
+        for w in pool.workers:
+            w.join()
+
+
+class _WorkerPool(object):
+    def __init__(self):
+        self.queue = multiprocessing.JoinableQueue()
+        self.results = multiprocessing.Queue()
+        self.abort_event = multiprocessing.Event()
+        self.workers = []
+
+
+def make_mount_infos(mounts, root_dir):
+    if isinstance(mounts, list):
+        mounts = {m: {} for m in mounts}
+
+    for name, info in mounts.items():
+        if not isinstance(info, dict):
+            raise Exception("Asset directory info for '%s' is not a "
+                            "dictionary." % name)
+        info.setdefault('processors', 'all -uglifyjs -cleancss')
+        info['path'] = os.path.join(root_dir, name)
+
+    return mounts
+
+
+def make_re(patterns):
+    re_patterns = []
+    for pat in patterns:
+        if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2:
+            re_patterns.append(pat[1:-1])
+        else:
+            escaped_pat = (
+                    re.escape(pat)
+                    .replace(r'\*', r'[^/\\]*')
+                    .replace(r'\?', r'[^/\\]'))
+            re_patterns.append(escaped_pat)
+    return [re.compile(p) for p in re_patterns]
+
+
+def re_matchany(filename, patterns, dirname=None):
+    if dirname and dirname != '.':
+        filename = os.path.join(dirname, filename)
+
+    # skip patterns use a forward slash regardless of the platform.
+    filename = filename.replace('\\', '/')
+    for pattern in patterns:
+        if pattern.search(filename):
+            return True
+    return False
+
--- a/piecrust/processing/records.py	Sat Jun 20 19:16:38 2015 -0700
+++ b/piecrust/processing/records.py	Sat Jun 20 19:20:30 2015 -0700
@@ -1,48 +1,33 @@
 import os.path
+import hashlib
 from piecrust.records import Record, TransitionalRecord
 
 
 class ProcessorPipelineRecord(Record):
-    RECORD_VERSION = 4
+    RECORD_VERSION = 5
 
     def __init__(self):
         super(ProcessorPipelineRecord, self).__init__()
         self.out_dir = None
         self.process_time = None
+        self.processed_count = 0
         self.success = False
-
-    def hasOverrideEntry(self, rel_path):
-        return self.findEntry(rel_path) is not None
-
-    def findEntry(self, rel_path):
-        rel_path = rel_path.lower()
-        for entry in self.entries:
-            for out_path in entry.rel_outputs:
-                if out_path.lower() == rel_path:
-                    return entry
-        return None
-
-    def replaceEntry(self, new_entry):
-        for e in self.entries:
-            if (e.base_dir == new_entry.base_dir and
-                    e.rel_input == new_entry.rel_input):
-                e.flags = new_entry.flags
-                e.rel_outputs = list(new_entry.rel_outputs)
-                e.errors = list(new_entry.errors)
-                break
+        self.timers = None
 
 
 FLAG_NONE = 0
 FLAG_PREPARED = 2**0
 FLAG_PROCESSED = 2**1
-FLAG_OVERRIDEN = 2**2
 FLAG_BYPASSED_STRUCTURED_PROCESSING = 2**3
 
 
+def _get_transition_key(path):
+    return hashlib.md5(path.encode('utf8')).hexdigest()
+
+
 class ProcessorPipelineRecordEntry(object):
-    def __init__(self, base_dir, rel_input):
-        self.base_dir = base_dir
-        self.rel_input = rel_input
+    def __init__(self, path):
+        self.path = path
 
         self.flags = FLAG_NONE
         self.rel_outputs = []
@@ -50,10 +35,6 @@
         self.errors = []
 
     @property
-    def path(self):
-        return os.path.join(self.base_dir, self.rel_input)
-
-    @property
     def was_prepared(self):
         return bool(self.flags & FLAG_PREPARED)
 
@@ -73,10 +54,18 @@
                 ProcessorPipelineRecord, previous_path)
 
     def getTransitionKey(self, entry):
-        return entry.rel_input
+        return _get_transition_key(entry.path)
 
-    def getPreviousEntry(self, rel_path):
-        pair = self.transitions.get(rel_path)
+    def getCurrentEntry(self, path):
+        key = _get_transition_key(path)
+        pair = self.transitions.get(key)
+        if pair is not None:
+            return pair[1]
+        return None
+
+    def getPreviousEntry(self, path):
+        key = _get_transition_key(path)
+        pair = self.transitions.get(key)
         if pair is not None:
             return pair[0]
         return None
--- a/piecrust/processing/sass.py	Sat Jun 20 19:16:38 2015 -0700
+++ b/piecrust/processing/sass.py	Sat Jun 20 19:20:30 2015 -0700
@@ -26,12 +26,14 @@
 
     def onPipelineStart(self, pipeline):
         super(SassProcessor, self).onPipelineStart(pipeline)
-        self._map_dir = os.path.join(pipeline.tmp_dir, 'sass')
-        if not os.path.isdir(self._map_dir):
-            os.makedirs(self._map_dir)
+
+        if pipeline.is_first_worker:
+            self._map_dir = os.path.join(pipeline.tmp_dir, 'sass')
+            if not os.path.isdir(self._map_dir):
+                os.makedirs(self._map_dir)
 
         # Ignore include-only Sass files.
-        pipeline.addSkipPatterns(['_*.scss', '_*.sass'])
+        pipeline.addIgnorePatterns(['_*.scss', '_*.sass'])
 
     def getDependencies(self, path):
         if _is_include_only(path):
--- a/piecrust/processing/tree.py	Sat Jun 20 19:16:38 2015 -0700
+++ b/piecrust/processing/tree.py	Sat Jun 20 19:20:30 2015 -0700
@@ -79,7 +79,6 @@
         self.processors = processors
 
     def build(self, path):
-        start_time = time.clock()
         tree_root = ProcessingTreeNode(path, list(self.processors))
 
         loop_guard = 100
@@ -97,7 +96,7 @@
             if proc.is_bypassing_structured_processing:
                 if cur_node != tree_root:
                     raise ProcessingTreeError("Only root processors can "
-                            "bypass structured processing.")
+                                              "bypass structured processing.")
                 break
 
             # Get the destination directory and output files.
@@ -116,18 +115,14 @@
                 if proc.PROCESSOR_NAME != 'copy':
                     walk_stack.append(out_node)
 
-        logger.debug(format_timed(
-            start_time, "Built processing tree for: %s" % path,
-            colored=False))
         return tree_root
 
 
 class ProcessingTreeRunner(object):
-    def __init__(self, base_dir, tmp_dir, out_dir, lock=None):
+    def __init__(self, base_dir, tmp_dir, out_dir):
         self.base_dir = base_dir
         self.tmp_dir = tmp_dir
         self.out_dir = out_dir
-        self.lock = lock
 
     def processSubTree(self, tree_root):
         did_process = False
@@ -155,8 +150,9 @@
         proc = node.getProcessor()
         if proc.is_bypassing_structured_processing:
             try:
-                start_time = time.clock()
-                proc.process(full_path, self.out_dir)
+                start_time = time.perf_counter()
+                with proc.app.env.timerScope(proc.__class__.__name__):
+                    proc.process(full_path, self.out_dir)
                 print_node(
                         node,
                         format_timed(
@@ -172,16 +168,15 @@
         rel_out_dir = os.path.dirname(node.path)
         out_dir = os.path.join(base_out_dir, rel_out_dir)
         if not os.path.isdir(out_dir):
-            if self.lock:
-                with self.lock:
-                    if not os.path.isdir(out_dir):
-                        os.makedirs(out_dir, 0o755)
-            else:
-                os.makedirs(out_dir, 0o755)
+            try:
+                os.makedirs(out_dir, 0o755, exist_ok=True)
+            except OSError:
+                pass
 
         try:
-            start_time = time.clock()
-            proc_res = proc.process(full_path, out_dir)
+            start_time = time.perf_counter()
+            with proc.app.env.timerScope(proc.__class__.__name__):
+                proc_res = proc.process(full_path, out_dir)
             if proc_res is None:
                 raise Exception("Processor '%s' didn't return a boolean "
                                 "result value." % proc)
@@ -200,12 +195,12 @@
 
         proc = node.getProcessor()
         if (proc.is_bypassing_structured_processing or
-            not proc.is_delegating_dependency_check):
+                not proc.is_delegating_dependency_check):
             # This processor wants to handle things on its own...
             node.setState(STATE_DIRTY, False)
             return
 
-        start_time = time.clock()
+        start_time = time.perf_counter()
 
         # Get paths and modification times for the input path and
         # all dependencies (if any).
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/processing/worker.py	Sat Jun 20 19:20:30 2015 -0700
@@ -0,0 +1,213 @@
+import os.path
+import re
+import time
+import queue
+import logging
+from piecrust.app import PieCrust
+from piecrust.processing.base import PipelineContext
+from piecrust.processing.records import (
+        FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED,
+        FLAG_BYPASSED_STRUCTURED_PROCESSING)
+from piecrust.processing.tree import (
+        ProcessingTreeBuilder, ProcessingTreeRunner,
+        ProcessingTreeError, ProcessorError,
+        get_node_name_tree, print_node,
+        STATE_DIRTY)
+
+
+logger = logging.getLogger(__name__)
+
+
+split_processor_names_re = re.compile(r'[ ,]+')
+re_ansicolors = re.compile('\033\\[\d+m')
+
+
+def worker_func(wid, ctx):
+    logger.debug("Worker %d booting up..." % wid)
+    w = ProcessingWorker(wid, ctx)
+    w.run()
+
+
+class ProcessingWorkerContext(object):
+    def __init__(self, root_dir, out_dir, tmp_dir,
+                 work_queue, results, abort_event,
+                 force=False, debug=False):
+        self.root_dir = root_dir
+        self.out_dir = out_dir
+        self.tmp_dir = tmp_dir
+        self.work_queue = work_queue
+        self.results = results
+        self.abort_event = abort_event
+        self.force = force
+        self.debug = debug
+        self.enabled_processors = None
+        self.additional_processors = None
+
+
+class ProcessingWorkerJob(object):
+    def __init__(self, base_dir, mount_info, path, *, force=False):
+        self.base_dir = base_dir
+        self.mount_info = mount_info
+        self.path = path
+        self.force = force
+
+
+class ProcessingWorkerResult(object):
+    def __init__(self, path):
+        self.path = path
+        self.flags = FLAG_NONE
+        self.proc_tree = None
+        self.rel_outputs = None
+        self.errors = None
+
+
+class ProcessingWorker(object):
+    def __init__(self, wid, ctx):
+        self.wid = wid
+        self.ctx = ctx
+
+    def run(self):
+        logger.debug("Worker %d initializing..." % self.wid)
+        work_start_time = time.perf_counter()
+
+        # Create the app local to this worker.
+        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
+        app.env.fs_cache_only_for_main_page = True
+        app.env.registerTimer("Worker_%d" % self.wid)
+        app.env.registerTimer("JobReceive")
+        app.env.registerTimer('BuildProcessingTree')
+        app.env.registerTimer('RunProcessingTree')
+
+        processors = app.plugin_loader.getProcessors()
+        if self.ctx.enabled_processors:
+            logger.debug("Filtering processors to: %s" %
+                         self.ctx.enabled_processors)
+            processors = get_filtered_processors(processors,
+                                                 self.ctx.enabled_processors)
+        if self.ctx.additional_processors:
+            logger.debug("Adding %s additional processors." %
+                         len(self.ctx.additional_processors))
+            for proc in self.ctx.additional_processors:
+                app.env.registerTimer(proc.__class__.__name__)
+                proc.initialize(app)
+                processors.append(proc)
+
+        # Invoke pre-processors.
+        pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir,
+                                       self.ctx.tmp_dir, self.ctx.force)
+        for proc in processors:
+            proc.onPipelineStart(pipeline_ctx)
+
+        # Sort our processors again in case the pre-process step involved
+        # patching the processors with some new ones.
+        processors.sort(key=lambda p: p.priority)
+
+        aborted_with_exception = None
+        while not self.ctx.abort_event.is_set():
+            try:
+                with app.env.timerScope('JobReceive'):
+                    job = self.ctx.work_queue.get(True, 0.01)
+            except queue.Empty:
+                continue
+
+            try:
+                result = self._unsafeRun(app, processors, job)
+                self.ctx.results.put_nowait(result)
+            except Exception as ex:
+                self.ctx.abort_event.set()
+                aborted_with_exception = ex
+                logger.error("[%d] Critical error, aborting." % self.wid)
+                if self.ctx.debug:
+                    logger.exception(ex)
+                break
+            finally:
+                self.ctx.work_queue.task_done()
+
+        if aborted_with_exception is not None:
+            msgs = _get_errors(aborted_with_exception)
+            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})
+
+        # Invoke post-processors.
+        for proc in processors:
+            proc.onPipelineEnd(pipeline_ctx)
+
+        app.env.stepTimer("Worker_%d" % self.wid,
+                          time.perf_counter() - work_start_time)
+        self.ctx.results.put_nowait({
+                'type': 'timers', 'data': app.env._timers})
+
+    def _unsafeRun(self, app, processors, job):
+        result = ProcessingWorkerResult(job.path)
+
+        processors = get_filtered_processors(
+                processors, job.mount_info['processors'])
+
+        # Build the processing tree for this job.
+        rel_path = os.path.relpath(job.path, job.base_dir)
+        try:
+            with app.env.timerScope('BuildProcessingTree'):
+                builder = ProcessingTreeBuilder(processors)
+                tree_root = builder.build(rel_path)
+                result.flags |= FLAG_PREPARED
+        except ProcessingTreeError as ex:
+            result.errors += _get_errors(ex)
+            return result
+
+        # Prepare and run the tree.
+        print_node(tree_root, recursive=True)
+        leaves = tree_root.getLeaves()
+        result.rel_outputs = [l.path for l in leaves]
+        result.proc_tree = get_node_name_tree(tree_root)
+        if tree_root.getProcessor().is_bypassing_structured_processing:
+            result.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING
+
+        if job.force:
+            tree_root.setState(STATE_DIRTY, True)
+
+        try:
+            with app.env.timerScope('RunProcessingTree'):
+                runner = ProcessingTreeRunner(
+                        job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
+                if runner.processSubTree(tree_root):
+                    result.flags |= FLAG_PROCESSED
+        except ProcessingTreeError as ex:
+            if isinstance(ex, ProcessorError):
+                ex = ex.__cause__
+            # Need to strip out colored errors from external processes.
+            result.errors += _get_errors(ex, strip_colors=True)
+
+        return result
+
+
+def get_filtered_processors(processors, authorized_names):
+    if not authorized_names or authorized_names == 'all':
+        return processors
+
+    if isinstance(authorized_names, str):
+        authorized_names = split_processor_names_re.split(authorized_names)
+
+    procs = []
+    has_star = 'all' in authorized_names
+    for p in processors:
+        for name in authorized_names:
+            if name == p.PROCESSOR_NAME:
+                procs.append(p)
+                break
+            if name == ('-%s' % p.PROCESSOR_NAME):
+                break
+        else:
+            if has_star:
+                procs.append(p)
+    return procs
+
+
+def _get_errors(ex, strip_colors=False):
+    errors = []
+    while ex is not None:
+        msg = str(ex)
+        if strip_colors:
+            msg = re_ansicolors.sub('', msg)
+        errors.append(msg)
+        ex = ex.__cause__
+    return errors
+
--- a/tests/test_processing_base.py	Sat Jun 20 19:16:38 2015 -0700
+++ b/tests/test_processing_base.py	Sat Jun 20 19:20:30 2015 -0700
@@ -2,8 +2,10 @@
 import os.path
 import shutil
 import pytest
-from piecrust.processing.base import (ProcessorPipeline, SimpleFileProcessor)
+from piecrust.processing.base import SimpleFileProcessor
+from piecrust.processing.pipeline import ProcessorPipeline
 from piecrust.processing.records import ProcessorPipelineRecord
+from piecrust.processing.worker import get_filtered_processors
 from .mockutil import mock_fs, mock_fs_scope
 
 
@@ -36,7 +38,6 @@
 
 def _get_pipeline(fs, app=None):
     app = app or fs.getApp()
-    app.config.set('baker/num_workers', 1)
     return ProcessorPipeline(app, fs.path('counter'))
 
 
@@ -44,7 +45,7 @@
     fs = mock_fs()
     with mock_fs_scope(fs):
         pp = _get_pipeline(fs)
-        pp.filterProcessors(['copy'])
+        pp.enabled_processors = ['copy']
         expected = {}
         assert expected == fs.getStructure('counter')
         pp.run()
@@ -57,7 +58,7 @@
             .withFile('kitchen/assets/something.html', 'A test file.'))
     with mock_fs_scope(fs):
         pp = _get_pipeline(fs)
-        pp.filterProcessors(['copy'])
+        pp.enabled_processors = ['copy']
         expected = {}
         assert expected == fs.getStructure('counter')
         pp.run()
@@ -70,7 +71,7 @@
             .withFile('kitchen/assets/blah.foo', 'A test file.'))
     with mock_fs_scope(fs):
         pp = _get_pipeline(fs)
-        pp.filterProcessors(['copy'])
+        pp.enabled_processors = ['copy']
         pp.run()
         expected = {'blah.foo': 'A test file.'}
         assert expected == fs.getStructure('counter')
@@ -93,10 +94,10 @@
 def test_two_levels_dirtyness():
     fs = (mock_fs()
             .withFile('kitchen/assets/blah.foo', 'A test file.'))
-    with mock_fs_scope(fs) as scope:
+    with mock_fs_scope(fs):
         pp = _get_pipeline(fs)
-        pp.processors.append(FooProcessor(('foo', 'bar'), scope._open))
-        pp.filterProcessors(['foo', 'copy'])
+        pp.enabled_processors = ['copy']
+        pp.additional_processors = [FooProcessor(('foo', 'bar'))]
         pp.run()
         expected = {'blah.bar': 'FOO: A test file.'}
         assert expected == fs.getStructure('counter')
@@ -126,7 +127,7 @@
                 'blah2.foo': 'Ooops'}
         assert expected == fs.getStructure('kitchen/assets')
         pp = _get_pipeline(fs)
-        pp.filterProcessors(['copy'])
+        pp.enabled_processors = ['copy']
         pp.run()
         assert expected == fs.getStructure('counter')
 
@@ -145,18 +146,21 @@
     with mock_fs_scope(fs):
         pp = _get_pipeline(fs)
         noop = NoopProcessor(('foo', 'foo'))
-        pp.processors.append(noop)
-        pp.filterProcessors(['foo', 'copy'])
+        pp.enabled_processors = ['copy']
+        pp.additional_processors = [noop]
         pp.run()
-        assert 1 == len(noop.processed)
+        assert os.path.exists(fs.path('/counter/blah.foo')) is True
+        mtime = os.path.getmtime(fs.path('/counter/blah.foo'))
 
+        time.sleep(1)
         pp.run()
-        assert 1 == len(noop.processed)
+        assert mtime == os.path.getmtime(fs.path('/counter/blah.foo'))
 
+        time.sleep(1)
         ProcessorPipelineRecord.RECORD_VERSION += 1
         try:
             pp.run()
-            assert 2 == len(noop.processed)
+            assert mtime < os.path.getmtime(fs.path('/counter/blah.foo'))
         finally:
             ProcessorPipelineRecord.RECORD_VERSION -= 1
 
@@ -170,24 +174,27 @@
             {'something.html': 'A test file.',
                 'foo': {'_important.html': 'Important!'}})
         ])
-def test_skip_pattern(patterns, expected):
+def test_ignore_pattern(patterns, expected):
     fs = (mock_fs()
             .withFile('kitchen/assets/something.html', 'A test file.')
             .withFile('kitchen/assets/_hidden.html', 'Shhh')
             .withFile('kitchen/assets/foo/_important.html', 'Important!'))
     with mock_fs_scope(fs):
         pp = _get_pipeline(fs)
-        pp.addSkipPatterns(patterns)
-        pp.filterProcessors(['copy'])
+        pp.addIgnorePatterns(patterns)
+        pp.enabled_processors = ['copy']
         assert {} == fs.getStructure('counter')
         pp.run()
         assert expected == fs.getStructure('counter')
 
 
 @pytest.mark.parametrize('names, expected', [
-        ('all', ['copy', 'concat', 'less', 'sass', 'sitemap']),
-        ('all -sitemap', ['copy', 'concat', 'less', 'sass']),
-        ('-sitemap -less -sass all', ['copy', 'concat']),
+        ('all', ['cleancss', 'compass', 'copy', 'concat', 'less', 'requirejs',
+                 'sass', 'sitemap', 'uglifyjs']),
+        ('all -sitemap', ['cleancss', 'copy', 'compass', 'concat', 'less',
+                          'requirejs', 'sass', 'uglifyjs']),
+        ('-sitemap -less -sass all', ['cleancss', 'copy', 'compass', 'concat',
+                                      'requirejs', 'uglifyjs']),
         ('copy', ['copy']),
         ('less sass', ['less', 'sass'])
     ])
@@ -195,9 +202,8 @@
     fs = mock_fs()
     with mock_fs_scope(fs):
         app = fs.getApp()
-        pp = _get_pipeline(fs, app=app)
-        pp.filterProcessors('copy concat less sass sitemap')
-        procs = pp.getFilteredProcessors(names)
+        processors = app.plugin_loader.getProcessors()
+        procs = get_filtered_processors(processors, names)
         actual = [p.PROCESSOR_NAME for p in procs]
         assert sorted(actual) == sorted(expected)