comparison piecrust/workerpool.py @ 697:9e5393fcfab2

bake: Re-enable faster serialization between processes.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 23 Mar 2016 12:19:35 -0700
parents d2a87365b85b
children c62d83e17abf
comparison
Legend: equal | deleted | inserted | replaced
696:3bc9f857eb48 697:9e5393fcfab2
2 import os 2 import os
3 import sys 3 import sys
4 import time 4 import time
5 import zlib 5 import zlib
6 import queue 6 import queue
7 import pickle
8 import logging 7 import logging
9 import itertools 8 import itertools
10 import threading 9 import threading
11 import multiprocessing 10 import multiprocessing
11 from piecrust import fastpickle
12 12
13 13
14 logger = logging.getLogger(__name__) 14 logger = logging.getLogger(__name__)
15
16 use_fastqueue = True
15 17
16 18
17 class IWorker(object): 19 class IWorker(object):
18 def initialize(self): 20 def initialize(self):
19 raise NotImplementedError() 21 raise NotImplementedError()
71 logger.error("Working failed to initialize:") 73 logger.error("Working failed to initialize:")
72 logger.exception(ex) 74 logger.exception(ex)
73 params.outqueue.put(None) 75 params.outqueue.put(None)
74 return 76 return
75 77
76 # Create threads to read/write the jobs and results from/to the 78 use_threads = False
77 # main arbitrator process. 79 if use_threads:
78 local_job_queue = queue.Queue() 80 # Create threads to read/write the jobs and results from/to the
79 reader_thread = threading.Thread( 81 # main arbitrator process.
80 target=_job_queue_reader, 82 local_job_queue = queue.Queue()
81 args=(params.inqueue.get, local_job_queue), 83 reader_thread = threading.Thread(
82 name="JobQueueReaderThread") 84 target=_job_queue_reader,
83 reader_thread.start() 85 args=(params.inqueue.get, local_job_queue),
84 86 name="JobQueueReaderThread")
85 local_result_queue = queue.Queue() 87 reader_thread.start()
86 writer_thread = threading.Thread( 88
87 target=_job_results_writer, 89 local_result_queue = queue.Queue()
88 args=(local_result_queue, params.outqueue.put), 90 writer_thread = threading.Thread(
89 name="JobResultWriterThread") 91 target=_job_results_writer,
90 writer_thread.start() 92 args=(local_result_queue, params.outqueue.put),
93 name="JobResultWriterThread")
94 writer_thread.start()
95
96 get = local_job_queue.get
97 put = local_result_queue.put_nowait
98 else:
99 get = params.inqueue.get
100 put = params.outqueue.put
91 101
92 # Start pumping! 102 # Start pumping!
93 completed = 0 103 completed = 0
94 time_in_get = 0 104 time_in_get = 0
95 time_in_put = 0 105 time_in_put = 0
96 while True: 106 while True:
97 get_start_time = time.perf_counter() 107 get_start_time = time.perf_counter()
98 task = local_job_queue.get() 108 task = get()
99 local_job_queue.task_done()
100 time_in_get += (time.perf_counter() - get_start_time) 109 time_in_get += (time.perf_counter() - get_start_time)
101 110
102 task_type, task_data = task 111 task_type, task_data = task
103 if task_type == TASK_END: 112 if task_type == TASK_END:
104 logger.debug("Worker %d got end task, exiting." % wid) 113 logger.debug("Worker %d got end task, exiting." % wid)
111 logger.debug("Error getting report: %s" % e) 120 logger.debug("Error getting report: %s" % e)
112 if params.wrap_exception: 121 if params.wrap_exception:
113 e = multiprocessing.ExceptionWithTraceback( 122 e = multiprocessing.ExceptionWithTraceback(
114 e, e.__traceback__) 123 e, e.__traceback__)
115 rep = (task_type, False, wid, (wid, e)) 124 rep = (task_type, False, wid, (wid, e))
116 local_result_queue.put_nowait(rep) 125 put(rep)
117 break 126 break
118 127
119 if task_type == TASK_JOB: 128 if task_type == TASK_JOB:
120 task_data = (task_data,) 129 task_data = (task_data,)
121 130
127 e = multiprocessing.ExceptionWithTraceback( 136 e = multiprocessing.ExceptionWithTraceback(
128 e, e.__traceback__) 137 e, e.__traceback__)
129 res = (TASK_JOB, False, wid, e) 138 res = (TASK_JOB, False, wid, e)
130 139
131 put_start_time = time.perf_counter() 140 put_start_time = time.perf_counter()
132 local_result_queue.put_nowait(res) 141 put(res)
133 time_in_put += (time.perf_counter() - put_start_time) 142 time_in_put += (time.perf_counter() - put_start_time)
134 143
135 completed += 1 144 completed += 1
136 145
137 logger.debug("Worker %d waiting for reader/writer threads." % wid) 146 if use_threads:
138 local_result_queue.put_nowait(None) 147 logger.debug("Worker %d waiting for reader/writer threads." % wid)
139 reader_thread.join() 148 local_result_queue.put_nowait(None)
140 writer_thread.join() 149 reader_thread.join()
150 writer_thread.join()
141 151
142 logger.debug("Worker %d completed %d tasks." % (wid, completed)) 152 logger.debug("Worker %d completed %d tasks." % (wid, completed))
143 153
144 154
145 def _job_queue_reader(getter, out_queue): 155 def _job_queue_reader(getter, out_queue):
146 while True: 156 while True:
147 try: 157 try:
148 task = getter() 158 task = getter()
149 except (EOFError, OSError): 159 except (EOFError, OSError):
150 logger.debug("Worker %d encountered connection problem." % wid) 160 logger.debug("Worker encountered connection problem.")
151 break 161 break
152 162
153 out_queue.put_nowait(task) 163 out_queue.put_nowait(task)
154 164
155 if task[0] == TASK_END: 165 if task[0] == TASK_END:
187 def __init__(self, worker_class, initargs=(), 197 def __init__(self, worker_class, initargs=(),
188 worker_count=None, batch_size=None, 198 worker_count=None, batch_size=None,
189 wrap_exception=False): 199 wrap_exception=False):
190 worker_count = worker_count or os.cpu_count() or 1 200 worker_count = worker_count or os.cpu_count() or 1
191 201
192 use_fastqueue = False
193 if use_fastqueue: 202 if use_fastqueue:
194 self._task_queue = FastQueue() 203 self._task_queue = FastQueue()
195 self._result_queue = FastQueue() 204 self._result_queue = FastQueue()
196 self._quick_put = self._task_queue.put 205 self._quick_put = self._task_queue.put
197 self._quick_get = self._result_queue.get 206 self._quick_get = self._result_queue.get
386 class FastQueue(object): 395 class FastQueue(object):
387 def __init__(self): 396 def __init__(self):
388 self._reader, self._writer = multiprocessing.Pipe(duplex=False) 397 self._reader, self._writer = multiprocessing.Pipe(duplex=False)
389 self._rlock = multiprocessing.Lock() 398 self._rlock = multiprocessing.Lock()
390 self._wlock = multiprocessing.Lock() 399 self._wlock = multiprocessing.Lock()
400 self._initBuffers()
401
402 def _initBuffers(self):
391 self._rbuf = io.BytesIO() 403 self._rbuf = io.BytesIO()
392 self._rbuf.truncate(256) 404 self._rbuf.truncate(256)
393 self._wbuf = io.BytesIO() 405 self._wbuf = io.BytesIO()
394 self._wbuf.truncate(256) 406 self._wbuf.truncate(256)
395 407
396 def __getstate__(self): 408 def __getstate__(self):
397 return (self._reader, self._writer, self._rlock, self._wlock) 409 return (self._reader, self._writer, self._rlock, self._wlock)
398 410
399 def __setstate__(self, state): 411 def __setstate__(self, state):
400 (self._reader, self._writer, self._rlock, self._wlock) = state 412 (self._reader, self._writer, self._rlock, self._wlock) = state
413 self._initBuffers()
401 414
402 def get(self): 415 def get(self):
403 with self._rlock: 416 with self._rlock:
404 try: 417 try:
405 with self._rbuf.getbuffer() as b: 418 with self._rbuf.getbuffer() as b:
406 self._reader.recv_bytes_into(b) 419 bufsize = self._reader.recv_bytes_into(b)
407 except multiprocessing.BufferTooShort as e: 420 except multiprocessing.BufferTooShort as e:
408 self._rbuf.truncate(len(e.args[0]) * 2) 421 bufsize = len(e.args[0])
422 self._rbuf.truncate(bufsize * 2)
409 self._rbuf.seek(0) 423 self._rbuf.seek(0)
410 self._rbuf.write(e.args[0]) 424 self._rbuf.write(e.args[0])
411 425
412 self._rbuf.seek(0) 426 self._rbuf.seek(0)
413 return self._unpickle(self._rbuf) 427 return self._unpickle(self._rbuf, bufsize)
414 428
415 def put(self, obj): 429 def put(self, obj):
416 self._wbuf.seek(0) 430 self._wbuf.seek(0)
417 self._pickle(obj, self._wbuf) 431 self._pickle(obj, self._wbuf)
418 size = self._wbuf.tell() 432 size = self._wbuf.tell()
421 with self._wlock: 435 with self._wlock:
422 with self._wbuf.getbuffer() as b: 436 with self._wbuf.getbuffer() as b:
423 self._writer.send_bytes(b, 0, size) 437 self._writer.send_bytes(b, 0, size)
424 438
425 def _pickle(self, obj, buf): 439 def _pickle(self, obj, buf):
426 pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL) 440 fastpickle.pickle_intob(obj, buf)
427 441
428 def _unpickle(self, buf): 442 def _unpickle(self, buf, bufsize):
429 return pickle.load(buf) 443 return fastpickle.unpickle_fromb(buf, bufsize)
430 444