# HG changeset patch
# User Ludovic Chabant
# Date 1458723237 25200
# Node ID d2a87365b85b3bf12e0f10de1eab21426b8ae23d
# Parent  c11a4339fccbcf50b0783c85456c939a932a4bce
bake: Use threads to read/write from/to the main arbitrator process.

Since the GIL is released most of the time during blocking I/O operations,
this should let the worker threads do more during that time.

diff -r c11a4339fccb -r d2a87365b85b piecrust/workerpool.py
--- a/piecrust/workerpool.py	Mon Mar 21 22:29:18 2016 -0700
+++ b/piecrust/workerpool.py	Wed Mar 23 01:53:57 2016 -0700
@@ -3,6 +3,7 @@
 import sys
 import time
 import zlib
+import queue
 import pickle
 import logging
 import itertools
@@ -57,9 +58,11 @@
     wid = params.wid
     logger.debug("Worker %d initializing..." % wid)

+    # We don't need those.
     params.inqueue._writer.close()
     params.outqueue._reader.close()

+    # Initialize the underlying worker class.
     w = params.worker_class(*params.initargs)
     w.wid = wid
     try:
@@ -70,19 +73,30 @@
         params.outqueue.put(None)
         return

-    get = params.inqueue.get
-    put = params.outqueue.put
+    # Create threads to read/write the jobs and results from/to the
+    # main arbitrator process.
+    local_job_queue = queue.Queue()
+    reader_thread = threading.Thread(
+            target=_job_queue_reader,
+            args=(params.inqueue.get, local_job_queue),
+            name="JobQueueReaderThread")
+    reader_thread.start()
+    local_result_queue = queue.Queue()
+    writer_thread = threading.Thread(
+            target=_job_results_writer,
+            args=(local_result_queue, params.outqueue.put),
+            name="JobResultWriterThread")
+    writer_thread.start()
+
+    # Start pumping!
     completed = 0
     time_in_get = 0
     time_in_put = 0
     while True:
         get_start_time = time.perf_counter()
-        try:
-            task = get()
-        except (EOFError, OSError):
-            logger.debug("Worker %d encountered connection problem." % wid)
-            break
+        task = local_job_queue.get()
+        local_job_queue.task_done()
         time_in_get += (time.perf_counter() - get_start_time)

         task_type, task_data = task
@@ -99,7 +113,7 @@
                 e = multiprocessing.ExceptionWithTraceback(
                         e, e.__traceback__)
                 rep = (task_type, False, wid, (wid, e))
-                put(rep)
+                local_result_queue.put_nowait(rep)
                 break

         if task_type == TASK_JOB:
@@ -115,14 +129,48 @@
                 res = (TASK_JOB, False, wid, e)

             put_start_time = time.perf_counter()
-            put(res)
+            local_result_queue.put_nowait(res)
             time_in_put += (time.perf_counter() - put_start_time)
             completed += 1

+    logger.debug("Worker %d waiting for reader/writer threads." % wid)
+    local_result_queue.put_nowait(None)
+    reader_thread.join()
+    writer_thread.join()
+
     logger.debug("Worker %d completed %d tasks." % (wid, completed))


+def _job_queue_reader(getter, out_queue):
+    while True:
+        try:
+            task = getter()
+        except (EOFError, OSError):
+            logger.debug("Worker encountered a connection problem.")
+            break
+
+        out_queue.put_nowait(task)
+
+        if task[0] == TASK_END:
+            # Done reading jobs from the main process.
+            logger.debug("Got end task, exiting task queue reader thread.")
+            break
+
+
+def _job_results_writer(in_queue, putter):
+    while True:
+        res = in_queue.get()
+        if res is not None:
+            putter(res)
+            in_queue.task_done()
+        else:
+            # Got sentinel. Exit.
+            in_queue.task_done()
+            break
+    logger.debug("Exiting result queue writer thread.")
+
+
 class _WorkerParams(object):
     def __init__(self, wid, inqueue, outqueue, worker_class,
                  initargs=(), wrap_exception=False, is_profiling=False):
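
For reference, the pattern this changeset applies can be boiled down to the following minimal, self-contained sketch: a worker process hands its blocking inter-process reads and writes to two helper threads, so its main loop stays busy with actual work while the I/O waits happen in the background. This is an illustration only, not PieCrust code: the names (worker, _reader, _writer) are hypothetical, a plain None serves as the end-of-work sentinel in place of the patch's TASK_END task type, and multiprocessing.SimpleQueue stands in for the pool's real job and result queues.

import queue
import threading
import multiprocessing


def _reader(mp_get, local_jobs):
    # Blocks on the inter-process queue. The GIL is released while
    # waiting, so the worker's main loop keeps running in the meantime.
    while True:
        try:
            job = mp_get()
        except (EOFError, OSError):
            local_jobs.put_nowait(None)  # let the main loop exit too
            break
        local_jobs.put_nowait(job)
        if job is None:
            # Sentinel from the parent: stop reading.
            break


def _writer(local_results, mp_put):
    # Drains locally produced results into the inter-process queue.
    while True:
        res = local_results.get()
        local_results.task_done()
        if res is None:
            # Sentinel from the main loop: stop writing.
            break
        mp_put(res)


def worker(inqueue, outqueue):
    local_jobs = queue.Queue()
    local_results = queue.Queue()
    rt = threading.Thread(target=_reader, args=(inqueue.get, local_jobs))
    wt = threading.Thread(target=_writer, args=(local_results, outqueue.put))
    rt.start()
    wt.start()

    while True:
        job = local_jobs.get()
        local_jobs.task_done()
        if job is None:
            break
        local_results.put_nowait(job * job)  # stand-in for real work

    local_results.put_nowait(None)  # unblock and stop the writer thread
    rt.join()
    wt.join()


if __name__ == '__main__':
    inq = multiprocessing.SimpleQueue()
    outq = multiprocessing.SimpleQueue()
    p = multiprocessing.Process(target=worker, args=(inq, outq))
    p.start()
    for i in range(5):
        inq.put(i)
    inq.put(None)  # no more jobs
    print([outq.get() for _ in range(5)])  # -> [0, 1, 4, 9, 16]
    p.join()

Note that queue.Queue is unbounded by default, so put_nowait can never raise queue.Full here; that is what makes it safe for both the main loop and the reader thread to enqueue without blocking, while the only truly blocking calls (the pipe reads and writes underneath the multiprocessing queues) run in threads that release the GIL while they wait.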