comparison piecrust/processing/worker.py @ 414:c4b3a7fd2f87

bake: Make pipeline processing multi-process. Not many changes here, as it's pretty straightforward, but there's an API change for processors so they know whether they're being initialized/disposed from the main process or from one of the workers. This makes it possible to separate global setup that has side-effects (e.g. creating a directory) from in-memory setup that's safe to repeat in every process.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jun 2015 19:20:30 -0700
parents
children 4a43d7015b75
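To illustrate the API change described in the commit message: processors receive a PipelineContext in onPipelineStart/onPipelineEnd and can use it to tell whether they're running in the main process or in a worker. A minimal sketch, assuming the context exposes an `is_main_process` flag and keeps the `tmp_dir` it was constructed with (both assumptions; only the `PipelineContext(self.wid, ...)` construction is visible in the file below):

    import os

    from piecrust.processing.base import Processor


    class ExampleProcessor(Processor):
        # Hypothetical processor, for illustration only.
        PROCESSOR_NAME = 'example'

        def onPipelineStart(self, pipeline_ctx):
            # Side-effecting setup should happen exactly once, from the
            # main process (assumed `is_main_process` flag).
            if pipeline_ctx.is_main_process:
                os.makedirs(os.path.join(pipeline_ctx.tmp_dir, 'example'),
                            exist_ok=True)
            # In-memory setup is cheap and safe to do in every process.
            self._cache = {}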
import os.path
import re
import time
import queue
import logging
from piecrust.app import PieCrust
from piecrust.processing.base import PipelineContext
from piecrust.processing.records import (
        FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED,
        FLAG_BYPASSED_STRUCTURED_PROCESSING)
from piecrust.processing.tree import (
        ProcessingTreeBuilder, ProcessingTreeRunner,
        ProcessingTreeError, ProcessorError,
        get_node_name_tree, print_node,
        STATE_DIRTY)


logger = logging.getLogger(__name__)


split_processor_names_re = re.compile(r'[ ,]+')
re_ansicolors = re.compile(r'\x1b\[\d+m')

def worker_func(wid, ctx):
    logger.debug("Worker %d booting up..." % wid)
    w = ProcessingWorker(wid, ctx)
    w.run()

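# For context, `worker_func` is the per-process entry point. A minimal sketch
# of how the main process might drive it, assuming the standard
# `multiprocessing` module (hypothetical; the spawning code lives outside
# this file). A `JoinableQueue` is used because the loop below calls
# `task_done()` on the work queue:
#
#     from multiprocessing import Process, JoinableQueue, Queue, Event
#
#     ctx = ProcessingWorkerContext(
#             root_dir, out_dir, tmp_dir,
#             JoinableQueue(), Queue(), Event())
#     workers = [Process(target=worker_func, args=(i, ctx))
#                for i in range(worker_count)]
#     for w in workers:
#         w.start()
#     ctx.work_queue.join()
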
class ProcessingWorkerContext(object):
    def __init__(self, root_dir, out_dir, tmp_dir,
                 work_queue, results, abort_event,
                 force=False, debug=False):
        self.root_dir = root_dir
        self.out_dir = out_dir
        self.tmp_dir = tmp_dir
        self.work_queue = work_queue
        self.results = results
        self.abort_event = abort_event
        self.force = force
        self.debug = debug
        self.enabled_processors = None
        self.additional_processors = None

class ProcessingWorkerJob(object):
    def __init__(self, base_dir, mount_info, path, *, force=False):
        self.base_dir = base_dir
        self.mount_info = mount_info
        self.path = path
        self.force = force

class ProcessingWorkerResult(object):
    def __init__(self, path):
        self.path = path
        self.flags = FLAG_NONE
        self.proc_tree = None
        self.rel_outputs = None
        # Must be a list (not `None`) because `_unsafeRun` appends to it
        # with `+=`.
        self.errors = []

class ProcessingWorker(object):
    def __init__(self, wid, ctx):
        self.wid = wid
        self.ctx = ctx

    def run(self):
        logger.debug("Worker %d initializing..." % self.wid)
        work_start_time = time.perf_counter()

        # Create the app local to this worker.
        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
        app.env.fs_cache_only_for_main_page = True
        app.env.registerTimer("Worker_%d" % self.wid)
        app.env.registerTimer("JobReceive")
        app.env.registerTimer('BuildProcessingTree')
        app.env.registerTimer('RunProcessingTree')

        processors = app.plugin_loader.getProcessors()
        if self.ctx.enabled_processors:
            logger.debug("Filtering processors to: %s" %
                         self.ctx.enabled_processors)
            processors = get_filtered_processors(
                    processors, self.ctx.enabled_processors)
        if self.ctx.additional_processors:
            logger.debug("Adding %s additional processors." %
                         len(self.ctx.additional_processors))
            for proc in self.ctx.additional_processors:
                app.env.registerTimer(proc.__class__.__name__)
                proc.initialize(app)
                processors.append(proc)

        # Invoke pre-processors.
        pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir,
                                       self.ctx.tmp_dir, self.ctx.force)
        for proc in processors:
            proc.onPipelineStart(pipeline_ctx)

        # Sort our processors again in case the pre-process step involved
        # patching the processors with some new ones.
        processors.sort(key=lambda p: p.priority)

        # Main loop: pull jobs until the abort event is set, either by the
        # main process or by a critical error in this worker.
        aborted_with_exception = None
        while not self.ctx.abort_event.is_set():
            try:
                with app.env.timerScope('JobReceive'):
                    job = self.ctx.work_queue.get(True, 0.01)
            except queue.Empty:
                continue

            try:
                result = self._unsafeRun(app, processors, job)
                self.ctx.results.put_nowait(result)
            except Exception as ex:
                self.ctx.abort_event.set()
                aborted_with_exception = ex
                logger.error("[%d] Critical error, aborting." % self.wid)
                if self.ctx.debug:
                    logger.exception(ex)
                break
            finally:
                self.ctx.work_queue.task_done()

        # Besides `ProcessingWorkerResult` objects, the results queue also
        # carries plain dictionaries as control messages ('error' here,
        # 'timers' below), so the consumer must handle both.
        if aborted_with_exception is not None:
            msgs = _get_errors(aborted_with_exception)
            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})

        # Invoke post-processors.
        for proc in processors:
            proc.onPipelineEnd(pipeline_ctx)

        app.env.stepTimer("Worker_%d" % self.wid,
                          time.perf_counter() - work_start_time)
        self.ctx.results.put_nowait({
            'type': 'timers', 'data': app.env._timers})

    def _unsafeRun(self, app, processors, job):
        result = ProcessingWorkerResult(job.path)

        processors = get_filtered_processors(
                processors, job.mount_info['processors'])

        # Build the processing tree for this job.
        rel_path = os.path.relpath(job.path, job.base_dir)
        try:
            with app.env.timerScope('BuildProcessingTree'):
                builder = ProcessingTreeBuilder(processors)
                tree_root = builder.build(rel_path)
                result.flags |= FLAG_PREPARED
        except ProcessingTreeError as ex:
            result.errors += _get_errors(ex)
            return result

        # Prepare and run the tree.
        print_node(tree_root, recursive=True)
        leaves = tree_root.getLeaves()
        result.rel_outputs = [l.path for l in leaves]
        result.proc_tree = get_node_name_tree(tree_root)
        if tree_root.getProcessor().is_bypassing_structured_processing:
            result.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING

        if job.force:
            tree_root.setState(STATE_DIRTY, True)

        try:
            with app.env.timerScope('RunProcessingTree'):
                runner = ProcessingTreeRunner(
                        job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
                if runner.processSubTree(tree_root):
                    result.flags |= FLAG_PROCESSED
        except ProcessingTreeError as ex:
            if isinstance(ex, ProcessorError):
                ex = ex.__cause__
            # Need to strip out colored errors from external processes.
            result.errors += _get_errors(ex, strip_colors=True)

        return result

def get_filtered_processors(processors, authorized_names):
    if not authorized_names or authorized_names == 'all':
        return processors

    if isinstance(authorized_names, str):
        authorized_names = split_processor_names_re.split(authorized_names)

    procs = []
    has_star = 'all' in authorized_names
    for p in processors:
        for name in authorized_names:
            if name == p.PROCESSOR_NAME:
                procs.append(p)
                break
            if name == ('-%s' % p.PROCESSOR_NAME):
                break
        else:
            # The inner loop didn't `break`: this processor was neither
            # explicitly included nor explicitly excluded, so keep it only
            # if 'all' was specified.
            if has_star:
                procs.append(p)
    return procs

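# A few illustrative calls (processor names are hypothetical):
#
#     get_filtered_processors(procs, 'all')        # everything
#     get_filtered_processors(procs, 'copy less')  # only those two
#     get_filtered_processors(procs, ['all', '-sitemap'])
#                                      # everything except 'sitemap'
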
def _get_errors(ex, strip_colors=False):
    errors = []
    while ex is not None:
        msg = str(ex)
        if strip_colors:
            msg = re_ansicolors.sub('', msg)
        errors.append(msg)
        ex = ex.__cause__
    return errors
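
As a usage note, `_get_errors` flattens a whole exception chain (via `__cause__`) into a flat list of messages. A quick sketch, with hypothetical exceptions:

    try:
        try:
            raise ValueError("bad input")
        except ValueError as ex:
            raise ProcessingTreeError("processing failed") from ex
    except ProcessingTreeError as ex:
        assert _get_errors(ex) == ['processing failed', 'bad input']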