comparison piecrust/processing/base.py @ 215:a47580a0955b

bake: Better error handling for the processing pipeline. Pipeline jobs now keep track of whether they've seen any errors. This is aggregated into an overall "success" flag for the processing record. Also, jobs keep going as long as there's no critical (i.e. internal) failure happening. Errors raised by processors are also better tracked: the actual processor that failed, along with the input file, are tracked in the processing record. The `bake` command returns a failure exit code if processing saw any error.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 31 Jan 2015 17:08:02 -0800
parents e34a6826a3d4
children f82262f59600
comparison
equal deleted inserted replaced
214:09e350db7f8f 215:a47580a0955b
9 from piecrust.chefutil import format_timed 9 from piecrust.chefutil import format_timed
10 from piecrust.processing.records import ( 10 from piecrust.processing.records import (
11 ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord, 11 ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
12 FLAG_PROCESSED, FLAG_OVERRIDEN, FLAG_BYPASSED_STRUCTURED_PROCESSING) 12 FLAG_PROCESSED, FLAG_OVERRIDEN, FLAG_BYPASSED_STRUCTURED_PROCESSING)
13 from piecrust.processing.tree import ( 13 from piecrust.processing.tree import (
14 ProcessingTreeBuilder, ProcessingTreeRunner, ProcessingTreeError, 14 ProcessingTreeBuilder, ProcessingTreeRunner,
15 ProcessingTreeError, ProcessorError,
15 STATE_DIRTY, 16 STATE_DIRTY,
16 print_node, get_node_name_tree) 17 print_node, get_node_name_tree)
17 18
18 19
19 logger = logging.getLogger(__name__) 20 logger = logging.getLogger(__name__)
237 ctx = ProcessingContext(path, info, queue, record) 238 ctx = ProcessingContext(path, info, queue, record)
238 logger.debug("Initiating processing pipeline on: %s" % path) 239 logger.debug("Initiating processing pipeline on: %s" % path)
239 self.processDirectory(ctx, path) 240 self.processDirectory(ctx, path)
240 241
241 # Wait on all workers. 242 # Wait on all workers.
243 record.current.success = True
242 for w in pool: 244 for w in pool:
243 w.join() 245 w.join()
246 record.current.success &= w.success
244 if abort.is_set(): 247 if abort.is_set():
245 raise Exception("Worker pool was aborted.") 248 raise Exception("Worker pool was aborted.")
246 249
247 # Handle deletions. 250 # Handle deletions.
248 if delete: 251 if delete:
308 class ProcessingWorker(threading.Thread): 311 class ProcessingWorker(threading.Thread):
309 def __init__(self, wid, ctx): 312 def __init__(self, wid, ctx):
310 super(ProcessingWorker, self).__init__() 313 super(ProcessingWorker, self).__init__()
311 self.wid = wid 314 self.wid = wid
312 self.ctx = ctx 315 self.ctx = ctx
316 self.success = True
313 317
314 def run(self): 318 def run(self):
315 while(not self.ctx.abort_event.is_set()): 319 while(not self.ctx.abort_event.is_set()):
316 try: 320 try:
317 job = self.ctx.work_queue.get(True, 0.1) 321 job = self.ctx.work_queue.get(True, 0.1)
318 except Empty: 322 except Empty:
319 logger.debug("[%d] No more work... shutting down." % self.wid) 323 logger.debug("[%d] No more work... shutting down." % self.wid)
320 break 324 break
321 325
322 try: 326 try:
323 self._unsafeRun(job) 327 success = self._unsafeRun(job)
324 logger.debug("[%d] Done with file." % self.wid) 328 logger.debug("[%d] Done with file." % self.wid)
325 self.ctx.work_queue.task_done() 329 self.ctx.work_queue.task_done()
330 self.success &= success
326 except Exception as ex: 331 except Exception as ex:
327 self.ctx.abort_event.set() 332 self.ctx.abort_event.set()
333 self.success = False
328 logger.error("[%d] Critical error, aborting." % self.wid) 334 logger.error("[%d] Critical error, aborting." % self.wid)
329 logger.exception(ex) 335 logger.exception(ex)
330 break 336 break
331 337
332 def _unsafeRun(self, job): 338 def _unsafeRun(self, job):
345 # is overridden in the user's website. 351 # is overridden in the user's website.
346 if record.current.hasOverrideEntry(rel_path): 352 if record.current.hasOverrideEntry(rel_path):
347 record_entry.flags |= FLAG_OVERRIDEN 353 record_entry.flags |= FLAG_OVERRIDEN
348 logger.info(format_timed(start_time, 354 logger.info(format_timed(start_time,
349 '%s [not baked, overridden]' % rel_path)) 355 '%s [not baked, overridden]' % rel_path))
350 return 356 return True
351 357
352 processors = pipeline.getFilteredProcessors( 358 processors = pipeline.getFilteredProcessors(
353 job.mount_info['processors']) 359 job.mount_info['processors'])
354 try: 360 try:
355 builder = ProcessingTreeBuilder(processors) 361 builder = ProcessingTreeBuilder(processors)
356 tree_root = builder.build(rel_path) 362 tree_root = builder.build(rel_path)
357 except ProcessingTreeError as ex: 363 except ProcessingTreeError as ex:
358 record_entry.errors.append(str(ex)) 364 msg = str(ex)
359 logger.error("Error processing %s: %s" % (rel_path, ex)) 365 logger.error("Error processing %s: %s" % (rel_path, msg))
360 return 366 while ex:
367 record_entry.errors.append(str(ex))
368 ex = ex.__cause__
369 return False
361 370
362 print_node(tree_root, recursive=True) 371 print_node(tree_root, recursive=True)
363 leaves = tree_root.getLeaves() 372 leaves = tree_root.getLeaves()
364 record_entry.rel_outputs = [l.path for l in leaves] 373 record_entry.rel_outputs = [l.path for l in leaves]
365 record_entry.proc_tree = get_node_name_tree(tree_root) 374 record_entry.proc_tree = get_node_name_tree(tree_root)
378 runner = ProcessingTreeRunner( 387 runner = ProcessingTreeRunner(
379 job.base_dir, pipeline.tmp_dir, 388 job.base_dir, pipeline.tmp_dir,
380 pipeline.out_dir, self.ctx.pipeline_lock) 389 pipeline.out_dir, self.ctx.pipeline_lock)
381 if runner.processSubTree(tree_root): 390 if runner.processSubTree(tree_root):
382 record_entry.flags |= FLAG_PROCESSED 391 record_entry.flags |= FLAG_PROCESSED
383 logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path))) 392 logger.info(format_timed(
393 start_time, "[%d] %s" % (self.wid, rel_path)))
394 return True
384 except ProcessingTreeError as ex: 395 except ProcessingTreeError as ex:
385 record_entry.errors.append(str(ex)) 396 msg = str(ex)
386 logger.error("Error processing %s: %s" % (rel_path, ex)) 397 if isinstance(ex, ProcessorError):
398 msg = str(ex.__cause__)
399 logger.error("Error processing %s: %s" % (rel_path, msg))
400 while ex:
401 record_entry.errors.append(str(ex))
402 ex = ex.__cause__
403 return False
387 404
388 405
389 def make_mount_infos(mounts, root_dir): 406 def make_mount_infos(mounts, root_dir):
390 if isinstance(mounts, list): 407 if isinstance(mounts, list):
391 mounts = {m: {} for m in mounts} 408 mounts = {m: {} for m in mounts}