diff piecrust/baking/worker.py @ 447:aefe70229fdd

bake: Share the worker pool code between HTML and asset baking. The `workerpool` package now defines a generic-ish worker pool. It's similar to the pool provided by the Python framework, but with a simpler use case (only one way to queue jobs) and with support for workers to send a final "report" to the master process, which we use here to get timing information. The rest of the changes remove a whole bunch of duplicated code that's not needed anymore.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 05 Jul 2015 00:09:41 -0700
parents dc8518c51cbe
children 838f3964f400
line wrap: on
line diff
--- a/piecrust/baking/worker.py	Thu Jul 02 23:28:24 2015 -0700
+++ b/piecrust/baking/worker.py	Sun Jul 05 00:09:41 2015 -0700
@@ -1,5 +1,4 @@
 import time
-import queue
 import logging
 from piecrust.app import PieCrust
 from piecrust.baking.single import PageBaker, BakingError
@@ -7,45 +6,59 @@
         QualifiedPage, PageRenderingContext, render_page_segments)
 from piecrust.routing import create_route_metadata
 from piecrust.sources.base import PageFactory
+from piecrust.workerpool import IWorker
 
 
 logger = logging.getLogger(__name__)
 
 
-def worker_func(wid, ctx):
-    if ctx.is_profiling:
-        try:
-            import cProfile as profile
-        except ImportError:
-            import profile
-
-        ctx.is_profiling = False
-        profile.runctx('_real_worker_func(wid, ctx)',
-                       globals(), locals(),
-                       filename='BakeWorker-%d.prof' % wid)
-    else:
-        _real_worker_func(wid, ctx)
-
-
-def _real_worker_func(wid, ctx):
-    logger.debug("Worker %d booting up..." % wid)
-    w = BakeWorker(wid, ctx)
-    w.run()
-
-
 class BakeWorkerContext(object):
     def __init__(self, root_dir, sub_cache_dir, out_dir,
-                 work_queue, results, abort_event,
-                 force=False, debug=False, is_profiling=False):
+                 force=False, debug=False):
         self.root_dir = root_dir
         self.sub_cache_dir = sub_cache_dir
         self.out_dir = out_dir
-        self.work_queue = work_queue
-        self.results = results
-        self.abort_event = abort_event
         self.force = force
         self.debug = debug
-        self.is_profiling = is_profiling
+
+
+class BakeWorker(IWorker):
+    def __init__(self, ctx):
+        self.ctx = ctx
+        self.work_start_time = time.perf_counter()
+
+    def initialize(self):
+        # Create the app local to this worker.
+        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
+        app._useSubCacheDir(self.ctx.sub_cache_dir)
+        app.env.fs_cache_only_for_main_page = True
+        app.env.registerTimer("BakeWorker_%d_Total" % self.wid)
+        app.env.registerTimer("BakeWorkerInit")
+        app.env.registerTimer("JobReceive")
+        self.app = app
+
+        # Create the job handlers.
+        job_handlers = {
+                JOB_LOAD: LoadJobHandler(app, self.ctx),
+                JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
+                JOB_BAKE: BakeJobHandler(app, self.ctx)}
+        for jt, jh in job_handlers.items():
+            app.env.registerTimer(type(jh).__name__)
+        self.job_handlers = job_handlers
+
+        app.env.stepTimerSince("BakeWorkerInit", self.work_start_time)
+
+    def process(self, job):
+        handler = self.job_handlers[job.job_type]
+        with self.app.env.timerScope(type(handler).__name__):
+            return handler.handleJob(job)
+
+    def getReport(self):
+        self.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
+                                    self.work_start_time)
+        return {
+                'type': 'timers',
+                'data': self.app.env._timers}
 
 
 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3)
@@ -57,67 +70,6 @@
         self.payload = payload
 
 
-class BakeWorker(object):
-    def __init__(self, wid, ctx):
-        self.wid = wid
-        self.ctx = ctx
-
-    def run(self):
-        logger.debug("Working %d initializing..." % self.wid)
-        work_start_time = time.perf_counter()
-
-        # Create the app local to this worker.
-        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
-        app._useSubCacheDir(self.ctx.sub_cache_dir)
-        app.env.fs_cache_only_for_main_page = True
-        app.env.registerTimer("BakeWorker_%d_Total" % self.wid)
-        app.env.registerTimer("BakeWorkerInit")
-        app.env.registerTimer("JobReceive")
-
-        # Create the job handlers.
-        job_handlers = {
-                JOB_LOAD: LoadJobHandler(app, self.ctx),
-                JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
-                JOB_BAKE: BakeJobHandler(app, self.ctx)}
-        for jt, jh in job_handlers.items():
-            app.env.registerTimer(type(jh).__name__)
-
-        app.env.stepTimerSince("BakeWorkerInit", work_start_time)
-
-        # Start working!
-        aborted_with_exception = None
-        while not self.ctx.abort_event.is_set():
-            try:
-                with app.env.timerScope('JobReceive'):
-                    job = self.ctx.work_queue.get(True, 0.01)
-            except queue.Empty:
-                continue
-
-            try:
-                handler = job_handlers[job.job_type]
-                with app.env.timerScope(type(handler).__name__):
-                    handler.handleJob(job)
-            except Exception as ex:
-                self.ctx.abort_event.set()
-                aborted_with_exception = ex
-                logger.debug("[%d] Critical error, aborting." % self.wid)
-                if self.ctx.debug:
-                    logger.exception(ex)
-                break
-            finally:
-                self.ctx.work_queue.task_done()
-
-        if aborted_with_exception is not None:
-            msgs = _get_errors(aborted_with_exception)
-            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})
-
-        # Send our timers to the main process before exiting.
-        app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
-                               work_start_time)
-        self.ctx.results.put_nowait({
-                'type': 'timers', 'data': app.env._timers})
-
-
 class JobHandler(object):
     def __init__(self, app, ctx):
         self.app = app
@@ -203,8 +155,7 @@
             result.errors = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
-
-        self.ctx.results.put_nowait(result)
+        return result
 
 
 class RenderFirstSubJobHandler(JobHandler):
@@ -231,8 +182,7 @@
             result.errors = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
-
-        self.ctx.results.put_nowait(result)
+        return result
 
 
 class BakeJobHandler(JobHandler):
@@ -272,5 +222,5 @@
             if self.ctx.debug:
                 logger.exception(ex)
 
-        self.ctx.results.put_nowait(result)
+        return result