changeset 697:9e5393fcfab2

bake: Re-enable faster serialization between processes.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 23 Mar 2016 12:19:35 -0700
parents 3bc9f857eb48
children 33ab9badfd7a
files piecrust/fastpickle.py piecrust/workerpool.py
diffstat 2 files changed, 67 insertions(+), 32 deletions(-)
--- a/piecrust/fastpickle.py	Wed Mar 23 10:54:03 2016 -0700
+++ b/piecrust/fastpickle.py	Wed Mar 23 12:19:35 2016 -0700
@@ -1,5 +1,6 @@
 import sys
 import json
+import codecs
 import datetime
 import collections
 
@@ -10,12 +11,32 @@
     return data.encode('utf8')
 
 
+def pickle_intob(obj, buf):
+    data = _pickle_object(obj)
+    buf = _WriteWrapper(buf)
+    json.dump(data, buf, indent=None, separators=(',', ':'))
+
+
 def unpickle(data):
-    data = data.decode('utf8')
+    data = json.loads(data.decode('utf8'))
+    return _unpickle_object(data)
+
+
+def unpickle_fromb(buf, bufsize):
+    with buf.getbuffer() as innerbuf:
+        data = codecs.decode(innerbuf[:bufsize], encoding='utf8')
     data = json.loads(data)
     return _unpickle_object(data)
 
 
+class _WriteWrapper(object):
+    def __init__(self, buf):
+        self._buf = buf
+
+    def write(self, data):
+        self._buf.write(data.encode('utf8'))
+
+
 _PICKLING = 0
 _UNPICKLING = 1
 
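
For reference, a minimal sketch of how the two new buffer-based entry points are meant to round-trip; the BytesIO buffer and the sample payload are illustrative, not part of the changeset:

    import io
    from piecrust import fastpickle

    buf = io.BytesIO()
    fastpickle.pickle_intob({'answer': 42}, buf)  # JSON-encodes into the buffer
    size = buf.tell()                             # bytes actually written

    # unpickle_fromb reads the underlying buffer directly, so only the
    # first `size` bytes are decoded, regardless of any stale bytes after.
    obj = fastpickle.unpickle_fromb(buf, size)
    assert obj == {'answer': 42}
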
--- a/piecrust/workerpool.py	Wed Mar 23 10:54:03 2016 -0700
+++ b/piecrust/workerpool.py	Wed Mar 23 12:19:35 2016 -0700
@@ -4,15 +4,17 @@
 import time
 import zlib
 import queue
-import pickle
 import logging
 import itertools
 import threading
 import multiprocessing
+from piecrust import fastpickle
 
 
 logger = logging.getLogger(__name__)
 
+use_fastqueue = True
+
 
 class IWorker(object):
     def initialize(self):
@@ -73,21 +75,29 @@
         params.outqueue.put(None)
         return
 
-    # Create threads to read/write the jobs and results from/to the
-    # main arbitrator process.
-    local_job_queue = queue.Queue()
-    reader_thread = threading.Thread(
-            target=_job_queue_reader,
-            args=(params.inqueue.get, local_job_queue),
-            name="JobQueueReaderThread")
-    reader_thread.start()
+    use_threads = False
+    if use_threads:
+        # Create threads to read/write the jobs and results from/to the
+        # main arbitrator process.
+        local_job_queue = queue.Queue()
+        reader_thread = threading.Thread(
+                target=_job_queue_reader,
+                args=(params.inqueue.get, local_job_queue),
+                name="JobQueueReaderThread")
+        reader_thread.start()
 
-    local_result_queue = queue.Queue()
-    writer_thread = threading.Thread(
-            target=_job_results_writer,
-            args=(local_result_queue, params.outqueue.put),
-            name="JobResultWriterThread")
-    writer_thread.start()
+        local_result_queue = queue.Queue()
+        writer_thread = threading.Thread(
+                target=_job_results_writer,
+                args=(local_result_queue, params.outqueue.put),
+                name="JobResultWriterThread")
+        writer_thread.start()
+
+        get = local_job_queue.get
+        put = local_result_queue.put_nowait
+    else:
+        get = params.inqueue.get
+        put = params.outqueue.put
 
     # Start pumping!
     completed = 0
@@ -95,8 +105,7 @@
     time_in_put = 0
     while True:
         get_start_time = time.perf_counter()
-        task = local_job_queue.get()
-        local_job_queue.task_done()
+        task = get()
         time_in_get += (time.perf_counter() - get_start_time)
 
         task_type, task_data = task
@@ -113,7 +122,7 @@
                     e = multiprocessing.ExceptionWithTraceback(
                             e, e.__traceback__)
                 rep = (task_type, False, wid, (wid, e))
-            local_result_queue.put_nowait(rep)
+            put(rep)
             break
 
         if task_type == TASK_JOB:
@@ -129,15 +138,16 @@
                 res = (TASK_JOB, False, wid, e)
 
             put_start_time = time.perf_counter()
-            local_result_queue.put_nowait(res)
+            put(res)
             time_in_put += (time.perf_counter() - put_start_time)
 
             completed += 1
 
-    logger.debug("Worker %d waiting for reader/writer threads." % wid)
-    local_result_queue.put_nowait(None)
-    reader_thread.join()
-    writer_thread.join()
+    if use_threads:
+        logger.debug("Worker %d waiting for reader/writer threads." % wid)
+        local_result_queue.put_nowait(None)
+        reader_thread.join()
+        writer_thread.join()
 
     logger.debug("Worker %d completed %d tasks." % (wid, completed))
 
@@ -147,7 +157,7 @@
         try:
             task = getter()
         except (EOFError, OSError):
-            logger.debug("Worker %d encountered connection problem." % wid)
+            logger.debug("Worker encountered connection problem.")
             break
 
         out_queue.put_nowait(task)
@@ -189,7 +199,6 @@
                  wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
-        use_fastqueue = False
         if use_fastqueue:
             self._task_queue = FastQueue()
             self._result_queue = FastQueue()
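
With the hard-coded use_fastqueue = False override removed, the module-level flag near the top of workerpool.py (now defaulting to True) is what selects the queue implementation. A hedged usage note: the fast path can still be disabled globally, assuming the flag is flipped before the pool is constructed:

    from piecrust import workerpool

    # Fall back to the standard multiprocessing queues, e.g. for debugging.
    workerpool.use_fastqueue = False
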
@@ -388,6 +397,9 @@
         self._reader, self._writer = multiprocessing.Pipe(duplex=False)
         self._rlock = multiprocessing.Lock()
         self._wlock = multiprocessing.Lock()
+        self._initBuffers()
+
+    def _initBuffers(self):
         self._rbuf = io.BytesIO()
         self._rbuf.truncate(256)
         self._wbuf = io.BytesIO()
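
The point of factoring out _initBuffers: the io.BytesIO scratch buffers are not picklable, so they stay out of __getstate__ and must be rebuilt when the queue object is unpickled inside a worker process, which is what the new __setstate__ call in the next hunk does. A condensed sketch of the pattern, with illustrative class and attribute names:

    import io

    class BufferedEndpoint(object):
        def __init__(self, conn):
            self._conn = conn         # picklable: crosses the process boundary
            self._init_buffers()      # unpicklable scratch space

        def _init_buffers(self):
            self._buf = io.BytesIO()
            self._buf.truncate(256)   # pre-size, as FastQueue does

        def __getstate__(self):
            return (self._conn,)      # leave the BytesIO out of the pickle

        def __setstate__(self, state):
            (self._conn,) = state
            self._init_buffers()      # recreate the buffer in the child
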
@@ -398,19 +410,21 @@
 
     def __setstate__(self, state):
         (self._reader, self._writer, self._rlock, self._wlock) = state
+        self._initBuffers()
 
     def get(self):
         with self._rlock:
             try:
                 with self._rbuf.getbuffer() as b:
-                    self._reader.recv_bytes_into(b)
+                    bufsize = self._reader.recv_bytes_into(b)
             except multiprocessing.BufferTooShort as e:
-                self._rbuf.truncate(len(e.args[0]) * 2)
+                bufsize = len(e.args[0])
+                self._rbuf.truncate(bufsize * 2)
                 self._rbuf.seek(0)
                 self._rbuf.write(e.args[0])
 
         self._rbuf.seek(0)
-        return self._unpickle(self._rbuf)
+        return self._unpickle(self._rbuf, bufsize)
 
     def put(self, obj):
         self._wbuf.seek(0)
@@ -423,8 +437,8 @@
                 self._writer.send_bytes(b, 0, size)
 
     def _pickle(self, obj, buf):
-        pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)
+        fastpickle.pickle_intob(obj, buf)
 
-    def _unpickle(self, buf):
-        return pickle.load(buf)
+    def _unpickle(self, buf, bufsize):
+        return fastpickle.unpickle_fromb(buf, bufsize)
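
The resizing in get() above is the standard recv_bytes_into idiom: receive into a pre-sized buffer, and when the message doesn't fit, multiprocessing raises BufferTooShort with the full payload attached as e.args[0], so the buffer can be grown (to twice the payload here, leaving headroom for later messages) and backfilled. A self-contained sketch of that idiom, with illustrative sizes:

    import io
    import multiprocessing

    reader, writer = multiprocessing.Pipe(duplex=False)
    rbuf = io.BytesIO()
    rbuf.truncate(256)                   # pre-sized receive buffer

    writer.send_bytes(b'x' * 1000)       # payload larger than the buffer

    try:
        with rbuf.getbuffer() as b:
            bufsize = reader.recv_bytes_into(b)
    except multiprocessing.BufferTooShort as e:
        payload = e.args[0]              # the full message rides on the exception
        bufsize = len(payload)
        rbuf.truncate(bufsize * 2)       # grow once, with headroom
        rbuf.seek(0)
        rbuf.write(payload)

    with rbuf.getbuffer() as b:
        assert bytes(b[:bufsize]) == b'x' * 1000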