Mercurial > piecrust2
comparison piecrust/workerpool.py @ 979:45ad976712ec
tests: Big push to get the tests to pass again.
- Lots of fixes everywhere in the code.
- Try to handle debug logging in the multiprocessing worker pool when running in pytest. Not perfect, but usable for now.
- Replace all `.md` test files with `.html` since now a auto-format extension always sets the format.
- Replace `out` with `outfiles` in most places since now blog archives are added to the bake output and I don't want to add expected outputs for blog archives everywhere.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 29 Oct 2017 22:51:57 -0700 |
parents | 5485a11591ec |
children | 8adc27285d93 |
comparison
equal
deleted
inserted
replaced
978:7e51d14097cb | 979:45ad976712ec |
---|---|
86 msg = ("CRITICAL ERROR IN WORKER %d\n%s" % (params.wid, str(ex))) | 86 msg = ("CRITICAL ERROR IN WORKER %d\n%s" % (params.wid, str(ex))) |
87 params.outqueue.put(( | 87 params.outqueue.put(( |
88 _CRITICAL_WORKER_ERROR, None, False, params.wid, msg)) | 88 _CRITICAL_WORKER_ERROR, None, False, params.wid, msg)) |
89 | 89 |
90 | 90 |
91 def _pre_parse_pytest_args(): | |
92 # If we are unit-testing, we need to translate our test logging | |
93 # arguments into something Chef can understand. | |
94 import argparse | |
95 parser = argparse.ArgumentParser() | |
96 # This is adapted from our `conftest.py`. | |
97 parser.add_argument('--log-debug', action='store_true') | |
98 parser.add_argument('--log-file') | |
99 res, _ = parser.parse_known_args(sys.argv[1:]) | |
100 | |
101 chef_args = [] | |
102 if res.log_debug: | |
103 chef_args.append('--debug') | |
104 if res.log_file: | |
105 chef_args += ['--log', res.log_file] | |
106 | |
107 root_logger = logging.getLogger() | |
108 while len(root_logger.handlers) > 0: | |
109 root_logger.removeHandler(root_logger.handlers[0]) | |
110 | |
111 from piecrust.main import _pre_parse_chef_args | |
112 _pre_parse_chef_args(chef_args) | |
113 | |
114 | |
91 def _real_worker_func_unsafe(params): | 115 def _real_worker_func_unsafe(params): |
92 wid = params.wid | 116 wid = params.wid |
93 | 117 |
94 stats = ExecutionStats() | 118 stats = ExecutionStats() |
95 stats.registerTimer('WorkerInit') | 119 stats.registerTimer('WorkerInit') |
96 init_start_time = time.perf_counter() | 120 init_start_time = time.perf_counter() |
97 | 121 |
98 # If we are unit-testing, we didn't setup all the logging environment | |
99 # yet, since the executable is `py.test`. We need to translate our | |
100 # test logging arguments into something Chef can understand. | |
101 if params.is_unit_testing: | |
102 import argparse | |
103 parser = argparse.ArgumentParser() | |
104 # This is adapted from our `conftest.py`. | |
105 parser.add_argument('--log-debug', action='store_true') | |
106 parser.add_argument('--log-file') | |
107 res, _ = parser.parse_known_args(sys.argv[1:]) | |
108 | |
109 chef_args = [] | |
110 if res.log_debug: | |
111 chef_args.append('--debug') | |
112 if res.log_file: | |
113 chef_args += ['--log', res.log_file] | |
114 | |
115 from piecrust.main import _pre_parse_chef_args | |
116 _pre_parse_chef_args(chef_args) | |
117 | |
118 # In a context where `multiprocessing` is using the `spawn` forking model, | 122 # In a context where `multiprocessing` is using the `spawn` forking model, |
119 # the new process doesn't inherit anything, so we lost all our logging | 123 # the new process doesn't inherit anything, so we lost all our logging |
120 # configuration here. Let's set it up again. | 124 # configuration here. Let's set it up again. |
121 elif (hasattr(multiprocessing, 'get_start_method') and | 125 if (hasattr(multiprocessing, 'get_start_method') and |
122 multiprocessing.get_start_method() == 'spawn'): | 126 multiprocessing.get_start_method() == 'spawn'): |
123 from piecrust.main import _pre_parse_chef_args | 127 if not params.is_unit_testing: |
124 _pre_parse_chef_args(sys.argv[1:]) | 128 from piecrust.main import _pre_parse_chef_args |
129 _pre_parse_chef_args(sys.argv[1:]) | |
130 else: | |
131 _pre_parse_pytest_args() | |
132 elif params.is_unit_testing: | |
133 _pre_parse_pytest_args() | |
125 | 134 |
126 from piecrust.main import ColoredFormatter | 135 from piecrust.main import ColoredFormatter |
127 root_logger = logging.getLogger() | 136 root_logger = logging.getLogger() |
128 root_logger.handlers[0].setFormatter(ColoredFormatter( | 137 root_logger.handlers[0].setFormatter(ColoredFormatter( |
129 ('[W-%d]' % wid) + '[%(name)s] %(message)s')) | 138 ('[W-%d]' % wid) + '[%(name)s] %(message)s')) |
293 self._jobs_left += new_job_count | 302 self._jobs_left += new_job_count |
294 | 303 |
295 self._event.clear() | 304 self._event.clear() |
296 for job in jobs: | 305 for job in jobs: |
297 self._quick_put((TASK_JOB, job)) | 306 self._quick_put((TASK_JOB, job)) |
307 else: | |
308 with self._lock_jobs_left: | |
309 done = (self._jobs_left == 0) | |
310 if done: | |
311 self._event.set() | |
298 | 312 |
299 def wait(self, timeout=None): | 313 def wait(self, timeout=None): |
300 if self._closed: | 314 if self._closed: |
301 raise Exception("This worker pool has been closed.") | 315 raise Exception("This worker pool has been closed.") |
302 | 316 |
306 return ret | 320 return ret |
307 | 321 |
308 def close(self): | 322 def close(self): |
309 if self._closed: | 323 if self._closed: |
310 raise Exception("This worker pool has been closed.") | 324 raise Exception("This worker pool has been closed.") |
311 if self._jobs_left > 0 or not self._event.is_set(): | 325 if self._jobs_left > 0: |
312 raise Exception("A previous job queue has not finished yet.") | 326 raise Exception("A previous job queue has not finished yet.") |
327 if not self._event.is_set(): | |
328 raise Exception("A previous job queue hasn't been cleared.") | |
313 | 329 |
314 logger.debug("Closing worker pool...") | 330 logger.debug("Closing worker pool...") |
315 live_workers = list(filter(lambda w: w is not None, self._pool)) | 331 live_workers = list(filter(lambda w: w is not None, self._pool)) |
316 handler = _ReportHandler(len(live_workers)) | 332 handler = _ReportHandler(len(live_workers)) |
317 self._callback = handler._handle | 333 self._callback = handler._handle |