diff piecrust/workerpool.py @ 991:1857dbd4580f

bake: Fix bugs introduced by bake optimizations, of course.

- Make the execution stats JSON-serializable.
- Re-add ability to differentiate between sources used during segment rendering and during layout rendering. Fixes problems with cache invalidation of pages that use other sources.
- Make taxonomy-related stuff JSON-serializable.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 20 Nov 2017 23:06:47 -0800
parents 8adc27285d93
children 09c3d415d9e5
--- a/piecrust/workerpool.py	Sun Nov 19 14:29:52 2017 -0800
+++ b/piecrust/workerpool.py	Mon Nov 20 23:06:47 2017 -0800
@@ -11,7 +11,7 @@
 
 logger = logging.getLogger(__name__)
 
-use_fastqueue = True
+use_fastqueue = False
 use_fastpickle = False
 use_msgpack = False
 use_marshall = False
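The first hunk turns use_fastqueue off, so the pool presumably falls back to the stock multiprocessing queue while the remaining flags go on selecting the serializer backend further down the file. A minimal sketch of how such a flag typically drives the choice — the actual selection code lives outside this hunk, so these names are assumptions:

    import multiprocessing

    # Hypothetical selection logic; the real code is elsewhere in
    # workerpool.py. With use_fastqueue = False the stdlib queue is
    # used, side-stepping the FastQueue paths patched below.
    def _make_result_queue(use_fastqueue):
        if use_fastqueue:
            return FastQueue()
        return multiprocessing.SimpleQueue()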
@@ -202,7 +202,8 @@
             stats.registerTimer('WorkerResultPut', time=time_in_put)
             try:
                 stats.mergeStats(w.getStats())
-                rep = (task_type, wid, [(task_data, (wid, stats), True)])
+                stats_data = stats.toData()
+                rep = (task_type, wid, [(task_data, (wid, stats_data), True)])
             except Exception as e:
                 logger.debug(
                     "Error getting report, sending exception to main process:")
@@ -439,6 +440,7 @@
         self.reports = [None] * worker_count
         self._count = worker_count
         self._received = 0
+        self._lock = threading.Lock()
         self._event = threading.Event()
 
     def wait(self, timeout=None):
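The sender-side hunk above swaps the live ExecutionStats object for a plain-data snapshot via toData(), and the receiver in the next hunk rebuilds it with fromData(); that snapshot is what the JSON and msgpack backends at the bottom of the file can actually encode. A minimal sketch of the round-trip, assuming a simplified stats class that only tracks timers (the real ExecutionStats carries more):

    class StatsSketch:
        # Simplified stand-in for piecrust's ExecutionStats, showing the
        # pattern: toData() emits plain dicts/numbers, fromData() restores.
        def __init__(self):
            self.timers = {}

        def registerTimer(self, name, time=0):
            self.timers[name] = self.timers.get(name, 0) + time

        def toData(self):
            return {'timers': dict(self.timers)}  # JSON/msgpack friendly

        def fromData(self, data):
            self.timers = dict(data['timers'])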
@@ -450,11 +452,14 @@
             logger.error("Ignoring report from unknown worker %d." % wid)
             return
 
-        self._received += 1
-        self.reports[wid] = data
+        stats = ExecutionStats()
+        stats.fromData(data)
 
-        if self._received == self._count:
-            self._event.set()
+        with self._lock:
+            self.reports[wid] = stats
+            self._received += 1
+            if self._received == self._count:
+                self._event.set()
 
     def _handleError(self, job, res, _):
         logger.error("Worker %d failed to send its report." % res.wid)
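Besides rebuilding the stats from plain data, this hunk moves the received-count bookkeeping under the lock added above. Without it, two pool callback threads can interleave the unguarded read-modify-write of self._received, lose an increment, and leave self._event unset so wait() blocks forever. A condensed sketch of the guarded pattern:

    import threading

    class ReportsSketch:
        # Condensed receiver: the lock makes increment-and-check atomic
        # when reports arrive on more than one callback thread.
        def __init__(self, count):
            self._count = count
            self._received = 0
            self._lock = threading.Lock()
            self._event = threading.Event()

        def handle(self):
            with self._lock:
                self._received += 1
                if self._received == self._count:
                    self._event.set()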
@@ -467,6 +472,7 @@
         self._rlock = multiprocessing.Lock()
         self._wlock = multiprocessing.Lock()
         self._initBuffers()
+        self._initSerializer()
 
     def _initBuffers(self):
         self._rbuf = io.BytesIO()
@@ -474,6 +480,9 @@
         self._wbuf = io.BytesIO()
         self._wbuf.truncate(256)
 
+    def _initSerializer(self):
+        pass
+
     def __getstate__(self):
         return (self._reader, self._writer, self._rlock, self._wlock)
 
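_initSerializer() is deliberately a no-op hook: a backend that needs per-queue state (only msgpack, below) rebinds it on the class, so every FastQueue instance builds its own serializer state at construction time. A toy illustration of that rebinding pattern, with hypothetical names:

    class ChannelSketch:
        def __init__(self):
            self._initSerializer()

        def _initSerializer(self):
            pass  # default: the serializer keeps no per-queue state

    def _init_stateful(self):
        self._state = []  # hypothetical per-instance serializer state

    ChannelSketch._initSerializer = _init_stateful
    assert ChannelSketch()._state == []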
@@ -483,6 +492,7 @@
 
     def get(self):
         with self._rlock:
+            self._rbuf.seek(0)
             try:
                 with self._rbuf.getbuffer() as b:
                     bufsize = self._reader.recv_bytes_into(b)
@@ -493,11 +503,11 @@
                 self._rbuf.write(e.args[0])
 
         self._rbuf.seek(0)
-        return _unpickle(self._rbuf, bufsize)
+        return _unpickle(self, self._rbuf, bufsize)
 
     def put(self, obj):
         self._wbuf.seek(0)
-        _pickle(obj, self._wbuf)
+        _pickle(self, obj, self._wbuf)
         size = self._wbuf.tell()
 
         self._wbuf.seek(0)
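Two things change in the transport itself: get() now rewinds the read buffer first, so the BufferTooShort fallback writes the oversized payload at position zero instead of wherever the previous message left the cursor, and both _pickle and _unpickle receive the queue so stateful backends can reach their per-queue state. The framing is otherwise unchanged: serialize into a reusable BytesIO, remember how many bytes are valid, and ship exactly that many. A standalone sketch of that framing with the stdlib pickler:

    import io
    import pickle

    # The buffer is reused across messages, so only the first `size`
    # bytes are meaningful; anything past them is stale.
    def frame(obj, wbuf):
        wbuf.seek(0)
        pickle.dump(obj, wbuf, pickle.HIGHEST_PROTOCOL)
        return wbuf.tell()  # bytes actually written

    wbuf = io.BytesIO()
    size = frame({'wid': 1}, wbuf)
    payload = wbuf.getvalue()[:size]  # what send_bytes(b, 0, size) ships
    assert pickle.loads(payload) == {'wid': 1}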
@@ -506,13 +516,25 @@
                 self._writer.send_bytes(b, 0, size)
 
 
+class _BufferWrapper:
+    def __init__(self, buf, read_size=0):
+        self._buf = buf
+        self._read_size = read_size
+
+    def write(self, data):
+        self._buf.write(data.encode('utf8'))
+
+    def read(self):
+        return self._buf.read(self._read_size).decode('utf8')
+
+
 if use_fastpickle:
     from piecrust import fastpickle
 
-    def _pickle_fast(obj, buf):
+    def _pickle_fast(queue, obj, buf):
         fastpickle.pickle_intob(obj, buf)
 
-    def _unpickle_fast(buf, bufsize):
+    def _unpickle_fast(queue, buf, bufsize):
         return fastpickle.unpickle_fromb(buf, bufsize)
 
     _pickle = _pickle_fast
@@ -521,22 +543,30 @@
 elif use_msgpack:
     import msgpack
 
-    def _pickle_msgpack(obj, buf):
-        msgpack.pack(obj, buf)
+    def _pickle_msgpack(queue, obj, buf):
+        buf.write(queue._packer.pack(obj))
 
-    def _unpickle_msgpack(buf, bufsize):
-        return msgpack.unpack(buf)
+    def _unpickle_msgpack(queue, buf, bufsize):
+        queue._unpacker.feed(buf.getbuffer())
+        for o in queue._unpacker:
+            return o
+        # return msgpack.unpack(buf)
+
+    def _init_msgpack(queue):
+        queue._packer = msgpack.Packer()
+        queue._unpacker = msgpack.Unpacker()
 
     _pickle = _pickle_msgpack
     _unpickle = _unpickle_msgpack
+    FastQueue._initSerializer = _init_msgpack
 
 elif use_marshall:
     import marshal
 
-    def _pickle_marshal(obj, buf):
+    def _pickle_marshal(queue, obj, buf):
         marshal.dump(obj, buf)
 
-    def _unpickle_marshal(buf, bufsize):
+    def _unpickle_marshal(queue, buf, bufsize):
         return marshal.load(buf)
 
     _pickle = _pickle_marshal
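The msgpack backend now keeps a long-lived Packer/Unpacker pair per queue (wired up through the _initSerializer hook) instead of calling module-level pack/unpack per message; the marshal branch merely threads the new, unused queue argument through. Unpacker is a streaming decoder: feed() buffers raw bytes and iteration yields each complete message, which tolerates the stale trailing bytes a reused buffer may carry. A standalone demo, assuming msgpack >= 1.0 for str dictionary keys:

    import msgpack

    packer = msgpack.Packer()
    unpacker = msgpack.Unpacker()

    # feed() buffers raw bytes; iterating yields complete messages and
    # stops when no full message remains in the internal buffer.
    unpacker.feed(packer.pack({'wid': 3, 'ok': True}))
    for msg in unpacker:
        print(msg)  # {'wid': 3, 'ok': True}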
@@ -545,22 +575,12 @@
 elif use_json:
     import json
 
-    class _BufferWrapper:
-        def __init__(self, buf):
-            self._buf = buf
-
-        def write(self, data):
-            self._buf.write(data.encode('utf8'))
-
-        def read(self):
-            return self._buf.read().decode('utf8')
-
-    def _pickle_json(obj, buf):
+    def _pickle_json(queue, obj, buf):
         buf = _BufferWrapper(buf)
         json.dump(obj, buf, indent=None, separators=(',', ':'))
 
-    def _unpickle_json(buf, bufsize):
-        buf = _BufferWrapper(buf)
+    def _unpickle_json(queue, buf, bufsize):
+        buf = _BufferWrapper(buf, bufsize)
         return json.load(buf)
 
     _pickle = _pickle_json
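_BufferWrapper moved to module level and grew a read_size parameter for exactly this branch: json.load() calls read() once with no arguments, and because the underlying BytesIO is truncated to a fixed size and reused, an unbounded read would hand the JSON decoder stale bytes from earlier messages after the current document. Capping the read at bufsize avoids that. A self-contained demo (the wrapper is restated so the snippet runs on its own):

    import io
    import json

    class _BufferWrapper:  # mirrors the class added above
        def __init__(self, buf, read_size=0):
            self._buf = buf
            self._read_size = read_size

        def write(self, data):
            self._buf.write(data.encode('utf8'))

        def read(self):
            return self._buf.read(self._read_size).decode('utf8')

    # 10 valid bytes followed by leftovers from a previous message:
    buf = io.BytesIO(b'{"wid": 3}garbage-from-last-time')
    print(json.load(_BufferWrapper(buf, read_size=10)))  # {'wid': 3}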
@@ -569,10 +589,10 @@
 else:
     import pickle
 
-    def _pickle_default(obj, buf):
+    def _pickle_default(queue, obj, buf):
         pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)
 
-    def _unpickle_default(buf, bufsize):
+    def _unpickle_default(queue, buf, bufsize):
         return pickle.load(buf)
 
     _pickle = _pickle_default