comparison piecrust/workerpool.py @ 461:b015e38d4ee1

internal: Handle data serialization more under the hood.

Implement a new queue class that handles the pickling of objects, and add an
option (currently disabled) to also apply zlib compression before sending the
bytes over the pipe.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 11 Jul 2015 17:51:56 -0700
parents 55fc8918cb75
children 04abc97dd3b6
comparing 460:55fc8918cb75 with 461:b015e38d4ee1
@@ -1,7 +1,8 @@
 import os
 import sys
+import zlib
 import logging
 import itertools
 import threading
 import multiprocessing
 from piecrust.fastpickle import pickle, unpickle
@@ -41,16 +42,15 @@
     else:
         _real_worker_func(params)


 def _real_worker_func(params):
-    if hasattr(params.inqueue, '_writer'):
-        params.inqueue._writer.close()
-        params.outqueue._reader.close()
-
     wid = params.wid
     logger.debug("Worker %d initializing..." % wid)
+
+    params.inqueue._writer.close()
+    params.outqueue._reader.close()

     w = params.worker_class(*params.initargs)
     w.wid = wid
     try:
         w.initialize()
@@ -87,12 +87,11 @@
         if task_type == TASK_JOB:
             task_data = (task_data,)

         for t in task_data:
-            td = unpickle(t)
             try:
-                res = (TASK_JOB, True, wid, w.process(td))
+                res = (TASK_JOB, True, wid, w.process(t))
             except Exception as e:
                 if params.wrap_exception:
                     e = multiprocessing.ExceptionWithTraceback(
                         e, e.__traceback__)
                 res = (TASK_JOB, False, wid, e)
@@ -118,14 +117,21 @@
 class WorkerPool(object):
     def __init__(self, worker_class, worker_count=None, initargs=(),
                  wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1

-        self._task_queue = multiprocessing.SimpleQueue()
-        self._result_queue = multiprocessing.SimpleQueue()
-        self._quick_put = self._task_queue._writer.send
-        self._quick_get = self._result_queue._reader.recv
+        use_fastqueue = True
+        if use_fastqueue:
+            self._task_queue = FastQueue()
+            self._result_queue = FastQueue()
+            self._quick_put = self._task_queue.put
+            self._quick_get = self._result_queue.get
+        else:
+            self._task_queue = multiprocessing.SimpleQueue()
+            self._result_queue = multiprocessing.SimpleQueue()
+            self._quick_put = self._task_queue._writer.send
+            self._quick_get = self._result_queue._reader.recv

         self._callback = None
         self._error_callback = None
         self._listener = None
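
Binding the queues' put/get to _quick_put/_quick_get caches the bound methods
once, so the dispatch loop pays a single plain call per job instead of a chain
of attribute lookups; the stdlib's multiprocessing.pool uses the same trick.
A minimal sketch of the pattern, assuming a FastQueue named task_queue and a
jobs iterable already in scope:

    quick_put = task_queue.put          # bound method looked up once
    for job in jobs:
        quick_put((TASK_JOB, job))      # hot loop: one call per job
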
@@ -186,17 +192,15 @@
             chunk_size = max(1, job_count // 50)
             logger.debug("Using chunk size of %d" % chunk_size)

         if chunk_size is None or chunk_size == 1:
             for job in jobs:
-                job_data = pickle(job)
-                self._quick_put((TASK_JOB, job_data))
+                self._quick_put((TASK_JOB, job))
         else:
             it = iter(jobs)
             while True:
-                batch = tuple([pickle(i)
-                               for i in itertools.islice(it, chunk_size)])
+                batch = tuple([i for i in itertools.islice(it, chunk_size)])
                 if not batch:
                     break
                 self._quick_put((TASK_BATCH, batch))

         return res
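
Note that the caller no longer pre-pickles jobs. On the old SimpleQueue path,
each job went through fastpickle's pickle() and the resulting bytes were then
pickled a second time by Connection.send(); FastQueue.put() now serializes the
whole message once and ships raw bytes. A rough before/after sketch (the
task_queue name is illustrative):

    # Old path: two serialization passes per job.
    job_data = pickle(job)                         # piecrust.fastpickle
    task_queue._writer.send((TASK_JOB, job_data))  # stdlib pickle, again

    # New path: one pass; FastQueue pickles internally, sends raw bytes.
    task_queue.put((TASK_JOB, job))
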
@@ -302,5 +306,40 @@
     def _handleError(self, res):
         wid, data = res
         logger.error("Worker %d failed to send its report." % wid)
         logger.exception(data)

+
+class FastQueue(object):
+    def __init__(self, compress=False):
+        self._reader, self._writer = multiprocessing.Pipe(duplex=False)
+        self._rlock = multiprocessing.Lock()
+        self._wlock = multiprocessing.Lock()
+        self._compress = compress
+
+    def __getstate__(self):
+        return (self._reader, self._writer, self._rlock, self._wlock,
+                self._compress)
+
+    def __setstate__(self, state):
+        (self._reader, self._writer, self._rlock, self._wlock,
+         self._compress) = state
+
+    def get(self):
+        with self._rlock:
+            raw = self._reader.recv_bytes()
+        if self._compress:
+            data = zlib.decompress(raw)
+        else:
+            data = raw
+        obj = unpickle(data)
+        return obj
+
+    def put(self, obj):
+        data = pickle(obj)
+        if self._compress:
+            raw = zlib.compress(data)
+        else:
+            raw = data
+        with self._wlock:
+            self._writer.send_bytes(raw)
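
A minimal round-trip sketch of the new class; the echo worker and payload
below are illustrative only, not part of the changeset, and compression stays
off by default, as the commit message notes. Both locks and pipe ends travel
to the child through __getstate__/__setstate__ when the queue is passed as a
Process argument:

    import multiprocessing

    from piecrust.workerpool import FastQueue


    def _echo(q_in, q_out):
        # FastQueue unpickles (and would decompress) on get(), so the
        # worker sees a plain Python object, not bytes.
        q_out.put(q_in.get())


    if __name__ == '__main__':
        q_in, q_out = FastQueue(), FastQueue()
        child = multiprocessing.Process(target=_echo, args=(q_in, q_out))
        child.start()
        q_in.put({'path': '/pages/foo.md', 'data': 'x' * 1024})
        print(q_out.get())   # the same dict, rebuilt in the parent
        child.join()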