comparison piecrust/workerpool.py @ 451:838f3964f400

bake: Optimize the bake by not using custom classes for passing info. See previous changeset about pickling performance between processes. Now just use plain standard structures, or the new `fastpickle` when needed.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 06 Jul 2015 21:30:49 -0700
parents aefe70229fdd
children 8351a77e13f5
comparison of 450:298f8f46432a and 451:838f3964f400
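
The commit message's "plain standard structures" are visible throughout the hunks below: tasks and results now travel between processes as plain tuples instead of custom classes. A minimal restatement of the wire protocol as this changeset leaves it (names taken from the hunks themselves):

    # Parent -> worker: a (type, payload) pair; the payload is fastpickle'd data.
    task = (TASK_JOB, job_data)
    # Worker -> parent: task type, success flag, worker id, and the result (or error).
    result = (task_type, success, wid, data)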
--- a/piecrust/workerpool.py
+++ b/piecrust/workerpool.py
@@ -1,10 +1,11 @@
 import os
 import sys
 import logging
 import threading
 import multiprocessing
+from piecrust.fastpickle import pickle, unpickle
 
 
 logger = logging.getLogger(__name__)
 
 
@@ -73,10 +74,11 @@
                         e, e.__traceback__)
                 rep = (task_type, False, wid, (wid, e))
             put(rep)
             break
 
+        task_data = unpickle(task_data)
         try:
             res = (task_type, True, wid, w.process(task_data))
         except Exception as e:
             if params.wrap_exception:
                 e = multiprocessing.ExceptionWithTraceback(e, e.__traceback__)
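
Worker-side, the payload is now decoded with fastpickle's unpickle() before processing. A round-trip sketch, assuming fastpickle exposes a dumps/loads-style pair (only the pickle/unpickle names appear in this changeset; the job payload below is hypothetical):

    from piecrust.fastpickle import pickle, unpickle

    job = {'type': 'render', 'path': 'pages/foo.md'}  # hypothetical payload shape
    wire = pickle(job)             # serialize in the parent process
    assert unpickle(wire) == job   # rebuild in the worker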
@@ -99,11 +101,12 @@
         self.wrap_exception = wrap_exception
         self.is_profiling = is_profiling
 
 
 class WorkerPool(object):
-    def __init__(self, worker_class, worker_count=None, initargs=()):
+    def __init__(self, worker_class, worker_count=None, initargs=(),
+                 wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
         self._task_queue = multiprocessing.SimpleQueue()
         self._result_queue = multiprocessing.SimpleQueue()
         self._quick_put = self._task_queue._writer.send
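
Worth noting in this hunk's context lines: `_quick_put` is bound directly to the task queue's underlying pipe. In CPython, `SimpleQueue.put()` pickles the object and takes a write lock before sending; binding `_writer.send` skips the method dispatch and the lock, which is safe here only because the parent process is the queue's sole writer. A rough sketch of the equivalence (this leans on CPython's multiprocessing internals, so treat it as an assumption, not a guarantee):

    import multiprocessing

    q = multiprocessing.SimpleQueue()
    q.put(('TASK', 42))            # pickles, then sends under the queue's write lock
    q._writer.send(('TASK', 42))   # same wire format, no lock: single-writer only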
@@ -120,10 +123,11 @@
         self._pool = []
         for i in range(worker_count):
             worker_params = _WorkerParams(
                 i, self._task_queue, self._result_queue,
                 worker_class, initargs,
+                wrap_exception=wrap_exception,
                 is_profiling=is_profiling)
             w = multiprocessing.Process(target=worker_func,
                                         args=(worker_params,))
             w.name = w.name.replace('Process', 'PoolWorker')
             w.daemon = True
@@ -159,11 +163,12 @@
             res._event.set()
             return res
 
         self._listener = res
         for job in jobs:
-            self._quick_put((TASK_JOB, job))
+            job_data = pickle(job)
+            self._quick_put((TASK_JOB, job_data))
 
         return res
 
     def close(self):
         if self._listener is not None:
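
On the submission side, jobs are now serialized with `pickle()` before being written to the task queue, so only plain data crosses the process boundary. A hypothetical usage sketch (the worker class and the enclosing method's name are assumptions; only the loop body appears in this hunk):

    # Hypothetical: a worker class with a process() method, per the worker loop above.
    pool = WorkerPool(MyWorker, worker_count=4, wrap_exception=True)
    res = pool.queue_jobs(jobs)   # each job is fastpickle'd, then queued as (TASK_JOB, data)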
@@ -207,12 +212,15 @@
 
             task_type, success, wid, data = res
             try:
                 if success and pool._callback:
                     pool._callback(data)
-                elif not success and pool._error_callback:
-                    pool._error_callback(data)
+                elif not success:
+                    if pool._error_callback:
+                        pool._error_callback(data)
+                    else:
+                        logger.error(data)
             except Exception as ex:
                 logger.exception(ex)
 
             if task_type == TASK_JOB:
                 pool._listener._onTaskDone()
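
The results-handler change means worker errors are no longer silently dropped when no error callback is registered. The new fallback, restated in isolation:

    def _report(pool, success, data):
        # Mirrors the hunk above: success goes to the callback, failure goes
        # to the error callback if present, otherwise it is at least logged.
        if success and pool._callback:
            pool._callback(data)
        elif not success:
            if pool._error_callback:
                pool._error_callback(data)
            else:
                logger.error(data)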