changeset 693:d2a87365b85b

bake: Use threads to read/write from/to the main arbitrator process. Since the GIL is released most of the time during blocking I/O operations, this should let the worker's main thread keep processing jobs while the new reader/writer threads wait on the pipes (a self-contained sketch of the pattern follows the diffstat below).
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 23 Mar 2016 01:53:57 -0700
parents c11a4339fccb
children b917ae071994
files piecrust/workerpool.py
diffstat 1 files changed, 57 insertions(+), 9 deletions(-)
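
As a minimal, self-contained sketch of the pattern this changeset adopts (illustrative only, not piecrust's actual code: the worker function, the doubled-value stand-in work, and the None sentinels are made up here), dedicated reader/writer threads block on the inter-process pipes while the main thread computes; since blocking pipe I/O releases the GIL, the I/O overlaps with the real work:

import queue
import threading
import multiprocessing

def worker(inpipe, outpipe):
    # Local, thread-safe buffers between the I/O threads and the main loop.
    local_jobs = queue.Queue()
    local_results = queue.Queue()

    def read_jobs():
        # Blocks on the inter-process pipe; the GIL is released while waiting.
        while True:
            job = inpipe.get()
            local_jobs.put_nowait(job)
            if job is None:  # end-of-jobs sentinel
                break

    def write_results():
        while True:
            res = local_results.get()
            if res is None:  # shutdown sentinel from the main loop
                break
            outpipe.put(res)

    threading.Thread(target=read_jobs, name="Reader", daemon=True).start()
    writer = threading.Thread(target=write_results, name="Writer")
    writer.start()

    while True:
        job = local_jobs.get()
        if job is None:
            break
        local_results.put_nowait(job * 2)  # stand-in for real work

    local_results.put_nowait(None)  # let the writer drain and exit
    writer.join()

if __name__ == '__main__':
    inq = multiprocessing.SimpleQueue()
    outq = multiprocessing.SimpleQueue()
    proc = multiprocessing.Process(target=worker, args=(inq, outq))
    proc.start()
    for i in range(3):
        inq.put(i)
    inq.put(None)
    print([outq.get() for _ in range(3)])  # [0, 2, 4]
    proc.join()

The diff below implements the same handshake, with TASK_END playing the role of the job sentinel and a None result acting as the writer thread's shutdown signal.
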
--- a/piecrust/workerpool.py	Mon Mar 21 22:29:18 2016 -0700
+++ b/piecrust/workerpool.py	Wed Mar 23 01:53:57 2016 -0700
@@ -3,6 +3,7 @@
 import sys
 import time
 import zlib
+import queue
 import pickle
 import logging
 import itertools
@@ -57,9 +58,11 @@
     wid = params.wid
     logger.debug("Worker %d initializing..." % wid)
 
+    # Close the pipe ends the worker process doesn't use.
     params.inqueue._writer.close()
     params.outqueue._reader.close()
 
+    # Initialize the underlying worker class.
     w = params.worker_class(*params.initargs)
     w.wid = wid
     try:
@@ -70,19 +73,30 @@
         params.outqueue.put(None)
         return
 
-    get = params.inqueue.get
-    put = params.outqueue.put
+    # Create threads to read/write the jobs and results from/to the
+    # main arbitrator process.
+    local_job_queue = queue.Queue()
+    reader_thread = threading.Thread(
+            target=_job_queue_reader,
+            args=(params.inqueue.get, local_job_queue),
+            name="JobQueueReaderThread")
+    reader_thread.start()
 
+    local_result_queue = queue.Queue()
+    writer_thread = threading.Thread(
+            target=_job_results_writer,
+            args=(local_result_queue, params.outqueue.put),
+            name="JobResultWriterThread")
+    writer_thread.start()
+
+    # Start pumping!
     completed = 0
     time_in_get = 0
     time_in_put = 0
     while True:
         get_start_time = time.perf_counter()
-        try:
-            task = get()
-        except (EOFError, OSError):
-            logger.debug("Worker %d encountered connection problem." % wid)
-            break
+        task = local_job_queue.get()
+        local_job_queue.task_done()
         time_in_get += (time.perf_counter() - get_start_time)
 
         task_type, task_data = task
@@ -99,7 +113,7 @@
                     e = multiprocessing.ExceptionWithTraceback(
                             e, e.__traceback__)
                 rep = (task_type, False, wid, (wid, e))
-            put(rep)
+            local_result_queue.put_nowait(rep)
             break
 
         if task_type == TASK_JOB:
@@ -115,14 +129,48 @@
                 res = (TASK_JOB, False, wid, e)
 
             put_start_time = time.perf_counter()
-            put(res)
+            local_result_queue.put_nowait(res)
             time_in_put += (time.perf_counter() - put_start_time)
 
             completed += 1
 
+    logger.debug("Worker %d waiting for reader/writer threads." % wid)
+    local_result_queue.put_nowait(None)
+    reader_thread.join()
+    writer_thread.join()
+
     logger.debug("Worker %d completed %d tasks." % (wid, completed))
 
 
+def _job_queue_reader(getter, out_queue):
+    while True:
+        try:
+            task = getter()
+        except (EOFError, OSError):
+            logger.debug("Job queue reader encountered a connection problem.")
+            break
+
+        out_queue.put_nowait(task)
+
+        if task[0] == TASK_END:
+            # Done reading jobs from the main process.
+            logger.debug("Got end task, exiting task queue reader thread.")
+            break
+
+
+def _job_results_writer(in_queue, putter):
+    while True:
+        res = in_queue.get()
+        if res is not None:
+            putter(res)
+            in_queue.task_done()
+        else:
+            # Got sentinel. Exit.
+            in_queue.task_done()
+            break
+    logger.debug("Exiting result queue writer thread.")
+
+
 class _WorkerParams(object):
     def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(),
                  wrap_exception=False, is_profiling=False):
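
For context, a hedged sketch of how the arbitrator side might wire up a worker with these parameters; the entry-point name worker_func and the use of multiprocessing.SimpleQueue (whose private _reader/_writer pipe ends the worker closes above) are assumptions here, since the pool side is outside this diff's context:

import multiprocessing

from piecrust.workerpool import _WorkerParams, worker_func  # worker_func is assumed

class NoopWorker(object):
    # Stand-in worker class; the real worker protocol isn't shown in this diff.
    pass

inqueue = multiprocessing.SimpleQueue()
outqueue = multiprocessing.SimpleQueue()
params = _WorkerParams(0, inqueue, outqueue, NoopWorker)
proc = multiprocessing.Process(target=worker_func, args=(params,))
proc.start()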