view piecrust/baking/baker.py @ 1136:5f97b5b59dfe

bake: Optimize cache handling for the baking process.

- Get rid of the 2-level pipeline runs... handle a single set of passes.
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does.
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't need to be re-baked.
- Simplify some of the code.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 23 Apr 2018 21:47:49 -0700
parents 971b4d67e82a

import time
import os.path
import hashlib
import logging
from piecrust.chefutil import (
    format_timed_scope, format_timed)
from piecrust.environment import ExecutionStats
from piecrust.pipelines.base import (
    PipelineJobCreateContext, PipelineJobResultHandleContext, PipelineManager,
    get_pipeline_name_for_source)
from piecrust.pipelines.records import (
    MultiRecordHistory, MultiRecord,
    load_records)
from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES


logger = logging.getLogger(__name__)


def get_bake_records_path(app, out_dir, *, suffix=''):
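    # The records file name is derived from an MD5 hash of the output
    # directory, so each output directory gets its own records in the
    # 'baker' cache.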
    records_cache = app.cache.getCache('baker')
    records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest()
    records_name = '%s%s.records' % (records_id, suffix)
    return records_cache.getCachePath(records_name)


class Baker(object):
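    # Orchestrates a bake: sets up the pipelines, fans jobs out to a pool
    # of worker processes, and maintains the bake records that make
    # incremental bakes possible.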
    def __init__(self, appfactory, app, out_dir, *,
                 force=False,
                 allowed_pipelines=None,
                 forbidden_pipelines=None,
                 allowed_sources=None,
                 rotate_bake_records=True,
                 keep_unused_records=False):
        self.appfactory = appfactory
        self.app = app
        self.out_dir = out_dir
        self.force = force
        self.allowed_pipelines = allowed_pipelines
        self.forbidden_pipelines = forbidden_pipelines
        self.allowed_sources = allowed_sources
        self.rotate_bake_records = rotate_bake_records
        self.keep_unused_records = keep_unused_records

    def bake(self):
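        # Full bake entry point: load the previous records, create the
        # pipelines, bake each realm through the worker pool, then save
        # the updated records.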
        start_time = time.perf_counter()

        # Setup baker.
        logger.debug("  Bake Output: %s" % self.out_dir)
        logger.debug("  Root URL: %s" % self.app.config.get('site/root'))

        # Get into bake mode.
        self.app.config.set('baker/is_baking', True)
        self.app.config.set('site/asset_url_format', '%page_uri%/%filename%')

        stats = self.app.env.stats
        stats.registerTimer('LoadSourceContents', raise_if_registered=False)
        stats.registerTimer('CacheTemplates', raise_if_registered=False)

        # Make sure the output directory exists.
        if not os.path.isdir(self.out_dir):
            os.makedirs(self.out_dir, 0o755)

        # Load/create the bake records.
        records_path = get_bake_records_path(
            self.app, self.out_dir)
        if not self.force and os.path.isfile(records_path):
            with format_timed_scope(logger, "loaded previous bake records",
                                    level=logging.DEBUG, colored=False):
                previous_records = load_records(records_path)
        else:
            previous_records = MultiRecord()
        current_records = MultiRecord()

        # Figure out if we need to clean the cache because important things
        # have changed.
        is_cache_valid = self._handleCacheValidity(previous_records,
                                                   current_records)
        if not is_cache_valid:
            previous_records = MultiRecord()

        # Create the bake records history which tracks what's up-to-date
        # or not since last time we baked to the given output folder.
        record_histories = MultiRecordHistory(
            previous_records, current_records)

        # Pre-create all caches.
        for cache_name in ['app', 'baker', 'pages', 'renders']:
            self.app.cache.getCache(cache_name)

        # Create the pipelines.
        ppmngr = self._createPipelineManager(record_histories)

        # Done with all the setup, let's start the actual work.
        logger.info(format_timed(start_time, "setup baker"))

        # Pre-cache templates before creating the worker processes.
        load_start_time = time.perf_counter()
        self._populateTemplateCaches()
        logger.info(format_timed(load_start_time, "cache templates"))

        # Create the worker processes.
        pool_userdata = _PoolUserData(self, ppmngr)
        pool = self._createWorkerPool(records_path, pool_userdata)

        # Bake the realms.
        self._bakeRealms(pool, ppmngr, record_histories)

        # Handle deletions, collapse records, etc.
        ppmngr.postJobRun()
        ppmngr.deleteStaleOutputs()
        ppmngr.collapseRecords(self.keep_unused_records)

        # All done with the workers. Close the pool and get reports.
        pool_stats = pool.close()
        current_records.stats = _merge_execution_stats(stats, *pool_stats)

        # Shutdown the pipelines.
        ppmngr.shutdownPipelines()

        # Backup previous records, save the current ones.
        current_records.bake_time = time.time()
        current_records.out_dir = self.out_dir
        _save_bake_records(current_records, records_path,
                           rotate_previous=self.rotate_bake_records)

        # All done.
        self.app.config.set('baker/is_baking', False)
        logger.debug(format_timed(start_time, 'done baking'))

        return current_records

    def _handleCacheValidity(self, previous_records, current_records):
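        # Returns True if the cache and previous records can be reused,
        # or False after wiping the caches when a full re-bake is needed.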
        start_time = time.perf_counter()

        reason = None
        if self.force:
            reason = "ordered to"
        elif not self.app.config.get('__cache_valid'):
            # The configuration file was changed, or we're running a new
            # version of the app.
            reason = "not valid anymore"
        elif previous_records.invalidated:
            # We have no valid previous bake records.
            reason = "need bake records regeneration"
        else:
            # Check if any template has changed since the last bake. Since
            # there could be some advanced conditional logic going on, we'd
            # better just force a bake from scratch if that's the case.
            max_time = 0
            for d in self.app.templates_dirs:
                for dpath, _, filenames in os.walk(d):
                    for fn in filenames:
                        full_fn = os.path.join(dpath, fn)
                        max_time = max(max_time, os.path.getmtime(full_fn))
            if max_time >= previous_records.bake_time:
                reason = "templates modified"

        if reason is not None:
            # We have to bake everything from scratch.
            self.app.cache.clearCaches(except_names=['app', 'baker'])
            self.force = True
            current_records.incremental_count = 0
            previous_records = MultiRecord()
            logger.debug(format_timed(
                start_time, "cleaned cache (reason: %s)" % reason,
                colored=False))
            return False
        else:
            current_records.incremental_count += 1
            logger.debug(format_timed(
                start_time, "cache is assumed valid", colored=False))
            return True

    def _createPipelineManager(self, record_histories):
        # Gather all sources by realm -- we're going to bake each realm
        # separately so we can handle "overriding" (i.e. one realm overrides
        # another realm's pages, like the user realm overriding the theme
        # realm).
        #
        # Also, create and initialize each pipeline for each source.
        has_any_pp = False
        ppmngr = PipelineManager(
            self.app, self.out_dir,
            record_histories=record_histories)
        ok_pp = self.allowed_pipelines
        nok_pp = self.forbidden_pipelines
        ok_src = self.allowed_sources
        for source in self.app.sources:
            if ok_src is not None and source.name not in ok_src:
                continue

            pname = get_pipeline_name_for_source(source)
            if ok_pp is not None and pname not in ok_pp:
                continue
            if nok_pp is not None and pname in nok_pp:
                continue

            ppinfo = ppmngr.createPipeline(source)
            logger.debug(
                "Created pipeline '%s' for source: %s" %
                (ppinfo.pipeline.PIPELINE_NAME, source.name))
            has_any_pp = True
        if not has_any_pp:
            raise Exception("The website has no content sources, or the bake "
                            "command was invoked with all pipelines filtered "
                            "out. There's nothing to do.")
        return ppmngr

    def _populateTemplateCaches(self):
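        # Let the default template engine warm up its cache before the
        # bake starts.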
        engine_name = self.app.config.get('site/default_template_engine')
        for engine in self.app.plugin_loader.getTemplateEngines():
            if engine_name in engine.ENGINE_NAMES:
                engine.populateCache()
                break

    def _bakeRealms(self, pool, ppmngr, record_histories):
        # Bake the realms -- user first, theme second, so that a user item
        # can override a theme item.
        # Repeat this for each pipeline pass.
        realm_list = [REALM_USER, REALM_THEME]
        pp_by_pass_and_realm = _get_pipeline_infos_by_pass_and_realm(
            ppmngr.getPipelineInfos())

        for pp_pass_num in sorted(pp_by_pass_and_realm.keys()):
            logger.debug("Pipelines pass %d" % pp_pass_num)
            pp_by_realm = pp_by_pass_and_realm[pp_pass_num]
            for realm in realm_list:
                pplist = pp_by_realm.get(realm)
                if pplist is not None:
                    self._bakeRealm(pool, ppmngr, record_histories,
                                    pp_pass_num, realm, pplist)

    def _bakeRealm(self, pool, ppmngr, record_histories,
                   pp_pass_num, realm, pplist):
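        # Queue one batch of jobs per pipeline for this realm and pass,
        # then wait for the pool to complete them.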
        start_time = time.perf_counter()

        job_count = 0
        job_descs = {}
        realm_name = REALM_NAMES[realm].lower()
        pool.userdata.cur_pass = pp_pass_num

        for ppinfo in pplist:
            src = ppinfo.source
            pp = ppinfo.pipeline
            jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
                                             record_histories)

            jobs, job_desc = pp.createJobs(jcctx)
            if jobs is not None:
                new_job_count = len(jobs)
                job_count += new_job_count
                pool.queueJobs(jobs)
                if job_desc:
                    job_descs.setdefault(job_desc, []).append(src.name)
            else:
                new_job_count = 0

            logger.debug(
                "Queued %d jobs for source '%s' using pipeline '%s' "
                "(%s)." %
                (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))

        if job_count == 0:
            logger.debug("No jobs queued! Bailing out of this bake pass.")
            return

        pool.wait()

        logger.info(format_timed(
            start_time, "%d jobs completed (%s)." %
            (job_count, ', '.join(
                ['%s %s' % (d, ', '.join(sn))
                 for d, sn in job_descs.items()]))))

    def _logErrors(self, item_spec, errors):
        logger.error("Errors found in %s:" % item_spec)
        for e in errors:
            logger.error("  " + e)

    def _logWorkerException(self, item_spec, exc_data):
        logger.error("Errors found in %s:" % item_spec)
        logger.error(exc_data['value'])
        if self.app.debug:
            logger.error(exc_data['traceback'])

    def _createWorkerPool(self, previous_records_path, pool_userdata):
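        # Build the worker pool: every worker is initialized with the same
        # BakeWorkerContext, and job results/errors come back through the
        # two callbacks below.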
        from piecrust.workerpool import WorkerPool
        from piecrust.baking.worker import BakeWorkerContext, BakeWorker

        worker_count = self.app.config.get('baker/workers')
        batch_size = self.app.config.get('baker/batch_size')

        ctx = BakeWorkerContext(
            self.appfactory,
            self.out_dir,
            force=self.force,
            previous_records_path=previous_records_path,
            allowed_pipelines=self.allowed_pipelines,
            forbidden_pipelines=self.forbidden_pipelines)
        pool = WorkerPool(
            worker_count=worker_count,
            batch_size=batch_size,
            worker_class=BakeWorker,
            initargs=(ctx,),
            callback=self._handleWorkerResult,
            error_callback=self._handleWorkerError,
            userdata=pool_userdata)
        return pool

    def _handleWorkerResult(self, job, res, userdata):
        cur_pass = userdata.cur_pass
        source_name, item_spec = job['job_spec']

        # Make the pipeline do custom handling to update the record entry.
        ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
        pipeline = ppinfo.pipeline
        record = ppinfo.current_record
        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass)
        pipeline.handleJobResult(res, ppmrctx)

        # Set the overall success flags if there was an error.
        record_entry = ppmrctx.record_entry
        if not record_entry.success:
            record.success = False
            userdata.records.success = False
            self._logErrors(item_spec, record_entry.errors)

    def _handleWorkerError(self, job, exc_data, userdata):
        # Set the overall success flag.
        source_name, item_spec = job['job_spec']
        ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
        pipeline = ppinfo.pipeline
        record = ppinfo.current_record
        record.success = False
        userdata.records.success = False

        # Add those errors to the record, if possible.
        record_entry_spec = job.get('record_entry_spec', item_spec)
        e = record.getEntry(record_entry_spec)
        if not e:
            e = pipeline.createRecordEntry(item_spec)
            record.addEntry(e)
        e.errors.append(exc_data['value'])
        self._logWorkerException(item_spec, exc_data)

        # Log debug stuff.
        if self.app.debug:
            logger.error(exc_data['traceback'])


class _PoolUserData:
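    # Shared state handed to the worker pool callbacks: the pipeline
    # manager, the current bake records, and the pass being baked.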
    def __init__(self, baker, ppmngr):
        self.baker = baker
        self.ppmngr = ppmngr
        self.records = ppmngr.record_histories.current
        self.cur_pass = 0


def _get_pipeline_infos_by_pass_and_realm(pp_infos):
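    # Group pipeline infos by pass number, then by source realm. A
    # pipeline's PASS_NUM can be a single number or a list of them.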
    pp_by_pass_and_realm = {}
    for pp_info in pp_infos:
        pp_pass_num = pp_info.pipeline.PASS_NUM
        if isinstance(pp_pass_num, list):
            for ppn in pp_pass_num:
                _add_pipeline_info_to_pass_and_realm_dict(
                    ppn, pp_info, pp_by_pass_and_realm)
        else:
            _add_pipeline_info_to_pass_and_realm_dict(
                pp_pass_num, pp_info, pp_by_pass_and_realm)
    return pp_by_pass_and_realm


def _add_pipeline_info_to_pass_and_realm_dict(pp_pass_num, pp_info,
                                              pp_by_pass_and_realm):
    pp_by_realm = pp_by_pass_and_realm.setdefault(pp_pass_num, {})
    pplist = pp_by_realm.setdefault(
        pp_info.pipeline.source.config['realm'], [])
    pplist.append(pp_info)


def _merge_execution_stats(base_stats, *other_stats):
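    # Fold the main process' stats and each worker's stats into a single
    # ExecutionStats instance.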
    total_stats = ExecutionStats()
    total_stats.mergeStats(base_stats)
    for ps in other_stats:
        if ps is not None:
            total_stats.mergeStats(ps)
    return total_stats


def _save_bake_records(records, records_path, *, rotate_previous):
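    # Optionally rotate the previous record files (up to 9 backups with
    # numbered suffixes) before writing the new ones.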
    if rotate_previous:
        records_dir, records_fn = os.path.split(records_path)
        records_id, _ = os.path.splitext(records_fn)
        for i in range(8, -1, -1):
            suffix = '' if i == 0 else '.%d' % i
            records_path_i = os.path.join(
                records_dir,
                '%s%s.records' % (records_id, suffix))
            if os.path.exists(records_path_i):
                records_path_next = os.path.join(
                    records_dir,
                    '%s.%s.records' % (records_id, i + 1))
                if os.path.exists(records_path_next):
                    os.remove(records_path_next)
                os.rename(records_path_i, records_path_next)

    with format_timed_scope(logger, "saved bake records.",
                            level=logging.DEBUG, colored=False):
        records.save(records_path)