# HG changeset patch
# User Ludovic Chabant
# Date 1458760775 25200
# Node ID 9e5393fcfab21006c571b792a65fa3c8e84d417a
# Parent  3bc9f857eb48eefc89aea6377fa0c565dc59faf1
bake: Re-enable faster serialization between processes.

diff -r 3bc9f857eb48 -r 9e5393fcfab2 piecrust/fastpickle.py
--- a/piecrust/fastpickle.py	Wed Mar 23 10:54:03 2016 -0700
+++ b/piecrust/fastpickle.py	Wed Mar 23 12:19:35 2016 -0700
@@ -1,5 +1,6 @@
 import sys
 import json
+import codecs
 import datetime
 import collections
 
@@ -10,12 +11,32 @@
     return data.encode('utf8')
 
 
+def pickle_intob(obj, buf):
+    data = _pickle_object(obj)
+    buf = _WriteWrapper(buf)
+    json.dump(data, buf, indent=None, separators=(',', ':'))
+
+
 def unpickle(data):
-    data = data.decode('utf8')
+    data = json.loads(data.decode('utf8'))
+    return _unpickle_object(data)
+
+
+def unpickle_fromb(buf, bufsize):
+    with buf.getbuffer() as innerbuf:
+        data = codecs.decode(innerbuf[:bufsize], encoding='utf8')
     data = json.loads(data)
     return _unpickle_object(data)
 
 
+class _WriteWrapper(object):
+    def __init__(self, buf):
+        self._buf = buf
+
+    def write(self, data):
+        self._buf.write(data.encode('utf8'))
+
+
 _PICKLING = 0
 _UNPICKLING = 1
 
diff -r 3bc9f857eb48 -r 9e5393fcfab2 piecrust/workerpool.py
--- a/piecrust/workerpool.py	Wed Mar 23 10:54:03 2016 -0700
+++ b/piecrust/workerpool.py	Wed Mar 23 12:19:35 2016 -0700
@@ -4,15 +4,17 @@
 import time
 import zlib
 import queue
-import pickle
 import logging
 import itertools
 import threading
 import multiprocessing
+from piecrust import fastpickle
 
 
 logger = logging.getLogger(__name__)
 
+use_fastqueue = True
+
 
 class IWorker(object):
     def initialize(self):
@@ -73,21 +75,29 @@
         params.outqueue.put(None)
         return
 
-    # Create threads to read/write the jobs and results from/to the
-    # main arbitrator process.
-    local_job_queue = queue.Queue()
-    reader_thread = threading.Thread(
-            target=_job_queue_reader,
-            args=(params.inqueue.get, local_job_queue),
-            name="JobQueueReaderThread")
-    reader_thread.start()
+    use_threads = False
+    if use_threads:
+        # Create threads to read/write the jobs and results from/to the
+        # main arbitrator process.
+        local_job_queue = queue.Queue()
+        reader_thread = threading.Thread(
+                target=_job_queue_reader,
+                args=(params.inqueue.get, local_job_queue),
+                name="JobQueueReaderThread")
+        reader_thread.start()
 
-    local_result_queue = queue.Queue()
-    writer_thread = threading.Thread(
-            target=_job_results_writer,
-            args=(local_result_queue, params.outqueue.put),
-            name="JobResultWriterThread")
-    writer_thread.start()
+        local_result_queue = queue.Queue()
+        writer_thread = threading.Thread(
+                target=_job_results_writer,
+                args=(local_result_queue, params.outqueue.put),
+                name="JobResultWriterThread")
+        writer_thread.start()
+
+        get = local_job_queue.get
+        put = local_result_queue.put_nowait
+    else:
+        get = params.inqueue.get
+        put = params.outqueue.put
 
     # Start pumping!
     completed = 0
@@ -95,8 +105,7 @@
     time_in_put = 0
     while True:
         get_start_time = time.perf_counter()
-        task = local_job_queue.get()
-        local_job_queue.task_done()
+        task = get()
         time_in_get += (time.perf_counter() - get_start_time)
 
         task_type, task_data = task
@@ -113,7 +122,7 @@
                 e = multiprocessing.ExceptionWithTraceback(
                         e, e.__traceback__)
                 rep = (task_type, False, wid, (wid, e))
-                local_result_queue.put_nowait(rep)
+                put(rep)
             break
 
         if task_type == TASK_JOB:
@@ -129,15 +138,16 @@
                 res = (TASK_JOB, False, wid, e)
 
             put_start_time = time.perf_counter()
-            local_result_queue.put_nowait(res)
+            put(res)
            time_in_put += (time.perf_counter() - put_start_time)
 
             completed += 1
 
-    logger.debug("Worker %d waiting for reader/writer threads." % wid)
-    local_result_queue.put_nowait(None)
-    reader_thread.join()
-    writer_thread.join()
+    if use_threads:
+        logger.debug("Worker %d waiting for reader/writer threads." % wid)
+        local_result_queue.put_nowait(None)
+        reader_thread.join()
+        writer_thread.join()
 
     logger.debug("Worker %d completed %d tasks." % (wid, completed))
 
@@ -147,7 +157,7 @@
         try:
             task = getter()
         except (EOFError, OSError):
-            logger.debug("Worker %d encountered connection problem." % wid)
+            logger.debug("Worker encountered connection problem.")
             break
 
         out_queue.put_nowait(task)
@@ -189,7 +199,6 @@
                  wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
-        use_fastqueue = False
         if use_fastqueue:
             self._task_queue = FastQueue()
             self._result_queue = FastQueue()
@@ -388,6 +397,9 @@
         self._reader, self._writer = multiprocessing.Pipe(duplex=False)
         self._rlock = multiprocessing.Lock()
         self._wlock = multiprocessing.Lock()
+        self._initBuffers()
+
+    def _initBuffers(self):
         self._rbuf = io.BytesIO()
         self._rbuf.truncate(256)
         self._wbuf = io.BytesIO()
@@ -398,19 +410,21 @@
     def __setstate__(self, state):
         (self._reader, self._writer,
             self._rlock, self._wlock) = state
+        self._initBuffers()
 
     def get(self):
         with self._rlock:
             try:
                 with self._rbuf.getbuffer() as b:
-                    self._reader.recv_bytes_into(b)
+                    bufsize = self._reader.recv_bytes_into(b)
             except multiprocessing.BufferTooShort as e:
-                self._rbuf.truncate(len(e.args[0]) * 2)
+                bufsize = len(e.args[0])
+                self._rbuf.truncate(bufsize * 2)
                 self._rbuf.seek(0)
                 self._rbuf.write(e.args[0])
 
         self._rbuf.seek(0)
-        return self._unpickle(self._rbuf)
+        return self._unpickle(self._rbuf, bufsize)
 
     def put(self, obj):
         self._wbuf.seek(0)
@@ -423,8 +437,8 @@
             self._writer.send_bytes(b, 0, size)
 
     def _pickle(self, obj, buf):
-        pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)
+        fastpickle.pickle_intob(obj, buf)
 
-    def _unpickle(self, buf):
-        return pickle.load(buf)
+    def _unpickle(self, buf, bufsize):
+        return fastpickle.unpickle_fromb(buf, bufsize)
 
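
Note, not part of the changeset: the two new fastpickle entry points are easiest to verify as a round trip through a reusable io.BytesIO, which is exactly how FastQueue drives them. A minimal sketch, assuming the patched piecrust.fastpickle is importable; the 256-byte pre-sizing mirrors FastQueue's buffers, and the sample payload is arbitrary:

    import io

    from piecrust import fastpickle

    buf = io.BytesIO()
    buf.truncate(256)      # FastQueue pre-sizes its scratch buffers this way

    # Serialize into the buffer; tell() is the number of bytes written.
    fastpickle.pickle_intob({'uri': '/foo', 'num': 42}, buf)
    size = buf.tell()

    # Deserialize, passing the payload length explicitly: the buffer is
    # larger than the payload, so unpickle_fromb() must be told where the
    # JSON ends (it slices getbuffer()[:bufsize] internally).
    buf.seek(0)
    obj = fastpickle.unpickle_fromb(buf, size)
    assert obj == {'uri': '/foo', 'num': 42}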
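
Note, not part of the changeset: the FastQueue half of the patch hinges on threading the payload length (bufsize) from recv_bytes_into() through to _unpickle(). The receive buffer is a pre-sized, reused io.BytesIO, so bytes past the payload are stale leftovers from earlier messages and the length cannot be recovered from the buffer itself. A standalone sketch of that receive path, using only stock multiprocessing calls; illustrative, not the actual class:

    import io
    import multiprocessing

    reader, writer = multiprocessing.Pipe(duplex=False)
    msg = b'x' * 1000          # deliberately larger than the scratch buffer
    writer.send_bytes(msg)

    rbuf = io.BytesIO()
    rbuf.truncate(256)         # pre-sized scratch buffer, as in FastQueue
    try:
        with rbuf.getbuffer() as b:
            bufsize = reader.recv_bytes_into(b)  # returns the payload length
    except multiprocessing.BufferTooShort as e:
        # The oversized payload arrives in the exception instead; grow the
        # buffer and copy it in, the same recovery the patched get() performs.
        bufsize = len(e.args[0])
        rbuf.truncate(bufsize * 2)
        rbuf.seek(0)
        rbuf.write(e.args[0])

    rbuf.seek(0)
    assert rbuf.getbuffer()[:bufsize].tobytes() == msg

Doubling on overflow (bufsize * 2) amortizes reallocations across messages, which is the point of keeping the buffers alive in _initBuffers() rather than allocating per call.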