diff piecrust/workerpool.py @ 1011:c4cf3cfe2726

bake: Better performance stats, and add callback to preload templates.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 26 Nov 2017 22:23:03 -0800
parents 09c3d415d9e5
children 071f30aa04bb
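
The diff below threads `time.perf_counter()` timers through both sides of the
worker pool: the worker's existing 'WorkerInit' timer gets a wider window, and
the pool master gains 'MasterInit', 'MasterTaskPut', 'MasterResultGet' and
'MasterClose' timers that are returned alongside the per-worker reports. (The
template-preloading callback mentioned in the commit message is not part of
this file's diff.) A minimal sketch of the timing pattern, using a
hypothetical stand-in for PieCrust's `ExecutionStats` class that models only
the two methods the diff calls:

    import time

    class ExecutionStats:
        # Hypothetical stand-in; the real class lives elsewhere in the
        # PieCrust codebase and does more than this.
        def __init__(self):
            self.timers = {}

        def registerTimer(self, name, *, time=0):
            # Create a named timer, optionally seeded with an
            # already-measured elapsed value (in seconds).
            self.timers[name] = time

        def stepTimerSince(self, name, start):
            # Add the wall-clock time elapsed since `start` to the timer.
            self.timers[name] += time.perf_counter() - start

    stats = ExecutionStats()
    stats.registerTimer('MasterInit')
    init_start_time = time.perf_counter()
    # ... expensive setup work goes here ...
    stats.stepTimerSince('MasterInit', init_start_time)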
--- a/piecrust/workerpool.py	Sun Nov 26 22:21:33 2017 -0800
+++ b/piecrust/workerpool.py	Sun Nov 26 22:23:03 2017 -0800
@@ -111,11 +111,12 @@
 
 
 def _real_worker_func_unsafe(params):
+    init_start_time = time.perf_counter()
+
     wid = params.wid
 
     stats = ExecutionStats()
     stats.registerTimer('WorkerInit')
-    init_start_time = time.perf_counter()
 
     # In a context where `multiprocessing` is using the `spawn` forking model,
     # the new process doesn't inherit anything, so we lost all our logging
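
Hoisting `init_start_time` to the first statement widens the 'WorkerInit'
window so that it also covers constructing the `ExecutionStats` object and
registering the timer itself; before, those calls fell just outside the
measured span.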
@@ -248,6 +249,14 @@
                  callback=None, error_callback=None,
                  worker_count=None, batch_size=None,
                  userdata=None):
+        init_start_time = time.perf_counter()
+
+        stats = ExecutionStats()
+        stats.registerTimer('MasterInit')
+        self._stats = stats
+        self._time_in_put = 0
+        self._time_in_get = 0
+
         self.userdata = userdata
 
         worker_count = worker_count or os.cpu_count() or 1
@@ -299,6 +308,8 @@
         self._result_handler.daemon = True
         self._result_handler.start()
 
+        stats.stepTimerSince('MasterInit', init_start_time)
+
     def queueJobs(self, jobs):
         if self._closed:
             if self._error_on_join:
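
The same pattern now brackets the master's `__init__`: the 'MasterInit' timer
and the `_time_in_put`/`_time_in_get` accumulators are set up before any real
work, and the `stepTimerSince('MasterInit', ...)` call at the end of the
constructor closes the window only after the result-handler thread has been
started.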
@@ -308,6 +319,8 @@
         jobs = list(jobs)
         new_job_count = len(jobs)
         if new_job_count > 0:
+            put_start_time = time.perf_counter()
+
             with self._lock_jobs_left:
                 self._jobs_left += new_job_count
 
@@ -323,6 +336,8 @@
                     job_batch = jobs[cur_offset:next_batch_idx]
                     self._quick_put((TASK_JOB_BATCH, job_batch))
                     cur_offset = next_batch_idx
+
+            self._time_in_put += (time.perf_counter() - put_start_time)
         else:
             with self._lock_jobs_left:
                 done = (self._jobs_left == 0)
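
`queueJobs` accumulates, across calls, the time spent slicing jobs into
batches and pushing them onto the task queue. A condensed, hypothetical sketch
of that accumulate-across-calls pattern (assuming a `quick_put` callable and
ignoring the `_jobs_left` bookkeeping):

    import time

    TASK_JOB_BATCH = 1  # hypothetical stand-in for the real constant

    class BatchPutter:
        def __init__(self, quick_put, batch_size):
            self.quick_put = quick_put
            self.batch_size = batch_size
            self.time_in_put = 0.0  # total seconds spent pushing batches

        def queue_jobs(self, jobs):
            start = time.perf_counter()
            for i in range(0, len(jobs), self.batch_size):
                self.quick_put((TASK_JOB_BATCH, jobs[i:i + self.batch_size]))
            self.time_in_put += time.perf_counter() - start

Each call adds to the running total, so the value registered at close time
covers the whole bake rather than just the last batch of jobs.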
@@ -346,6 +361,7 @@
         if not self._event.is_set():
             raise Exception("A previous job queue hasn't been cleared.")
 
+        close_start_time = time.perf_counter()
         logger.debug("Closing worker pool...")
         live_workers = list(filter(lambda w: w is not None, self._pool))
         handler = _ReportHandler(len(live_workers))
@@ -368,7 +384,13 @@
         self._result_handler.join()
         self._closed = True
 
-        return handler.reports
+        stats = self._stats
+        stats.registerTimer('MasterTaskPut', time=self._time_in_put)
+        stats.registerTimer('MasterResultGet', time=self._time_in_get)
+        stats.registerTimer('MasterClose',
+                            time=(time.perf_counter() - close_start_time))
+
+        return [stats] + handler.reports
 
     def _onResultHandlerCriticalError(self, wid):
         logger.error("Result handler received a critical error from "
@@ -397,7 +419,9 @@
         userdata = pool.userdata
         while True:
             try:
+                get_start_time = time.perf_counter()
                 res = pool._quick_get()
+                pool._time_in_get += (time.perf_counter() - get_start_time)
             except (EOFError, OSError):
                 logger.debug("Result handler thread encountered connection "
                              "problem, exiting.")