comparison piecrust/workerpool.py @ 854:08e02c2a2a1a

core: Keep refactoring, this time to prepare for generator sources. - Make a few APIs simpler. - Content pipelines create their own jobs, so that generator sources can keep aborting in `getContents`, but rely on their pipeline to generate pages for baking.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jun 2017 23:34:28 -0700
parents 4850f8c21b6e
children 313db67cfc35
comparison
equal deleted inserted replaced
853:f070a4fc033c 854:08e02c2a2a1a
87 # configuration here. Let's set it up again. 87 # configuration here. Let's set it up again.
88 if (hasattr(multiprocessing, 'get_start_method') and 88 if (hasattr(multiprocessing, 'get_start_method') and
89 multiprocessing.get_start_method() == 'spawn'): 89 multiprocessing.get_start_method() == 'spawn'):
90 from piecrust.main import _pre_parse_chef_args 90 from piecrust.main import _pre_parse_chef_args
91 _pre_parse_chef_args(sys.argv[1:]) 91 _pre_parse_chef_args(sys.argv[1:])
92
93 from piecrust.main import ColoredFormatter
94 root_logger = logging.getLogger()
95 root_logger.handlers[0].setFormatter(ColoredFormatter(
96 ('[W-%d]' % wid) + '[%(name)s] %(message)s'))
92 97
93 logger.debug("Worker %d initializing..." % wid) 98 logger.debug("Worker %d initializing..." % wid)
94 99
95 # We don't need those. 100 # We don't need those.
96 params.inqueue._writer.close() 101 params.inqueue._writer.close()
176 181
177 182
178 class WorkerPool: 183 class WorkerPool:
179 def __init__(self, worker_class, initargs=(), *, 184 def __init__(self, worker_class, initargs=(), *,
180 callback=None, error_callback=None, 185 callback=None, error_callback=None,
181 worker_count=None, batch_size=None): 186 worker_count=None, batch_size=None,
187 userdata=None):
188 self.userdata = userdata
189
182 worker_count = worker_count or os.cpu_count() or 1 190 worker_count = worker_count or os.cpu_count() or 1
183 191
184 if use_fastqueue: 192 if use_fastqueue:
185 self._task_queue = FastQueue() 193 self._task_queue = FastQueue()
186 self._result_queue = FastQueue() 194 self._result_queue = FastQueue()
269 if self._jobs_left == 0: 277 if self._jobs_left == 0:
270 self._event.set() 278 self._event.set()
271 279
272 @staticmethod 280 @staticmethod
273 def _handleResults(pool): 281 def _handleResults(pool):
282 userdata = pool.userdata
274 while True: 283 while True:
275 try: 284 try:
276 res = pool._quick_get() 285 res = pool._quick_get()
277 except (EOFError, OSError): 286 except (EOFError, OSError):
278 logger.debug("Result handler thread encountered connection " 287 logger.debug("Result handler thread encountered connection "
285 294
286 task_type, task_data, success, wid, data = res 295 task_type, task_data, success, wid, data = res
287 try: 296 try:
288 if success: 297 if success:
289 if pool._callback: 298 if pool._callback:
290 pool._callback(task_data, data) 299 pool._callback(task_data, data, userdata)
291 else: 300 else:
292 if pool._error_callback: 301 if pool._error_callback:
293 pool._error_callback(task_data, data) 302 pool._error_callback(task_data, data, userdata)
294 else: 303 else:
295 logger.error( 304 logger.error(
296 "Worker %d failed to process a job:" % wid) 305 "Worker %d failed to process a job:" % wid)
297 logger.error(data) 306 logger.error(data)
298 except Exception as ex: 307 except Exception as ex:
310 self._event = threading.Event() 319 self._event = threading.Event()
311 320
312 def wait(self, timeout=None): 321 def wait(self, timeout=None):
313 return self._event.wait(timeout) 322 return self._event.wait(timeout)
314 323
315 def _handle(self, job, res): 324 def _handle(self, job, res, _):
316 wid, data = res 325 wid, data = res
317 if wid < 0 or wid > self._count: 326 if wid < 0 or wid > self._count:
318 logger.error("Ignoring report from unknown worker %d." % wid) 327 logger.error("Ignoring report from unknown worker %d." % wid)
319 return 328 return
320 329
322 self.reports[wid] = data 331 self.reports[wid] = data
323 332
324 if self._received == self._count: 333 if self._received == self._count:
325 self._event.set() 334 self._event.set()
326 335
327 def _handleError(self, job, res): 336 def _handleError(self, job, res, _):
328 logger.error("Worker %d failed to send its report." % res.wid) 337 logger.error("Worker %d failed to send its report." % res.wid)
329 logger.error(res) 338 logger.error(res)
330 339
331 340
332 class FastQueue: 341 class FastQueue: