piecrust2: diff piecrust/baking/worker.py @ 447:aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
The `workerpool` package now defines a generic-ish worker pool. It's similar
to the pool in Python's `multiprocessing` framework, but with a simpler use
case (only one way to queue jobs) and support for workers sending a final
"report" to the master process, which we use here to collect timing information.
The rest of the changes basically remove a whole bunch of duplicated code
that's not needed anymore.
author:   Ludovic Chabant <ludovic@chabant.com>
date:     Sun, 05 Jul 2015 00:09:41 -0700
parents:  dc8518c51cbe
children: 838f3964f400
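
Before the diff itself, it helps to see the worker-side contract the new pool builds on. The sketch below is illustrative only: the `initialize()` / `process()` / `getReport()` methods and the externally assigned `wid` are taken from how `BakeWorker` uses them in this changeset, but `IWorker` itself is defined in `piecrust/workerpool.py`, which is not shown here, so its exact shape is an assumption.

```python
# Hedged sketch of the worker contract implied by this changeset. IWorker's
# real definition lives in piecrust/workerpool.py (not shown in this diff);
# the method names and the pool-assigned `wid` are inferred from BakeWorker.


class IWorker(object):
    """What the generic pool is assumed to expect from a worker.

    The pool is assumed to set `self.wid` (the worker id) on the instance
    before calling `initialize()` in the child process.
    """
    def initialize(self):
        raise NotImplementedError()

    def process(self, job):
        raise NotImplementedError()

    def getReport(self):
        # Optional: returned once to the master when the worker shuts down.
        return None


class UppercaseWorker(IWorker):
    """Toy worker showing the life-cycle (no PieCrust involved)."""
    def initialize(self):
        self.processed = 0

    def process(self, job):
        self.processed += 1
        return job.upper()

    def getReport(self):
        # `wid` is expected to be assigned by the pool; fall back gracefully.
        return {'type': 'stats',
                'data': {'wid': getattr(self, 'wid', None),
                         'processed': self.processed}}
```

In the diff below, `BakeWorker` follows the same shape: `initialize()` builds the per-process PieCrust app and registers timers, `process()` dispatches each job to the right handler, and `getReport()` sends the accumulated timers back to the master.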
--- a/piecrust/baking/worker.py    Thu Jul 02 23:28:24 2015 -0700
+++ b/piecrust/baking/worker.py    Sun Jul 05 00:09:41 2015 -0700
@@ -1,5 +1,4 @@
 import time
-import queue
 import logging
 from piecrust.app import PieCrust
 from piecrust.baking.single import PageBaker, BakingError
@@ -7,45 +6,59 @@
         QualifiedPage, PageRenderingContext, render_page_segments)
 from piecrust.routing import create_route_metadata
 from piecrust.sources.base import PageFactory
+from piecrust.workerpool import IWorker
 
 
 logger = logging.getLogger(__name__)
 
 
-def worker_func(wid, ctx):
-    if ctx.is_profiling:
-        try:
-            import cProfile as profile
-        except ImportError:
-            import profile
-
-        ctx.is_profiling = False
-        profile.runctx('_real_worker_func(wid, ctx)',
-                       globals(), locals(),
-                       filename='BakeWorker-%d.prof' % wid)
-    else:
-        _real_worker_func(wid, ctx)
-
-
-def _real_worker_func(wid, ctx):
-    logger.debug("Worker %d booting up..." % wid)
-    w = BakeWorker(wid, ctx)
-    w.run()
-
-
 class BakeWorkerContext(object):
     def __init__(self, root_dir, sub_cache_dir, out_dir,
-                 work_queue, results, abort_event,
-                 force=False, debug=False, is_profiling=False):
+                 force=False, debug=False):
         self.root_dir = root_dir
         self.sub_cache_dir = sub_cache_dir
         self.out_dir = out_dir
-        self.work_queue = work_queue
-        self.results = results
-        self.abort_event = abort_event
         self.force = force
         self.debug = debug
-        self.is_profiling = is_profiling
+
+
+class BakeWorker(IWorker):
+    def __init__(self, ctx):
+        self.ctx = ctx
+        self.work_start_time = time.perf_counter()
+
+    def initialize(self):
+        # Create the app local to this worker.
+        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
+        app._useSubCacheDir(self.ctx.sub_cache_dir)
+        app.env.fs_cache_only_for_main_page = True
+        app.env.registerTimer("BakeWorker_%d_Total" % self.wid)
+        app.env.registerTimer("BakeWorkerInit")
+        app.env.registerTimer("JobReceive")
+        self.app = app
+
+        # Create the job handlers.
+        job_handlers = {
+                JOB_LOAD: LoadJobHandler(app, self.ctx),
+                JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
+                JOB_BAKE: BakeJobHandler(app, self.ctx)}
+        for jt, jh in job_handlers.items():
+            app.env.registerTimer(type(jh).__name__)
+        self.job_handlers = job_handlers
+
+        app.env.stepTimerSince("BakeWorkerInit", self.work_start_time)
+
+    def process(self, job):
+        handler = self.job_handlers[job.job_type]
+        with self.app.env.timerScope(type(handler).__name__):
+            return handler.handleJob(job)
+
+    def getReport(self):
+        self.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
                                     self.work_start_time)
+        return {
+                'type': 'timers',
+                'data': self.app.env._timers}
 
 
 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3)
@@ -57,67 +70,6 @@
         self.payload = payload
 
 
-class BakeWorker(object):
-    def __init__(self, wid, ctx):
-        self.wid = wid
-        self.ctx = ctx
-
-    def run(self):
-        logger.debug("Working %d initializing..." % self.wid)
-        work_start_time = time.perf_counter()
-
-        # Create the app local to this worker.
-        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
-        app._useSubCacheDir(self.ctx.sub_cache_dir)
-        app.env.fs_cache_only_for_main_page = True
-        app.env.registerTimer("BakeWorker_%d_Total" % self.wid)
-        app.env.registerTimer("BakeWorkerInit")
-        app.env.registerTimer("JobReceive")
-
-        # Create the job handlers.
-        job_handlers = {
-                JOB_LOAD: LoadJobHandler(app, self.ctx),
-                JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
-                JOB_BAKE: BakeJobHandler(app, self.ctx)}
-        for jt, jh in job_handlers.items():
-            app.env.registerTimer(type(jh).__name__)
-
-        app.env.stepTimerSince("BakeWorkerInit", work_start_time)
-
-        # Start working!
-        aborted_with_exception = None
-        while not self.ctx.abort_event.is_set():
-            try:
-                with app.env.timerScope('JobReceive'):
-                    job = self.ctx.work_queue.get(True, 0.01)
-            except queue.Empty:
-                continue
-
-            try:
-                handler = job_handlers[job.job_type]
-                with app.env.timerScope(type(handler).__name__):
-                    handler.handleJob(job)
-            except Exception as ex:
-                self.ctx.abort_event.set()
-                aborted_with_exception = ex
-                logger.debug("[%d] Critical error, aborting." % self.wid)
-                if self.ctx.debug:
-                    logger.exception(ex)
-                break
-            finally:
-                self.ctx.work_queue.task_done()
-
-        if aborted_with_exception is not None:
-            msgs = _get_errors(aborted_with_exception)
-            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})
-
-        # Send our timers to the main process before exiting.
-        app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
-                               work_start_time)
-        self.ctx.results.put_nowait({
-            'type': 'timers', 'data': app.env._timers})
-
-
 class JobHandler(object):
     def __init__(self, app, ctx):
         self.app = app
@@ -203,8 +155,7 @@
             result.errors = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
-
-        self.ctx.results.put_nowait(result)
+        return result
 
 
 class RenderFirstSubJobHandler(JobHandler):
@@ -231,8 +182,7 @@
             result.errors = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
-
-        self.ctx.results.put_nowait(result)
+        return result
 
 
 class BakeJobHandler(JobHandler):
@@ -272,5 +222,5 @@
             result.errors = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
 
-        self.ctx.results.put_nowait(result)
+        return result
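
To make the "final report" mechanism from the commit message concrete, here is a rough sketch of the master-side loop that a pool of this kind might run. It is hypothetical: the real implementation lives in `piecrust/workerpool.py`, which is not part of this diff, and the queues, sentinel, and function names below are invented for illustration. The point is that per-job results and the one-time report travel back over the same channel, so the baking code no longer needs its own work queue, results queue, or abort event.

```python
# Hypothetical master-side sketch of a pool driving workers that have the
# initialize()/process()/getReport() life-cycle. The real pool is in
# piecrust/workerpool.py; names and structure here are illustrative only.
import multiprocessing


class SquareWorker(object):
    """Stand-in worker with the same life-cycle as BakeWorker."""
    def initialize(self):
        self.jobs_done = 0

    def process(self, job):
        self.jobs_done += 1
        return job * job

    def getReport(self):
        # Sent back once, when the worker shuts down.
        return {'type': 'stats', 'data': {'jobs_done': self.jobs_done}}


def _worker_main(worker, wid, job_queue, result_queue):
    worker.wid = wid          # the pool owns worker-id assignment
    worker.initialize()       # e.g. BakeWorker builds its PieCrust app here
    while True:
        job = job_queue.get()
        if job is None:       # shutdown sentinel
            break
        result_queue.put(('result', worker.process(job)))
    # Final "report" back to the master (timers, in BakeWorker's case).
    result_queue.put(('report', worker.getReport()))


def run_pool(worker_class, jobs, num_workers=2):
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    procs = []
    for wid in range(num_workers):
        p = multiprocessing.Process(
                target=_worker_main,
                args=(worker_class(), wid, job_queue, result_queue))
        p.start()
        procs.append(p)

    for job in jobs:
        job_queue.put(job)
    for _ in procs:
        job_queue.put(None)

    results, reports = [], []
    for _ in range(len(jobs) + len(procs)):
        kind, payload = result_queue.get()
        (results if kind == 'result' else reports).append(payload)

    for p in procs:
        p.join()
    return results, reports


if __name__ == '__main__':
    results, reports = run_pool(SquareWorker, [1, 2, 3, 4])
    print(sorted(results))    # [1, 4, 9, 16]
    print(len(reports))       # one report per worker
```

Compared with the removed `BakeWorker.run()` loop above, this moves queue polling, abort handling, and result plumbing out of the baking code, which is exactly the duplication the changeset removes: the job handlers simply `return result` and the pool takes care of shipping it back.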