Mercurial > piecrust2
comparison piecrust/workerpool.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
keep aborting in `getContents`, but rely on their pipeline to generate
pages for baking.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 04 Jun 2017 23:34:28 -0700 |
parents | 4850f8c21b6e |
children | 313db67cfc35 |
comparison
equal
deleted
inserted
replaced
853:f070a4fc033c | 854:08e02c2a2a1a |
---|---|
87 # configuration here. Let's set it up again. | 87 # configuration here. Let's set it up again. |
88 if (hasattr(multiprocessing, 'get_start_method') and | 88 if (hasattr(multiprocessing, 'get_start_method') and |
89 multiprocessing.get_start_method() == 'spawn'): | 89 multiprocessing.get_start_method() == 'spawn'): |
90 from piecrust.main import _pre_parse_chef_args | 90 from piecrust.main import _pre_parse_chef_args |
91 _pre_parse_chef_args(sys.argv[1:]) | 91 _pre_parse_chef_args(sys.argv[1:]) |
92 | |
93 from piecrust.main import ColoredFormatter | |
94 root_logger = logging.getLogger() | |
95 root_logger.handlers[0].setFormatter(ColoredFormatter( | |
96 ('[W-%d]' % wid) + '[%(name)s] %(message)s')) | |
92 | 97 |
93 logger.debug("Worker %d initializing..." % wid) | 98 logger.debug("Worker %d initializing..." % wid) |
94 | 99 |
95 # We don't need those. | 100 # We don't need those. |
96 params.inqueue._writer.close() | 101 params.inqueue._writer.close() |
176 | 181 |
177 | 182 |
178 class WorkerPool: | 183 class WorkerPool: |
179 def __init__(self, worker_class, initargs=(), *, | 184 def __init__(self, worker_class, initargs=(), *, |
180 callback=None, error_callback=None, | 185 callback=None, error_callback=None, |
181 worker_count=None, batch_size=None): | 186 worker_count=None, batch_size=None, |
187 userdata=None): | |
188 self.userdata = userdata | |
189 | |
182 worker_count = worker_count or os.cpu_count() or 1 | 190 worker_count = worker_count or os.cpu_count() or 1 |
183 | 191 |
184 if use_fastqueue: | 192 if use_fastqueue: |
185 self._task_queue = FastQueue() | 193 self._task_queue = FastQueue() |
186 self._result_queue = FastQueue() | 194 self._result_queue = FastQueue() |
269 if self._jobs_left == 0: | 277 if self._jobs_left == 0: |
270 self._event.set() | 278 self._event.set() |
271 | 279 |
272 @staticmethod | 280 @staticmethod |
273 def _handleResults(pool): | 281 def _handleResults(pool): |
282 userdata = pool.userdata | |
274 while True: | 283 while True: |
275 try: | 284 try: |
276 res = pool._quick_get() | 285 res = pool._quick_get() |
277 except (EOFError, OSError): | 286 except (EOFError, OSError): |
278 logger.debug("Result handler thread encountered connection " | 287 logger.debug("Result handler thread encountered connection " |
285 | 294 |
286 task_type, task_data, success, wid, data = res | 295 task_type, task_data, success, wid, data = res |
287 try: | 296 try: |
288 if success: | 297 if success: |
289 if pool._callback: | 298 if pool._callback: |
290 pool._callback(task_data, data) | 299 pool._callback(task_data, data, userdata) |
291 else: | 300 else: |
292 if pool._error_callback: | 301 if pool._error_callback: |
293 pool._error_callback(task_data, data) | 302 pool._error_callback(task_data, data, userdata) |
294 else: | 303 else: |
295 logger.error( | 304 logger.error( |
296 "Worker %d failed to process a job:" % wid) | 305 "Worker %d failed to process a job:" % wid) |
297 logger.error(data) | 306 logger.error(data) |
298 except Exception as ex: | 307 except Exception as ex: |
310 self._event = threading.Event() | 319 self._event = threading.Event() |
311 | 320 |
312 def wait(self, timeout=None): | 321 def wait(self, timeout=None): |
313 return self._event.wait(timeout) | 322 return self._event.wait(timeout) |
314 | 323 |
315 def _handle(self, job, res): | 324 def _handle(self, job, res, _): |
316 wid, data = res | 325 wid, data = res |
317 if wid < 0 or wid > self._count: | 326 if wid < 0 or wid > self._count: |
318 logger.error("Ignoring report from unknown worker %d." % wid) | 327 logger.error("Ignoring report from unknown worker %d." % wid) |
319 return | 328 return |
320 | 329 |
322 self.reports[wid] = data | 331 self.reports[wid] = data |
323 | 332 |
324 if self._received == self._count: | 333 if self._received == self._count: |
325 self._event.set() | 334 self._event.set() |
326 | 335 |
327 def _handleError(self, job, res): | 336 def _handleError(self, job, res, _): |
328 logger.error("Worker %d failed to send its report." % res.wid) | 337 logger.error("Worker %d failed to send its report." % res.wid) |
329 logger.error(res) | 338 logger.error(res) |
330 | 339 |
331 | 340 |
332 class FastQueue: | 341 class FastQueue: |