piecrust2: comparison of piecrust/workerpool.py @ 461:b015e38d4ee1
internal: Handle data serialization more under the hood.
Implement a new queue class that handles pickling of objects, and add an
option (currently disabled) to also zlib-compress the bytes before sending them.
author | Ludovic Chabant <ludovic@chabant.com>
date | Sat, 11 Jul 2015 17:51:56 -0700
parents | 55fc8918cb75
children | 04abc97dd3b6
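
To make the intent of the change concrete before the line-by-line comparison: the new queue pickles each object and can optionally zlib-compress the bytes before they go over the pipe, with compression disabled by default. The sketch below shows that path using the standard-library `pickle` in place of `piecrust.fastpickle`; the helper names are illustrative only, not part of the patch.

```python
import pickle
import zlib


def serialize(obj, compress=False):
    # Pickle first; optionally zlib-compress the bytes before they go
    # over the pipe (compression is off by default, as in the change).
    data = pickle.dumps(obj)
    return zlib.compress(data) if compress else data


def deserialize(raw, compress=False):
    # Mirror image of serialize(): decompress if enabled, then unpickle.
    data = zlib.decompress(raw) if compress else raw
    return pickle.loads(data)


if __name__ == '__main__':
    job = {'task': 'render', 'path': 'pages/foo.md'}
    assert deserialize(serialize(job)) == job
    assert deserialize(serialize(job, compress=True), compress=True) == job
```
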
460:55fc8918cb75 | 461:b015e38d4ee1 |
---|---|
1 import os | 1 import os |
2 import sys | 2 import sys |
| 3 import zlib |
3 import logging | 4 import logging |
4 import itertools | 5 import itertools |
5 import threading | 6 import threading |
6 import multiprocessing | 7 import multiprocessing |
7 from piecrust.fastpickle import pickle, unpickle | 8 from piecrust.fastpickle import pickle, unpickle |
41 else: | 42 else: |
42 _real_worker_func(params) | 43 _real_worker_func(params) |
43 | 44 |
44 | 45 |
45 def _real_worker_func(params): | 46 def _real_worker_func(params): |
46 if hasattr(params.inqueue, '_writer'): | |
47 params.inqueue._writer.close() | |
48 params.outqueue._reader.close() | |
49 | |
50 wid = params.wid | 47 wid = params.wid |
51 logger.debug("Worker %d initializing..." % wid) | 48 logger.debug("Worker %d initializing..." % wid) |
| 49 |
| 50 params.inqueue._writer.close() |
| 51 params.outqueue._reader.close() |
52 | 52 |
53 w = params.worker_class(*params.initargs) | 53 w = params.worker_class(*params.initargs) |
54 w.wid = wid | 54 w.wid = wid |
55 try: | 55 try: |
56 w.initialize() | 56 w.initialize() |
86 | 86 |
87 if task_type == TASK_JOB: | 87 if task_type == TASK_JOB: |
88 task_data = (task_data,) | 88 task_data = (task_data,) |
89 | 89 |
90 for t in task_data: | 90 for t in task_data: |
91 td = unpickle(t) | |
92 try: | 91 try: |
93 res = (TASK_JOB, True, wid, w.process(td)) | 92 res = (TASK_JOB, True, wid, w.process(t)) |
94 except Exception as e: | 93 except Exception as e: |
95 if params.wrap_exception: | 94 if params.wrap_exception: |
96 e = multiprocessing.ExceptionWithTraceback( | 95 e = multiprocessing.ExceptionWithTraceback( |
97 e, e.__traceback__) | 96 e, e.__traceback__) |
98 res = (TASK_JOB, False, wid, e) | 97 res = (TASK_JOB, False, wid, e) |
118 class WorkerPool(object): | 117 class WorkerPool(object): |
119 def __init__(self, worker_class, worker_count=None, initargs=(), | 118 def __init__(self, worker_class, worker_count=None, initargs=(), |
120 wrap_exception=False): | 119 wrap_exception=False): |
121 worker_count = worker_count or os.cpu_count() or 1 | 120 worker_count = worker_count or os.cpu_count() or 1 |
122 | 121 |
123 self._task_queue = multiprocessing.SimpleQueue() | 122 use_fastqueue = True |
124 self._result_queue = multiprocessing.SimpleQueue() | 123 if use_fastqueue: |
125 self._quick_put = self._task_queue._writer.send | 124 self._task_queue = FastQueue() |
126 self._quick_get = self._result_queue._reader.recv | 125 self._result_queue = FastQueue() |
| 126 self._quick_put = self._task_queue.put |
| 127 self._quick_get = self._result_queue.get |
| 128 else: |
| 129 self._task_queue = multiprocessing.SimpleQueue() |
| 130 self._result_queue = multiprocessing.SimpleQueue() |
| 131 self._quick_put = self._task_queue._writer.send |
| 132 self._quick_get = self._result_queue._reader.recv |
127 | 133 |
128 self._callback = None | 134 self._callback = None |
129 self._error_callback = None | 135 self._error_callback = None |
130 self._listener = None | 136 self._listener = None |
131 | 137 |
186 chunk_size = max(1, job_count // 50) | 192 chunk_size = max(1, job_count // 50) |
187 logger.debug("Using chunk size of %d" % chunk_size) | 193 logger.debug("Using chunk size of %d" % chunk_size) |
188 | 194 |
189 if chunk_size is None or chunk_size == 1: | 195 if chunk_size is None or chunk_size == 1: |
190 for job in jobs: | 196 for job in jobs: |
191 job_data = pickle(job) | 197 self._quick_put((TASK_JOB, job)) |
192 self._quick_put((TASK_JOB, job_data)) | |
193 else: | 198 else: |
194 it = iter(jobs) | 199 it = iter(jobs) |
195 while True: | 200 while True: |
196 batch = tuple([pickle(i) | 201 batch = tuple([i for i in itertools.islice(it, chunk_size)]) |
197 for i in itertools.islice(it, chunk_size)]) | |
198 if not batch: | 202 if not batch: |
199 break | 203 break |
200 self._quick_put((TASK_BATCH, batch)) | 204 self._quick_put((TASK_BATCH, batch)) |
201 | 205 |
202 return res | 206 return res |
302 def _handleError(self, res): | 306 def _handleError(self, res): |
303 wid, data = res | 307 wid, data = res |
304 logger.error("Worker %d failed to send its report." % wid) | 308 logger.error("Worker %d failed to send its report." % wid) |
305 logger.exception(data) | 309 logger.exception(data) |
306 | 310 |
| 311 |
| 312 class FastQueue(object): |
| 313 def __init__(self, compress=False): |
| 314 self._reader, self._writer = multiprocessing.Pipe(duplex=False) |
| 315 self._rlock = multiprocessing.Lock() |
| 316 self._wlock = multiprocessing.Lock() |
| 317 self._compress = compress |
| 318 |
| 319 def __getstate__(self): |
| 320 return (self._reader, self._writer, self._rlock, self._wlock, |
| 321 self._compress) |
| 322 |
| 323 def __setstate__(self, state): |
| 324 (self._reader, self._writer, self._rlock, self._wlock, |
| 325 self._compress) = state |
| 326 |
| 327 def get(self): |
| 328 with self._rlock: |
| 329 raw = self._reader.recv_bytes() |
| 330 if self._compress: |
| 331 data = zlib.decompress(raw) |
| 332 else: |
| 333 data = raw |
| 334 obj = unpickle(data) |
| 335 return obj |
| 336 |
| 337 def put(self, obj): |
| 338 data = pickle(obj) |
| 339 if self._compress: |
| 340 raw = zlib.compress(data) |
| 341 else: |
| 342 raw = data |
| 343 with self._wlock: |
| 344 self._writer.send_bytes(raw) |
| 345 |
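
As a rough usage sketch of the pipe-backed queue added above: the pattern is a one-way `multiprocessing.Pipe` whose ends are guarded by locks, with pickling on `put()` and unpickling on `get()`. The version below swaps `piecrust.fastpickle` for the standard-library `pickle` and leaves out compression; the class and worker names are placeholders, not part of the patch.

```python
import multiprocessing
import pickle


class PickledPipeQueue(object):
    # Stand-in for FastQueue: a one-way pipe that pickles on put() and
    # unpickles on get(); each end is guarded by a lock so that several
    # processes can share the queue.
    def __init__(self):
        self._reader, self._writer = multiprocessing.Pipe(duplex=False)
        self._rlock = multiprocessing.Lock()
        self._wlock = multiprocessing.Lock()

    def put(self, obj):
        with self._wlock:
            self._writer.send_bytes(pickle.dumps(obj))

    def get(self):
        with self._rlock:
            return pickle.loads(self._reader.recv_bytes())


def _echo_worker(task_queue, result_queue):
    # Toy worker: take one job off the task queue and report a result.
    job = task_queue.get()
    result_queue.put({'wid': 0, 'result': job['path'].upper()})


if __name__ == '__main__':
    tasks, results = PickledPipeQueue(), PickledPipeQueue()
    p = multiprocessing.Process(target=_echo_worker, args=(tasks, results))
    p.start()
    tasks.put({'path': 'pages/foo.md'})
    print(results.get())  # {'wid': 0, 'result': 'PAGES/FOO.MD'}
    p.join()
```

Keeping serialization inside the queue is what lets the `WorkerPool` changes above hand plain objects to `_quick_put` and `_quick_get` instead of pickling jobs at each call site.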