Mercurial > piecrust2
comparison piecrust/workerpool.py @ 693:d2a87365b85b
bake: Use threads to read/write from/to the main arbitrator process.
Since the GIL is released most of the time during blocking I/O operations,
this should let the worker threads do more during that time.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 23 Mar 2016 01:53:57 -0700 |
parents | 9ae9390192da |
children | 9e5393fcfab2 |
comparison
equal
deleted
inserted
replaced
692:c11a4339fccb | 693:d2a87365b85b |
---|---|
1 import io | 1 import io |
2 import os | 2 import os |
3 import sys | 3 import sys |
4 import time | 4 import time |
5 import zlib | 5 import zlib |
6 import queue | |
6 import pickle | 7 import pickle |
7 import logging | 8 import logging |
8 import itertools | 9 import itertools |
9 import threading | 10 import threading |
10 import multiprocessing | 11 import multiprocessing |
55 _pre_parse_chef_args(sys.argv[1:]) | 56 _pre_parse_chef_args(sys.argv[1:]) |
56 | 57 |
57 wid = params.wid | 58 wid = params.wid |
58 logger.debug("Worker %d initializing..." % wid) | 59 logger.debug("Worker %d initializing..." % wid) |
59 | 60 |
61 # We don't need those. | |
60 params.inqueue._writer.close() | 62 params.inqueue._writer.close() |
61 params.outqueue._reader.close() | 63 params.outqueue._reader.close() |
62 | 64 |
65 # Initialize the underlying worker class. | |
63 w = params.worker_class(*params.initargs) | 66 w = params.worker_class(*params.initargs) |
64 w.wid = wid | 67 w.wid = wid |
65 try: | 68 try: |
66 w.initialize() | 69 w.initialize() |
67 except Exception as ex: | 70 except Exception as ex: |
68 logger.error("Working failed to initialize:") | 71 logger.error("Working failed to initialize:") |
69 logger.exception(ex) | 72 logger.exception(ex) |
70 params.outqueue.put(None) | 73 params.outqueue.put(None) |
71 return | 74 return |
72 | 75 |
73 get = params.inqueue.get | 76 # Create threads to read/write the jobs and results from/to the |
74 put = params.outqueue.put | 77 # main arbitrator process. |
75 | 78 local_job_queue = queue.Queue() |
79 reader_thread = threading.Thread( | |
80 target=_job_queue_reader, | |
81 args=(params.inqueue.get, local_job_queue), | |
82 name="JobQueueReaderThread") | |
83 reader_thread.start() | |
84 | |
85 local_result_queue = queue.Queue() | |
86 writer_thread = threading.Thread( | |
87 target=_job_results_writer, | |
88 args=(local_result_queue, params.outqueue.put), | |
89 name="JobResultWriterThread") | |
90 writer_thread.start() | |
91 | |
92 # Start pumping! | |
76 completed = 0 | 93 completed = 0 |
77 time_in_get = 0 | 94 time_in_get = 0 |
78 time_in_put = 0 | 95 time_in_put = 0 |
79 while True: | 96 while True: |
80 get_start_time = time.perf_counter() | 97 get_start_time = time.perf_counter() |
81 try: | 98 task = local_job_queue.get() |
82 task = get() | 99 local_job_queue.task_done() |
83 except (EOFError, OSError): | |
84 logger.debug("Worker %d encountered connection problem." % wid) | |
85 break | |
86 time_in_get += (time.perf_counter() - get_start_time) | 100 time_in_get += (time.perf_counter() - get_start_time) |
87 | 101 |
88 task_type, task_data = task | 102 task_type, task_data = task |
89 if task_type == TASK_END: | 103 if task_type == TASK_END: |
90 logger.debug("Worker %d got end task, exiting." % wid) | 104 logger.debug("Worker %d got end task, exiting." % wid) |
97 logger.debug("Error getting report: %s" % e) | 111 logger.debug("Error getting report: %s" % e) |
98 if params.wrap_exception: | 112 if params.wrap_exception: |
99 e = multiprocessing.ExceptionWithTraceback( | 113 e = multiprocessing.ExceptionWithTraceback( |
100 e, e.__traceback__) | 114 e, e.__traceback__) |
101 rep = (task_type, False, wid, (wid, e)) | 115 rep = (task_type, False, wid, (wid, e)) |
102 put(rep) | 116 local_result_queue.put_nowait(rep) |
103 break | 117 break |
104 | 118 |
105 if task_type == TASK_JOB: | 119 if task_type == TASK_JOB: |
106 task_data = (task_data,) | 120 task_data = (task_data,) |
107 | 121 |
113 e = multiprocessing.ExceptionWithTraceback( | 127 e = multiprocessing.ExceptionWithTraceback( |
114 e, e.__traceback__) | 128 e, e.__traceback__) |
115 res = (TASK_JOB, False, wid, e) | 129 res = (TASK_JOB, False, wid, e) |
116 | 130 |
117 put_start_time = time.perf_counter() | 131 put_start_time = time.perf_counter() |
118 put(res) | 132 local_result_queue.put_nowait(res) |
119 time_in_put += (time.perf_counter() - put_start_time) | 133 time_in_put += (time.perf_counter() - put_start_time) |
120 | 134 |
121 completed += 1 | 135 completed += 1 |
122 | 136 |
137 logger.debug("Worker %d waiting for reader/writer threads." % wid) | |
138 local_result_queue.put_nowait(None) | |
139 reader_thread.join() | |
140 writer_thread.join() | |
141 | |
123 logger.debug("Worker %d completed %d tasks." % (wid, completed)) | 142 logger.debug("Worker %d completed %d tasks." % (wid, completed)) |
143 | |
144 | |
145 def _job_queue_reader(getter, out_queue): | |
146 while True: | |
147 try: | |
148 task = getter() | |
149 except (EOFError, OSError): | |
150 logger.debug("Worker %d encountered connection problem." % wid) | |
151 break | |
152 | |
153 out_queue.put_nowait(task) | |
154 | |
155 if task[0] == TASK_END: | |
156 # Done reading jobs from the main process. | |
157 logger.debug("Got end task, exiting task queue reader thread.") | |
158 break | |
159 | |
160 | |
161 def _job_results_writer(in_queue, putter): | |
162 while True: | |
163 res = in_queue.get() | |
164 if res is not None: | |
165 putter(res) | |
166 in_queue.task_done() | |
167 else: | |
168 # Got sentinel. Exit. | |
169 in_queue.task_done() | |
170 break | |
171 logger.debug("Exiting result queue writer thread.") | |
124 | 172 |
125 | 173 |
126 class _WorkerParams(object): | 174 class _WorkerParams(object): |
127 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(), | 175 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(), |
128 wrap_exception=False, is_profiling=False): | 176 wrap_exception=False, is_profiling=False): |