Mercurial > piecrust2
comparison piecrust/processing/worker.py @ 447:aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
The `workerpool` package now defines a generic-ish worker pool. It's similar
to the Python framework pool but with a simpler use-case (only one way to
queue jobs) and support for workers to send a final "report" to the master
process, which we use to get timing information here.
The rest of the changes basically remove a whole bunch of duplicated code
that's not needed anymore.
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Sun, 05 Jul 2015 00:09:41 -0700 |
| parents | 171dde4f61dc |
| children | 3ceeca7bb71c |
comparison view legend: equal | deleted | inserted | replaced
| 446:4cdf6c2157a0 | 447:aefe70229fdd |
|---|---|
| 1 import re | |
| 1 import os.path | 2 import os.path |
| 2 import re | |
| 3 import time | 3 import time |
| 4 import queue | |
| 5 import logging | 4 import logging |
| 6 from piecrust.app import PieCrust | 5 from piecrust.app import PieCrust |
| 7 from piecrust.processing.base import PipelineContext | 6 from piecrust.processing.base import PipelineContext |
| 8 from piecrust.processing.records import ( | 7 from piecrust.processing.records import ( |
| 9 FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED, | 8 FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED, |
| 11 from piecrust.processing.tree import ( | 10 from piecrust.processing.tree import ( |
| 12 ProcessingTreeBuilder, ProcessingTreeRunner, | 11 ProcessingTreeBuilder, ProcessingTreeRunner, |
| 13 ProcessingTreeError, ProcessorError, | 12 ProcessingTreeError, ProcessorError, |
| 14 get_node_name_tree, print_node, | 13 get_node_name_tree, print_node, |
| 15 STATE_DIRTY) | 14 STATE_DIRTY) |
| 15 from piecrust.workerpool import IWorker | |
| 16 | 16 |
| 17 | 17 |
| 18 logger = logging.getLogger(__name__) | 18 logger = logging.getLogger(__name__) |
| 19 | 19 |
| 20 | 20 |
| 21 split_processor_names_re = re.compile(r'[ ,]+') | 21 split_processor_names_re = re.compile(r'[ ,]+') |
| 22 re_ansicolors = re.compile('\033\\[\d+m') | 22 re_ansicolors = re.compile('\033\\[\d+m') |
| 23 | 23 |
| 24 | 24 |
| 25 def worker_func(wid, ctx): | |
| 26 if ctx.is_profiling: | |
| 27 try: | |
| 28 import cProfile as profile | |
| 29 except ImportError: | |
| 30 import profile | |
| 31 | |
| 32 ctx.is_profiling = False | |
| 33 profile.runctx('_real_worker_func(wid, ctx)', | |
| 34 globals(), locals(), | |
| 35 filename='PipelineWorker-%d.prof' % wid) | |
| 36 else: | |
| 37 _real_worker_func(wid, ctx) | |
| 38 | |
| 39 | |
| 40 def _real_worker_func(wid, ctx): | |
| 41 logger.debug("Worker %d booting up..." % wid) | |
| 42 w = ProcessingWorker(wid, ctx) | |
| 43 w.run() | |
| 44 | |
| 45 | |
| 46 class ProcessingWorkerContext(object): | 25 class ProcessingWorkerContext(object): |
| 47 def __init__(self, root_dir, out_dir, tmp_dir, | 26 def __init__(self, root_dir, out_dir, tmp_dir, |
| 48 work_queue, results, abort_event, | |
| 49 force=False, debug=False): | 27 force=False, debug=False): |
| 50 self.root_dir = root_dir | 28 self.root_dir = root_dir |
| 51 self.out_dir = out_dir | 29 self.out_dir = out_dir |
| 52 self.tmp_dir = tmp_dir | 30 self.tmp_dir = tmp_dir |
| 53 self.work_queue = work_queue | |
| 54 self.results = results | |
| 55 self.abort_event = abort_event | |
| 56 self.force = force | 31 self.force = force |
| 57 self.debug = debug | 32 self.debug = debug |
| 58 self.is_profiling = False | 33 self.is_profiling = False |
| 59 self.enabled_processors = None | 34 self.enabled_processors = None |
| 60 self.additional_processors = None | 35 self.additional_processors = None |
| 75 self.proc_tree = None | 50 self.proc_tree = None |
| 76 self.rel_outputs = None | 51 self.rel_outputs = None |
| 77 self.errors = None | 52 self.errors = None |
| 78 | 53 |
| 79 | 54 |
| 80 class ProcessingWorker(object): | 55 class ProcessingWorker(IWorker): |
| 81 def __init__(self, wid, ctx): | 56 def __init__(self, ctx): |
| 82 self.wid = wid | |
| 83 self.ctx = ctx | 57 self.ctx = ctx |
| 58 self.work_start_time = time.perf_counter() | |
| 84 | 59 |
| 85 def run(self): | 60 def initialize(self): |
| 86 logger.debug("Worker %d initializing..." % self.wid) | |
| 87 work_start_time = time.perf_counter() | |
| 88 | |
| 89 # Create the app local to this worker. | 61 # Create the app local to this worker. |
| 90 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) | 62 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) |
| 91 app.env.fs_cache_only_for_main_page = True | |
| 92 app.env.registerTimer("PipelineWorker_%d_Total" % self.wid) | 63 app.env.registerTimer("PipelineWorker_%d_Total" % self.wid) |
| 93 app.env.registerTimer("PipelineWorkerInit") | 64 app.env.registerTimer("PipelineWorkerInit") |
| 94 app.env.registerTimer("JobReceive") | 65 app.env.registerTimer("JobReceive") |
| 95 app.env.registerTimer('BuildProcessingTree') | 66 app.env.registerTimer('BuildProcessingTree') |
| 96 app.env.registerTimer('RunProcessingTree') | 67 app.env.registerTimer('RunProcessingTree') |
| 68 self.app = app | |
| 97 | 69 |
| 98 processors = app.plugin_loader.getProcessors() | 70 processors = app.plugin_loader.getProcessors() |
| 99 if self.ctx.enabled_processors: | 71 if self.ctx.enabled_processors: |
| 100 logger.debug("Filtering processors to: %s" % | 72 logger.debug("Filtering processors to: %s" % |
| 101 self.ctx.enabled_processors) | 73 self.ctx.enabled_processors) |
| 106 len(self.ctx.additional_processors)) | 78 len(self.ctx.additional_processors)) |
| 107 for proc in self.ctx.additional_processors: | 79 for proc in self.ctx.additional_processors: |
| 108 app.env.registerTimer(proc.__class__.__name__) | 80 app.env.registerTimer(proc.__class__.__name__) |
| 109 proc.initialize(app) | 81 proc.initialize(app) |
| 110 processors.append(proc) | 82 processors.append(proc) |
| 83 self.processors = processors | |
| 111 | 84 |
| 112 # Invoke pre-processors. | 85 # Invoke pre-processors. |
| 113 pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir, | 86 pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir, |
| 114 self.ctx.tmp_dir, self.ctx.force) | 87 self.ctx.tmp_dir, self.ctx.force) |
| 115 for proc in processors: | 88 for proc in processors: |
| 116 proc.onPipelineStart(pipeline_ctx) | 89 proc.onPipelineStart(pipeline_ctx) |
| 117 | 90 |
| 118 # Sort our processors again in case the pre-process step involved | 91 # Sort our processors again in case the pre-process step involved |
| 119 # patching the processors with some new ones. | 92 # patching the processors with some new ones. |
| 120 processors.sort(key=lambda p: p.priority) | 93 processors.sort(key=lambda p: p.priority) |
| 121 | 94 |
| 122 app.env.stepTimerSince("PipelineWorkerInit", work_start_time) | 95 app.env.stepTimerSince("PipelineWorkerInit", self.work_start_time) |
| 123 | 96 |
| 124 aborted_with_exception = None | 97 def process(self, job): |
| 125 while not self.ctx.abort_event.is_set(): | |
| 126 try: | |
| 127 with app.env.timerScope('JobReceive'): | |
| 128 job = self.ctx.work_queue.get(True, 0.01) | |
| 129 except queue.Empty: | |
| 130 continue | |
| 131 | |
| 132 try: | |
| 133 result = self._unsafeRun(app, processors, job) | |
| 134 self.ctx.results.put_nowait(result) | |
| 135 except Exception as ex: | |
| 136 self.ctx.abort_event.set() | |
| 137 aborted_with_exception = ex | |
| 138 logger.error("[%d] Critical error, aborting." % self.wid) | |
| 139 if self.ctx.debug: | |
| 140 logger.exception(ex) | |
| 141 break | |
| 142 finally: | |
| 143 self.ctx.work_queue.task_done() | |
| 144 | |
| 145 if aborted_with_exception is not None: | |
| 146 msgs = _get_errors(aborted_with_exception) | |
| 147 self.ctx.results.put_nowait({'type': 'error', 'messages': msgs}) | |
| 148 | |
| 149 # Invoke post-processors. | |
| 150 for proc in processors: | |
| 151 proc.onPipelineEnd(pipeline_ctx) | |
| 152 | |
| 153 app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid, | |
| 154 work_start_time) | |
| 155 self.ctx.results.put_nowait({ | |
| 156 'type': 'timers', 'data': app.env._timers}) | |
| 157 | |
| 158 def _unsafeRun(self, app, processors, job): | |
| 159 result = ProcessingWorkerResult(job.path) | 98 result = ProcessingWorkerResult(job.path) |
| 160 | 99 |
| 161 processors = get_filtered_processors( | 100 processors = get_filtered_processors( |
| 162 processors, job.mount_info['processors']) | 101 self.processors, job.mount_info['processors']) |
| 163 | 102 |
| 164 # Build the processing tree for this job. | 103 # Build the processing tree for this job. |
| 165 rel_path = os.path.relpath(job.path, job.base_dir) | 104 rel_path = os.path.relpath(job.path, job.base_dir) |
| 166 try: | 105 try: |
| 167 with app.env.timerScope('BuildProcessingTree'): | 106 with self.app.env.timerScope('BuildProcessingTree'): |
| 168 builder = ProcessingTreeBuilder(processors) | 107 builder = ProcessingTreeBuilder(processors) |
| 169 tree_root = builder.build(rel_path) | 108 tree_root = builder.build(rel_path) |
| 170 result.flags |= FLAG_PREPARED | 109 result.flags |= FLAG_PREPARED |
| 171 except ProcessingTreeError as ex: | 110 except ProcessingTreeError as ex: |
| 172 result.errors = _get_errors(ex) | 111 result.errors = _get_errors(ex) |
| 182 | 121 |
| 183 if job.force: | 122 if job.force: |
| 184 tree_root.setState(STATE_DIRTY, True) | 123 tree_root.setState(STATE_DIRTY, True) |
| 185 | 124 |
| 186 try: | 125 try: |
| 187 with app.env.timerScope('RunProcessingTree'): | 126 with self.app.env.timerScope('RunProcessingTree'): |
| 188 runner = ProcessingTreeRunner( | 127 runner = ProcessingTreeRunner( |
| 189 job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir) | 128 job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir) |
| 190 if runner.processSubTree(tree_root): | 129 if runner.processSubTree(tree_root): |
| 191 result.flags |= FLAG_PROCESSED | 130 result.flags |= FLAG_PROCESSED |
| 192 except ProcessingTreeError as ex: | 131 except ProcessingTreeError as ex: |
| 194 ex = ex.__cause__ | 133 ex = ex.__cause__ |
| 195 # Need to strip out colored errors from external processes. | 134 # Need to strip out colored errors from external processes. |
| 196 result.errors = _get_errors(ex, strip_colors=True) | 135 result.errors = _get_errors(ex, strip_colors=True) |
| 197 | 136 |
| 198 return result | 137 return result |
| 138 | |
| 139 def getReport(self): | |
| 140 # Invoke post-processors. | |
| 141 pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir, | |
| 142 self.ctx.tmp_dir, self.ctx.force) | |
| 143 for proc in self.processors: | |
| 144 proc.onPipelineEnd(pipeline_ctx) | |
| 145 | |
| 146 self.app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid, | |
| 147 self.work_start_time) | |
| 148 return { | |
| 149 'type': 'timers', | |
| 150 'data': self.app.env._timers} | |
| 199 | 151 |
| 200 | 152 |
| 201 def get_filtered_processors(processors, authorized_names): | 153 def get_filtered_processors(processors, authorized_names): |
| 202 if not authorized_names or authorized_names == 'all': | 154 if not authorized_names or authorized_names == 'all': |
| 203 return processors | 155 return processors |
