diff piecrust/processing/worker.py @ 447:aefe70229fdd

bake: Commonize worker pool code between html and asset baking.

The `workerpool` package now defines a generic-ish worker pool. It's similar to the pool in Python's multiprocessing framework, but with a simpler use case (only one way to queue jobs) and with support for workers sending a final "report" back to the master process, which we use here to gather timing information. The rest of the changes mostly remove duplicated code that is no longer needed.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 05 Jul 2015 00:09:41 -0700
parents 171dde4f61dc
children 3ceeca7bb71c
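For context, here is a minimal sketch of the worker contract this changeset migrates to. The method names (`initialize`, `process`, `getReport`) and the pool-assigned `wid` attribute are taken from the diff below; the driver loop is a simplified assumption for illustration, not the actual `piecrust.workerpool` implementation.

# A minimal sketch, assuming the IWorker contract visible in the diff below.
class IWorker(object):
    def initialize(self):
        # One-time setup in the child process (e.g. creating the app).
        raise NotImplementedError()

    def process(self, job):
        # Handle a single job and return its result.
        raise NotImplementedError()

    def getReport(self):
        # Optional final payload sent back to the master (e.g. timer data).
        return None

def run_worker(wid, worker, jobs):
    # Hypothetical driver loop: assign the worker ID, initialize once,
    # process every job, then hand the final report back to the master.
    worker.wid = wid
    worker.initialize()
    results = [worker.process(job) for job in jobs]
    return results, worker.getReport()

With that contract in place, the queue polling, abort handling, and profiling shims removed below become the pool's responsibility instead of being duplicated in each worker.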
--- a/piecrust/processing/worker.py	Thu Jul 02 23:28:24 2015 -0700
+++ b/piecrust/processing/worker.py	Sun Jul 05 00:09:41 2015 -0700
@@ -1,7 +1,6 @@
+import re
 import os.path
-import re
 import time
-import queue
 import logging
 from piecrust.app import PieCrust
 from piecrust.processing.base import PipelineContext
@@ -13,6 +12,7 @@
         ProcessingTreeError, ProcessorError,
         get_node_name_tree, print_node,
         STATE_DIRTY)
+from piecrust.workerpool import IWorker
 
 
 logger = logging.getLogger(__name__)
@@ -22,37 +22,12 @@
 re_ansicolors = re.compile('\033\\[\d+m')
 
 
-def worker_func(wid, ctx):
-    if ctx.is_profiling:
-        try:
-            import cProfile as profile
-        except ImportError:
-            import profile
-
-        ctx.is_profiling = False
-        profile.runctx('_real_worker_func(wid, ctx)',
-                       globals(), locals(),
-                       filename='PipelineWorker-%d.prof' % wid)
-    else:
-        _real_worker_func(wid, ctx)
-
-
-def _real_worker_func(wid, ctx):
-    logger.debug("Worker %d booting up..." % wid)
-    w = ProcessingWorker(wid, ctx)
-    w.run()
-
-
 class ProcessingWorkerContext(object):
     def __init__(self, root_dir, out_dir, tmp_dir,
-                 work_queue, results, abort_event,
                  force=False, debug=False):
         self.root_dir = root_dir
         self.out_dir = out_dir
         self.tmp_dir = tmp_dir
-        self.work_queue = work_queue
-        self.results = results
-        self.abort_event = abort_event
         self.force = force
         self.debug = debug
         self.is_profiling = False
@@ -77,23 +52,20 @@
         self.errors = None
 
 
-class ProcessingWorker(object):
-    def __init__(self, wid, ctx):
-        self.wid = wid
+class ProcessingWorker(IWorker):
+    def __init__(self, ctx):
         self.ctx = ctx
+        self.work_start_time = time.perf_counter()
 
-    def run(self):
-        logger.debug("Worker %d initializing..." % self.wid)
-        work_start_time = time.perf_counter()
-
+    def initialize(self):
         # Create the app local to this worker.
         app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
-        app.env.fs_cache_only_for_main_page = True
         app.env.registerTimer("PipelineWorker_%d_Total" % self.wid)
         app.env.registerTimer("PipelineWorkerInit")
         app.env.registerTimer("JobReceive")
         app.env.registerTimer('BuildProcessingTree')
         app.env.registerTimer('RunProcessingTree')
+        self.app = app
 
         processors = app.plugin_loader.getProcessors()
         if self.ctx.enabled_processors:
@@ -108,9 +80,10 @@
                 app.env.registerTimer(proc.__class__.__name__)
                 proc.initialize(app)
                 processors.append(proc)
+        self.processors = processors
 
         # Invoke pre-processors.
-        pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir,
+        pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir,
                                        self.ctx.tmp_dir, self.ctx.force)
         for proc in processors:
             proc.onPipelineStart(pipeline_ctx)
@@ -119,52 +92,18 @@
         # patching the processors with some new ones.
         processors.sort(key=lambda p: p.priority)
 
-        app.env.stepTimerSince("PipelineWorkerInit", work_start_time)
-
-        aborted_with_exception = None
-        while not self.ctx.abort_event.is_set():
-            try:
-                with app.env.timerScope('JobReceive'):
-                    job = self.ctx.work_queue.get(True, 0.01)
-            except queue.Empty:
-                continue
+        app.env.stepTimerSince("PipelineWorkerInit", self.work_start_time)
 
-            try:
-                result = self._unsafeRun(app, processors, job)
-                self.ctx.results.put_nowait(result)
-            except Exception as ex:
-                self.ctx.abort_event.set()
-                aborted_with_exception = ex
-                logger.error("[%d] Critical error, aborting." % self.wid)
-                if self.ctx.debug:
-                    logger.exception(ex)
-                break
-            finally:
-                self.ctx.work_queue.task_done()
-
-        if aborted_with_exception is not None:
-            msgs = _get_errors(aborted_with_exception)
-            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})
-
-        # Invoke post-processors.
-        for proc in processors:
-            proc.onPipelineEnd(pipeline_ctx)
-
-        app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid,
-                               work_start_time)
-        self.ctx.results.put_nowait({
-                'type': 'timers', 'data': app.env._timers})
-
-    def _unsafeRun(self, app, processors, job):
+    def process(self, job):
         result = ProcessingWorkerResult(job.path)
 
         processors = get_filtered_processors(
-                processors, job.mount_info['processors'])
+                self.processors, job.mount_info['processors'])
 
         # Build the processing tree for this job.
         rel_path = os.path.relpath(job.path, job.base_dir)
         try:
-            with app.env.timerScope('BuildProcessingTree'):
+            with self.app.env.timerScope('BuildProcessingTree'):
                 builder = ProcessingTreeBuilder(processors)
                 tree_root = builder.build(rel_path)
                 result.flags |= FLAG_PREPARED
@@ -184,7 +123,7 @@
             tree_root.setState(STATE_DIRTY, True)
 
         try:
-            with app.env.timerScope('RunProcessingTree'):
+            with self.app.env.timerScope('RunProcessingTree'):
                 runner = ProcessingTreeRunner(
                         job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
                 if runner.processSubTree(tree_root):
@@ -197,6 +136,19 @@
 
         return result
 
+    def getReport(self):
+        # Invoke post-processors.
+        pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir,
+                                       self.ctx.tmp_dir, self.ctx.force)
+        for proc in self.processors:
+            proc.onPipelineEnd(pipeline_ctx)
+
+        self.app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid,
+                                    self.work_start_time)
+        return {
+                'type': 'timers',
+                'data': self.app.env._timers}
+
 
 def get_filtered_processors(processors, authorized_names):
     if not authorized_names or authorized_names == 'all':