view piecrust/processing/worker.py @ 415:0e9a94b7fdfa

bake: Improve bake record information.

* Store things in the bake record that require less interaction between
  the master process and the workers. For instance, don't store the
  paginator object in the render pass info -- instead, just store whether
  pagination was used, and whether it had more items.
* Simplify information passing between workers and bake passes by saving
  the rendering info to the JSON cache. This means the "render first sub"
  job doesn't have to return anything except errors now.
* Add more performance counter info.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jun 2015 19:23:16 -0700
parents c4b3a7fd2f87
children 4a43d7015b75

import os.path
import re
import time
import queue
import logging
from piecrust.app import PieCrust
from piecrust.processing.base import PipelineContext
from piecrust.processing.records import (
        FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED,
        FLAG_BYPASSED_STRUCTURED_PROCESSING)
from piecrust.processing.tree import (
        ProcessingTreeBuilder, ProcessingTreeRunner,
        ProcessingTreeError, ProcessorError,
        get_node_name_tree, print_node,
        STATE_DIRTY)


logger = logging.getLogger(__name__)


split_processor_names_re = re.compile(r'[ ,]+')
re_ansicolors = re.compile('\033\\[\\d+m')
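# ``split_processor_names_re`` splits values like 'copy, concat sitemap'
# into individual names; ``re_ansicolors`` matches ANSI color escape
# sequences (e.g. '\033[31m') so they can be stripped from error output.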


def worker_func(wid, ctx):
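    """Entry point for a worker process: create a ProcessingWorker
    and run its main loop until the shared abort event is set."""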
    logger.debug("Worker %d booting up..." % wid)
    w = ProcessingWorker(wid, ctx)
    w.run()


class ProcessingWorkerContext(object):
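    """Everything a worker needs to run: the site's directories, the
    shared work/results queues and abort event, plus optional processor
    filters set up by the master process."""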
    def __init__(self, root_dir, out_dir, tmp_dir,
                 work_queue, results, abort_event,
                 force=False, debug=False):
        self.root_dir = root_dir
        self.out_dir = out_dir
        self.tmp_dir = tmp_dir
        self.work_queue = work_queue
        self.results = results
        self.abort_event = abort_event
        self.force = force
        self.debug = debug
        self.enabled_processors = None
        self.additional_processors = None


class ProcessingWorkerJob(object):
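    """A single asset to process: where it was mounted from, its path,
    and whether to force re-processing."""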
    def __init__(self, base_dir, mount_info, path, *, force=False):
        self.base_dir = base_dir
        self.mount_info = mount_info
        self.path = path
        self.force = force


class ProcessingWorkerResult(object):
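    """What a worker reports back for one job: record flags, the
    processing tree (as a name tree), the relative output paths, and
    any errors."""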
    def __init__(self, path):
        self.path = path
        self.flags = FLAG_NONE
        self.proc_tree = None
        self.rel_outputs = None
        self.errors = None


class ProcessingWorker(object):
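    """Long-running worker loop: builds its own PieCrust app, pulls jobs
    from the shared queue, and pushes results (or error/timer payloads)
    back to the master process."""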
    def __init__(self, wid, ctx):
        self.wid = wid
        self.ctx = ctx

    def run(self):
        logger.debug("Worker %d initializing..." % self.wid)
        work_start_time = time.perf_counter()

        # Create the app local to this worker.
        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
        app.env.fs_cache_only_for_main_page = True
        app.env.registerTimer("Worker_%d" % self.wid)
        app.env.registerTimer("JobReceive")
        app.env.registerTimer('BuildProcessingTree')
        app.env.registerTimer('RunProcessingTree')

        processors = app.plugin_loader.getProcessors()
        if self.ctx.enabled_processors:
            logger.debug("Filtering processors to: %s" %
                         self.ctx.enabled_processors)
            processors = get_filtered_processors(processors,
                                                 self.ctx.enabled_processors)
        if self.ctx.additional_processors:
            logger.debug("Adding %s additional processors." %
                         len(self.ctx.additional_processors))
            for proc in self.ctx.additional_processors:
                app.env.registerTimer(proc.__class__.__name__)
                proc.initialize(app)
                processors.append(proc)

        # Invoke pre-processors.
        pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir,
                                       self.ctx.tmp_dir, self.ctx.force)
        for proc in processors:
            proc.onPipelineStart(pipeline_ctx)

        # Sort our processors again in case the pre-process step involved
        # patching the processors with some new ones.
        processors.sort(key=lambda p: p.priority)

        aborted_with_exception = None
        while not self.ctx.abort_event.is_set():
            try:
                with app.env.timerScope('JobReceive'):
                    job = self.ctx.work_queue.get(True, 0.01)
            except queue.Empty:
                continue

            try:
                result = self._unsafeRun(app, processors, job)
                self.ctx.results.put_nowait(result)
            except Exception as ex:
                self.ctx.abort_event.set()
                aborted_with_exception = ex
                logger.error("[%d] Critical error, aborting." % self.wid)
                if self.ctx.debug:
                    logger.exception(ex)
                break
            finally:
                self.ctx.work_queue.task_done()

        if aborted_with_exception is not None:
            msgs = _get_errors(aborted_with_exception)
            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})

        # Invoke post-processors.
        for proc in processors:
            proc.onPipelineEnd(pipeline_ctx)

        app.env.stepTimer("Worker_%d" % self.wid,
                          time.perf_counter() - work_start_time)
        self.ctx.results.put_nowait({
                'type': 'timers', 'data': app.env._timers})

    def _unsafeRun(self, app, processors, job):
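        """Process one job: build the processing tree for the file, run
        it, and record flags, outputs and errors on the result. May
        raise; the caller treats any exception as fatal for this
        worker."""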
        result = ProcessingWorkerResult(job.path)

        processors = get_filtered_processors(
                processors, job.mount_info['processors'])

        # Build the processing tree for this job.
        rel_path = os.path.relpath(job.path, job.base_dir)
        try:
            with app.env.timerScope('BuildProcessingTree'):
                builder = ProcessingTreeBuilder(processors)
                tree_root = builder.build(rel_path)
                result.flags |= FLAG_PREPARED
        except ProcessingTreeError as ex:
            result.errors = _get_errors(ex)
            return result

        # Prepare and run the tree.
        print_node(tree_root, recursive=True)
        leaves = tree_root.getLeaves()
        result.rel_outputs = [leaf.path for leaf in leaves]
        result.proc_tree = get_node_name_tree(tree_root)
        if tree_root.getProcessor().is_bypassing_structured_processing:
            result.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING

        if job.force:
            tree_root.setState(STATE_DIRTY, True)

        try:
            with app.env.timerScope('RunProcessingTree'):
                runner = ProcessingTreeRunner(
                        job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
                if runner.processSubTree(tree_root):
                    result.flags |= FLAG_PROCESSED
        except ProcessingTreeError as ex:
            if isinstance(ex, ProcessorError):
                ex = ex.__cause__
            # Need to strip out colored errors from external processes.
            result.errors = _get_errors(ex, strip_colors=True)

        return result


def get_filtered_processors(processors, authorized_names):
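    """Filter processors by name. ``authorized_names`` can be a list or
    a comma/space-separated string; 'all' is a wildcard, and a leading
    dash (e.g. '-sitemap') excludes that processor from the wildcard."""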
    if not authorized_names or authorized_names == 'all':
        return processors

    if isinstance(authorized_names, str):
        authorized_names = split_processor_names_re.split(authorized_names)

    procs = []
    has_star = 'all' in authorized_names
    for p in processors:
        for name in authorized_names:
            if name == p.PROCESSOR_NAME:
                procs.append(p)
                break
            if name == ('-%s' % p.PROCESSOR_NAME):
                break
        else:
            if has_star:
                procs.append(p)
    return procs


def _get_errors(ex, strip_colors=False):
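    """Walk the exception chain via ``__cause__`` and collect one
    message per exception, optionally stripping ANSI color codes from
    each."""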
    errors = []
    while ex is not None:
        msg = str(ex)
        if strip_colors:
            msg = re_ansicolors.sub('', msg)
        errors.append(msg)
        ex = ex.__cause__
    return errors
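

For reference, a minimal sketch of how a master process might drive these workers. The multiprocessing wiring below (JoinableQueue, Event, Process) and the ``run_workers`` helper are illustrative assumptions; PieCrust's actual master-side pipeline lives in another module and may differ.

# Hypothetical master-side wiring; illustrative only, not PieCrust's
# actual pipeline code.
import multiprocessing

from piecrust.processing.worker import (
        ProcessingWorkerContext, worker_func)


def run_workers(root_dir, out_dir, tmp_dir, jobs, num_workers=4):
    work_queue = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    abort_event = multiprocessing.Event()

    ctx = ProcessingWorkerContext(
            root_dir, out_dir, tmp_dir,
            work_queue, results, abort_event)

    workers = [multiprocessing.Process(target=worker_func, args=(i, ctx))
               for i in range(num_workers)]
    for w in workers:
        w.start()

    # Each ``job`` is expected to be a ProcessingWorkerJob instance.
    for job in jobs:
        work_queue.put_nowait(job)

    # Block until every job has been marked done, then tell the workers
    # to exit their loops and run their pipeline-end step.
    work_queue.join()
    abort_event.set()

    # Drain whatever results are already available; real code would
    # keep reading until every worker has reported its timers.
    outputs = []
    while not results.empty():
        outputs.append(results.get_nowait())
    for w in workers:
        w.join()
    return outputs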