Mercurial > piecrust2
comparison piecrust/processing/pipeline.py @ 421:4a43d7015b75
bake: Improve performance timers reports.
Add timers per-worker, and separate bake and pipeline workers.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sat, 20 Jun 2015 23:27:39 -0700 |
parents | c4b3a7fd2f87 |
children | 6238dcfc7a78 |
comparison
equal
deleted
inserted
replaced
420:f1b759c188b0 | 421:4a43d7015b75 |
---|---|
132 self._waitOnWorkerPool(pool, expected_result_count, _handler) | 132 self._waitOnWorkerPool(pool, expected_result_count, _handler) |
133 self._terminateWorkerPool(pool) | 133 self._terminateWorkerPool(pool) |
134 | 134 |
135 # Get timing information from the workers. | 135 # Get timing information from the workers. |
136 record.current.timers = {} | 136 record.current.timers = {} |
137 for _ in range(len(pool.workers)): | 137 for i in range(len(pool.workers)): |
138 try: | 138 try: |
139 timers = pool.results.get(True, 0.1) | 139 timers = pool.results.get(True, 0.1) |
140 except queue.Empty: | 140 except queue.Empty: |
141 logger.error("Didn't get timing information from all workers.") | 141 logger.error("Didn't get timing information from all workers.") |
142 break | 142 break |
143 | 143 |
144 worker_name = 'PipelineWorker_%d' % i | |
145 record.current.timers[worker_name] = {} | |
144 for name, val in timers['data'].items(): | 146 for name, val in timers['data'].items(): |
145 main_val = record.current.timers.setdefault(name, 0) | 147 main_val = record.current.timers.setdefault(name, 0) |
146 record.current.timers[name] = main_val + val | 148 record.current.timers[name] = main_val + val |
149 record.current.timers[worker_name][name] = val | |
147 | 150 |
148 # Invoke post-processors. | 151 # Invoke post-processors. |
149 pipeline_ctx.record = record.current | 152 pipeline_ctx.record = record.current |
150 for proc in processors: | 153 for proc in processors: |
151 proc.onPipelineEnd(pipeline_ctx) | 154 proc.onPipelineEnd(pipeline_ctx) |
254 pool.queue, pool.results, pool.abort_event, | 257 pool.queue, pool.results, pool.abort_event, |
255 self.force, self.app.debug) | 258 self.force, self.app.debug) |
256 ctx.enabled_processors = self.enabled_processors | 259 ctx.enabled_processors = self.enabled_processors |
257 ctx.additional_processors = self.additional_processors | 260 ctx.additional_processors = self.additional_processors |
258 w = multiprocessing.Process( | 261 w = multiprocessing.Process( |
259 name='Worker_%d' % i, | 262 name='PipelineWorker_%d' % i, |
260 target=worker_func, args=(i, ctx)) | 263 target=worker_func, args=(i, ctx)) |
261 w.start() | 264 w.start() |
262 pool.workers.append(w) | 265 pool.workers.append(w) |
263 return pool | 266 return pool |
264 | 267 |