Mercurial > piecrust2
diff piecrust/workerpool.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
keep aborting in `getContents`, but rely on their pipeline to generate
pages for baking.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 04 Jun 2017 23:34:28 -0700 |
parents | 4850f8c21b6e |
children | 313db67cfc35 |
line wrap: on
line diff
--- a/piecrust/workerpool.py Sun May 21 00:06:59 2017 -0700 +++ b/piecrust/workerpool.py Sun Jun 04 23:34:28 2017 -0700 @@ -90,6 +90,11 @@ from piecrust.main import _pre_parse_chef_args _pre_parse_chef_args(sys.argv[1:]) + from piecrust.main import ColoredFormatter + root_logger = logging.getLogger() + root_logger.handlers[0].setFormatter(ColoredFormatter( + ('[W-%d]' % wid) + '[%(name)s] %(message)s')) + logger.debug("Worker %d initializing..." % wid) # We don't need those. @@ -178,7 +183,10 @@ class WorkerPool: def __init__(self, worker_class, initargs=(), *, callback=None, error_callback=None, - worker_count=None, batch_size=None): + worker_count=None, batch_size=None, + userdata=None): + self.userdata = userdata + worker_count = worker_count or os.cpu_count() or 1 if use_fastqueue: @@ -271,6 +279,7 @@ @staticmethod def _handleResults(pool): + userdata = pool.userdata while True: try: res = pool._quick_get() @@ -287,10 +296,10 @@ try: if success: if pool._callback: - pool._callback(task_data, data) + pool._callback(task_data, data, userdata) else: if pool._error_callback: - pool._error_callback(task_data, data) + pool._error_callback(task_data, data, userdata) else: logger.error( "Worker %d failed to process a job:" % wid) @@ -312,7 +321,7 @@ def wait(self, timeout=None): return self._event.wait(timeout) - def _handle(self, job, res): + def _handle(self, job, res, _): wid, data = res if wid < 0 or wid > self._count: logger.error("Ignoring report from unknown worker %d." % wid) @@ -324,7 +333,7 @@ if self._received == self._count: self._event.set() - def _handleError(self, job, res): + def _handleError(self, job, res, _): logger.error("Worker %d failed to send its report." % res.wid) logger.error(res)