Mercurial > piecrust2
view piecrust/processing/tree.py @ 215:a47580a0955b
bake: Better error handling for the processing pipeline.
Pipeline jobs now keep track of whether they've seen any errors. This is
aggregated into an overall "success" flag for the processing record. Also, jobs
keep going as long as there's no critical (i.e. internal) failure happening.
Errors raised by processors are also better tracked: the actual processor that
failed, along with the input file, are tracks in the processing record.
The `bake` command returns a failure exit code if processing saw any error.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sat, 31 Jan 2015 17:08:02 -0800 |
parents | 308d5180bf81 |
children | c4b3a7fd2f87 |
line wrap: on
line source
import os import time import os.path import logging from piecrust.chefutil import format_timed logger = logging.getLogger(__name__) STATE_UNKNOWN = 0 STATE_DIRTY = 1 STATE_CLEAN = 2 FORCE_BUILD = object() class ProcessingTreeError(Exception): pass class ProcessorNotFoundError(ProcessingTreeError): pass class ProcessorError(ProcessingTreeError): def __init__(self, proc_name, in_path, *args): super(ProcessorError, self).__init__(*args) self.proc_name = proc_name self.in_path = in_path def __str__(self): return "Processor %s failed on: %s" % (self.proc_name, self.in_path) class ProcessingTreeNode(object): def __init__(self, path, available_procs, level=0): self.path = path self.available_procs = available_procs self.outputs = [] self.level = level self.state = STATE_UNKNOWN self._processor = None def getProcessor(self): if self._processor is None: for p in self.available_procs: if p.matches(self.path): self._processor = p self.available_procs.remove(p) break else: raise ProcessorNotFoundError() return self._processor def setState(self, state, recursive=True): self.state = state if recursive: for o in self.outputs: o.setState(state, True) @property def is_leaf(self): return len(self.outputs) == 0 def getLeaves(self): if self.is_leaf: return [self] leaves = [] for o in self.outputs: for l in o.getLeaves(): leaves.append(l) return leaves class ProcessingTreeBuilder(object): def __init__(self, processors): self.processors = processors def build(self, path): start_time = time.clock() tree_root = ProcessingTreeNode(path, list(self.processors)) loop_guard = 100 walk_stack = [tree_root] while len(walk_stack) > 0: loop_guard -= 1 if loop_guard <= 0: raise ProcessingTreeError("Infinite loop detected!") cur_node = walk_stack.pop() proc = cur_node.getProcessor() # If the root tree node (and only that one) wants to bypass this # whole tree business, so be it. if proc.is_bypassing_structured_processing: if cur_node != tree_root: raise ProcessingTreeError("Only root processors can " "bypass structured processing.") break # Get the destination directory and output files. rel_dir, basename = os.path.split(cur_node.path) out_names = proc.getOutputFilenames(basename) if out_names is None: continue for n in out_names: out_node = ProcessingTreeNode( os.path.join(rel_dir, n), list(cur_node.available_procs), cur_node.level + 1) cur_node.outputs.append(out_node) if proc.PROCESSOR_NAME != 'copy': walk_stack.append(out_node) logger.debug(format_timed( start_time, "Built processing tree for: %s" % path, colored=False)) return tree_root class ProcessingTreeRunner(object): def __init__(self, base_dir, tmp_dir, out_dir, lock=None): self.base_dir = base_dir self.tmp_dir = tmp_dir self.out_dir = out_dir self.lock = lock def processSubTree(self, tree_root): did_process = False walk_stack = [tree_root] while len(walk_stack) > 0: cur_node = walk_stack.pop() self._computeNodeState(cur_node) if cur_node.state == STATE_DIRTY: did_process_this_node = self.processNode(cur_node) did_process |= did_process_this_node if did_process_this_node: for o in cur_node.outputs: if not o.is_leaf: walk_stack.append(o) else: for o in cur_node.outputs: if not o.is_leaf: walk_stack.append(o) return did_process def processNode(self, node): full_path = self._getNodePath(node) proc = node.getProcessor() if proc.is_bypassing_structured_processing: try: start_time = time.clock() proc.process(full_path, self.out_dir) print_node( node, format_timed( start_time, "(bypassing structured processing)", colored=False)) return True except Exception as e: raise ProcessorError(proc.PROCESSOR_NAME, full_path) from e # All outputs of a node must go to the same directory, so we can get # the output directory off of the first output. base_out_dir = self._getNodeBaseDir(node.outputs[0]) rel_out_dir = os.path.dirname(node.path) out_dir = os.path.join(base_out_dir, rel_out_dir) if not os.path.isdir(out_dir): if self.lock: with self.lock: if not os.path.isdir(out_dir): os.makedirs(out_dir, 0o755) else: os.makedirs(out_dir, 0o755) try: start_time = time.clock() proc_res = proc.process(full_path, out_dir) if proc_res is None: raise Exception("Processor '%s' didn't return a boolean " "result value." % proc) if proc_res: print_node(node, "-> %s" % out_dir) return True else: print_node(node, "-> %s [clean]" % out_dir) return False except Exception as e: raise ProcessorError(proc.PROCESSOR_NAME, full_path) from e def _computeNodeState(self, node): if node.state != STATE_UNKNOWN: return proc = node.getProcessor() if (proc.is_bypassing_structured_processing or not proc.is_delegating_dependency_check): # This processor wants to handle things on its own... node.setState(STATE_DIRTY, False) return start_time = time.clock() # Get paths and modification times for the input path and # all dependencies (if any). base_dir = self._getNodeBaseDir(node) full_path = os.path.join(base_dir, node.path) in_mtime = (full_path, os.path.getmtime(full_path)) force_build = False try: deps = proc.getDependencies(full_path) if deps == FORCE_BUILD: force_build = True elif deps is not None: for dep in deps: dep_mtime = os.path.getmtime(dep) if dep_mtime > in_mtime[1]: in_mtime = (dep, dep_mtime) except Exception as e: logger.warning("%s -- Will force-bake: %s" % (e, node.path)) node.setState(STATE_DIRTY, True) return if force_build: # Just do what the processor told us to do. node.setState(STATE_DIRTY, True) message = "Processor requested a forced build." print_node(node, message) else: # Get paths and modification times for the outputs. message = None for o in node.outputs: full_out_path = self._getNodePath(o) if not os.path.isfile(full_out_path): message = "Output '%s' doesn't exist." % o.path break o_mtime = os.path.getmtime(full_out_path) if o_mtime < in_mtime[1]: message = "Input '%s' is newer than output '%s'." % ( in_mtime[0], o.path) break if message is not None: node.setState(STATE_DIRTY, True) message += " Re-processing sub-tree." print_node(node, message) else: node.setState(STATE_CLEAN, False) state = "dirty" if node.state == STATE_DIRTY else "clean" logger.debug(format_timed(start_time, "Computed node dirtyness: %s" % state, indent_level=node.level, colored=False)) def _getNodeBaseDir(self, node): if node.level == 0: return self.base_dir if node.is_leaf: return self.out_dir return os.path.join(self.tmp_dir, str(node.level)) def _getNodePath(self, node): base_dir = self._getNodeBaseDir(node) return os.path.join(base_dir, node.path) def print_node(node, message=None, recursive=False): indent = ' ' * node.level try: proc_name = node.getProcessor().PROCESSOR_NAME except ProcessorNotFoundError: proc_name = 'n/a' message = message or '' logger.debug('%s%s [%s] %s' % (indent, node.path, proc_name, message)) if recursive: for o in node.outputs: print_node(o, None, True) def get_node_name_tree(node): try: proc_name = node.getProcessor().PROCESSOR_NAME except ProcessorNotFoundError: proc_name = 'n/a' children = [] for o in node.outputs: if not o.outputs: continue children.append(get_node_name_tree(o)) return (proc_name, children)