comparison piecrust/processing/pipeline.py @ 421:4a43d7015b75

bake: Improve performance timers reports. Add timers per-worker, and separate bake and pipeline workers.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jun 2015 23:27:39 -0700
parents c4b3a7fd2f87
children 6238dcfc7a78
comparison
equal deleted inserted replaced
420:f1b759c188b0 421:4a43d7015b75
132 self._waitOnWorkerPool(pool, expected_result_count, _handler) 132 self._waitOnWorkerPool(pool, expected_result_count, _handler)
133 self._terminateWorkerPool(pool) 133 self._terminateWorkerPool(pool)
134 134
135 # Get timing information from the workers. 135 # Get timing information from the workers.
136 record.current.timers = {} 136 record.current.timers = {}
137 for _ in range(len(pool.workers)): 137 for i in range(len(pool.workers)):
138 try: 138 try:
139 timers = pool.results.get(True, 0.1) 139 timers = pool.results.get(True, 0.1)
140 except queue.Empty: 140 except queue.Empty:
141 logger.error("Didn't get timing information from all workers.") 141 logger.error("Didn't get timing information from all workers.")
142 break 142 break
143 143
144 worker_name = 'PipelineWorker_%d' % i
145 record.current.timers[worker_name] = {}
144 for name, val in timers['data'].items(): 146 for name, val in timers['data'].items():
145 main_val = record.current.timers.setdefault(name, 0) 147 main_val = record.current.timers.setdefault(name, 0)
146 record.current.timers[name] = main_val + val 148 record.current.timers[name] = main_val + val
149 record.current.timers[worker_name][name] = val
147 150
148 # Invoke post-processors. 151 # Invoke post-processors.
149 pipeline_ctx.record = record.current 152 pipeline_ctx.record = record.current
150 for proc in processors: 153 for proc in processors:
151 proc.onPipelineEnd(pipeline_ctx) 154 proc.onPipelineEnd(pipeline_ctx)
254 pool.queue, pool.results, pool.abort_event, 257 pool.queue, pool.results, pool.abort_event,
255 self.force, self.app.debug) 258 self.force, self.app.debug)
256 ctx.enabled_processors = self.enabled_processors 259 ctx.enabled_processors = self.enabled_processors
257 ctx.additional_processors = self.additional_processors 260 ctx.additional_processors = self.additional_processors
258 w = multiprocessing.Process( 261 w = multiprocessing.Process(
259 name='Worker_%d' % i, 262 name='PipelineWorker_%d' % i,
260 target=worker_func, args=(i, ctx)) 263 target=worker_func, args=(i, ctx))
261 w.start() 264 w.start()
262 pool.workers.append(w) 265 pool.workers.append(w)
263 return pool 266 return pool
264 267