piecrust2: comparison of piecrust/processing/base.py @ 215:a47580a0955b
bake: Better error handling for the processing pipeline.
Pipeline jobs now keep track of whether they've seen any errors. This is
aggregated into an overall "success" flag for the processing record. Jobs also
keep going as long as no critical (i.e. internal) failure occurs.
Errors raised by processors are better tracked too: both the failing processor
and the input file are now recorded in the processing record.
The `bake` command returns a failure exit code if processing saw any error.
author | Ludovic Chabant <ludovic@chabant.com>
---|---
date | Sat, 31 Jan 2015 17:08:02 -0800
parents | e34a6826a3d4
children | f82262f59600
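
The core of the change is a per-worker success flag that the pipeline folds into the record once all workers have joined. Here is a minimal sketch of that pattern, using illustrative names rather than PieCrust's actual classes:

```python
import threading

class Worker(threading.Thread):
    def __init__(self, jobs):
        super().__init__()
        self.jobs = jobs
        self.success = True   # stays True until some job reports an error

    def run(self):
        for job in self.jobs:
            # Job-level errors don't stop the worker; they only taint the flag.
            self.success &= self.process(job)

    def process(self, job):
        return job != 'bad'   # stand-in for real processing

workers = [Worker(['a', 'bad']), Worker(['b', 'c'])]
for w in workers:
    w.start()

overall = True
for w in workers:
    w.join()
    overall &= w.success      # aggregate, like record.current.success

print(overall)  # False: one job failed, but every job still ran
```

A command wrapping the pipeline can then return a non-zero exit code when the aggregated flag is false, which is what this changeset does for `bake`.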
comparing 214:09e350db7f8f with 215:a47580a0955b
```diff
@@ -9,11 +9,12 @@
 from piecrust.chefutil import format_timed
 from piecrust.processing.records import (
         ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
         FLAG_PROCESSED, FLAG_OVERRIDEN, FLAG_BYPASSED_STRUCTURED_PROCESSING)
 from piecrust.processing.tree import (
-        ProcessingTreeBuilder, ProcessingTreeRunner, ProcessingTreeError,
+        ProcessingTreeBuilder, ProcessingTreeRunner,
+        ProcessingTreeError, ProcessorError,
         STATE_DIRTY,
         print_node, get_node_name_tree)


 logger = logging.getLogger(__name__)
```
```diff
@@ -237,12 +238,14 @@
         ctx = ProcessingContext(path, info, queue, record)
         logger.debug("Initiating processing pipeline on: %s" % path)
         self.processDirectory(ctx, path)

         # Wait on all workers.
+        record.current.success = True
         for w in pool:
             w.join()
+            record.current.success &= w.success
         if abort.is_set():
             raise Exception("Worker pool was aborted.")

         # Handle deletions.
         if delete:
```
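
Note that `record.current.success` is seeded to `True` before the join loop, so a run with no failures counts as successful; each worker's verdict is then folded in with `&=`, and a single failing worker taints the whole record.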
```diff
@@ -308,25 +311,28 @@
 class ProcessingWorker(threading.Thread):
     def __init__(self, wid, ctx):
         super(ProcessingWorker, self).__init__()
         self.wid = wid
         self.ctx = ctx
+        self.success = True

     def run(self):
         while(not self.ctx.abort_event.is_set()):
             try:
                 job = self.ctx.work_queue.get(True, 0.1)
             except Empty:
                 logger.debug("[%d] No more work... shutting down." % self.wid)
                 break

             try:
-                self._unsafeRun(job)
+                success = self._unsafeRun(job)
                 logger.debug("[%d] Done with file." % self.wid)
                 self.ctx.work_queue.task_done()
+                self.success &= success
             except Exception as ex:
                 self.ctx.abort_event.set()
+                self.success = False
                 logger.error("[%d] Critical error, aborting." % self.wid)
                 logger.exception(ex)
                 break

     def _unsafeRun(self, job):
```
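
The run loop above separates the two failure modes the commit message describes: `_unsafeRun()` returning `False` marks a job error but lets the worker keep pulling jobs, while an unexpected exception sets the shared abort event so sibling workers wind down too. A self-contained sketch of that cooperative-abort pattern (names are illustrative, not PieCrust's API):

```python
import queue
import threading

abort = threading.Event()
jobs = queue.Queue()

def handle(job):
    if job == 'explode':        # stand-in for an internal failure
        raise RuntimeError(job)

def worker_loop():
    # Re-check the shared event on every iteration so a crash in any
    # worker winds down all of them within one poll interval.
    while not abort.is_set():
        try:
            job = jobs.get(True, 0.1)
        except queue.Empty:
            break               # queue drained: normal shutdown
        try:
            handle(job)
        except Exception:
            abort.set()         # ask sibling workers to stop too
            break

for j in ('a', 'b', 'c'):
    jobs.put(j)
threads = [threading.Thread(target=worker_loop) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```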
```diff
@@ -345,21 +351,24 @@
         # is overridden in the user's website.
         if record.current.hasOverrideEntry(rel_path):
             record_entry.flags |= FLAG_OVERRIDEN
             logger.info(format_timed(start_time,
                     '%s [not baked, overridden]' % rel_path))
-            return
+            return True

         processors = pipeline.getFilteredProcessors(
                 job.mount_info['processors'])
         try:
             builder = ProcessingTreeBuilder(processors)
             tree_root = builder.build(rel_path)
         except ProcessingTreeError as ex:
-            record_entry.errors.append(str(ex))
-            logger.error("Error processing %s: %s" % (rel_path, ex))
-            return
+            msg = str(ex)
+            logger.error("Error processing %s: %s" % (rel_path, msg))
+            while ex:
+                record_entry.errors.append(str(ex))
+                ex = ex.__cause__
+            return False

         print_node(tree_root, recursive=True)
         leaves = tree_root.getLeaves()
         record_entry.rel_outputs = [l.path for l in leaves]
         record_entry.proc_tree = get_node_name_tree(tree_root)
```
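
The `while ex: ... ex = ex.__cause__` loops here and in the next hunk flatten a chained exception into the record's error list: Python's `raise New(...) from original` stores the original exception on `__cause__`, so a wrapper like `ProcessorError` keeps the underlying failure reachable. An illustrative sketch (the processor name and message are made up):

```python
class ProcessorError(Exception):
    pass

def run_processor():
    try:
        raise ValueError("bad input in foo.less")   # hypothetical failure
    except ValueError as underlying:
        # `from` links the wrapper to the original via __cause__.
        raise ProcessorError("processor 'less' failed") from underlying

errors = []
try:
    run_processor()
except ProcessorError as ex:
    while ex:
        errors.append(str(ex))
        ex = ex.__cause__

print(errors)   # ["processor 'less' failed", 'bad input in foo.less']
```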
```diff
@@ -378,14 +387,22 @@
             runner = ProcessingTreeRunner(
                     job.base_dir, pipeline.tmp_dir,
                     pipeline.out_dir, self.ctx.pipeline_lock)
             if runner.processSubTree(tree_root):
                 record_entry.flags |= FLAG_PROCESSED
-            logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+            logger.info(format_timed(
+                    start_time, "[%d] %s" % (self.wid, rel_path)))
+            return True
         except ProcessingTreeError as ex:
-            record_entry.errors.append(str(ex))
-            logger.error("Error processing %s: %s" % (rel_path, ex))
+            msg = str(ex)
+            if isinstance(ex, ProcessorError):
+                msg = str(ex.__cause__)
+            logger.error("Error processing %s: %s" % (rel_path, msg))
+            while ex:
+                record_entry.errors.append(str(ex))
+                ex = ex.__cause__
+            return False


 def make_mount_infos(mounts, root_dir):
     if isinstance(mounts, list):
         mounts = {m: {} for m in mounts}
```