comparison piecrust/processing/tree.py @ 414:c4b3a7fd2f87

bake: Make pipeline processing multi-process. Not many changes here, as it's pretty straightforward, but an API change for processors so they know if they're being initialized/disposed from the main process or from one of the workers. This makes it possible to do global stuff that has side-effects (e.g. create a directory) vs. doing in-memory stuff.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jun 2015 19:20:30 -0700
parents a47580a0955b
children 7147b06670fd
comparison
equal deleted inserted replaced
413:eacf0a3afd0c 414:c4b3a7fd2f87
77 class ProcessingTreeBuilder(object): 77 class ProcessingTreeBuilder(object):
78 def __init__(self, processors): 78 def __init__(self, processors):
79 self.processors = processors 79 self.processors = processors
80 80
81 def build(self, path): 81 def build(self, path):
82 start_time = time.clock()
83 tree_root = ProcessingTreeNode(path, list(self.processors)) 82 tree_root = ProcessingTreeNode(path, list(self.processors))
84 83
85 loop_guard = 100 84 loop_guard = 100
86 walk_stack = [tree_root] 85 walk_stack = [tree_root]
87 while len(walk_stack) > 0: 86 while len(walk_stack) > 0:
95 # If the root tree node (and only that one) wants to bypass this 94 # If the root tree node (and only that one) wants to bypass this
96 # whole tree business, so be it. 95 # whole tree business, so be it.
97 if proc.is_bypassing_structured_processing: 96 if proc.is_bypassing_structured_processing:
98 if cur_node != tree_root: 97 if cur_node != tree_root:
99 raise ProcessingTreeError("Only root processors can " 98 raise ProcessingTreeError("Only root processors can "
100 "bypass structured processing.") 99 "bypass structured processing.")
101 break 100 break
102 101
103 # Get the destination directory and output files. 102 # Get the destination directory and output files.
104 rel_dir, basename = os.path.split(cur_node.path) 103 rel_dir, basename = os.path.split(cur_node.path)
105 out_names = proc.getOutputFilenames(basename) 104 out_names = proc.getOutputFilenames(basename)
114 cur_node.outputs.append(out_node) 113 cur_node.outputs.append(out_node)
115 114
116 if proc.PROCESSOR_NAME != 'copy': 115 if proc.PROCESSOR_NAME != 'copy':
117 walk_stack.append(out_node) 116 walk_stack.append(out_node)
118 117
119 logger.debug(format_timed(
120 start_time, "Built processing tree for: %s" % path,
121 colored=False))
122 return tree_root 118 return tree_root
123 119
124 120
125 class ProcessingTreeRunner(object): 121 class ProcessingTreeRunner(object):
126 def __init__(self, base_dir, tmp_dir, out_dir, lock=None): 122 def __init__(self, base_dir, tmp_dir, out_dir):
127 self.base_dir = base_dir 123 self.base_dir = base_dir
128 self.tmp_dir = tmp_dir 124 self.tmp_dir = tmp_dir
129 self.out_dir = out_dir 125 self.out_dir = out_dir
130 self.lock = lock
131 126
132 def processSubTree(self, tree_root): 127 def processSubTree(self, tree_root):
133 did_process = False 128 did_process = False
134 walk_stack = [tree_root] 129 walk_stack = [tree_root]
135 while len(walk_stack) > 0: 130 while len(walk_stack) > 0:
153 def processNode(self, node): 148 def processNode(self, node):
154 full_path = self._getNodePath(node) 149 full_path = self._getNodePath(node)
155 proc = node.getProcessor() 150 proc = node.getProcessor()
156 if proc.is_bypassing_structured_processing: 151 if proc.is_bypassing_structured_processing:
157 try: 152 try:
158 start_time = time.clock() 153 start_time = time.perf_counter()
159 proc.process(full_path, self.out_dir) 154 with proc.app.env.timerScope(proc.__class__.__name__):
155 proc.process(full_path, self.out_dir)
160 print_node( 156 print_node(
161 node, 157 node,
162 format_timed( 158 format_timed(
163 start_time, "(bypassing structured processing)", 159 start_time, "(bypassing structured processing)",
164 colored=False)) 160 colored=False))
170 # the output directory off of the first output. 166 # the output directory off of the first output.
171 base_out_dir = self._getNodeBaseDir(node.outputs[0]) 167 base_out_dir = self._getNodeBaseDir(node.outputs[0])
172 rel_out_dir = os.path.dirname(node.path) 168 rel_out_dir = os.path.dirname(node.path)
173 out_dir = os.path.join(base_out_dir, rel_out_dir) 169 out_dir = os.path.join(base_out_dir, rel_out_dir)
174 if not os.path.isdir(out_dir): 170 if not os.path.isdir(out_dir):
175 if self.lock: 171 try:
176 with self.lock: 172 os.makedirs(out_dir, 0o755, exist_ok=True)
177 if not os.path.isdir(out_dir): 173 except OSError:
178 os.makedirs(out_dir, 0o755) 174 pass
179 else:
180 os.makedirs(out_dir, 0o755)
181 175
182 try: 176 try:
183 start_time = time.clock() 177 start_time = time.perf_counter()
184 proc_res = proc.process(full_path, out_dir) 178 with proc.app.env.timerScope(proc.__class__.__name__):
179 proc_res = proc.process(full_path, out_dir)
185 if proc_res is None: 180 if proc_res is None:
186 raise Exception("Processor '%s' didn't return a boolean " 181 raise Exception("Processor '%s' didn't return a boolean "
187 "result value." % proc) 182 "result value." % proc)
188 if proc_res: 183 if proc_res:
189 print_node(node, "-> %s" % out_dir) 184 print_node(node, "-> %s" % out_dir)
198 if node.state != STATE_UNKNOWN: 193 if node.state != STATE_UNKNOWN:
199 return 194 return
200 195
201 proc = node.getProcessor() 196 proc = node.getProcessor()
202 if (proc.is_bypassing_structured_processing or 197 if (proc.is_bypassing_structured_processing or
203 not proc.is_delegating_dependency_check): 198 not proc.is_delegating_dependency_check):
204 # This processor wants to handle things on its own... 199 # This processor wants to handle things on its own...
205 node.setState(STATE_DIRTY, False) 200 node.setState(STATE_DIRTY, False)
206 return 201 return
207 202
208 start_time = time.clock() 203 start_time = time.perf_counter()
209 204
210 # Get paths and modification times for the input path and 205 # Get paths and modification times for the input path and
211 # all dependencies (if any). 206 # all dependencies (if any).
212 base_dir = self._getNodeBaseDir(node) 207 base_dir = self._getNodeBaseDir(node)
213 full_path = os.path.join(base_dir, node.path) 208 full_path = os.path.join(base_dir, node.path)