view piecrust/baking/baker.py @ 852:4850f8c21b6e

core: Start of the big refactor for PieCrust 3.0.

* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are 2 built-in pipelines, one for assets and one for pages. The asset pipeline is vaguely functional, but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each content item. This should allow for better parallelization.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 17 May 2017 00:11:48 -0700
parents 9a92e2804562
children f070a4fc033c
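
The commit message above describes the new source/pipeline architecture. As a rough, hypothetical illustration (not part of the file below), here is a minimal sketch of how the baker drives one pipeline for one source; it only uses names that appear in the code at this changeset, and the helper name `bake_one_source` is invented for the example.

    # Illustration only: the per-source flow implemented in Baker.bake() and
    # Baker._bakeRealm() below. `pipeline_class`, `source`, `ppctx` and `pool`
    # stand in for the objects the baker builds itself.
    from piecrust.baking.worker import BakeJob

    def bake_one_source(pipeline_class, source, ppctx, pool):
        pp = pipeline_class(source)   # one pipeline instance per source
        pp.initialize(ppctx)          # shared PipelineContext (out_dir, record history, force)

        # One job per content item coming out of the source.
        jobs = [BakeJob(source.name, item.spec, item.metadata)
                for item in source.getAllContents()]
        pool.queueJobs(jobs)
        pool.wait()

        pp.shutdown(ppctx)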
line source

import time
import os.path
import hashlib
import logging
from piecrust.baking.worker import BakeJob
from piecrust.chefutil import (
    format_timed_scope, format_timed)
from piecrust.environment import ExecutionStats
from piecrust.pipelines.base import PipelineContext
from piecrust.pipelines.records import (
    MultiRecordHistory, MultiRecord, RecordEntry,
    load_records)
from piecrust.sources.base import REALM_USER, REALM_THEME


logger = logging.getLogger(__name__)


def get_bake_records_path(app, out_dir):
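    # One record file per output directory: the file lives in the 'baker'
    # cache and is named after an MD5 hash of the output path.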
    records_cache = app.cache.getCache('baker')
    records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest()
    records_name = records_id + '.record'
    return records_cache.getCachePath(records_name)


class Baker(object):
    def __init__(self, appfactory, app, out_dir,
                 force=False, allowed_pipelines=None):
        self.appfactory = appfactory
        self.app = app
        self.out_dir = out_dir
        self.force = force

        self._pipeline_classes = {}
        for pclass in app.plugin_loader.getPipelines():
            self._pipeline_classes[pclass.PIPELINE_NAME] = pclass

        self.allowed_pipelines = allowed_pipelines
        if allowed_pipelines is None:
            self.allowed_pipelines = list(self._pipeline_classes.keys())

        self._records = None

    def bake(self):
        start_time = time.perf_counter()
        logger.debug("  Bake Output: %s" % self.out_dir)
        logger.debug("  Root URL: %s" % self.app.config.get('site/root'))

        # Get into bake mode.
        self.app.config.set('baker/is_baking', True)
        self.app.config.set('site/base_asset_url_format', '%uri')

        # Make sure the output directory exists.
        if not os.path.isdir(self.out_dir):
            os.makedirs(self.out_dir, 0o755)

        # Load/create the bake records.
        records_path = get_bake_records_path(
            self.app, self.out_dir)
        if not self.force and os.path.isfile(records_path):
            with format_timed_scope(logger, "loaded previous bake records",
                                    level=logging.DEBUG, colored=False):
                previous_records = load_records(records_path)
        else:
            previous_records = MultiRecord()
        self._records = MultiRecord()

        # Figure out if we need to clean the cache because important things
        # have changed.
        is_cache_valid = self._handleCacheValidity(previous_records,
                                                   self._records)
        if not is_cache_valid:
            previous_records = MultiRecord()

        # Create the bake records history, which tracks what's up-to-date
        # or not since the last time we baked to the given output folder.
        record_history = MultiRecordHistory(previous_records, self._records)

        # Pre-create all caches.
        for cache_name in ['app', 'baker', 'pages', 'renders']:
            self.app.cache.getCache(cache_name)

        # Gather all sources by realm -- we're going to bake each realm
        # separately so we can handle "overriding" (i.e. one realm overrides
        # another realm's pages, like the user realm overriding the theme
        # realm).
        #
        # Also, create and initialize each pipeline for each source.
        sources_by_realm = {}
        ppctx = PipelineContext(self.out_dir, record_history,
                                force=self.force)
        for source in self.app.sources:
            pname = source.config['pipeline']
            if pname in self.allowed_pipelines:
                srclist = sources_by_realm.setdefault(
                    source.config['realm'], [])

                pp = self._pipeline_classes[pname](source)
                pp.initialize(ppctx)
                srclist.append((source, pp))
            else:
                logger.debug(
                    "Skipping source '%s' because its pipeline '%s' "
                    "is not allowed." %
                    (source.name, pname))

        # Create the worker processes.
        pool = self._createWorkerPool(records_path)

        # Bake the realms -- user first, theme second, so that a user item
        # can override a theme item.
        realm_list = [REALM_USER, REALM_THEME]
        for realm in realm_list:
            srclist = sources_by_realm.get(realm)
            if srclist is not None:
                self._bakeRealm(record_history, pool, realm, srclist)

        # All done with the workers. Close the pool and get reports.
        pool_stats = pool.close()
        total_stats = ExecutionStats()
        for ps in pool_stats:
            if ps is not None:
                total_stats.mergeStats(ps)
        record_history.current.stats = total_stats

        # Shut down the pipelines.
        for realm in realm_list:
            srclist = sources_by_realm.get(realm)
            if srclist is not None:
                for _, pp in srclist:
                    pp.shutdown(ppctx)

        # Backup previous records.
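        # Rotate the existing record files: '<id>.record' becomes
        # '<id>.1.record', '<id>.1.record' becomes '<id>.2.record', and so on
        # up to '<id>.9.record', overwriting the oldest backup.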
        records_dir, records_fn = os.path.split(records_path)
        records_id, _ = os.path.splitext(records_fn)
        for i in range(8, -1, -1):
            suffix = '' if i == 0 else '.%d' % i
            records_path_i = os.path.join(
                records_dir,
                '%s%s.record' % (records_id, suffix))
            if os.path.exists(records_path_i):
                records_path_next = os.path.join(
                    records_dir,
                    '%s.%s.record' % (records_id, i + 1))
                if os.path.exists(records_path_next):
                    os.remove(records_path_next)
                os.rename(records_path_i, records_path_next)

        # Save the bake record.
        with format_timed_scope(logger, "saved bake records.",
                                level=logging.DEBUG, colored=False):
            record_history.current.bake_time = time.time()
            record_history.current.out_dir = self.out_dir
            record_history.current.save(records_path)

        # All done.
        self.app.config.set('baker/is_baking', False)
        logger.debug(format_timed(start_time, 'done baking'))

        self._records = None
        return record_history.current

    def _handleCacheValidity(self, previous_records, current_records):
        start_time = time.perf_counter()

        reason = None
        if self.force:
            reason = "ordered to"
        elif not self.app.config.get('__cache_valid'):
            # The configuration file was changed, or we're running a new
            # version of the app.
            reason = "not valid anymore"
        elif previous_records.invalidated:
            # We have no valid previous bake record.
            reason = "need bake record regeneration"
        else:
            # Check if any template has changed since the last bake. Since
            # there could be some advanced conditional logic going on, we'd
            # better just force a bake from scratch if that's the case.
            max_time = 0
            for d in self.app.templates_dirs:
                for dpath, _, filenames in os.walk(d):
                    for fn in filenames:
                        full_fn = os.path.join(dpath, fn)
                        max_time = max(max_time, os.path.getmtime(full_fn))
            if max_time >= previous_records.bake_time:
                reason = "templates modified"

        if reason is not None:
            # We have to bake everything from scratch.
            self.app.cache.clearCaches(except_names=['app', 'baker'])
            self.force = True
            current_records.incremental_count = 0
            previous_records = MultiRecord()
            logger.info(format_timed(
                start_time, "cleaned cache (reason: %s)" % reason))
            return False
        else:
            current_records.incremental_count += 1
            logger.debug(format_timed(
                start_time, "cache is assumed valid", colored=False))
            return True

    def _bakeRealm(self, record_history, pool, realm, srclist):
        for source, pp in srclist:
            logger.debug("Queuing jobs for source '%s' using pipeline '%s'." %
                         (source.name, pp.PIPELINE_NAME))
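            # One job per content item; the job carries the source name plus
            # the item's spec and metadata so the worker can process it.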
            jobs = [BakeJob(source.name, item.spec, item.metadata)
                    for item in source.getAllContents()]
            pool.queueJobs(jobs)
        pool.wait()

    def _logErrors(self, item_spec, errors):
        logger.error("Errors found in %s:" % item_spec)
        for e in errors:
            logger.error("  " + e)

    def _createWorkerPool(self, previous_records_path):
        from piecrust.workerpool import WorkerPool
        from piecrust.baking.worker import BakeWorkerContext, BakeWorker

        worker_count = self.app.config.get('baker/workers')
        batch_size = self.app.config.get('baker/batch_size')

        ctx = BakeWorkerContext(
            self.appfactory,
            self.out_dir,
            force=self.force,
            previous_records_path=previous_records_path,
            allowed_pipelines=self.allowed_pipelines)
        pool = WorkerPool(
            worker_count=worker_count,
            batch_size=batch_size,
            worker_class=BakeWorker,
            initargs=(ctx,),
            callback=self._handleWorkerResult,
            error_callback=self._handleWorkerError)
        return pool

    def _handleWorkerResult(self, job, res):
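        # Append the entry produced by the worker to the record of this job's
        # source/pipeline pair; a failed entry marks both that record and the
        # whole multi-record as failed.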
        record_name = self._getRecordName(job)
        record = self._records.getRecord(record_name)
        record.entries.append(res.record)
        if not res.record.success:
            record.success = False
            self._records.success = False
            self._logErrors(job.item_spec, res.record.errors)

    def _handleWorkerError(self, job, exc_data):
        e = RecordEntry()
        e.item_spec = job.item_spec
        e.errors.append(str(exc_data))

        record_name = self._getRecordName(job)
        record = self._records.getRecord(record_name)
        record.entries.append(e)

        record.success = False
        self._records.success = False

        self._logErrors(job.item_spec, e.errors)
        if self.app.debug:
            logger.error(exc_data.traceback)

    def _getRecordName(self, job):
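        # Records are keyed by '<source_name>@<pipeline_name>', so each
        # source/pipeline pair gets its own record in the multi-record.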
        sn = job.source_name
        ppn = self.app.getSource(sn).config['pipeline']
        return '%s@%s' % (sn, ppn)