Mercurial > piecrust2
comparison piecrust/processing/worker.py @ 447:aefe70229fdd
bake: Factor out the worker pool code shared by HTML and asset baking.
The `workerpool` package now defines a fairly generic worker pool. It is similar
to the pool in Python's standard library, but with a simpler use case (there is
only one way to queue jobs) and with support for workers sending a final "report"
to the master process, which is used here to collect timing information.
The rest of the changes mostly remove duplicated code that is no longer
needed.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 05 Jul 2015 00:09:41 -0700 |
parents | 171dde4f61dc |
children | 3ceeca7bb71c |
comparison
equal
deleted
inserted
replaced
446:4cdf6c2157a0 | 447:aefe70229fdd |
---|---|
1 import re | |
1 import os.path | 2 import os.path |
2 import re | |
3 import time | 3 import time |
4 import queue | |
5 import logging | 4 import logging |
6 from piecrust.app import PieCrust | 5 from piecrust.app import PieCrust |
7 from piecrust.processing.base import PipelineContext | 6 from piecrust.processing.base import PipelineContext |
8 from piecrust.processing.records import ( | 7 from piecrust.processing.records import ( |
9 FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED, | 8 FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED, |
11 from piecrust.processing.tree import ( | 10 from piecrust.processing.tree import ( |
12 ProcessingTreeBuilder, ProcessingTreeRunner, | 11 ProcessingTreeBuilder, ProcessingTreeRunner, |
13 ProcessingTreeError, ProcessorError, | 12 ProcessingTreeError, ProcessorError, |
14 get_node_name_tree, print_node, | 13 get_node_name_tree, print_node, |
15 STATE_DIRTY) | 14 STATE_DIRTY) |
15 from piecrust.workerpool import IWorker | |
16 | 16 |
17 | 17 |
18 logger = logging.getLogger(__name__) | 18 logger = logging.getLogger(__name__) |
19 | 19 |
20 | 20 |
21 split_processor_names_re = re.compile(r'[ ,]+') | 21 split_processor_names_re = re.compile(r'[ ,]+') |
22 re_ansicolors = re.compile('\033\\[\d+m') | 22 re_ansicolors = re.compile('\033\\[\d+m') |
23 | 23 |
24 | 24 |
25 def worker_func(wid, ctx): | |
26 if ctx.is_profiling: | |
27 try: | |
28 import cProfile as profile | |
29 except ImportError: | |
30 import profile | |
31 | |
32 ctx.is_profiling = False | |
33 profile.runctx('_real_worker_func(wid, ctx)', | |
34 globals(), locals(), | |
35 filename='PipelineWorker-%d.prof' % wid) | |
36 else: | |
37 _real_worker_func(wid, ctx) | |
38 | |
39 | |
40 def _real_worker_func(wid, ctx): | |
41 logger.debug("Worker %d booting up..." % wid) | |
42 w = ProcessingWorker(wid, ctx) | |
43 w.run() | |
44 | |
45 | |
46 class ProcessingWorkerContext(object): | 25 class ProcessingWorkerContext(object): |
47 def __init__(self, root_dir, out_dir, tmp_dir, | 26 def __init__(self, root_dir, out_dir, tmp_dir, |
48 work_queue, results, abort_event, | |
49 force=False, debug=False): | 27 force=False, debug=False): |
50 self.root_dir = root_dir | 28 self.root_dir = root_dir |
51 self.out_dir = out_dir | 29 self.out_dir = out_dir |
52 self.tmp_dir = tmp_dir | 30 self.tmp_dir = tmp_dir |
53 self.work_queue = work_queue | |
54 self.results = results | |
55 self.abort_event = abort_event | |
56 self.force = force | 31 self.force = force |
57 self.debug = debug | 32 self.debug = debug |
58 self.is_profiling = False | 33 self.is_profiling = False |
59 self.enabled_processors = None | 34 self.enabled_processors = None |
60 self.additional_processors = None | 35 self.additional_processors = None |
75 self.proc_tree = None | 50 self.proc_tree = None |
76 self.rel_outputs = None | 51 self.rel_outputs = None |
77 self.errors = None | 52 self.errors = None |
78 | 53 |
79 | 54 |
80 class ProcessingWorker(object): | 55 class ProcessingWorker(IWorker): |
81 def __init__(self, wid, ctx): | 56 def __init__(self, ctx): |
82 self.wid = wid | |
83 self.ctx = ctx | 57 self.ctx = ctx |
58 self.work_start_time = time.perf_counter() | |
84 | 59 |
85 def run(self): | 60 def initialize(self): |
86 logger.debug("Worker %d initializing..." % self.wid) | |
87 work_start_time = time.perf_counter() | |
88 | |
89 # Create the app local to this worker. | 61 # Create the app local to this worker. |
90 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) | 62 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) |
91 app.env.fs_cache_only_for_main_page = True | |
92 app.env.registerTimer("PipelineWorker_%d_Total" % self.wid) | 63 app.env.registerTimer("PipelineWorker_%d_Total" % self.wid) |
93 app.env.registerTimer("PipelineWorkerInit") | 64 app.env.registerTimer("PipelineWorkerInit") |
94 app.env.registerTimer("JobReceive") | 65 app.env.registerTimer("JobReceive") |
95 app.env.registerTimer('BuildProcessingTree') | 66 app.env.registerTimer('BuildProcessingTree') |
96 app.env.registerTimer('RunProcessingTree') | 67 app.env.registerTimer('RunProcessingTree') |
68 self.app = app | |
97 | 69 |
98 processors = app.plugin_loader.getProcessors() | 70 processors = app.plugin_loader.getProcessors() |
99 if self.ctx.enabled_processors: | 71 if self.ctx.enabled_processors: |
100 logger.debug("Filtering processors to: %s" % | 72 logger.debug("Filtering processors to: %s" % |
101 self.ctx.enabled_processors) | 73 self.ctx.enabled_processors) |
106 len(self.ctx.additional_processors)) | 78 len(self.ctx.additional_processors)) |
107 for proc in self.ctx.additional_processors: | 79 for proc in self.ctx.additional_processors: |
108 app.env.registerTimer(proc.__class__.__name__) | 80 app.env.registerTimer(proc.__class__.__name__) |
109 proc.initialize(app) | 81 proc.initialize(app) |
110 processors.append(proc) | 82 processors.append(proc) |
83 self.processors = processors | |
111 | 84 |
112 # Invoke pre-processors. | 85 # Invoke pre-processors. |
113 pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir, | 86 pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir, |
114 self.ctx.tmp_dir, self.ctx.force) | 87 self.ctx.tmp_dir, self.ctx.force) |
115 for proc in processors: | 88 for proc in processors: |
116 proc.onPipelineStart(pipeline_ctx) | 89 proc.onPipelineStart(pipeline_ctx) |
117 | 90 |
118 # Sort our processors again in case the pre-process step involved | 91 # Sort our processors again in case the pre-process step involved |
119 # patching the processors with some new ones. | 92 # patching the processors with some new ones. |
120 processors.sort(key=lambda p: p.priority) | 93 processors.sort(key=lambda p: p.priority) |
121 | 94 |
122 app.env.stepTimerSince("PipelineWorkerInit", work_start_time) | 95 app.env.stepTimerSince("PipelineWorkerInit", self.work_start_time) |
123 | 96 |
124 aborted_with_exception = None | 97 def process(self, job): |
125 while not self.ctx.abort_event.is_set(): | |
126 try: | |
127 with app.env.timerScope('JobReceive'): | |
128 job = self.ctx.work_queue.get(True, 0.01) | |
129 except queue.Empty: | |
130 continue | |
131 | |
132 try: | |
133 result = self._unsafeRun(app, processors, job) | |
134 self.ctx.results.put_nowait(result) | |
135 except Exception as ex: | |
136 self.ctx.abort_event.set() | |
137 aborted_with_exception = ex | |
138 logger.error("[%d] Critical error, aborting." % self.wid) | |
139 if self.ctx.debug: | |
140 logger.exception(ex) | |
141 break | |
142 finally: | |
143 self.ctx.work_queue.task_done() | |
144 | |
145 if aborted_with_exception is not None: | |
146 msgs = _get_errors(aborted_with_exception) | |
147 self.ctx.results.put_nowait({'type': 'error', 'messages': msgs}) | |
148 | |
149 # Invoke post-processors. | |
150 for proc in processors: | |
151 proc.onPipelineEnd(pipeline_ctx) | |
152 | |
153 app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid, | |
154 work_start_time) | |
155 self.ctx.results.put_nowait({ | |
156 'type': 'timers', 'data': app.env._timers}) | |
157 | |
158 def _unsafeRun(self, app, processors, job): | |
159 result = ProcessingWorkerResult(job.path) | 98 result = ProcessingWorkerResult(job.path) |
160 | 99 |
161 processors = get_filtered_processors( | 100 processors = get_filtered_processors( |
162 processors, job.mount_info['processors']) | 101 self.processors, job.mount_info['processors']) |
163 | 102 |
164 # Build the processing tree for this job. | 103 # Build the processing tree for this job. |
165 rel_path = os.path.relpath(job.path, job.base_dir) | 104 rel_path = os.path.relpath(job.path, job.base_dir) |
166 try: | 105 try: |
167 with app.env.timerScope('BuildProcessingTree'): | 106 with self.app.env.timerScope('BuildProcessingTree'): |
168 builder = ProcessingTreeBuilder(processors) | 107 builder = ProcessingTreeBuilder(processors) |
169 tree_root = builder.build(rel_path) | 108 tree_root = builder.build(rel_path) |
170 result.flags |= FLAG_PREPARED | 109 result.flags |= FLAG_PREPARED |
171 except ProcessingTreeError as ex: | 110 except ProcessingTreeError as ex: |
172 result.errors = _get_errors(ex) | 111 result.errors = _get_errors(ex) |
182 | 121 |
183 if job.force: | 122 if job.force: |
184 tree_root.setState(STATE_DIRTY, True) | 123 tree_root.setState(STATE_DIRTY, True) |
185 | 124 |
186 try: | 125 try: |
187 with app.env.timerScope('RunProcessingTree'): | 126 with self.app.env.timerScope('RunProcessingTree'): |
188 runner = ProcessingTreeRunner( | 127 runner = ProcessingTreeRunner( |
189 job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir) | 128 job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir) |
190 if runner.processSubTree(tree_root): | 129 if runner.processSubTree(tree_root): |
191 result.flags |= FLAG_PROCESSED | 130 result.flags |= FLAG_PROCESSED |
192 except ProcessingTreeError as ex: | 131 except ProcessingTreeError as ex: |
194 ex = ex.__cause__ | 133 ex = ex.__cause__ |
195 # Need to strip out colored errors from external processes. | 134 # Need to strip out colored errors from external processes. |
196 result.errors = _get_errors(ex, strip_colors=True) | 135 result.errors = _get_errors(ex, strip_colors=True) |
197 | 136 |
198 return result | 137 return result |
138 | |
139 def getReport(self): | |
140 # Invoke post-processors. | |
141 pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir, | |
142 self.ctx.tmp_dir, self.ctx.force) | |
143 for proc in self.processors: | |
144 proc.onPipelineEnd(pipeline_ctx) | |
145 | |
146 self.app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid, | |
147 self.work_start_time) | |
148 return { | |
149 'type': 'timers', | |
150 'data': self.app.env._timers} | |
199 | 151 |
200 | 152 |
201 def get_filtered_processors(processors, authorized_names): | 153 def get_filtered_processors(processors, authorized_names): |
202 if not authorized_names or authorized_names == 'all': | 154 if not authorized_names or authorized_names == 'all': |
203 return processors | 155 return processors |