comparison piecrust/workerpool.py @ 1011:c4cf3cfe2726

bake: Better performance stats, and add callback to preload templates.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 26 Nov 2017 22:23:03 -0800
parents 09c3d415d9e5
children 071f30aa04bb
comparing 1010:501bd4ab7e06 with 1011:c4cf3cfe2726
@@ -109,15 +109,16 @@
     from piecrust.main import _pre_parse_chef_args
     _pre_parse_chef_args(chef_args)


 def _real_worker_func_unsafe(params):
+    init_start_time = time.perf_counter()
+
     wid = params.wid

     stats = ExecutionStats()
     stats.registerTimer('WorkerInit')
-    init_start_time = time.perf_counter()

     # In a context where `multiprocessing` is using the `spawn` forking model,
     # the new process doesn't inherit anything, so we lost all our logging
     # configuration here. Let's set it up again.
     if (hasattr(multiprocessing, 'get_start_method') and
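This hunk moves the time.perf_counter() capture to the very first statement of _real_worker_func_unsafe, so the WorkerInit timer also covers building the ExecutionStats object and the logging re-configuration that follows. Below is a minimal sketch of that pattern; StatsSketch is a hypothetical stand-in for PieCrust's ExecutionStats (only registerTimer/stepTimerSince are mirrored), and where exactly the worker steps its WorkerInit timer is not shown in this hunk.

import time


class StatsSketch:
    # Hypothetical stand-in for ExecutionStats, just enough for this example.
    def __init__(self):
        self.timers = {}

    def registerTimer(self, name, *, time=0):
        self.timers[name] = time

    def stepTimerSince(self, name, start):
        # Add the elapsed wall-clock time since `start` to the named timer.
        self.timers[name] += time.perf_counter() - start


def worker_func(params):
    # Capture the start time before doing anything else, as the new first
    # line of _real_worker_func_unsafe does.
    init_start_time = time.perf_counter()

    stats = StatsSketch()
    stats.registerTimer('WorkerInit')
    # ... logging re-configuration, app/worker construction, etc. ...
    stats.stepTimerSince('WorkerInit', init_start_time)
    return stats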
@@ -246,10 +247,18 @@
 class WorkerPool:
     def __init__(self, worker_class, initargs=(), *,
                  callback=None, error_callback=None,
                  worker_count=None, batch_size=None,
                  userdata=None):
+        init_start_time = time.perf_counter()
+
+        stats = ExecutionStats()
+        stats.registerTimer('MasterInit')
+        self._stats = stats
+        self._time_in_put = 0
+        self._time_in_get = 0
+
         self.userdata = userdata

         worker_count = worker_count or os.cpu_count() or 1

         if use_fastqueue:
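The constructor gets the same treatment: an init_start_time capture, an ExecutionStats instance with a MasterInit timer, and two counters (_time_in_put, _time_in_get) that later hunks accumulate into; the timer is stepped at the very end of __init__ in the next hunk, so it covers spawning the workers and starting the result-handler thread. A rough sketch of that bracketing follows, using a plain dict in place of ExecutionStats; names other than MasterInit, _time_in_put and _time_in_get are illustrative only.

import threading
import time


class PoolSketch:
    # Rough sketch of the constructor-level timing added in this change.
    def __init__(self):
        init_start_time = time.perf_counter()

        self._timers = {'MasterInit': 0.0}   # stand-in for ExecutionStats
        self._time_in_put = 0
        self._time_in_get = 0

        # ... create queues, spawn worker processes ...
        self._result_handler = threading.Thread(target=lambda: None)
        self._result_handler.daemon = True
        self._result_handler.start()

        # Last statement of __init__: everything above is charged to MasterInit.
        self._timers['MasterInit'] += time.perf_counter() - init_start_time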
@@ -297,19 +306,23 @@
             target=WorkerPool._handleResults,
             args=(self,))
         self._result_handler.daemon = True
         self._result_handler.start()

+        stats.stepTimerSince('MasterInit', init_start_time)
+
     def queueJobs(self, jobs):
         if self._closed:
             if self._error_on_join:
                 raise self._error_on_join
             raise Exception("This worker pool has been closed.")

         jobs = list(jobs)
         new_job_count = len(jobs)
         if new_job_count > 0:
+            put_start_time = time.perf_counter()
+
             with self._lock_jobs_left:
                 self._jobs_left += new_job_count

             self._event.clear()
             bs = self._batch_size
@@ -321,10 +334,12 @@
                 while cur_offset < new_job_count:
                     next_batch_idx = min(cur_offset + bs, new_job_count)
                     job_batch = jobs[cur_offset:next_batch_idx]
                     self._quick_put((TASK_JOB_BATCH, job_batch))
                     cur_offset = next_batch_idx
+
+            self._time_in_put += (time.perf_counter() - put_start_time)
         else:
             with self._lock_jobs_left:
                 done = (self._jobs_left == 0)
                 if done:
                     self._event.set()
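queueJobs now wraps the whole enqueue path, batch splitting included, in a put_start_time measurement and adds the elapsed time to _time_in_put, so repeated calls accumulate the total time the master spends pushing work onto the task queue. A self-contained sketch of the same batching-and-timing loop; queue.Queue and the 'JOB_BATCH' tag are stand-ins here.

import queue
import time


def queue_job_batches(jobs, task_queue, batch_size):
    # Split `jobs` into fixed-size batches, push each batch onto the queue,
    # and return the wall-clock time spent doing so.
    jobs = list(jobs)
    put_start_time = time.perf_counter()

    cur_offset = 0
    while cur_offset < len(jobs):
        next_batch_idx = min(cur_offset + batch_size, len(jobs))
        task_queue.put(('JOB_BATCH', jobs[cur_offset:next_batch_idx]))
        cur_offset = next_batch_idx

    return time.perf_counter() - put_start_time


q = queue.Queue()
time_in_put = 0
time_in_put += queue_job_batches(range(25), q, batch_size=10)  # 3 batches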
@@ -344,10 +359,11 @@
         if self._jobs_left > 0:
             raise Exception("A previous job queue has not finished yet.")
         if not self._event.is_set():
             raise Exception("A previous job queue hasn't been cleared.")

+        close_start_time = time.perf_counter()
         logger.debug("Closing worker pool...")
         live_workers = list(filter(lambda w: w is not None, self._pool))
         handler = _ReportHandler(len(live_workers))
         self._callback = handler._handle
         self._error_callback = handler._handleError
@@ -366,11 +382,17 @@
         logger.debug("Exiting result handler thread...")
         self._result_queue.put(None)
         self._result_handler.join()
         self._closed = True

-        return handler.reports
+        stats = self._stats
+        stats.registerTimer('MasterTaskPut', time=self._time_in_put)
+        stats.registerTimer('MasterResultGet', time=self._time_in_get)
+        stats.registerTimer('MasterClose',
+                            time=(time.perf_counter() - close_start_time))
+
+        return [stats] + handler.reports

     def _onResultHandlerCriticalError(self, wid):
         logger.error("Result handler received a critical error from "
                      "worker %d." % wid)
         with self._lock_workers:
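close() now registers the accumulated MasterTaskPut and MasterResultGet timers, times its own teardown as MasterClose, and returns the master's stats as the first element of the report list, ahead of the per-worker reports. A hypothetical consumer-side aggregation is sketched below; the timers attribute name is an assumption for illustration, not confirmed by this diff.

from collections import defaultdict


def summarize_reports(reports):
    # reports[0] is the master's stats, the rest come from the workers;
    # sum every timer by name across all of them.
    totals = defaultdict(float)
    for report in reports:
        for name, value in getattr(report, 'timers', {}).items():
            totals[name] += value
    return dict(totals)


# e.g.: totals = summarize_reports(pool.close())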
@@ -395,11 +417,13 @@
     @staticmethod
     def _handleResults(pool):
         userdata = pool.userdata
         while True:
             try:
+                get_start_time = time.perf_counter()
                 res = pool._quick_get()
+                pool._time_in_get = (time.perf_counter() - get_start_time)
             except (EOFError, OSError):
                 logger.debug("Result handler thread encountered connection "
                              "problem, exiting.")
                 return

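Finally, the result-handler thread measures how long each blocking _quick_get() call takes; as written, the latest measurement is assigned to pool._time_in_get, which close() then reports as MasterResultGet. Below is a small stand-alone sketch of timing a blocking queue read in a handler loop, with a None sentinel to stop it (mirroring the sentinel that close() pushes into the result queue); the names are illustrative only.

import queue
import threading
import time


def handle_results(result_queue, wait_times):
    # Record how long each blocking get() waited; stop on the None sentinel.
    while True:
        get_start_time = time.perf_counter()
        res = result_queue.get()
        wait_times.append(time.perf_counter() - get_start_time)
        if res is None:
            return
        # ... dispatch `res` to callbacks ...


results = queue.Queue()
waits = []
t = threading.Thread(target=handle_results, args=(results, waits))
t.daemon = True
t.start()
results.put('some-result')
results.put(None)   # sentinel, like the one close() sends
t.join()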