Mercurial > piecrust2
diff piecrust/workerpool.py @ 1011:c4cf3cfe2726
bake: Better performance stats, and add callback to preload templates.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 26 Nov 2017 22:23:03 -0800 |
parents | 09c3d415d9e5 |
children | 071f30aa04bb |
line wrap: on
line diff
--- a/piecrust/workerpool.py Sun Nov 26 22:21:33 2017 -0800 +++ b/piecrust/workerpool.py Sun Nov 26 22:23:03 2017 -0800 @@ -111,11 +111,12 @@ def _real_worker_func_unsafe(params): + init_start_time = time.perf_counter() + wid = params.wid stats = ExecutionStats() stats.registerTimer('WorkerInit') - init_start_time = time.perf_counter() # In a context where `multiprocessing` is using the `spawn` forking model, # the new process doesn't inherit anything, so we lost all our logging @@ -248,6 +249,14 @@ callback=None, error_callback=None, worker_count=None, batch_size=None, userdata=None): + init_start_time = time.perf_counter() + + stats = ExecutionStats() + stats.registerTimer('MasterInit') + self._stats = stats + self._time_in_put = 0 + self._time_in_get = 0 + self.userdata = userdata worker_count = worker_count or os.cpu_count() or 1 @@ -299,6 +308,8 @@ self._result_handler.daemon = True self._result_handler.start() + stats.stepTimerSince('MasterInit', init_start_time) + def queueJobs(self, jobs): if self._closed: if self._error_on_join: @@ -308,6 +319,8 @@ jobs = list(jobs) new_job_count = len(jobs) if new_job_count > 0: + put_start_time = time.perf_counter() + with self._lock_jobs_left: self._jobs_left += new_job_count @@ -323,6 +336,8 @@ job_batch = jobs[cur_offset:next_batch_idx] self._quick_put((TASK_JOB_BATCH, job_batch)) cur_offset = next_batch_idx + + self._time_in_put += (time.perf_counter() - put_start_time) else: with self._lock_jobs_left: done = (self._jobs_left == 0) @@ -346,6 +361,7 @@ if not self._event.is_set(): raise Exception("A previous job queue hasn't been cleared.") + close_start_time = time.perf_counter() logger.debug("Closing worker pool...") live_workers = list(filter(lambda w: w is not None, self._pool)) handler = _ReportHandler(len(live_workers)) @@ -368,7 +384,13 @@ self._result_handler.join() self._closed = True - return handler.reports + stats = self._stats + stats.registerTimer('MasterTaskPut', time=self._time_in_put) + stats.registerTimer('MasterResultGet', time=self._time_in_get) + stats.registerTimer('MasterClose', + time=(time.perf_counter() - close_start_time)) + + return [stats] + handler.reports def _onResultHandlerCriticalError(self, wid): logger.error("Result handler received a critical error from " @@ -397,7 +419,9 @@ userdata = pool.userdata while True: try: + get_start_time = time.perf_counter() res = pool._quick_get() + pool._time_in_get = (time.perf_counter() - get_start_time) except (EOFError, OSError): logger.debug("Result handler thread encountered connection " "problem, exiting.")