comparison piecrust/workerpool.py @ 1019:bd544b65cfad

bake: More detailed stats, and fix a problem with some error reporting.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 29 Nov 2017 21:36:11 -0800
parents 071f30aa04bb
children aa4f1e04cf3e
comparison
equal deleted inserted replaced
1018:3c6e6e7b9639 1019:bd544b65cfad
156 156
157 # Start pumping! 157 # Start pumping!
158 completed = 0 158 completed = 0
159 time_in_get = 0 159 time_in_get = 0
160 time_in_put = 0 160 time_in_put = 0
161 is_first_get = True
161 get = params.inqueue.get 162 get = params.inqueue.get
162 put = params.outqueue.put 163 put = params.outqueue.put
163 164
164 while True: 165 while True:
165 get_start_time = time.perf_counter() 166 get_start_time = time.perf_counter()
166 task = get() 167 task = get()
167 time_in_get += (time.perf_counter() - get_start_time) 168 if not is_first_get:
169 time_in_get += (time.perf_counter() - get_start_time)
170 else:
171 is_first_get = False
168 172
169 task_type, task_data = task 173 task_type, task_data = task
170 174
171 # Job task(s)... just do it. 175 # Job task(s)... just do it.
172 if task_type == TASK_JOB or task_type == TASK_JOB_BATCH: 176 if task_type == TASK_JOB or task_type == TASK_JOB_BATCH:
183 result_list.append((td, worker_res, True)) 187 result_list.append((td, worker_res, True))
184 except Exception as e: 188 except Exception as e:
185 logger.debug( 189 logger.debug(
186 "Error processing job, sending exception to main process:") 190 "Error processing job, sending exception to main process:")
187 logger.debug(traceback.format_exc()) 191 logger.debug(traceback.format_exc())
188 we = _get_worker_exception_data(wid) 192 error_res = _get_worker_exception_data(wid)
189 error_res = (td, we, False)
190 result_list.append((td, error_res, False)) 193 result_list.append((td, error_res, False))
191 194
192 res = (task_type, wid, result_list) 195 res = (task_type, wid, result_list)
193 put_start_time = time.perf_counter() 196 put_start_time = time.perf_counter()
194 put(res) 197 put(res)
197 completed += len(task_data_list) 200 completed += len(task_data_list)
198 201
199 # End task... gather stats to send back to the main process. 202 # End task... gather stats to send back to the main process.
200 elif task_type == TASK_END: 203 elif task_type == TASK_END:
201 logger.debug("Worker %d got end task, exiting." % wid) 204 logger.debug("Worker %d got end task, exiting." % wid)
202 stats.registerTimer('WorkerTaskGet', time=time_in_get) 205 stats.registerTimer('Worker_%d_TaskGet' % wid, time=time_in_get)
203 stats.registerTimer('WorkerResultPut', time=time_in_put) 206 stats.registerTimer('Worker_all_TaskGet', time=time_in_get)
207 stats.registerTimer('Worker_%d_ResultPut' % wid, time=time_in_put)
208 stats.registerTimer('Worker_all_ResultPut', time=time_in_put)
204 try: 209 try:
205 stats.mergeStats(w.getStats()) 210 stats.mergeStats(w.getStats())
206 stats_data = stats.toData() 211 stats_data = stats.toData()
207 rep = (task_type, wid, [(task_data, (wid, stats_data), True)]) 212 rep = (task_type, wid, [(task_data, (wid, stats_data), True)])
208 except Exception as e: 213 except Exception as e:
488 self._received += 1 493 self._received += 1
489 if self._received == self._count: 494 if self._received == self._count:
490 self._event.set() 495 self._event.set()
491 496
492 def _handleError(self, job, res, _): 497 def _handleError(self, job, res, _):
493 logger.error("Worker %d failed to send its report." % res.wid) 498 logger.error("Worker %d failed to send its report." % res[0])
494 logger.error(res) 499 logger.error(res)
495 500
496 501
497 class FastQueue: 502 class FastQueue:
498 def __init__(self): 503 def __init__(self):