Mercurial > piecrust2
diff piecrust/workerpool.py @ 991:1857dbd4580f
bake: Fix bugs introduced by bake optimizations, of course.
- Make the execution stats JSON-serializable.
- Re-add ability to differentiate between sources used during segment rendering
and during layout rendering. Fixes problems with cache invalidation of
pages that use other sources.
- Make taxonomy-related stuff JSON-serializable.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 20 Nov 2017 23:06:47 -0800 |
parents | 8adc27285d93 |
children | 09c3d415d9e5 |
line wrap: on
line diff
--- a/piecrust/workerpool.py Sun Nov 19 14:29:52 2017 -0800 +++ b/piecrust/workerpool.py Mon Nov 20 23:06:47 2017 -0800 @@ -11,7 +11,7 @@ logger = logging.getLogger(__name__) -use_fastqueue = True +use_fastqueue = False use_fastpickle = False use_msgpack = False use_marshall = False @@ -202,7 +202,8 @@ stats.registerTimer('WorkerResultPut', time=time_in_put) try: stats.mergeStats(w.getStats()) - rep = (task_type, wid, [(task_data, (wid, stats), True)]) + stats_data = stats.toData() + rep = (task_type, wid, [(task_data, (wid, stats_data), True)]) except Exception as e: logger.debug( "Error getting report, sending exception to main process:") @@ -439,6 +440,7 @@ self.reports = [None] * worker_count self._count = worker_count self._received = 0 + self._lock = threading.Lock() self._event = threading.Event() def wait(self, timeout=None): @@ -450,11 +452,14 @@ logger.error("Ignoring report from unknown worker %d." % wid) return - self._received += 1 - self.reports[wid] = data + stats = ExecutionStats() + stats.fromData(data) - if self._received == self._count: - self._event.set() + with self._lock: + self.reports[wid] = stats + self._received += 1 + if self._received == self._count: + self._event.set() def _handleError(self, job, res, _): logger.error("Worker %d failed to send its report." % res.wid) @@ -467,6 +472,7 @@ self._rlock = multiprocessing.Lock() self._wlock = multiprocessing.Lock() self._initBuffers() + self._initSerializer() def _initBuffers(self): self._rbuf = io.BytesIO() @@ -474,6 +480,9 @@ self._wbuf = io.BytesIO() self._wbuf.truncate(256) + def _initSerializer(self): + pass + def __getstate__(self): return (self._reader, self._writer, self._rlock, self._wlock) @@ -483,6 +492,7 @@ def get(self): with self._rlock: + self._rbuf.seek(0) try: with self._rbuf.getbuffer() as b: bufsize = self._reader.recv_bytes_into(b) @@ -493,11 +503,11 @@ self._rbuf.write(e.args[0]) self._rbuf.seek(0) - return _unpickle(self._rbuf, bufsize) + return _unpickle(self, self._rbuf, bufsize) def put(self, obj): self._wbuf.seek(0) - _pickle(obj, self._wbuf) + _pickle(self, obj, self._wbuf) size = self._wbuf.tell() self._wbuf.seek(0) @@ -506,13 +516,25 @@ self._writer.send_bytes(b, 0, size) +class _BufferWrapper: + def __init__(self, buf, read_size=0): + self._buf = buf + self._read_size = read_size + + def write(self, data): + self._buf.write(data.encode('utf8')) + + def read(self): + return self._buf.read(self._read_size).decode('utf8') + + if use_fastpickle: from piecrust import fastpickle - def _pickle_fast(obj, buf): + def _pickle_fast(queue, obj, buf): fastpickle.pickle_intob(obj, buf) - def _unpickle_fast(buf, bufsize): + def _unpickle_fast(queue, buf, bufsize): return fastpickle.unpickle_fromb(buf, bufsize) _pickle = _pickle_fast @@ -521,22 +543,30 @@ elif use_msgpack: import msgpack - def _pickle_msgpack(obj, buf): - msgpack.pack(obj, buf) + def _pickle_msgpack(queue, obj, buf): + buf.write(queue._packer.pack(obj)) - def _unpickle_msgpack(buf, bufsize): - return msgpack.unpack(buf) + def _unpickle_msgpack(queue, buf, bufsize): + queue._unpacker.feed(buf.getbuffer()) + for o in queue._unpacker: + return o + # return msgpack.unpack(buf) + + def _init_msgpack(queue): + queue._packer = msgpack.Packer() + queue._unpacker = msgpack.Unpacker() _pickle = _pickle_msgpack _unpickle = _unpickle_msgpack + FastQueue._initSerializer = _init_msgpack elif use_marshall: import marshal - def _pickle_marshal(obj, buf): + def _pickle_marshal(queue, obj, buf): marshal.dump(obj, buf) - def _unpickle_marshal(buf, bufsize): + def _unpickle_marshal(queue, buf, bufsize): return marshal.load(buf) _pickle = _pickle_marshal @@ -545,22 +575,12 @@ elif use_json: import json - class _BufferWrapper: - def __init__(self, buf): - self._buf = buf - - def write(self, data): - self._buf.write(data.encode('utf8')) - - def read(self): - return self._buf.read().decode('utf8') - - def _pickle_json(obj, buf): + def _pickle_json(queue, obj, buf): buf = _BufferWrapper(buf) json.dump(obj, buf, indent=None, separators=(',', ':')) - def _unpickle_json(buf, bufsize): - buf = _BufferWrapper(buf) + def _unpickle_json(queue, buf, bufsize): + buf = _BufferWrapper(buf, bufsize) return json.load(buf) _pickle = _pickle_json @@ -569,10 +589,10 @@ else: import pickle - def _pickle_default(obj, buf): + def _pickle_default(queue, obj, buf): pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL) - def _unpickle_default(buf, bufsize): + def _unpickle_default(queue, buf, bufsize): return pickle.load(buf) _pickle = _pickle_default