piecrust2: diff piecrust/processing/base.py @ 414:c4b3a7fd2f87
bake: Make pipeline processing multi-process.
Not many changes here, as it's pretty straightforward, but there is an API change for
processors so they know whether they're being initialized/disposed from the main
process or from one of the workers. This makes it possible to do global work that
has side-effects (e.g. creating a directory) versus doing purely in-memory work.
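For illustration only (not part of this changeset), here is a minimal sketch of how a processor might use the new hooks shown in the diff below: `onPipelineStart`/`onPipelineEnd` now receive a `PipelineContext` instead of the pipeline itself, so side-effecting setup can be restricted to one process. The `SassBuildProcessor` name and its cache directory are hypothetical, the actual processing methods (`matches`, etc.) are omitted, and it assumes the main pipeline process runs with a negative `worker_id`, which is what the `is_pipeline_process` property checks.

```python
import os

from piecrust.processing.base import Processor, PRIORITY_NORMAL


class SassBuildProcessor(Processor):
    """Hypothetical processor, shown only to illustrate the new hooks."""
    PROCESSOR_NAME = 'sassbuild'
    priority = PRIORITY_NORMAL

    def onPipelineStart(self, ctx):
        # `ctx` is now a PipelineContext, not the pipeline itself.
        self._cache_dir = os.path.join(ctx.tmp_dir, 'sassbuild')
        if ctx.is_pipeline_process:
            # Global, side-effecting setup: do it once, in the main
            # process (assumed here to run with a negative worker_id,
            # which is what `is_pipeline_process` checks).
            os.makedirs(self._cache_dir, exist_ok=True)

    def onPipelineEnd(self, ctx):
        # In-memory cleanup is safe to repeat in every process.
        self._cache_dir = None
```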
| author | Ludovic Chabant <ludovic@chabant.com> |
| --- | --- |
| date | Sat, 20 Jun 2015 19:20:30 -0700 |
| parents | c2ca72fb7f0b |
| children | 4850f8c21b6e |
--- a/piecrust/processing/base.py Sat Jun 20 19:16:38 2015 -0700
+++ b/piecrust/processing/base.py Sat Jun 20 19:20:30 2015 -0700
@@ -1,35 +1,36 @@
-import re
-import time
 import shutil
 import os.path
 import logging
-import hashlib
-import threading
-from queue import Queue, Empty
-from piecrust.chefutil import format_timed
-from piecrust.processing.records import (
-        ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
-        FLAG_PREPARED, FLAG_PROCESSED, FLAG_OVERRIDEN,
-        FLAG_BYPASSED_STRUCTURED_PROCESSING)
-from piecrust.processing.tree import (
-        ProcessingTreeBuilder, ProcessingTreeRunner,
-        ProcessingTreeError, ProcessorError,
-        STATE_DIRTY,
-        print_node, get_node_name_tree)
 
 
 logger = logging.getLogger(__name__)
 
 
-re_ansicolors = re.compile('\033\\[\d+m')
-
-
 PRIORITY_FIRST = -1
 PRIORITY_NORMAL = 0
 PRIORITY_LAST = 1
 
 
-split_processor_names_re = re.compile(r'[ ,]+')
+class PipelineContext(object):
+    def __init__(self, worker_id, app, out_dir, tmp_dir, force=None):
+        self.worker_id = worker_id
+        self.app = app
+        self.out_dir = out_dir
+        self.tmp_dir = tmp_dir
+        self.force = force
+        self.record = None
+        self._additional_ignore_patterns = []
+
+    @property
+    def is_first_worker(self):
+        return self.worker_id == 0
+
+    @property
+    def is_pipeline_process(self):
+        return self.worker_id < 0
+
+    def addIgnorePatterns(self, patterns):
+        self._additional_ignore_patterns += patterns
 
 
 class Processor(object):
@@ -43,10 +44,10 @@
     def initialize(self, app):
         self.app = app
 
-    def onPipelineStart(self, pipeline):
+    def onPipelineStart(self, ctx):
         pass
 
-    def onPipelineEnd(self, pipeline):
+    def onPipelineEnd(self, ctx):
        pass
 
     def matches(self, path):
@@ -117,341 +118,3 @@
         return self.stderr_data
 
 
-class ProcessingContext(object):
-    def __init__(self, base_dir, mount_info, job_queue, record=None):
-        self.base_dir = base_dir
-        self.mount_info = mount_info
-        self.job_queue = job_queue
-        self.record = record
-
-
-class ProcessorPipeline(object):
-    def __init__(self, app, out_dir, force=False):
-        assert app and out_dir
-        self.app = app
-        self.out_dir = out_dir
-        self.force = force
-
-        tmp_dir = app.sub_cache_dir
-        if not tmp_dir:
-            import tempfile
-            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
-        self.tmp_dir = os.path.join(tmp_dir, 'proc')
-
-        baker_params = app.config.get('baker') or {}
-
-        assets_dirs = baker_params.get('assets_dirs', app.assets_dirs)
-        self.mounts = make_mount_infos(assets_dirs, self.app.root_dir)
-
-        self.num_workers = baker_params.get('workers', 4)
-
-        ignores = baker_params.get('ignore', [])
-        ignores += [
-                '_cache', '_counter',
-                'theme_info.yml',
-                '.DS_Store', 'Thumbs.db',
-                '.git*', '.hg*', '.svn']
-        self.skip_patterns = make_re(ignores)
-        self.force_patterns = make_re(baker_params.get('force', []))
-
-        self.processors = app.plugin_loader.getProcessors()
-
-    def addSkipPatterns(self, patterns):
-        self.skip_patterns += make_re(patterns)
-
-    def filterProcessors(self, authorized_names):
-        self.processors = self.getFilteredProcessors(authorized_names)
-
-    def getFilteredProcessors(self, authorized_names):
-        if not authorized_names or authorized_names == 'all':
-            return self.processors
-
-        if isinstance(authorized_names, str):
-            authorized_names = split_processor_names_re.split(authorized_names)
-
-        procs = []
-        has_star = 'all' in authorized_names
-        for p in self.processors:
-            for name in authorized_names:
-                if name == p.PROCESSOR_NAME:
-                    procs.append(p)
-                    break
-                if name == ('-%s' % p.PROCESSOR_NAME):
-                    break
-            else:
-                if has_star:
-                    procs.append(p)
-        return procs
-
-    def run(self, src_dir_or_file=None, *,
-            delete=True, previous_record=None, save_record=True):
-        # Invoke pre-processors.
-        for proc in self.processors:
-            proc.onPipelineStart(self)
-
-        # Sort our processors again in case the pre-process step involved
-        # patching the processors with some new ones.
-        self.processors.sort(key=lambda p: p.priority)
-
-        # Create the pipeline record.
-        record = TransitionalProcessorPipelineRecord()
-        record_cache = self.app.cache.getCache('proc')
-        record_name = (
-                hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
-                '.record')
-        if previous_record:
-            record.setPrevious(previous_record)
-        elif not self.force and record_cache.has(record_name):
-            t = time.clock()
-            record.loadPrevious(record_cache.getCachePath(record_name))
-            logger.debug(format_timed(t, 'loaded previous bake record',
-                                      colored=False))
-        logger.debug("Got %d entries in process record." %
-                     len(record.previous.entries))
-
-        # Create the workers.
-        pool = []
-        queue = Queue()
-        abort = threading.Event()
-        pipeline_lock = threading.Lock()
-        for i in range(self.num_workers):
-            ctx = ProcessingWorkerContext(self, record,
-                                          queue, abort, pipeline_lock)
-            worker = ProcessingWorker(i, ctx)
-            worker.start()
-            pool.append(worker)
-
-        if src_dir_or_file is not None:
-            # Process only the given path.
-            # Find out what mount point this is in.
-            for name, info in self.mounts.items():
-                path = info['path']
-                if src_dir_or_file[:len(path)] == path:
-                    base_dir = path
-                    mount_info = info
-                    break
-            else:
-                known_roots = [i['path'] for i in self.mounts.values()]
-                raise Exception("Input path '%s' is not part of any known "
-                                "mount point: %s" %
-                                (src_dir_or_file, known_roots))
-
-            ctx = ProcessingContext(base_dir, mount_info, queue, record)
-            logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file)
-            if os.path.isdir(src_dir_or_file):
-                self.processDirectory(ctx, src_dir_or_file)
-            elif os.path.isfile(src_dir_or_file):
-                self.processFile(ctx, src_dir_or_file)
-
-        else:
-            # Process everything.
-            for name, info in self.mounts.items():
-                path = info['path']
-                ctx = ProcessingContext(path, info, queue, record)
-                logger.debug("Initiating processing pipeline on: %s" % path)
-                self.processDirectory(ctx, path)
-
-        # Wait on all workers.
-        record.current.success = True
-        for w in pool:
-            w.join()
-            record.current.success &= w.success
-        if abort.is_set():
-            raise Exception("Worker pool was aborted.")
-
-        # Handle deletions.
-        if delete:
-            for path, reason in record.getDeletions():
-                logger.debug("Removing '%s': %s" % (path, reason))
-                try:
-                    os.remove(path)
-                except FileNotFoundError:
-                    pass
-                logger.info('[delete] %s' % path)
-
-        # Invoke post-processors.
-        for proc in self.processors:
-            proc.onPipelineEnd(self)
-
-        # Finalize the process record.
-        record.current.process_time = time.time()
-        record.current.out_dir = self.out_dir
-        record.collapseRecords()
-
-        # Save the process record.
-        if save_record:
-            t = time.clock()
-            record.saveCurrent(record_cache.getCachePath(record_name))
-            logger.debug(format_timed(t, 'saved bake record', colored=False))
-
-        return record.detach()
-
-    def processDirectory(self, ctx, start_dir):
-        for dirpath, dirnames, filenames in os.walk(start_dir):
-            rel_dirpath = os.path.relpath(dirpath, start_dir)
-            dirnames[:] = [d for d in dirnames
-                           if not re_matchany(d, self.skip_patterns, rel_dirpath)]
-
-            for filename in filenames:
-                if re_matchany(filename, self.skip_patterns, rel_dirpath):
-                    continue
-                self.processFile(ctx, os.path.join(dirpath, filename))
-
-    def processFile(self, ctx, path):
-        logger.debug("Queuing: %s" % path)
-        job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path)
-        ctx.job_queue.put_nowait(job)
-
-
-class ProcessingWorkerContext(object):
-    def __init__(self, pipeline, record,
-                 work_queue, abort_event, pipeline_lock):
-        self.pipeline = pipeline
-        self.record = record
-        self.work_queue = work_queue
-        self.abort_event = abort_event
-        self.pipeline_lock = pipeline_lock
-
-
-class ProcessingWorkerJob(object):
-    def __init__(self, base_dir, mount_info, path):
-        self.base_dir = base_dir
-        self.mount_info = mount_info
-        self.path = path
-
-
-class ProcessingWorker(threading.Thread):
-    def __init__(self, wid, ctx):
-        super(ProcessingWorker, self).__init__()
-        self.wid = wid
-        self.ctx = ctx
-        self.success = True
-
-    def run(self):
-        while(not self.ctx.abort_event.is_set()):
-            try:
-                job = self.ctx.work_queue.get(True, 0.1)
-            except Empty:
-                logger.debug("[%d] No more work... shutting down." % self.wid)
-                break
-
-            try:
-                success = self._unsafeRun(job)
-                logger.debug("[%d] Done with file." % self.wid)
-                self.ctx.work_queue.task_done()
-                self.success &= success
-            except Exception as ex:
-                self.ctx.abort_event.set()
-                self.success = False
-                logger.error("[%d] Critical error, aborting." % self.wid)
-                logger.exception(ex)
-                break
-
-    def _unsafeRun(self, job):
-        start_time = time.clock()
-        pipeline = self.ctx.pipeline
-        record = self.ctx.record
-
-        rel_path = os.path.relpath(job.path, job.base_dir)
-        previous_entry = record.getPreviousEntry(rel_path)
-
-        record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
-        record.addEntry(record_entry)
-
-        # Figure out if a previously processed file is overriding this one.
-        # This can happen if a theme file (processed via a mount point)
-        # is overridden in the user's website.
-        if record.current.hasOverrideEntry(rel_path):
-            record_entry.flags |= FLAG_OVERRIDEN
-            logger.info(format_timed(start_time,
-                    '%s [not baked, overridden]' % rel_path))
-            return True
-
-        processors = pipeline.getFilteredProcessors(
-                job.mount_info['processors'])
-        try:
-            builder = ProcessingTreeBuilder(processors)
-            tree_root = builder.build(rel_path)
-            record_entry.flags |= FLAG_PREPARED
-        except ProcessingTreeError as ex:
-            msg = str(ex)
-            logger.error("Error preparing %s:\n%s" % (rel_path, msg))
-            while ex:
-                record_entry.errors.append(str(ex))
-                ex = ex.__cause__
-            return False
-
-        print_node(tree_root, recursive=True)
-        leaves = tree_root.getLeaves()
-        record_entry.rel_outputs = [l.path for l in leaves]
-        record_entry.proc_tree = get_node_name_tree(tree_root)
-        if tree_root.getProcessor().is_bypassing_structured_processing:
-            record_entry.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING
-
-        force = (pipeline.force or previous_entry is None or
-                 not previous_entry.was_processed_successfully)
-        if not force:
-            force = re_matchany(rel_path, pipeline.force_patterns)
-
-        if force:
-            tree_root.setState(STATE_DIRTY, True)
-
-        try:
-            runner = ProcessingTreeRunner(
-                    job.base_dir, pipeline.tmp_dir,
-                    pipeline.out_dir, self.ctx.pipeline_lock)
-            if runner.processSubTree(tree_root):
-                record_entry.flags |= FLAG_PROCESSED
-                logger.info(format_timed(
-                    start_time, "[%d] %s" % (self.wid, rel_path)))
-            return True
-        except ProcessingTreeError as ex:
-            msg = str(ex)
-            if isinstance(ex, ProcessorError):
-                msg = str(ex.__cause__)
-            logger.error("Error processing %s:\n%s" % (rel_path, msg))
-            while ex:
-                msg = re_ansicolors.sub('', str(ex))
-                record_entry.errors.append(msg)
-                ex = ex.__cause__
-            return False
-
-
-def make_mount_infos(mounts, root_dir):
-    if isinstance(mounts, list):
-        mounts = {m: {} for m in mounts}
-
-    for name, info in mounts.items():
-        if not isinstance(info, dict):
-            raise Exception("Asset directory info for '%s' is not a "
-                            "dictionary." % name)
-        info.setdefault('processors', 'all -uglifyjs -cleancss')
-        info['path'] = os.path.join(root_dir, name)
-
-    return mounts
-
-
-def make_re(patterns):
-    re_patterns = []
-    for pat in patterns:
-        if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2:
-            re_patterns.append(pat[1:-1])
-        else:
-            escaped_pat = (re.escape(pat)
-                           .replace(r'\*', r'[^/\\]*')
-                           .replace(r'\?', r'[^/\\]'))
-            re_patterns.append(escaped_pat)
-    return [re.compile(p) for p in re_patterns]
-
-
-def re_matchany(filename, patterns, dirname=None):
-    if dirname and dirname != '.':
-        filename = os.path.join(dirname, filename)
-
-    # skip patterns use a forward slash regardless of the platform.
-    filename = filename.replace('\\', '/')
-    for pattern in patterns:
-        if pattern.search(filename):
-            return True
-    return False
-