changeset 854:08e02c2a2a1a

core: Keep refactoring, this time to prepare for generator sources.

- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can keep
  aborting in `getContents`, but rely on their pipeline to generate pages for
  baking.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jun 2017 23:34:28 -0700
parents f070a4fc033c
children 448710d84121
files piecrust/app.py piecrust/appconfig.py piecrust/baking/baker.py piecrust/baking/worker.py piecrust/commands/builtin/baking.py piecrust/data/builder.py piecrust/data/filters.py piecrust/data/linker.py piecrust/data/pagedata.py piecrust/data/paginationdata.py piecrust/data/paginator.py piecrust/data/providersdata.py piecrust/dataproviders/asset_iterator.py piecrust/dataproviders/assetiterator.py piecrust/dataproviders/base.py piecrust/dataproviders/blog.py piecrust/dataproviders/page_iterator.py piecrust/dataproviders/pageiterator.py piecrust/environment.py piecrust/pipelines/_pagebaker.py piecrust/pipelines/_pagerecords.py piecrust/pipelines/_procrecords.py piecrust/pipelines/asset.py piecrust/pipelines/base.py piecrust/pipelines/page.py piecrust/pipelines/records.py piecrust/plugins/builtin.py piecrust/processing/base.py piecrust/processing/sitemap.py piecrust/rendering.py piecrust/routing.py piecrust/serving/procloop.py piecrust/serving/server.py piecrust/serving/util.py piecrust/sources/base.py piecrust/sources/blogarchives.py piecrust/sources/default.py piecrust/sources/fs.py piecrust/sources/posts.py piecrust/sources/taxonomy.py piecrust/templating/jinja/environment.py piecrust/workerpool.py requirements.txt
diffstat 41 files changed, 1606 insertions(+), 1637 deletions(-)
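The description above is terse, so here is a small orientation sketch of the job model this changeset moves to, using made-up stand-in names (`FakePipeline`, `bake`) rather than PieCrust's real classes: each pipeline creates its own first-pass jobs (see `pp.createJobs()` in the `baker.py` diff below), and handling a job may hand back a follow-up job that the baker queues for the next pass, looping until no pipeline produces new work.

class FakePipeline:
    """Hypothetical pipeline that wants `passes` rounds per content item."""

    def __init__(self, name, items, passes=2):
        self.name = name
        self.items = items
        self.passes = passes

    def createJobs(self):
        # Mirrors `pp.createJobs()` below: one first-pass job per item.
        return [{'source': self.name, 'spec': i, 'pass': 0}
                for i in self.items]

    def runJob(self, job):
        # Return a follow-up job while more passes are needed, else None.
        if job['pass'] + 1 < self.passes:
            follow_up = dict(job)
            follow_up['pass'] += 1
            return follow_up
        return None


def bake(pipelines):
    by_name = {pp.name: pp for pp in pipelines}
    # Pass 0: jobs come straight from the pipelines themselves.
    jobs = [j for pp in pipelines for j in pp.createJobs()]
    cur_pass = 0
    while jobs:
        print("pass %d: %d job(s)" % (cur_pass, len(jobs)))
        next_pass_jobs = []
        for job in jobs:
            follow_up = by_name[job['source']].runJob(job)
            if follow_up is not None:
                next_pass_jobs.append(follow_up)
        jobs = next_pass_jobs
        cur_pass += 1


if __name__ == '__main__':
    bake([FakePipeline('pages', ['a.md', 'b.md']),
          FakePipeline('assets', ['logo.png'], passes=1)])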
--- a/piecrust/app.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/app.py	Sun Jun 04 23:34:28 2017 -0700
@@ -159,6 +159,7 @@
         for r in self.config.get('site/routes'):
             rte = Route(self, r)
             routes.append(rte)
+        routes = sorted(routes, key=lambda r: r.pass_num)
         return routes
 
     @cached_property
--- a/piecrust/appconfig.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/appconfig.py	Sun Jun 04 23:34:28 2017 -0700
@@ -391,7 +391,7 @@
         sc.setdefault('items_per_page', 5)
         sc.setdefault('date_format', DEFAULT_DATE_FORMAT)
         sc.setdefault('realm', REALM_USER)
-        sc.setdefault('pipeline', 'page')
+        sc.setdefault('pipeline', None)
 
         # Validate endpoints.
         endpoint = sc['data_endpoint']
@@ -436,6 +436,7 @@
                                      r_source)
         used_sources.add(r_source)
 
+        rc.setdefault('pass', 1)
         rc.setdefault('page_suffix', '/%num%')
 
     return v
--- a/piecrust/baking/baker.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/baking/baker.py	Sun Jun 04 23:34:28 2017 -0700
@@ -2,11 +2,12 @@
 import os.path
 import hashlib
 import logging
-from piecrust.baking.worker import BakeJob
 from piecrust.chefutil import (
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
-from piecrust.pipelines.base import PipelineContext
+from piecrust.pipelines.base import (
+    PipelineMergeRecordContext, PipelineManager,
+    get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
     MultiRecordHistory, MultiRecord, RecordEntry,
     load_records)
@@ -31,16 +32,10 @@
         self.out_dir = out_dir
         self.force = force
 
-        self._pipeline_classes = {}
-        for pclass in app.plugin_loader.getPipelines():
-            self._pipeline_classes[pclass.PIPELINE_NAME] = pclass
-
         self.allowed_pipelines = allowed_pipelines
         if allowed_pipelines is None:
             self.allowed_pipelines = list(self._pipeline_classes.keys())
 
-        self._records = None
-
     def bake(self):
         start_time = time.perf_counter()
         logger.debug("  Bake Output: %s" % self.out_dir)
@@ -63,18 +58,19 @@
                 previous_records = load_records(records_path)
         else:
             previous_records = MultiRecord()
-        self._records = MultiRecord()
+        current_records = MultiRecord()
 
         # Figure out if we need to clean the cache because important things
         # have changed.
         is_cache_valid = self._handleCacheValidity(previous_records,
-                                                   self._records)
+                                                   current_records)
         if not is_cache_valid:
             previous_records = MultiRecord()
 
         # Create the bake records history which tracks what's up-to-date
         # or not since last time we baked to the given output folder.
-        record_histories = MultiRecordHistory(previous_records, self._records)
+        record_histories = MultiRecordHistory(
+            previous_records, current_records)
 
         # Pre-create all caches.
         for cache_name in ['app', 'baker', 'pages', 'renders']:
@@ -86,49 +82,54 @@
         # realm).
         #
         # Also, create and initialize each pipeline for each source.
-        sources_by_realm = {}
+        has_any_pp = False
+        ppmngr = PipelineManager(
+            self.app, self.out_dir, record_histories)
         for source in self.app.sources:
-            pname = source.config['pipeline']
+            pname = get_pipeline_name_for_source(source)
             if pname in self.allowed_pipelines:
-                srclist = sources_by_realm.setdefault(
-                    source.config['realm'], [])
-
-                pp = self._pipeline_classes[pname](source)
-
-                record_name = _get_record_name(source.name, pname)
-                record_history = record_histories.getHistory(record_name)
-                ppctx = PipelineContext(self.out_dir, record_history,
-                                        force=self.force)
-                pp.initialize(ppctx)
-
-                srclist.append((source, pp, ppctx))
+                ppinfo = ppmngr.createPipeline(source)
+                logger.debug(
+                    "Created pipeline '%s' for source: %s" %
+                    (ppinfo.pipeline.PIPELINE_NAME, source.name))
+                has_any_pp = True
             else:
                 logger.debug(
                     "Skip source '%s' because pipeline '%s' is ignored." %
                     (source.name, pname))
+        if not has_any_pp:
+            raise Exception("The website has no content sources, or the bake "
+                            "command was invoked with all pipelines filtered "
+                            "out. There's nothing to do.")
 
         # Create the worker processes.
-        pool = self._createWorkerPool(records_path)
+        pool_userdata = _PoolUserData(self, ppmngr, current_records)
+        pool = self._createWorkerPool(records_path, pool_userdata)
+        realm_list = [REALM_USER, REALM_THEME]
 
         # Bake the realms -- user first, theme second, so that a user item
         # can override a theme item.
-        realm_list = [REALM_USER, REALM_THEME]
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                self._bakeRealm(pool, srclist)
+        # Do this for as many times as we have pipeline passes left to do.
+        pp_by_pass_and_realm = {}
+        for ppinfo in ppmngr.getPipelines():
+            pp_by_realm = pp_by_pass_and_realm.setdefault(
+                ppinfo.pipeline.PASS_NUM, {})
+            pplist = pp_by_realm.setdefault(
+                ppinfo.pipeline.source.config['realm'], [])
+            pplist.append(ppinfo)
 
-        # Handle deletions.
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                self._deleteStaleOutputs(pool, srclist)
+        for pp_pass in sorted(pp_by_pass_and_realm.keys()):
+            logger.debug("Pipelines pass %d" % pp_pass)
+            pp_by_realm = pp_by_pass_and_realm[pp_pass]
+            for realm in realm_list:
+                pplist = pp_by_realm.get(realm)
+                if pplist is not None:
+                    self._bakeRealm(pool, pplist)
 
-        # Collapse records.
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                self._collapseRecords(srclist)
+        # Handle deletions, collapse records, etc.
+        ppmngr.buildHistoryDiffs()
+        ppmngr.deleteStaleOutputs()
+        ppmngr.collapseRecords()
 
         # All done with the workers. Close the pool and get reports.
         pool_stats = pool.close()
@@ -136,14 +137,10 @@
         for ps in pool_stats:
             if ps is not None:
                 total_stats.mergeStats(ps)
-        record_histories.current.stats = total_stats
+        current_records.stats = total_stats
 
         # Shutdown the pipelines.
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                for _, pp, ppctx in srclist:
-                    pp.shutdown(ppctx)
+        ppmngr.shutdownPipelines()
 
         # Backup previous records.
         records_dir, records_fn = os.path.split(records_path)
@@ -164,16 +161,15 @@
         # Save the bake records.
         with format_timed_scope(logger, "saved bake records.",
                                 level=logging.DEBUG, colored=False):
-            record_histories.current.bake_time = time.time()
-            record_histories.current.out_dir = self.out_dir
-            record_histories.current.save(records_path)
+            current_records.bake_time = time.time()
+            current_records.out_dir = self.out_dir
+            current_records.save(records_path)
 
         # All done.
         self.app.config.set('baker/is_baking', False)
         logger.debug(format_timed(start_time, 'done baking'))
 
-        self._records = None
-        return record_histories.current
+        return current_records
 
     def _handleCacheValidity(self, previous_records, current_records):
         start_time = time.perf_counter()
@@ -216,40 +212,58 @@
                 start_time, "cache is assumed valid", colored=False))
             return True
 
-    def _bakeRealm(self, pool, srclist):
-        for source, pp, ppctx in srclist:
-            logger.debug("Queuing jobs for source '%s' using pipeline '%s'." %
-                         (source.name, pp.PIPELINE_NAME))
-            jobs = [BakeJob(source.name, item.spec, item.metadata)
-                    for item in source.getAllContents()]
+    def _bakeRealm(self, pool, pplist):
+        # Start with the first pass, where we iterate on the content sources'
+        # items and run jobs on those.
+        pool.userdata.cur_pass = 0
+        next_pass_jobs = {}
+        pool.userdata.next_pass_jobs = next_pass_jobs
+        for ppinfo in pplist:
+            src = ppinfo.source
+            pp = ppinfo.pipeline
+
+            logger.debug(
+                "Queuing jobs for source '%s' using pipeline '%s' (pass 0)." %
+                (src.name, pp.PIPELINE_NAME))
+
+            next_pass_jobs[src.name] = []
+            jobs = pp.createJobs()
             pool.queueJobs(jobs)
         pool.wait()
 
-    def _deleteStaleOutputs(self, pool, srclist):
-        for source, pp, ppctx in srclist:
-            ppctx.record_history.build()
+        # Now let's see if any job created a follow-up job. Let's keep
+        # processing those jobs as long as they create new ones.
+        pool.userdata.cur_pass = 1
+        while True:
+            had_any_job = False
+
+            # Make a copy of our next pass jobs and reset the list, so
+            # the first jobs to be processed don't mess it up as we're
+            # still iterating on it.
+            next_pass_jobs = pool.userdata.next_pass_jobs
+            pool.userdata.next_pass_jobs = {}
 
-            to_delete = pp.getDeletions(ppctx)
-            if to_delete is not None:
-                for path, reason in to_delete:
-                    logger.debug("Removing '%s': %s" % (path, reason))
-                    ppctx.current_record.deleted_out_paths.append(path)
-                    try:
-                        os.remove(path)
-                    except FileNotFoundError:
-                        pass
-                    logger.info('[delete] %s' % path)
+            for sn, jobs in next_pass_jobs.items():
+                if jobs:
+                    logger.debug(
+                        "Queuing jobs for source '%s' (pass %d)." %
+                        (sn, pool.userdata.cur_pass))
+                    pool.userdata.next_pass_jobs[sn] = []
+                    pool.queueJobs(jobs)
+                    had_any_job = True
 
-    def _collapseRecords(self, srclist):
-        for source, pp, ppctx in srclist:
-            pp.collapseRecords(ppctx)
+            if not had_any_job:
+                break
+
+            pool.wait()
+            pool.userdata.cur_pass += 1
 
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
         for e in errors:
             logger.error("  " + e)
 
-    def _createWorkerPool(self, previous_records_path):
+    def _createWorkerPool(self, previous_records_path, pool_userdata):
         from piecrust.workerpool import WorkerPool
         from piecrust.baking.worker import BakeWorkerContext, BakeWorker
 
@@ -268,36 +282,59 @@
             worker_class=BakeWorker,
             initargs=(ctx,),
             callback=self._handleWorkerResult,
-            error_callback=self._handleWorkerError)
+            error_callback=self._handleWorkerError,
+            userdata=pool_userdata)
         return pool
 
-    def _handleWorkerResult(self, job, res):
-        record_name = _get_record_name(job.source_name, res.pipeline_name)
-        record = self._records.getRecord(record_name)
-        record.entries.append(res.record_entry)
+    def _handleWorkerResult(self, job, res, userdata):
+        cur_pass = userdata.cur_pass
+        record = userdata.records.getRecord(job.record_name)
+
+        if cur_pass == 0:
+            record.addEntry(res.record_entry)
+        else:
+            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
+            ppmrctx = PipelineMergeRecordContext(
+                record, job, cur_pass)
+            ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx)
+
+        npj = res.next_pass_job
+        if npj is not None:
+            npj.data['pass'] = cur_pass + 1
+            userdata.next_pass_jobs[job.source_name].append(npj)
+
         if not res.record_entry.success:
             record.success = False
-            self._records.success = False
-            self._logErrors(job.item_spec, res.record_entry.errors)
+            userdata.records.success = False
+            self._logErrors(job.content_item.spec, res.record_entry.errors)
+
+    def _handleWorkerError(self, job, exc_data, userdata):
+        cur_pass = userdata.cur_pass
+        record = userdata.records.getRecord(job.record_name)
 
-    def _handleWorkerError(self, job, exc_data):
-        e = RecordEntry()
-        e.item_spec = job.item_spec
-        e.errors.append(str(exc_data))
-
-        ppname = self.app.getSource(job.source_name).config['pipeline']
-        record_name = _get_record_name(job.source_name, ppname)
-        record_name = self._getRecordName(job)
-        record = self._records.getRecord(record_name)
-        record.entries.append(e)
+        if cur_pass == 0:
+            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
+            entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry
+            e = entry_class()
+            e.item_spec = job.content_item.spec
+            e.errors.append(str(exc_data))
+            record.addEntry(e)
+        else:
+            e = record.getEntry(job.content_item.spec)
+            e.errors.append(str(exc_data))
 
         record.success = False
-        self._records.success = False
+        userdata.records.success = False
 
-        self._logErrors(job.item_spec, e.errors)
+        self._logErrors(job.content_item.spec, e.errors)
         if self.app.debug:
             logger.error(exc_data.traceback)
 
 
-def _get_record_name(source_name, pipeline_name):
-    return '%s@%s' % (source_name, pipeline_name)
+class _PoolUserData:
+    def __init__(self, baker, ppmngr, current_records):
+        self.baker = baker
+        self.ppmngr = ppmngr
+        self.records = current_records
+        self.cur_pass = 0
+        self.next_pass_jobs = {}
--- a/piecrust/baking/worker.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/baking/worker.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,9 +1,10 @@
 import time
 import logging
-from piecrust.pipelines.base import PipelineContext, PipelineResult
+from piecrust.pipelines.base import (
+    PipelineManager, PipelineJobRunContext, PipelineJobResult,
+    get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
-    MultiRecordHistory, MultiRecord, RecordEntry, load_records)
-from piecrust.sources.base import ContentItem
+    MultiRecordHistory, MultiRecord, load_records)
 from piecrust.workerpool import IWorker
 
 
@@ -25,7 +26,7 @@
     def __init__(self, ctx):
         self.ctx = ctx
         self.app = None
-        self.record_history = None
+        self.record_histories = None
         self._work_start_time = time.perf_counter()
         self._sources = {}
         self._ppctx = None
@@ -51,41 +52,39 @@
         else:
             previous_records = MultiRecord()
         current_records = MultiRecord()
-        self.record_history = MultiRecordHistory(
+        self.record_histories = MultiRecordHistory(
             previous_records, current_records)
 
-        # Cache sources and create pipelines.
-        ppclasses = {}
-        for ppclass in app.plugin_loader.getPipelines():
-            ppclasses[ppclass.PIPELINE_NAME] = ppclass
-
-        self._ppctx = PipelineContext(self.ctx.out_dir, self.record_history,
-                                      worker_id=self.wid,
-                                      force=self.ctx.force)
+        # Create the pipelines.
+        self.ppmngr = PipelineManager(
+            app, self.ctx.out_dir, self.record_histories,
+            worker_id=self.wid, force=self.ctx.force)
         for src in app.sources:
-            ppname = src.config['pipeline']
+            pname = get_pipeline_name_for_source(src)
             if (self.ctx.allowed_pipelines is not None and
-                    ppname not in self.ctx.allowed_pipelines):
+                    pname not in self.ctx.allowed_pipelines):
                 continue
 
-            pp = ppclasses[ppname](src)
-            pp.initialize(self._ppctx)
-            self._sources[src.name] = (src, pp)
+            self.ppmngr.createPipeline(src)
 
         stats.stepTimerSince("BakeWorkerInit", self._work_start_time)
 
     def process(self, job):
-        logger.debug("Received job: %s@%s" % (job.source_name, job.item_spec))
-        src, pp = self._sources[job.source_name]
-        item = ContentItem(job.item_spec, job.item_metadata)
+        item = job.content_item
+        logger.debug("Received job: %s@%s" % (job.source_name, item.spec))
+
+        ppinfo = self.ppmngr.getPipeline(job.source_name)
+        pp = ppinfo.pipeline
 
-        entry_class = pp.RECORD_ENTRY_CLASS or RecordEntry
-        ppres = PipelineResult()
-        ppres.pipeline_name = pp.PIPELINE_NAME
-        ppres.record_entry = entry_class()
-        ppres.record_entry.item_spec = job.item_spec
+        ppres = PipelineJobResult()
+        # For subsequent-pass jobs, a record entry is provided with the job.
+        # For first-pass jobs there is none, so we ask the pipeline to create it.
+        ppres.record_entry = job.data.get('record_entry')
+        if ppres.record_entry is None:
+            ppres.record_entry = pp.createRecordEntry(job)
 
-        pp.run(item, self._ppctx, ppres)
+        runctx = PipelineJobRunContext(job, pp, self.record_histories)
+        pp.run(job, runctx, ppres)
         return ppres
 
     def getStats(self):
@@ -98,10 +97,3 @@
         for src, pp in self._sources.values():
             pp.shutdown(self._ppctx)
 
-
-class BakeJob:
-    def __init__(self, source_name, item_spec, item_metadata):
-        self.source_name = source_name
-        self.item_spec = item_spec
-        self.item_metadata = item_metadata
-
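To complement the baker-side loop sketched near the top, the `worker.py` diff above changes how a job's record entry is obtained. As a rough sketch with hypothetical names (`FakeRecordEntry`, `process_job`), not the real `BakeWorker` API: first-pass jobs carry no entry, so the pipeline is asked to create one; follow-up jobs carry the entry to keep updating in their `data` dict (`job.data.get('record_entry')` above).

class FakeRecordEntry:
    """Hypothetical record entry: which item was baked, and any errors."""

    def __init__(self, spec):
        self.item_spec = spec
        self.errors = []


def process_job(run_pipeline_step, job):
    # `run_pipeline_step` is any callable standing in for `pipeline.run(...)`.
    record_entry = job.get('data', {}).get('record_entry')
    if record_entry is None:
        # First-pass job: nothing was handed down, create a fresh entry.
        record_entry = FakeRecordEntry(job['spec'])
    run_pipeline_step(job, record_entry)
    return record_entry

A follow-up job would then arrive as something like {'spec': 'a.md', 'data': {'record_entry': entry_from_pass_0}}, so the same entry accumulates outputs and errors across passes.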
--- a/piecrust/commands/builtin/baking.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/commands/builtin/baking.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,7 +1,9 @@
+import os.path
 import time
-import os.path
+import pprint
 import logging
 import fnmatch
+import textwrap
 import datetime
 from colorama import Fore
 from piecrust.commands.base import ChefCommand
@@ -28,7 +30,7 @@
         parser.add_argument(
             '-p', '--pipelines',
             help="The pipelines to run.",
-            nargs='*')
+            action='append')
         parser.add_argument(
             '-w', '--workers',
             help="The number of worker processes to spawn.",
@@ -91,6 +93,10 @@
         elif ctx.args.assets_only:
             allowed_pipelines = ['asset']
         elif ctx.args.pipelines:
+            if allowed_pipelines:
+                raise Exception(
+                    "Can't specify `--html-only` or `--assets-only` with "
+                    "`--pipelines`.")
             allowed_pipelines = ctx.args.pipelines
 
         baker = Baker(
@@ -200,14 +206,14 @@
             logger.info("Record: %s" % rec.name)
             logger.info("Status: %s" % ('SUCCESS' if rec.success
                                         else 'FAILURE'))
-            for e in rec.entries:
+            for e in rec.getEntries():
                 if ctx.args.fails and e.success:
                     continue
                 if in_pattern and not fnmatch.fnmatch(e.item_spec, in_pattern):
                     continue
                 if out_pattern and not any(
                         [fnmatch.fnmatch(op, out_pattern)
-                         for op in e.out_paths]):
+                         for op in e.getAllOutputPaths()]):
                     continue
                 _print_record_entry(e)
 
@@ -260,17 +266,24 @@
 def _print_record_entry(e):
     logger.info(" - %s" % e.item_spec)
     logger.info("   Outputs:")
-    if e.out_paths:
-        for op in e.out_paths:
+    out_paths = list(e.getAllOutputPaths())
+    if out_paths:
+        for op in out_paths:
             logger.info("    - %s" % op)
     else:
         logger.info("      <none>")
 
     e_desc = e.describe()
-    for k in sorted(e_desc.keys()):
-        logger.info("   %s: %s" % (k, e_desc[k]))
+    for k, v in e_desc.items():
+        if isinstance(v, dict):
+            text = pprint.pformat(v, indent=2)
+            logger.info("   %s:" % k)
+            logger.info(textwrap.indent(text, '     '))
+        else:
+            logger.info("   %s: %s" % (k, v))
 
-    if e.errors:
+    errors = list(e.getAllErrors())
+    if errors:
         logger.error("   Errors:")
-        for err in e.errors:
+        for err in errors:
             logger.error("    - %s" % err)
--- a/piecrust/data/builder.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/data/builder.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,7 +1,7 @@
 import logging
 from piecrust.data.assetor import Assetor
 from piecrust.data.base import MergedMapping
-# from piecrust.data.linker import PageLinkerData
+from piecrust.data.linker import Linker
 from piecrust.data.pagedata import PageData
 from piecrust.data.paginator import Paginator
 from piecrust.data.piecrustdata import PieCrustData
@@ -32,13 +32,13 @@
     paginator = Paginator(pgn_source, page, sub_num,
                           pgn_filter=ctx.pagination_filter)
     assetor = Assetor(page)
-    # linker = PageLinkerData(page.source, page.rel_path)
+    linker = Linker(page)
     data = {
         'piecrust': pc_data,
         'page': config_data,
         'assets': assetor,
         'pagination': paginator,
-        # 'family': linker
+        'family': linker
     }
 
     for route in app.routes:
@@ -49,6 +49,12 @@
         func = data.get(name)
         if func is None:
             data[name] = RouteFunction(route)
+        elif isinstance(func, RouteFunction):
+            if not func._isCompatibleRoute(route):
+                raise Exception(
+                    "Route function '%s' can't target both route '%s' and "
+                    "route '%s' as the 2 patterns are incompatible." %
+                    (name, func._route.uri_pattern, route.uri_pattern))
         else:
             raise Exception("Route function '%s' collides with an "
                             "existing function or template data." %
--- a/piecrust/data/filters.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/data/filters.py	Sun Jun 04 23:34:28 2017 -0700
@@ -4,14 +4,9 @@
 logger = logging.getLogger(__name__)
 
 
-def page_value_accessor(page, name):
-    return page.config.get(name)
-
-
 class PaginationFilter(object):
-    def __init__(self, value_accessor=None):
+    def __init__(self):
         self.root_clause = None
-        self.value_accessor = value_accessor or self._default_value_accessor
 
     @property
     def is_empty(self):
@@ -81,13 +76,6 @@
             else:
                 raise Exception("Unknown filter clause: %s" % key)
 
-    @staticmethod
-    def _default_value_accessor(item, name):
-        try:
-            return getattr(item, name)
-        except AttributeError:
-            return None
-
 
 class IFilterClause(object):
     def addClause(self, clause):
@@ -151,7 +139,7 @@
 
 class HasFilterClause(SettingFilterClause):
     def pageMatches(self, fil, page):
-        actual_value = fil.value_accessor(page, self.name)
+        actual_value = page.config.get(self.name)
         if actual_value is None or not isinstance(actual_value, list):
             return False
 
@@ -163,7 +151,7 @@
 
 class IsFilterClause(SettingFilterClause):
     def pageMatches(self, fil, page):
-        actual_value = fil.value_accessor(page, self.name)
+        actual_value = page.config.get(self.name)
         if self.coercer:
             actual_value = self.coercer(actual_value)
         return actual_value == self.value
--- a/piecrust/data/linker.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/data/linker.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,16 +1,18 @@
 import logging
-import collections
-from piecrust.data.pagedata import LazyPageConfigLoaderHasNoValue
 from piecrust.data.paginationdata import PaginationData
-from piecrust.dataproviders.page_iterator import PageIterator
+from piecrust.sources.base import (
+    REL_PARENT_GROUP, REL_LOGICAL_PARENT_ITEM, REL_LOGICAl_CHILD_GROUP)
 
 
 logger = logging.getLogger(__name__)
 
 
-class PageLinkerData(object):
-    """ Entry template data to get access to related pages from a given
-        root page.
+_unloaded = object()
+
+
+class Linker:
+    """ A template-exposed data class that lets the user navigate the
+        logical hierarchy of pages in a page source.
     """
     debug_render = ['parent', 'ancestors', 'siblings', 'children', 'root',
                     'forpath']
@@ -22,68 +24,68 @@
         'children': '_debugRenderChildren',
         'root': '_debugRenderRoot'}
 
-    def __init__(self, source, page_path):
-        self._source = source
-        self._root_page_path = page_path
-        self._linker = None
-        self._is_loaded = False
+    def __init__(self, page):
+        self._page = page
+        self._content_item = page.content_item
+        self._source = page.source
+        self._app = page.app
+
+        self._parent = _unloaded
+        self._ancestors = None
+        self._siblings = None
+        self._children = None
 
     @property
     def parent(self):
-        self._load()
-        if self._linker is not None:
-            return self._linker.parent
-        return None
+        if self._parent is _unloaded:
+            pi = self._source.getRelatedContents(self._content_item,
+                                                 REL_LOGICAL_PARENT_ITEM)
+            if pi is not None:
+                pipage = self._app.getPage(self._source, pi)
+                self._parent = PaginationData(pipage)
+            else:
+                self._parent = None
+        return self._parent
 
     @property
     def ancestors(self):
-        cur = self.parent
-        while cur:
-            yield cur
-            cur = cur.parent
+        if self._ancestors is None:
+            cur_item = self._content_item
+            self._ancestors = []
+            while True:
+                pi = self._source.getRelatedContents(
+                    cur_item, REL_LOGICAL_PARENT_ITEM)
+                if pi is not None:
+                    pipage = self._app.getPage(self._source, pi)
+                    self._ancestors.append(PaginationData(pipage))
+                    cur_item = pi
+                else:
+                    break
+        return self._ancestors
 
     @property
     def siblings(self):
-        self._load()
-        if self._linker is None:
-            return []
-        return self._linker
+        if self._siblings is None:
+            self._siblings = []
+            parent_group = self._source.getRelatedContents(
+                self._content_item, REL_PARENT_GROUP)
+            for i in self._source.getContents(parent_group):
+                if not i.is_group:
+                    ipage = self._app.getPage(self._source, i)
+                    self._siblings.append(PaginationData(ipage))
+        return self._siblings
 
     @property
     def children(self):
-        self._load()
-        if self._linker is None:
-            return []
-        self._linker._load()
-        if self._linker._self_item is None:
-            return []
-        children = self._linker._self_item._linker_info.child_linker
-        if children is None:
-            return []
-        return children
-
-    @property
-    def root(self):
-        self._load()
-        if self._linker is None:
-            return None
-        return self._linker.root
-
-    def forpath(self, rel_path):
-        self._load()
-        if self._linker is None:
-            return None
-        return self._linker.forpath(rel_path)
-
-    def _load(self):
-        if self._is_loaded:
-            return
-
-        self._is_loaded = True
-
-        dir_path = self._source.getDirpath(self._root_page_path)
-        self._linker = Linker(self._source, dir_path,
-                              root_page_path=self._root_page_path)
+        if self._children is None:
+            self._children = []
+            child_group = self._source.getRelatedContents(
+                self._content_item, REL_LOGICAl_CHILD_GROUP)
+            if child_group:
+                for i in self._source.getContents(child_group):
+                    ipage = self._app.getPage(self._source, i)
+                    self._children.append(PaginationData(ipage))
+        return self._children
 
     def _debugRenderAncestors(self):
         return [i.name for i in self.ancestors]
@@ -100,257 +102,3 @@
             return r.name
         return None
 
-
-class LinkedPageData(PaginationData):
-    """ Class whose instances get returned when iterating on a `Linker`
-        or `RecursiveLinker`. It's just like what gets usually returned by
-        `Paginator` and other page iterators, but with a few additional data
-        like hierarchical data.
-    """
-    debug_render = (['is_dir', 'is_self', 'parent', 'children'] +
-                    PaginationData.debug_render)
-    debug_render_invoke = (['is_dir', 'is_self', 'parent', 'children'] +
-                           PaginationData.debug_render_invoke)
-
-    def __init__(self, page):
-        super(LinkedPageData, self).__init__(page)
-        self.name = page._linker_info.name
-        self.is_self = page._linker_info.is_self
-        self.is_dir = page._linker_info.is_dir
-        self.is_page = True
-        self._child_linker = page._linker_info.child_linker
-
-        self._mapLoader('*', self._linkerChildLoader)
-
-    @property
-    def parent(self):
-        if self._child_linker is not None:
-            return self._child_linker.parent
-        return None
-
-    @property
-    def children(self):
-        if self._child_linker is not None:
-            return self._child_linker
-        return []
-
-    def _linkerChildLoader(self, data, name):
-        if self.children and hasattr(self.children, name):
-            return getattr(self.children, name)
-        raise LazyPageConfigLoaderHasNoValue
-
-
-class LinkedPageDataBuilderIterator(object):
-    """ Iterator that builds `LinkedPageData` out of pages.
-    """
-    def __init__(self, it):
-        self.it = it
-
-    def __iter__(self):
-        for item in self.it:
-            yield LinkedPageData(item)
-
-
-class LinkerSource(IPaginationSource):
-    """ Source iterator that returns pages given by `Linker`.
-    """
-    def __init__(self, pages, orig_source):
-        self._pages = list(pages)
-        self._orig_source = None
-        if isinstance(orig_source, IPaginationSource):
-            self._orig_source = orig_source
-
-    def getItemsPerPage(self):
-        raise NotImplementedError()
-
-    def getSourceIterator(self):
-        return self._pages
-
-    def getSorterIterator(self, it):
-        # We don't want to sort the pages -- we expect the original source
-        # to return hierarchical items in the order it wants already.
-        return None
-
-    def getTailIterator(self, it):
-        return LinkedPageDataBuilderIterator(it)
-
-    def getPaginationFilter(self, page):
-        return None
-
-    def getSettingAccessor(self):
-        if self._orig_source:
-            return self._orig_source.getSettingAccessor()
-        return None
-
-
-class _LinkerInfo(object):
-    def __init__(self):
-        self.name = None
-        self.is_dir = False
-        self.is_self = False
-        self.child_linker = None
-
-
-class _LinkedPage(object):
-    def __init__(self, page):
-        self._page = page
-        self._linker_info = _LinkerInfo()
-
-    def __getattr__(self, name):
-        return getattr(self._page, name)
-
-
-class Linker(object):
-    debug_render_doc = """Provides access to sibling and children pages."""
-
-    def __init__(self, source, dir_path, *, root_page_path=None):
-        self._source = source
-        self._dir_path = dir_path
-        self._root_page_path = root_page_path
-        self._items = None
-        self._parent = None
-        self._self_item = None
-
-        self.is_dir = True
-        self.is_page = False
-        self.is_self = False
-
-    def __iter__(self):
-        return iter(self.pages)
-
-    def __getattr__(self, name):
-        self._load()
-        try:
-            item = self._items[name]
-        except KeyError:
-            raise AttributeError()
-
-        if isinstance(item, Linker):
-            return item
-
-        return LinkedPageData(item)
-
-    def __str__(self):
-        return self.name
-
-    @property
-    def name(self):
-        return self._source.getBasename(self._dir_path)
-
-    @property
-    def children(self):
-        return self._iterItems(0)
-
-    @property
-    def parent(self):
-        if self._dir_path == '':
-            return None
-
-        if self._parent is None:
-            parent_name = self._source.getBasename(self._dir_path)
-            parent_dir_path = self._source.getDirpath(self._dir_path)
-            for is_dir, name, data in self._source.listPath(parent_dir_path):
-                if not is_dir and name == parent_name:
-                    parent_page = data.buildPage()
-                    item = _LinkedPage(parent_page)
-                    item._linker_info.name = parent_name
-                    item._linker_info.child_linker = Linker(
-                        self._source, parent_dir_path,
-                        root_page_path=self._root_page_path)
-                    self._parent = LinkedPageData(item)
-                    break
-            else:
-                self._parent = Linker(self._source, parent_dir_path,
-                                      root_page_path=self._root_page_path)
-
-        return self._parent
-
-    @property
-    def pages(self):
-        return self._iterItems(0, filter_page_items)
-
-    @property
-    def directories(self):
-        return self._iterItems(0, filter_directory_items)
-
-    @property
-    def all(self):
-        return self._iterItems()
-
-    @property
-    def allpages(self):
-        return self._iterItems(-1, filter_page_items)
-
-    @property
-    def alldirectories(self):
-        return self._iterItems(-1, filter_directory_items)
-
-    @property
-    def root(self):
-        return self.forpath('/')
-
-    def forpath(self, rel_path):
-        return Linker(self._source, rel_path,
-                      root_page_path=self._root_page_path)
-
-    def _iterItems(self, max_depth=-1, filter_func=None):
-        items = walk_linkers(self, max_depth=max_depth,
-                             filter_func=filter_func)
-        src = LinkerSource(items, self._source)
-        return PageIterator(src)
-
-    def _load(self):
-        if self._items is not None:
-            return
-
-        items = list(self._source.listPath(self._dir_path))
-        self._items = collections.OrderedDict()
-        for is_dir, name, data in items:
-            # If `is_dir` is true, `data` will be the directory's source
-            # path. If not, it will be a page factory.
-            if is_dir:
-                item = Linker(self._source, data,
-                              root_page_path=self._root_page_path)
-            else:
-                page = data.buildPage()
-                is_self = (page.rel_path == self._root_page_path)
-                item = _LinkedPage(page)
-                item._linker_info.name = name
-                item._linker_info.is_self = is_self
-                if is_self:
-                    self._self_item = item
-
-            existing = self._items.get(name)
-            if existing is None:
-                self._items[name] = item
-            elif is_dir:
-                # The current item is a directory. The existing item
-                # should be a page.
-                existing._linker_info.child_linker = item
-                existing._linker_info.is_dir = True
-            else:
-                # The current item is a page. The existing item should
-                # be a directory.
-                item._linker_info.child_linker = existing
-                item._linker_info.is_dir = True
-                self._items[name] = item
-
-
-def filter_page_items(item):
-    return not isinstance(item, Linker)
-
-
-def filter_directory_items(item):
-    return isinstance(item, Linker)
-
-
-def walk_linkers(linker, depth=0, max_depth=-1, filter_func=None):
-    linker._load()
-    for item in linker._items.values():
-        if not filter_func or filter_func(item):
-            yield item
-
-        if (isinstance(item, Linker) and
-                (max_depth < 0 or depth + 1 <= max_depth)):
-            yield from walk_linkers(item, depth + 1, max_depth)
-
--- a/piecrust/data/pagedata.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/data/pagedata.py	Sun Jun 04 23:34:28 2017 -0700
@@ -76,7 +76,7 @@
                 logger.exception(ex)
                 raise Exception(
                     "Error while loading attribute '%s' for: %s" %
-                    (name, self._page.rel_path)) from ex
+                    (name, self._page.content_spec)) from ex
 
             # Forget this loader now that it served its purpose.
             try:
@@ -96,11 +96,14 @@
                 logger.exception(ex)
                 raise Exception(
                     "Error while loading attribute '%s' for: %s" %
-                    (name, self._page.rel_path)) from ex
+                    (name, self._page.content_spec)) from ex
             # We always keep the wildcard loader in the loaders list.
-            return self._values[name]
+            try:
+                return self._values[name]
+            except KeyError:
+                pass
 
-        raise LazyPageConfigLoaderHasNoValue("No such value: %s" % name)
+        raise LazyPageConfigLoaderHasNoValue()
 
     def _setValue(self, name, value):
         self._values[name] = value
@@ -136,7 +139,7 @@
             logger.exception(ex)
             raise Exception(
                 "Error while loading data for: %s" %
-                self._page.rel_path) from ex
+                self._page.content_spec) from ex
 
     def _load(self):
         pass
@@ -162,7 +165,7 @@
         dt = page.datetime
         for k, v in page.source_metadata.items():
             self._setValue(k, v)
-        self._setValue('url', self._ctx.uri)
+        self._setValue('url', self._page.getUri(self._ctx.sub_num))
         self._setValue('timestamp', time.mktime(dt.timetuple()))
         self._setValue('datetime', {
             'year': dt.year, 'month': dt.month, 'day': dt.day,
--- a/piecrust/data/paginationdata.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/data/paginationdata.py	Sun Jun 04 23:34:28 2017 -0700
@@ -49,7 +49,7 @@
         assert self is data
 
         if do_render:
-            uri = self.getUri()
+            uri = self._page.getUri()
             try:
                 from piecrust.rendering import (
                     RenderingContext, render_page_segments)
--- a/piecrust/data/paginator.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/data/paginator.py	Sun Jun 04 23:34:28 2017 -0700
@@ -90,6 +90,10 @@
         if ipp is not None:
             return ipp
 
+        ipp = self._source.config.get('items_per_page')
+        if ipp is not None:
+            return ipp
+
         raise Exception("No way to figure out how many items to display "
                         "per page.")
 
@@ -182,29 +186,34 @@
             return
 
         from piecrust.data.filters import PaginationFilter
-        from piecrust.dataproviders.page_iterator import PageIterator
-
-        pag_filter = PaginationFilter()
-        if self._pgn_filter is not None:
-            pag_filter.addClause(self._pgn_filter.root_clause)
+        from piecrust.dataproviders.pageiterator import (
+            PageIterator, HardCodedFilterIterator)
 
         self._iterator = PageIterator(
             self._source,
-            current_page=self._page,
-            pagination_filter=pag_filter,
-            locked=True)
-        self._iterator._iter_event += self._onIteration
+            current_page=self._page)
+        #self._iterator._iter_event += self._onIteration
+
+        if self._pgn_filter is not None:
+            pag_fil = PaginationFilter()
+            pag_fil.addClause(self._pgn_filter.root_clause)
+            self._iterator._simpleNonSortedWrap(
+                HardCodedFilterIterator, pag_fil)
 
         offset = (self._sub_num - 1) * self.items_per_page
         limit = self.items_per_page
         self._iterator.slice(offset, limit)
 
+        self._iterator._lockIterator()
+
+        self._onIteration(self._iterator)
+
     def _getPageUri(self, index):
         return self._page.getUri(index)
 
-    def _onIteration(self):
+    def _onIteration(self, it):
         if not self._pgn_set_on_ctx:
-            eis = self._page.app.env.exec_info_stack
-            eis.current_page_info.render_ctx.setPagination(self)
+            rcs = self._page.app.env.render_ctx_stack
+            rcs.current_ctx.setPagination(self)
             self._pgn_set_on_ctx = True
 
--- a/piecrust/data/providersdata.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/data/providersdata.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,5 +1,8 @@
 import re
 import collections.abc
+from piecrust.configuration import ConfigurationError
+from piecrust.dataproviders.base import (
+    DataProvider, build_data_provider)
 
 
 re_endpoint_sep = re.compile(r'[\/\.]')
@@ -27,15 +30,31 @@
             return
 
         self._dict = {}
-        for source in self._page.app.sources + self._page.app.generators:
-            if source.data_endpoint:
-                endpoint_bits = re_endpoint_sep.split(source.data_endpoint)
-                endpoint = self._dict
-                for e in endpoint_bits[:-1]:
-                    if e not in endpoint:
-                        endpoint[e] = {}
-                    endpoint = endpoint[e]
-                override = endpoint.get(endpoint_bits[-1])
-                provider = source.buildDataProvider(self._page, override)
-                if provider is not None:
-                    endpoint[endpoint_bits[-1]] = provider
+        for source in self._page.app.sources:
+            pname = source.config.get('data_type')
+            pendpoint = source.config.get('data_endpoint')
+            if not pname or not pendpoint:
+                continue
+
+            endpoint_bits = re_endpoint_sep.split(pendpoint)
+            endpoint = self._dict
+            for e in endpoint_bits[:-1]:
+                if e not in endpoint:
+                    endpoint[e] = {}
+                endpoint = endpoint[e]
+            existing = endpoint.get(endpoint_bits[-1])
+
+            if existing is None:
+                provider = build_data_provider(pname, source, self._page)
+                endpoint[endpoint_bits[-1]] = provider
+            elif isinstance(existing, DataProvider):
+                if existing.PROVIDER_NAME != pname:
+                    raise ConfigurationError(
+                        "Can't combine data providers '%s' and '%' on "
+                        "endpoint '%s'." %
+                        (existing.PROVIDER_NAME, pname, pendpoint))
+                existing._addSource(source)
+            else:
+                raise ConfigurationError(
+                    "Endpoint '%s' can't be used for a data provider because "
+                    "it's already used for something else." % pendpoint)
--- a/piecrust/dataproviders/base.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/dataproviders/base.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,10 +1,6 @@
 from piecrust.configuration import ConfigurationError
 
 
-class UnsupportedWrappedDataProviderError(Exception):
-    pass
-
-
 class DataProvider:
     """ The base class for a data provider.
     """
@@ -13,19 +9,27 @@
     debug_render_dynamic = []
     debug_render_invoke_dynamic = []
 
-    def __init__(self, source):
-        self._source = source
+    def __init__(self, source, page):
+        self._page = page
+        self._sources = []
+        if source is not None:
+            self._sources.append(source)
 
-    def _wrapDataProvider(self, provider):
-        raise UnsupportedWrappedDataProviderError()
+    def _addSource(self, source):
+        self._sources.append(source)
 
 
-def get_data_provider_class(app, provider_type):
+def build_data_provider(provider_type, source, page):
     if not provider_type:
         raise Exception("No data provider type specified.")
-    for prov in app.plugin_loader.getDataProviders():
-        if prov.PROVIDER_NAME == provider_type:
-            return prov
-    raise ConfigurationError(
-        "Unknown data provider type: %s" % provider_type)
 
+    for p in page.app.plugin_loader.getDataProviders():
+        if p.PROVIDER_NAME == provider_type:
+            pclass = p
+            break
+    else:
+        raise ConfigurationError("Unknown data provider type: %s" %
+                                 provider_type)
+
+    return pclass(source, page)
+
--- a/piecrust/dataproviders/blog.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/dataproviders/blog.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,7 +1,8 @@
 import time
 import collections.abc
 from piecrust.dataproviders.base import DataProvider
-from piecrust.generation.taxonomy import Taxonomy
+from piecrust.dataproviders.pageiterator import PageIterator
+from piecrust.sources.taxonomy import Taxonomy
 
 
 class BlogDataProvider(DataProvider, collections.abc.Mapping):
@@ -12,8 +13,8 @@
     debug_render_dynamic = (['_debugRenderTaxonomies'] +
                             DataProvider.debug_render_dynamic)
 
-    def __init__(self, source, page, override):
-        super(BlogDataProvider, self).__init__(source, page, override)
+    def __init__(self, source, page):
+        super().__init__(source, page)
         self._yearly = None
         self._monthly = None
         self._taxonomies = {}
@@ -72,15 +73,15 @@
             posts_this_year = yearly_index.get(year)
             if posts_this_year is None:
                 timestamp = time.mktime(
-                        (post.datetime.year, 1, 1, 0, 0, 0, 0, 0, -1))
+                    (post.datetime.year, 1, 1, 0, 0, 0, 0, 0, -1))
                 posts_this_year = BlogArchiveEntry(self._page, year, timestamp)
                 self._yearly.append(posts_this_year)
                 yearly_index[year] = posts_this_year
 
             posts_this_year._data_source.append(post)
         self._yearly = sorted(self._yearly,
-                key=lambda e: e.timestamp,
-                reverse=True)
+                              key=lambda e: e.timestamp,
+                              reverse=True)
         self._onIteration()
         return self._yearly
 
@@ -93,19 +94,20 @@
             month = post.datetime.strftime('%B %Y')
 
             posts_this_month = next(
-                    filter(lambda m: m.name == month, self._monthly),
-                    None)
+                filter(lambda m: m.name == month, self._monthly),
+                None)
             if posts_this_month is None:
                 timestamp = time.mktime(
-                        (post.datetime.year, post.datetime.month, 1,
-                            0, 0, 0, 0, 0, -1))
-                posts_this_month = BlogArchiveEntry(self._page, month, timestamp)
+                    (post.datetime.year, post.datetime.month, 1,
+                     0, 0, 0, 0, 0, -1))
+                posts_this_month = BlogArchiveEntry(
+                    self._page, month, timestamp)
                 self._monthly.append(posts_this_month)
 
             posts_this_month._data_source.append(post)
         self._monthly = sorted(self._monthly,
-                key=lambda e: e.timestamp,
-                reverse=True)
+                               key=lambda e: e.timestamp,
+                               reverse=True)
         self._onIteration()
         return self._monthly
 
--- a/piecrust/dataproviders/page_iterator.py	Sun May 21 00:06:59 2017 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,345 +0,0 @@
-import logging
-from piecrust.data.filters import PaginationFilter
-from piecrust.data.paginationdata import PaginationData
-from piecrust.events import Event
-from piecrust.dataproviders.base import DataProvider
-from piecrust.sources.base import AbortedSourceUseError
-
-
-logger = logging.getLogger(__name__)
-
-
-class PageIteratorDataProvider(DataProvider):
-    """ A data provider that reads a content source as a list of pages.
-
-        This class supports wrapping another `PageIteratorDataProvider`
-        instance because several sources may want to be merged under the
-        same data endpoint (e.g. `site.pages` which lists both the user
-        pages and the theme pages).
-    """
-    PROVIDER_NAME = 'page_iterator'
-
-    debug_render_doc_dynamic = ['_debugRenderDoc']
-    debug_render_not_empty = True
-
-    def __init__(self, source, current_page=None):
-        super().__init__(source)
-        self._it = PageIterator(source, current_page=current_page)
-        self._it._iter_event += self._onIteration
-        self._innerProvider = None
-        self._iterated = False
-
-    def __len__(self):
-        res = len(self._it)
-        if self._innerProvider is not None:
-            res += len(self._innerProvider)
-        return res
-
-    def __iter__(self):
-        yield from self._it
-        if self._innerProvider is not None:
-            yield from self._innerProvider
-
-    def _onIteration(self):
-        if not self._iterated:
-            rcs = self._source.app.env.render_ctx_stack
-            rcs.current_ctx.addUsedSource(self._source.name)
-            self._iterated = True
-
-    def _debugRenderDoc(self):
-        return 'Provides a list of %d items' % len(self)
-
-
-class PageIterator:
-    def __init__(self, source, *,
-                 current_page=None, locked=False):
-        self._source = source
-        self._cache = None
-        self._pagination_slicer = None
-        self._has_sorter = False
-        self._next_page = None
-        self._prev_page = None
-        self._locked = locked
-        self._iter_event = Event()
-        self._current_page = current_page
-        self._it = PageContentSourceIterator(self._source)
-
-    @property
-    def total_count(self):
-        self._load()
-        if self._pagination_slicer is not None:
-            return self._pagination_slicer.inner_count
-        return len(self._cache)
-
-    @property
-    def next_page(self):
-        self._load()
-        return self._next_page
-
-    @property
-    def prev_page(self):
-        self._load()
-        return self._prev_page
-
-    def __len__(self):
-        self._load()
-        return len(self._cache)
-
-    def __getitem__(self, key):
-        self._load()
-        return self._cache[key]
-
-    def __iter__(self):
-        self._load()
-        return iter(self._cache)
-
-    def __getattr__(self, name):
-        if name[:3] == 'is_' or name[:3] == 'in_':
-            def is_filter(value):
-                conf = {'is_%s' % name[3:]: value}
-                return self._simpleNonSortedWrap(SettingFilterIterator, conf)
-            return is_filter
-
-        if name[:4] == 'has_':
-            def has_filter(value):
-                conf = {name: value}
-                return self._simpleNonSortedWrap(SettingFilterIterator, conf)
-            return has_filter
-
-        if name[:5] == 'with_':
-            def has_filter(value):
-                conf = {'has_%s' % name[5:]: value}
-                return self._simpleNonSortedWrap(SettingFilterIterator, conf)
-            return has_filter
-
-        return self.__getattribute__(name)
-
-    def skip(self, count):
-        return self._simpleWrap(SliceIterator, count)
-
-    def limit(self, count):
-        return self._simpleWrap(SliceIterator, 0, count)
-
-    def slice(self, skip, limit):
-        return self._simpleWrap(SliceIterator, skip, limit)
-
-    def filter(self, filter_name):
-        if self._current_page is None:
-            raise Exception("Can't use `filter()` because no parent page was "
-                            "set for this page iterator.")
-        filter_conf = self._current_page.config.get(filter_name)
-        if filter_conf is None:
-            raise Exception("Couldn't find filter '%s' in the configuration "
-                            "header for page: %s" %
-                            (filter_name, self._current_page.path))
-        return self._simpleNonSortedWrap(SettingFilterIterator, filter_conf)
-
-    def sort(self, setting_name, reverse=False):
-        if not setting_name:
-            raise Exception("You need to specify a configuration setting "
-                            "to sort by.")
-        self._ensureUnlocked()
-        self._ensureUnloaded()
-        self._pages = SettingSortIterator(self._pages, setting_name, reverse)
-        self._has_sorter = True
-        return self
-
-    def reset(self):
-        self._ensureUnlocked()
-        self._unload()
-        return self
-
-    @property
-    def _is_loaded(self):
-        return self._cache is not None
-
-    @property
-    def _has_more(self):
-        if self._cache is None:
-            return False
-        if self._pagination_slicer:
-            return self._pagination_slicer.has_more
-        return False
-
-    def _simpleWrap(self, it_class, *args, **kwargs):
-        self._ensureUnlocked()
-        self._ensureUnloaded()
-        self._ensureSorter()
-        self._it = it_class(self._it, *args, **kwargs)
-        if self._pagination_slicer is None and it_class is SliceIterator:
-            self._pagination_slicer = self._it
-            self._pagination_slicer.current_page = self._current_page
-        return self
-
-    def _simpleNonSortedWrap(self, it_class, *args, **kwargs):
-        self._ensureUnlocked()
-        self._ensureUnloaded()
-        self._it = it_class(self._it, *args, **kwargs)
-        return self
-
-    def _ensureUnlocked(self):
-        if self._locked:
-            raise Exception(
-                "This page iterator has been locked and can't be modified.")
-
-    def _ensureUnloaded(self):
-        if self._cache:
-            raise Exception(
-                "This page iterator has already been iterated upon and "
-                "can't be modified anymore.")
-
-    def _ensureSorter(self):
-        if self._has_sorter:
-            return
-        self._it = DateSortIterator(self._it, reverse=True)
-        self._has_sorter = True
-
-    def _unload(self):
-        self._it = PageContentSourceIterator(self._source)
-        self._cache = None
-        self._paginationSlicer = None
-        self._has_sorter = False
-        self._next_page = None
-        self._prev_page = None
-
-    def _load(self):
-        if self._cache is not None:
-            return
-
-        if self._source.app.env.abort_source_use:
-            if self._current_page is not None:
-                logger.debug("Aborting iteration of '%s' from: %s." %
-                             (self.source.name,
-                              self._current_page.content_spec))
-            else:
-                logger.debug("Aborting iteration of '%s'." %
-                             self._source.name)
-            raise AbortedSourceUseError()
-
-        self._ensureSorter()
-
-        tail_it = PaginationDataBuilderIterator(self._it, self._source.route)
-        self._cache = list(tail_it)
-
-        if (self._current_page is not None and
-                self._pagination_slicer is not None):
-            pn = [self._pagination_slicer.prev_page,
-                  self._pagination_slicer.next_page]
-            pn_it = PaginationDataBuilderIterator(iter(pn),
-                                                  self._source.route)
-            self._prev_page, self._next_page = (list(pn_it))
-
-        self._iter_event.fire()
-
-    def _debugRenderDoc(self):
-        return "Contains %d items" % len(self)
-
-
-class SettingFilterIterator:
-    def __init__(self, it, fil_conf):
-        self.it = it
-        self.fil_conf = fil_conf
-        self._fil = None
-
-    def __iter__(self):
-        if self._fil is None:
-            self._fil = PaginationFilter()
-            self._fil.addClausesFromConfig(self.fil_conf)
-
-        for i in self.it:
-            if self._fil.pageMatches(i):
-                yield i
-
-
-class SliceIterator:
-    def __init__(self, it, offset=0, limit=-1):
-        self.it = it
-        self.offset = offset
-        self.limit = limit
-        self.current_page = None
-        self.has_more = False
-        self.inner_count = -1
-        self.next_page = None
-        self.prev_page = None
-        self._cache = None
-
-    def __iter__(self):
-        if self._cache is None:
-            inner_list = list(self.it)
-            self.inner_count = len(inner_list)
-
-            if self.limit > 0:
-                self.has_more = self.inner_count > (self.offset + self.limit)
-                self._cache = inner_list[self.offset:self.offset + self.limit]
-            else:
-                self.has_more = False
-                self._cache = inner_list[self.offset:]
-
-            if self.current_page:
-                try:
-                    idx = inner_list.index(self.current_page)
-                except ValueError:
-                    idx = -1
-                if idx >= 0:
-                    if idx < self.inner_count - 1:
-                        self.next_page = inner_list[idx + 1]
-                    if idx > 0:
-                        self.prev_page = inner_list[idx - 1]
-
-        return iter(self._cache)
-
-
-class SettingSortIterator:
-    def __init__(self, it, name, reverse=False):
-        self.it = it
-        self.name = name
-        self.reverse = reverse
-
-    def __iter__(self):
-        return iter(sorted(self.it, key=self._key_getter,
-                           reverse=self.reverse))
-
-    def _key_getter(self, item):
-        key = item.config.get(item)
-        if key is None:
-            return 0
-        return key
-
-
-class DateSortIterator:
-    def __init__(self, it, reverse=True):
-        self.it = it
-        self.reverse = reverse
-
-    def __iter__(self):
-        return iter(sorted(self.it,
-                           key=lambda x: x.datetime, reverse=self.reverse))
-
-
-class PageContentSourceIterator:
-    def __init__(self, source):
-        self.source = source
-
-        # This is to permit recursive traversal of the
-        # iterator chain. It acts as the end.
-        self.it = None
-
-    def __iter__(self):
-        source = self.source
-        app = source.app
-        for item in source.getAllContents():
-            yield app.getPage(source, item)
-
-
-class PaginationDataBuilderIterator:
-    def __init__(self, it, route):
-        self.it = it
-        self.route = route
-
-    def __iter__(self):
-        for page in self.it:
-            if page is not None:
-                yield PaginationData(page)
-            else:
-                yield None
-
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/dataproviders/pageiterator.py	Sun Jun 04 23:34:28 2017 -0700
@@ -0,0 +1,373 @@
+import logging
+from piecrust.data.filters import PaginationFilter
+from piecrust.data.paginationdata import PaginationData
+from piecrust.events import Event
+from piecrust.dataproviders.base import DataProvider
+from piecrust.sources.base import AbortedSourceUseError
+
+
+logger = logging.getLogger(__name__)
+
+
+class _ItInfo:
+    def __init__(self):
+        self.it = None
+        self.iterated = False
+
+
+class PageIteratorDataProvider(DataProvider):
+    """ A data provider that reads a content source as a list of pages.
+
+        This class supports wrapping another `PageIteratorDataProvider`
+        instance because several sources may want to be merged under the
+        same data endpoint (e.g. `site.pages` which lists both the user
+        pages and the theme pages).
+    """
+    PROVIDER_NAME = 'page_iterator'
+
+    debug_render_doc_dynamic = ['_debugRenderDoc']
+    debug_render_not_empty = True
+
+    def __init__(self, source, page):
+        super().__init__(source, page)
+        self._its = None
+        self._app = source.app
+
+    def __len__(self):
+        self._load()
+        return sum([len(i.it) for i in self._its])
+
+    def __iter__(self):
+        self._load()
+        for i in self._its:
+            yield from i.it
+
+    def _load(self):
+        if self._its is not None:
+            return
+
+        self._its = []
+        for source in self._sources:
+            i = _ItInfo()
+            i.it = PageIterator(source, current_page=self._page)
+            i.it._iter_event += self._onIteration
+            self._its.append(i)
+
+    def _onIteration(self, it):
+        ii = next(filter(lambda i: i.it == it, self._its))
+        if not ii.iterated:
+            rcs = self._app.env.render_ctx_stack
+            rcs.current_ctx.addUsedSource(self._source.name)
+            ii.iterated = True
+
+    def _debugRenderDoc(self):
+        return 'Provides a list of %d items' % len(self)
+
+
+class PageIterator:
+    def __init__(self, source, *, current_page=None):
+        self._source = source
+        self._cache = None
+        self._pagination_slicer = None
+        self._has_sorter = False
+        self._next_page = None
+        self._prev_page = None
+        self._locked = False
+        self._iter_event = Event()
+        self._current_page = current_page
+        self._it = PageContentSourceIterator(self._source)
+
+    @property
+    def total_count(self):
+        self._load()
+        if self._pagination_slicer is not None:
+            return self._pagination_slicer.inner_count
+        return len(self._cache)
+
+    @property
+    def next_page(self):
+        self._load()
+        return self._next_page
+
+    @property
+    def prev_page(self):
+        self._load()
+        return self._prev_page
+
+    def __len__(self):
+        self._load()
+        return len(self._cache)
+
+    def __getitem__(self, key):
+        self._load()
+        return self._cache[key]
+
+    def __iter__(self):
+        self._load()
+        return iter(self._cache)
+
+    def __getattr__(self, name):
+        if name[:3] == 'is_' or name[:3] == 'in_':
+            def is_filter(value):
+                conf = {'is_%s' % name[3:]: value}
+                return self._simpleNonSortedWrap(SettingFilterIterator, conf)
+            return is_filter
+
+        if name[:4] == 'has_':
+            def has_filter(value):
+                conf = {name: value}
+                return self._simpleNonSortedWrap(SettingFilterIterator, conf)
+            return has_filter
+
+        if name[:5] == 'with_':
+            def with_filter(value):
+                conf = {'has_%s' % name[5:]: value}
+                return self._simpleNonSortedWrap(SettingFilterIterator, conf)
+            return with_filter
+
+        return self.__getattribute__(name)
+
+    def skip(self, count):
+        return self._simpleWrap(SliceIterator, count)
+
+    def limit(self, count):
+        return self._simpleWrap(SliceIterator, 0, count)
+
+    def slice(self, skip, limit):
+        return self._simpleWrap(SliceIterator, skip, limit)
+
+    def filter(self, filter_name):
+        if self._current_page is None:
+            raise Exception("Can't use `filter()` because no parent page was "
+                            "set for this page iterator.")
+        filter_conf = self._current_page.config.get(filter_name)
+        if filter_conf is None:
+            raise Exception("Couldn't find filter '%s' in the configuration "
+                            "header for page: %s" %
+                            (filter_name, self._current_page.path))
+        return self._simpleNonSortedWrap(SettingFilterIterator, filter_conf)
+
+    def sort(self, setting_name, reverse=False):
+        if not setting_name:
+            raise Exception("You need to specify a configuration setting "
+                            "to sort by.")
+        self._ensureUnlocked()
+        self._ensureUnloaded()
+        self._it = SettingSortIterator(self._it, setting_name, reverse)
+        self._has_sorter = True
+        return self
+
+    def reset(self):
+        self._ensureUnlocked()
+        self._unload()
+        return self
+
+    @property
+    def _is_loaded(self):
+        return self._cache is not None
+
+    @property
+    def _has_more(self):
+        if self._cache is None:
+            return False
+        if self._pagination_slicer:
+            return self._pagination_slicer.has_more
+        return False
+
+    def _simpleWrap(self, it_class, *args, **kwargs):
+        self._ensureUnlocked()
+        self._ensureUnloaded()
+        self._ensureSorter()
+        self._it = it_class(self._it, *args, **kwargs)
+        if self._pagination_slicer is None and it_class is SliceIterator:
+            self._pagination_slicer = self._it
+            self._pagination_slicer.current_page = self._current_page
+        return self
+
+    def _simpleNonSortedWrap(self, it_class, *args, **kwargs):
+        self._ensureUnlocked()
+        self._ensureUnloaded()
+        self._it = it_class(self._it, *args, **kwargs)
+        return self
+
+    def _lockIterator(self):
+        self._ensureUnlocked()
+        self._locked = True
+
+    def _ensureUnlocked(self):
+        if self._locked:
+            raise Exception(
+                "This page iterator has been locked and can't be modified.")
+
+    def _ensureUnloaded(self):
+        if self._cache:
+            raise Exception(
+                "This page iterator has already been iterated upon and "
+                "can't be modified anymore.")
+
+    def _ensureSorter(self):
+        if self._has_sorter:
+            return
+        self._it = DateSortIterator(self._it, reverse=True)
+        self._has_sorter = True
+
+    def _unload(self):
+        self._it = PageContentSourceIterator(self._source)
+        self._cache = None
+        self._pagination_slicer = None
+        self._has_sorter = False
+        self._next_page = None
+        self._prev_page = None
+
+    def _load(self):
+        if self._cache is not None:
+            return
+
+        if self._source.app.env.abort_source_use:
+            if self._current_page is not None:
+                logger.debug("Aborting iteration of '%s' from: %s." %
+                             (self._source.name,
+                              self._current_page.content_spec))
+            else:
+                logger.debug("Aborting iteration of '%s'." %
+                             self._source.name)
+            raise AbortedSourceUseError()
+
+        self._ensureSorter()
+
+        tail_it = PaginationDataBuilderIterator(self._it, self._source.route)
+        self._cache = list(tail_it)
+
+        if (self._current_page is not None and
+                self._pagination_slicer is not None):
+            pn = [self._pagination_slicer.prev_page,
+                  self._pagination_slicer.next_page]
+            pn_it = PaginationDataBuilderIterator(iter(pn),
+                                                  self._source.route)
+            self._prev_page, self._next_page = (list(pn_it))
+
+        self._iter_event.fire(self)
+
+    def _debugRenderDoc(self):
+        return "Contains %d items" % len(self)
+
+
+class SettingFilterIterator:
+    def __init__(self, it, fil_conf):
+        self.it = it
+        self.fil_conf = fil_conf
+        self._fil = None
+
+    def __iter__(self):
+        if self._fil is None:
+            self._fil = PaginationFilter()
+            self._fil.addClausesFromConfig(self.fil_conf)
+
+        for i in self.it:
+            if self._fil.pageMatches(i):
+                yield i
+
+
+class HardCodedFilterIterator:
+    def __init__(self, it, fil):
+        self.it = it
+        self._fil = fil
+
+    def __iter__(self):
+        for i in self.it:
+            if self._fil.pageMatches(i):
+                yield i
+
+
+class SliceIterator:
+    def __init__(self, it, offset=0, limit=-1):
+        self.it = it
+        self.offset = offset
+        self.limit = limit
+        self.current_page = None
+        self.has_more = False
+        self.inner_count = -1
+        self.next_page = None
+        self.prev_page = None
+        self._cache = None
+
+    def __iter__(self):
+        if self._cache is None:
+            inner_list = list(self.it)
+            self.inner_count = len(inner_list)
+
+            if self.limit > 0:
+                self.has_more = self.inner_count > (self.offset + self.limit)
+                self._cache = inner_list[self.offset:self.offset + self.limit]
+            else:
+                self.has_more = False
+                self._cache = inner_list[self.offset:]
+
+            if self.current_page:
+                try:
+                    idx = inner_list.index(self.current_page)
+                except ValueError:
+                    idx = -1
+                if idx >= 0:
+                    if idx < self.inner_count - 1:
+                        self.next_page = inner_list[idx + 1]
+                    if idx > 0:
+                        self.prev_page = inner_list[idx - 1]
+
+        return iter(self._cache)
+
+
+class SettingSortIterator:
+    def __init__(self, it, name, reverse=False):
+        self.it = it
+        self.name = name
+        self.reverse = reverse
+
+    def __iter__(self):
+        return iter(sorted(self.it, key=self._key_getter,
+                           reverse=self.reverse))
+
+    def _key_getter(self, item):
+        key = item.config.get(self.name)
+        if key is None:
+            return 0
+        return key
+
+
+class DateSortIterator:
+    def __init__(self, it, reverse=True):
+        self.it = it
+        self.reverse = reverse
+
+    def __iter__(self):
+        return iter(sorted(self.it,
+                           key=lambda x: x.datetime, reverse=self.reverse))
+
+
+class PageContentSourceIterator:
+    def __init__(self, source):
+        self.source = source
+
+        # This is to permit recursive traversal of the
+        # iterator chain. It acts as the end.
+        self.it = None
+
+    def __iter__(self):
+        source = self.source
+        app = source.app
+        for item in source.getAllContents():
+            yield app.getPage(source, item)
+
+
+class PaginationDataBuilderIterator:
+    def __init__(self, it, route):
+        self.it = it
+        self.route = route
+
+    def __iter__(self):
+        for page in self.it:
+            if page is not None:
+                yield PaginationData(page)
+            else:
+                yield None
+
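
Note on the new `PageIterator` above: its filters are built dynamically, so any
`is_*`/`in_*`, `has_*` or `with_*` attribute access returns a function that wraps
the iterator chain in a `SettingFilterIterator`, while `skip`/`limit`/`slice`
wrap it in a `SliceIterator`. A minimal usage sketch follows; the `source`
object and the `featured` page setting are stand-ins for illustration, not part
of this changeset:

    from piecrust.dataproviders.pageiterator import PageIterator

    it = PageIterator(source)          # `source` needs an app, a name and a route
    featured = it.is_featured(True)    # builds the filter config {'is_featured': True}
    featured = featured.limit(5)       # wraps the chain in a SliceIterator(0, 5)
    for p in featured:                 # first iteration sorts by date, then yields
        print(p)                       # PaginationData wrappers around each page

Every wrapper method returns `self`, so the calls chain; the underlying source
is only queried on first iteration, which is also the point where
`abort_source_use` can raise `AbortedSourceUseError`.
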
--- a/piecrust/environment.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/environment.py	Sun Jun 04 23:34:28 2017 -0700
@@ -74,7 +74,6 @@
         self.render_ctx_stack = RenderingContextStack()
         self.fs_cache_only_for_main_page = False
         self.abort_source_use = False
-        self._default_layout_extensions = None
         self._stats = ExecutionStats()
 
     @property
--- a/piecrust/pipelines/_pagebaker.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/pipelines/_pagebaker.py	Sun Jun 04 23:34:28 2017 -0700
@@ -25,6 +25,7 @@
         self.pretty_urls = app.config.get('site/pretty_urls')
         self._writer_queue = None
         self._writer = None
+        self._stats = app.env.stats
 
     def startWriterQueue(self):
         self._writer_queue = queue.Queue()
@@ -53,24 +54,21 @@
 
         return os.path.normpath(os.path.join(*bake_path))
 
-    def bake(self, qualified_page, prev_entry, dirty_source_names):
+    def bake(self, page, prev_entry, cur_entry, dirty_source_names):
         # Start baking the sub-pages.
         cur_sub = 1
         has_more_subs = True
-        sub_entries = []
-        pretty_urls = qualified_page.config.get(
-            'pretty_urls', self.pretty_urls)
+        pretty_urls = page.config.get('pretty_urls', self.pretty_urls)
 
         while has_more_subs:
-            sub_page = qualified_page.getSubPage(cur_sub)
-            sub_uri = sub_page.uri
+            sub_uri = page.getUri(sub_num=cur_sub)
             logger.debug("Baking '%s' [%d]..." % (sub_uri, cur_sub))
 
             out_path = self.getOutputPath(sub_uri, pretty_urls)
 
             # Create the sub-entry for the bake record.
             sub_entry = SubPagePipelineRecordEntry(sub_uri, out_path)
-            sub_entries.append(sub_entry)
+            cur_entry.subs.append(sub_entry)
 
             # Find a corresponding sub-entry in the previous bake record.
             prev_sub_entry = None
@@ -89,7 +87,7 @@
             do_bake = True
             if not force_this_sub:
                 try:
-                    in_path_time = qualified_page.path_mtime
+                    in_path_time = page.content_mtime
                     out_path_time = os.path.getmtime(out_path)
                     if out_path_time >= in_path_time:
                         do_bake = False
@@ -123,13 +121,11 @@
                         SubPagePipelineRecordEntry.FLAG_FORMATTING_INVALIDATED
 
                 logger.debug("  p%d -> %s" % (cur_sub, out_path))
-                rp = self._bakeSingle(qualified_page, cur_sub, out_path)
+                rp = self._bakeSingle(page, cur_sub, out_path)
             except Exception as ex:
                 logger.exception(ex)
-                page_rel_path = os.path.relpath(qualified_page.path,
-                                                self.app.root_dir)
                 raise BakingError("%s: error baking '%s'." %
-                                  (page_rel_path, sub_uri)) from ex
+                                  (page.content_spec, sub_uri)) from ex
 
             # Record what we did.
             sub_entry.flags |= SubPagePipelineRecordEntry.FLAG_BAKED
@@ -149,8 +145,7 @@
 
                 logger.debug("Copying page assets to: %s" % out_assets_dir)
                 _ensure_dir_exists(out_assets_dir)
-
-                qualified_page.source.buildAssetor(qualified_page, sub_uri).copyAssets(out_assets_dir)
+                # TODO: copy assets to out dir
 
             # Figure out if we have more work.
             has_more_subs = False
@@ -158,16 +153,14 @@
                 cur_sub += 1
                 has_more_subs = True
 
-        return sub_entries
+    def _bakeSingle(self, page, sub_num, out_path):
+        ctx = RenderingContext(page, sub_num=sub_num)
+        page.source.prepareRenderContext(ctx)
 
-    def _bakeSingle(self, qp, out_path):
-        ctx = RenderingContext(qp)
-        qp.source.prepareRenderContext(ctx)
-
-        with self.app.env.timerScope("PageRender"):
+        with self._stats.timerScope("PageRender"):
             rp = render_page(ctx)
 
-        with self.app.env.timerScope("PageSerialize"):
+        with self._stats.timerScope("PageSerialize"):
             if self._writer_queue is not None:
                 self._writer_queue.put_nowait((out_path, rp.content))
             else:
--- a/piecrust/pipelines/_pagerecords.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/pipelines/_pagerecords.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,5 +1,5 @@
 import copy
-from piecrust.pipelines.records import RecordEntry
+from piecrust.pipelines.records import RecordEntry, get_flag_descriptions
 
 
 class SubPagePipelineRecordEntry:
@@ -44,6 +44,7 @@
     FLAG_NEW = 2**0
     FLAG_SOURCE_MODIFIED = 2**1
     FLAG_OVERRIDEN = 2**2
+    FLAG_COLLAPSED_FROM_LAST_RUN = 2**3
 
     def __init__(self):
         super().__init__()
@@ -52,6 +53,10 @@
         self.subs = []
 
     @property
+    def was_touched(self):
+        return (self.flags & self.FLAG_SOURCE_MODIFIED) != 0
+
+    @property
     def was_overriden(self):
         return (self.flags & self.FLAG_OVERRIDEN) != 0
 
@@ -67,16 +72,6 @@
         return False
 
     @property
-    def all_assets(self):
-        for sub in self.subs:
-            yield from sub.assets
-
-    @property
-    def all_out_paths(self):
-        for sub in self.subs:
-            yield sub.out_path
-
-    @property
     def has_any_error(self):
         if len(self.errors) > 0:
             return True
@@ -101,3 +96,36 @@
                     res |= pinfo.used_source_names
         return res
 
+    def getAllOutputPaths(self):
+        for o in self.subs:
+            yield o.out_path
+
+    def describe(self):
+        d = super().describe()
+        d['Flags'] = get_flag_descriptions(self.flags, flag_descriptions)
+        for i, sub in enumerate(self.subs):
+            d['Sub%02d' % i] = {
+                'URI': sub.out_uri,
+                'Path': sub.out_path,
+                'Flags': get_flag_descriptions(
+                    sub.flags, sub_flag_descriptions)
+            }
+        return d
+
+
+flag_descriptions = {
+    PagePipelineRecordEntry.FLAG_NEW: 'new',
+    PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED: 'touched',
+    PagePipelineRecordEntry.FLAG_OVERRIDEN: 'overridden',
+    PagePipelineRecordEntry.FLAG_COLLAPSED_FROM_LAST_RUN: 'from last run'}
+
+
+sub_flag_descriptions = {
+    SubPagePipelineRecordEntry.FLAG_BAKED: 'baked',
+    SubPagePipelineRecordEntry.FLAG_FORCED_BY_SOURCE: 'forced by source',
+    SubPagePipelineRecordEntry.FLAG_FORCED_BY_NO_PREVIOUS: 'forced b/c new',
+    SubPagePipelineRecordEntry.FLAG_FORCED_BY_PREVIOUS_ERRORS:
+    'forced by errors',
+    SubPagePipelineRecordEntry.FLAG_FORMATTING_INVALIDATED:
+    'formatting invalidated'
+}
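
The page record entry now mirrors the asset one: flags are a bitfield and
`describe()` turns them into readable strings through the shared
`get_flag_descriptions()` helper (moved into `piecrust/pipelines/records.py`
below). A tiny illustration of the mapping, assuming a freshly constructed
entry starts with no flags set:

    from piecrust.pipelines._pagerecords import (
        PagePipelineRecordEntry, flag_descriptions)
    from piecrust.pipelines.records import get_flag_descriptions

    entry = PagePipelineRecordEntry()
    entry.flags |= PagePipelineRecordEntry.FLAG_NEW
    entry.flags |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
    print(get_flag_descriptions(entry.flags, flag_descriptions))
    # -> 'new, touched'
    print(entry.was_touched)
    # -> True
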
--- a/piecrust/pipelines/_procrecords.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/pipelines/_procrecords.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,4 +1,5 @@
-from piecrust.pipelines.records import RecordEntry
+from piecrust.pipelines.records import (
+    RecordEntry, get_flag_descriptions)
 
 
 class AssetPipelineRecordEntry(RecordEntry):
@@ -12,6 +13,7 @@
         super().__init__()
         self.flags = self.FLAG_NONE
         self.proc_tree = None
+        self.out_paths = []
 
     @property
     def was_prepared(self):
@@ -33,8 +35,9 @@
 
     def describe(self):
         d = super().describe()
-        d['Flags'] = _get_flag_descriptions(self.flags)
+        d['Flags'] = get_flag_descriptions(self.flags, flag_descriptions)
         d['Processing Tree'] = _format_proc_tree(self.proc_tree, 20 * ' ')
+        d['Outputs'] = list(self.out_paths)
         return d
 
 
@@ -45,16 +48,6 @@
     AssetPipelineRecordEntry.FLAG_COLLAPSED_FROM_LAST_RUN: 'from last run'}
 
 
-def _get_flag_descriptions(flags):
-    res = []
-    for k, v in flag_descriptions.items():
-        if flags & k:
-            res.append(v)
-    if res:
-        return ', '.join(res)
-    return 'none'
-
-
 def _format_proc_tree(tree, margin='', level=0):
     name, children = tree
     res = '%s%s+ %s\n' % (margin if level > 0 else '', level * '  ', name)
--- a/piecrust/pipelines/asset.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/pipelines/asset.py	Sun Jun 04 23:34:28 2017 -0700
@@ -19,18 +19,18 @@
     PIPELINE_NAME = 'asset'
     RECORD_ENTRY_CLASS = AssetPipelineRecordEntry
 
-    def __init__(self, source):
+    def __init__(self, source, ppctx):
         if not isinstance(source, FSContentSourceBase):
             raise Exception(
                 "The asset pipeline only support file-system sources.")
 
-        super().__init__(source)
+        super().__init__(source, ppctx)
         self.enabled_processors = None
         self.ignore_patterns = []
         self._processors = None
         self._base_dir = source.fs_endpoint_path
 
-    def initialize(self, ctx):
+    def initialize(self):
         # Get the list of processors for this run.
         processors = self.app.plugin_loader.getProcessors()
         if self.enabled_processors is not None:
@@ -40,7 +40,7 @@
                                                  self.enabled_processors)
 
         # Invoke pre-processors.
-        proc_ctx = ProcessorContext(self, ctx)
+        proc_ctx = ProcessorContext(self)
         for proc in processors:
             proc.onPipelineStart(proc_ctx)
 
@@ -62,14 +62,15 @@
         stats.registerTimer('BuildProcessingTree', raise_if_registered=False)
         stats.registerTimer('RunProcessingTree', raise_if_registered=False)
 
-    def run(self, content_item, ctx, result):
+    def run(self, job, ctx, result):
         # See if we need to ignore this item.
-        rel_path = os.path.relpath(content_item.spec, self._base_dir)
+        rel_path = os.path.relpath(job.content_item.spec, self._base_dir)
         if re_matchany(rel_path, self.ignore_patterns):
             return
 
         record_entry = result.record_entry
         stats = self.app.env.stats
+        out_dir = self.ctx.out_dir
 
         # Build the processing tree for this job.
         with stats.timerScope('BuildProcessingTree'):
@@ -80,19 +81,19 @@
         # Prepare and run the tree.
         print_node(tree_root, recursive=True)
         leaves = tree_root.getLeaves()
-        record_entry.out_paths = [os.path.join(ctx.out_dir, l.path)
+        record_entry.out_paths = [os.path.join(out_dir, l.path)
                                   for l in leaves]
         record_entry.proc_tree = get_node_name_tree(tree_root)
         if tree_root.getProcessor().is_bypassing_structured_processing:
             record_entry.flags |= (
                 AssetPipelineRecordEntry.FLAG_BYPASSED_STRUCTURED_PROCESSING)
 
-        if ctx.force:
+        if self.ctx.force:
             tree_root.setState(STATE_DIRTY, True)
 
         with stats.timerScope('RunProcessingTree'):
             runner = ProcessingTreeRunner(
-                self._base_dir, self.tmp_dir, ctx.out_dir)
+                self._base_dir, self.tmp_dir, out_dir)
             if runner.processSubTree(tree_root):
                 record_entry.flags |= (
                     AssetPipelineRecordEntry.FLAG_PROCESSED)
@@ -118,9 +119,9 @@
                 cur.out_paths = list(prev.out_paths)
                 cur.errors = list(prev.errors)
 
-    def shutdown(self, ctx):
+    def shutdown(self):
         # Invoke post-processors.
-        proc_ctx = ProcessorContext(self, ctx)
+        proc_ctx = ProcessorContext(self)
         for proc in self._processors:
             proc.onPipelineEnd(proc_ctx)
 
--- a/piecrust/pipelines/base.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/pipelines/base.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,5 +1,7 @@
 import os.path
 import logging
+from werkzeug.utils import cached_property
+from piecrust.configuration import ConfigurationError
 
 
 logger = logging.getLogger(__name__)
@@ -8,10 +10,9 @@
 class PipelineContext:
     """ The context for running a content pipeline.
     """
-    def __init__(self, out_dir, record_history, *,
+    def __init__(self, out_dir, *,
                  worker_id=-1, force=None):
         self.out_dir = out_dir
-        self.record_history = record_history
         self.worker_id = worker_id
         self.force = force
 
@@ -30,28 +31,73 @@
         """
         return self.worker_id < 0
 
-    @property
-    def current_record(self):
-        return self.record_history.current
+
+class PipelineJob:
+    """ Base class for a pipeline baking job.
+    """
+    def __init__(self, pipeline, content_item):
+        self.source_name = pipeline.source.name
+        self.record_name = pipeline.record_name
+        self.content_item = content_item
+        self.data = {}
 
 
-class PipelineResult:
+class PipelineJobRunContext:
+    """ Context for running pipeline baking jobs.
+    """
+    def __init__(self, job, pipeline, record_histories):
+        self.record_histories = record_histories
+        self._job_item_spec = job.content_item.spec
+        self._record_name = pipeline.record_name
+
+    @cached_property
+    def previous_record(self):
+        return self.record_histories.getPreviousRecord(self._record_name)
+
+    @cached_property
+    def previous_entry(self):
+        return self.previous_record.getEntry(self._job_item_spec)
+
+
+class PipelineJobResult:
     """ Result of running a pipeline on a content item.
     """
     def __init__(self):
-        self.pipeline_name = None
         self.record_entry = None
+        self.next_pass_job = None
+
+
+class PipelineMergeRecordContext:
+    """ The context for merging a record entry for a second or higher pass
+        into the bake record.
+    """
+    def __init__(self, record, job, pass_num):
+        self.record = record
+        self.job = job
+        self.pass_num = pass_num
+
+
+class PipelineDeletionContext:
+    def __init__(self, record_history):
+        self.record_history = record_history
+
+
+class PipelineCollapseRecordContext:
+    def __init__(self, record_history):
+        self.record_history = record_history
 
 
 class ContentPipeline:
     """ A pipeline that processes content from a `ContentSource`.
     """
     PIPELINE_NAME = None
-    PIPELINE_PASSES = 1
     RECORD_ENTRY_CLASS = None
+    PASS_NUM = 0
 
-    def __init__(self, source):
+    def __init__(self, source, ctx):
         self.source = source
+        self.ctx = ctx
+        self.record_name = '%s@%s' % (source.name, self.PIPELINE_NAME)
 
         app = source.app
         tmp_dir = app.cache_dir
@@ -64,10 +110,27 @@
     def app(self):
         return self.source.app
 
-    def initialize(self, ctx):
+    def initialize(self):
         pass
 
-    def run(self, content_item, ctx, result):
+    def createJobs(self):
+        return [
+            self.createJob(item)
+            for item in self.source.getAllContents()]
+
+    def createJob(self, content_item):
+        return PipelineJob(self, content_item)
+
+    def createRecordEntry(self, job):
+        entry_class = self.RECORD_ENTRY_CLASS
+        record_entry = entry_class()
+        record_entry.item_spec = job.content_item.spec
+        return record_entry
+
+    def mergeRecordEntry(self, record_entry, ctx):
+        raise NotImplementedError()
+
+    def run(self, job, ctx, result):
         raise NotImplementedError()
 
     def getDeletions(self, ctx):
@@ -76,5 +139,100 @@
     def collapseRecords(self, ctx):
         pass
 
-    def shutdown(self, ctx):
+    def shutdown(self):
         pass
+
+
+def get_pipeline_name_for_source(source):
+    pname = source.config['pipeline']
+    if not pname:
+        pname = source.DEFAULT_PIPELINE_NAME
+    if not pname:
+        raise ConfigurationError(
+            "Source '%s' doesn't specify any pipeline." % source.name)
+    return pname
+
+
+class PipelineManager:
+    def __init__(self, app, out_dir, record_histories, *,
+                 worker_id=-1, force=False):
+        self.app = app
+        self.record_histories = record_histories
+        self.out_dir = out_dir
+        self.worker_id = worker_id
+        self.force = force
+
+        self._pipeline_classes = {}
+        for pclass in app.plugin_loader.getPipelines():
+            self._pipeline_classes[pclass.PIPELINE_NAME] = pclass
+
+        self._pipelines = {}
+
+    def getPipeline(self, source_name):
+        return self._pipelines[source_name]
+
+    def getPipelines(self):
+        return self._pipelines.values()
+
+    def createPipeline(self, source):
+        if source.name in self._pipelines:
+            raise ValueError("Pipeline for source '%s' was already created." %
+                             source.name)
+
+        pname = get_pipeline_name_for_source(source)
+        ppctx = PipelineContext(self.out_dir,
+                                worker_id=self.worker_id, force=self.force)
+        pp = self._pipeline_classes[pname](source, ppctx)
+        pp.initialize()
+
+        record_history = self.record_histories.getHistory(pp.record_name)
+
+        info = _PipelineInfo(pp, record_history)
+        self._pipelines[source.name] = info
+        return info
+
+    def buildHistoryDiffs(self):
+        for ppinfo in self.getPipelines():
+            ppinfo.record_history.build()
+
+    def deleteStaleOutputs(self):
+        for ppinfo in self.getPipelines():
+            ctx = PipelineDeletionContext(ppinfo.record_history)
+            to_delete = ppinfo.pipeline.getDeletions(ctx)
+            current_record = ppinfo.record_history.current
+            if to_delete is not None:
+                for path, reason in to_delete:
+                    logger.debug("Removing '%s': %s" % (path, reason))
+                    current_record.deleted_out_paths.append(path)
+                    try:
+                        os.remove(path)
+                    except FileNotFoundError:
+                        pass
+                    logger.info('[delete] %s' % path)
+
+    def collapseRecords(self):
+        for ppinfo in self.getPipelines():
+            ctx = PipelineCollapseRecordContext(ppinfo.record_history)
+            ppinfo.pipeline.collapseRecords(ctx)
+
+    def shutdownPipelines(self):
+        for ppinfo in self.getPipelines():
+            ppinfo.pipeline.shutdown()
+
+        self._pipelines = {}
+
+
+class _PipelineInfo:
+    def __init__(self, pipeline, record_history):
+        self.pipeline = pipeline
+        self.record_history = record_history
+        self.userdata = None
+
+    @property
+    def source(self):
+        return self.pipeline.source
+
+    @property
+    def pipeline_name(self):
+        return self.pipeline.PIPELINE_NAME
+
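
`PipelineManager` centralizes that boilerplate: it resolves the pipeline name
from the source config (falling back to `DEFAULT_PIPELINE_NAME`), builds the
`PipelineContext`, and keeps a `_PipelineInfo` per source. A rough outline of
how a baker could use it, assuming `app.sources` yields the configured content
sources and `record_histories` is a `MultiRecordHistory`:

    ppmngr = PipelineManager(app, out_dir, record_histories, force=True)
    for source in app.sources:
        ppmngr.createPipeline(source)

    ppmngr.buildHistoryDiffs()      # pair previous/current record entries
    # ... create and run the pipelines' jobs here ...
    ppmngr.deleteStaleOutputs()     # drop outputs whose sources disappeared
    ppmngr.collapseRecords()
    ppmngr.shutdownPipelines()
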
--- a/piecrust/pipelines/page.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/pipelines/page.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,25 +1,48 @@
-import hashlib
+import logging
 from piecrust.pipelines.base import ContentPipeline
+from piecrust.pipelines._pagebaker import PageBaker
+from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
+from piecrust.rendering import (
+    RenderingContext, render_page_segments)
+from piecrust.sources.base import AbortedSourceUseError
+
+
+logger = logging.getLogger(__name__)
 
 
 class PagePipeline(ContentPipeline):
     PIPELINE_NAME = 'page'
-    PIPELINE_PASSES = 3
+    RECORD_ENTRY_CLASS = PagePipelineRecordEntry
+
+    def __init__(self, source, ppctx):
+        super().__init__(source, ppctx)
+        self._pagebaker = None
+        self._stats = source.app.env.stats
 
-    def initialize(self, ctx):
-        pass
+    def initialize(self):
+        stats = self.app.env.stats
+        stats.registerCounter('SourceUseAbortions', raise_if_registered=False)
 
-    def run(self, content_item, ctx):
-        raise NotImplementedError()
+        self._pagebaker = PageBaker(self.app,
+                                    self.ctx.out_dir,
+                                    force=self.ctx.force)
+        self._pagebaker.startWriterQueue()
 
-    def shutdown(self, ctx):
-        pass
+    def mergeRecordEntry(self, record_entry, ctx):
+        existing = ctx.record.getEntry(record_entry.item_spec)
+        existing.errors += record_entry.errors
+        existing.flags |= record_entry.flags
+        existing.subs = record_entry.subs
 
-    def collapseRecords(self, record_history):
-        pass
+    def run(self, job, ctx, result):
+        pass_num = job.data.get('pass', 0)
+        if pass_num == 0:
+            self._renderSegmentsOrPostpone(job.content_item, ctx, result)
+        elif pass_num == 1:
+            self._fullRender(job.content_item, ctx, result)
 
-    def getDeletions(self, record_history):
-        for prev, cur in record_history.diffs():
+    def getDeletions(self, ctx):
+        for prev, cur in ctx.record_history.diffs:
             if prev and not cur:
                 for sub in prev.subs:
                     yield (sub.out_path, 'previous source file was removed')
@@ -30,344 +53,44 @@
                 for p in diff:
                     yield (p, 'source file changed outputs')
 
-
-JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3)
-
-
-def _get_transition_key(path, extra_key=None):
-    key = path
-    if extra_key:
-        key += '+%s' % extra_key
-    return hashlib.md5(key.encode('utf8')).hexdigest()
-
+    def collapseRecords(self, ctx):
+        pass
 
-# def getOverrideEntry(self, path, uri):
-#     for pair in self.transitions.values():
-#         cur = pair[1]
-#         if cur and cur.path != path:
-#             for o in cur.subs:
-#                 if o.out_uri == uri:
-#                     return cur
-#     return None
-
-
-
-#        # Create the job handlers.
-#        job_handlers = {
-#            JOB_LOAD: LoadJobHandler(self.ctx),
-#            JOB_RENDER_FIRST: RenderFirstSubJobHandler(self.ctx),
-#            JOB_BAKE: BakeJobHandler(self.ctx)}
-#        for jt, jh in job_handlers.items():
-#            app.env.registerTimer(type(jh).__name__)
-#        self.job_handlers = job_handlers
-#
-#    def process(self, job):
-#        handler = self.job_handlers[job['type']]
-#        with self.ctx.app.env.timerScope(type(handler).__name__):
-#            return handler.handleJob(job['job'])
+    def shutdown(self):
+        self._pagebaker.stopWriterQueue()
 
-#    def _loadRealmPages(self, record_history, pool, factories):
-#        def _handler(res):
-#            # Create the record entry for this page.
-#            # This will also update the `dirty_source_names` for the record
-#            # as we add page files whose last modification times are later
-#            # than the last bake.
-#            record_entry = BakeRecordEntry(res['source_name'], res['path'])
-#            record_entry.config = res['config']
-#            record_entry.timestamp = res['timestamp']
-#            if res['errors']:
-#                record_entry.errors += res['errors']
-#                record_history.current.success = False
-#                self._logErrors(res['path'], res['errors'])
-#            record_history.addEntry(record_entry)
-#
-#        logger.debug("Loading %d realm pages..." % len(factories))
-#        with format_timed_scope(logger,
-#                                "loaded %d pages" % len(factories),
-#                                level=logging.DEBUG, colored=False,
-#                                timer_env=self.app.env,
-#                                timer_category='LoadJob'):
-#            jobs = []
-#            for fac in factories:
-#                job = {
-#                        'type': JOB_LOAD,
-#                        'job': save_factory(fac)}
-#                jobs.append(job)
-#            ar = pool.queueJobs(jobs, handler=_handler)
-#            ar.wait()
-#
-#    def _renderRealmPages(self, record_history, pool, factories):
-#        def _handler(res):
-#            entry = record_history.getCurrentEntry(res['path'])
-#            if res['errors']:
-#                entry.errors += res['errors']
-#                record_history.current.success = False
-#                self._logErrors(res['path'], res['errors'])
-#
-#        logger.debug("Rendering %d realm pages..." % len(factories))
-#        with format_timed_scope(logger,
-#                                "prepared %d pages" % len(factories),
-#                                level=logging.DEBUG, colored=False,
-#                                timer_env=self.app.env,
-#                                timer_category='RenderFirstSubJob'):
-#            jobs = []
-#            for fac in factories:
-#                record_entry = record_history.getCurrentEntry(fac.path)
-#                if record_entry.errors:
-#                    logger.debug("Ignoring %s because it had previous "
-#                                 "errors." % fac.ref_spec)
-#                    continue
-#
-#                # Make sure the source and the route exist for this page,
-#                # otherwise we add errors to the record entry and we'll skip
-#                # this page for the rest of the bake.
-#                source = self.app.getSource(fac.source.name)
-#                if source is None:
-#                    record_entry.errors.append(
-#                            "Can't get source for page: %s" % fac.ref_spec)
-#                    logger.error(record_entry.errors[-1])
-#                    continue
-#
-#                route = self.app.getSourceRoute(fac.source.name, fac.metadata)
-#                if route is None:
-#                    record_entry.errors.append(
-#                            "Can't get route for page: %s" % fac.ref_spec)
-#                    logger.error(record_entry.errors[-1])
-#                    continue
-#
-#                # All good, queue the job.
-#                route_index = self.app.routes.index(route)
-#                job = {
-#                        'type': JOB_RENDER_FIRST,
-#                        'job': {
-#                            'factory_info': save_factory(fac),
-#                            'route_index': route_index
-#                            }
-#                        }
-#                jobs.append(job)
-#
-#            ar = pool.queueJobs(jobs, handler=_handler)
-#            ar.wait()
-#
-#    def _bakeRealmPages(self, record_history, pool, realm, factories):
-#        def _handler(res):
-#            entry = record_history.getCurrentEntry(res['path'])
-#            entry.subs = res['sub_entries']
-#            if res['errors']:
-#                entry.errors += res['errors']
-#                self._logErrors(res['path'], res['errors'])
-#            if entry.has_any_error:
-#                record_history.current.success = False
-#            if entry.subs and entry.was_any_sub_baked:
-#                record_history.current.baked_count[realm] += 1
-#                record_history.current.total_baked_count[realm] += len(entry.subs)
-#
-#        logger.debug("Baking %d realm pages..." % len(factories))
-#        with format_timed_scope(logger,
-#                                "baked %d pages" % len(factories),
-#                                level=logging.DEBUG, colored=False,
-#                                timer_env=self.app.env,
-#                                timer_category='BakeJob'):
-#            jobs = []
-#            for fac in factories:
-#                job = self._makeBakeJob(record_history, fac)
-#                if job is not None:
-#                    jobs.append(job)
-#
-#            ar = pool.queueJobs(jobs, handler=_handler)
-#            ar.wait()
-#
+    def _renderSegmentsOrPostpone(self, content_item, ctx, result):
+        # Here our job is to render the page's segments so that they're
+        # cached in memory and on disk... unless we detect that the page
+        # is using some other sources, in which case we abort and we'll try
+        # again on the second pass.
+        logger.debug("Rendering segments for: %s" % content_item.spec)
+        record_entry = result.record_entry
+        stats = self.app.env.stats
 
+        page = self.app.getPage(self.source, content_item)
+        record_entry.config = page.config.getAll()
 
-#    def _makeBakeJob(self, record_history, fac):
-#        # Get the previous (if any) and current entry for this page.
-#        pair = record_history.getPreviousAndCurrentEntries(fac.path)
-#        assert pair is not None
-#        prev_entry, cur_entry = pair
-#        assert cur_entry is not None
-#
-#        # Ignore if there were errors in the previous passes.
-#        if cur_entry.errors:
-#            logger.debug("Ignoring %s because it had previous "
-#                         "errors." % fac.ref_spec)
-#            return None
-#
-#        # Build the route metadata and find the appropriate route.
-#        page = fac.buildPage()
-#        route_metadata = create_route_metadata(page)
-#        route = self.app.getSourceRoute(fac.source.name, route_metadata)
-#        assert route is not None
-#
-#        # Figure out if this page is overriden by another previously
-#        # baked page. This happens for example when the user has
-#        # made a page that has the same page/URL as a theme page.
-#        uri = route.getUri(route_metadata)
-#        override_entry = record_history.getOverrideEntry(page.path, uri)
-#        if override_entry is not None:
-#            override_source = self.app.getSource(
-#                    override_entry.source_name)
-#            if override_source.realm == fac.source.realm:
-#                cur_entry.errors.append(
-#                        "Page '%s' maps to URL '%s' but is overriden "
-#                        "by page '%s'." %
-#                        (fac.ref_spec, uri, override_entry.path))
-#                logger.error(cur_entry.errors[-1])
-#            cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN
-#            return None
-#
-#        route_index = self.app.routes.index(route)
-#        job = {
-#                'type': JOB_BAKE,
-#                'job': {
-#                        'factory_info': save_factory(fac),
-#                        'generator_name': None,
-#                        'generator_record_key': None,
-#                        'route_index': route_index,
-#                        'route_metadata': route_metadata,
-#                        'dirty_source_names': record_history.dirty_source_names
-#                        }
-#                }
-#        return job
-#
-#    def _handleDeletetions(self, record_history):
-#        logger.debug("Handling deletions...")
-#        for path, reason in record_history.getDeletions():
-#            logger.debug("Removing '%s': %s" % (path, reason))
-#            record_history.current.deleted.append(path)
-#            try:
-#                os.remove(path)
-#                logger.info('[delete] %s' % path)
-#            except OSError:
-#                # Not a big deal if that file had already been removed
-#                # by the user.
-#                pass
-#
-
-
+        rdrctx = RenderingContext(page)
+        self.app.env.abort_source_use = True
+        try:
+            render_page_segments(rdrctx)
+        except AbortedSourceUseError:
+            logger.debug("Page was aborted for using source: %s" %
+                         content_item.spec)
+            stats.stepCounter("SourceUseAbortions")
+        finally:
+            self.app.env.abort_source_use = False
 
-#def save_factory(fac):
-#    return {
-#        'source_name': fac.source.name,
-#        'rel_path': fac.rel_path,
-#        'metadata': fac.metadata}
-#
-#
-#def load_factory(app, info):
-#    source = app.getSource(info['source_name'])
-#    return PageFactory(source, info['rel_path'], info['metadata'])
-#
-#
-#class LoadJobHandler(JobHandler):
-#    def handleJob(self, job):
-#        # Just make sure the page has been cached.
-#        fac = load_factory(self.app, job)
-#        logger.debug("Loading page: %s" % fac.ref_spec)
-#        self.app.env.addManifestEntry('LoadJobs', fac.ref_spec)
-#        result = {
-#            'source_name': fac.source.name,
-#            'path': fac.path,
-#            'config': None,
-#            'timestamp': None,
-#            'errors': None}
-#        try:
-#            page = fac.buildPage()
-#            page._load()
-#            result['config'] = page.config.getAll()
-#            result['timestamp'] = page.datetime.timestamp()
-#        except Exception as ex:
-#            logger.debug("Got loading error. Sending it to master.")
-#            result['errors'] = _get_errors(ex)
-#            if self.ctx.app.debug:
-#                logger.exception(ex)
-#        return result
-#
-#
-#class RenderFirstSubJobHandler(JobHandler):
-#    def handleJob(self, job):
-#        # Render the segments for the first sub-page of this page.
-#        fac = load_factory(self.app, job['factory_info'])
-#        self.app.env.addManifestEntry('RenderJobs', fac.ref_spec)
-#
-#        route_index = job['route_index']
-#        route = self.app.routes[route_index]
-#
-#        page = fac.buildPage()
-#        qp = QualifiedPage(page, route, route_metadata)
-#        ctx = RenderingContext(qp)
-#        self.app.env.abort_source_use = True
-#
-#        result = {
-#            'path': fac.path,
-#            'aborted': False,
-#            'errors': None}
-#        logger.debug("Preparing page: %s" % fac.ref_spec)
-#        try:
-#            render_page_segments(ctx)
-#        except AbortedSourceUseError:
-#            logger.debug("Page %s was aborted." % fac.ref_spec)
-#            self.app.env.stepCounter("SourceUseAbortions")
-#            result['aborted'] = True
-#        except Exception as ex:
-#            logger.debug("Got rendering error. Sending it to master.")
-#            result['errors'] = _get_errors(ex)
-#            if self.ctx.app.debug:
-#                logger.exception(ex)
-#        finally:
-#            self.app.env.abort_source_use = False
-#        return result
-#
-#
-#class BakeJobHandler(JobHandler):
-#    def __init__(self, ctx):
-#        super(BakeJobHandler, self).__init__(ctx)
-#        self.page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force)
-#
-#    def shutdown(self):
-#        self.page_baker.shutdown()
-#
-#    def handleJob(self, job):
-#        # Actually bake the page and all its sub-pages to the output folder.
-#        fac = load_factory(self.app, job['factory_info'])
-#        self.app.env.addManifestEntry('BakeJobs', fac.ref_spec)
-#
-#        route_index = job['route_index']
-#        route_metadata = job['route_metadata']
-#        route = self.app.routes[route_index]
-#
-#        gen_name = job['generator_name']
-#        gen_key = job['generator_record_key']
-#        dirty_source_names = job['dirty_source_names']
-#
-#        page = fac.buildPage()
-#        qp = QualifiedPage(page, route, route_metadata)
-#
-#        result = {
-#            'path': fac.path,
-#            'generator_name': gen_name,
-#            'generator_record_key': gen_key,
-#            'sub_entries': None,
-#            'errors': None}
-#
-#        if job.get('needs_config', False):
-#            result['config'] = page.config.getAll()
-#
-#        previous_entry = None
-#        if self.ctx.previous_record_index is not None:
-#            key = _get_transition_key(fac.path, gen_key)
-#            previous_entry = self.ctx.previous_record_index.get(key)
-#
-#        logger.debug("Baking page: %s" % fac.ref_spec)
-#        logger.debug("With route metadata: %s" % route_metadata)
-#        try:
-#            sub_entries = self.page_baker.bake(
-#                qp, previous_entry, dirty_source_names, gen_name)
-#            result['sub_entries'] = sub_entries
-#
-#        except Exception as ex:
-#            logger.debug("Got baking error. Sending it to master.")
-#            result['errors'] = _get_errors(ex)
-#            if self.ctx.app.debug:
-#                logger.exception(ex)
-#
-#        return result
-#
+        result.next_pass_job = self.createJob(content_item)
+        result.next_pass_job.data.update({
+            'pass': 1,
+            'record_entry': record_entry
+        })
+
+    def _fullRender(self, content_item, ctx, result):
+        logger.debug("Full render for: %s" % content_item.spec)
+        page = self.app.getPage(self.source, content_item)
+        prev_entry = ctx.previous_entry
+        cur_entry = result.record_entry
+        self._pagebaker.bake(page, prev_entry, cur_entry, [])
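
The page pipeline is now explicitly two-pass: pass 0 renders page segments (or
aborts with `AbortedSourceUseError` when the page uses another source), and
pass 1 does the full bake; the hand-off happens through `result.next_pass_job`.
A sketch of a driver loop honoring that protocol, with `pipeline` and `run_ctx`
assumed to come from a `PipelineManager` as in the earlier sketch (hypothetical;
the real scheduling lives in `piecrust/baking/baker.py`):

    jobs = pipeline.createJobs()
    while jobs:
        next_jobs = []
        for job in jobs:
            res = PipelineJobResult()
            res.record_entry = pipeline.createRecordEntry(job)
            pipeline.run(job, run_ctx, res)
            if res.next_pass_job is not None:
                # carries {'pass': 1, 'record_entry': ...} in job.data
                next_jobs.append(res.next_pass_job)
        jobs = next_jobs
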
--- a/piecrust/pipelines/records.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/pipelines/records.py	Sun Jun 04 23:34:28 2017 -0700
@@ -9,8 +9,53 @@
 logger = logging.getLogger(__name__)
 
 
+class RecordEntry:
+    """ An entry in a record, for a specific content item.
+    """
+    def __init__(self):
+        self.item_spec = None
+        self.errors = []
+
+    @property
+    def success(self):
+        return len(self.errors) == 0
+
+    def describe(self):
+        return {}
+
+    def getAllOutputPaths(self):
+        return None
+
+    def getAllErrors(self):
+        return self.errors
+
+
+class Record:
+    """ A class that represents a 'record' of a bake operation on a
+        content source.
+    """
+    def __init__(self, name):
+        self.name = name
+        self.deleted_out_paths = []
+        self.success = True
+        self._entries = {}
+
+    def addEntry(self, entry):
+        if entry.item_spec in self._entries:
+            raise ValueError("Entry '%s' is already in the record." %
+                             entry.item_spec)
+        self._entries[entry.item_spec] = entry
+
+    def getEntries(self):
+        return self._entries.values()
+
+    def getEntry(self, item_spec):
+        return self._entries[item_spec]
+
+
 class MultiRecord:
-    """ A container that includes multiple `Record` instances.
+    """ A container that includes multiple `Record` instances -- one for
+        each content source that was baked.
     """
     RECORD_VERSION = 12
 
@@ -49,31 +94,14 @@
             return pickle.load(fp)
 
 
-class Record:
-    """ A basic class that represents a 'record' of a bake operation on a
-        content source.
-    """
-    def __init__(self, name):
-        self.name = name
-        self.entries = []
-        self.deleted_out_paths = []
-        self.success = True
-
-
-class RecordEntry:
-    """ An entry in a record, for a specific content item.
-    """
-    def __init__(self):
-        self.item_spec = None
-        self.out_paths = []
-        self.errors = []
-
-    @property
-    def success(self):
-        return len(self.errors) == 0
-
-    def describe(self):
-        return {}
+def get_flag_descriptions(flags, flag_descriptions):
+    res = []
+    for k, v in flag_descriptions.items():
+        if flags & k:
+            res.append(v)
+    if res:
+        return ', '.join(res)
+    return 'none'
 
 
 def _are_records_valid(multi_record):
@@ -106,58 +134,6 @@
     return multi_record
 
 
-def _build_diff_key(item_spec):
-    return hashlib.md5(item_spec.encode('utf8')).hexdigest()
-
-
-class MultiRecordHistory:
-    """ Tracks the differences between an 'old' and a 'new' record
-        container.
-    """
-    def __init__(self, previous, current):
-        if previous is None or current is None:
-            raise ValueError()
-
-        self.previous = previous
-        self.current = current
-        self.histories = []
-        self._buildHistories(previous, current)
-
-    def getHistory(self, record_name):
-        for h in self.histories:
-            if h.name == record_name:
-                return h
-        rh = RecordHistory(
-            Record(record_name),
-            Record(record_name))
-        self.histories.append(rh)
-        self.previous.records.append(rh.previous)
-        self.current.records.append(rh.current)
-        return rh
-
-    def _buildHistories(self, previous, current):
-        pairs = {}
-        if previous:
-            for r in previous.records:
-                pairs[r.name] = (r, None)
-        if current:
-            for r in current.records:
-                p = pairs.get(r.name, (None, None))
-                if p[1] is not None:
-                    raise Exception("Got several records named: %s" % r.name)
-                pairs[r.name] = (p[0], r)
-
-        for name, pair in pairs.items():
-            p, c = pair
-            if p is None:
-                p = Record(name)
-                previous.records.append(p)
-            if c is None:
-                c = Record(name)
-                current.records.append(c)
-            self.histories.append(RecordHistory(p, c))
-
-
 class RecordHistory:
     def __init__(self, previous, current):
         if previous is None or current is None:
@@ -190,18 +166,26 @@
             raise Exception("This record history hasn't been built yet.")
         return self._diffs.values()
 
+    def getPreviousEntry(self, item_spec):
+        key = _build_diff_key(item_spec)
+        return self._diffs[key][0]
+
+    def getCurrentEntry(self, item_spec):
+        key = _build_diff_key(item_spec)
+        return self._diffs[key][1]
+
     def build(self):
         if self._diffs is not None:
             raise Exception("This record history has already been built.")
 
         self._diffs = {}
         if self._previous is not None:
-            for e in self._previous.entries:
+            for e in self._previous.getEntries():
                 key = _build_diff_key(e.item_spec)
                 self._diffs[key] = (e, None)
 
         if self._current is not None:
-            for e in self._current.entries:
+            for e in self._current.getEntries():
                 key = _build_diff_key(e.item_spec)
                 diff = self._diffs.get(key)
                 if diff is None:
@@ -210,5 +194,65 @@
                     self._diffs[key] = (diff[0], e)
                 else:
                     raise Exception(
-                        "A current record entry already exists for: %s" % key)
+                        "A current record entry already exists for '%s' "
+                        "(%s)" % (key, diff[1].item_spec))
+
+
+class MultiRecordHistory:
+    """ Tracks the differences between an 'old' and a 'new' record
+        container.
+    """
+    def __init__(self, previous, current):
+        if previous is None or current is None:
+            raise ValueError()
+
+        self.previous = previous
+        self.current = current
+        self.histories = []
+        self._linkHistories(previous, current)
+
+    def getPreviousRecord(self, record_name, auto_create=True):
+        return self.previous.getRecord(record_name, auto_create=auto_create)
+
+    def getCurrentRecord(self, record_name):
+        return self.current.getRecord(record_name)
+
+    def getHistory(self, record_name):
+        for h in self.histories:
+            if h.name == record_name:
+                return h
 
+        rh = RecordHistory(
+            Record(record_name),
+            Record(record_name))
+        self.histories.append(rh)
+        self.previous.records.append(rh.previous)
+        self.current.records.append(rh.current)
+        return rh
+
+    def _linkHistories(self, previous, current):
+        pairs = {}
+        if previous:
+            for r in previous.records:
+                pairs[r.name] = (r, None)
+        if current:
+            for r in current.records:
+                p = pairs.get(r.name, (None, None))
+                if p[1] is not None:
+                    raise Exception("Got several records named: %s" % r.name)
+                pairs[r.name] = (p[0], r)
+
+        for name, pair in pairs.items():
+            p, c = pair
+            if p is None:
+                p = Record(name)
+                previous.records.append(p)
+            if c is None:
+                c = Record(name)
+                current.records.append(c)
+            self.histories.append(RecordHistory(p, c))
+
+
+def _build_diff_key(item_spec):
+    return hashlib.md5(item_spec.encode('utf8')).hexdigest()
+
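
As a quick illustration of the reworked record API above: entries are now
stored keyed by their item spec, and flags can be turned into readable
strings with the new helper. A minimal sketch, not part of the changeset;
the spec string and flag bits are made up.

    from piecrust.pipelines.records import (
        Record, RecordEntry, get_flag_descriptions)

    entry = RecordEntry()
    entry.item_spec = 'pages/foo.md'     # hypothetical content item

    rec = Record('page@pages')           # hypothetical record name
    rec.addEntry(entry)
    assert rec.getEntry('pages/foo.md') is entry
    # Adding the same spec twice now raises ValueError instead of quietly
    # appending a duplicate to a list.

    FLAG_BAKED, FLAG_COLLAPSED = 1, 2    # made-up flag bits
    descriptions = {FLAG_BAKED: 'baked', FLAG_COLLAPSED: 'collapsed'}
    print(get_flag_descriptions(FLAG_BAKED | FLAG_COLLAPSED, descriptions))
    # prints: baked, collapsed
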
--- a/piecrust/plugins/builtin.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/plugins/builtin.py	Sun Jun 04 23:34:28 2017 -0700
@@ -78,17 +78,22 @@
     def getPipelines(self):
         from piecrust.pipelines.page import PagePipeline
         from piecrust.pipelines.asset import AssetPipeline
+        from piecrust.sources.taxonomy import TaxonomyPipeline
+        from piecrust.sources.blogarchives import BlogArchivesPipeline
 
         return [
             PagePipeline,
-            AssetPipeline]
+            AssetPipeline,
+            TaxonomyPipeline,
+            BlogArchivesPipeline]
 
     def getDataProviders(self):
-        from piecrust.data.provider import (
-            IteratorDataProvider, BlogDataProvider)
+        from piecrust.dataproviders.pageiterator import \
+            PageIteratorDataProvider
+        from piecrust.dataproviders.blog import BlogDataProvider
 
         return [
-            IteratorDataProvider,
+            PageIteratorDataProvider,
             BlogDataProvider]
 
     def getTemplateEngines(self):
--- a/piecrust/processing/base.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/processing/base.py	Sun Jun 04 23:34:28 2017 -0700
@@ -14,11 +14,11 @@
 
 
 class ProcessorContext:
-    def __init__(self, pipeline, pipeline_ctx):
+    def __init__(self, pipeline):
         self.ignore_patterns = []
         self.extra_processors = []
         self._pipeline = pipeline
-        self._pipeline_ctx = pipeline_ctx
+        self._pipeline_ctx = pipeline.ctx
 
     @property
     def tmp_dir(self):
--- a/piecrust/processing/sitemap.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/processing/sitemap.py	Sun Jun 04 23:34:28 2017 -0700
@@ -3,7 +3,7 @@
 import time
 import logging
 import yaml
-from piecrust.dataproviders.page_iterator import PageIterator
+from piecrust.dataproviders.pageiterator import PageIterator
 from piecrust.processing.base import SimpleFileProcessor
 
 
--- a/piecrust/rendering.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/rendering.py	Sun Jun 04 23:34:28 2017 -0700
@@ -5,7 +5,6 @@
 from piecrust.data.builder import (
     DataBuildingContext, build_page_data, add_layout_data)
 from piecrust.fastpickle import _pickle_object, _unpickle_object
-from piecrust.sources.base import ContentSource
 from piecrust.templating.base import TemplateNotFoundError, TemplatingError
 
 
@@ -115,9 +114,8 @@
 
     def addUsedSource(self, source):
         self._raiseIfNoCurrentPass()
-        if isinstance(source, ContentSource):
-            pass_info = self.current_pass_info
-            pass_info.used_source_names.add(source.name)
+        pass_info = self.current_pass_info
+        pass_info.used_source_names.add(source.name)
 
     def _raiseIfNoCurrentPass(self):
         if self._current_pass == PASS_NONE:
@@ -159,6 +157,7 @@
 
 def render_page(ctx):
     env = ctx.app.env
+    stats = env.stats
 
     stack = env.render_ctx_stack
     stack.pushCtx(ctx)
@@ -168,7 +167,7 @@
 
     try:
         # Build the data for both segment and layout rendering.
-        with env.timerScope("BuildRenderData"):
+        with stats.timerScope("BuildRenderData"):
             page_data = _build_render_data(ctx)
 
         # Render content segments.
@@ -177,7 +176,7 @@
         save_to_fs = True
         if env.fs_cache_only_for_main_page and not stack.is_main_ctx:
             save_to_fs = False
-        with env.timerScope("PageRenderSegments"):
+        with stats.timerScope("PageRenderSegments"):
             if repo is not None and not ctx.force_render:
                 render_result = repo.get(
                     page_uri,
@@ -197,10 +196,10 @@
                 'default_layout', 'default')
         null_names = ['', 'none', 'nil']
         if layout_name not in null_names:
-            with ctx.app.env.timerScope("BuildRenderData"):
+            with stats.timerScope("BuildRenderData"):
                 add_layout_data(page_data, render_result['segments'])
 
-            with ctx.app.env.timerScope("PageRenderLayout"):
+            with stats.timerScope("PageRenderLayout"):
                 layout_result = _do_render_layout(
                     layout_name, page, page_data)
         else:
@@ -222,8 +221,8 @@
         if ctx.app.debug:
             raise
         logger.exception(ex)
-        page_rel_path = os.path.relpath(ctx.page.path, ctx.app.root_dir)
-        raise Exception("Error rendering page: %s" % page_rel_path) from ex
+        raise Exception("Error rendering page: %s" %
+                        ctx.page.content_spec) from ex
 
     finally:
         ctx.setCurrentPass(PASS_NONE)
@@ -232,6 +231,7 @@
 
 def render_page_segments(ctx):
     env = ctx.app.env
+    stats = env.stats
 
     stack = env.render_ctx_stack
     stack.pushCtx(ctx)
@@ -241,11 +241,13 @@
 
     try:
         ctx.setCurrentPass(PASS_FORMATTING)
-        repo = ctx.app.env.rendered_segments_repository
+        repo = env.rendered_segments_repository
+
         save_to_fs = True
-        if ctx.app.env.fs_cache_only_for_main_page and not stack.is_main_ctx:
+        if env.fs_cache_only_for_main_page and not stack.is_main_ctx:
             save_to_fs = False
-        with ctx.app.env.timerScope("PageRenderSegments"):
+
+        with stats.timerScope("PageRenderSegments"):
             if repo is not None and not ctx.force_render:
                 render_result = repo.get(
                     page_uri,
@@ -267,7 +269,7 @@
 
 
 def _build_render_data(ctx):
-    with ctx.app.env.timerScope("PageDataBuild"):
+    with ctx.app.env.stats.timerScope("PageDataBuild"):
         data_ctx = DataBuildingContext(ctx.page, ctx.sub_num)
         data_ctx.pagination_source = ctx.pagination_source
         data_ctx.pagination_filter = ctx.pagination_filter
@@ -297,10 +299,10 @@
         for seg_part in seg.parts:
             part_format = seg_part.fmt or format_name
             try:
-                with app.env.timerScope(
+                with app.env.stats.timerScope(
                         engine.__class__.__name__ + '_segment'):
                     part_text = engine.renderSegmentPart(
-                        page.path, seg_part, page_data)
+                        page.content_spec, seg_part, page_data)
             except TemplatingError as err:
                 err.lineno += seg_part.line
                 raise err
@@ -324,17 +326,15 @@
 
 
 def _do_render_layout(layout_name, page, layout_data):
-    cpi = page.app.env.exec_info_stack.current_page_info
-    assert cpi is not None
-    assert cpi.page == page
+    cur_ctx = page.app.env.render_ctx_stack.current_ctx
+    assert cur_ctx is not None
+    assert cur_ctx.page == page
 
     names = layout_name.split(',')
-    default_exts = page.app.env.default_layout_extensions
     full_names = []
     for name in names:
         if '.' not in name:
-            for ext in default_exts:
-                full_names.append(name + ext)
+            full_names.append(name + '.html')
         else:
             full_names.append(name)
 
@@ -343,7 +343,8 @@
     engine = get_template_engine(page.app, engine_name)
 
     try:
-        with page.app.env.timerScope(engine.__class__.__name__ + '_layout'):
+        with page.app.env.stats.timerScope(
+                engine.__class__.__name__ + '_layout'):
             output = engine.renderFile(full_names, layout_data)
     except TemplateNotFoundError as ex:
         logger.exception(ex)
@@ -351,7 +352,7 @@
         msg += "Looked for: %s" % ', '.join(full_names)
         raise Exception(msg) from ex
 
-    pass_info = cpi.render_ctx.render_passes[PASS_RENDERING]
+    pass_info = cur_ctx.render_passes[PASS_RENDERING]
     res = {'content': output, 'pass_info': _pickle_object(pass_info)}
     return res
 
@@ -376,7 +377,7 @@
         if not fmt.enabled:
             continue
         if fmt.FORMAT_NAMES is None or format_name in fmt.FORMAT_NAMES:
-            with app.env.timerScope(fmt.__class__.__name__):
+            with app.env.stats.timerScope(fmt.__class__.__name__):
                 txt = fmt.render(format_name, txt)
             format_count += 1
             if fmt.OUTPUT_FORMAT is not None:
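
The hunks above all follow the same timing pattern: scoped timers now hang
off the environment's stats object instead of the environment itself. A
minimal sketch, assuming `app` is an initialized PieCrust application and
with a placeholder for the timed work.

    stats = app.env.stats
    with stats.timerScope("PageRenderSegments"):
        result = do_the_actual_rendering()   # placeholder for the real work
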
--- a/piecrust/routing.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/routing.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,5 +1,6 @@
 import re
 import os.path
+import copy
 import logging
 import urllib.parse
 from werkzeug.utils import cached_property
@@ -41,9 +42,11 @@
     """
     def __init__(self, app, cfg):
         self.app = app
+        self.config = copy.deepcopy(cfg)
 
         self.source_name = cfg['source']
         self.uri_pattern = cfg['url'].lstrip('/')
+        self.pass_num = cfg['pass']
 
         self.supported_params = self.source.getSupportedRouteParameters()
 
@@ -230,7 +233,7 @@
             route_params[arg_name] = self._coerceRouteParameter(
                 arg_name, arg_val)
 
-        self.source.onRouteFunctionUsed(self, route_params)
+        self.source.onRouteFunctionUsed(route_params)
 
         return self.getUri(route_params)
 
@@ -310,3 +313,6 @@
 
     def __call__(self, *args, **kwargs):
         return self._route.execTemplateFunc(*args, **kwargs)
+
+    def _isCompatibleRoute(self, route):
+        return self._route.uri_pattern == route.uri_pattern
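
For illustration, a route configuration dictionary with the fields the
constructor now reads directly (`source`, `url`, and the new `pass` key).
The values are made up, `app` is assumed to be an initialized application
with a matching 'posts' source, and the remaining route settings are
assumed to carry their defaults.

    from piecrust.routing import Route

    cfg = {
        'source': 'posts',
        'url': '/%year%/%month%/%day%/%slug%',
        'pass': 1,
    }
    route = Route(app, cfg)   # keeps a deep copy of cfg in route.config
    assert route.pass_num == 1
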
--- a/piecrust/serving/procloop.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/serving/procloop.py	Sun Jun 04 23:34:28 2017 -0700
@@ -7,8 +7,11 @@
 import itertools
 import threading
 from piecrust import CONFIG_PATH, THEME_CONFIG_PATH
-from piecrust.app import PieCrust
-from piecrust.processing.pipeline import ProcessorPipeline
+from piecrust.pipelines.base import (
+    PipelineJobCreateContext, PipelineJobRunContext, PipelineJobResult,
+    PipelineManager)
+from piecrust.pipelines.records import (
+    MultiRecord, MultiRecordHistory)
 
 
 logger = logging.getLogger(__name__)
@@ -74,25 +77,28 @@
         self._running = 2
 
 
+class _AssetProcessingInfo:
+    def __init__(self, source):
+        self.source = source
+        self.paths = set()
+        self.last_bake_time = time.time()
+
+
 class ProcessingLoop(threading.Thread):
     def __init__(self, appfactory, out_dir):
-        super(ProcessingLoop, self).__init__(
-                name='pipeline-reloader', daemon=True)
+        super().__init__(name='pipeline-reloader', daemon=True)
         self.appfactory = appfactory
         self.out_dir = out_dir
         self.last_status_id = 0
         self.interval = 1
-        self.app = None
-        self._roots = []
-        self._monitor_assets_root = False
-        self._paths = set()
-        self._record = None
-        self._last_bake = 0
+        self._app = None
+        self._proc_infos = None
+        self._last_records = None
         self._last_config_mtime = 0
         self._obs = []
         self._obs_lock = threading.Lock()
         config_name = (
-                THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
+            THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
         self._config_path = os.path.join(appfactory.root_dir, config_name)
 
     def addObserver(self, obs):
@@ -104,116 +110,162 @@
             self._obs.remove(obs)
 
     def run(self):
-        self._initPipeline()
+        self._init()
 
-        self._last_bake = time.time()
         self._last_config_mtime = os.path.getmtime(self._config_path)
-        self._record = self.pipeline.run()
 
         while True:
             cur_config_time = os.path.getmtime(self._config_path)
             if self._last_config_mtime < cur_config_time:
                 logger.info("Site configuration changed, reloading pipeline.")
                 self._last_config_mtime = cur_config_time
-                self._initPipeline()
-                for root in self._roots:
-                    self._runPipeline(root)
+                self._init()
+                self._runPipelines()
                 continue
 
-            if self._monitor_assets_root:
-                assets_dir = os.path.join(self.app.root_dir, 'assets')
-                if os.path.isdir(assets_dir):
-                    logger.info("Assets directory was created, reloading "
-                                "pipeline.")
-                    self._initPipeline()
-                    self._runPipeline(assets_dir)
-                    continue
-
-            for root in self._roots:
-                # For each mount root we try to find the first new or
+            for procinfo in self._proc_infos:
+                # For each assets folder we try to find the first new or
                 # modified file. If any, we just run the pipeline on
-                # that mount.
+                # that source.
                 found_new_or_modified = False
-                for dirpath, dirnames, filenames in os.walk(root):
-                    for filename in filenames:
-                        path = os.path.join(dirpath, filename)
-                        if path not in self._paths:
-                            logger.debug("Found new asset: %s" % path)
-                            self._paths.add(path)
-                            found_new_or_modified = True
-                            break
-                        if os.path.getmtime(path) > self._last_bake:
-                            logger.debug("Found modified asset: %s" % path)
-                            found_new_or_modified = True
-                            break
-
-                    if found_new_or_modified:
+                for item in procinfo.source.getAllContents():
+                    path = item.spec
+                    if path not in procinfo.paths:
+                        logger.debug("Found new asset: %s" % path)
+                        procinfo.paths.add(path)
+                        found_new_or_modified = True
                         break
-
+                    if os.path.getmtime(path) > procinfo.last_bake_time:
+                        logger.debug("Found modified asset: %s" % path)
+                        found_new_or_modified = True
+                        break
                 if found_new_or_modified:
-                    self._runPipeline(root)
+                    self._runPipeline(procinfo)
 
             time.sleep(self.interval)
 
-    def _initPipeline(self):
-        # Create the app and pipeline.
-        self.app = self.appfactory.create()
-        self.pipeline = ProcessorPipeline(self.app, self.out_dir)
+    def _init(self):
+        self._app = self.appfactory.create()
+        self._last_records = MultiRecord()
+
+        self._proc_infos = []
+        for src in self._app.sources:
+            if src.config['pipeline'] != 'asset':
+                continue
 
-        # Get the list of assets directories.
-        self._roots = list(self.pipeline.mounts.keys())
+            procinfo = _AssetProcessingInfo(src)
+            self._proc_infos.append(procinfo)
 
-        # The 'assets' folder may not be in the mounts list if it doesn't
-        # exist yet, but we want to monitor for when the user creates it.
-        default_root = os.path.join(self.app.root_dir, 'assets')
-        self._monitor_assets_root = (default_root not in self._roots)
+            # Build the list of initial asset files.
+            for item in src.getAllContents():
+                procinfo.paths.add(item.spec)
+
+    def _runPipelines(self):
+        record_histories = MultiRecordHistory(MultiRecord(), self._last_records)
+        self._ppmngr = PipelineManager(
+            self._app, self.out_dir, record_histories)
 
-        # Build the list of initial asset files.
-        self._paths = set()
-        for root in self._roots:
-            for dirpath, dirnames, filenames in os.walk(root):
-                self._paths |= set([os.path.join(dirpath, f)
-                                    for f in filenames])
+        # Create an asset pipeline for each asset source and attach its
+        # processing info to it as user data.
+        for src in self._app.sources:
+            if src.config['pipeline'] != 'asset':
+                continue
+
+            ppinfo = self._ppmngr.createPipeline(src)
+            api = _AssetProcessingInfo(src)
+            ppinfo.userdata = api
+
+        current_records = MultiRecord()
+        record_histories = MultiRecordHistory(
+            self._last_records, current_records)
 
-    def _runPipeline(self, root):
-        self._last_bake = time.time()
-        try:
-            self._record = self.pipeline.run(
-                    root,
-                    previous_record=self._record,
-                    save_record=False)
+        for procinfo in self._proc_infos:
+            self._runPipeline(procinfo)
+
+        status_id = self.last_status_id + 1
+        self.last_status_id += 1
+
+        if self._records.success:
+            changed = filter(
+                lambda i: not i.was_collapsed_from_last_run,
+                self._record.entries)
+            changed = itertools.chain.from_iterable(
+                map(lambda i: i.rel_outputs, changed))
+            changed = list(changed)
+            item = {
+                'id': status_id,
+                'type': 'pipeline_success',
+                'assets': changed}
 
-            status_id = self.last_status_id + 1
-            self.last_status_id += 1
+            self._notifyObservers(item)
+        else:
+            item = {
+                'id': status_id,
+                'type': 'pipeline_error',
+                'assets': []}
+            for entry in self._record.entries:
+                if entry.errors:
+                    asset_item = {
+                        'path': entry.path,
+                        'errors': list(entry.errors)}
+                    item['assets'].append(asset_item)
+
+            self._notifyObservers(item)
 
-            if self._record.success:
-                changed = filter(
-                        lambda i: not i.was_collapsed_from_last_run,
-                        self._record.entries)
-                changed = itertools.chain.from_iterable(
-                        map(lambda i: i.rel_outputs, changed))
-                changed = list(changed)
-                item = {
-                        'id': status_id,
-                        'type': 'pipeline_success',
-                        'assets': changed}
+    def _runPipeline(self, procinfo):
+        procinfo.last_bake_time = time.time()
+
+        src = procinfo.source
+
+        current_records = MultiRecord()
+        record_histories = MultiRecordHistory(
+            self._last_records, current_records)
+        ppmngr = PipelineManager(
+            self._app, self.out_dir, record_histories)
+        ppinfo = ppmngr.createPipeline(src)
+
+        logger.debug("Running pipeline '%s' on: %s" %
+                     (ppinfo.pipeline_name, src.name))
 
-                self._notifyObservers(item)
-            else:
-                item = {
-                        'id': status_id,
-                        'type': 'pipeline_error',
-                        'assets': []}
-                for entry in self._record.entries:
-                    if entry.errors:
-                        asset_item = {
-                                'path': entry.path,
-                                'errors': list(entry.errors)}
-                        item['assets'].append(asset_item)
+        # Process all items in the source.
+        pp = ppinfo.pipeline
+        cr = ppinfo.record_history.current
+        jobctx = PipelineJobCreateContext(src)
+        for item in src.getAllContents():
+            job = pp.createJob(item, jobctx)
+
+            ppres = PipelineJobResult()
+            ppres.record_entry = pp.createRecordEntry(job)
+
+            runctx = PipelineJobRunContext(
+                ppinfo.pipeline_ctx, job, record_histories)
+            try:
+                pp.run(item, runctx, ppres)
+            except Exception as e:
+                ppres.record_entry.errors.append(str(e))
 
-                self._notifyObservers(item)
-        except Exception as ex:
-            logger.exception(ex)
+            if ppres.next_pass_job is not None:
+                logger.error("The processing loop for the server "
+                             "doesn't support multi-pass pipelines.")
+
+            cr.addEntry(ppres.record_entry)
+            if not ppres.record_entry.success:
+                cr.success = False
+                current_records.success = False
+                logger.error("Errors found in %s:" % item.spec)
+                for e in ppres.record_entry.errors:
+                    logger.error("  " + e)
+
+        # Wrap up: diff histories, delete stale outputs, shut down pipelines.
+        ppmngr.buildHistoryDiffs()
+        ppmngr.deleteStaleOutputs()
+        ppmngr.collapseRecords()
+        ppmngr.shutdownPipelines()
+
+        # Swap the old record with the next record.
+        pr = ppinfo.record_history.previous
+        self._last_records.records.remove(pr)
+        self._last_records.records.append(cr)
 
     def _notifyObservers(self, item):
         with self._obs_lock:
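
The change detection the loop applies per asset source boils down to the
standalone sketch below: a path never seen before counts as new, a path
whose modification time is later than the last bake counts as modified.
The function name is made up; only the known-paths/mtime idea comes from
the code above.

    import os
    import time

    known_paths = set()
    last_bake_time = time.time()

    def find_first_changed(items):
        # `items` mimics what `source.getAllContents()` yields: objects
        # whose `spec` attribute is a filesystem path.
        for item in items:
            if item.spec not in known_paths:
                known_paths.add(item.spec)
                return item.spec                    # new file
            if os.path.getmtime(item.spec) > last_bake_time:
                return item.spec                    # modified file
        return None
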
--- a/piecrust/serving/server.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/serving/server.py	Sun Jun 04 23:34:28 2017 -0700
@@ -165,7 +165,7 @@
             raise MultipleNotFound(msg, req_page.not_found_errors)
 
         # We have a page, let's try to render it.
-        render_ctx = RenderingContext(req_page,
+        render_ctx = RenderingContext(req_page.page,
                                       sub_num=req_page.sub_num,
                                       force_render=True)
         req_page.page.source.prepareRenderContext(render_ctx)
--- a/piecrust/serving/util.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/serving/util.py	Sun Jun 04 23:34:28 2017 -0700
@@ -63,8 +63,7 @@
         if route_sub_num > 1:
             cur_req_path = req_path_no_num
 
-        page = _get_requested_page_for_route(app, route, route_params,
-                                             route_sub_num)
+        page = _get_requested_page_for_route(app, route, route_params)
         if page is not None:
             req_page.page = page
             req_page.sub_num = route_sub_num
@@ -82,7 +81,7 @@
     source = app.getSource(route.source_name)
     item = source.findContent(route_params)
     if item is not None:
-        return app.getPage(item)
+        return app.getPage(source, item)
     return None
 
 
--- a/piecrust/sources/base.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/sources/base.py	Sun Jun 04 23:34:28 2017 -0700
@@ -13,7 +13,10 @@
 
 
 # Types of relationships a content source can be asked for.
-REL_ASSETS = 1
+REL_PARENT_GROUP = 1
+REL_LOGICAL_PARENT_ITEM = 2
+REL_LOGICAL_CHILD_GROUP = 3
+REL_ASSETS = 10
 
 
 logger = logging.getLogger(__name__)
@@ -43,10 +46,13 @@
     """ Describes a piece of content.
 
         Some known metadata that PieCrust will use include:
-        - `route_params`: A dictionary of route parameters to generate
-              the URL to the content.
-        - `config`: A dictionary of configuration settings to merge
-              into the settings found in the content itself.
+        - `date`: A `datetime.date` object that will set the date of the page.
+        - `datetime`: A `datetime.datetime` object that will set the date and
+            time of the page.
+        - `route_params`: A dictionary of route parameters to generate the
+            URL to the content.
+        - `config`: A dictionary of configuration settings to merge into the
+            settings found in the content itself.
     """
     def __init__(self, spec, metadata):
         self.spec = spec
@@ -72,6 +78,8 @@
 class ContentSource:
     """ A source for content.
     """
+    DEFAULT_PIPELINE_NAME = None
+
     def __init__(self, app, name, config):
         self.app = app
         self.name = name
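
Since sources now advertise a default pipeline, callers can fall back to it
whenever a source's `pipeline` setting is left unset. A sketch of that
lookup; the function name is made up, only the `DEFAULT_PIPELINE_NAME`
attribute and the `pipeline` config key come from this changeset.

    def resolve_pipeline_name(source):
        # Explicit configuration wins; otherwise use the source's default.
        name = source.config.get('pipeline')
        if name is None:
            name = source.DEFAULT_PIPELINE_NAME
        return name
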
--- a/piecrust/sources/blogarchives.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/sources/blogarchives.py	Sun Jun 04 23:34:28 2017 -0700
@@ -2,7 +2,8 @@
 import datetime
 from piecrust.chefutil import format_timed_scope
 from piecrust.data.filters import PaginationFilter, IFilterClause
-from piecrust.dataproviders.page_iterator import PageIterator
+from piecrust.dataproviders.pageiterator import PageIterator
+from piecrust.pipelines.base import ContentPipeline
 from piecrust.routing import RouteParameter
 from piecrust.sources.base import ContentSource, GeneratedContentException
 
@@ -12,6 +13,7 @@
 
 class BlogArchivesSource(ContentSource):
     SOURCE_NAME = 'blog_archives'
+    DEFAULT_PIPELINE_NAME = 'blog_archives'
 
     def __init__(self, app, name, config):
         super().__init__(app, name, config)
@@ -124,3 +126,7 @@
 def _date_sorter(it):
     return sorted(it, key=lambda x: x.datetime)
 
+
+class BlogArchivesPipeline(ContentPipeline):
+    PIPELINE_NAME = 'blog_archives'
+    PASS_NUM = 1
--- a/piecrust/sources/default.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/sources/default.py	Sun Jun 04 23:34:28 2017 -0700
@@ -16,6 +16,7 @@
                            SimpleAssetsSubDirMixin,
                            IPreparingSource, IInteractiveSource):
     SOURCE_NAME = 'default'
+    DEFAULT_PIPELINE_NAME = 'page'
 
     def __init__(self, app, name, config):
         super().__init__(app, name, config)
--- a/piecrust/sources/fs.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/sources/fs.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,10 +1,13 @@
 import os.path
 import re
+import glob
 import fnmatch
 import logging
 from piecrust import osutil
 from piecrust.routing import RouteParameter
-from piecrust.sources.base import ContentItem, ContentGroup, ContentSource
+from piecrust.sources.base import (
+    ContentItem, ContentGroup, ContentSource,
+    REL_PARENT_GROUP, REL_LOGICAL_PARENT_ITEM, REL_LOGICAL_CHILD_GROUP)
 
 
 logger = logging.getLogger(__name__)
@@ -73,7 +76,6 @@
         self._ignore_regexes = ir
 
     def getContents(self, group):
-        logger.debug("Scanning for content in: %s" % self.fs_endpoint_path)
         if not self._checkFSEndpoint():
             return None
 
@@ -122,6 +124,39 @@
         pass
 
     def getRelatedContents(self, item, relationship):
+        if relationship == REL_PARENT_GROUP:
+            parent_dir = os.path.dirname(item.spec)
+            if len(parent_dir) >= len(self.fs_endpoint_path):
+                metadata = self._createGroupMetadata(parent_dir)
+                return ContentGroup(parent_dir, metadata)
+
+            # Don't return a group for paths that are outside of our
+            # endpoint directory.
+            return None
+
+        if relationship == REL_LOGICAL_PARENT_ITEM:
+            # If we want the logical parent item of a folder, we find a
+            # page file with the same name as the folder.
+            if not item.is_group:
+                raise ValueError()
+            parent_glob = os.path.join(item.spec, '*')
+            for n in glob.iglob(parent_glob):
+                if os.path.isfile(n):
+                    metadata = self._createItemMetadata(n)
+                    return ContentItem(n, metadata)
+            return None
+
+        if relationship == REL_LOGICAL_CHILD_GROUP:
+            # If we want the children items of an item, we look for
+            # a directory that has the same name as the item's file.
+            if item.is_group:
+                raise ValueError()
+            dir_path, _ = os.path.splitext(item.spec)
+            if os.path.isdir(dir_path):
+                metadata = self._createGroupMetadata(dir_path)
+                return [ContentGroup(dir_path, metadata)]
+            return None
+
         return None
 
     def findContent(self, route_params):
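
An illustrative use of the new relationship queries, assuming `src` is a
filesystem-backed source from this module and `item` is one of the
`ContentItem` objects it returned.

    from piecrust.sources.base import (
        REL_PARENT_GROUP, REL_LOGICAL_PARENT_ITEM, REL_LOGICAL_CHILD_GROUP)

    group = src.getRelatedContents(item, REL_PARENT_GROUP)
    # The ContentGroup for the item's directory, or None if the item lives
    # outside the source's endpoint.

    children = src.getRelatedContents(item, REL_LOGICAL_CHILD_GROUP)
    # A one-element list with the folder named after the page file, or None
    # if no such folder exists. Passing a group here raises ValueError;
    # REL_LOGICAL_PARENT_ITEM has the opposite constraint.
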
--- a/piecrust/sources/posts.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/sources/posts.py	Sun Jun 04 23:34:28 2017 -0700
@@ -21,6 +21,7 @@
                   SimpleAssetsSubDirMixin,
                   IPreparingSource, IInteractiveSource):
     PATH_FORMAT = None
+    DEFAULT_PIPELINE_NAME = 'page'
 
     def __init__(self, app, name, config):
         super().__init__(app, name, config)
@@ -122,10 +123,12 @@
         day = int(m.group('day'))
         timestamp = datetime.date(year, month, day)
         metadata = {
-            'year': year,
-            'month': month,
-            'day': day,
-            'slug': m.group('slug'),
+            'route_params': {
+                'year': year,
+                'month': month,
+                'day': day,
+                'slug': m.group('slug')
+            },
             'date': timestamp
         }
         return metadata
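
Concretely, a post whose filename captures year 2017, month 6, day 4 and
slug 'hello-world' now yields metadata shaped like this (the four route
values used to sit at the top level):

    import datetime

    metadata = {
        'route_params': {
            'year': 2017,
            'month': 6,
            'day': 4,
            'slug': 'hello-world',
        },
        'date': datetime.date(2017, 6, 4),
    }
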
--- a/piecrust/sources/taxonomy.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/sources/taxonomy.py	Sun Jun 04 23:34:28 2017 -0700
@@ -5,8 +5,8 @@
 from piecrust.chefutil import format_timed, format_timed_scope
 from piecrust.configuration import ConfigurationError
 from piecrust.data.filters import (
-    PaginationFilter, SettingFilterClause,
-    page_value_accessor)
+    PaginationFilter, SettingFilterClause)
+from piecrust.pipelines.base import ContentPipeline
 from piecrust.routing import RouteParameter
 from piecrust.sources.base import ContentSource, GeneratedContentException
 
@@ -27,6 +27,8 @@
 
 
 class Taxonomy(object):
+    """ Describes a taxonomy.
+    """
     def __init__(self, name, config):
         self.name = name
         self.config = config
@@ -43,11 +45,10 @@
 
 
 class TaxonomySource(ContentSource):
-    """ A page generator that handles taxonomies, _i.e._ lists of keywords
-        that pages are labelled with, and for which we need to generate
-        listing pages.
+    """ A content source that generates taxonomy listing pages.
     """
     SOURCE_NAME = 'taxonomy'
+    DEFAULT_PIPELINE_NAME = 'taxonomy'
 
     def __init__(self, app, name, config):
         super().__init__(app, name, config)
@@ -55,21 +56,19 @@
         tax_name = config.get('taxonomy')
         if tax_name is None:
             raise ConfigurationError(
-                "Generator '%s' requires a taxonomy name." % name)
-        tax_config = app.config.get('site/taxonomies/' + tax_name)
-        if tax_config is None:
-            raise ConfigurationError(
-                "Error initializing generator '%s', no such taxonomy: %s",
-                (name, tax_name))
-        self.taxonomy = Taxonomy(tax_name, tax_config)
+                "Taxonomy source '%s' requires a taxonomy name." % name)
+        self.taxonomy = _get_taxonomy(app, tax_name)
 
         sm = config.get('slugify_mode')
-        if not sm:
-            sm = app.config.get('site/slugify_mode', 'encode')
-        self.slugify_mode = _parse_slugify_mode(sm)
-        self.slugifier = _Slugifier(self.taxonomy, self.slugify_mode)
+        self.slugifier = _get_slugifier(app, self.taxonomy, sm)
 
     def getContents(self, group):
+        # Our content is procedurally generated from other content sources,
+        # so we really don't support listing anything here -- it would be
+        # quite costly.
+        #
+        # Instead, our pipeline (the `TaxonomyPipeline`) will generate
+        # content items for us when it is asked to produce bake jobs.
         raise GeneratedContentException()
 
     def getSupportedRouteParameters(self):
@@ -102,14 +101,14 @@
         #    term, we'll get a merge of the 2 on the listing page, which is
         #    what the user expects.
         #
-        tax_terms, is_combination = self._getTaxonomyTerms(
-                ctx.page.route_metadata)
+        route_params = ctx.page.source_metadata['route_params']
+        tax_terms, is_combination = self._getTaxonomyTerms(route_params)
         self._setTaxonomyFilter(ctx, tax_terms, is_combination)
 
         # Add some custom data for rendering.
         ctx.custom_data.update({
-                self.taxonomy.term_name: tax_terms,
-                'is_multiple_%s' % self.taxonomy.term_name: is_combination})
+            self.taxonomy.term_name: tax_terms,
+            'is_multiple_%s' % self.taxonomy.term_name: is_combination})
         # Add some "plural" version of the term... so for instance, if this
         # is the "tags" taxonomy, "tag" will have one term most of the time,
         # except when it's a combination. Here, we add "tags" as something that
@@ -121,12 +120,9 @@
                 mult_val = (mult_val,)
             ctx.custom_data[self.taxonomy.name] = mult_val
 
-    def _getSource(self):
-        return self.app.getSource(self.config['source'])
-
-    def _getTaxonomyTerms(self, route_metadata):
+    def _getTaxonomyTerms(self, route_params):
         # Get the individual slugified terms from the route metadata.
-        all_values = route_metadata.get(self.taxonomy.term_name)
+        all_values = route_params.get(self.taxonomy.term_name)
         if all_values is None:
             raise Exception("'%s' values couldn't be found in route metadata" %
                             self.taxonomy.term_name)
@@ -143,14 +139,14 @@
 
     def _setTaxonomyFilter(self, ctx, term_value, is_combination):
         # Set up the filter that will check the pages' terms.
-        flt = PaginationFilter(value_accessor=page_value_accessor)
+        flt = PaginationFilter()
         flt.addClause(HasTaxonomyTermsFilterClause(
-                self.taxonomy, self.slugify_mode, term_value, is_combination))
+            self.taxonomy, self.slugifier.mode, term_value, is_combination))
         ctx.pagination_filter = flt
 
-    def onRouteFunctionUsed(self, route, route_metadata):
+    def onRouteFunctionUsed(self, route_params):
         # Get the values, and slugify them appropriately.
-        values = route_metadata[self.taxonomy.term_name]
+        values = route_params[self.taxonomy.term_name]
         if self.taxonomy.is_multiple:
             # TODO: here we assume the route has been properly configured.
             slugified_values = self.slugifyMultiple((str(v) for v in values))
@@ -160,94 +156,20 @@
             route_val = slugified_values
 
         # We need to register this use of a taxonomy term.
-        eis = self.app.env.exec_info_stack
-        cpi = eis.current_page_info.render_ctx.current_pass_info
+        rcs = self.app.env.render_ctx_stack
+        cpi = rcs.current_ctx.current_pass_info
         if cpi:
             utt = cpi.getCustomInfo('used_taxonomy_terms', [], True)
             utt.append(slugified_values)
 
         # Put the slugified values in the route metadata so they're used to
         # generate the URL.
-        route_metadata[self.taxonomy.term_name] = route_val
-
-    def bake(self, ctx):
-        if not self.page_ref.exists:
-            logger.debug(
-                    "No page found at '%s', skipping taxonomy '%s'." %
-                    (self.page_ref, self.taxonomy.name))
-            return
-
-        logger.debug("Baking %s pages...", self.taxonomy.name)
-        analyzer = _TaxonomyTermsAnalyzer(self.source_name, self.taxonomy,
-                                          self.slugify_mode)
-        with format_timed_scope(logger, 'gathered taxonomy terms',
-                                level=logging.DEBUG, colored=False):
-            analyzer.analyze(ctx)
-
-        start_time = time.perf_counter()
-        page_count = self._bakeTaxonomyTerms(ctx, analyzer)
-        if page_count > 0:
-            logger.info(format_timed(
-                start_time,
-                "baked %d %s pages for %s." % (
-                    page_count, self.taxonomy.term_name, self.source_name)))
-
-    def _bakeTaxonomyTerms(self, ctx, analyzer):
-        # Start baking those terms.
-        logger.debug(
-                "Baking '%s' for source '%s': %d terms" %
-                (self.taxonomy.name, self.source_name,
-                 len(analyzer.dirty_slugified_terms)))
-
-        route = self.app.getGeneratorRoute(self.name)
-        if route is None:
-            raise Exception("No routes have been defined for generator: %s" %
-                            self.name)
-
-        logger.debug("Using taxonomy page: %s" % self.page_ref)
-        fac = self.page_ref.getFactory()
-
-        job_count = 0
-        for slugified_term in analyzer.dirty_slugified_terms:
-            extra_route_metadata = {
-                self.taxonomy.term_name: slugified_term}
-
-            # Use the slugified term as the record's extra key seed.
-            logger.debug(
-                "Queuing: %s [%s=%s]" %
-                (fac.ref_spec, self.taxonomy.name, slugified_term))
-            ctx.queueBakeJob(fac, route, extra_route_metadata, slugified_term)
-            job_count += 1
-        ctx.runJobQueue()
-
-        # Now we create bake entries for all the terms that were *not* dirty.
-        # This is because otherwise, on the next incremental bake, we wouldn't
-        # find any entry for those things, and figure that we need to delete
-        # their outputs.
-        for prev_entry, cur_entry in ctx.getAllPageRecords():
-            # Only consider taxonomy-related entries that don't have any
-            # current version (i.e. they weren't baked just now).
-            if prev_entry and not cur_entry:
-                try:
-                    t = ctx.getSeedFromRecordExtraKey(prev_entry.extra_key)
-                except InvalidRecordExtraKey:
-                    continue
-
-                if analyzer.isKnownSlugifiedTerm(t):
-                    logger.debug("Creating unbaked entry for %s term: %s" %
-                                 (self.name, t))
-                    ctx.collapseRecord(prev_entry)
-                else:
-                    logger.debug("Term %s in %s isn't used anymore." %
-                                 (self.name, t))
-
-        return job_count
+        route_params[self.taxonomy.term_name] = route_val
 
 
 class HasTaxonomyTermsFilterClause(SettingFilterClause):
     def __init__(self, taxonomy, slugify_mode, value, is_combination):
-        super(HasTaxonomyTermsFilterClause, self).__init__(
-                taxonomy.setting_name, value)
+        super().__init__(taxonomy.setting_name, value)
         self._taxonomy = taxonomy
         self._is_combination = is_combination
         self._slugifier = _Slugifier(taxonomy, slugify_mode)
@@ -277,11 +199,118 @@
             return page_value == self.value
 
 
+def _get_taxonomy(app, tax_name):
+    tax_config = app.config.get('site/taxonomies/' + tax_name)
+    if tax_config is None:
+        raise ConfigurationError("No such taxonomy: %s" % tax_name)
+    return Taxonomy(tax_name, tax_config)
+
+
+def _get_slugifier(app, taxonomy, slugify_mode=None):
+    if slugify_mode is None:
+        slugify_mode = app.config.get('site/slugify_mode', 'encode')
+    sm = _parse_slugify_mode(slugify_mode)
+    return _Slugifier(taxonomy, sm)
+
+
+class TaxonomyPipeline(ContentPipeline):
+    PIPELINE_NAME = 'taxonomy'
+    PASS_NUM = 1
+
+    def __init__(self, source, ctx):
+        if not isinstance(source, TaxonomySource):
+            raise Exception("The taxonomy pipeline only supports taxonomy "
+                            "content sources.")
+
+        super().__init__(source, ctx)
+        self.taxonomy = source.taxonomy
+        self.slugifier = source.slugifier
+
+    def buildJobs(self, ctx):
+        logger.debug("Building taxonomy pages for source: %s" %
+                     self.source.name)
+        analyzer = _TaxonomyTermsAnalyzer(self.source)
+        with format_timed_scope(logger, 'gathered taxonomy terms',
+                                level=logging.DEBUG, colored=False):
+            analyzer.analyze(ctx)
+
+    def bake(self, ctx):
+        if not self.page_ref.exists:
+            logger.debug(
+                "No page found at '%s', skipping taxonomy '%s'." %
+                (self.page_ref, self.taxonomy.name))
+            return
+
+        logger.debug("Baking %s pages...", self.taxonomy.name)
+        analyzer = _TaxonomyTermsAnalyzer(self.source_name, self.taxonomy,
+                                          self.slugify_mode)
+        with format_timed_scope(logger, 'gathered taxonomy terms',
+                                level=logging.DEBUG, colored=False):
+            analyzer.analyze(ctx)
+
+        start_time = time.perf_counter()
+        page_count = self._bakeTaxonomyTerms(ctx, analyzer)
+        if page_count > 0:
+            logger.info(format_timed(
+                start_time,
+                "baked %d %s pages for %s." % (
+                    page_count, self.taxonomy.term_name, self.source_name)))
+
+    def _bakeTaxonomyTerms(self, ctx, analyzer):
+        # Start baking those terms.
+        logger.debug(
+            "Baking '%s' for source '%s': %d terms" %
+            (self.taxonomy.name, self.source_name,
+             len(analyzer.dirty_slugified_terms)))
+
+        route = self.app.getGeneratorRoute(self.name)
+        if route is None:
+            raise Exception("No routes have been defined for generator: %s" %
+                            self.name)
+
+        logger.debug("Using taxonomy page: %s" % self.page_ref)
+        fac = self.page_ref.getFactory()
+
+        job_count = 0
+        for slugified_term in analyzer.dirty_slugified_terms:
+            extra_route_params = {
+                self.taxonomy.term_name: slugified_term}
+
+            # Use the slugified term as the record's extra key seed.
+            logger.debug(
+                "Queuing: %s [%s=%s]" %
+                (fac.ref_spec, self.taxonomy.name, slugified_term))
+            ctx.queueBakeJob(fac, route, extra_route_params, slugified_term)
+            job_count += 1
+        ctx.runJobQueue()
+
+        # Now we create bake entries for all the terms that were *not* dirty.
+        # This is because otherwise, on the next incremental bake, we wouldn't
+        # find any entry for those things, and figure that we need to delete
+        # their outputs.
+        for prev_entry, cur_entry in ctx.getAllPageRecords():
+            # Only consider taxonomy-related entries that don't have any
+            # current version (i.e. they weren't baked just now).
+            if prev_entry and not cur_entry:
+                try:
+                    t = ctx.getSeedFromRecordExtraKey(prev_entry.extra_key)
+                except InvalidRecordExtraKey:
+                    continue
+
+                if analyzer.isKnownSlugifiedTerm(t):
+                    logger.debug("Creating unbaked entry for %s term: %s" %
+                                 (self.name, t))
+                    ctx.collapseRecord(prev_entry)
+                else:
+                    logger.debug("Term %s in %s isn't used anymore." %
+                                 (self.name, t))
+
+        return job_count
+
+
 class _TaxonomyTermsAnalyzer(object):
-    def __init__(self, source_name, taxonomy, slugify_mode):
-        self.source_name = source_name
-        self.taxonomy = taxonomy
-        self.slugifier = _Slugifier(taxonomy, slugify_mode)
+    def __init__(self, source):
+        self.source = source
         self._all_terms = {}
         self._single_dirty_slugified_terms = set()
         self._all_dirty_slugified_terms = None
@@ -415,11 +444,11 @@
 
 def _parse_slugify_mode(value):
     mapping = {
-            'encode': SLUGIFY_ENCODE,
-            'transliterate': SLUGIFY_TRANSLITERATE,
-            'lowercase': SLUGIFY_LOWERCASE,
-            'dot_to_dash': SLUGIFY_DOT_TO_DASH,
-            'space_to_dash': SLUGIFY_SPACE_TO_DASH}
+        'encode': SLUGIFY_ENCODE,
+        'transliterate': SLUGIFY_TRANSLITERATE,
+        'lowercase': SLUGIFY_LOWERCASE,
+        'dot_to_dash': SLUGIFY_DOT_TO_DASH,
+        'space_to_dash': SLUGIFY_SPACE_TO_DASH}
     mode = 0
     for v in value.split(','):
         f = mapping.get(v.strip())
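
As a small example of the parsing above, a comma-separated setting is OR'ed
into a single bit mask (the flags are the SLUGIFY_* constants imported at
the top of this module):

    mode = _parse_slugify_mode('lowercase,space_to_dash')
    assert mode == (SLUGIFY_LOWERCASE | SLUGIFY_SPACE_TO_DASH)
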
--- a/piecrust/templating/jinja/environment.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/templating/jinja/environment.py	Sun Jun 04 23:34:28 2017 -0700
@@ -2,6 +2,7 @@
 import time
 import email.utils
 import hashlib
+import logging
 import strict_rfc3339
 from jinja2 import Environment
 from .extensions import get_highlight_css
@@ -10,6 +11,9 @@
 from piecrust.uriutil import multi_replace
 
 
+logger = logging.getLogger(__name__)
+
+
 class PieCrustEnvironment(Environment):
     def __init__(self, app, *args, **kwargs):
         self.app = app
@@ -33,6 +37,16 @@
             if val is not None:
                 kwargs.setdefault(name, val)
 
+        # Undefined behaviour.
+        undef = app.config.get('jinja/undefined')
+        if undef == 'logging':
+            from jinja2 import make_logging_undefined
+            kwargs.setdefault('undefined',
+                              make_logging_undefined(logger))
+        elif undef == 'strict':
+            from jinja2 import StrictUndefined
+            kwargs.setdefault('undefined', StrictUndefined)
+
         # Twig trims blocks.
         if twig_compatibility_mode is True:
             kwargs['trim_blocks'] = True
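
For reference, a small sketch of what the two `jinja/undefined` settings map
to; both classes are standard Jinja2 APIs, the template string is made up.

    import logging
    from jinja2 import Environment, StrictUndefined, make_logging_undefined

    # 'strict': touching an undefined variable fails the render.
    strict_env = Environment(undefined=StrictUndefined)
    # strict_env.from_string('{{ missing }}').render()  # raises UndefinedError

    # 'logging': undefined variables are logged and render as empty strings.
    logging_env = Environment(
        undefined=make_logging_undefined(logging.getLogger(__name__)))
    logging_env.from_string('{{ missing }}').render()   # logs a warning
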
--- a/piecrust/workerpool.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/workerpool.py	Sun Jun 04 23:34:28 2017 -0700
@@ -90,6 +90,11 @@
         from piecrust.main import _pre_parse_chef_args
         _pre_parse_chef_args(sys.argv[1:])
 
+    from piecrust.main import ColoredFormatter
+    root_logger = logging.getLogger()
+    root_logger.handlers[0].setFormatter(ColoredFormatter(
+        ('[W-%d]' % wid) + '[%(name)s] %(message)s'))
+
     logger.debug("Worker %d initializing..." % wid)
 
     # We don't need those.
@@ -178,7 +183,10 @@
 class WorkerPool:
     def __init__(self, worker_class, initargs=(), *,
                  callback=None, error_callback=None,
-                 worker_count=None, batch_size=None):
+                 worker_count=None, batch_size=None,
+                 userdata=None):
+        self.userdata = userdata
+
         worker_count = worker_count or os.cpu_count() or 1
 
         if use_fastqueue:
@@ -271,6 +279,7 @@
 
     @staticmethod
     def _handleResults(pool):
+        userdata = pool.userdata
         while True:
             try:
                 res = pool._quick_get()
@@ -287,10 +296,10 @@
             try:
                 if success:
                     if pool._callback:
-                        pool._callback(task_data, data)
+                        pool._callback(task_data, data, userdata)
                 else:
                     if pool._error_callback:
-                        pool._error_callback(task_data, data)
+                        pool._error_callback(task_data, data, userdata)
                     else:
                         logger.error(
                             "Worker %d failed to process a job:" % wid)
@@ -312,7 +321,7 @@
     def wait(self, timeout=None):
         return self._event.wait(timeout)
 
-    def _handle(self, job, res):
+    def _handle(self, job, res, _):
         wid, data = res
         if wid < 0 or wid > self._count:
             logger.error("Ignoring report from unknown worker %d." % wid)
@@ -324,7 +333,7 @@
         if self._received == self._count:
             self._event.set()
 
-    def _handleError(self, job, res):
+    def _handleError(self, job, res, _):
         logger.error("Worker %d failed to send its report." % res.wid)
         logger.error(res)
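
With the signature change above, result callbacks now receive the pool's
`userdata` object as a third argument; error callbacks get it the same way.
A minimal sketch of the new callback shape; the name and the list are
placeholders, only the extra argument comes from the changeset. The callback
is wired up with `WorkerPool(worker_class, callback=on_job_done,
userdata=results)`.

    results = []

    def on_job_done(job, res, userdata):
        # `userdata` is whatever object was handed to the pool; here, the
        # `results` list above.
        userdata.append((job, res))
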
 
--- a/requirements.txt	Sun May 21 00:06:59 2017 -0700
+++ b/requirements.txt	Sun Jun 04 23:34:28 2017 -0700
@@ -1,17 +1,28 @@
+appdirs==1.4.3
+asn1crypto==0.22.0
 cffi==1.5.0
 colorama==0.3.3
 compressinja==0.0.2
+cryptography==1.8.1
 Flask==0.10.1
 Flask-Login==0.3.2
-Jinja2==2.7.3
+idna==2.5
+itsdangerous==0.24
+Jinja2==2.9.6
 Markdown==2.6.2
-MarkupSafe==0.23
+MarkupSafe==1.0
+packaging==16.8
 paramiko==2.0.0
+py==1.4.33
+pyasn1==0.2.3
+pycparser==2.17
 Pygments==2.0.2
+pyparsing==2.2.0
 pystache==0.5.4
 python-dateutil==2.4.2
 PyYAML==3.11
 repoze.lru==0.6
+six==1.10.0
 smartypants==1.8.6
 strict-rfc3339==0.5
 textile==2.2.2