diff piecrust/workerpool.py @ 461:b015e38d4ee1

internal: Handle data serialization more under the hood. Implement a new queue class that handles pickling of objects, add option (currently disabled) to also do zlib compression before sending bytes.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 11 Jul 2015 17:51:56 -0700
parents 55fc8918cb75
children 04abc97dd3b6
line wrap: on
line diff
--- a/piecrust/workerpool.py	Sat Jul 11 00:45:35 2015 -0700
+++ b/piecrust/workerpool.py	Sat Jul 11 17:51:56 2015 -0700
@@ -1,5 +1,6 @@
 import os
 import sys
+import zlib
 import logging
 import itertools
 import threading
@@ -43,13 +44,12 @@
 
 
 def _real_worker_func(params):
-    if hasattr(params.inqueue, '_writer'):
-        params.inqueue._writer.close()
-        params.outqueue._reader.close()
-
     wid = params.wid
     logger.debug("Worker %d initializing..." % wid)
 
+    params.inqueue._writer.close()
+    params.outqueue._reader.close()
+
     w = params.worker_class(*params.initargs)
     w.wid = wid
     try:
@@ -88,9 +88,8 @@
             task_data = (task_data,)
 
         for t in task_data:
-            td = unpickle(t)
             try:
-                res = (TASK_JOB, True, wid, w.process(td))
+                res = (TASK_JOB, True, wid, w.process(t))
             except Exception as e:
                 if params.wrap_exception:
                     e = multiprocessing.ExceptionWithTraceback(
@@ -120,10 +119,17 @@
                  wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
-        self._task_queue = multiprocessing.SimpleQueue()
-        self._result_queue = multiprocessing.SimpleQueue()
-        self._quick_put = self._task_queue._writer.send
-        self._quick_get = self._result_queue._reader.recv
+        use_fastqueue = True
+        if use_fastqueue:
+            self._task_queue = FastQueue()
+            self._result_queue = FastQueue()
+            self._quick_put = self._task_queue.put
+            self._quick_get = self._result_queue.get
+        else:
+            self._task_queue = multiprocessing.SimpleQueue()
+            self._result_queue = multiprocessing.SimpleQueue()
+            self._quick_put = self._task_queue._writer.send
+            self._quick_get = self._result_queue._reader.recv
 
         self._callback = None
         self._error_callback = None
@@ -188,13 +194,11 @@
 
         if chunk_size is None or chunk_size == 1:
             for job in jobs:
-                job_data = pickle(job)
-                self._quick_put((TASK_JOB, job_data))
+                self._quick_put((TASK_JOB, job))
         else:
             it = iter(jobs)
             while True:
-                batch = tuple([pickle(i)
-                               for i in itertools.islice(it, chunk_size)])
+                batch = tuple([i for i in itertools.islice(it, chunk_size)])
                 if not batch:
                     break
                 self._quick_put((TASK_BATCH, batch))
@@ -304,3 +308,38 @@
         logger.error("Worker %d failed to send its report." % wid)
         logger.exception(data)
 
+
+class FastQueue(object):
+    def __init__(self, compress=False):
+        self._reader, self._writer = multiprocessing.Pipe(duplex=False)
+        self._rlock = multiprocessing.Lock()
+        self._wlock = multiprocessing.Lock()
+        self._compress = compress
+
+    def __getstate__(self):
+        return (self._reader, self._writer, self._rlock, self._wlock,
+                self._compress)
+
+    def __setstate__(self, state):
+        (self._reader, self._writer, self._rlock, self._wlock,
+            self._compress) = state
+
+    def get(self):
+        with self._rlock:
+            raw = self._reader.recv_bytes()
+        if self._compress:
+            data = zlib.decompress(raw)
+        else:
+            data = raw
+        obj = unpickle(data)
+        return obj
+
+    def put(self, obj):
+        data = pickle(obj)
+        if self._compress:
+            raw = zlib.compress(data)
+        else:
+            raw = data
+        with self._wlock:
+            self._writer.send_bytes(raw)
+