view piecrust/processing/worker.py @ 550:6f216c1ab6b1

bake: Add a flag to know which record entries got collapsed from last run. This makes it possible to find entries for things that were actually baked during the current run, as opposed to skipped because they were "clean".
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 04 Aug 2015 21:22:30 -0700
parents aefe70229fdd
children 3ceeca7bb71c
line wrap: on
line source

import re
import os.path
import time
import logging
from piecrust.app import PieCrust
from piecrust.processing.base import PipelineContext
from piecrust.processing.records import (
        FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED,
        FLAG_BYPASSED_STRUCTURED_PROCESSING)
from piecrust.processing.tree import (
        ProcessingTreeBuilder, ProcessingTreeRunner,
        ProcessingTreeError, ProcessorError,
        get_node_name_tree, print_node,
        STATE_DIRTY)
from piecrust.workerpool import IWorker


logger = logging.getLogger(__name__)


# Separators accepted between names in a processor list string,
# e.g. "sass, -less all".
split_processor_names_re = re.compile(r'[ ,]+')
# ANSI SGR (color/style) escape sequences such as "\x1b[31m", the compound
# "\x1b[1;31m", or the bare reset "\x1b[m". Used to clean error messages
# captured from external processes. Raw string: "\d" is not a valid escape
# in a normal string literal.
re_ansicolors = re.compile(r'\x1b\[[\d;]*m')


class ProcessingWorkerContext(object):
    """Settings shared by every `ProcessingWorker` in a processing run.

    :param root_dir: website root, used to create each worker's app.
    :param out_dir: output directory processors write to.
    :param tmp_dir: scratch directory for intermediate files.
    :param force: passed to the processors' pipeline context.
    :param debug: passed to the app as its debug flag.

    `enabled_processors` (a name filter) and `additional_processors` (extra
    processor instances) start as None and may be assigned by the caller
    before the workers are initialized.
    """
    def __init__(self, root_dir, out_dir, tmp_dir,
                 force=False, debug=False):
        self.root_dir, self.out_dir, self.tmp_dir = root_dir, out_dir, tmp_dir
        self.force, self.debug = force, debug
        self.is_profiling = False
        self.enabled_processors = self.additional_processors = None


class ProcessingWorkerJob(object):
    """One asset file to be handled by `ProcessingWorker.process`.

    :param base_dir: mount directory; the tree is built from the path
        relative to it.
    :param mount_info: mount settings; its ``'processors'`` entry filters
        the processors that may apply.
    :param path: full path of the asset file.
    :param force: keyword-only; when true the processing tree is marked
        dirty so it is re-run.
    """
    def __init__(self, base_dir, mount_info, path, *, force=False):
        self.base_dir, self.mount_info = base_dir, mount_info
        self.path, self.force = path, force


class ProcessingWorkerResult(object):
    """Outcome of `ProcessingWorker.process` for a single asset file."""
    def __init__(self, path):
        self.path = path
        # Bitmask of FLAG_* values from piecrust.processing.records.
        self.flags = FLAG_NONE
        # proc_tree: nested processor names of the built tree;
        # rel_outputs: paths of the tree's leaf nodes;
        # errors: list of error messages. All stay None until set.
        self.proc_tree = self.rel_outputs = self.errors = None


class ProcessingWorker(IWorker):
    """Worker that runs asset files through the processing pipeline.

    `initialize` builds a worker-local `PieCrust` app and its processor
    list, `process` handles one `ProcessingWorkerJob` at a time and returns a
    `ProcessingWorkerResult`, and `getReport` runs the processors'
    end-of-pipeline hooks and returns this worker's timers.

    NOTE(review): `self.wid` is read below but never assigned in this class;
    presumably the worker pool (`IWorker` / piecrust.workerpool) sets it
    before `initialize` runs -- confirm.
    """
    def __init__(self, ctx):
        # Presumably a `ProcessingWorkerContext`; only its dirs, force/debug
        # flags and processor filters are read here.
        self.ctx = ctx
        # Taken at construction so the init and total timers also cover the
        # time between creating the worker and initializing it.
        self.work_start_time = time.perf_counter()

    def _createPipelineContext(self):
        # Context handed to the processors' pipeline start/end hooks.
        return PipelineContext(self.wid, self.app, self.ctx.out_dir,
                               self.ctx.tmp_dir, self.ctx.force)

    def initialize(self):
        """Create this worker's app, timers and processor list.

        Builds a worker-local `PieCrust` app, registers this worker's
        timers, filters the app's processors by ``ctx.enabled_processors``,
        appends ``ctx.additional_processors`` (initializing each with the
        app), calls ``onPipelineStart`` on every processor, then sorts the
        processors by ``priority``.
        """
        # Create the app local to this worker.
        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
        app.env.registerTimer("PipelineWorker_%d_Total" % self.wid)
        app.env.registerTimer("PipelineWorkerInit")
        app.env.registerTimer("JobReceive")
        app.env.registerTimer('BuildProcessingTree')
        app.env.registerTimer('RunProcessingTree')
        self.app = app

        processors = app.plugin_loader.getProcessors()
        if self.ctx.enabled_processors:
            logger.debug("Filtering processors to: %s",
                         self.ctx.enabled_processors)
            processors = get_filtered_processors(processors,
                                                 self.ctx.enabled_processors)
        if self.ctx.additional_processors:
            logger.debug("Adding %s additional processors.",
                         len(self.ctx.additional_processors))
            for proc in self.ctx.additional_processors:
                # Each extra processor gets a timer named after its class.
                app.env.registerTimer(proc.__class__.__name__)
                proc.initialize(app)
                # NOTE(review): if no filtering produced a new list,
                # `processors` may be the very list returned by the plugin
                # loader, so this append (and the sort below) mutate it in
                # place -- confirm that is intended.
                processors.append(proc)
        self.processors = processors

        # Invoke pre-processors.
        pipeline_ctx = self._createPipelineContext()
        for proc in processors:
            proc.onPipelineStart(pipeline_ctx)

        # Sort our processors again in case the pre-process step involved
        # patching the processors with some new ones.
        processors.sort(key=lambda p: p.priority)

        app.env.stepTimerSince("PipelineWorkerInit", self.work_start_time)

    def process(self, job):
        """Run one asset file through its processing tree.

        :param job: a `ProcessingWorkerJob`; ``job.mount_info['processors']``
            selects which of this worker's processors may apply.
        :returns: a `ProcessingWorkerResult`. ``flags`` gains
            ``FLAG_PREPARED`` once the tree is built,
            ``FLAG_BYPASSED_STRUCTURED_PROCESSING`` if the root processor
            bypasses structured processing, and ``FLAG_PROCESSED`` if
            ``runner.processSubTree`` returns a true value. ``errors`` holds
            the messages of any `ProcessingTreeError`; other exceptions
            propagate to the caller.
        """
        result = ProcessingWorkerResult(job.path)

        processors = get_filtered_processors(
                self.processors, job.mount_info['processors'])

        # Build the processing tree for this job.
        rel_path = os.path.relpath(job.path, job.base_dir)
        try:
            with self.app.env.timerScope('BuildProcessingTree'):
                builder = ProcessingTreeBuilder(processors)
                tree_root = builder.build(rel_path)
                result.flags |= FLAG_PREPARED
        except ProcessingTreeError as ex:
            result.errors = _get_errors(ex)
            return result

        # Prepare and run the tree.
        print_node(tree_root, recursive=True)
        leaves = tree_root.getLeaves()
        result.rel_outputs = [l.path for l in leaves]
        result.proc_tree = get_node_name_tree(tree_root)
        if tree_root.getProcessor().is_bypassing_structured_processing:
            result.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING

        if job.force:
            # Mark the whole tree dirty so the runner re-processes it.
            tree_root.setState(STATE_DIRTY, True)

        try:
            with self.app.env.timerScope('RunProcessingTree'):
                runner = ProcessingTreeRunner(
                        job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
                if runner.processSubTree(tree_root):
                    result.flags |= FLAG_PROCESSED
        except ProcessingTreeError as ex:
            # A ProcessorError wraps the real failure in its __cause__, so
            # report that instead. Keep the wrapper when there is no cause;
            # otherwise the error list would be silently empty.
            if isinstance(ex, ProcessorError) and ex.__cause__ is not None:
                ex = ex.__cause__
            # Need to strip out colored errors from external processes.
            result.errors = _get_errors(ex, strip_colors=True)

        return result

    def getReport(self):
        """Finish the pipeline for this worker and return its timers.

        Calls ``onPipelineEnd`` on every processor, records the time since
        the worker was created on the worker's total timer, and returns
        ``{'type': 'timers', 'data': <the app's timers>}``.
        """
        # Invoke post-processors.
        pipeline_ctx = self._createPipelineContext()
        for proc in self.processors:
            proc.onPipelineEnd(pipeline_ctx)

        self.app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid,
                                    self.work_start_time)
        return {
                'type': 'timers',
                'data': self.app.env._timers}


def get_filtered_processors(processors, authorized_names):
    """Select the processors allowed by a list of names.

    :param processors: processor objects exposing ``PROCESSOR_NAME``.
    :param authorized_names: a list of names, or a string of names
        separated by spaces and/or commas. ``NAME`` includes a processor,
        ``-NAME`` excludes it, and ``all`` includes every processor not
        otherwise mentioned. For each processor, the first matching entry
        wins.
    :returns: *processors* itself (same object) when *authorized_names* is
        empty/None or exactly ``'all'``; otherwise a new list that keeps the
        original order.
    """
    if not authorized_names or authorized_names == 'all':
        return processors

    if isinstance(authorized_names, str):
        authorized_names = split_processor_names_re.split(authorized_names)

    include_unlisted = 'all' in authorized_names

    def _is_allowed(proc):
        wanted = proc.PROCESSOR_NAME
        unwanted = '-%s' % proc.PROCESSOR_NAME
        for name in authorized_names:
            if name == wanted:
                return True
            if name == unwanted:
                return False
        return include_unlisted

    return [proc for proc in processors if _is_allowed(proc)]


def _get_errors(ex, strip_colors=False):
    errors = []
    while ex is not None:
        msg = str(ex)
        if strip_colors:
            msg = re_ansicolors.sub('', msg)
        errors.append(msg)
        ex = ex.__cause__
    return errors