diff piecrust/processing/worker.py @ 447:aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
The `workerpool` package now defines a generic-ish worker pool. It's similar
to the pool in Python's `multiprocessing` framework, but with a simpler
use-case (only one way to queue jobs) and with support for workers sending a
final "report" back to the master process, which we use here to get timing
information.
The rest of the changes mostly remove a whole bunch of duplicated code that's
no longer needed.
| author | Ludovic Chabant <ludovic@chabant.com> |
| --- | --- |
| date | Sun, 05 Jul 2015 00:09:41 -0700 |
| parents | 171dde4f61dc |
| children | 3ceeca7bb71c |
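
For context, here is a minimal sketch of the worker interface this changeset codes against, inferred from the diff below (the `initialize`/`process`/`getReport` methods and the pool-assigned `wid` attribute); the real `piecrust.workerpool` module may differ in its details.

```python
# Sketch of the IWorker interface as implied by the diff below; the actual
# piecrust.workerpool definition may differ. The pool is assumed to assign
# `wid` (the worker id) before calling initialize(), to call process(job)
# once per dequeued job, and to send getReport()'s return value back to the
# master process at shutdown.
class IWorker(object):
    def initialize(self):
        """One-time setup, run inside the worker process."""
        raise NotImplementedError()

    def process(self, job):
        """Handle a single job and return its result."""
        raise NotImplementedError()

    def getReport(self):
        """Final data to send back to the master process; None by default."""
        return None
```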
```diff
--- a/piecrust/processing/worker.py	Thu Jul 02 23:28:24 2015 -0700
+++ b/piecrust/processing/worker.py	Sun Jul 05 00:09:41 2015 -0700
@@ -1,7 +1,6 @@
+import re
 import os.path
-import re
 import time
-import queue
 import logging
 from piecrust.app import PieCrust
 from piecrust.processing.base import PipelineContext
@@ -13,6 +12,7 @@
         ProcessingTreeError, ProcessorError,
         get_node_name_tree, print_node,
         STATE_DIRTY)
+from piecrust.workerpool import IWorker
 
 
 logger = logging.getLogger(__name__)
@@ -22,37 +22,12 @@
 re_ansicolors = re.compile('\033\\[\d+m')
 
 
-def worker_func(wid, ctx):
-    if ctx.is_profiling:
-        try:
-            import cProfile as profile
-        except ImportError:
-            import profile
-
-        ctx.is_profiling = False
-        profile.runctx('_real_worker_func(wid, ctx)',
-                       globals(), locals(),
-                       filename='PipelineWorker-%d.prof' % wid)
-    else:
-        _real_worker_func(wid, ctx)
-
-
-def _real_worker_func(wid, ctx):
-    logger.debug("Worker %d booting up..." % wid)
-    w = ProcessingWorker(wid, ctx)
-    w.run()
-
-
 class ProcessingWorkerContext(object):
     def __init__(self, root_dir, out_dir, tmp_dir,
-                 work_queue, results, abort_event,
                  force=False, debug=False):
         self.root_dir = root_dir
         self.out_dir = out_dir
         self.tmp_dir = tmp_dir
-        self.work_queue = work_queue
-        self.results = results
-        self.abort_event = abort_event
         self.force = force
         self.debug = debug
         self.is_profiling = False
@@ -77,23 +52,20 @@
         self.errors = None
 
 
-class ProcessingWorker(object):
-    def __init__(self, wid, ctx):
-        self.wid = wid
+class ProcessingWorker(IWorker):
+    def __init__(self, ctx):
         self.ctx = ctx
+        self.work_start_time = time.perf_counter()
 
-    def run(self):
-        logger.debug("Worker %d initializing..." % self.wid)
-        work_start_time = time.perf_counter()
-
+    def initialize(self):
         # Create the app local to this worker.
         app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
-        app.env.fs_cache_only_for_main_page = True
         app.env.registerTimer("PipelineWorker_%d_Total" % self.wid)
         app.env.registerTimer("PipelineWorkerInit")
         app.env.registerTimer("JobReceive")
         app.env.registerTimer('BuildProcessingTree')
         app.env.registerTimer('RunProcessingTree')
+        self.app = app
 
         processors = app.plugin_loader.getProcessors()
         if self.ctx.enabled_processors:
@@ -108,9 +80,10 @@
                 app.env.registerTimer(proc.__class__.__name__)
                 proc.initialize(app)
                 processors.append(proc)
+        self.processors = processors
 
         # Invoke pre-processors.
-        pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir,
+        pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir,
                                        self.ctx.tmp_dir, self.ctx.force)
         for proc in processors:
             proc.onPipelineStart(pipeline_ctx)
@@ -119,52 +92,18 @@
         # patching the processors with some new ones.
         processors.sort(key=lambda p: p.priority)
 
-        app.env.stepTimerSince("PipelineWorkerInit", work_start_time)
-
-        aborted_with_exception = None
-        while not self.ctx.abort_event.is_set():
-            try:
-                with app.env.timerScope('JobReceive'):
-                    job = self.ctx.work_queue.get(True, 0.01)
-            except queue.Empty:
-                continue
+        app.env.stepTimerSince("PipelineWorkerInit", self.work_start_time)
 
-            try:
-                result = self._unsafeRun(app, processors, job)
-                self.ctx.results.put_nowait(result)
-            except Exception as ex:
-                self.ctx.abort_event.set()
-                aborted_with_exception = ex
-                logger.error("[%d] Critical error, aborting." % self.wid)
-                if self.ctx.debug:
-                    logger.exception(ex)
-                break
-            finally:
-                self.ctx.work_queue.task_done()
-
-        if aborted_with_exception is not None:
-            msgs = _get_errors(aborted_with_exception)
-            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})
-
-        # Invoke post-processors.
-        for proc in processors:
-            proc.onPipelineEnd(pipeline_ctx)
-
-        app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid,
-                               work_start_time)
-        self.ctx.results.put_nowait({
-            'type': 'timers', 'data': app.env._timers})
-
-    def _unsafeRun(self, app, processors, job):
+    def process(self, job):
         result = ProcessingWorkerResult(job.path)
 
         processors = get_filtered_processors(
-                processors, job.mount_info['processors'])
+                self.processors, job.mount_info['processors'])
 
         # Build the processing tree for this job.
         rel_path = os.path.relpath(job.path, job.base_dir)
         try:
-            with app.env.timerScope('BuildProcessingTree'):
+            with self.app.env.timerScope('BuildProcessingTree'):
                 builder = ProcessingTreeBuilder(processors)
                 tree_root = builder.build(rel_path)
                 result.flags |= FLAG_PREPARED
@@ -184,7 +123,7 @@
             tree_root.setState(STATE_DIRTY, True)
 
         try:
-            with app.env.timerScope('RunProcessingTree'):
+            with self.app.env.timerScope('RunProcessingTree'):
                 runner = ProcessingTreeRunner(
                         job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
                 if runner.processSubTree(tree_root):
@@ -197,6 +136,19 @@
 
         return result
 
+    def getReport(self):
+        # Invoke post-processors.
+        pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir,
+                                       self.ctx.tmp_dir, self.ctx.force)
+        for proc in self.processors:
+            proc.onPipelineEnd(pipeline_ctx)
+
+        self.app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid,
+                                    self.work_start_time)
+        return {
+            'type': 'timers',
+            'data': self.app.env._timers}
+
 
 def get_filtered_processors(processors, authorized_names):
     if not authorized_names or authorized_names == 'all':
```
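
To make the new life-cycle concrete, here is a hypothetical single-process driver (`drive_worker` is illustrative and not part of piecrust): the real pool runs each worker in a child process and moves jobs, results, and the final report over queues, but the calling order is assumed to be the same.

```python
# Hypothetical sketch of the worker life-cycle the pool is assumed to follow;
# the real pool distributes this across child processes and queues.
def drive_worker(worker, jobs):
    worker.initialize()               # per-process setup (app, processors)
    results = [worker.process(job)    # one call per queued job
               for job in jobs]
    report = worker.getReport()       # e.g. the timers dict shown above
    return results, report
```

This is what lets the master aggregate per-worker timing information: each `ProcessingWorker` now returns its timer data from `getReport()` instead of pushing it onto a shared results queue.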