diff piecrust/processing/worker.py @ 414:c4b3a7fd2f87
bake: Make pipeline processing multi-process.
Not many changes here, as it's pretty straightforward, but there is an API
change for processors so they know whether they're being initialized/disposed
from the main process or from one of the workers. This makes it possible to do
global work that has side effects (e.g. creating a directory) as opposed to
purely in-memory work.
| author   | Ludovic Chabant <ludovic@chabant.com> |
|----------|---------------------------------------|
| date     | Sat, 20 Jun 2015 19:20:30 -0700       |
| parents  |                                       |
| children | 4a43d7015b75                          |
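The API change described in the commit message isn't visible in this file itself; it concerns the processors that get initialized and disposed here. As a rough illustration of the idea, the sketch below shows a processor that asks its pipeline context where it is running before doing side-effecting setup. The `ExampleProcessor` class, the `is_main_process` flag, and the `tmp_dir` attribute on the context are hypothetical stand-ins for illustration, not piecrust's actual API; only `PROCESSOR_NAME`, `priority`, `initialize`, `onPipelineStart`, and `onPipelineEnd` appear in the diff below.

```python
import os


class ExampleProcessor(object):
    # Hypothetical processor, for illustration only -- not part of piecrust.
    PROCESSOR_NAME = 'example'
    priority = 1

    def initialize(self, app):
        self.app = app

    def onPipelineStart(self, pipeline_ctx):
        # Assumed flag: the pipeline context tells us whether this call is
        # happening in the main process or in one of the worker processes.
        if getattr(pipeline_ctx, 'is_main_process', False):
            # Side-effecting setup runs once, from the main process.
            # (Assumes the context exposes the pipeline's temp directory.)
            os.makedirs(os.path.join(pipeline_ctx.tmp_dir, 'example'),
                        exist_ok=True)
        # In-memory setup is cheap and safe to repeat in every worker.
        self._cache = {}

    def onPipelineEnd(self, pipeline_ctx):
        # Nothing to tear down in this sketch.
        self._cache = None
```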
```diff
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/processing/worker.py     Sat Jun 20 19:20:30 2015 -0700
@@ -0,0 +1,213 @@
+import os.path
+import re
+import time
+import queue
+import logging
+from piecrust.app import PieCrust
+from piecrust.processing.base import PipelineContext
+from piecrust.processing.records import (
+        FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED,
+        FLAG_BYPASSED_STRUCTURED_PROCESSING)
+from piecrust.processing.tree import (
+        ProcessingTreeBuilder, ProcessingTreeRunner,
+        ProcessingTreeError, ProcessorError,
+        get_node_name_tree, print_node,
+        STATE_DIRTY)
+
+
+logger = logging.getLogger(__name__)
+
+
+split_processor_names_re = re.compile(r'[ ,]+')
+re_ansicolors = re.compile('\033\\[\d+m')
+
+
+def worker_func(wid, ctx):
+    logger.debug("Worker %d booting up..." % wid)
+    w = ProcessingWorker(wid, ctx)
+    w.run()
+
+
+class ProcessingWorkerContext(object):
+    def __init__(self, root_dir, out_dir, tmp_dir,
+                 work_queue, results, abort_event,
+                 force=False, debug=False):
+        self.root_dir = root_dir
+        self.out_dir = out_dir
+        self.tmp_dir = tmp_dir
+        self.work_queue = work_queue
+        self.results = results
+        self.abort_event = abort_event
+        self.force = force
+        self.debug = debug
+        self.enabled_processors = None
+        self.additional_processors = None
+
+
+class ProcessingWorkerJob(object):
+    def __init__(self, base_dir, mount_info, path, *, force=False):
+        self.base_dir = base_dir
+        self.mount_info = mount_info
+        self.path = path
+        self.force = force
+
+
+class ProcessingWorkerResult(object):
+    def __init__(self, path):
+        self.path = path
+        self.flags = FLAG_NONE
+        self.proc_tree = None
+        self.rel_outputs = None
+        self.errors = None
+
+
+class ProcessingWorker(object):
+    def __init__(self, wid, ctx):
+        self.wid = wid
+        self.ctx = ctx
+
+    def run(self):
+        logger.debug("Worker %d initializing..." % self.wid)
+        work_start_time = time.perf_counter()
+
+        # Create the app local to this worker.
+        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
+        app.env.fs_cache_only_for_main_page = True
+        app.env.registerTimer("Worker_%d" % self.wid)
+        app.env.registerTimer("JobReceive")
+        app.env.registerTimer('BuildProcessingTree')
+        app.env.registerTimer('RunProcessingTree')
+
+        processors = app.plugin_loader.getProcessors()
+        if self.ctx.enabled_processors:
+            logger.debug("Filtering processors to: %s" %
+                    self.ctx.enabled_processors)
+            processors = get_filtered_processors(processors,
+                    self.ctx.enabled_processors)
+        if self.ctx.additional_processors:
+            logger.debug("Adding %s additional processors." %
+                    len(self.ctx.additional_processors))
+            for proc in self.ctx.additional_processors:
+                app.env.registerTimer(proc.__class__.__name__)
+                proc.initialize(app)
+                processors.append(proc)
+
+        # Invoke pre-processors.
+        pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir,
+                self.ctx.tmp_dir, self.ctx.force)
+        for proc in processors:
+            proc.onPipelineStart(pipeline_ctx)
+
+        # Sort our processors again in case the pre-process step involved
+        # patching the processors with some new ones.
+        processors.sort(key=lambda p: p.priority)
+
+        aborted_with_exception = None
+        while not self.ctx.abort_event.is_set():
+            try:
+                with app.env.timerScope('JobReceive'):
+                    job = self.ctx.work_queue.get(True, 0.01)
+            except queue.Empty:
+                continue
+
+            try:
+                result = self._unsafeRun(app, processors, job)
+                self.ctx.results.put_nowait(result)
+            except Exception as ex:
+                self.ctx.abort_event.set()
+                aborted_with_exception = ex
+                logger.error("[%d] Critical error, aborting." % self.wid)
+                if self.ctx.debug:
+                    logger.exception(ex)
+                break
+            finally:
+                self.ctx.work_queue.task_done()
+
+        if aborted_with_exception is not None:
+            msgs = _get_errors(aborted_with_exception)
+            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})
+
+        # Invoke post-processors.
+        for proc in processors:
+            proc.onPipelineEnd(pipeline_ctx)
+
+        app.env.stepTimer("Worker_%d" % self.wid,
+                time.perf_counter() - work_start_time)
+        self.ctx.results.put_nowait({
+                'type': 'timers', 'data': app.env._timers})
+
+    def _unsafeRun(self, app, processors, job):
+        result = ProcessingWorkerResult(job.path)
+
+        processors = get_filtered_processors(
+                processors, job.mount_info['processors'])
+
+        # Build the processing tree for this job.
+        rel_path = os.path.relpath(job.path, job.base_dir)
+        try:
+            with app.env.timerScope('BuildProcessingTree'):
+                builder = ProcessingTreeBuilder(processors)
+                tree_root = builder.build(rel_path)
+                result.flags |= FLAG_PREPARED
+        except ProcessingTreeError as ex:
+            result.errors += _get_errors(ex)
+            return result
+
+        # Prepare and run the tree.
+        print_node(tree_root, recursive=True)
+        leaves = tree_root.getLeaves()
+        result.rel_outputs = [l.path for l in leaves]
+        result.proc_tree = get_node_name_tree(tree_root)
+        if tree_root.getProcessor().is_bypassing_structured_processing:
+            result.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING
+
+        if job.force:
+            tree_root.setState(STATE_DIRTY, True)
+
+        try:
+            with app.env.timerScope('RunProcessingTree'):
+                runner = ProcessingTreeRunner(
+                        job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
+                if runner.processSubTree(tree_root):
+                    result.flags |= FLAG_PROCESSED
+        except ProcessingTreeError as ex:
+            if isinstance(ex, ProcessorError):
+                ex = ex.__cause__
+            # Need to strip out colored errors from external processes.
+            result.errors += _get_errors(ex, strip_colors=True)
+
+        return result
+
+
+def get_filtered_processors(processors, authorized_names):
+    if not authorized_names or authorized_names == 'all':
+        return processors
+
+    if isinstance(authorized_names, str):
+        authorized_names = split_processor_names_re.split(authorized_names)
+
+    procs = []
+    has_star = 'all' in authorized_names
+    for p in processors:
+        for name in authorized_names:
+            if name == p.PROCESSOR_NAME:
+                procs.append(p)
+                break
+            if name == ('-%s' % p.PROCESSOR_NAME):
+                break
+        else:
+            if has_star:
+                procs.append(p)
+    return procs
+
+
+def _get_errors(ex, strip_colors=False):
+    errors = []
+    while ex is not None:
+        msg = str(ex)
+        if strip_colors:
+            msg = re_ansicolors.sub('', msg)
+        errors.append(msg)
+        ex = ex.__cause__
+    return errors
+
```
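For context, here is a rough sketch of how the main process might drive this worker, using standard `multiprocessing` primitives. The directory paths, the worker count, and the mount info dictionary are placeholders; the actual call site lives elsewhere in piecrust's processing pipeline and may differ.

```python
import multiprocessing

from piecrust.processing.worker import (
        ProcessingWorkerContext, ProcessingWorkerJob, worker_func)


def main():
    # Placeholder directories for illustration.
    root_dir = '/my/site'
    out_dir = '/my/site/_counter'
    tmp_dir = '/my/site/_cache'

    work_queue = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    abort_event = multiprocessing.Event()

    ctx = ProcessingWorkerContext(root_dir, out_dir, tmp_dir,
                                  work_queue, results, abort_event)

    # Spawn a couple of workers running the loop defined in this file.
    workers = [multiprocessing.Process(target=worker_func, args=(wid, ctx))
               for wid in range(2)]
    for w in workers:
        w.start()

    # Queue one job per asset file; the mount info normally comes from the
    # app's mount configuration, and 'all' means "don't filter processors".
    work_queue.put_nowait(
            ProcessingWorkerJob(root_dir, {'processors': 'all'},
                                '/my/site/assets/style.css'))

    # Wait until every job has been marked done, then tell the workers to
    # exit and collect whatever they reported (per-job results, followed by
    # the error/timer dictionaries each worker pushes on shutdown).
    work_queue.join()
    abort_event.set()
    for w in workers:
        w.join()
    while not results.empty():
        print(results.get_nowait())


if __name__ == '__main__':
    main()
```

The processor name filtering in `get_filtered_processors` is also worth a small illustration: names can be passed as a comma- or space-separated string, `'all'` acts as a wildcard, and a leading `-` excludes a processor from that wildcard. The stand-in objects below are just enough to exercise that logic.

```python
from piecrust.processing.worker import get_filtered_processors


class FakeProc(object):
    # Minimal stand-in exposing only what the filter looks at.
    def __init__(self, name):
        self.PROCESSOR_NAME = name


procs = [FakeProc('copy'), FakeProc('less'), FakeProc('sass')]

# An explicit list keeps only the named processors, in their original order.
kept = get_filtered_processors(procs, 'copy, sass')
assert [p.PROCESSOR_NAME for p in kept] == ['copy', 'sass']

# 'all' plus a '-name' entry keeps everything except the excluded processor.
kept = get_filtered_processors(procs, 'all -less')
assert [p.PROCESSOR_NAME for p in kept] == ['copy', 'sass']
```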