comparison piecrust/workerpool.py @ 979:45ad976712ec

tests: Big push to get the tests to pass again.

- Lots of fixes everywhere in the code.
- Try to handle debug logging in the multiprocessing worker pool when
  running in pytest. Not perfect, but usable for now.
- Replace all `.md` test files with `.html`, since an auto-format
  extension now always sets the format.
- Replace `out` with `outfiles` in most places, since blog archives are
  now added to the bake output and I don't want to add expected outputs
  for blog archives everywhere.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 29 Oct 2017 22:51:57 -0700
parents 5485a11591ec
children 8adc27285d93
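The logging translation the commit message describes amounts to picking the test-only flags out of `sys.argv` and rebuilding the equivalent Chef flags. A minimal standalone sketch of that idea (the helper name here is invented; the actual code is in the diff below):

    import argparse

    def translate_pytest_logging_args(argv):
        # parse_known_args() ignores all of pytest's own flags and only
        # picks out the two test-logging options we care about.
        parser = argparse.ArgumentParser()
        parser.add_argument('--log-debug', action='store_true')
        parser.add_argument('--log-file')
        res, _unknown = parser.parse_known_args(argv)

        chef_args = []
        if res.log_debug:
            chef_args.append('--debug')
        if res.log_file:
            chef_args += ['--log', res.log_file]
        return chef_args

    # e.g. a pytest command line reduces to the equivalent Chef flags:
    print(translate_pytest_logging_args(
        ['-x', '--log-debug', '--log-file', 'test.log', 'tests/']))
    # -> ['--debug', '--log', 'test.log']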
diff -r 7e51d14097cb -r 45ad976712ec piecrust/workerpool.py
--- a/piecrust/workerpool.py
+++ b/piecrust/workerpool.py
86 msg = ("CRITICAL ERROR IN WORKER %d\n%s" % (params.wid, str(ex))) 86 msg = ("CRITICAL ERROR IN WORKER %d\n%s" % (params.wid, str(ex)))
87 params.outqueue.put(( 87 params.outqueue.put((
88 _CRITICAL_WORKER_ERROR, None, False, params.wid, msg)) 88 _CRITICAL_WORKER_ERROR, None, False, params.wid, msg))
89 89
90 90
91 def _pre_parse_pytest_args():
92 # If we are unit-testing, we need to translate our test logging
93 # arguments into something Chef can understand.
94 import argparse
95 parser = argparse.ArgumentParser()
96 # This is adapted from our `conftest.py`.
97 parser.add_argument('--log-debug', action='store_true')
98 parser.add_argument('--log-file')
99 res, _ = parser.parse_known_args(sys.argv[1:])
100
101 chef_args = []
102 if res.log_debug:
103 chef_args.append('--debug')
104 if res.log_file:
105 chef_args += ['--log', res.log_file]
106
107 root_logger = logging.getLogger()
108 while len(root_logger.handlers) > 0:
109 root_logger.removeHandler(root_logger.handlers[0])
110
111 from piecrust.main import _pre_parse_chef_args
112 _pre_parse_chef_args(chef_args)
113
114
91 def _real_worker_func_unsafe(params): 115 def _real_worker_func_unsafe(params):
92 wid = params.wid 116 wid = params.wid
93 117
94 stats = ExecutionStats() 118 stats = ExecutionStats()
95 stats.registerTimer('WorkerInit') 119 stats.registerTimer('WorkerInit')
96 init_start_time = time.perf_counter() 120 init_start_time = time.perf_counter()
97 121
98 # If we are unit-testing, we didn't setup all the logging environment
99 # yet, since the executable is `py.test`. We need to translate our
100 # test logging arguments into something Chef can understand.
101 if params.is_unit_testing:
102 import argparse
103 parser = argparse.ArgumentParser()
104 # This is adapted from our `conftest.py`.
105 parser.add_argument('--log-debug', action='store_true')
106 parser.add_argument('--log-file')
107 res, _ = parser.parse_known_args(sys.argv[1:])
108
109 chef_args = []
110 if res.log_debug:
111 chef_args.append('--debug')
112 if res.log_file:
113 chef_args += ['--log', res.log_file]
114
115 from piecrust.main import _pre_parse_chef_args
116 _pre_parse_chef_args(chef_args)
117
118 # In a context where `multiprocessing` is using the `spawn` forking model, 122 # In a context where `multiprocessing` is using the `spawn` forking model,
119 # the new process doesn't inherit anything, so we lost all our logging 123 # the new process doesn't inherit anything, so we lost all our logging
120 # configuration here. Let's set it up again. 124 # configuration here. Let's set it up again.
121 elif (hasattr(multiprocessing, 'get_start_method') and 125 if (hasattr(multiprocessing, 'get_start_method') and
122 multiprocessing.get_start_method() == 'spawn'): 126 multiprocessing.get_start_method() == 'spawn'):
123 from piecrust.main import _pre_parse_chef_args 127 if not params.is_unit_testing:
124 _pre_parse_chef_args(sys.argv[1:]) 128 from piecrust.main import _pre_parse_chef_args
129 _pre_parse_chef_args(sys.argv[1:])
130 else:
131 _pre_parse_pytest_args()
132 elif params.is_unit_testing:
133 _pre_parse_pytest_args()
125 134
126 from piecrust.main import ColoredFormatter 135 from piecrust.main import ColoredFormatter
127 root_logger = logging.getLogger() 136 root_logger = logging.getLogger()
128 root_logger.handlers[0].setFormatter(ColoredFormatter( 137 root_logger.handlers[0].setFormatter(ColoredFormatter(
129 ('[W-%d]' % wid) + '[%(name)s] %(message)s')) 138 ('[W-%d]' % wid) + '[%(name)s] %(message)s'))
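For context on the `spawn` comment in the hunk above: under the `spawn` start method the child begins with a fresh interpreter, so handlers installed on the root logger in the parent are simply absent in the worker process. A minimal sketch, independent of PieCrust, that demonstrates this:

    import logging
    import multiprocessing

    def worker():
        root = logging.getLogger()
        # Under 'spawn', the parent's handler setup did not carry over:
        print("handlers in child:", len(root.handlers))  # -> 0
        # ...so the worker must configure logging again for itself.
        logging.basicConfig(level=logging.DEBUG,
                            format='[child] %(message)s')
        logging.getLogger(__name__).debug("logging works again")

    if __name__ == '__main__':
        logging.basicConfig(level=logging.DEBUG)  # parent-side setup
        ctx = multiprocessing.get_context('spawn')
        p = ctx.Process(target=worker)
        p.start()
        p.join()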
@@ -293,10 +302,15 @@
                 self._jobs_left += new_job_count

             self._event.clear()
             for job in jobs:
                 self._quick_put((TASK_JOB, job))
+        else:
+            with self._lock_jobs_left:
+                done = (self._jobs_left == 0)
+            if done:
+                self._event.set()

     def wait(self, timeout=None):
         if self._closed:
             raise Exception("This worker pool has been closed.")

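The `else` branch added here closes a hole in the jobs-left/event handshake: queueing an empty batch while nothing is pending must still set the event, or a later `wait()` would block forever. A rough standalone sketch of the pattern (class and method names invented for illustration, not PieCrust's API):

    import threading

    class JobTracker:
        def __init__(self):
            self._jobs_left = 0
            self._lock = threading.Lock()
            self._done = threading.Event()
            self._done.set()  # nothing queued yet

        def add(self, count):
            if count > 0:
                with self._lock:
                    self._jobs_left += count
                self._done.clear()
            else:
                # Empty batch: signal completion if nothing is pending,
                # mirroring the `else` branch added in this hunk.
                with self._lock:
                    done = (self._jobs_left == 0)
                if done:
                    self._done.set()

        def finish_one(self):
            with self._lock:
                self._jobs_left -= 1
                if self._jobs_left == 0:
                    self._done.set()

        def wait(self, timeout=None):
            return self._done.wait(timeout)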
@@ -306,12 +320,14 @@
         return ret

     def close(self):
         if self._closed:
             raise Exception("This worker pool has been closed.")
-        if self._jobs_left > 0 or not self._event.is_set():
+        if self._jobs_left > 0:
             raise Exception("A previous job queue has not finished yet.")
+        if not self._event.is_set():
+            raise Exception("A previous job queue hasn't been cleared.")

         logger.debug("Closing worker pool...")
         live_workers = list(filter(lambda w: w is not None, self._pool))
         handler = _ReportHandler(len(live_workers))
         self._callback = handler._handle
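With the old combined guard split in two, a caller that misuses the pool now learns which precondition failed. A hypothetical usage sketch (the `pool` variable is invented; `queueJobs()`, `wait()`, and `close()` are the methods shown in this file):

    pool.queueJobs(jobs)   # clears the event and bumps _jobs_left
    pool.wait()            # blocks until the event is set again
    pool.close()           # both guards now pass

    # Skipping wait() raises "A previous job queue has not finished yet."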