comparison piecrust/workerpool.py @ 693:d2a87365b85b

bake: Use threads to read/write from/to the main arbitrator process. Since the GIL is released most of the time during blocking I/O operations, this should let the worker threads do more during that time.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 23 Mar 2016 01:53:57 -0700
parents 9ae9390192da
children 9e5393fcfab2
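
Before the hunks below, a note on the pattern the change introduces: the direct blocking get()/put() calls on the inter-process queues are replaced by a reader thread and a writer thread that shuttle items through local queue.Queue objects, so the main worker thread is not stalled on pipe I/O. The following is a minimal, self-contained sketch of that pattern under illustrative names (worker, _reader, and doubling as a stand-in for real job processing); it is not PieCrust's actual code.

# Minimal sketch (illustrative names only): a reader thread blocks on the
# inter-process queue and feeds a local queue.Queue while the main worker
# thread keeps processing jobs.
import queue
import threading
import multiprocessing


def _reader(ipc_get, local_jobs):
    # Pull jobs from the arbitrator process; None is the end-of-work sentinel.
    while True:
        job = ipc_get()
        local_jobs.put_nowait(job)
        if job is None:
            break


def worker(jobs, results):
    local_jobs = queue.Queue()
    t = threading.Thread(target=_reader, args=(jobs.get, local_jobs),
                         name="JobQueueReaderThread")
    t.start()
    while True:
        job = local_jobs.get()
        if job is None:
            break
        results.put(job * 2)  # stand-in for the real job processing
    t.join()


if __name__ == '__main__':
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(jobs, results))
    p.start()
    for i in range(3):
        jobs.put(i)
    jobs.put(None)
    print([results.get() for _ in range(3)])  # [0, 2, 4]
    p.join()

Since CPython releases the GIL while the reader thread blocks in the pipe read, the main thread keeps executing job-processing code during that wait, which is the effect the commit message describes.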
--- a/piecrust/workerpool.py (692:c11a4339fccb)
+++ b/piecrust/workerpool.py (693:d2a87365b85b)
@@ -1,10 +1,11 @@
 import io
 import os
 import sys
 import time
 import zlib
+import queue
 import pickle
 import logging
 import itertools
 import threading
 import multiprocessing
@@ -55,36 +56,49 @@
         _pre_parse_chef_args(sys.argv[1:])

     wid = params.wid
     logger.debug("Worker %d initializing..." % wid)

+    # We don't need those.
     params.inqueue._writer.close()
     params.outqueue._reader.close()

+    # Initialize the underlying worker class.
     w = params.worker_class(*params.initargs)
     w.wid = wid
     try:
         w.initialize()
     except Exception as ex:
         logger.error("Worker failed to initialize:")
         logger.exception(ex)
         params.outqueue.put(None)
         return

-    get = params.inqueue.get
-    put = params.outqueue.put
-
+    # Create threads to read/write the jobs and results from/to the
+    # main arbitrator process.
+    local_job_queue = queue.Queue()
+    reader_thread = threading.Thread(
+            target=_job_queue_reader,
+            args=(params.inqueue.get, local_job_queue),
+            name="JobQueueReaderThread")
+    reader_thread.start()
+
+    local_result_queue = queue.Queue()
+    writer_thread = threading.Thread(
+            target=_job_results_writer,
+            args=(local_result_queue, params.outqueue.put),
+            name="JobResultWriterThread")
+    writer_thread.start()
+
+    # Start pumping!
     completed = 0
     time_in_get = 0
     time_in_put = 0
     while True:
         get_start_time = time.perf_counter()
-        try:
-            task = get()
-        except (EOFError, OSError):
-            logger.debug("Worker %d encountered connection problem." % wid)
-            break
+        task = local_job_queue.get()
+        local_job_queue.task_done()
         time_in_get += (time.perf_counter() - get_start_time)

         task_type, task_data = task
         if task_type == TASK_END:
             logger.debug("Worker %d got end task, exiting." % wid)
@@ -97,11 +111,11 @@
                 logger.debug("Error getting report: %s" % e)
                 if params.wrap_exception:
                     e = multiprocessing.ExceptionWithTraceback(
                             e, e.__traceback__)
                 rep = (task_type, False, wid, (wid, e))
-            put(rep)
+            local_result_queue.put_nowait(rep)
             break

         if task_type == TASK_JOB:
             task_data = (task_data,)

@@ -113,16 +127,50 @@
                     e = multiprocessing.ExceptionWithTraceback(
                             e, e.__traceback__)
                 res = (TASK_JOB, False, wid, e)

             put_start_time = time.perf_counter()
-            put(res)
+            local_result_queue.put_nowait(res)
             time_in_put += (time.perf_counter() - put_start_time)

             completed += 1

+    logger.debug("Worker %d waiting for reader/writer threads." % wid)
+    local_result_queue.put_nowait(None)
+    reader_thread.join()
+    writer_thread.join()
+
     logger.debug("Worker %d completed %d tasks." % (wid, completed))
+
+
+def _job_queue_reader(getter, out_queue):
+    while True:
+        try:
+            task = getter()
+        except (EOFError, OSError):
+            logger.debug("Worker encountered connection problem.")
+            break
+
+        out_queue.put_nowait(task)
+
+        if task[0] == TASK_END:
+            # Done reading jobs from the main process.
+            logger.debug("Got end task, exiting task queue reader thread.")
+            break
+
+
+def _job_results_writer(in_queue, putter):
+    while True:
+        res = in_queue.get()
+        if res is not None:
+            putter(res)
+            in_queue.task_done()
+        else:
+            # Got sentinel. Exit.
+            in_queue.task_done()
+            break
+    logger.debug("Exiting result queue writer thread.")


 class _WorkerParams(object):
     def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(),
                  wrap_exception=False, is_profiling=False):
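
One design note on the shutdown sequence added at the end of the worker loop: the None sentinel is pushed onto the local result queue only after the last result, so FIFO ordering guarantees that _job_results_writer forwards every result to the arbitrator before it sees the sentinel and exits, and only then do the join() calls return. A rough sketch of that sentinel pattern, with illustrative names rather than the project's exact code:

# Rough sketch: results queued before the None sentinel are forwarded
# before the writer thread exits.
import queue
import threading


def results_writer(local_results, send):
    while True:
        res = local_results.get()
        local_results.task_done()
        if res is None:  # sentinel queued by the worker after its last result
            break
        send(res)


local_results = queue.Queue()
sent = []
writer = threading.Thread(target=results_writer,
                          args=(local_results, sent.append),
                          name="JobResultWriterThread")
writer.start()

for i in range(3):
    local_results.put_nowait(i)
local_results.put_nowait(None)  # enqueued last, so it is dequeued last
writer.join()
assert sent == [0, 1, 2]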