comparison piecrust/workerpool.py @ 697:9e5393fcfab2
bake: Re-enable faster serialization between processes.
author | Ludovic Chabant <ludovic@chabant.com> |
date | Wed, 23 Mar 2016 12:19:35 -0700 |
parents | d2a87365b85b |
children | c62d83e17abf |
696:3bc9f857eb48 | 697:9e5393fcfab2 |
---|---|
2 import os | 2 import os |
3 import sys | 3 import sys |
4 import time | 4 import time |
5 import zlib | 5 import zlib |
6 import queue | 6 import queue |
7 import pickle | |
8 import logging | 7 import logging |
9 import itertools | 8 import itertools |
10 import threading | 9 import threading |
11 import multiprocessing | 10 import multiprocessing |
| 11 from piecrust import fastpickle |
12 | 12 |
13 | 13 |
14 logger = logging.getLogger(__name__) | 14 logger = logging.getLogger(__name__) |
| 15 |
| 16 use_fastqueue = True |
15 | 17 |
16 | 18 |
17 class IWorker(object): | 19 class IWorker(object): |
18 def initialize(self): | 20 def initialize(self): |
19 raise NotImplementedError() | 21 raise NotImplementedError() |
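
For context, `IWorker` is the interface a pool worker implements; only `initialize()` is visible in this hunk. A minimal hypothetical implementation might look like the sketch below (the `process()` method name is an assumption, not shown in this excerpt):

```python
# Hypothetical IWorker subclass, for illustration only. initialize() is the
# one method visible in the hunk above; process() is assumed here.
class ExampleWorker(IWorker):
    def initialize(self):
        # One-time, per-process setup (open caches, compile templates, ...).
        self.prefix = "baked"

    def process(self, job):
        # Turn one job payload into one result.
        return "%s:%s" % (self.prefix, job)
```
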
71 logger.error("Working failed to initialize:") | 73 logger.error("Working failed to initialize:") |
72 logger.exception(ex) | 74 logger.exception(ex) |
73 params.outqueue.put(None) | 75 params.outqueue.put(None) |
74 return | 76 return |
75 | 77 |
76 # Create threads to read/write the jobs and results from/to the | 78 use_threads = False |
77 # main arbitrator process. | 79 if use_threads: |
78 local_job_queue = queue.Queue() | 80 # Create threads to read/write the jobs and results from/to the |
79 reader_thread = threading.Thread( | 81 # main arbitrator process. |
80 target=_job_queue_reader, | 82 local_job_queue = queue.Queue() |
81 args=(params.inqueue.get, local_job_queue), | 83 reader_thread = threading.Thread( |
82 name="JobQueueReaderThread") | 84 target=_job_queue_reader, |
83 reader_thread.start() | 85 args=(params.inqueue.get, local_job_queue), |
84 | 86 name="JobQueueReaderThread") |
85 local_result_queue = queue.Queue() | 87 reader_thread.start() |
86 writer_thread = threading.Thread( | 88 |
87 target=_job_results_writer, | 89 local_result_queue = queue.Queue() |
88 args=(local_result_queue, params.outqueue.put), | 90 writer_thread = threading.Thread( |
89 name="JobResultWriterThread") | 91 target=_job_results_writer, |
90 writer_thread.start() | 92 args=(local_result_queue, params.outqueue.put), |
| 93 name="JobResultWriterThread") |
| 94 writer_thread.start() |
| 95 |
| 96 get = local_job_queue.get |
| 97 put = local_result_queue.put_nowait |
| 98 else: |
| 99 get = params.inqueue.get |
| 100 put = params.outqueue.put |
91 | 101 |
92 # Start pumping! | 102 # Start pumping! |
93 completed = 0 | 103 completed = 0 |
94 time_in_get = 0 | 104 time_in_get = 0 |
95 time_in_put = 0 | 105 time_in_put = 0 |
96 while True: | 106 while True: |
97 get_start_time = time.perf_counter() | 107 get_start_time = time.perf_counter() |
98 task = local_job_queue.get() | 108 task = get() |
99 local_job_queue.task_done() | |
100 time_in_get += (time.perf_counter() - get_start_time) | 109 time_in_get += (time.perf_counter() - get_start_time) |
101 | 110 |
102 task_type, task_data = task | 111 task_type, task_data = task |
103 if task_type == TASK_END: | 112 if task_type == TASK_END: |
104 logger.debug("Worker %d got end task, exiting." % wid) | 113 logger.debug("Worker %d got end task, exiting." % wid) |
111 logger.debug("Error getting report: %s" % e) | 120 logger.debug("Error getting report: %s" % e) |
112 if params.wrap_exception: | 121 if params.wrap_exception: |
113 e = multiprocessing.ExceptionWithTraceback( | 122 e = multiprocessing.ExceptionWithTraceback( |
114 e, e.__traceback__) | 123 e, e.__traceback__) |
115 rep = (task_type, False, wid, (wid, e)) | 124 rep = (task_type, False, wid, (wid, e)) |
116 local_result_queue.put_nowait(rep) | 125 put(rep) |
117 break | 126 break |
118 | 127 |
119 if task_type == TASK_JOB: | 128 if task_type == TASK_JOB: |
120 task_data = (task_data,) | 129 task_data = (task_data,) |
121 | 130 |
127 e = multiprocessing.ExceptionWithTraceback( | 136 e = multiprocessing.ExceptionWithTraceback( |
128 e, e.__traceback__) | 137 e, e.__traceback__) |
129 res = (TASK_JOB, False, wid, e) | 138 res = (TASK_JOB, False, wid, e) |
130 | 139 |
131 put_start_time = time.perf_counter() | 140 put_start_time = time.perf_counter() |
132 local_result_queue.put_nowait(res) | 141 put(res) |
133 time_in_put += (time.perf_counter() - put_start_time) | 142 time_in_put += (time.perf_counter() - put_start_time) |
134 | 143 |
135 completed += 1 | 144 completed += 1 |
136 | 145 |
137 logger.debug("Worker %d waiting for reader/writer threads." % wid) | 146 if use_threads: |
138 local_result_queue.put_nowait(None) | 147 logger.debug("Worker %d waiting for reader/writer threads." % wid) |
139 reader_thread.join() | 148 local_result_queue.put_nowait(None) |
140 writer_thread.join() | 149 reader_thread.join() |
| 150 writer_thread.join() |
141 | 151 |
142 logger.debug("Worker %d completed %d tasks." % (wid, completed)) | 152 logger.debug("Worker %d completed %d tasks." % (wid, completed)) |
143 | 153 |
144 | 154 |
145 def _job_queue_reader(getter, out_queue): | 155 def _job_queue_reader(getter, out_queue): |
146 while True: | 156 while True: |
147 try: | 157 try: |
148 task = getter() | 158 task = getter() |
149 except (EOFError, OSError): | 159 except (EOFError, OSError): |
150 logger.debug("Worker %d encountered connection problem." % wid) | 160 logger.debug("Worker encountered connection problem.") |
151 break | 161 break |
152 | 162 |
153 out_queue.put_nowait(task) | 163 out_queue.put_nowait(task) |
154 | 164 |
155 if task[0] == TASK_END: | 165 if task[0] == TASK_END: |
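
The worker loop above now binds `get` and `put` once, before the loop: either to the local thread-fed queues (when `use_threads` is on) or straight to the inter-process queues. A condensed sketch of that bind-once pattern, with plain `queue.Queue` stand-ins for the real channels:

```python
# Sketch of the bind-once pattern from the hunk above. Plain queue.Queue
# objects stand in for the real multiprocessing channels.
import queue

def make_endpoints(use_threads, inqueue, outqueue):
    if use_threads:
        # Thread-fed local queues decouple the loop from pipe latency;
        # reader/writer threads would pump between the two pairs here.
        local_jobs = queue.Queue()
        local_results = queue.Queue()
        return local_jobs.get, local_results.put_nowait
    # Direct mode (what this changeset enables): one fewer hop per job.
    return inqueue.get, outqueue.put

get, put = make_endpoints(False, queue.Queue(), queue.Queue())
```

Binding the callables once keeps the hot loop free of per-iteration branching, so the same loop body serves both modes.
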
187 def __init__(self, worker_class, initargs=(), | 197 def __init__(self, worker_class, initargs=(), |
188 worker_count=None, batch_size=None, | 198 worker_count=None, batch_size=None, |
189 wrap_exception=False): | 199 wrap_exception=False): |
190 worker_count = worker_count or os.cpu_count() or 1 | 200 worker_count = worker_count or os.cpu_count() or 1 |
191 | 201 |
192 use_fastqueue = False | |
193 if use_fastqueue: | 202 if use_fastqueue: |
194 self._task_queue = FastQueue() | 203 self._task_queue = FastQueue() |
195 self._result_queue = FastQueue() | 204 self._result_queue = FastQueue() |
196 self._quick_put = self._task_queue.put | 205 self._quick_put = self._task_queue.put |
197 self._quick_get = self._result_queue.get | 206 self._quick_get = self._result_queue.get |
386 class FastQueue(object): | 395 class FastQueue(object): |
387 def __init__(self): | 396 def __init__(self): |
388 self._reader, self._writer = multiprocessing.Pipe(duplex=False) | 397 self._reader, self._writer = multiprocessing.Pipe(duplex=False) |
389 self._rlock = multiprocessing.Lock() | 398 self._rlock = multiprocessing.Lock() |
390 self._wlock = multiprocessing.Lock() | 399 self._wlock = multiprocessing.Lock() |
| 400 self._initBuffers() |
| 401 |
| 402 def _initBuffers(self): |
391 self._rbuf = io.BytesIO() | 403 self._rbuf = io.BytesIO() |
392 self._rbuf.truncate(256) | 404 self._rbuf.truncate(256) |
393 self._wbuf = io.BytesIO() | 405 self._wbuf = io.BytesIO() |
394 self._wbuf.truncate(256) | 406 self._wbuf.truncate(256) |
395 | 407 |
396 def __getstate__(self): | 408 def __getstate__(self): |
397 return (self._reader, self._writer, self._rlock, self._wlock) | 409 return (self._reader, self._writer, self._rlock, self._wlock) |
398 | 410 |
399 def __setstate__(self, state): | 411 def __setstate__(self, state): |
400 (self._reader, self._writer, self._rlock, self._wlock) = state | 412 (self._reader, self._writer, self._rlock, self._wlock) = state |
| 413 self._initBuffers() |
401 | 414 |
402 def get(self): | 415 def get(self): |
403 with self._rlock: | 416 with self._rlock: |
404 try: | 417 try: |
405 with self._rbuf.getbuffer() as b: | 418 with self._rbuf.getbuffer() as b: |
406 self._reader.recv_bytes_into(b) | 419 bufsize = self._reader.recv_bytes_into(b) |
407 except multiprocessing.BufferTooShort as e: | 420 except multiprocessing.BufferTooShort as e: |
408 self._rbuf.truncate(len(e.args[0]) * 2) | 421 bufsize = len(e.args[0]) |
| 422 self._rbuf.truncate(bufsize * 2) |
409 self._rbuf.seek(0) | 423 self._rbuf.seek(0) |
410 self._rbuf.write(e.args[0]) | 424 self._rbuf.write(e.args[0]) |
411 | 425 |
412 self._rbuf.seek(0) | 426 self._rbuf.seek(0) |
413 return self._unpickle(self._rbuf) | 427 return self._unpickle(self._rbuf, bufsize) |
414 | 428 |
415 def put(self, obj): | 429 def put(self, obj): |
416 self._wbuf.seek(0) | 430 self._wbuf.seek(0) |
417 self._pickle(obj, self._wbuf) | 431 self._pickle(obj, self._wbuf) |
418 size = self._wbuf.tell() | 432 size = self._wbuf.tell() |
421 with self._wlock: | 435 with self._wlock: |
422 with self._wbuf.getbuffer() as b: | 436 with self._wbuf.getbuffer() as b: |
423 self._writer.send_bytes(b, 0, size) | 437 self._writer.send_bytes(b, 0, size) |
424 | 438 |
425 def _pickle(self, obj, buf): | 439 def _pickle(self, obj, buf): |
426 pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL) | 440 fastpickle.pickle_intob(obj, buf) |
427 | 441 |
428 def _unpickle(self, buf): | 442 def _unpickle(self, buf, bufsize): |
429 return pickle.load(buf) | 443 return fastpickle.unpickle_fromb(buf, bufsize) |
430 | 444 |
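
The heart of the change sits in `FastQueue`: a raw `multiprocessing.Pipe` wrapped in two reusable `BytesIO` buffers, with serialization delegated to `piecrust.fastpickle`. Note that `__getstate__` ships only the pipe ends and locks to child processes; the new `_initBuffers()` call in `__setstate__` gives each process its own scratch buffers. The sketch below reproduces the same buffer protocol with the standard `pickle` module standing in for fastpickle's `pickle_intob`/`unpickle_fromb`:

```python
# Self-contained sketch of FastQueue's buffer protocol, using the standard
# pickle module in place of piecrust.fastpickle. The 256-byte initial size
# and the 2x growth on BufferTooShort mirror the code above.
import io
import pickle
import multiprocessing

reader, writer = multiprocessing.Pipe(duplex=False)

# put(): serialize into a reusable write buffer, send only the used bytes.
wbuf = io.BytesIO()
wbuf.truncate(256)
wbuf.seek(0)
pickle.dump({"job": "pages/foo.md"}, wbuf, pickle.HIGHEST_PROTOCOL)
size = wbuf.tell()
with wbuf.getbuffer() as b:
    writer.send_bytes(b, 0, size)

# get(): receive into the reusable read buffer; when the message doesn't
# fit, BufferTooShort carries the full payload and the buffer is regrown.
rbuf = io.BytesIO()
rbuf.truncate(256)
try:
    with rbuf.getbuffer() as b:
        bufsize = reader.recv_bytes_into(b)
except multiprocessing.BufferTooShort as e:
    bufsize = len(e.args[0])
    rbuf.truncate(bufsize * 2)
    rbuf.seek(0)
    rbuf.write(e.args[0])

rbuf.seek(0)
# pickle.load() stops on its own at the pickle STOP opcode, so trailing
# buffer bytes are harmless here.
print(pickle.load(rbuf))  # -> {'job': 'pages/foo.md'}
```

Standard `pickle.load()` finds the end of the payload by itself, while `fastpickle.unpickle_fromb()` takes the buffer plus an explicit length, which is presumably why the changeset threads the new `bufsize` value through `_unpickle()`.
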