piecrust2: comparison piecrust/workerpool.py @ 1011:c4cf3cfe2726
bake: Better performance stats, and add callback to preload templates.
author | Ludovic Chabant <ludovic@chabant.com> |
date | Sun, 26 Nov 2017 22:23:03 -0800 |
parents | 09c3d415d9e5 |
children | 071f30aa04bb |
1010:501bd4ab7e06 | 1011:c4cf3cfe2726 |
---|---|
109 from piecrust.main import _pre_parse_chef_args | 109 from piecrust.main import _pre_parse_chef_args |
110 _pre_parse_chef_args(chef_args) | 110 _pre_parse_chef_args(chef_args) |
111 | 111 |
112 | 112 |
113 def _real_worker_func_unsafe(params): | 113 def _real_worker_func_unsafe(params): |
| 114 init_start_time = time.perf_counter() |
| 115 |
114 wid = params.wid | 116 wid = params.wid |
115 | 117 |
116 stats = ExecutionStats() | 118 stats = ExecutionStats() |
117 stats.registerTimer('WorkerInit') | 119 stats.registerTimer('WorkerInit') |
118 init_start_time = time.perf_counter() | |
119 | 120 |
120 # In a context where `multiprocessing` is using the `spawn` forking model, | 121 # In a context where `multiprocessing` is using the `spawn` forking model, |
121 # the new process doesn't inherit anything, so we lost all our logging | 122 # the new process doesn't inherit anything, so we lost all our logging |
122 # configuration here. Let's set it up again. | 123 # configuration here. Let's set it up again. |
123 if (hasattr(multiprocessing, 'get_start_method') and | 124 if (hasattr(multiprocessing, 'get_start_method') and |
246 class WorkerPool: | 247 class WorkerPool: |
247 def __init__(self, worker_class, initargs=(), *, | 248 def __init__(self, worker_class, initargs=(), *, |
248 callback=None, error_callback=None, | 249 callback=None, error_callback=None, |
249 worker_count=None, batch_size=None, | 250 worker_count=None, batch_size=None, |
250 userdata=None): | 251 userdata=None): |
| 252 init_start_time = time.perf_counter() |
| 253 |
| 254 stats = ExecutionStats() |
| 255 stats.registerTimer('MasterInit') |
| 256 self._stats = stats |
| 257 self._time_in_put = 0 |
| 258 self._time_in_get = 0 |
| 259 |
251 self.userdata = userdata | 260 self.userdata = userdata |
252 | 261 |
253 worker_count = worker_count or os.cpu_count() or 1 | 262 worker_count = worker_count or os.cpu_count() or 1 |
254 | 263 |
255 if use_fastqueue: | 264 if use_fastqueue: |
297 target=WorkerPool._handleResults, | 306 target=WorkerPool._handleResults, |
298 args=(self,)) | 307 args=(self,)) |
299 self._result_handler.daemon = True | 308 self._result_handler.daemon = True |
300 self._result_handler.start() | 309 self._result_handler.start() |
301 | 310 |
| 311 stats.stepTimerSince('MasterInit', init_start_time) |
| 312 |
302 def queueJobs(self, jobs): | 313 def queueJobs(self, jobs): |
303 if self._closed: | 314 if self._closed: |
304 if self._error_on_join: | 315 if self._error_on_join: |
305 raise self._error_on_join | 316 raise self._error_on_join |
306 raise Exception("This worker pool has been closed.") | 317 raise Exception("This worker pool has been closed.") |
307 | 318 |
308 jobs = list(jobs) | 319 jobs = list(jobs) |
309 new_job_count = len(jobs) | 320 new_job_count = len(jobs) |
310 if new_job_count > 0: | 321 if new_job_count > 0: |
| 322 put_start_time = time.perf_counter() |
| 323 |
311 with self._lock_jobs_left: | 324 with self._lock_jobs_left: |
312 self._jobs_left += new_job_count | 325 self._jobs_left += new_job_count |
313 | 326 |
314 self._event.clear() | 327 self._event.clear() |
315 bs = self._batch_size | 328 bs = self._batch_size |
321 while cur_offset < new_job_count: | 334 while cur_offset < new_job_count: |
322 next_batch_idx = min(cur_offset + bs, new_job_count) | 335 next_batch_idx = min(cur_offset + bs, new_job_count) |
323 job_batch = jobs[cur_offset:next_batch_idx] | 336 job_batch = jobs[cur_offset:next_batch_idx] |
324 self._quick_put((TASK_JOB_BATCH, job_batch)) | 337 self._quick_put((TASK_JOB_BATCH, job_batch)) |
325 cur_offset = next_batch_idx | 338 cur_offset = next_batch_idx |
| 339 |
| 340 self._time_in_put += (time.perf_counter() - put_start_time) |
326 else: | 341 else: |
327 with self._lock_jobs_left: | 342 with self._lock_jobs_left: |
328 done = (self._jobs_left == 0) | 343 done = (self._jobs_left == 0) |
329 if done: | 344 if done: |
330 self._event.set() | 345 self._event.set() |
344 if self._jobs_left > 0: | 359 if self._jobs_left > 0: |
345 raise Exception("A previous job queue has not finished yet.") | 360 raise Exception("A previous job queue has not finished yet.") |
346 if not self._event.is_set(): | 361 if not self._event.is_set(): |
347 raise Exception("A previous job queue hasn't been cleared.") | 362 raise Exception("A previous job queue hasn't been cleared.") |
348 | 363 |
| 364 close_start_time = time.perf_counter() |
349 logger.debug("Closing worker pool...") | 365 logger.debug("Closing worker pool...") |
350 live_workers = list(filter(lambda w: w is not None, self._pool)) | 366 live_workers = list(filter(lambda w: w is not None, self._pool)) |
351 handler = _ReportHandler(len(live_workers)) | 367 handler = _ReportHandler(len(live_workers)) |
352 self._callback = handler._handle | 368 self._callback = handler._handle |
353 self._error_callback = handler._handleError | 369 self._error_callback = handler._handleError |
366 logger.debug("Exiting result handler thread...") | 382 logger.debug("Exiting result handler thread...") |
367 self._result_queue.put(None) | 383 self._result_queue.put(None) |
368 self._result_handler.join() | 384 self._result_handler.join() |
369 self._closed = True | 385 self._closed = True |
370 | 386 |
371 return handler.reports | 387 stats = self._stats |
| 388 stats.registerTimer('MasterTaskPut', time=self._time_in_put) |
| 389 stats.registerTimer('MasterResultGet', time=self._time_in_get) |
| 390 stats.registerTimer('MasterClose', |
| 391 time=(time.perf_counter() - close_start_time)) |
| 392 |
| 393 return [stats] + handler.reports |
372 | 394 |
373 def _onResultHandlerCriticalError(self, wid): | 395 def _onResultHandlerCriticalError(self, wid): |
374 logger.error("Result handler received a critical error from " | 396 logger.error("Result handler received a critical error from " |
375 "worker %d." % wid) | 397 "worker %d." % wid) |
376 with self._lock_workers: | 398 with self._lock_workers: |
395 @staticmethod | 417 @staticmethod |
396 def _handleResults(pool): | 418 def _handleResults(pool): |
397 userdata = pool.userdata | 419 userdata = pool.userdata |
398 while True: | 420 while True: |
399 try: | 421 try: |
| 422 get_start_time = time.perf_counter() |
400 res = pool._quick_get() | 423 res = pool._quick_get() |
| 424 pool._time_in_get = (time.perf_counter() - get_start_time) |
401 except (EOFError, OSError): | 425 except (EOFError, OSError): |
402 logger.debug("Result handler thread encountered connection " | 426 logger.debug("Result handler thread encountered connection " |
403 "problem, exiting.") | 427 "problem, exiting.") |
404 return | 428 return |
405 | 429 |
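Taken together, the change applies one pattern on both sides of the pool: snapshot `time.perf_counter()` at the start of a phase, then record the elapsed time under a named timer. On the worker side the snapshot simply moves to the top of `_real_worker_func_unsafe` so the `WorkerInit` timer covers all of the setup; on the master side, `__init__`, `queueJobs`, the result-handler thread, and `close()` each feed a timer (`MasterInit`, `MasterTaskPut`, `MasterResultGet`, `MasterClose`), and `close()` now returns the master's own stats object in front of the per-worker reports. Below is a minimal, self-contained sketch of that master-side pattern; `ExecutionStats` here is a stand-in whose `registerTimer`/`stepTimerSince` signatures are assumed from how the diff calls them, and `TimedPool` is a hypothetical, trimmed-down class, not piecrust's real `WorkerPool`.

```python
import time


class ExecutionStats:
    """Stand-in for piecrust's ExecutionStats; just enough for the sketch."""

    def __init__(self):
        self.timers = {}

    def registerTimer(self, name, *, time=0):
        # Create a named timer, optionally seeded with an elapsed value
        # (the keyword is named `time` to mirror the calls in the diff).
        self.timers[name] = time

    def stepTimerSince(self, name, start):
        # Add the time elapsed since `start` to an existing timer.
        self.timers[name] += time.perf_counter() - start


class TimedPool:
    """Hypothetical, trimmed-down illustration of the timing hooks added in
    this changeset; not piecrust's real WorkerPool."""

    def __init__(self):
        init_start_time = time.perf_counter()

        stats = ExecutionStats()
        stats.registerTimer('MasterInit')
        self._stats = stats
        self._time_in_put = 0
        self._time_in_get = 0

        # ...spawn workers, create queues, start the result handler here...

        stats.stepTimerSince('MasterInit', init_start_time)

    def queueJobs(self, jobs):
        put_start_time = time.perf_counter()
        for job in jobs:
            pass  # stand-in for self._quick_put((TASK_JOB_BATCH, batch))
        self._time_in_put += time.perf_counter() - put_start_time

    def close(self):
        close_start_time = time.perf_counter()
        # ...join the workers and the result-handler thread here...
        stats = self._stats
        stats.registerTimer('MasterTaskPut', time=self._time_in_put)
        stats.registerTimer('MasterResultGet', time=self._time_in_get)
        stats.registerTimer('MasterClose',
                            time=(time.perf_counter() - close_start_time))
        return [stats]  # the real pool returns [stats] + handler.reports


if __name__ == '__main__':
    pool = TimedPool()
    pool.queueJobs(range(1000))
    print(pool.close()[0].timers)
```

Accumulating `_time_in_put` and `_time_in_get` as plain floats and only registering them as timers in `close()` keeps the per-batch overhead on the hot paths to a subtraction and an addition, deferring the stats bookkeeping to shutdown.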