piecrust2: diff piecrust/workerpool.py @ 461:b015e38d4ee1
internal: Handle data serialization more under the hood.
Implement a new queue class that handles the pickling of objects, and add an
option (currently disabled) to also apply zlib compression before sending the bytes.
author:   Ludovic Chabant <ludovic@chabant.com>
date:     Sat, 11 Jul 2015 17:51:56 -0700
parents:  55fc8918cb75
children: 04abc97dd3b6
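In essence, the change stops pickling task data by hand in the pool and the
workers, and moves serialization into the queue itself. The snippet below is a
minimal standalone sketch of that technique, not piecrust's actual code: it
uses the stdlib pickle module where the diff calls piecrust's own
pickle/unpickle helpers, and its put/get names and sample payload are made up
for illustration. It shows the same idea of a compression flag gating a zlib
pass over the pickled bytes.

import pickle
import zlib
import multiprocessing

reader, writer = multiprocessing.Pipe(duplex=False)
compress = False  # the commit adds this knob but ships with it disabled

def put(obj):
    # Serialize once, optionally shrink, then push raw bytes down the pipe.
    data = pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
    if compress:
        data = zlib.compress(data)
    writer.send_bytes(data)

def get():
    # Pull raw bytes, optionally inflate, then deserialize.
    raw = reader.recv_bytes()
    if compress:
        raw = zlib.decompress(raw)
    return pickle.loads(raw)

put({'task': 'render', 'path': 'pages/foo.md'})
print(get())  # {'task': 'render', 'path': 'pages/foo.md'}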
--- a/piecrust/workerpool.py	Sat Jul 11 00:45:35 2015 -0700
+++ b/piecrust/workerpool.py	Sat Jul 11 17:51:56 2015 -0700
@@ -1,5 +1,6 @@
 import os
 import sys
+import zlib
 import logging
 import itertools
 import threading
@@ -43,13 +44,12 @@
 
 
 def _real_worker_func(params):
-    if hasattr(params.inqueue, '_writer'):
-        params.inqueue._writer.close()
-        params.outqueue._reader.close()
-
     wid = params.wid
     logger.debug("Worker %d initializing..." % wid)
 
+    params.inqueue._writer.close()
+    params.outqueue._reader.close()
+
     w = params.worker_class(*params.initargs)
     w.wid = wid
     try:
@@ -88,9 +88,8 @@
             task_data = (task_data,)
 
         for t in task_data:
-            td = unpickle(t)
             try:
-                res = (TASK_JOB, True, wid, w.process(td))
+                res = (TASK_JOB, True, wid, w.process(t))
             except Exception as e:
                 if params.wrap_exception:
                     e = multiprocessing.ExceptionWithTraceback(
@@ -120,10 +119,17 @@
                  wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
-        self._task_queue = multiprocessing.SimpleQueue()
-        self._result_queue = multiprocessing.SimpleQueue()
-        self._quick_put = self._task_queue._writer.send
-        self._quick_get = self._result_queue._reader.recv
+        use_fastqueue = True
+        if use_fastqueue:
+            self._task_queue = FastQueue()
+            self._result_queue = FastQueue()
+            self._quick_put = self._task_queue.put
+            self._quick_get = self._result_queue.get
+        else:
+            self._task_queue = multiprocessing.SimpleQueue()
+            self._result_queue = multiprocessing.SimpleQueue()
+            self._quick_put = self._task_queue._writer.send
+            self._quick_get = self._result_queue._reader.recv
 
         self._callback = None
         self._error_callback = None
@@ -188,13 +194,11 @@
 
         if chunk_size is None or chunk_size == 1:
             for job in jobs:
-                job_data = pickle(job)
-                self._quick_put((TASK_JOB, job_data))
+                self._quick_put((TASK_JOB, job))
         else:
             it = iter(jobs)
             while True:
-                batch = tuple([pickle(i)
-                               for i in itertools.islice(it, chunk_size)])
+                batch = tuple([i for i in itertools.islice(it, chunk_size)])
                 if not batch:
                     break
                 self._quick_put((TASK_BATCH, batch))
@@ -304,3 +308,38 @@
 
             logger.error("Worker %d failed to send its report." % wid)
             logger.exception(data)
+
+
+class FastQueue(object):
+    def __init__(self, compress=False):
+        self._reader, self._writer = multiprocessing.Pipe(duplex=False)
+        self._rlock = multiprocessing.Lock()
+        self._wlock = multiprocessing.Lock()
+        self._compress = compress
+
+    def __getstate__(self):
+        return (self._reader, self._writer, self._rlock, self._wlock,
+                self._compress)
+
+    def __setstate__(self, state):
+        (self._reader, self._writer, self._rlock, self._wlock,
+         self._compress) = state
+
+    def get(self):
+        with self._rlock:
+            raw = self._reader.recv_bytes()
+        if self._compress:
+            data = zlib.decompress(raw)
+        else:
+            data = raw
+        obj = unpickle(data)
+        return obj
+
+    def put(self, obj):
+        data = pickle(obj)
+        if self._compress:
+            raw = zlib.compress(data)
+        else:
+            raw = data
+        with self._wlock:
+            self._writer.send_bytes(raw)
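The __getstate__/__setstate__ pair is what lets a FastQueue itself be handed
to a child process: multiprocessing already knows how to transfer pipe
connections and locks, so the queue only needs to expose those plus the
compress flag. Below is a hedged usage sketch, assuming piecrust2 at this
revision is importable; the _echo_worker function and its payload are
hypothetical, not part of the codebase.

import multiprocessing

from piecrust.workerpool import FastQueue

def _echo_worker(inq, outq):
    # The queues arrive here pickled via __getstate__/__setstate__;
    # only the pipe ends, the locks, and the compress flag cross over.
    outq.put(inq.get().upper())

if __name__ == '__main__':
    inq, outq = FastQueue(), FastQueue()
    p = multiprocessing.Process(target=_echo_worker, args=(inq, outq))
    p.start()
    inq.put('hello from the parent process')
    print(outq.get())  # HELLO FROM THE PARENT PROCESS
    p.join()

Note the design choice visible in get() and put(): the locks are held only
around the actual pipe read or write, so pickling and any zlib work happen
outside the critical section and concurrent callers serialize in parallel.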