Mercurial > piecrust2
view piecrust/processing/worker.py @ 415:0e9a94b7fdfa
bake: Improve bake record information.
* Store things in the bake record that require less interaction between the
master process and the workers. For instance, don't store the paginator
object in the render pass info -- instead, just store whether pagination
was used, and whether it had more items.
* Simplify information passing between workers and bake passes by saving the
rendering info to the JSON cache. This means the "render first sub" job
doesn't have to return anything except errors now.
* Add more performance counter info.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sat, 20 Jun 2015 19:23:16 -0700 |
parents | c4b3a7fd2f87 |
children | 4a43d7015b75 |
line wrap: on
line source
import os.path import re import time import queue import logging from piecrust.app import PieCrust from piecrust.processing.base import PipelineContext from piecrust.processing.records import ( FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED, FLAG_BYPASSED_STRUCTURED_PROCESSING) from piecrust.processing.tree import ( ProcessingTreeBuilder, ProcessingTreeRunner, ProcessingTreeError, ProcessorError, get_node_name_tree, print_node, STATE_DIRTY) logger = logging.getLogger(__name__) split_processor_names_re = re.compile(r'[ ,]+') re_ansicolors = re.compile('\033\\[\d+m') def worker_func(wid, ctx): logger.debug("Worker %d booting up..." % wid) w = ProcessingWorker(wid, ctx) w.run() class ProcessingWorkerContext(object): def __init__(self, root_dir, out_dir, tmp_dir, work_queue, results, abort_event, force=False, debug=False): self.root_dir = root_dir self.out_dir = out_dir self.tmp_dir = tmp_dir self.work_queue = work_queue self.results = results self.abort_event = abort_event self.force = force self.debug = debug self.enabled_processors = None self.additional_processors = None class ProcessingWorkerJob(object): def __init__(self, base_dir, mount_info, path, *, force=False): self.base_dir = base_dir self.mount_info = mount_info self.path = path self.force = force class ProcessingWorkerResult(object): def __init__(self, path): self.path = path self.flags = FLAG_NONE self.proc_tree = None self.rel_outputs = None self.errors = None class ProcessingWorker(object): def __init__(self, wid, ctx): self.wid = wid self.ctx = ctx def run(self): logger.debug("Worker %d initializing..." % self.wid) work_start_time = time.perf_counter() # Create the app local to this worker. app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) app.env.fs_cache_only_for_main_page = True app.env.registerTimer("Worker_%d" % self.wid) app.env.registerTimer("JobReceive") app.env.registerTimer('BuildProcessingTree') app.env.registerTimer('RunProcessingTree') processors = app.plugin_loader.getProcessors() if self.ctx.enabled_processors: logger.debug("Filtering processors to: %s" % self.ctx.enabled_processors) processors = get_filtered_processors(processors, self.ctx.enabled_processors) if self.ctx.additional_processors: logger.debug("Adding %s additional processors." % len(self.ctx.additional_processors)) for proc in self.ctx.additional_processors: app.env.registerTimer(proc.__class__.__name__) proc.initialize(app) processors.append(proc) # Invoke pre-processors. pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir, self.ctx.tmp_dir, self.ctx.force) for proc in processors: proc.onPipelineStart(pipeline_ctx) # Sort our processors again in case the pre-process step involved # patching the processors with some new ones. processors.sort(key=lambda p: p.priority) aborted_with_exception = None while not self.ctx.abort_event.is_set(): try: with app.env.timerScope('JobReceive'): job = self.ctx.work_queue.get(True, 0.01) except queue.Empty: continue try: result = self._unsafeRun(app, processors, job) self.ctx.results.put_nowait(result) except Exception as ex: self.ctx.abort_event.set() aborted_with_exception = ex logger.error("[%d] Critical error, aborting." % self.wid) if self.ctx.debug: logger.exception(ex) break finally: self.ctx.work_queue.task_done() if aborted_with_exception is not None: msgs = _get_errors(aborted_with_exception) self.ctx.results.put_nowait({'type': 'error', 'messages': msgs}) # Invoke post-processors. for proc in processors: proc.onPipelineEnd(pipeline_ctx) app.env.stepTimer("Worker_%d" % self.wid, time.perf_counter() - work_start_time) self.ctx.results.put_nowait({ 'type': 'timers', 'data': app.env._timers}) def _unsafeRun(self, app, processors, job): result = ProcessingWorkerResult(job.path) processors = get_filtered_processors( processors, job.mount_info['processors']) # Build the processing tree for this job. rel_path = os.path.relpath(job.path, job.base_dir) try: with app.env.timerScope('BuildProcessingTree'): builder = ProcessingTreeBuilder(processors) tree_root = builder.build(rel_path) result.flags |= FLAG_PREPARED except ProcessingTreeError as ex: result.errors += _get_errors(ex) return result # Prepare and run the tree. print_node(tree_root, recursive=True) leaves = tree_root.getLeaves() result.rel_outputs = [l.path for l in leaves] result.proc_tree = get_node_name_tree(tree_root) if tree_root.getProcessor().is_bypassing_structured_processing: result.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING if job.force: tree_root.setState(STATE_DIRTY, True) try: with app.env.timerScope('RunProcessingTree'): runner = ProcessingTreeRunner( job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir) if runner.processSubTree(tree_root): result.flags |= FLAG_PROCESSED except ProcessingTreeError as ex: if isinstance(ex, ProcessorError): ex = ex.__cause__ # Need to strip out colored errors from external processes. result.errors += _get_errors(ex, strip_colors=True) return result def get_filtered_processors(processors, authorized_names): if not authorized_names or authorized_names == 'all': return processors if isinstance(authorized_names, str): authorized_names = split_processor_names_re.split(authorized_names) procs = [] has_star = 'all' in authorized_names for p in processors: for name in authorized_names: if name == p.PROCESSOR_NAME: procs.append(p) break if name == ('-%s' % p.PROCESSOR_NAME): break else: if has_star: procs.append(p) return procs def _get_errors(ex, strip_colors=False): errors = [] while ex is not None: msg = str(ex) if strip_colors: msg = re_ansicolors.sub('', msg) errors.append(msg) ex = ex.__cause__ return errors