piecrust2: comparison of piecrust/processing/base.py @ 414:c4b3a7fd2f87
bake: Make pipeline processing multi-process.
Not many changes here, as it's pretty straightforward, but there's an API change
for processors so they know whether they're being initialized/disposed from the
main process or from one of the workers. This makes it possible to distinguish
global work with side effects (e.g. creating a directory) from purely in-memory
setup.
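
For context, a minimal sketch of how a processor might use the new context
argument passed to onPipelineStart/onPipelineEnd. The ExampleProcessor class,
its name, and its body are hypothetical; only the Processor hooks and the
PipelineContext properties (worker_id, is_first_worker, is_pipeline_process,
tmp_dir, out_dir) come from the code in the diff below.

import os
import logging

from piecrust.processing.base import Processor

logger = logging.getLogger(__name__)


class ExampleProcessor(Processor):
    PROCESSOR_NAME = 'example'  # illustrative name only

    def onPipelineStart(self, ctx):
        # Global, side-effecting setup (e.g. creating a scratch directory)
        # should run only once; the worker ID exposed by the context makes
        # that possible. Whether to key this on `is_first_worker` or
        # `is_pipeline_process` depends on where the pipeline invokes the hook.
        if ctx.is_first_worker:
            os.makedirs(os.path.join(ctx.tmp_dir, self.PROCESSOR_NAME),
                        exist_ok=True)
        # In-memory, per-process setup is safe to run from every worker.
        self._cache = {}

    def onPipelineEnd(self, ctx):
        # Dispose of global state from a single place as well.
        if ctx.is_first_worker:
            logger.debug("Example processor done; output in %s" % ctx.out_dir)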
author   | Ludovic Chabant <ludovic@chabant.com>
date     | Sat, 20 Jun 2015 19:20:30 -0700
parents  | c2ca72fb7f0b
children | 4850f8c21b6e
413:eacf0a3afd0c | 414:c4b3a7fd2f87
1 import re | |
2 import time | |
3 import shutil | 1 import shutil |
4 import os.path | 2 import os.path |
5 import logging | 3 import logging |
6 import hashlib | |
7 import threading | |
8 from queue import Queue, Empty | |
9 from piecrust.chefutil import format_timed | |
10 from piecrust.processing.records import ( | |
11 ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord, | |
12 FLAG_PREPARED, FLAG_PROCESSED, FLAG_OVERRIDEN, | |
13 FLAG_BYPASSED_STRUCTURED_PROCESSING) | |
14 from piecrust.processing.tree import ( | |
15 ProcessingTreeBuilder, ProcessingTreeRunner, | |
16 ProcessingTreeError, ProcessorError, | |
17 STATE_DIRTY, | |
18 print_node, get_node_name_tree) | |
19 | 4 |
20 | 5 |
21 logger = logging.getLogger(__name__) | 6 logger = logging.getLogger(__name__) |
22 | |
23 | |
24 re_ansicolors = re.compile('\033\\[\d+m') | |
25 | 7 |
26 | 8 |
27 PRIORITY_FIRST = -1 | 9 PRIORITY_FIRST = -1 |
28 PRIORITY_NORMAL = 0 | 10 PRIORITY_NORMAL = 0 |
29 PRIORITY_LAST = 1 | 11 PRIORITY_LAST = 1 |
30 | 12 |
31 | 13 |
32 split_processor_names_re = re.compile(r'[ ,]+') | 14 class PipelineContext(object): |
15 def __init__(self, worker_id, app, out_dir, tmp_dir, force=None): | |
16 self.worker_id = worker_id | |
17 self.app = app | |
18 self.out_dir = out_dir | |
19 self.tmp_dir = tmp_dir | |
20 self.force = force | |
21 self.record = None | |
22 self._additional_ignore_patterns = [] | |
23 | |
24 @property | |
25 def is_first_worker(self): | |
26 return self.worker_id == 0 | |
27 | |
28 @property | |
29 def is_pipeline_process(self): | |
30 return self.worker_id < 0 | |
31 | |
32 def addIgnorePatterns(self, patterns): | |
33 self._additional_ignore_patterns += patterns | |
33 | 34 |
34 | 35 |
35 class Processor(object): | 36 class Processor(object): |
36 PROCESSOR_NAME = None | 37 PROCESSOR_NAME = None |
37 | 38 |
41 self.is_delegating_dependency_check = True | 42 self.is_delegating_dependency_check = True |
42 | 43 |
43 def initialize(self, app): | 44 def initialize(self, app): |
44 self.app = app | 45 self.app = app |
45 | 46 |
46 def onPipelineStart(self, pipeline): | 47 def onPipelineStart(self, ctx): |
47 pass | 48 pass |
48 | 49 |
49 def onPipelineEnd(self, pipeline): | 50 def onPipelineEnd(self, ctx): |
50 pass | 51 pass |
51 | 52 |
52 def matches(self, path): | 53 def matches(self, path): |
53 return False | 54 return False |
54 | 55 |
115 | 116 |
116 def __str__(self): | 117 def __str__(self): |
117 return self.stderr_data | 118 return self.stderr_data |
118 | 119 |
119 | 120 |
120 class ProcessingContext(object): | |
121 def __init__(self, base_dir, mount_info, job_queue, record=None): | |
122 self.base_dir = base_dir | |
123 self.mount_info = mount_info | |
124 self.job_queue = job_queue | |
125 self.record = record | |
126 | |
127 | |
128 class ProcessorPipeline(object): | |
129 def __init__(self, app, out_dir, force=False): | |
130 assert app and out_dir | |
131 self.app = app | |
132 self.out_dir = out_dir | |
133 self.force = force | |
134 | |
135 tmp_dir = app.sub_cache_dir | |
136 if not tmp_dir: | |
137 import tempfile | |
138 tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust') | |
139 self.tmp_dir = os.path.join(tmp_dir, 'proc') | |
140 | |
141 baker_params = app.config.get('baker') or {} | |
142 | |
143 assets_dirs = baker_params.get('assets_dirs', app.assets_dirs) | |
144 self.mounts = make_mount_infos(assets_dirs, self.app.root_dir) | |
145 | |
146 self.num_workers = baker_params.get('workers', 4) | |
147 | |
148 ignores = baker_params.get('ignore', []) | |
149 ignores += [ | |
150 '_cache', '_counter', | |
151 'theme_info.yml', | |
152 '.DS_Store', 'Thumbs.db', | |
153 '.git*', '.hg*', '.svn'] | |
154 self.skip_patterns = make_re(ignores) | |
155 self.force_patterns = make_re(baker_params.get('force', [])) | |
156 | |
157 self.processors = app.plugin_loader.getProcessors() | |
158 | |
159 def addSkipPatterns(self, patterns): | |
160 self.skip_patterns += make_re(patterns) | |
161 | |
162 def filterProcessors(self, authorized_names): | |
163 self.processors = self.getFilteredProcessors(authorized_names) | |
164 | |
165 def getFilteredProcessors(self, authorized_names): | |
166 if not authorized_names or authorized_names == 'all': | |
167 return self.processors | |
168 | |
169 if isinstance(authorized_names, str): | |
170 authorized_names = split_processor_names_re.split(authorized_names) | |
171 | |
172 procs = [] | |
173 has_star = 'all' in authorized_names | |
174 for p in self.processors: | |
175 for name in authorized_names: | |
176 if name == p.PROCESSOR_NAME: | |
177 procs.append(p) | |
178 break | |
179 if name == ('-%s' % p.PROCESSOR_NAME): | |
180 break | |
181 else: | |
182 if has_star: | |
183 procs.append(p) | |
184 return procs | |
185 | |
186 def run(self, src_dir_or_file=None, *, | |
187 delete=True, previous_record=None, save_record=True): | |
188 # Invoke pre-processors. | |
189 for proc in self.processors: | |
190 proc.onPipelineStart(self) | |
191 | |
192 # Sort our processors again in case the pre-process step involved | |
193 # patching the processors with some new ones. | |
194 self.processors.sort(key=lambda p: p.priority) | |
195 | |
196 # Create the pipeline record. | |
197 record = TransitionalProcessorPipelineRecord() | |
198 record_cache = self.app.cache.getCache('proc') | |
199 record_name = ( | |
200 hashlib.md5(self.out_dir.encode('utf8')).hexdigest() + | |
201 '.record') | |
202 if previous_record: | |
203 record.setPrevious(previous_record) | |
204 elif not self.force and record_cache.has(record_name): | |
205 t = time.clock() | |
206 record.loadPrevious(record_cache.getCachePath(record_name)) | |
207 logger.debug(format_timed(t, 'loaded previous bake record', | |
208 colored=False)) | |
209 logger.debug("Got %d entries in process record." % | |
210 len(record.previous.entries)) | |
211 | |
212 # Create the workers. | |
213 pool = [] | |
214 queue = Queue() | |
215 abort = threading.Event() | |
216 pipeline_lock = threading.Lock() | |
217 for i in range(self.num_workers): | |
218 ctx = ProcessingWorkerContext(self, record, | |
219 queue, abort, pipeline_lock) | |
220 worker = ProcessingWorker(i, ctx) | |
221 worker.start() | |
222 pool.append(worker) | |
223 | |
224 if src_dir_or_file is not None: | |
225 # Process only the given path. | |
226 # Find out what mount point this is in. | |
227 for name, info in self.mounts.items(): | |
228 path = info['path'] | |
229 if src_dir_or_file[:len(path)] == path: | |
230 base_dir = path | |
231 mount_info = info | |
232 break | |
233 else: | |
234 known_roots = [i['path'] for i in self.mounts.values()] | |
235 raise Exception("Input path '%s' is not part of any known " | |
236 "mount point: %s" % | |
237 (src_dir_or_file, known_roots)) | |
238 | |
239 ctx = ProcessingContext(base_dir, mount_info, queue, record) | |
240 logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file) | |
241 if os.path.isdir(src_dir_or_file): | |
242 self.processDirectory(ctx, src_dir_or_file) | |
243 elif os.path.isfile(src_dir_or_file): | |
244 self.processFile(ctx, src_dir_or_file) | |
245 | |
246 else: | |
247 # Process everything. | |
248 for name, info in self.mounts.items(): | |
249 path = info['path'] | |
250 ctx = ProcessingContext(path, info, queue, record) | |
251 logger.debug("Initiating processing pipeline on: %s" % path) | |
252 self.processDirectory(ctx, path) | |
253 | |
254 # Wait on all workers. | |
255 record.current.success = True | |
256 for w in pool: | |
257 w.join() | |
258 record.current.success &= w.success | |
259 if abort.is_set(): | |
260 raise Exception("Worker pool was aborted.") | |
261 | |
262 # Handle deletions. | |
263 if delete: | |
264 for path, reason in record.getDeletions(): | |
265 logger.debug("Removing '%s': %s" % (path, reason)) | |
266 try: | |
267 os.remove(path) | |
268 except FileNotFoundError: | |
269 pass | |
270 logger.info('[delete] %s' % path) | |
271 | |
272 # Invoke post-processors. | |
273 for proc in self.processors: | |
274 proc.onPipelineEnd(self) | |
275 | |
276 # Finalize the process record. | |
277 record.current.process_time = time.time() | |
278 record.current.out_dir = self.out_dir | |
279 record.collapseRecords() | |
280 | |
281 # Save the process record. | |
282 if save_record: | |
283 t = time.clock() | |
284 record.saveCurrent(record_cache.getCachePath(record_name)) | |
285 logger.debug(format_timed(t, 'saved bake record', colored=False)) | |
286 | |
287 return record.detach() | |
288 | |
289 def processDirectory(self, ctx, start_dir): | |
290 for dirpath, dirnames, filenames in os.walk(start_dir): | |
291 rel_dirpath = os.path.relpath(dirpath, start_dir) | |
292 dirnames[:] = [d for d in dirnames | |
293 if not re_matchany(d, self.skip_patterns, rel_dirpath)] | |
294 | |
295 for filename in filenames: | |
296 if re_matchany(filename, self.skip_patterns, rel_dirpath): | |
297 continue | |
298 self.processFile(ctx, os.path.join(dirpath, filename)) | |
299 | |
300 def processFile(self, ctx, path): | |
301 logger.debug("Queuing: %s" % path) | |
302 job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path) | |
303 ctx.job_queue.put_nowait(job) | |
304 | |
305 | |
306 class ProcessingWorkerContext(object): | |
307 def __init__(self, pipeline, record, | |
308 work_queue, abort_event, pipeline_lock): | |
309 self.pipeline = pipeline | |
310 self.record = record | |
311 self.work_queue = work_queue | |
312 self.abort_event = abort_event | |
313 self.pipeline_lock = pipeline_lock | |
314 | |
315 | |
316 class ProcessingWorkerJob(object): | |
317 def __init__(self, base_dir, mount_info, path): | |
318 self.base_dir = base_dir | |
319 self.mount_info = mount_info | |
320 self.path = path | |
321 | |
322 | |
323 class ProcessingWorker(threading.Thread): | |
324 def __init__(self, wid, ctx): | |
325 super(ProcessingWorker, self).__init__() | |
326 self.wid = wid | |
327 self.ctx = ctx | |
328 self.success = True | |
329 | |
330 def run(self): | |
331 while(not self.ctx.abort_event.is_set()): | |
332 try: | |
333 job = self.ctx.work_queue.get(True, 0.1) | |
334 except Empty: | |
335 logger.debug("[%d] No more work... shutting down." % self.wid) | |
336 break | |
337 | |
338 try: | |
339 success = self._unsafeRun(job) | |
340 logger.debug("[%d] Done with file." % self.wid) | |
341 self.ctx.work_queue.task_done() | |
342 self.success &= success | |
343 except Exception as ex: | |
344 self.ctx.abort_event.set() | |
345 self.success = False | |
346 logger.error("[%d] Critical error, aborting." % self.wid) | |
347 logger.exception(ex) | |
348 break | |
349 | |
350 def _unsafeRun(self, job): | |
351 start_time = time.clock() | |
352 pipeline = self.ctx.pipeline | |
353 record = self.ctx.record | |
354 | |
355 rel_path = os.path.relpath(job.path, job.base_dir) | |
356 previous_entry = record.getPreviousEntry(rel_path) | |
357 | |
358 record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path) | |
359 record.addEntry(record_entry) | |
360 | |
361 # Figure out if a previously processed file is overriding this one. | |
362 # This can happen if a theme file (processed via a mount point) | |
363 # is overridden in the user's website. | |
364 if record.current.hasOverrideEntry(rel_path): | |
365 record_entry.flags |= FLAG_OVERRIDEN | |
366 logger.info(format_timed(start_time, | |
367 '%s [not baked, overridden]' % rel_path)) | |
368 return True | |
369 | |
370 processors = pipeline.getFilteredProcessors( | |
371 job.mount_info['processors']) | |
372 try: | |
373 builder = ProcessingTreeBuilder(processors) | |
374 tree_root = builder.build(rel_path) | |
375 record_entry.flags |= FLAG_PREPARED | |
376 except ProcessingTreeError as ex: | |
377 msg = str(ex) | |
378 logger.error("Error preparing %s:\n%s" % (rel_path, msg)) | |
379 while ex: | |
380 record_entry.errors.append(str(ex)) | |
381 ex = ex.__cause__ | |
382 return False | |
383 | |
384 print_node(tree_root, recursive=True) | |
385 leaves = tree_root.getLeaves() | |
386 record_entry.rel_outputs = [l.path for l in leaves] | |
387 record_entry.proc_tree = get_node_name_tree(tree_root) | |
388 if tree_root.getProcessor().is_bypassing_structured_processing: | |
389 record_entry.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING | |
390 | |
391 force = (pipeline.force or previous_entry is None or | |
392 not previous_entry.was_processed_successfully) | |
393 if not force: | |
394 force = re_matchany(rel_path, pipeline.force_patterns) | |
395 | |
396 if force: | |
397 tree_root.setState(STATE_DIRTY, True) | |
398 | |
399 try: | |
400 runner = ProcessingTreeRunner( | |
401 job.base_dir, pipeline.tmp_dir, | |
402 pipeline.out_dir, self.ctx.pipeline_lock) | |
403 if runner.processSubTree(tree_root): | |
404 record_entry.flags |= FLAG_PROCESSED | |
405 logger.info(format_timed( | |
406 start_time, "[%d] %s" % (self.wid, rel_path))) | |
407 return True | |
408 except ProcessingTreeError as ex: | |
409 msg = str(ex) | |
410 if isinstance(ex, ProcessorError): | |
411 msg = str(ex.__cause__) | |
412 logger.error("Error processing %s:\n%s" % (rel_path, msg)) | |
413 while ex: | |
414 msg = re_ansicolors.sub('', str(ex)) | |
415 record_entry.errors.append(msg) | |
416 ex = ex.__cause__ | |
417 return False | |
418 | |
419 | |
420 def make_mount_infos(mounts, root_dir): | |
421 if isinstance(mounts, list): | |
422 mounts = {m: {} for m in mounts} | |
423 | |
424 for name, info in mounts.items(): | |
425 if not isinstance(info, dict): | |
426 raise Exception("Asset directory info for '%s' is not a " | |
427 "dictionary." % name) | |
428 info.setdefault('processors', 'all -uglifyjs -cleancss') | |
429 info['path'] = os.path.join(root_dir, name) | |
430 | |
431 return mounts | |
432 | |
433 | |
434 def make_re(patterns): | |
435 re_patterns = [] | |
436 for pat in patterns: | |
437 if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2: | |
438 re_patterns.append(pat[1:-1]) | |
439 else: | |
440 escaped_pat = (re.escape(pat) | |
441 .replace(r'\*', r'[^/\\]*') | |
442 .replace(r'\?', r'[^/\\]')) | |
443 re_patterns.append(escaped_pat) | |
444 return [re.compile(p) for p in re_patterns] | |
445 | |
446 | |
447 def re_matchany(filename, patterns, dirname=None): | |
448 if dirname and dirname != '.': | |
449 filename = os.path.join(dirname, filename) | |
450 | |
451 # skip patterns use a forward slash regardless of the platform. | |
452 filename = filename.replace('\\', '/') | |
453 for pattern in patterns: | |
454 if pattern.search(filename): | |
455 return True | |
456 return False | |
457 |