comparison piecrust/processing/base.py @ 414:c4b3a7fd2f87

bake: Make pipeline processing multi-process. Not many changes here, as it's pretty straightforward, but there is an API change for processors so they know whether they're being initialized/disposed from the main process or from one of the workers. This makes it possible to separate global work that has side-effects (e.g. creating a directory) from purely in-memory work.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jun 2015 19:20:30 -0700
parents c2ca72fb7f0b
children 4850f8c21b6e
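
The API change described in the commit message is that onPipelineStart/onPipelineEnd now receive a PipelineContext (the class added in the diff below) instead of the pipeline object itself. Here is a rough, hypothetical sketch of how a processor might use it: the ExampleProcessor name and its behavior are made up for illustration, and whether one-time setup should key off is_first_worker or is_pipeline_process depends on how the new pipeline dispatches these hooks, which this diff doesn't show.

import os.path

from piecrust.processing.base import Processor


class ExampleProcessor(Processor):
    # Hypothetical processor, only meant to illustrate the new `ctx` argument.
    PROCESSOR_NAME = 'example'

    def onPipelineStart(self, ctx):
        # `ctx` is a PipelineContext, not the ProcessorPipeline anymore.
        # Side-effecting, global setup (e.g. creating a directory) can be
        # restricted to a single process...
        if ctx.is_first_worker:
            os.makedirs(os.path.join(ctx.out_dir, 'example-assets'),
                        exist_ok=True)
        # ...while in-memory, per-process state can be set up everywhere.
        ctx.addIgnorePatterns(['*.example-tmp'])

    def onPipelineEnd(self, ctx):
        pass

The worker_id convention encoded by the new properties is that workers are numbered from 0 (is_first_worker) and a negative id denotes the main pipeline process (is_pipeline_process).
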
diff -r eacf0a3afd0c -r c4b3a7fd2f87 piecrust/processing/base.py
--- a/piecrust/processing/base.py
+++ b/piecrust/processing/base.py
@@ -1,37 +1,38 @@
-import re
-import time
 import shutil
 import os.path
 import logging
-import hashlib
-import threading
-from queue import Queue, Empty
-from piecrust.chefutil import format_timed
-from piecrust.processing.records import (
-        ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
-        FLAG_PREPARED, FLAG_PROCESSED, FLAG_OVERRIDEN,
-        FLAG_BYPASSED_STRUCTURED_PROCESSING)
-from piecrust.processing.tree import (
-        ProcessingTreeBuilder, ProcessingTreeRunner,
-        ProcessingTreeError, ProcessorError,
-        STATE_DIRTY,
-        print_node, get_node_name_tree)
 
 
 logger = logging.getLogger(__name__)
-
-
-re_ansicolors = re.compile('\033\\[\d+m')
 
 
 PRIORITY_FIRST = -1
 PRIORITY_NORMAL = 0
 PRIORITY_LAST = 1
 
 
-split_processor_names_re = re.compile(r'[ ,]+')
+class PipelineContext(object):
+    def __init__(self, worker_id, app, out_dir, tmp_dir, force=None):
+        self.worker_id = worker_id
+        self.app = app
+        self.out_dir = out_dir
+        self.tmp_dir = tmp_dir
+        self.force = force
+        self.record = None
+        self._additional_ignore_patterns = []
+
+    @property
+    def is_first_worker(self):
+        return self.worker_id == 0
+
+    @property
+    def is_pipeline_process(self):
+        return self.worker_id < 0
+
+    def addIgnorePatterns(self, patterns):
+        self._additional_ignore_patterns += patterns
 
 
 class Processor(object):
     PROCESSOR_NAME = None
 
@@ -41,14 +42,14 @@
         self.is_delegating_dependency_check = True
 
     def initialize(self, app):
         self.app = app
 
-    def onPipelineStart(self, pipeline):
+    def onPipelineStart(self, ctx):
         pass
 
-    def onPipelineEnd(self, pipeline):
+    def onPipelineEnd(self, ctx):
         pass
 
     def matches(self, path):
         return False
 
@@ -115,343 +116,5 @@
 
     def __str__(self):
         return self.stderr_data
 
 
-class ProcessingContext(object):
-    def __init__(self, base_dir, mount_info, job_queue, record=None):
-        self.base_dir = base_dir
-        self.mount_info = mount_info
-        self.job_queue = job_queue
-        self.record = record
-
-
-class ProcessorPipeline(object):
-    def __init__(self, app, out_dir, force=False):
-        assert app and out_dir
-        self.app = app
-        self.out_dir = out_dir
-        self.force = force
-
-        tmp_dir = app.sub_cache_dir
-        if not tmp_dir:
-            import tempfile
-            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
-        self.tmp_dir = os.path.join(tmp_dir, 'proc')
-
-        baker_params = app.config.get('baker') or {}
-
-        assets_dirs = baker_params.get('assets_dirs', app.assets_dirs)
-        self.mounts = make_mount_infos(assets_dirs, self.app.root_dir)
-
-        self.num_workers = baker_params.get('workers', 4)
-
-        ignores = baker_params.get('ignore', [])
-        ignores += [
-                '_cache', '_counter',
-                'theme_info.yml',
-                '.DS_Store', 'Thumbs.db',
-                '.git*', '.hg*', '.svn']
-        self.skip_patterns = make_re(ignores)
-        self.force_patterns = make_re(baker_params.get('force', []))
-
-        self.processors = app.plugin_loader.getProcessors()
-
-    def addSkipPatterns(self, patterns):
-        self.skip_patterns += make_re(patterns)
-
-    def filterProcessors(self, authorized_names):
-        self.processors = self.getFilteredProcessors(authorized_names)
-
-    def getFilteredProcessors(self, authorized_names):
-        if not authorized_names or authorized_names == 'all':
-            return self.processors
-
-        if isinstance(authorized_names, str):
-            authorized_names = split_processor_names_re.split(authorized_names)
-
-        procs = []
-        has_star = 'all' in authorized_names
-        for p in self.processors:
-            for name in authorized_names:
-                if name == p.PROCESSOR_NAME:
-                    procs.append(p)
-                    break
-                if name == ('-%s' % p.PROCESSOR_NAME):
-                    break
-            else:
-                if has_star:
-                    procs.append(p)
-        return procs
-
-    def run(self, src_dir_or_file=None, *,
-            delete=True, previous_record=None, save_record=True):
-        # Invoke pre-processors.
-        for proc in self.processors:
-            proc.onPipelineStart(self)
-
-        # Sort our processors again in case the pre-process step involved
-        # patching the processors with some new ones.
-        self.processors.sort(key=lambda p: p.priority)
-
-        # Create the pipeline record.
-        record = TransitionalProcessorPipelineRecord()
-        record_cache = self.app.cache.getCache('proc')
-        record_name = (
-                hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
-                '.record')
-        if previous_record:
-            record.setPrevious(previous_record)
-        elif not self.force and record_cache.has(record_name):
-            t = time.clock()
-            record.loadPrevious(record_cache.getCachePath(record_name))
-            logger.debug(format_timed(t, 'loaded previous bake record',
-                                      colored=False))
-        logger.debug("Got %d entries in process record." %
-                     len(record.previous.entries))
-
-        # Create the workers.
-        pool = []
-        queue = Queue()
-        abort = threading.Event()
-        pipeline_lock = threading.Lock()
-        for i in range(self.num_workers):
-            ctx = ProcessingWorkerContext(self, record,
-                                          queue, abort, pipeline_lock)
-            worker = ProcessingWorker(i, ctx)
-            worker.start()
-            pool.append(worker)
-
-        if src_dir_or_file is not None:
-            # Process only the given path.
-            # Find out what mount point this is in.
-            for name, info in self.mounts.items():
-                path = info['path']
-                if src_dir_or_file[:len(path)] == path:
-                    base_dir = path
-                    mount_info = info
-                    break
-            else:
-                known_roots = [i['path'] for i in self.mounts.values()]
-                raise Exception("Input path '%s' is not part of any known "
-                                "mount point: %s" %
-                                (src_dir_or_file, known_roots))
-
-            ctx = ProcessingContext(base_dir, mount_info, queue, record)
-            logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file)
-            if os.path.isdir(src_dir_or_file):
-                self.processDirectory(ctx, src_dir_or_file)
-            elif os.path.isfile(src_dir_or_file):
-                self.processFile(ctx, src_dir_or_file)
-
-        else:
-            # Process everything.
-            for name, info in self.mounts.items():
-                path = info['path']
-                ctx = ProcessingContext(path, info, queue, record)
-                logger.debug("Initiating processing pipeline on: %s" % path)
-                self.processDirectory(ctx, path)
-
-        # Wait on all workers.
-        record.current.success = True
-        for w in pool:
-            w.join()
-            record.current.success &= w.success
-        if abort.is_set():
-            raise Exception("Worker pool was aborted.")
-
-        # Handle deletions.
-        if delete:
-            for path, reason in record.getDeletions():
-                logger.debug("Removing '%s': %s" % (path, reason))
-                try:
-                    os.remove(path)
-                except FileNotFoundError:
-                    pass
-                logger.info('[delete] %s' % path)
-
-        # Invoke post-processors.
-        for proc in self.processors:
-            proc.onPipelineEnd(self)
-
-        # Finalize the process record.
-        record.current.process_time = time.time()
-        record.current.out_dir = self.out_dir
-        record.collapseRecords()
-
-        # Save the process record.
-        if save_record:
-            t = time.clock()
-            record.saveCurrent(record_cache.getCachePath(record_name))
-            logger.debug(format_timed(t, 'saved bake record', colored=False))
-
-        return record.detach()
-
-    def processDirectory(self, ctx, start_dir):
-        for dirpath, dirnames, filenames in os.walk(start_dir):
-            rel_dirpath = os.path.relpath(dirpath, start_dir)
-            dirnames[:] = [d for d in dirnames
-                           if not re_matchany(d, self.skip_patterns, rel_dirpath)]
-
-            for filename in filenames:
-                if re_matchany(filename, self.skip_patterns, rel_dirpath):
-                    continue
-                self.processFile(ctx, os.path.join(dirpath, filename))
-
-    def processFile(self, ctx, path):
-        logger.debug("Queuing: %s" % path)
-        job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path)
-        ctx.job_queue.put_nowait(job)
-
-
-class ProcessingWorkerContext(object):
-    def __init__(self, pipeline, record,
-                 work_queue, abort_event, pipeline_lock):
-        self.pipeline = pipeline
-        self.record = record
-        self.work_queue = work_queue
-        self.abort_event = abort_event
-        self.pipeline_lock = pipeline_lock
-
-
-class ProcessingWorkerJob(object):
-    def __init__(self, base_dir, mount_info, path):
-        self.base_dir = base_dir
-        self.mount_info = mount_info
-        self.path = path
-
-
-class ProcessingWorker(threading.Thread):
-    def __init__(self, wid, ctx):
-        super(ProcessingWorker, self).__init__()
-        self.wid = wid
-        self.ctx = ctx
-        self.success = True
-
-    def run(self):
-        while(not self.ctx.abort_event.is_set()):
-            try:
-                job = self.ctx.work_queue.get(True, 0.1)
-            except Empty:
-                logger.debug("[%d] No more work... shutting down." % self.wid)
-                break
-
-            try:
-                success = self._unsafeRun(job)
-                logger.debug("[%d] Done with file." % self.wid)
-                self.ctx.work_queue.task_done()
-                self.success &= success
-            except Exception as ex:
-                self.ctx.abort_event.set()
-                self.success = False
-                logger.error("[%d] Critical error, aborting." % self.wid)
-                logger.exception(ex)
-                break
-
-    def _unsafeRun(self, job):
-        start_time = time.clock()
-        pipeline = self.ctx.pipeline
-        record = self.ctx.record
-
-        rel_path = os.path.relpath(job.path, job.base_dir)
-        previous_entry = record.getPreviousEntry(rel_path)
-
-        record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
-        record.addEntry(record_entry)
-
-        # Figure out if a previously processed file is overriding this one.
-        # This can happen if a theme file (processed via a mount point)
-        # is overridden in the user's website.
-        if record.current.hasOverrideEntry(rel_path):
-            record_entry.flags |= FLAG_OVERRIDEN
-            logger.info(format_timed(start_time,
-                    '%s [not baked, overridden]' % rel_path))
-            return True
-
-        processors = pipeline.getFilteredProcessors(
-                job.mount_info['processors'])
-        try:
-            builder = ProcessingTreeBuilder(processors)
-            tree_root = builder.build(rel_path)
-            record_entry.flags |= FLAG_PREPARED
-        except ProcessingTreeError as ex:
-            msg = str(ex)
-            logger.error("Error preparing %s:\n%s" % (rel_path, msg))
-            while ex:
-                record_entry.errors.append(str(ex))
-                ex = ex.__cause__
-            return False
-
-        print_node(tree_root, recursive=True)
-        leaves = tree_root.getLeaves()
-        record_entry.rel_outputs = [l.path for l in leaves]
-        record_entry.proc_tree = get_node_name_tree(tree_root)
-        if tree_root.getProcessor().is_bypassing_structured_processing:
-            record_entry.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING
-
-        force = (pipeline.force or previous_entry is None or
-                 not previous_entry.was_processed_successfully)
-        if not force:
-            force = re_matchany(rel_path, pipeline.force_patterns)
-
-        if force:
-            tree_root.setState(STATE_DIRTY, True)
-
-        try:
-            runner = ProcessingTreeRunner(
-                    job.base_dir, pipeline.tmp_dir,
-                    pipeline.out_dir, self.ctx.pipeline_lock)
-            if runner.processSubTree(tree_root):
-                record_entry.flags |= FLAG_PROCESSED
-                logger.info(format_timed(
-                    start_time, "[%d] %s" % (self.wid, rel_path)))
-            return True
-        except ProcessingTreeError as ex:
-            msg = str(ex)
-            if isinstance(ex, ProcessorError):
-                msg = str(ex.__cause__)
-            logger.error("Error processing %s:\n%s" % (rel_path, msg))
-            while ex:
-                msg = re_ansicolors.sub('', str(ex))
-                record_entry.errors.append(msg)
-                ex = ex.__cause__
-            return False
-
-
-def make_mount_infos(mounts, root_dir):
-    if isinstance(mounts, list):
-        mounts = {m: {} for m in mounts}
-
-    for name, info in mounts.items():
-        if not isinstance(info, dict):
-            raise Exception("Asset directory info for '%s' is not a "
-                            "dictionary." % name)
-        info.setdefault('processors', 'all -uglifyjs -cleancss')
-        info['path'] = os.path.join(root_dir, name)
-
-    return mounts
-
-
-def make_re(patterns):
-    re_patterns = []
-    for pat in patterns:
-        if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2:
-            re_patterns.append(pat[1:-1])
-        else:
-            escaped_pat = (re.escape(pat)
-                           .replace(r'\*', r'[^/\\]*')
-                           .replace(r'\?', r'[^/\\]'))
-            re_patterns.append(escaped_pat)
-    return [re.compile(p) for p in re_patterns]
-
-
-def re_matchany(filename, patterns, dirname=None):
-    if dirname and dirname != '.':
-        filename = os.path.join(dirname, filename)
-
-    # skip patterns use a forward slash regardless of the platform.
-    filename = filename.replace('\\', '/')
-    for pattern in patterns:
-        if pattern.search(filename):
-            return True
-    return False
-