Mercurial > piecrust2
comparison piecrust/workerpool.py @ 1019:bd544b65cfad
bake: More detailed stats, and fix a problem with some error reporting.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 29 Nov 2017 21:36:11 -0800 |
parents | 071f30aa04bb |
children | aa4f1e04cf3e |
comparison
equal
deleted
inserted
replaced
1018:3c6e6e7b9639 | 1019:bd544b65cfad |
---|---|
156 | 156 |
157 # Start pumping! | 157 # Start pumping! |
158 completed = 0 | 158 completed = 0 |
159 time_in_get = 0 | 159 time_in_get = 0 |
160 time_in_put = 0 | 160 time_in_put = 0 |
 | 161 is_first_get = True |
161 get = params.inqueue.get | 162 get = params.inqueue.get |
162 put = params.outqueue.put | 163 put = params.outqueue.put |
163 | 164 |
164 while True: | 165 while True: |
165 get_start_time = time.perf_counter() | 166 get_start_time = time.perf_counter() |
166 task = get() | 167 task = get() |
167 time_in_get += (time.perf_counter() - get_start_time) | 168 if not is_first_get: |
 | 169 time_in_get += (time.perf_counter() - get_start_time) |
 | 170 else: |
 | 171 is_first_get = False |
168 | 172 |
169 task_type, task_data = task | 173 task_type, task_data = task |
170 | 174 |
171 # Job task(s)... just do it. | 175 # Job task(s)... just do it. |
172 if task_type == TASK_JOB or task_type == TASK_JOB_BATCH: | 176 if task_type == TASK_JOB or task_type == TASK_JOB_BATCH: |
183 result_list.append((td, worker_res, True)) | 187 result_list.append((td, worker_res, True)) |
184 except Exception as e: | 188 except Exception as e: |
185 logger.debug( | 189 logger.debug( |
186 "Error processing job, sending exception to main process:") | 190 "Error processing job, sending exception to main process:") |
187 logger.debug(traceback.format_exc()) | 191 logger.debug(traceback.format_exc()) |
188 we = _get_worker_exception_data(wid) | 192 error_res = _get_worker_exception_data(wid) |
189 error_res = (td, we, False) | |
190 result_list.append((td, error_res, False)) | 193 result_list.append((td, error_res, False)) |
191 | 194 |
192 res = (task_type, wid, result_list) | 195 res = (task_type, wid, result_list) |
193 put_start_time = time.perf_counter() | 196 put_start_time = time.perf_counter() |
194 put(res) | 197 put(res) |
197 completed += len(task_data_list) | 200 completed += len(task_data_list) |
198 | 201 |
199 # End task... gather stats to send back to the main process. | 202 # End task... gather stats to send back to the main process. |
200 elif task_type == TASK_END: | 203 elif task_type == TASK_END: |
201 logger.debug("Worker %d got end task, exiting." % wid) | 204 logger.debug("Worker %d got end task, exiting." % wid) |
202 stats.registerTimer('WorkerTaskGet', time=time_in_get) | 205 stats.registerTimer('Worker_%d_TaskGet' % wid, time=time_in_get) |
203 stats.registerTimer('WorkerResultPut', time=time_in_put) | 206 stats.registerTimer('Worker_all_TaskGet', time=time_in_get) |
 | 207 stats.registerTimer('Worker_%d_ResultPut' % wid, time=time_in_put) |
 | 208 stats.registerTimer('Worker_all_ResultPut', time=time_in_put) |
204 try: | 209 try: |
205 stats.mergeStats(w.getStats()) | 210 stats.mergeStats(w.getStats()) |
206 stats_data = stats.toData() | 211 stats_data = stats.toData() |
207 rep = (task_type, wid, [(task_data, (wid, stats_data), True)]) | 212 rep = (task_type, wid, [(task_data, (wid, stats_data), True)]) |
208 except Exception as e: | 213 except Exception as e: |
488 self._received += 1 | 493 self._received += 1 |
489 if self._received == self._count: | 494 if self._received == self._count: |
490 self._event.set() | 495 self._event.set() |
491 | 496 |
492 def _handleError(self, job, res, _): | 497 def _handleError(self, job, res, _): |
493 logger.error("Worker %d failed to send its report." % res.wid) | 498 logger.error("Worker %d failed to send its report." % res[0]) |
494 logger.error(res) | 499 logger.error(res) |
495 | 500 |
496 | 501 |
497 class FastQueue: | 502 class FastQueue: |
498 def __init__(self): | 503 def __init__(self): |