# HG changeset patch # User Ludovic Chabant # Date 1434853230 25200 # Node ID c4b3a7fd2f87ce530be4a256ab0f47d669cb4224 # Parent eacf0a3afd0c765a415e9c8bcfd2e117ff713269 bake: Make pipeline processing multi-process. Not many changes here, as it's pretty straightforward, but an API change for processors so they know if they're being initialized/disposed from the main process or from one of the workers. This makes it possible to do global stuff that has side-effects (e.g. create a directory) vs. doing in-memory stuff. diff -r eacf0a3afd0c -r c4b3a7fd2f87 piecrust/processing/base.py --- a/piecrust/processing/base.py Sat Jun 20 19:16:38 2015 -0700 +++ b/piecrust/processing/base.py Sat Jun 20 19:20:30 2015 -0700 @@ -1,35 +1,36 @@ -import re -import time import shutil import os.path import logging -import hashlib -import threading -from queue import Queue, Empty -from piecrust.chefutil import format_timed -from piecrust.processing.records import ( - ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord, - FLAG_PREPARED, FLAG_PROCESSED, FLAG_OVERRIDEN, - FLAG_BYPASSED_STRUCTURED_PROCESSING) -from piecrust.processing.tree import ( - ProcessingTreeBuilder, ProcessingTreeRunner, - ProcessingTreeError, ProcessorError, - STATE_DIRTY, - print_node, get_node_name_tree) logger = logging.getLogger(__name__) -re_ansicolors = re.compile('\033\\[\d+m') - - PRIORITY_FIRST = -1 PRIORITY_NORMAL = 0 PRIORITY_LAST = 1 -split_processor_names_re = re.compile(r'[ ,]+') +class PipelineContext(object): + def __init__(self, worker_id, app, out_dir, tmp_dir, force=None): + self.worker_id = worker_id + self.app = app + self.out_dir = out_dir + self.tmp_dir = tmp_dir + self.force = force + self.record = None + self._additional_ignore_patterns = [] + + @property + def is_first_worker(self): + return self.worker_id == 0 + + @property + def is_pipeline_process(self): + return self.worker_id < 0 + + def addIgnorePatterns(self, patterns): + self._additional_ignore_patterns += patterns class Processor(object): @@ -43,10 +44,10 @@ def initialize(self, app): self.app = app - def onPipelineStart(self, pipeline): + def onPipelineStart(self, ctx): pass - def onPipelineEnd(self, pipeline): + def onPipelineEnd(self, ctx): pass def matches(self, path): @@ -117,341 +118,3 @@ return self.stderr_data -class ProcessingContext(object): - def __init__(self, base_dir, mount_info, job_queue, record=None): - self.base_dir = base_dir - self.mount_info = mount_info - self.job_queue = job_queue - self.record = record - - -class ProcessorPipeline(object): - def __init__(self, app, out_dir, force=False): - assert app and out_dir - self.app = app - self.out_dir = out_dir - self.force = force - - tmp_dir = app.sub_cache_dir - if not tmp_dir: - import tempfile - tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust') - self.tmp_dir = os.path.join(tmp_dir, 'proc') - - baker_params = app.config.get('baker') or {} - - assets_dirs = baker_params.get('assets_dirs', app.assets_dirs) - self.mounts = make_mount_infos(assets_dirs, self.app.root_dir) - - self.num_workers = baker_params.get('workers', 4) - - ignores = baker_params.get('ignore', []) - ignores += [ - '_cache', '_counter', - 'theme_info.yml', - '.DS_Store', 'Thumbs.db', - '.git*', '.hg*', '.svn'] - self.skip_patterns = make_re(ignores) - self.force_patterns = make_re(baker_params.get('force', [])) - - self.processors = app.plugin_loader.getProcessors() - - def addSkipPatterns(self, patterns): - self.skip_patterns += make_re(patterns) - - def filterProcessors(self, authorized_names): - 
self.processors = self.getFilteredProcessors(authorized_names) - - def getFilteredProcessors(self, authorized_names): - if not authorized_names or authorized_names == 'all': - return self.processors - - if isinstance(authorized_names, str): - authorized_names = split_processor_names_re.split(authorized_names) - - procs = [] - has_star = 'all' in authorized_names - for p in self.processors: - for name in authorized_names: - if name == p.PROCESSOR_NAME: - procs.append(p) - break - if name == ('-%s' % p.PROCESSOR_NAME): - break - else: - if has_star: - procs.append(p) - return procs - - def run(self, src_dir_or_file=None, *, - delete=True, previous_record=None, save_record=True): - # Invoke pre-processors. - for proc in self.processors: - proc.onPipelineStart(self) - - # Sort our processors again in case the pre-process step involved - # patching the processors with some new ones. - self.processors.sort(key=lambda p: p.priority) - - # Create the pipeline record. - record = TransitionalProcessorPipelineRecord() - record_cache = self.app.cache.getCache('proc') - record_name = ( - hashlib.md5(self.out_dir.encode('utf8')).hexdigest() + - '.record') - if previous_record: - record.setPrevious(previous_record) - elif not self.force and record_cache.has(record_name): - t = time.clock() - record.loadPrevious(record_cache.getCachePath(record_name)) - logger.debug(format_timed(t, 'loaded previous bake record', - colored=False)) - logger.debug("Got %d entries in process record." % - len(record.previous.entries)) - - # Create the workers. - pool = [] - queue = Queue() - abort = threading.Event() - pipeline_lock = threading.Lock() - for i in range(self.num_workers): - ctx = ProcessingWorkerContext(self, record, - queue, abort, pipeline_lock) - worker = ProcessingWorker(i, ctx) - worker.start() - pool.append(worker) - - if src_dir_or_file is not None: - # Process only the given path. - # Find out what mount point this is in. - for name, info in self.mounts.items(): - path = info['path'] - if src_dir_or_file[:len(path)] == path: - base_dir = path - mount_info = info - break - else: - known_roots = [i['path'] for i in self.mounts.values()] - raise Exception("Input path '%s' is not part of any known " - "mount point: %s" % - (src_dir_or_file, known_roots)) - - ctx = ProcessingContext(base_dir, mount_info, queue, record) - logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file) - if os.path.isdir(src_dir_or_file): - self.processDirectory(ctx, src_dir_or_file) - elif os.path.isfile(src_dir_or_file): - self.processFile(ctx, src_dir_or_file) - - else: - # Process everything. - for name, info in self.mounts.items(): - path = info['path'] - ctx = ProcessingContext(path, info, queue, record) - logger.debug("Initiating processing pipeline on: %s" % path) - self.processDirectory(ctx, path) - - # Wait on all workers. - record.current.success = True - for w in pool: - w.join() - record.current.success &= w.success - if abort.is_set(): - raise Exception("Worker pool was aborted.") - - # Handle deletions. - if delete: - for path, reason in record.getDeletions(): - logger.debug("Removing '%s': %s" % (path, reason)) - try: - os.remove(path) - except FileNotFoundError: - pass - logger.info('[delete] %s' % path) - - # Invoke post-processors. - for proc in self.processors: - proc.onPipelineEnd(self) - - # Finalize the process record. - record.current.process_time = time.time() - record.current.out_dir = self.out_dir - record.collapseRecords() - - # Save the process record. 
- if save_record: - t = time.clock() - record.saveCurrent(record_cache.getCachePath(record_name)) - logger.debug(format_timed(t, 'saved bake record', colored=False)) - - return record.detach() - - def processDirectory(self, ctx, start_dir): - for dirpath, dirnames, filenames in os.walk(start_dir): - rel_dirpath = os.path.relpath(dirpath, start_dir) - dirnames[:] = [d for d in dirnames - if not re_matchany(d, self.skip_patterns, rel_dirpath)] - - for filename in filenames: - if re_matchany(filename, self.skip_patterns, rel_dirpath): - continue - self.processFile(ctx, os.path.join(dirpath, filename)) - - def processFile(self, ctx, path): - logger.debug("Queuing: %s" % path) - job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path) - ctx.job_queue.put_nowait(job) - - -class ProcessingWorkerContext(object): - def __init__(self, pipeline, record, - work_queue, abort_event, pipeline_lock): - self.pipeline = pipeline - self.record = record - self.work_queue = work_queue - self.abort_event = abort_event - self.pipeline_lock = pipeline_lock - - -class ProcessingWorkerJob(object): - def __init__(self, base_dir, mount_info, path): - self.base_dir = base_dir - self.mount_info = mount_info - self.path = path - - -class ProcessingWorker(threading.Thread): - def __init__(self, wid, ctx): - super(ProcessingWorker, self).__init__() - self.wid = wid - self.ctx = ctx - self.success = True - - def run(self): - while(not self.ctx.abort_event.is_set()): - try: - job = self.ctx.work_queue.get(True, 0.1) - except Empty: - logger.debug("[%d] No more work... shutting down." % self.wid) - break - - try: - success = self._unsafeRun(job) - logger.debug("[%d] Done with file." % self.wid) - self.ctx.work_queue.task_done() - self.success &= success - except Exception as ex: - self.ctx.abort_event.set() - self.success = False - logger.error("[%d] Critical error, aborting." % self.wid) - logger.exception(ex) - break - - def _unsafeRun(self, job): - start_time = time.clock() - pipeline = self.ctx.pipeline - record = self.ctx.record - - rel_path = os.path.relpath(job.path, job.base_dir) - previous_entry = record.getPreviousEntry(rel_path) - - record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path) - record.addEntry(record_entry) - - # Figure out if a previously processed file is overriding this one. - # This can happen if a theme file (processed via a mount point) - # is overridden in the user's website. 
- if record.current.hasOverrideEntry(rel_path): - record_entry.flags |= FLAG_OVERRIDEN - logger.info(format_timed(start_time, - '%s [not baked, overridden]' % rel_path)) - return True - - processors = pipeline.getFilteredProcessors( - job.mount_info['processors']) - try: - builder = ProcessingTreeBuilder(processors) - tree_root = builder.build(rel_path) - record_entry.flags |= FLAG_PREPARED - except ProcessingTreeError as ex: - msg = str(ex) - logger.error("Error preparing %s:\n%s" % (rel_path, msg)) - while ex: - record_entry.errors.append(str(ex)) - ex = ex.__cause__ - return False - - print_node(tree_root, recursive=True) - leaves = tree_root.getLeaves() - record_entry.rel_outputs = [l.path for l in leaves] - record_entry.proc_tree = get_node_name_tree(tree_root) - if tree_root.getProcessor().is_bypassing_structured_processing: - record_entry.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING - - force = (pipeline.force or previous_entry is None or - not previous_entry.was_processed_successfully) - if not force: - force = re_matchany(rel_path, pipeline.force_patterns) - - if force: - tree_root.setState(STATE_DIRTY, True) - - try: - runner = ProcessingTreeRunner( - job.base_dir, pipeline.tmp_dir, - pipeline.out_dir, self.ctx.pipeline_lock) - if runner.processSubTree(tree_root): - record_entry.flags |= FLAG_PROCESSED - logger.info(format_timed( - start_time, "[%d] %s" % (self.wid, rel_path))) - return True - except ProcessingTreeError as ex: - msg = str(ex) - if isinstance(ex, ProcessorError): - msg = str(ex.__cause__) - logger.error("Error processing %s:\n%s" % (rel_path, msg)) - while ex: - msg = re_ansicolors.sub('', str(ex)) - record_entry.errors.append(msg) - ex = ex.__cause__ - return False - - -def make_mount_infos(mounts, root_dir): - if isinstance(mounts, list): - mounts = {m: {} for m in mounts} - - for name, info in mounts.items(): - if not isinstance(info, dict): - raise Exception("Asset directory info for '%s' is not a " - "dictionary." % name) - info.setdefault('processors', 'all -uglifyjs -cleancss') - info['path'] = os.path.join(root_dir, name) - - return mounts - - -def make_re(patterns): - re_patterns = [] - for pat in patterns: - if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2: - re_patterns.append(pat[1:-1]) - else: - escaped_pat = (re.escape(pat) - .replace(r'\*', r'[^/\\]*') - .replace(r'\?', r'[^/\\]')) - re_patterns.append(escaped_pat) - return [re.compile(p) for p in re_patterns] - - -def re_matchany(filename, patterns, dirname=None): - if dirname and dirname != '.': - filename = os.path.join(dirname, filename) - - # skip patterns use a forward slash regardless of the platform. 
- filename = filename.replace('\\', '/') - for pattern in patterns: - if pattern.search(filename): - return True - return False - diff -r eacf0a3afd0c -r c4b3a7fd2f87 piecrust/processing/compass.py --- a/piecrust/processing/compass.py Sat Jun 20 19:16:38 2015 -0700 +++ b/piecrust/processing/compass.py Sat Jun 20 19:20:30 2015 -0700 @@ -1,7 +1,6 @@ import os import os.path import logging -import platform import subprocess from piecrust.processing.base import Processor, PRIORITY_FIRST from piecrust.uriutil import multi_replace diff -r eacf0a3afd0c -r c4b3a7fd2f87 piecrust/processing/less.py --- a/piecrust/processing/less.py Sat Jun 20 19:16:38 2015 -0700 +++ b/piecrust/processing/less.py Sat Jun 20 19:20:30 2015 -0700 @@ -24,7 +24,8 @@ def onPipelineStart(self, pipeline): self._map_dir = os.path.join(pipeline.tmp_dir, 'less') - if not os.path.isdir(self._map_dir): + if (pipeline.is_first_worker and + not os.path.isdir(self._map_dir)): os.makedirs(self._map_dir) def getDependencies(self, path): @@ -42,6 +43,7 @@ # Get the sources, but make all paths absolute. sources = dep_map.get('sources') path_dir = os.path.dirname(path) + def _makeAbs(p): return os.path.join(path_dir, p) deps = list(map(_makeAbs, sources)) diff -r eacf0a3afd0c -r c4b3a7fd2f87 piecrust/processing/pipeline.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/piecrust/processing/pipeline.py Sat Jun 20 19:20:30 2015 -0700 @@ -0,0 +1,351 @@ +import os +import os.path +import re +import time +import queue +import hashlib +import logging +import multiprocessing +from piecrust.chefutil import format_timed, format_timed_scope +from piecrust.processing.base import PipelineContext +from piecrust.processing.records import ( + ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord, + FLAG_PROCESSED) +from piecrust.processing.worker import ( + ProcessingWorkerContext, ProcessingWorkerJob, + worker_func, get_filtered_processors) + + +logger = logging.getLogger(__name__) + + +class _ProcessingContext(object): + def __init__(self, pool, record, base_dir, mount_info): + self.pool = pool + self.record = record + self.base_dir = base_dir + self.mount_info = mount_info + + +class ProcessorPipeline(object): + def __init__(self, app, out_dir, force=False): + assert app and out_dir + self.app = app + self.out_dir = out_dir + self.force = force + + tmp_dir = app.sub_cache_dir + if not tmp_dir: + import tempfile + tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust') + self.tmp_dir = os.path.join(tmp_dir, 'proc') + + baker_params = app.config.get('baker') or {} + + assets_dirs = baker_params.get('assets_dirs', app.assets_dirs) + self.mounts = make_mount_infos(assets_dirs, self.app.root_dir) + + self.num_workers = baker_params.get( + 'workers', multiprocessing.cpu_count()) + + ignores = baker_params.get('ignore', []) + ignores += [ + '_cache', '_counter', + 'theme_info.yml', + '.DS_Store', 'Thumbs.db', + '.git*', '.hg*', '.svn'] + self.ignore_patterns = make_re(ignores) + self.force_patterns = make_re(baker_params.get('force', [])) + + # Those things are mostly for unit-testing. + self.enabled_processors = None + self.additional_processors = None + + def addIgnorePatterns(self, patterns): + self.ignore_patterns += make_re(patterns) + + def run(self, src_dir_or_file=None, *, + delete=True, previous_record=None, save_record=True): + start_time = time.perf_counter() + + # Get the list of processors for this run. 
+ processors = self.app.plugin_loader.getProcessors() + if self.enabled_processors is not None: + logger.debug("Filtering processors to: %s" % + self.enabled_processors) + processors = get_filtered_processors(processors, + self.enabled_processors) + if self.additional_processors is not None: + logger.debug("Adding %s additional processors." % + len(self.additional_processors)) + for proc in self.additional_processors: + self.app.env.registerTimer(proc.__class__.__name__, + raise_if_registered=False) + proc.initialize(self.app) + processors.append(proc) + + # Invoke pre-processors. + pipeline_ctx = PipelineContext(-1, self.app, self.out_dir, + self.tmp_dir, self.force) + for proc in processors: + proc.onPipelineStart(pipeline_ctx) + + # Pre-processors can define additional ignore patterns. + self.ignore_patterns += make_re( + pipeline_ctx._additional_ignore_patterns) + + # Create the worker pool. + pool = _WorkerPool() + + # Create the pipeline record. + record = TransitionalProcessorPipelineRecord() + record_cache = self.app.cache.getCache('proc') + record_name = ( + hashlib.md5(self.out_dir.encode('utf8')).hexdigest() + + '.record') + if previous_record: + record.setPrevious(previous_record) + elif not self.force and record_cache.has(record_name): + with format_timed_scope(logger, 'loaded previous bake record', + level=logging.DEBUG, colored=False): + record.loadPrevious(record_cache.getCachePath(record_name)) + logger.debug("Got %d entries in process record." % + len(record.previous.entries)) + record.current.success = True + record.current.processed_count = 0 + + # Work! + def _handler(res): + entry = record.getCurrentEntry(res.path) + assert entry is not None + entry.flags |= res.flags + entry.proc_tree = res.proc_tree + entry.rel_outputs = res.rel_outputs + if res.errors: + entry.errors += res.errors + record.current.success = False + if entry.flags & FLAG_PROCESSED: + record.current.processed_count += 1 + + pool = self._createWorkerPool() + expected_result_count = self._process(src_dir_or_file, pool, record) + self._waitOnWorkerPool(pool, expected_result_count, _handler) + self._terminateWorkerPool(pool) + + # Get timing information from the workers. + record.current.timers = {} + for _ in range(len(pool.workers)): + try: + timers = pool.results.get(True, 0.1) + except queue.Empty: + logger.error("Didn't get timing information from all workers.") + break + + for name, val in timers['data'].items(): + main_val = record.current.timers.setdefault(name, 0) + record.current.timers[name] = main_val + val + + # Invoke post-processors. + pipeline_ctx.record = record.current + for proc in processors: + proc.onPipelineEnd(pipeline_ctx) + + # Handle deletions. + if delete: + for path, reason in record.getDeletions(): + logger.debug("Removing '%s': %s" % (path, reason)) + try: + os.remove(path) + except FileNotFoundError: + pass + logger.info('[delete] %s' % path) + + # Finalize the process record. + record.current.process_time = time.time() + record.current.out_dir = self.out_dir + record.collapseRecords() + + # Save the process record. + if save_record: + with format_timed_scope(logger, 'saved bake record', + level=logging.DEBUG, colored=False): + record.saveCurrent(record_cache.getCachePath(record_name)) + + logger.info(format_timed( + start_time, + "processed %d assets." % record.current.processed_count)) + + return record.detach() + + def _process(self, src_dir_or_file, pool, record): + expected_result_count = 0 + + if src_dir_or_file is not None: + # Process only the given path. 
+ # Find out what mount point this is in. + for name, info in self.mounts.items(): + path = info['path'] + if src_dir_or_file[:len(path)] == path: + base_dir = path + mount_info = info + break + else: + known_roots = [i['path'] for i in self.mounts.values()] + raise Exception("Input path '%s' is not part of any known " + "mount point: %s" % + (src_dir_or_file, known_roots)) + + ctx = _ProcessingContext(pool, record, base_dir, mount_info) + logger.debug("Initiating processing pipeline on: %s" % + src_dir_or_file) + if os.path.isdir(src_dir_or_file): + expected_result_count = self._processDirectory( + ctx, src_dir_or_file) + elif os.path.isfile(src_dir_or_file): + self._processFile(ctx, src_dir_or_file) + expected_result_count = 1 + + else: + # Process everything. + for name, info in self.mounts.items(): + path = info['path'] + ctx = _ProcessingContext(pool, record, path, info) + logger.debug("Initiating processing pipeline on: %s" % path) + expected_result_count = self._processDirectory(ctx, path) + + return expected_result_count + + def _processDirectory(self, ctx, start_dir): + queued_count = 0 + for dirpath, dirnames, filenames in os.walk(start_dir): + rel_dirpath = os.path.relpath(dirpath, start_dir) + dirnames[:] = [d for d in dirnames + if not re_matchany( + d, self.ignore_patterns, rel_dirpath)] + + for filename in filenames: + if re_matchany(filename, self.ignore_patterns, rel_dirpath): + continue + self._processFile(ctx, os.path.join(dirpath, filename)) + queued_count += 1 + return queued_count + + def _processFile(self, ctx, path): + # TODO: handle overrides between mount-points. + + entry = ProcessorPipelineRecordEntry(path) + ctx.record.addEntry(entry) + + previous_entry = ctx.record.getPreviousEntry(path) + force_this = (self.force or previous_entry is None or + not previous_entry.was_processed_successfully) + + job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path, + force=force_this) + + logger.debug("Queuing: %s" % path) + ctx.pool.queue.put_nowait(job) + + def _createWorkerPool(self): + pool = _WorkerPool() + for i in range(self.num_workers): + ctx = ProcessingWorkerContext( + self.app.root_dir, self.out_dir, self.tmp_dir, + pool.queue, pool.results, pool.abort_event, + self.force, self.app.debug) + ctx.enabled_processors = self.enabled_processors + ctx.additional_processors = self.additional_processors + w = multiprocessing.Process( + name='Worker_%d' % i, + target=worker_func, args=(i, ctx)) + w.start() + pool.workers.append(w) + return pool + + def _waitOnWorkerPool(self, pool, expected_result_count, result_handler): + abort_with_exception = None + try: + got_count = 0 + while got_count < expected_result_count: + try: + res = pool.results.get(True, 10) + except queue.Empty: + logger.error( + "Got %d results, expected %d, and timed-out " + "for 10 seconds. A worker might be stuck?" % + (got_count, expected_result_count)) + abort_with_exception = Exception("Worker time-out.") + break + + if isinstance(res, dict) and res.get('type') == 'error': + abort_with_exception = Exception( + 'Worker critical error:\n' + + '\n'.join(res['messages'])) + break + + got_count += 1 + result_handler(res) + except KeyboardInterrupt as kiex: + logger.warning("Bake aborted by user... 
" + "waiting for workers to stop.") + abort_with_exception = kiex + + if abort_with_exception: + pool.abort_event.set() + for w in pool.workers: + w.join(2) + raise abort_with_exception + + def _terminateWorkerPool(self, pool): + pool.abort_event.set() + for w in pool.workers: + w.join() + + +class _WorkerPool(object): + def __init__(self): + self.queue = multiprocessing.JoinableQueue() + self.results = multiprocessing.Queue() + self.abort_event = multiprocessing.Event() + self.workers = [] + + +def make_mount_infos(mounts, root_dir): + if isinstance(mounts, list): + mounts = {m: {} for m in mounts} + + for name, info in mounts.items(): + if not isinstance(info, dict): + raise Exception("Asset directory info for '%s' is not a " + "dictionary." % name) + info.setdefault('processors', 'all -uglifyjs -cleancss') + info['path'] = os.path.join(root_dir, name) + + return mounts + + +def make_re(patterns): + re_patterns = [] + for pat in patterns: + if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2: + re_patterns.append(pat[1:-1]) + else: + escaped_pat = ( + re.escape(pat) + .replace(r'\*', r'[^/\\]*') + .replace(r'\?', r'[^/\\]')) + re_patterns.append(escaped_pat) + return [re.compile(p) for p in re_patterns] + + +def re_matchany(filename, patterns, dirname=None): + if dirname and dirname != '.': + filename = os.path.join(dirname, filename) + + # skip patterns use a forward slash regardless of the platform. + filename = filename.replace('\\', '/') + for pattern in patterns: + if pattern.search(filename): + return True + return False + diff -r eacf0a3afd0c -r c4b3a7fd2f87 piecrust/processing/records.py --- a/piecrust/processing/records.py Sat Jun 20 19:16:38 2015 -0700 +++ b/piecrust/processing/records.py Sat Jun 20 19:20:30 2015 -0700 @@ -1,48 +1,33 @@ import os.path +import hashlib from piecrust.records import Record, TransitionalRecord class ProcessorPipelineRecord(Record): - RECORD_VERSION = 4 + RECORD_VERSION = 5 def __init__(self): super(ProcessorPipelineRecord, self).__init__() self.out_dir = None self.process_time = None + self.processed_count = 0 self.success = False - - def hasOverrideEntry(self, rel_path): - return self.findEntry(rel_path) is not None - - def findEntry(self, rel_path): - rel_path = rel_path.lower() - for entry in self.entries: - for out_path in entry.rel_outputs: - if out_path.lower() == rel_path: - return entry - return None - - def replaceEntry(self, new_entry): - for e in self.entries: - if (e.base_dir == new_entry.base_dir and - e.rel_input == new_entry.rel_input): - e.flags = new_entry.flags - e.rel_outputs = list(new_entry.rel_outputs) - e.errors = list(new_entry.errors) - break + self.timers = None FLAG_NONE = 0 FLAG_PREPARED = 2**0 FLAG_PROCESSED = 2**1 -FLAG_OVERRIDEN = 2**2 FLAG_BYPASSED_STRUCTURED_PROCESSING = 2**3 +def _get_transition_key(path): + return hashlib.md5(path.encode('utf8')).hexdigest() + + class ProcessorPipelineRecordEntry(object): - def __init__(self, base_dir, rel_input): - self.base_dir = base_dir - self.rel_input = rel_input + def __init__(self, path): + self.path = path self.flags = FLAG_NONE self.rel_outputs = [] @@ -50,10 +35,6 @@ self.errors = [] @property - def path(self): - return os.path.join(self.base_dir, self.rel_input) - - @property def was_prepared(self): return bool(self.flags & FLAG_PREPARED) @@ -73,10 +54,18 @@ ProcessorPipelineRecord, previous_path) def getTransitionKey(self, entry): - return entry.rel_input + return _get_transition_key(entry.path) - def getPreviousEntry(self, rel_path): - pair = 
self.transitions.get(rel_path) + def getCurrentEntry(self, path): + key = _get_transition_key(path) + pair = self.transitions.get(key) + if pair is not None: + return pair[1] + return None + + def getPreviousEntry(self, path): + key = _get_transition_key(path) + pair = self.transitions.get(key) if pair is not None: return pair[0] return None diff -r eacf0a3afd0c -r c4b3a7fd2f87 piecrust/processing/sass.py --- a/piecrust/processing/sass.py Sat Jun 20 19:16:38 2015 -0700 +++ b/piecrust/processing/sass.py Sat Jun 20 19:20:30 2015 -0700 @@ -26,12 +26,14 @@ def onPipelineStart(self, pipeline): super(SassProcessor, self).onPipelineStart(pipeline) - self._map_dir = os.path.join(pipeline.tmp_dir, 'sass') - if not os.path.isdir(self._map_dir): - os.makedirs(self._map_dir) + + if pipeline.is_first_worker: + self._map_dir = os.path.join(pipeline.tmp_dir, 'sass') + if not os.path.isdir(self._map_dir): + os.makedirs(self._map_dir) # Ignore include-only Sass files. - pipeline.addSkipPatterns(['_*.scss', '_*.sass']) + pipeline.addIgnorePatterns(['_*.scss', '_*.sass']) def getDependencies(self, path): if _is_include_only(path): diff -r eacf0a3afd0c -r c4b3a7fd2f87 piecrust/processing/tree.py --- a/piecrust/processing/tree.py Sat Jun 20 19:16:38 2015 -0700 +++ b/piecrust/processing/tree.py Sat Jun 20 19:20:30 2015 -0700 @@ -79,7 +79,6 @@ self.processors = processors def build(self, path): - start_time = time.clock() tree_root = ProcessingTreeNode(path, list(self.processors)) loop_guard = 100 @@ -97,7 +96,7 @@ if proc.is_bypassing_structured_processing: if cur_node != tree_root: raise ProcessingTreeError("Only root processors can " - "bypass structured processing.") + "bypass structured processing.") break # Get the destination directory and output files. @@ -116,18 +115,14 @@ if proc.PROCESSOR_NAME != 'copy': walk_stack.append(out_node) - logger.debug(format_timed( - start_time, "Built processing tree for: %s" % path, - colored=False)) return tree_root class ProcessingTreeRunner(object): - def __init__(self, base_dir, tmp_dir, out_dir, lock=None): + def __init__(self, base_dir, tmp_dir, out_dir): self.base_dir = base_dir self.tmp_dir = tmp_dir self.out_dir = out_dir - self.lock = lock def processSubTree(self, tree_root): did_process = False @@ -155,8 +150,9 @@ proc = node.getProcessor() if proc.is_bypassing_structured_processing: try: - start_time = time.clock() - proc.process(full_path, self.out_dir) + start_time = time.perf_counter() + with proc.app.env.timerScope(proc.__class__.__name__): + proc.process(full_path, self.out_dir) print_node( node, format_timed( @@ -172,16 +168,15 @@ rel_out_dir = os.path.dirname(node.path) out_dir = os.path.join(base_out_dir, rel_out_dir) if not os.path.isdir(out_dir): - if self.lock: - with self.lock: - if not os.path.isdir(out_dir): - os.makedirs(out_dir, 0o755) - else: - os.makedirs(out_dir, 0o755) + try: + os.makedirs(out_dir, 0o755, exist_ok=True) + except OSError: + pass try: - start_time = time.clock() - proc_res = proc.process(full_path, out_dir) + start_time = time.perf_counter() + with proc.app.env.timerScope(proc.__class__.__name__): + proc_res = proc.process(full_path, out_dir) if proc_res is None: raise Exception("Processor '%s' didn't return a boolean " "result value." % proc) @@ -200,12 +195,12 @@ proc = node.getProcessor() if (proc.is_bypassing_structured_processing or - not proc.is_delegating_dependency_check): + not proc.is_delegating_dependency_check): # This processor wants to handle things on its own... 
node.setState(STATE_DIRTY, False) return - start_time = time.clock() + start_time = time.perf_counter() # Get paths and modification times for the input path and # all dependencies (if any). diff -r eacf0a3afd0c -r c4b3a7fd2f87 piecrust/processing/worker.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/piecrust/processing/worker.py Sat Jun 20 19:20:30 2015 -0700 @@ -0,0 +1,213 @@ +import os.path +import re +import time +import queue +import logging +from piecrust.app import PieCrust +from piecrust.processing.base import PipelineContext +from piecrust.processing.records import ( + FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED, + FLAG_BYPASSED_STRUCTURED_PROCESSING) +from piecrust.processing.tree import ( + ProcessingTreeBuilder, ProcessingTreeRunner, + ProcessingTreeError, ProcessorError, + get_node_name_tree, print_node, + STATE_DIRTY) + + +logger = logging.getLogger(__name__) + + +split_processor_names_re = re.compile(r'[ ,]+') +re_ansicolors = re.compile('\033\\[\d+m') + + +def worker_func(wid, ctx): + logger.debug("Worker %d booting up..." % wid) + w = ProcessingWorker(wid, ctx) + w.run() + + +class ProcessingWorkerContext(object): + def __init__(self, root_dir, out_dir, tmp_dir, + work_queue, results, abort_event, + force=False, debug=False): + self.root_dir = root_dir + self.out_dir = out_dir + self.tmp_dir = tmp_dir + self.work_queue = work_queue + self.results = results + self.abort_event = abort_event + self.force = force + self.debug = debug + self.enabled_processors = None + self.additional_processors = None + + +class ProcessingWorkerJob(object): + def __init__(self, base_dir, mount_info, path, *, force=False): + self.base_dir = base_dir + self.mount_info = mount_info + self.path = path + self.force = force + + +class ProcessingWorkerResult(object): + def __init__(self, path): + self.path = path + self.flags = FLAG_NONE + self.proc_tree = None + self.rel_outputs = None + self.errors = None + + +class ProcessingWorker(object): + def __init__(self, wid, ctx): + self.wid = wid + self.ctx = ctx + + def run(self): + logger.debug("Worker %d initializing..." % self.wid) + work_start_time = time.perf_counter() + + # Create the app local to this worker. + app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) + app.env.fs_cache_only_for_main_page = True + app.env.registerTimer("Worker_%d" % self.wid) + app.env.registerTimer("JobReceive") + app.env.registerTimer('BuildProcessingTree') + app.env.registerTimer('RunProcessingTree') + + processors = app.plugin_loader.getProcessors() + if self.ctx.enabled_processors: + logger.debug("Filtering processors to: %s" % + self.ctx.enabled_processors) + processors = get_filtered_processors(processors, + self.ctx.enabled_processors) + if self.ctx.additional_processors: + logger.debug("Adding %s additional processors." % + len(self.ctx.additional_processors)) + for proc in self.ctx.additional_processors: + app.env.registerTimer(proc.__class__.__name__) + proc.initialize(app) + processors.append(proc) + + # Invoke pre-processors. + pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir, + self.ctx.tmp_dir, self.ctx.force) + for proc in processors: + proc.onPipelineStart(pipeline_ctx) + + # Sort our processors again in case the pre-process step involved + # patching the processors with some new ones. 
+ processors.sort(key=lambda p: p.priority) + + aborted_with_exception = None + while not self.ctx.abort_event.is_set(): + try: + with app.env.timerScope('JobReceive'): + job = self.ctx.work_queue.get(True, 0.01) + except queue.Empty: + continue + + try: + result = self._unsafeRun(app, processors, job) + self.ctx.results.put_nowait(result) + except Exception as ex: + self.ctx.abort_event.set() + aborted_with_exception = ex + logger.error("[%d] Critical error, aborting." % self.wid) + if self.ctx.debug: + logger.exception(ex) + break + finally: + self.ctx.work_queue.task_done() + + if aborted_with_exception is not None: + msgs = _get_errors(aborted_with_exception) + self.ctx.results.put_nowait({'type': 'error', 'messages': msgs}) + + # Invoke post-processors. + for proc in processors: + proc.onPipelineEnd(pipeline_ctx) + + app.env.stepTimer("Worker_%d" % self.wid, + time.perf_counter() - work_start_time) + self.ctx.results.put_nowait({ + 'type': 'timers', 'data': app.env._timers}) + + def _unsafeRun(self, app, processors, job): + result = ProcessingWorkerResult(job.path) + + processors = get_filtered_processors( + processors, job.mount_info['processors']) + + # Build the processing tree for this job. + rel_path = os.path.relpath(job.path, job.base_dir) + try: + with app.env.timerScope('BuildProcessingTree'): + builder = ProcessingTreeBuilder(processors) + tree_root = builder.build(rel_path) + result.flags |= FLAG_PREPARED + except ProcessingTreeError as ex: + result.errors += _get_errors(ex) + return result + + # Prepare and run the tree. + print_node(tree_root, recursive=True) + leaves = tree_root.getLeaves() + result.rel_outputs = [l.path for l in leaves] + result.proc_tree = get_node_name_tree(tree_root) + if tree_root.getProcessor().is_bypassing_structured_processing: + result.flags |= FLAG_BYPASSED_STRUCTURED_PROCESSING + + if job.force: + tree_root.setState(STATE_DIRTY, True) + + try: + with app.env.timerScope('RunProcessingTree'): + runner = ProcessingTreeRunner( + job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir) + if runner.processSubTree(tree_root): + result.flags |= FLAG_PROCESSED + except ProcessingTreeError as ex: + if isinstance(ex, ProcessorError): + ex = ex.__cause__ + # Need to strip out colored errors from external processes. 
+ result.errors += _get_errors(ex, strip_colors=True) + + return result + + +def get_filtered_processors(processors, authorized_names): + if not authorized_names or authorized_names == 'all': + return processors + + if isinstance(authorized_names, str): + authorized_names = split_processor_names_re.split(authorized_names) + + procs = [] + has_star = 'all' in authorized_names + for p in processors: + for name in authorized_names: + if name == p.PROCESSOR_NAME: + procs.append(p) + break + if name == ('-%s' % p.PROCESSOR_NAME): + break + else: + if has_star: + procs.append(p) + return procs + + +def _get_errors(ex, strip_colors=False): + errors = [] + while ex is not None: + msg = str(ex) + if strip_colors: + msg = re_ansicolors.sub('', msg) + errors.append(msg) + ex = ex.__cause__ + return errors + diff -r eacf0a3afd0c -r c4b3a7fd2f87 tests/test_processing_base.py --- a/tests/test_processing_base.py Sat Jun 20 19:16:38 2015 -0700 +++ b/tests/test_processing_base.py Sat Jun 20 19:20:30 2015 -0700 @@ -2,8 +2,10 @@ import os.path import shutil import pytest -from piecrust.processing.base import (ProcessorPipeline, SimpleFileProcessor) +from piecrust.processing.base import SimpleFileProcessor +from piecrust.processing.pipeline import ProcessorPipeline from piecrust.processing.records import ProcessorPipelineRecord +from piecrust.processing.worker import get_filtered_processors from .mockutil import mock_fs, mock_fs_scope @@ -36,7 +38,6 @@ def _get_pipeline(fs, app=None): app = app or fs.getApp() - app.config.set('baker/num_workers', 1) return ProcessorPipeline(app, fs.path('counter')) @@ -44,7 +45,7 @@ fs = mock_fs() with mock_fs_scope(fs): pp = _get_pipeline(fs) - pp.filterProcessors(['copy']) + pp.enabled_processors = ['copy'] expected = {} assert expected == fs.getStructure('counter') pp.run() @@ -57,7 +58,7 @@ .withFile('kitchen/assets/something.html', 'A test file.')) with mock_fs_scope(fs): pp = _get_pipeline(fs) - pp.filterProcessors(['copy']) + pp.enabled_processors = ['copy'] expected = {} assert expected == fs.getStructure('counter') pp.run() @@ -70,7 +71,7 @@ .withFile('kitchen/assets/blah.foo', 'A test file.')) with mock_fs_scope(fs): pp = _get_pipeline(fs) - pp.filterProcessors(['copy']) + pp.enabled_processors = ['copy'] pp.run() expected = {'blah.foo': 'A test file.'} assert expected == fs.getStructure('counter') @@ -93,10 +94,10 @@ def test_two_levels_dirtyness(): fs = (mock_fs() .withFile('kitchen/assets/blah.foo', 'A test file.')) - with mock_fs_scope(fs) as scope: + with mock_fs_scope(fs): pp = _get_pipeline(fs) - pp.processors.append(FooProcessor(('foo', 'bar'), scope._open)) - pp.filterProcessors(['foo', 'copy']) + pp.enabled_processors = ['copy'] + pp.additional_processors = [FooProcessor(('foo', 'bar'))] pp.run() expected = {'blah.bar': 'FOO: A test file.'} assert expected == fs.getStructure('counter') @@ -126,7 +127,7 @@ 'blah2.foo': 'Ooops'} assert expected == fs.getStructure('kitchen/assets') pp = _get_pipeline(fs) - pp.filterProcessors(['copy']) + pp.enabled_processors = ['copy'] pp.run() assert expected == fs.getStructure('counter') @@ -145,18 +146,21 @@ with mock_fs_scope(fs): pp = _get_pipeline(fs) noop = NoopProcessor(('foo', 'foo')) - pp.processors.append(noop) - pp.filterProcessors(['foo', 'copy']) + pp.enabled_processors = ['copy'] + pp.additional_processors = [noop] pp.run() - assert 1 == len(noop.processed) + assert os.path.exists(fs.path('/counter/blah.foo')) is True + mtime = os.path.getmtime(fs.path('/counter/blah.foo')) + time.sleep(1) pp.run() - assert 1 
== len(noop.processed) + assert mtime == os.path.getmtime(fs.path('/counter/blah.foo')) + time.sleep(1) ProcessorPipelineRecord.RECORD_VERSION += 1 try: pp.run() - assert 2 == len(noop.processed) + assert mtime < os.path.getmtime(fs.path('/counter/blah.foo')) finally: ProcessorPipelineRecord.RECORD_VERSION -= 1 @@ -170,24 +174,27 @@ {'something.html': 'A test file.', 'foo': {'_important.html': 'Important!'}}) ]) -def test_skip_pattern(patterns, expected): +def test_ignore_pattern(patterns, expected): fs = (mock_fs() .withFile('kitchen/assets/something.html', 'A test file.') .withFile('kitchen/assets/_hidden.html', 'Shhh') .withFile('kitchen/assets/foo/_important.html', 'Important!')) with mock_fs_scope(fs): pp = _get_pipeline(fs) - pp.addSkipPatterns(patterns) - pp.filterProcessors(['copy']) + pp.addIgnorePatterns(patterns) + pp.enabled_processors = ['copy'] assert {} == fs.getStructure('counter') pp.run() assert expected == fs.getStructure('counter') @pytest.mark.parametrize('names, expected', [ - ('all', ['copy', 'concat', 'less', 'sass', 'sitemap']), - ('all -sitemap', ['copy', 'concat', 'less', 'sass']), - ('-sitemap -less -sass all', ['copy', 'concat']), + ('all', ['cleancss', 'compass', 'copy', 'concat', 'less', 'requirejs', + 'sass', 'sitemap', 'uglifyjs']), + ('all -sitemap', ['cleancss', 'copy', 'compass', 'concat', 'less', + 'requirejs', 'sass', 'uglifyjs']), + ('-sitemap -less -sass all', ['cleancss', 'copy', 'compass', 'concat', + 'requirejs', 'uglifyjs']), ('copy', ['copy']), ('less sass', ['less', 'sass']) ]) @@ -195,9 +202,8 @@ fs = mock_fs() with mock_fs_scope(fs): app = fs.getApp() - pp = _get_pipeline(fs, app=app) - pp.filterProcessors('copy concat less sass sitemap') - procs = pp.getFilteredProcessors(names) + processors = app.plugin_loader.getProcessors() + procs = get_filtered_processors(processors, names) actual = [p.PROCESSOR_NAME for p in procs] assert sorted(actual) == sorted(expected)
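
Usage note: the less.py and sass.py hunks above show the intended pattern for the new PipelineContext argument passed to onPipelineStart/onPipelineEnd: compute in-memory state in every process, but gate side-effects (such as creating a directory) on ctx.is_first_worker, and reserve record-related work for the main process, where ctx.is_pipeline_process is True and ctx.record is populated. The sketch below applies the same pattern to a hypothetical third-party processor; the MyMapProcessor name and its '.map.src' extension are invented for illustration, and it assumes SimpleFileProcessor's usual dict-of-extensions constructor and _doProcess() hook.

import os
import os.path
import shutil

from piecrust.processing.base import SimpleFileProcessor


class MyMapProcessor(SimpleFileProcessor):
    # Hypothetical example processor: copies '*.map.src' files to '*.map'
    # and keeps scratch data in a shared temporary directory.
    PROCESSOR_NAME = 'mymap'

    def __init__(self):
        super(MyMapProcessor, self).__init__({'map.src': 'map'})
        self._scratch_dir = None

    def onPipelineStart(self, ctx):
        # Every process computes the path (cheap, in-memory state)...
        self._scratch_dir = os.path.join(ctx.tmp_dir, 'mymap')
        # ...but only one process performs the side-effect of creating
        # it, so workers don't race each other on the file-system.
        if ctx.is_first_worker and not os.path.isdir(self._scratch_dir):
            os.makedirs(self._scratch_dir)
        # Ignore patterns are now registered on the context. Only the
        # main process's context feeds them back into the directory
        # scan, so calls made from worker processes have no effect.
        ctx.addIgnorePatterns(['_*.map.src'])

    def onPipelineEnd(self, ctx):
        # ctx.record is only populated for the main-process invocation
        # (ctx.is_pipeline_process is True when worker_id < 0).
        if ctx.is_pipeline_process and ctx.record is not None:
            pass  # e.g. inspect ctx.record.entries for reporting.

    def _doProcess(self, in_path, out_path):
        shutil.copyfile(in_path, out_path)
        return True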