diff piecrust/workerpool.py @ 854:08e02c2a2a1a

core: Keep refactoring, this time to prepare for generator sources. - Make a few APIs simpler. - Content pipelines create their own jobs, so that generator sources can keep aborting in `getContents`, but rely on their pipeline to generate pages for baking.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jun 2017 23:34:28 -0700
parents 4850f8c21b6e
children 313db67cfc35
line wrap: on
line diff
--- a/piecrust/workerpool.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/workerpool.py	Sun Jun 04 23:34:28 2017 -0700
@@ -90,6 +90,11 @@
         from piecrust.main import _pre_parse_chef_args
         _pre_parse_chef_args(sys.argv[1:])
 
+    from piecrust.main import ColoredFormatter
+    root_logger = logging.getLogger()
+    root_logger.handlers[0].setFormatter(ColoredFormatter(
+        ('[W-%d]' % wid) + '[%(name)s] %(message)s'))
+
     logger.debug("Worker %d initializing..." % wid)
 
     # We don't need those.
@@ -178,7 +183,10 @@
 class WorkerPool:
     def __init__(self, worker_class, initargs=(), *,
                  callback=None, error_callback=None,
-                 worker_count=None, batch_size=None):
+                 worker_count=None, batch_size=None,
+                 userdata=None):
+        self.userdata = userdata
+
         worker_count = worker_count or os.cpu_count() or 1
 
         if use_fastqueue:
@@ -271,6 +279,7 @@
 
     @staticmethod
     def _handleResults(pool):
+        userdata = pool.userdata
         while True:
             try:
                 res = pool._quick_get()
@@ -287,10 +296,10 @@
             try:
                 if success:
                     if pool._callback:
-                        pool._callback(task_data, data)
+                        pool._callback(task_data, data, userdata)
                 else:
                     if pool._error_callback:
-                        pool._error_callback(task_data, data)
+                        pool._error_callback(task_data, data, userdata)
                     else:
                         logger.error(
                             "Worker %d failed to process a job:" % wid)
@@ -312,7 +321,7 @@
     def wait(self, timeout=None):
         return self._event.wait(timeout)
 
-    def _handle(self, job, res):
+    def _handle(self, job, res, _):
         wid, data = res
         if wid < 0 or wid > self._count:
             logger.error("Ignoring report from unknown worker %d." % wid)
@@ -324,7 +333,7 @@
         if self._received == self._count:
             self._event.set()
 
-    def _handleError(self, job, res):
+    def _handleError(self, job, res, _):
         logger.error("Worker %d failed to send its report." % res.wid)
         logger.error(res)