# HG changeset patch # User Ludovic Chabant # Date 1436662316 25200 # Node ID b015e38d4ee11b16ef80b127a551527ceff18365 # Parent 55fc8918cb75e10075202796720c1bddacc7bd9d internal: Handle data serialization more under the hood. Implement a new queue class that handles pickling of objects, add option (currently disabled) to also do zlib compression before sending bytes. diff -r 55fc8918cb75 -r b015e38d4ee1 piecrust/fastpickle.py --- a/piecrust/fastpickle.py Sat Jul 11 00:45:35 2015 -0700 +++ b/piecrust/fastpickle.py Sat Jul 11 17:51:56 2015 -0700 @@ -1,49 +1,66 @@ import sys +import json import datetime import collections def pickle(obj): - return _pickle_object(obj) + data = _pickle_object(obj) + data = json.dumps(data, indent=None, separators=(',', ':')) + return data.encode('utf8') def unpickle(data): + data = data.decode('utf8') + data = json.loads(data) return _unpickle_object(data) _PICKLING = 0 _UNPICKLING = 1 - -def _tuple_dispatch(obj, func, op): - res = [None] * len(obj) - for i, c in enumerate(obj): - res[i] = func(c) - return tuple(res) +_identity_dispatch = object() -def _list_dispatch(obj, func, op): - res = [None] * len(obj) - for i, c in enumerate(obj): - res[i] = func(c) - return res +def _tuple_convert(obj, func, op): + if op == _PICKLING: + return ['__type__:tuple'] + [func(c) for c in obj] + elif op == _UNPICKLING: + return tuple([func(c) for c in obj[1:]]) -def _dict_dispatch(obj, func, op): +def _list_convert(obj, func, op): + return [func(c) for c in obj] + + +def _dict_convert(obj, func, op): res = {} for k, v in obj.items(): res[k] = func(v) return res -def _set_dispatch(obj, func, op): - res = set() - for v in obj: - res.add(func(v)) - return res +def _ordered_dict_convert(obj, func, op): + if op == _PICKLING: + res = {'__type__': 'OrderedDict'} + for k, v in obj.items(): + res[k] = func(v) + return res + elif op == _UNPICKLING: + res = collections.OrderedDict() + for k, v in obj.items(): + res[k] = func(v) + return res -def _date_convert(obj, op): +def _set_convert(obj, func, op): + if op == _PICKLING: + return ['__type__:set'] + [func(c) for c in obj] + elif op == _UNPICKLING: + return set([func(c) for c in obj[1:]]) + + +def _date_convert(obj, func, op): if op == _PICKLING: return {'__class__': 'date', 'year': obj.year, @@ -54,7 +71,7 @@ obj['year'], obj['month'], obj['day']) -def _datetime_convert(obj, op): +def _datetime_convert(obj, func, op): if op == _PICKLING: return {'__class__': 'datetime', 'year': obj.year, @@ -70,7 +87,7 @@ obj['hour'], obj['minute'], obj['second'], obj['microsecond']) -def _time_convert(obj, op): +def _time_convert(obj, func, op): if op == _PICKLING: return {'__class__': 'time', 'hour': obj.hour, @@ -82,56 +99,66 @@ obj['hour'], obj['minute'], obj['second'], obj['microsecond']) -_identity_dispatch = object() +_type_convert = { + type(None): _identity_dispatch, + bool: _identity_dispatch, + int: _identity_dispatch, + float: _identity_dispatch, + str: _identity_dispatch, + datetime.date: _date_convert, + datetime.datetime: _datetime_convert, + datetime.time: _time_convert, + tuple: _tuple_convert, + list: _list_convert, + dict: _dict_convert, + set: _set_convert, + collections.OrderedDict: _ordered_dict_convert, + } -_type_dispatch = { + +_type_unconvert = { type(None): _identity_dispatch, bool: _identity_dispatch, int: _identity_dispatch, float: _identity_dispatch, str: _identity_dispatch, - tuple: _tuple_dispatch, - list: _list_dispatch, - dict: _dict_dispatch, - collections.OrderedDict: _dict_dispatch, - set: _set_dispatch + 'date': _date_convert, + 'datetime': _datetime_convert, + 'time': _time_convert, } -_type_convert = { - datetime.date: _date_convert, - datetime.datetime: _datetime_convert, - datetime.time: _time_convert +_collection_unconvert = { + '__type__:tuple': _tuple_convert, + '__type__:set': _set_convert, } -_type_unconvert = { - 'date': _date_convert, - 'datetime': _datetime_convert, - 'time': _time_convert +_mapping_unconvert = { + 'OrderedDict': _ordered_dict_convert } def _pickle_object(obj): t = type(obj) - disp = _type_dispatch.get(t) - if disp is _identity_dispatch: + conv = _type_convert.get(t) + + # Object doesn't need conversion? + if conv is _identity_dispatch: return obj - if disp is not None: - return disp(obj, _pickle_object, _PICKLING) + # Object has special conversion? + if conv is not None: + return conv(obj, _pickle_object, _PICKLING) - conv = _type_convert.get(t) - if conv is not None: - return conv(obj, _PICKLING) - + # Use instance dictionary, or a custom state. getter = getattr(obj, '__getstate__', None) if getter is not None: state = getter() else: state = obj.__dict__ - state = _dict_dispatch(state, _pickle_object, _PICKLING) + state = _dict_convert(state, _pickle_object, _PICKLING) state['__class__'] = obj.__class__.__name__ state['__module__'] = obj.__class__.__module__ @@ -140,18 +167,42 @@ def _unpickle_object(state): t = type(state) - disp = _type_dispatch.get(t) - if disp is _identity_dispatch: + conv = _type_unconvert.get(t) + + # Object doesn't need conversion? + if conv is _identity_dispatch: return state - if (disp is not None and - (t != dict or '__class__' not in state)): - return disp(state, _unpickle_object, _UNPICKLING) + # Try collection or mapping conversion. + if t is list: + try: + col_type = state[0] + if not isinstance(col_type, str): + col_type = None + except IndexError: + col_type = None + if col_type is not None: + conv = _collection_unconvert.get(col_type) + if conv is not None: + return conv(state, _unpickle_object, _UNPICKLING) + return _list_convert(state, _unpickle_object, _UNPICKLING) - class_name = state['__class__'] + assert t is dict + + # Custom mapping type? + map_type = state.get('__type__') + if map_type: + conv = _mapping_unconvert.get(map_type) + return conv(state, _unpickle_object, _UNPICKLING) + + # Class instance or other custom type. + class_name = state.get('__class__') + if class_name is None: + return _dict_convert(state, _unpickle_object, _UNPICKLING) + conv = _type_unconvert.get(class_name) if conv is not None: - return conv(state, _UNPICKLING) + return conv(state, _unpickle_object, _UNPICKLING) mod_name = state['__module__'] mod = sys.modules[mod_name] diff -r 55fc8918cb75 -r b015e38d4ee1 piecrust/workerpool.py --- a/piecrust/workerpool.py Sat Jul 11 00:45:35 2015 -0700 +++ b/piecrust/workerpool.py Sat Jul 11 17:51:56 2015 -0700 @@ -1,5 +1,6 @@ import os import sys +import zlib import logging import itertools import threading @@ -43,13 +44,12 @@ def _real_worker_func(params): - if hasattr(params.inqueue, '_writer'): - params.inqueue._writer.close() - params.outqueue._reader.close() - wid = params.wid logger.debug("Worker %d initializing..." % wid) + params.inqueue._writer.close() + params.outqueue._reader.close() + w = params.worker_class(*params.initargs) w.wid = wid try: @@ -88,9 +88,8 @@ task_data = (task_data,) for t in task_data: - td = unpickle(t) try: - res = (TASK_JOB, True, wid, w.process(td)) + res = (TASK_JOB, True, wid, w.process(t)) except Exception as e: if params.wrap_exception: e = multiprocessing.ExceptionWithTraceback( @@ -120,10 +119,17 @@ wrap_exception=False): worker_count = worker_count or os.cpu_count() or 1 - self._task_queue = multiprocessing.SimpleQueue() - self._result_queue = multiprocessing.SimpleQueue() - self._quick_put = self._task_queue._writer.send - self._quick_get = self._result_queue._reader.recv + use_fastqueue = True + if use_fastqueue: + self._task_queue = FastQueue() + self._result_queue = FastQueue() + self._quick_put = self._task_queue.put + self._quick_get = self._result_queue.get + else: + self._task_queue = multiprocessing.SimpleQueue() + self._result_queue = multiprocessing.SimpleQueue() + self._quick_put = self._task_queue._writer.send + self._quick_get = self._result_queue._reader.recv self._callback = None self._error_callback = None @@ -188,13 +194,11 @@ if chunk_size is None or chunk_size == 1: for job in jobs: - job_data = pickle(job) - self._quick_put((TASK_JOB, job_data)) + self._quick_put((TASK_JOB, job)) else: it = iter(jobs) while True: - batch = tuple([pickle(i) - for i in itertools.islice(it, chunk_size)]) + batch = tuple([i for i in itertools.islice(it, chunk_size)]) if not batch: break self._quick_put((TASK_BATCH, batch)) @@ -304,3 +308,38 @@ logger.error("Worker %d failed to send its report." % wid) logger.exception(data) + +class FastQueue(object): + def __init__(self, compress=False): + self._reader, self._writer = multiprocessing.Pipe(duplex=False) + self._rlock = multiprocessing.Lock() + self._wlock = multiprocessing.Lock() + self._compress = compress + + def __getstate__(self): + return (self._reader, self._writer, self._rlock, self._wlock, + self._compress) + + def __setstate__(self, state): + (self._reader, self._writer, self._rlock, self._wlock, + self._compress) = state + + def get(self): + with self._rlock: + raw = self._reader.recv_bytes() + if self._compress: + data = zlib.decompress(raw) + else: + data = raw + obj = unpickle(data) + return obj + + def put(self, obj): + data = pickle(obj) + if self._compress: + raw = zlib.compress(data) + else: + raw = data + with self._wlock: + self._writer.send_bytes(raw) +