changeset 461:b015e38d4ee1

internal: Handle more of the data serialization under the hood. Implement a new queue class that handles pickling of objects, and add an option (currently disabled) to also apply zlib compression before sending bytes.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 11 Jul 2015 17:51:56 -0700
parents 55fc8918cb75
children 04abc97dd3b6
files piecrust/fastpickle.py piecrust/workerpool.py
diffstat 2 files changed, 156 insertions(+), 66 deletions(-) [+]
line wrap: on
line diff
--- a/piecrust/fastpickle.py	Sat Jul 11 00:45:35 2015 -0700
+++ b/piecrust/fastpickle.py	Sat Jul 11 17:51:56 2015 -0700
@@ -1,49 +1,66 @@
 import sys
+import json
 import datetime
 import collections
 
 
 def pickle(obj):
-    return _pickle_object(obj)
+    data = _pickle_object(obj)
+    data = json.dumps(data, indent=None, separators=(',', ':'))
+    return data.encode('utf8')
 
 
 def unpickle(data):
+    data = data.decode('utf8')
+    data = json.loads(data)
     return _unpickle_object(data)
 
 
 _PICKLING = 0
 _UNPICKLING = 1
 
-
-def _tuple_dispatch(obj, func, op):
-    res = [None] * len(obj)
-    for i, c in enumerate(obj):
-        res[i] = func(c)
-    return tuple(res)
+_identity_dispatch = object()
 
 
-def _list_dispatch(obj, func, op):
-    res = [None] * len(obj)
-    for i, c in enumerate(obj):
-        res[i] = func(c)
-    return res
+def _tuple_convert(obj, func, op):
+    if op == _PICKLING:
+        return ['__type__:tuple'] + [func(c) for c in obj]
+    elif op == _UNPICKLING:
+        return tuple([func(c) for c in obj[1:]])
 
 
-def _dict_dispatch(obj, func, op):
+def _list_convert(obj, func, op):
+    return [func(c) for c in obj]
+
+
+def _dict_convert(obj, func, op):
     res = {}
     for k, v in obj.items():
         res[k] = func(v)
     return res
 
 
-def _set_dispatch(obj, func, op):
-    res = set()
-    for v in obj:
-        res.add(func(v))
-    return res
+def _ordered_dict_convert(obj, func, op):
+    if op == _PICKLING:
+        res = {'__type__': 'OrderedDict'}
+        for k, v in obj.items():
+            res[k] = func(v)
+        return res
+    elif op == _UNPICKLING:
+        res = collections.OrderedDict()
+        for k, v in obj.items():
+            res[k] = func(v)
+        return res
 
 
-def _date_convert(obj, op):
+def _set_convert(obj, func, op):
+    if op == _PICKLING:
+        return ['__type__:set'] + [func(c) for c in obj]
+    elif op == _UNPICKLING:
+        return set([func(c) for c in obj[1:]])
+
+
+def _date_convert(obj, func, op):
     if op == _PICKLING:
         return {'__class__': 'date',
                 'year': obj.year,
@@ -54,7 +71,7 @@
                 obj['year'], obj['month'], obj['day'])
 
 
-def _datetime_convert(obj, op):
+def _datetime_convert(obj, func, op):
     if op == _PICKLING:
         return {'__class__': 'datetime',
                 'year': obj.year,
@@ -70,7 +87,7 @@
                 obj['hour'], obj['minute'], obj['second'], obj['microsecond'])
 
 
-def _time_convert(obj, op):
+def _time_convert(obj, func, op):
     if op == _PICKLING:
         return {'__class__': 'time',
                 'hour': obj.hour,
@@ -82,56 +99,66 @@
                 obj['hour'], obj['minute'], obj['second'], obj['microsecond'])
 
 
-_identity_dispatch = object()
+_type_convert = {
+        type(None): _identity_dispatch,
+        bool: _identity_dispatch,
+        int: _identity_dispatch,
+        float: _identity_dispatch,
+        str: _identity_dispatch,
+        datetime.date: _date_convert,
+        datetime.datetime: _datetime_convert,
+        datetime.time: _time_convert,
+        tuple: _tuple_convert,
+        list: _list_convert,
+        dict: _dict_convert,
+        set: _set_convert,
+        collections.OrderedDict: _ordered_dict_convert,
+        }
 
-_type_dispatch = {
+
+_type_unconvert = {
         type(None): _identity_dispatch,
         bool: _identity_dispatch,
         int: _identity_dispatch,
         float: _identity_dispatch,
         str: _identity_dispatch,
-        tuple: _tuple_dispatch,
-        list: _list_dispatch,
-        dict: _dict_dispatch,
-        collections.OrderedDict: _dict_dispatch,
-        set: _set_dispatch
+        'date': _date_convert,
+        'datetime': _datetime_convert,
+        'time': _time_convert,
         }
 
 
-_type_convert = {
-        datetime.date: _date_convert,
-        datetime.datetime: _datetime_convert,
-        datetime.time: _time_convert
+_collection_unconvert = {
+        '__type__:tuple': _tuple_convert,
+        '__type__:set': _set_convert,
         }
 
 
-_type_unconvert = {
-        'date': _date_convert,
-        'datetime': _datetime_convert,
-        'time': _time_convert
+_mapping_unconvert = {
+        'OrderedDict': _ordered_dict_convert
         }
 
 
 def _pickle_object(obj):
     t = type(obj)
-    disp = _type_dispatch.get(t)
-    if disp is _identity_dispatch:
+    conv = _type_convert.get(t)
+
+    # Object doesn't need conversion?
+    if conv is _identity_dispatch:
         return obj
 
-    if disp is not None:
-        return disp(obj, _pickle_object, _PICKLING)
+    # Object has special conversion?
+    if conv is not None:
+        return conv(obj, _pickle_object, _PICKLING)
 
-    conv = _type_convert.get(t)
-    if conv is not None:
-        return conv(obj, _PICKLING)
-
+    # Use instance dictionary, or a custom state.
     getter = getattr(obj, '__getstate__', None)
     if getter is not None:
         state = getter()
     else:
         state = obj.__dict__
 
-    state = _dict_dispatch(state, _pickle_object, _PICKLING)
+    state = _dict_convert(state, _pickle_object, _PICKLING)
     state['__class__'] = obj.__class__.__name__
     state['__module__'] = obj.__class__.__module__
 
@@ -140,18 +167,42 @@
 
 def _unpickle_object(state):
     t = type(state)
-    disp = _type_dispatch.get(t)
-    if disp is _identity_dispatch:
+    conv = _type_unconvert.get(t)
+
+    # Object doesn't need conversion?
+    if conv is _identity_dispatch:
         return state
 
-    if (disp is not None and
-            (t != dict or '__class__' not in state)):
-        return disp(state, _unpickle_object, _UNPICKLING)
+    # Try collection or mapping conversion.
+    if t is list:
+        try:
+            col_type = state[0]
+            if not isinstance(col_type, str):
+                col_type = None
+        except IndexError:
+            col_type = None
+        if col_type is not None:
+            conv = _collection_unconvert.get(col_type)
+            if conv is not None:
+                return conv(state, _unpickle_object, _UNPICKLING)
+        return _list_convert(state, _unpickle_object, _UNPICKLING)
 
-    class_name = state['__class__']
+    assert t is dict
+
+    # Custom mapping type?
+    map_type = state.get('__type__')
+    if map_type:
+        conv = _mapping_unconvert.get(map_type)
+        return conv(state, _unpickle_object, _UNPICKLING)
+
+    # Class instance or other custom type.
+    class_name = state.get('__class__')
+    if class_name is None:
+        return _dict_convert(state, _unpickle_object, _UNPICKLING)
+
     conv = _type_unconvert.get(class_name)
     if conv is not None:
-        return conv(state, _UNPICKLING)
+        return conv(state, _unpickle_object, _UNPICKLING)
 
     mod_name = state['__module__']
     mod = sys.modules[mod_name]
--- a/piecrust/workerpool.py	Sat Jul 11 00:45:35 2015 -0700
+++ b/piecrust/workerpool.py	Sat Jul 11 17:51:56 2015 -0700
@@ -1,5 +1,6 @@
 import os
 import sys
+import zlib
 import logging
 import itertools
 import threading
@@ -43,13 +44,12 @@
 
 
 def _real_worker_func(params):
-    if hasattr(params.inqueue, '_writer'):
-        params.inqueue._writer.close()
-        params.outqueue._reader.close()
-
     wid = params.wid
     logger.debug("Worker %d initializing..." % wid)
 
+    params.inqueue._writer.close()
+    params.outqueue._reader.close()
+
     w = params.worker_class(*params.initargs)
     w.wid = wid
     try:
@@ -88,9 +88,8 @@
             task_data = (task_data,)
 
         for t in task_data:
-            td = unpickle(t)
             try:
-                res = (TASK_JOB, True, wid, w.process(td))
+                res = (TASK_JOB, True, wid, w.process(t))
             except Exception as e:
                 if params.wrap_exception:
                     e = multiprocessing.ExceptionWithTraceback(
@@ -120,10 +119,17 @@
                  wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
-        self._task_queue = multiprocessing.SimpleQueue()
-        self._result_queue = multiprocessing.SimpleQueue()
-        self._quick_put = self._task_queue._writer.send
-        self._quick_get = self._result_queue._reader.recv
+        use_fastqueue = True
+        if use_fastqueue:
+            self._task_queue = FastQueue()
+            self._result_queue = FastQueue()
+            self._quick_put = self._task_queue.put
+            self._quick_get = self._result_queue.get
+        else:
+            self._task_queue = multiprocessing.SimpleQueue()
+            self._result_queue = multiprocessing.SimpleQueue()
+            self._quick_put = self._task_queue._writer.send
+            self._quick_get = self._result_queue._reader.recv
 
         self._callback = None
         self._error_callback = None
@@ -188,13 +194,11 @@
 
         if chunk_size is None or chunk_size == 1:
             for job in jobs:
-                job_data = pickle(job)
-                self._quick_put((TASK_JOB, job_data))
+                self._quick_put((TASK_JOB, job))
         else:
             it = iter(jobs)
             while True:
-                batch = tuple([pickle(i)
-                               for i in itertools.islice(it, chunk_size)])
+                batch = tuple([i for i in itertools.islice(it, chunk_size)])
                 if not batch:
                     break
                 self._quick_put((TASK_BATCH, batch))
@@ -304,3 +308,38 @@
         logger.error("Worker %d failed to send its report." % wid)
         logger.exception(data)
 
+
+class FastQueue(object):
+    def __init__(self, compress=False):
+        self._reader, self._writer = multiprocessing.Pipe(duplex=False)
+        self._rlock = multiprocessing.Lock()
+        self._wlock = multiprocessing.Lock()
+        self._compress = compress
+
+    def __getstate__(self):
+        return (self._reader, self._writer, self._rlock, self._wlock,
+                self._compress)
+
+    def __setstate__(self, state):
+        (self._reader, self._writer, self._rlock, self._wlock,
+            self._compress) = state
+
+    def get(self):
+        with self._rlock:
+            raw = self._reader.recv_bytes()
+        if self._compress:
+            data = zlib.decompress(raw)
+        else:
+            data = raw
+        obj = unpickle(data)
+        return obj
+
+    def put(self, obj):
+        data = pickle(obj)
+        if self._compress:
+            raw = zlib.compress(data)
+        else:
+            raw = data
+        with self._wlock:
+            self._writer.send_bytes(raw)
+