Mercurial > piecrust2
comparison piecrust/processing/worker.py @ 414:c4b3a7fd2f87
bake: Make pipeline processing multi-process.
Not many changes here, as it's pretty straightforward, but an API change for
processors so they know whether they're being initialized/disposed from the main
process or from one of the workers. This makes it possible to do global work
that has side-effects (e.g. creating a directory) in one place, vs. doing
in-memory work in the other.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sat, 20 Jun 2015 19:20:30 -0700 |
parents | |
children | 4a43d7015b75 |
comparison
equal
deleted
inserted
replaced
413:eacf0a3afd0c | 414:c4b3a7fd2f87 |
---|---|
1 import os.path | |
2 import re | |
3 import time | |
4 import queue | |
5 import logging | |
6 from piecrust.app import PieCrust | |
7 from piecrust.processing.base import PipelineContext | |
8 from piecrust.processing.records import ( | |
9 FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED, | |
10 FLAG_BYPASSED_STRUCTURED_PROCESSING) | |
11 from piecrust.processing.tree import ( | |
12 ProcessingTreeBuilder, ProcessingTreeRunner, | |
13 ProcessingTreeError, ProcessorError, | |
14 get_node_name_tree, print_node, | |
15 STATE_DIRTY) | |
16 | |
17 | |
18 logger = logging.getLogger(__name__) | |
19 | |
20 | |
# Splits a user-supplied list of processor names on commas and/or spaces.
split_processor_names_re = re.compile(r'[ ,]+')
# Matches ANSI color escape sequences (e.g. '\x1b[31m') so they can be
# stripped from external processes' error output.
# BUG FIX: the pattern previously used '\d' inside a non-raw string, which
# is an invalid escape sequence (DeprecationWarning, and a SyntaxWarning /
# error in newer Python). The backslash is now properly escaped; the
# compiled pattern is unchanged.
re_ansicolors = re.compile('\033\\[\\d+m')
23 | |
24 | |
def worker_func(wid, ctx):
    """Process entry point: create a ProcessingWorker and run its loop.

    `wid` is the worker's numeric ID; `ctx` is the shared
    ProcessingWorkerContext built by the main process.
    """
    logger.debug("Worker %d booting up..." % wid)
    worker = ProcessingWorker(wid, ctx)
    worker.run()
29 | |
30 | |
class ProcessingWorkerContext(object):
    """Bundle of everything a worker process needs to operate.

    Carries the site directories, the shared multiprocessing primitives
    (work queue, results queue, abort event), and the bake flags. The
    processor filters default to None and may be set by the main process
    before workers are spawned.
    """
    def __init__(self, root_dir, out_dir, tmp_dir,
                 work_queue, results, abort_event,
                 force=False, debug=False):
        # Site directories.
        self.root_dir = root_dir
        self.out_dir = out_dir
        self.tmp_dir = tmp_dir
        # Shared inter-process primitives.
        self.work_queue = work_queue
        self.results = results
        self.abort_event = abort_event
        # Bake flags.
        self.force = force
        self.debug = debug
        # Optional processor filtering, set by the main process.
        self.enabled_processors = None
        self.additional_processors = None
45 | |
46 | |
class ProcessingWorkerJob(object):
    """One asset file for a worker to process.

    `force` is keyword-only and, when True, makes the worker re-process
    the file even if it appears up-to-date.
    """
    def __init__(self, base_dir, mount_info, path, *, force=False):
        self.base_dir = base_dir      # root directory the path is relative to
        self.mount_info = mount_info  # mount config; must expose ['processors']
        self.path = path              # path of the asset file to process
        self.force = force            # force re-processing of this file
53 | |
54 | |
class ProcessingWorkerResult(object):
    """Outcome of processing one asset file, sent back to the main process."""
    def __init__(self, path):
        self.path = path          # the source path this result is about
        self.flags = FLAG_NONE    # FLAG_* bits accumulated during processing
        self.proc_tree = None     # name-tree of the processors that handled it
        self.rel_outputs = None   # output paths, relative to the output dir
        self.errors = None        # error messages, or None if none were set
62 | |
63 | |
class ProcessingWorker(object):
    """Long-lived worker loop for the asset processing pipeline.

    Each worker builds its own local `PieCrust` app, pulls jobs from the
    shared work queue until the abort event is set, and pushes
    `ProcessingWorkerResult` objects -- or special `dict` messages of type
    'error' or 'timers' -- onto the results queue.
    """
    def __init__(self, wid, ctx):
        self.wid = wid  # worker ID, used for logging and timer names
        self.ctx = ctx  # ProcessingWorkerContext with queues and settings

    def run(self):
        """Set up the app and processors, consume jobs until aborted,
        then run pipeline teardown and report timer data."""
        logger.debug("Worker %d initializing..." % self.wid)
        work_start_time = time.perf_counter()

        # Create the app local to this worker.
        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
        app.env.fs_cache_only_for_main_page = True
        app.env.registerTimer("Worker_%d" % self.wid)
        app.env.registerTimer("JobReceive")
        app.env.registerTimer('BuildProcessingTree')
        app.env.registerTimer('RunProcessingTree')

        # Build the processor list: plugin-provided, optionally filtered,
        # plus any extra processors injected by the main process.
        processors = app.plugin_loader.getProcessors()
        if self.ctx.enabled_processors:
            logger.debug("Filtering processors to: %s" %
                         self.ctx.enabled_processors)
            processors = get_filtered_processors(processors,
                                                 self.ctx.enabled_processors)
        if self.ctx.additional_processors:
            logger.debug("Adding %s additional processors." %
                         len(self.ctx.additional_processors))
            for proc in self.ctx.additional_processors:
                app.env.registerTimer(proc.__class__.__name__)
                proc.initialize(app)
                processors.append(proc)

        # Invoke pre-processors.
        pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir,
                                       self.ctx.tmp_dir, self.ctx.force)
        for proc in processors:
            proc.onPipelineStart(pipeline_ctx)

        # Sort our processors again in case the pre-process step involved
        # patching the processors with some new ones.
        processors.sort(key=lambda p: p.priority)

        aborted_with_exception = None
        while not self.ctx.abort_event.is_set():
            try:
                with app.env.timerScope('JobReceive'):
                    # Short timeout so we regularly re-check the abort
                    # event instead of blocking forever on the queue.
                    job = self.ctx.work_queue.get(True, 0.01)
            except queue.Empty:
                continue

            try:
                result = self._unsafeRun(app, processors, job)
                self.ctx.results.put_nowait(result)
            except Exception as ex:
                # Any unexpected error takes down the whole pipeline.
                self.ctx.abort_event.set()
                aborted_with_exception = ex
                logger.error("[%d] Critical error, aborting." % self.wid)
                if self.ctx.debug:
                    logger.exception(ex)
                break
            finally:
                self.ctx.work_queue.task_done()

        if aborted_with_exception is not None:
            msgs = _get_errors(aborted_with_exception)
            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})

        # Invoke post-processors.
        for proc in processors:
            proc.onPipelineEnd(pipeline_ctx)

        app.env.stepTimer("Worker_%d" % self.wid,
                          time.perf_counter() - work_start_time)
        self.ctx.results.put_nowait({
                'type': 'timers', 'data': app.env._timers})

    def _unsafeRun(self, app, processors, job):
        """Process one job and return a ProcessingWorkerResult.

        Expected pipeline errors are captured into `result.errors`; any
        other exception propagates and is treated as fatal by `run`.
        """
        result = ProcessingWorkerResult(job.path)

        # Further restrict processors to those allowed by the job's mount.
        processors = get_filtered_processors(
                processors, job.mount_info['processors'])

        # Build the processing tree for this job.
        rel_path = os.path.relpath(job.path, job.base_dir)
        try:
            with app.env.timerScope('BuildProcessingTree'):
                builder = ProcessingTreeBuilder(processors)
                tree_root = builder.build(rel_path)
                result.flags |= FLAG_PREPARED
        except ProcessingTreeError as ex:
            # BUG FIX: `result.errors` starts as None, so the original
            # `result.errors += ...` raised a TypeError on first use.
            result.errors = (result.errors or []) + _get_errors(ex)
            return result

        # Prepare and run the tree.
        # NOTE(review): unconditional tree dump on every job looks like
        # debug leftover -- confirm whether this should be kept.
        print_node(tree_root, recursive=True)
        leaves = tree_root.getLeaves()
        result.rel_outputs = [l.path for l in leaves]
        result.proc_tree = get_node_name_tree(tree_root)
        if tree_root.getProcessor().is_bypassing_structured_processing:
            result.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING

        if job.force:
            tree_root.setState(STATE_DIRTY, True)

        try:
            with app.env.timerScope('RunProcessingTree'):
                runner = ProcessingTreeRunner(
                        job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
                if runner.processSubTree(tree_root):
                    result.flags |= FLAG_PROCESSED
        except ProcessingTreeError as ex:
            if isinstance(ex, ProcessorError):
                ex = ex.__cause__
            # Need to strip out colored errors from external processes.
            # (Same None-safe accumulation fix as above.)
            result.errors = (result.errors or []) + _get_errors(
                    ex, strip_colors=True)

        return result
180 | |
181 | |
def get_filtered_processors(processors, authorized_names):
    """Return the subset of `processors` allowed by `authorized_names`.

    `authorized_names` may be falsy or 'all' (everything is allowed), a
    comma/space-separated string, or a list of names. A name of 'all'
    includes everything not explicitly excluded; a '-name' entry excludes
    that processor. Relative order of `processors` is preserved.
    """
    if not authorized_names or authorized_names == 'all':
        return processors

    if isinstance(authorized_names, str):
        authorized_names = split_processor_names_re.split(authorized_names)

    include_by_default = 'all' in authorized_names
    filtered = []
    for proc in processors:
        keep = include_by_default
        for name in authorized_names:
            if name == proc.PROCESSOR_NAME:
                keep = True
                break
            if name == ('-%s' % proc.PROCESSOR_NAME):
                keep = False
                break
        if keep:
            filtered.append(proc)
    return filtered
202 | |
203 | |
204 def _get_errors(ex, strip_colors=False): | |
205 errors = [] | |
206 while ex is not None: | |
207 msg = str(ex) | |
208 if strip_colors: | |
209 msg = re_ansicolors.sub('', msg) | |
210 errors.append(msg) | |
211 ex = ex.__cause__ | |
212 return errors | |
213 |