diff piecrust/processing/worker.py @ 414:c4b3a7fd2f87

bake: Make pipeline processing multi-process. Not many changes here, as it's pretty straightforward, but there's an API change for processors so they know whether they're being initialized/disposed from the main process or from one of the workers. This makes it possible to distinguish global setup that has side-effects (e.g. creating a directory) from purely in-memory setup.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jun 2015 19:20:30 -0700
parents
children 4a43d7015b75
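The API change called out in the message is only half-visible in this file: PipelineContext is now constructed with the worker id (self.wid below), so a processor's onPipelineStart/onPipelineEnd can tell a worker apart from the main process. Here is a minimal sketch of what that enables; the CacheDirProcessor is made up, and both the worker_id attribute and the negative sentinel id for the main process are assumptions, since this diff only shows the worker side:

    import os
    from piecrust.processing.base import Processor

    class CacheDirProcessor(Processor):
        # Hypothetical processor, only for illustration.
        PROCESSOR_NAME = 'cachedir'

        def onPipelineStart(self, ctx):
            self._cache_dir = os.path.join(ctx.tmp_dir, 'cachedir')
            if ctx.worker_id < 0:
                # Assumed sentinel for the main process: do the
                # side-effecting setup (creating a shared directory)
                # exactly once, not once per worker.
                os.makedirs(self._cache_dir, exist_ok=True)
            # Workers fall through to in-memory setup only.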
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/processing/worker.py	Sat Jun 20 19:20:30 2015 -0700
@@ -0,0 +1,219 @@
+import os.path
+import re
+import time
+import queue
+import logging
+from piecrust.app import PieCrust
+from piecrust.processing.base import PipelineContext
+from piecrust.processing.records import (
+        FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED,
+        FLAG_BYPASSED_STRUCTURED_PROCESSING)
+from piecrust.processing.tree import (
+        ProcessingTreeBuilder, ProcessingTreeRunner,
+        ProcessingTreeError, ProcessorError,
+        get_node_name_tree, print_node,
+        STATE_DIRTY)
+
+
+logger = logging.getLogger(__name__)
+
+
+split_processor_names_re = re.compile(r'[ ,]+')
+re_ansicolors = re.compile(r'\033\[\d+m')
+
+
+def worker_func(wid, ctx):
+    logger.debug("Worker %d booting up..." % wid)
+    w = ProcessingWorker(wid, ctx)
+    w.run()
+
+
+class ProcessingWorkerContext(object):
+    def __init__(self, root_dir, out_dir, tmp_dir,
+                 work_queue, results, abort_event,
+                 force=False, debug=False):
+        self.root_dir = root_dir
+        self.out_dir = out_dir
+        self.tmp_dir = tmp_dir
+        self.work_queue = work_queue
+        self.results = results
+        self.abort_event = abort_event
+        self.force = force
+        self.debug = debug
+        self.enabled_processors = None
+        self.additional_processors = None
+
+
+class ProcessingWorkerJob(object):
+    def __init__(self, base_dir, mount_info, path, *, force=False):
+        self.base_dir = base_dir
+        self.mount_info = mount_info
+        self.path = path
+        self.force = force
+
+
+class ProcessingWorkerResult(object):
+    def __init__(self, path):
+        self.path = path
+        self.flags = FLAG_NONE
+        self.proc_tree = None
+        self.rel_outputs = None
+        self.errors = None
+
+
+class ProcessingWorker(object):
+    def __init__(self, wid, ctx):
+        self.wid = wid
+        self.ctx = ctx
+
+    def run(self):
+        logger.debug("Worker %d initializing..." % self.wid)
+        work_start_time = time.perf_counter()
+
+        # Create the app local to this worker.
+        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
+        app.env.fs_cache_only_for_main_page = True
+        app.env.registerTimer("Worker_%d" % self.wid)
+        app.env.registerTimer("JobReceive")
+        app.env.registerTimer('BuildProcessingTree')
+        app.env.registerTimer('RunProcessingTree')
+
+        processors = app.plugin_loader.getProcessors()
+        if self.ctx.enabled_processors:
+            logger.debug("Filtering processors to: %s" %
+                         self.ctx.enabled_processors)
+            processors = get_filtered_processors(processors,
+                                                 self.ctx.enabled_processors)
+        if self.ctx.additional_processors:
+            logger.debug("Adding %s additional processors." %
+                         len(self.ctx.additional_processors))
+            for proc in self.ctx.additional_processors:
+                app.env.registerTimer(proc.__class__.__name__)
+                proc.initialize(app)
+                processors.append(proc)
+
+        # Invoke pre-processors.
+        pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir,
+                                       self.ctx.tmp_dir, self.ctx.force)
+        for proc in processors:
+            proc.onPipelineStart(pipeline_ctx)
+
+        # Sort our processors again in case the pre-process step involved
+        # patching the processors with some new ones.
+        processors.sort(key=lambda p: p.priority)
+
+        aborted_with_exception = None
+        # Use a short queue timeout so the abort event is checked often.
+        while not self.ctx.abort_event.is_set():
+            try:
+                with app.env.timerScope('JobReceive'):
+                    job = self.ctx.work_queue.get(True, 0.01)
+            except queue.Empty:
+                continue
+
+            try:
+                result = self._unsafeRun(app, processors, job)
+                self.ctx.results.put_nowait(result)
+            except Exception as ex:
+                self.ctx.abort_event.set()
+                aborted_with_exception = ex
+                logger.error("[%d] Critical error, aborting." % self.wid)
+                if self.ctx.debug:
+                    logger.exception(ex)
+                break
+            finally:
+                self.ctx.work_queue.task_done()
+
+        if aborted_with_exception is not None:
+            msgs = _get_errors(aborted_with_exception)
+            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})
+
+        # Invoke post-processors.
+        for proc in processors:
+            proc.onPipelineEnd(pipeline_ctx)
+
+        app.env.stepTimer("Worker_%d" % self.wid,
+                          time.perf_counter() - work_start_time)
+        self.ctx.results.put_nowait({
+                'type': 'timers', 'data': app.env._timers})
+
+    def _unsafeRun(self, app, processors, job):
+        result = ProcessingWorkerResult(job.path)
+
+        processors = get_filtered_processors(
+                processors, job.mount_info['processors'])
+
+        # Build the processing tree for this job.
+        rel_path = os.path.relpath(job.path, job.base_dir)
+        try:
+            with app.env.timerScope('BuildProcessingTree'):
+                builder = ProcessingTreeBuilder(processors)
+                tree_root = builder.build(rel_path)
+                result.flags |= FLAG_PREPARED
+        except ProcessingTreeError as ex:
+            result.errors = _get_errors(ex)
+            return result
+
+        # Prepare and run the tree.
+        # Debug-only; uncomment to dump the processing tree:
+        # print_node(tree_root, recursive=True)
+        leaves = tree_root.getLeaves()
+        result.rel_outputs = [l.path for l in leaves]
+        result.proc_tree = get_node_name_tree(tree_root)
+        if tree_root.getProcessor().is_bypassing_structured_processing:
+            result.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING
+
+        if job.force:
+            tree_root.setState(STATE_DIRTY, True)
+
+        try:
+            with app.env.timerScope('RunProcessingTree'):
+                runner = ProcessingTreeRunner(
+                        job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
+                if runner.processSubTree(tree_root):
+                    result.flags |= FLAG_PROCESSED
+        except ProcessingTreeError as ex:
+            if isinstance(ex, ProcessorError):
+                # ProcessorError merely wraps the real error; report its cause.
+                ex = ex.__cause__
+            # Need to strip out colored errors from external processes.
+            result.errors = _get_errors(ex, strip_colors=True)
+
+        return result
+
+
+def get_filtered_processors(processors, authorized_names):
+    if not authorized_names or authorized_names == 'all':
+        return processors
+
+    if isinstance(authorized_names, str):
+        authorized_names = split_processor_names_re.split(authorized_names)
+
+    procs = []
+    has_star = 'all' in authorized_names
+    for p in processors:
+        for name in authorized_names:
+            if name == p.PROCESSOR_NAME:
+                procs.append(p)
+                break
+            if name == ('-%s' % p.PROCESSOR_NAME):
+                # Explicitly excluded with a '-name' entry.
+                break
+        else:
+            # No name matched or excluded this processor; keep it
+            # only if the 'all' wildcard was given.
+            if has_star:
+                procs.append(p)
+    return procs
+
+
+def _get_errors(ex, strip_colors=False):
+    errors = []
+    while ex is not None:
+        msg = str(ex)
+        if strip_colors:
+            msg = re_ansicolors.sub('', msg)
+        errors.append(msg)
+        ex = ex.__cause__
+    return errors
+
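For context, here is a minimal sketch of the main-process side that this worker API implies; none of it is in the diff, and the process_assets driver is made up. The task_done() call in the worker means the work queue must be a multiprocessing.JoinableQueue, and since the abort event is the loop condition, setting it doubles as the shutdown signal:

    import multiprocessing

    from piecrust.processing.worker import (
            ProcessingWorkerContext, worker_func)

    def process_assets(root_dir, out_dir, tmp_dir, jobs, num_workers=4):
        # Hypothetical driver, inferred from worker.py's API alone.
        work_queue = multiprocessing.JoinableQueue()
        results = multiprocessing.Queue()
        abort_event = multiprocessing.Event()

        ctx = ProcessingWorkerContext(root_dir, out_dir, tmp_dir,
                                      work_queue, results, abort_event)
        workers = [multiprocessing.Process(target=worker_func,
                                           args=(i, ctx))
                   for i in range(num_workers)]
        for w in workers:
            w.start()

        jobs = list(jobs)  # ProcessingWorkerJob instances
        for job in jobs:
            work_queue.put_nowait(job)
        work_queue.join()  # blocks until every job got task_done()

        # Happy path: each result is queued before task_done(), so one
        # result per job is available now; then one timers report per
        # worker once the abort event lets them leave their loop.
        out = [results.get() for _ in jobs]
        abort_event.set()
        for _ in workers:
            results.get()  # {'type': 'timers', ...}
        for w in workers:
            w.join()
        return out

Draining the results queue before joining the workers also avoids the documented multiprocessing pitfall of joining a process that still has queued items in its feeder thread.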