diff piecrust/workerpool.py @ 451:838f3964f400

bake: Optimize the bake by not using custom classes for passing info. See previous changeset about pickling performance between processes. Now just use plain standard structures, or the new `fastpickle` when needed.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 06 Jul 2015 21:30:49 -0700
parents aefe70229fdd
children 8351a77e13f5
line wrap: on
line diff
--- a/piecrust/workerpool.py	Mon Jul 06 21:29:17 2015 -0700
+++ b/piecrust/workerpool.py	Mon Jul 06 21:30:49 2015 -0700
@@ -3,6 +3,7 @@
 import logging
 import threading
 import multiprocessing
+from piecrust.fastpickle import pickle, unpickle
 
 
 logger = logging.getLogger(__name__)
@@ -75,6 +76,7 @@
             put(rep)
             break
 
+        task_data = unpickle(task_data)
         try:
             res = (task_type, True, wid, w.process(task_data))
         except Exception as e:
@@ -101,7 +103,8 @@
 
 
 class WorkerPool(object):
-    def __init__(self, worker_class, worker_count=None, initargs=()):
+    def __init__(self, worker_class, worker_count=None, initargs=(),
+                 wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
         self._task_queue = multiprocessing.SimpleQueue()
@@ -122,6 +125,7 @@
             worker_params = _WorkerParams(
                     i, self._task_queue, self._result_queue,
                     worker_class, initargs,
+                    wrap_exception=wrap_exception,
                     is_profiling=is_profiling)
             w = multiprocessing.Process(target=worker_func,
                                         args=(worker_params,))
@@ -161,7 +165,8 @@
 
         self._listener = res
         for job in jobs:
-            self._quick_put((TASK_JOB, job))
+            job_data = pickle(job)
+            self._quick_put((TASK_JOB, job_data))
 
         return res
 
@@ -209,8 +214,11 @@
             try:
                 if success and pool._callback:
                     pool._callback(data)
-                elif not success and pool._error_callback:
-                    pool._error_callback(data)
+                elif not success:
+                    if pool._error_callback:
+                        pool._error_callback(data)
+                    else:
+                        logger.error(data)
             except Exception as ex:
                 logger.exception(ex)