diff piecrust/baking/baker.py @ 852:4850f8c21b6e

core: Start of the big refactor for PieCrust 3.0.

* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are 2 built-in pipelines, one for assets and one for pages. The asset pipeline is vaguely functional, but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each content item. This should allow for better parallelization.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 17 May 2017 00:11:48 -0700
parents 9a92e2804562
children f070a4fc033c
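As context for the diff below: the new model pairs every content source with a pipeline, and baking reduces to running that pipeline over the source's items. A minimal sketch of that pairing step, using names that appear in the patch (source.config['pipeline'], source.config['realm'], PIPELINE_NAME); the DummySource and DummyPipeline classes and the example names are hypothetical stand-ins, not part of PieCrust:

    # Hypothetical stand-ins for the source/pipeline contract the new Baker
    # relies on: sources expose config['pipeline'] and config['realm'],
    # pipelines expose PIPELINE_NAME plus initialize()/shutdown().
    class DummySource:
        def __init__(self, name, realm):
            self.name = name
            self.config = {'pipeline': 'dummy', 'realm': realm}

        def getAllContents(self):
            # Real sources yield content items with .spec and .metadata.
            return []


    class DummyPipeline:
        PIPELINE_NAME = 'dummy'

        def __init__(self, source):
            self.source = source

        def initialize(self, ppctx):
            pass

        def shutdown(self, ppctx):
            pass


    def pair_sources_with_pipelines(sources, pipeline_classes, allowed_pipelines):
        # Mirrors the grouping done in Baker.bake(): skip sources whose
        # pipeline is not allowed, otherwise keep (source, pipeline) pairs
        # bucketed by realm so user-realm items can override theme-realm ones.
        sources_by_realm = {}
        for source in sources:
            pname = source.config['pipeline']
            if pname not in allowed_pipelines:
                continue
            pp = pipeline_classes[pname](source)
            srclist = sources_by_realm.setdefault(source.config['realm'], [])
            srclist.append((source, pp))
        return sources_by_realm


    pairs = pair_sources_with_pipelines(
        [DummySource('pages', 'user'), DummySource('theme_pages', 'theme')],
        {DummyPipeline.PIPELINE_NAME: DummyPipeline},
        ['dummy'])

The real Baker additionally calls pp.initialize(ppctx) with a PipelineContext before queuing jobs, and pp.shutdown(ppctx) once the worker pool has been closed, as the patch shows.
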
--- a/piecrust/baking/baker.py	Sat Apr 29 21:42:22 2017 -0700
+++ b/piecrust/baking/baker.py	Wed May 17 00:11:48 2017 -0700
@@ -2,79 +2,79 @@
 import os.path
 import hashlib
 import logging
-from piecrust.baking.records import (
-        BakeRecordEntry, TransitionalBakeRecord)
-from piecrust.baking.worker import (
-        save_factory,
-        JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE)
+from piecrust.baking.worker import BakeJob
 from piecrust.chefutil import (
-        format_timed_scope, format_timed)
+    format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
-from piecrust.generation.base import PageGeneratorBakeContext
-from piecrust.routing import create_route_metadata
-from piecrust.sources.base import (
-        REALM_NAMES, REALM_USER, REALM_THEME)
+from piecrust.pipelines.base import PipelineContext
+from piecrust.pipelines.records import (
+    MultiRecordHistory, MultiRecord, RecordEntry,
+    load_records)
+from piecrust.sources.base import REALM_USER, REALM_THEME
 
 
 logger = logging.getLogger(__name__)
 
 
+def get_bake_records_path(app, out_dir):
+    records_cache = app.cache.getCache('baker')
+    records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest()
+    records_name = records_id + '.record'
+    return records_cache.getCachePath(records_name)
+
+
 class Baker(object):
-    def __init__(self, app, out_dir, force=False,
-                 applied_config_variant=None,
-                 applied_config_values=None):
-        assert app and out_dir
+    def __init__(self, appfactory, app, out_dir,
+                 force=False, allowed_pipelines=None):
+        self.appfactory = appfactory
         self.app = app
         self.out_dir = out_dir
         self.force = force
-        self.applied_config_variant = applied_config_variant
-        self.applied_config_values = applied_config_values
+
+        self._pipeline_classes = {}
+        for pclass in app.plugin_loader.getPipelines():
+            self._pipeline_classes[pclass.PIPELINE_NAME] = pclass
 
-        # Remember what generator pages we should skip.
-        self.generator_pages = []
-        logger.debug("Gathering generator page paths:")
-        for gen in self.app.generators:
-            for path in gen.page_ref.possible_paths:
-                self.generator_pages.append(path)
-                logger.debug(" - %s" % path)
+        self.allowed_pipelines = allowed_pipelines
+        if allowed_pipelines is None:
+            self.allowed_pipelines = list(self._pipeline_classes.keys())
 
-        # Register some timers.
-        self.app.env.registerTimer('LoadJob', raise_if_registered=False)
-        self.app.env.registerTimer('RenderFirstSubJob',
-                                   raise_if_registered=False)
-        self.app.env.registerTimer('BakeJob', raise_if_registered=False)
+        self._records = None
 
     def bake(self):
+        start_time = time.perf_counter()
         logger.debug("  Bake Output: %s" % self.out_dir)
         logger.debug("  Root URL: %s" % self.app.config.get('site/root'))
 
         # Get into bake mode.
-        start_time = time.perf_counter()
         self.app.config.set('baker/is_baking', True)
-        self.app.env.base_asset_url_format = '%uri%'
+        self.app.config.set('site/base_asset_url_format', '%uri')
 
         # Make sure the output directory exists.
         if not os.path.isdir(self.out_dir):
             os.makedirs(self.out_dir, 0o755)
 
-        # Load/create the bake record.
-        record = TransitionalBakeRecord()
-        record_cache = self.app.cache.getCache('baker')
-        record_id = hashlib.md5(self.out_dir.encode('utf8')).hexdigest()
-        record_name = record_id + '.record'
-        previous_record_path = None
-        if not self.force and record_cache.has(record_name):
-            with format_timed_scope(logger, "loaded previous bake record",
+        # Load/create the bake records.
+        records_path = get_bake_records_path(
+            self.app, self.out_dir)
+        if not self.force and os.path.isfile(records_path):
+            with format_timed_scope(logger, "loaded previous bake records",
                                     level=logging.DEBUG, colored=False):
-                previous_record_path = record_cache.getCachePath(record_name)
-                record.loadPrevious(previous_record_path)
-        record.current.success = True
+                previous_records = load_records(records_path)
+        else:
+            previous_records = MultiRecord()
+        self._records = MultiRecord()
 
         # Figure out if we need to clean the cache because important things
         # have changed.
-        is_cache_valid = self._handleCacheValidity(record)
+        is_cache_valid = self._handleCacheValidity(previous_records,
+                                                   self._records)
         if not is_cache_valid:
-            previous_record_path = None
+            previous_records = MultiRecord()
+
+        # Create the bake records history which tracks what's up-to-date
+        # or not since last time we baked to the given output folder.
+        record_history = MultiRecordHistory(previous_records, self._records)
 
         # Pre-create all caches.
         for cache_name in ['app', 'baker', 'pages', 'renders']:
@@ -84,64 +84,82 @@
         # separately so we can handle "overriding" (i.e. one realm overrides
         # another realm's pages, like the user realm overriding the theme
         # realm).
+        #
+        # Also, create and initialize each pipeline for each source.
         sources_by_realm = {}
+        ppctx = PipelineContext(self.out_dir, record_history,
+                                force=self.force)
         for source in self.app.sources:
-            srclist = sources_by_realm.setdefault(source.realm, [])
-            srclist.append(source)
+            pname = source.config['pipeline']
+            if pname in self.allowed_pipelines:
+                srclist = sources_by_realm.setdefault(
+                    source.config['realm'], [])
+
+                pp = self._pipeline_classes[pname](source)
+                pp.initialize(ppctx)
+                srclist.append((source, pp))
+            else:
+                logger.debug(
+                    "Skip source '%s' because pipeline '%s' is ignored." %
+                    (source.name, pname))
 
         # Create the worker processes.
-        pool = self._createWorkerPool(previous_record_path)
+        pool = self._createWorkerPool(records_path)
 
-        # Bake the realms.
+        # Bake the realms -- user first, theme second, so that a user item
+        # can override a theme item.
         realm_list = [REALM_USER, REALM_THEME]
         for realm in realm_list:
             srclist = sources_by_realm.get(realm)
             if srclist is not None:
-                self._bakeRealm(record, pool, realm, srclist)
-
-        # Call all the page generators.
-        self._bakePageGenerators(record, pool)
+                self._bakeRealm(record_history, pool, realm, srclist)
 
         # All done with the workers. Close the pool and get reports.
-        reports = pool.close()
+        pool_stats = pool.close()
         total_stats = ExecutionStats()
-        record.current.stats['_Total'] = total_stats
-        for i in range(len(reports)):
-            worker_stats = reports[i]['data']
-            if worker_stats is not None:
-                worker_name = 'BakeWorker_%d' % i
-                record.current.stats[worker_name] = worker_stats
-                total_stats.mergeStats(worker_stats)
+        for ps in pool_stats:
+            if ps is not None:
+                total_stats.mergeStats(ps)
+        record_history.current.stats = total_stats
 
-        # Delete files from the output.
-        self._handleDeletetions(record)
+        # Shutdown the pipelines.
+        for realm in realm_list:
+            srclist = sources_by_realm.get(realm)
+            if srclist is not None:
+                for _, pp in srclist:
+                    pp.shutdown(ppctx)
 
         # Backup previous records.
+        records_dir, records_fn = os.path.split(records_path)
+        records_id, _ = os.path.splitext(records_fn)
         for i in range(8, -1, -1):
             suffix = '' if i == 0 else '.%d' % i
-            record_path = record_cache.getCachePath(
-                    '%s%s.record' % (record_id, suffix))
-            if os.path.exists(record_path):
-                record_path_next = record_cache.getCachePath(
-                        '%s.%s.record' % (record_id, i + 1))
-                if os.path.exists(record_path_next):
-                    os.remove(record_path_next)
-                os.rename(record_path, record_path_next)
+            records_path_i = os.path.join(
+                records_dir,
+                '%s%s.record' % (records_id, suffix))
+            if os.path.exists(records_path_i):
+                records_path_next = os.path.join(
+                    records_dir,
+                    '%s.%s.record' % (records_id, i + 1))
+                if os.path.exists(records_path_next):
+                    os.remove(records_path_next)
+                os.rename(records_path_i, records_path_next)
 
         # Save the bake record.
-        with format_timed_scope(logger, "saved bake record.",
+        with format_timed_scope(logger, "saved bake records.",
                                 level=logging.DEBUG, colored=False):
-            record.current.bake_time = time.time()
-            record.current.out_dir = self.out_dir
-            record.saveCurrent(record_cache.getCachePath(record_name))
+            record_history.current.bake_time = time.time()
+            record_history.current.out_dir = self.out_dir
+            record_history.current.save(records_path)
 
         # All done.
         self.app.config.set('baker/is_baking', False)
         logger.debug(format_timed(start_time, 'done baking'))
 
-        return record.detach()
+        self._records = None
+        return record_history.current
 
-    def _handleCacheValidity(self, record):
+    def _handleCacheValidity(self, previous_records, current_records):
         start_time = time.perf_counter()
 
         reason = None
@@ -151,8 +169,7 @@
             # The configuration file was changed, or we're running a new
             # version of the app.
             reason = "not valid anymore"
-        elif (not record.previous.bake_time or
-                not record.previous.hasLatestVersion()):
+        elif previous_records.invalidated:
             # We have no valid previous bake record.
             reason = "need bake record regeneration"
         else:
@@ -165,261 +182,86 @@
                     for fn in filenames:
                         full_fn = os.path.join(dpath, fn)
                         max_time = max(max_time, os.path.getmtime(full_fn))
-            if max_time >= record.previous.bake_time:
+            if max_time >= previous_records.bake_time:
                 reason = "templates modified"
 
         if reason is not None:
             # We have to bake everything from scratch.
             self.app.cache.clearCaches(except_names=['app', 'baker'])
             self.force = True
-            record.incremental_count = 0
-            record.clearPrevious()
+            current_records.incremental_count = 0
+            previous_records = MultiRecord()
             logger.info(format_timed(
-                    start_time,
-                    "cleaned cache (reason: %s)" % reason))
+                start_time, "cleaned cache (reason: %s)" % reason))
             return False
         else:
-            record.incremental_count += 1
+            current_records.incremental_count += 1
             logger.debug(format_timed(
-                    start_time, "cache is assumed valid",
-                    colored=False))
+                start_time, "cache is assumed valid", colored=False))
             return True
 
-    def _bakeRealm(self, record, pool, realm, srclist):
-        start_time = time.perf_counter()
-        try:
-            record.current.baked_count[realm] = 0
-            record.current.total_baked_count[realm] = 0
-
-            all_factories = []
-            for source in srclist:
-                factories = source.getPageFactories()
-                all_factories += [f for f in factories
-                                  if f.path not in self.generator_pages]
-
-            self._loadRealmPages(record, pool, all_factories)
-            self._renderRealmPages(record, pool, all_factories)
-            self._bakeRealmPages(record, pool, realm, all_factories)
-        finally:
-            page_count = record.current.baked_count[realm]
-            total_page_count = record.current.total_baked_count[realm]
-            logger.info(format_timed(
-                    start_time,
-                    "baked %d %s pages (%d total)." %
-                    (page_count, REALM_NAMES[realm].lower(),
-                        total_page_count)))
-
-    def _loadRealmPages(self, record, pool, factories):
-        def _handler(res):
-            # Create the record entry for this page.
-            # This will also update the `dirty_source_names` for the record
-            # as we add page files whose last modification times are later
-            # than the last bake.
-            record_entry = BakeRecordEntry(res['source_name'], res['path'])
-            record_entry.config = res['config']
-            record_entry.timestamp = res['timestamp']
-            if res['errors']:
-                record_entry.errors += res['errors']
-                record.current.success = False
-                self._logErrors(res['path'], res['errors'])
-            record.addEntry(record_entry)
-
-        logger.debug("Loading %d realm pages..." % len(factories))
-        with format_timed_scope(logger,
-                                "loaded %d pages" % len(factories),
-                                level=logging.DEBUG, colored=False,
-                                timer_env=self.app.env,
-                                timer_category='LoadJob'):
-            jobs = []
-            for fac in factories:
-                job = {
-                        'type': JOB_LOAD,
-                        'job': save_factory(fac)}
-                jobs.append(job)
-            ar = pool.queueJobs(jobs, handler=_handler)
-            ar.wait()
-
-    def _renderRealmPages(self, record, pool, factories):
-        def _handler(res):
-            entry = record.getCurrentEntry(res['path'])
-            if res['errors']:
-                entry.errors += res['errors']
-                record.current.success = False
-                self._logErrors(res['path'], res['errors'])
-
-        logger.debug("Rendering %d realm pages..." % len(factories))
-        with format_timed_scope(logger,
-                                "prepared %d pages" % len(factories),
-                                level=logging.DEBUG, colored=False,
-                                timer_env=self.app.env,
-                                timer_category='RenderFirstSubJob'):
-            jobs = []
-            for fac in factories:
-                record_entry = record.getCurrentEntry(fac.path)
-                if record_entry.errors:
-                    logger.debug("Ignoring %s because it had previous "
-                                 "errors." % fac.ref_spec)
-                    continue
-
-                # Make sure the source and the route exist for this page,
-                # otherwise we add errors to the record entry and we'll skip
-                # this page for the rest of the bake.
-                source = self.app.getSource(fac.source.name)
-                if source is None:
-                    record_entry.errors.append(
-                            "Can't get source for page: %s" % fac.ref_spec)
-                    logger.error(record_entry.errors[-1])
-                    continue
-
-                route = self.app.getSourceRoute(fac.source.name, fac.metadata)
-                if route is None:
-                    record_entry.errors.append(
-                            "Can't get route for page: %s" % fac.ref_spec)
-                    logger.error(record_entry.errors[-1])
-                    continue
+    def _bakeRealm(self, record_history, pool, realm, srclist):
+        for source, pp in srclist:
+            logger.debug("Queuing jobs for source '%s' using pipeline '%s'." %
+                         (source.name, pp.PIPELINE_NAME))
+            jobs = [BakeJob(source.name, item.spec, item.metadata)
+                    for item in source.getAllContents()]
+            pool.queueJobs(jobs)
+        pool.wait()
 
-                # All good, queue the job.
-                route_index = self.app.routes.index(route)
-                job = {
-                        'type': JOB_RENDER_FIRST,
-                        'job': {
-                            'factory_info': save_factory(fac),
-                            'route_index': route_index
-                            }
-                        }
-                jobs.append(job)
-
-            ar = pool.queueJobs(jobs, handler=_handler)
-            ar.wait()
-
-    def _bakeRealmPages(self, record, pool, realm, factories):
-        def _handler(res):
-            entry = record.getCurrentEntry(res['path'])
-            entry.subs = res['sub_entries']
-            if res['errors']:
-                entry.errors += res['errors']
-                self._logErrors(res['path'], res['errors'])
-            if entry.has_any_error:
-                record.current.success = False
-            if entry.subs and entry.was_any_sub_baked:
-                record.current.baked_count[realm] += 1
-                record.current.total_baked_count[realm] += len(entry.subs)
-
-        logger.debug("Baking %d realm pages..." % len(factories))
-        with format_timed_scope(logger,
-                                "baked %d pages" % len(factories),
-                                level=logging.DEBUG, colored=False,
-                                timer_env=self.app.env,
-                                timer_category='BakeJob'):
-            jobs = []
-            for fac in factories:
-                job = self._makeBakeJob(record, fac)
-                if job is not None:
-                    jobs.append(job)
-
-            ar = pool.queueJobs(jobs, handler=_handler)
-            ar.wait()
-
-    def _bakePageGenerators(self, record, pool):
-        for gen in self.app.generators:
-            ctx = PageGeneratorBakeContext(self.app, record, pool, gen)
-            gen.bake(ctx)
-
-    def _makeBakeJob(self, record, fac):
-        # Get the previous (if any) and current entry for this page.
-        pair = record.getPreviousAndCurrentEntries(fac.path)
-        assert pair is not None
-        prev_entry, cur_entry = pair
-        assert cur_entry is not None
-
-        # Ignore if there were errors in the previous passes.
-        if cur_entry.errors:
-            logger.debug("Ignoring %s because it had previous "
-                         "errors." % fac.ref_spec)
-            return None
-
-        # Build the route metadata and find the appropriate route.
-        page = fac.buildPage()
-        route_metadata = create_route_metadata(page)
-        route = self.app.getSourceRoute(fac.source.name, route_metadata)
-        assert route is not None
-
-        # Figure out if this page is overriden by another previously
-        # baked page. This happens for example when the user has
-        # made a page that has the same page/URL as a theme page.
-        uri = route.getUri(route_metadata)
-        override_entry = record.getOverrideEntry(page.path, uri)
-        if override_entry is not None:
-            override_source = self.app.getSource(
-                    override_entry.source_name)
-            if override_source.realm == fac.source.realm:
-                cur_entry.errors.append(
-                        "Page '%s' maps to URL '%s' but is overriden "
-                        "by page '%s'." %
-                        (fac.ref_spec, uri, override_entry.path))
-                logger.error(cur_entry.errors[-1])
-            cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN
-            return None
-
-        route_index = self.app.routes.index(route)
-        job = {
-                'type': JOB_BAKE,
-                'job': {
-                        'factory_info': save_factory(fac),
-                        'generator_name': None,
-                        'generator_record_key': None,
-                        'route_index': route_index,
-                        'route_metadata': route_metadata,
-                        'dirty_source_names': record.dirty_source_names
-                        }
-                }
-        return job
-
-    def _handleDeletetions(self, record):
-        logger.debug("Handling deletions...")
-        for path, reason in record.getDeletions():
-            logger.debug("Removing '%s': %s" % (path, reason))
-            record.current.deleted.append(path)
-            try:
-                os.remove(path)
-                logger.info('[delete] %s' % path)
-            except OSError:
-                # Not a big deal if that file had already been removed
-                # by the user.
-                pass
-
-    def _logErrors(self, path, errors):
-        rel_path = os.path.relpath(path, self.app.root_dir)
-        logger.error("Errors found in %s:" % rel_path)
+    def _logErrors(self, item_spec, errors):
+        logger.error("Errors found in %s:" % item_spec)
         for e in errors:
             logger.error("  " + e)
 
-    def _createWorkerPool(self, previous_record_path):
-        from piecrust.app import PieCrustFactory
+    def _createWorkerPool(self, previous_records_path):
         from piecrust.workerpool import WorkerPool
         from piecrust.baking.worker import BakeWorkerContext, BakeWorker
 
-        appfactory = PieCrustFactory(
-                self.app.root_dir,
-                cache=self.app.cache.enabled,
-                cache_key=self.app.cache_key,
-                config_variant=self.applied_config_variant,
-                config_values=self.applied_config_values,
-                debug=self.app.debug,
-                theme_site=self.app.theme_site)
-
         worker_count = self.app.config.get('baker/workers')
         batch_size = self.app.config.get('baker/batch_size')
 
         ctx = BakeWorkerContext(
-                appfactory,
-                self.out_dir,
-                force=self.force,
-                previous_record_path=previous_record_path)
+            self.appfactory,
+            self.out_dir,
+            force=self.force,
+            previous_records_path=previous_records_path,
+            allowed_pipelines=self.allowed_pipelines)
         pool = WorkerPool(
-                worker_count=worker_count,
-                batch_size=batch_size,
-                worker_class=BakeWorker,
-                initargs=(ctx,))
+            worker_count=worker_count,
+            batch_size=batch_size,
+            worker_class=BakeWorker,
+            initargs=(ctx,),
+            callback=self._handleWorkerResult,
+            error_callback=self._handleWorkerError)
         return pool
 
+    def _handleWorkerResult(self, job, res):
+        record_name = self._getRecordName(job)
+        record = self._records.getRecord(record_name)
+        record.entries.append(res.record)
+        if not res.record.success:
+            record.success = False
+            self._records.success = False
+            self._logErrors(job.item_spec, res.record.errors)
+
+    def _handleWorkerError(self, job, exc_data):
+        e = RecordEntry()
+        e.item_spec = job.item_spec
+        e.errors.append(str(exc_data))
+
+        record_name = self._getRecordName(job)
+        record = self._records.getRecord(record_name)
+        record.entries.append(e)
+
+        record.success = False
+        self._records.success = False
+
+        self._logErrors(job.item_spec, e.errors)
+        if self.app.debug:
+            logger.error(exc_data.traceback)
+
+    def _getRecordName(self, job):
+        sn = job.source_name
+        ppn = self.app.getSource(sn).config['pipeline']
+        return '%s@%s' % (sn, ppn)
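
A closing note on the record bookkeeping above: bake records live in one file per output directory (keyed by an md5 of out_dir, see get_bake_records_path), and each worker result or error is appended to a record named '<source>@<pipeline>' (see _getRecordName and _handleWorkerResult). A small self-contained sketch of both naming schemes; the cache_dir argument and the example source/pipeline names are made up, standing in for what the real code obtains from app.cache and the plugin loader:

    import hashlib
    import os.path


    def bake_records_path(cache_dir, out_dir):
        # One records file per output directory, keyed by the md5 of its
        # path, mirroring get_bake_records_path() above.
        records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest()
        return os.path.join(cache_dir, records_id + '.record')


    def record_name(source_name, pipeline_name):
        # Mirrors Baker._getRecordName(): entries for a job are filed under
        # the record named '<source>@<pipeline>'.
        return '%s@%s' % (source_name, pipeline_name)


    print(bake_records_path('/tmp/cache/baker', '/var/www/mysite'))
    print(record_name('pages', 'page'))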