Mercurial > piecrust2
comparison piecrust/workerpool.py @ 451:838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
See previous changeset about pickling performance between processes. Now
just use plain standard structures, or the new `fastpickle` when needed.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 06 Jul 2015 21:30:49 -0700 |
parents | aefe70229fdd |
children | 8351a77e13f5 |
comparison
equal
deleted
inserted
replaced
450:298f8f46432a | 451:838f3964f400 |
---|---|
1 import os | 1 import os |
2 import sys | 2 import sys |
3 import logging | 3 import logging |
4 import threading | 4 import threading |
5 import multiprocessing | 5 import multiprocessing |
| 6 from piecrust.fastpickle import pickle, unpickle |
6 | 7 |
7 | 8 |
8 logger = logging.getLogger(__name__) | 9 logger = logging.getLogger(__name__) |
9 | 10 |
10 | 11 |
73 e, e.__traceback__) | 74 e, e.__traceback__) |
74 rep = (task_type, False, wid, (wid, e)) | 75 rep = (task_type, False, wid, (wid, e)) |
75 put(rep) | 76 put(rep) |
76 break | 77 break |
77 | 78 |
| 79 task_data = unpickle(task_data) |
78 try: | 80 try: |
79 res = (task_type, True, wid, w.process(task_data)) | 81 res = (task_type, True, wid, w.process(task_data)) |
80 except Exception as e: | 82 except Exception as e: |
81 if params.wrap_exception: | 83 if params.wrap_exception: |
82 e = multiprocessing.ExceptionWithTraceback(e, e.__traceback__) | 84 e = multiprocessing.ExceptionWithTraceback(e, e.__traceback__) |
99 self.wrap_exception = wrap_exception | 101 self.wrap_exception = wrap_exception |
100 self.is_profiling = is_profiling | 102 self.is_profiling = is_profiling |
101 | 103 |
102 | 104 |
103 class WorkerPool(object): | 105 class WorkerPool(object): |
104 def __init__(self, worker_class, worker_count=None, initargs=()): | 106 def __init__(self, worker_class, worker_count=None, initargs=(), |
| 107 wrap_exception=False): |
105 worker_count = worker_count or os.cpu_count() or 1 | 108 worker_count = worker_count or os.cpu_count() or 1 |
106 | 109 |
107 self._task_queue = multiprocessing.SimpleQueue() | 110 self._task_queue = multiprocessing.SimpleQueue() |
108 self._result_queue = multiprocessing.SimpleQueue() | 111 self._result_queue = multiprocessing.SimpleQueue() |
109 self._quick_put = self._task_queue._writer.send | 112 self._quick_put = self._task_queue._writer.send |
120 self._pool = [] | 123 self._pool = [] |
121 for i in range(worker_count): | 124 for i in range(worker_count): |
122 worker_params = _WorkerParams( | 125 worker_params = _WorkerParams( |
123 i, self._task_queue, self._result_queue, | 126 i, self._task_queue, self._result_queue, |
124 worker_class, initargs, | 127 worker_class, initargs, |
| 128 wrap_exception=wrap_exception, |
125 is_profiling=is_profiling) | 129 is_profiling=is_profiling) |
126 w = multiprocessing.Process(target=worker_func, | 130 w = multiprocessing.Process(target=worker_func, |
127 args=(worker_params,)) | 131 args=(worker_params,)) |
128 w.name = w.name.replace('Process', 'PoolWorker') | 132 w.name = w.name.replace('Process', 'PoolWorker') |
129 w.daemon = True | 133 w.daemon = True |
159 res._event.set() | 163 res._event.set() |
160 return res | 164 return res |
161 | 165 |
162 self._listener = res | 166 self._listener = res |
163 for job in jobs: | 167 for job in jobs: |
164 self._quick_put((TASK_JOB, job)) | 168 job_data = pickle(job) |
| 169 self._quick_put((TASK_JOB, job_data)) |
165 | 170 |
166 return res | 171 return res |
167 | 172 |
168 def close(self): | 173 def close(self): |
169 if self._listener is not None: | 174 if self._listener is not None: |
207 | 212 |
208 task_type, success, wid, data = res | 213 task_type, success, wid, data = res |
209 try: | 214 try: |
210 if success and pool._callback: | 215 if success and pool._callback: |
211 pool._callback(data) | 216 pool._callback(data) |
212 elif not success and pool._error_callback: | 217 elif not success: |
213 pool._error_callback(data) | 218 if pool._error_callback: |
| 219 pool._error_callback(data) |
| 220 else: |
| 221 logger.error(data) |
214 except Exception as ex: | 222 except Exception as ex: |
215 logger.exception(ex) | 223 logger.exception(ex) |
216 | 224 |
217 if task_type == TASK_JOB: | 225 if task_type == TASK_JOB: |
218 pool._listener._onTaskDone() | 226 pool._listener._onTaskDone() |