Mercurial > piecrust2
comparison piecrust/workerpool.py @ 693:d2a87365b85b
bake: Use threads to read/write from/to the main arbitrator process.
Since the GIL is released most of the time during blocking I/O operations,
this should let the worker threads do more during that time.
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Wed, 23 Mar 2016 01:53:57 -0700 |
| parents | 9ae9390192da |
| children | 9e5393fcfab2 |
comparison
equal
deleted
inserted
replaced
| 692:c11a4339fccb | 693:d2a87365b85b |
|---|---|
| 1 import io | 1 import io |
| 2 import os | 2 import os |
| 3 import sys | 3 import sys |
| 4 import time | 4 import time |
| 5 import zlib | 5 import zlib |
| 6 import queue | |
| 6 import pickle | 7 import pickle |
| 7 import logging | 8 import logging |
| 8 import itertools | 9 import itertools |
| 9 import threading | 10 import threading |
| 10 import multiprocessing | 11 import multiprocessing |
| 55 _pre_parse_chef_args(sys.argv[1:]) | 56 _pre_parse_chef_args(sys.argv[1:]) |
| 56 | 57 |
| 57 wid = params.wid | 58 wid = params.wid |
| 58 logger.debug("Worker %d initializing..." % wid) | 59 logger.debug("Worker %d initializing..." % wid) |
| 59 | 60 |
| 61 # We don't need those. | |
| 60 params.inqueue._writer.close() | 62 params.inqueue._writer.close() |
| 61 params.outqueue._reader.close() | 63 params.outqueue._reader.close() |
| 62 | 64 |
| 65 # Initialize the underlying worker class. | |
| 63 w = params.worker_class(*params.initargs) | 66 w = params.worker_class(*params.initargs) |
| 64 w.wid = wid | 67 w.wid = wid |
| 65 try: | 68 try: |
| 66 w.initialize() | 69 w.initialize() |
| 67 except Exception as ex: | 70 except Exception as ex: |
| 68 logger.error("Working failed to initialize:") | 71 logger.error("Working failed to initialize:") |
| 69 logger.exception(ex) | 72 logger.exception(ex) |
| 70 params.outqueue.put(None) | 73 params.outqueue.put(None) |
| 71 return | 74 return |
| 72 | 75 |
| 73 get = params.inqueue.get | 76 # Create threads to read/write the jobs and results from/to the |
| 74 put = params.outqueue.put | 77 # main arbitrator process. |
| 75 | 78 local_job_queue = queue.Queue() |
| 79 reader_thread = threading.Thread( | |
| 80 target=_job_queue_reader, | |
| 81 args=(params.inqueue.get, local_job_queue), | |
| 82 name="JobQueueReaderThread") | |
| 83 reader_thread.start() | |
| 84 | |
| 85 local_result_queue = queue.Queue() | |
| 86 writer_thread = threading.Thread( | |
| 87 target=_job_results_writer, | |
| 88 args=(local_result_queue, params.outqueue.put), | |
| 89 name="JobResultWriterThread") | |
| 90 writer_thread.start() | |
| 91 | |
| 92 # Start pumping! | |
| 76 completed = 0 | 93 completed = 0 |
| 77 time_in_get = 0 | 94 time_in_get = 0 |
| 78 time_in_put = 0 | 95 time_in_put = 0 |
| 79 while True: | 96 while True: |
| 80 get_start_time = time.perf_counter() | 97 get_start_time = time.perf_counter() |
| 81 try: | 98 task = local_job_queue.get() |
| 82 task = get() | 99 local_job_queue.task_done() |
| 83 except (EOFError, OSError): | |
| 84 logger.debug("Worker %d encountered connection problem." % wid) | |
| 85 break | |
| 86 time_in_get += (time.perf_counter() - get_start_time) | 100 time_in_get += (time.perf_counter() - get_start_time) |
| 87 | 101 |
| 88 task_type, task_data = task | 102 task_type, task_data = task |
| 89 if task_type == TASK_END: | 103 if task_type == TASK_END: |
| 90 logger.debug("Worker %d got end task, exiting." % wid) | 104 logger.debug("Worker %d got end task, exiting." % wid) |
| 97 logger.debug("Error getting report: %s" % e) | 111 logger.debug("Error getting report: %s" % e) |
| 98 if params.wrap_exception: | 112 if params.wrap_exception: |
| 99 e = multiprocessing.ExceptionWithTraceback( | 113 e = multiprocessing.ExceptionWithTraceback( |
| 100 e, e.__traceback__) | 114 e, e.__traceback__) |
| 101 rep = (task_type, False, wid, (wid, e)) | 115 rep = (task_type, False, wid, (wid, e)) |
| 102 put(rep) | 116 local_result_queue.put_nowait(rep) |
| 103 break | 117 break |
| 104 | 118 |
| 105 if task_type == TASK_JOB: | 119 if task_type == TASK_JOB: |
| 106 task_data = (task_data,) | 120 task_data = (task_data,) |
| 107 | 121 |
| 113 e = multiprocessing.ExceptionWithTraceback( | 127 e = multiprocessing.ExceptionWithTraceback( |
| 114 e, e.__traceback__) | 128 e, e.__traceback__) |
| 115 res = (TASK_JOB, False, wid, e) | 129 res = (TASK_JOB, False, wid, e) |
| 116 | 130 |
| 117 put_start_time = time.perf_counter() | 131 put_start_time = time.perf_counter() |
| 118 put(res) | 132 local_result_queue.put_nowait(res) |
| 119 time_in_put += (time.perf_counter() - put_start_time) | 133 time_in_put += (time.perf_counter() - put_start_time) |
| 120 | 134 |
| 121 completed += 1 | 135 completed += 1 |
| 122 | 136 |
| 137 logger.debug("Worker %d waiting for reader/writer threads." % wid) | |
| 138 local_result_queue.put_nowait(None) | |
| 139 reader_thread.join() | |
| 140 writer_thread.join() | |
| 141 | |
| 123 logger.debug("Worker %d completed %d tasks." % (wid, completed)) | 142 logger.debug("Worker %d completed %d tasks." % (wid, completed)) |
| 143 | |
| 144 | |
def _job_queue_reader(getter, out_queue):
    """Pump tasks from the main arbitrator process into a local queue.

    Runs on a dedicated thread inside each worker process. Blocking on
    `getter` (the multiprocessing in-queue's `get`) releases the GIL, so
    the worker thread can keep crunching jobs while this thread waits.

    Args:
        getter: zero-argument callable that blocks until the next task
            tuple is available (e.g. `params.inqueue.get`).
        out_queue: a `queue.Queue` the worker's main loop consumes from.

    Exits when the connection to the main process breaks, or after
    forwarding a `TASK_END` task.
    """
    while True:
        try:
            task = getter()
        except (EOFError, OSError):
            # BUG FIX: the original logged `"Worker %d ..." % wid`, but
            # `wid` is not in scope in this function (it was a local of
            # the worker loop this code was extracted from), so hitting a
            # connection problem raised NameError instead of exiting
            # cleanly. Log without the worker id instead.
            logger.debug("Encountered connection problem, exiting "
                         "task queue reader thread.")
            break

        # Forward the task first so the main loop also sees TASK_END and
        # can shut down in lockstep with this thread.
        out_queue.put_nowait(task)

        if task[0] == TASK_END:
            # Done reading jobs from the main process.
            logger.debug("Got end task, exiting task queue reader thread.")
            break
| 160 | |
def _job_results_writer(in_queue, putter):
    """Forward job results from a local queue to the main process.

    Runs on a dedicated thread inside each worker process: drains
    `in_queue`, pushing every result through `putter` (the
    multiprocessing out-queue's `put`, whose blocking I/O releases the
    GIL) until a `None` sentinel arrives, then exits.

    Args:
        in_queue: a `queue.Queue` of result tuples, terminated by `None`.
        putter: one-argument callable sending a result to the arbitrator.
    """
    while True:
        result = in_queue.get()
        if result is None:
            # Sentinel received: acknowledge it and stop pumping.
            in_queue.task_done()
            break
        putter(result)
        in_queue.task_done()
    logger.debug("Exiting result queue writer thread.")
| 124 | 172 |
| 125 | 173 |
| 126 class _WorkerParams(object): | 174 class _WorkerParams(object): |
| 127 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(), | 175 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(), |
| 128 wrap_exception=False, is_profiling=False): | 176 wrap_exception=False, is_profiling=False): |
