comparison piecrust/workerpool.py @ 462:04abc97dd3b6

bake: Add CLI argument to specify job batch size.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 11 Jul 2015 18:49:50 -0700
parents b015e38d4ee1
children 22a230d99621
comparison
equal deleted inserted replaced
461:b015e38d4ee1 462:04abc97dd3b6
113 self.wrap_exception = wrap_exception 113 self.wrap_exception = wrap_exception
114 self.is_profiling = is_profiling 114 self.is_profiling = is_profiling
115 115
116 116
117 class WorkerPool(object): 117 class WorkerPool(object):
118 def __init__(self, worker_class, worker_count=None, initargs=(), 118 def __init__(self, worker_class, initargs=(),
119 worker_count=None, batch_size=None,
119 wrap_exception=False): 120 wrap_exception=False):
120 worker_count = worker_count or os.cpu_count() or 1 121 worker_count = worker_count or os.cpu_count() or 1
121 122
122 use_fastqueue = True 123 use_fastqueue = True
123 if use_fastqueue: 124 if use_fastqueue:
129 self._task_queue = multiprocessing.SimpleQueue() 130 self._task_queue = multiprocessing.SimpleQueue()
130 self._result_queue = multiprocessing.SimpleQueue() 131 self._result_queue = multiprocessing.SimpleQueue()
131 self._quick_put = self._task_queue._writer.send 132 self._quick_put = self._task_queue._writer.send
132 self._quick_get = self._result_queue._reader.recv 133 self._quick_get = self._result_queue._reader.recv
133 134
135 self._batch_size = batch_size
134 self._callback = None 136 self._callback = None
135 self._error_callback = None 137 self._error_callback = None
136 self._listener = None 138 self._listener = None
137 139
138 main_module = sys.modules['__main__'] 140 main_module = sys.modules['__main__']
187 return res 189 return res
188 190
189 self._listener = res 191 self._listener = res
190 192
191 if chunk_size is None: 193 if chunk_size is None:
194 chunk_size = self._batch_size
195 if chunk_size is None:
192 chunk_size = max(1, job_count // 50) 196 chunk_size = max(1, job_count // 50)
193 logger.debug("Using chunk size of %d" % chunk_size) 197 logger.debug("Using chunk size of %d" % chunk_size)
194 198
195 if chunk_size is None or chunk_size == 1: 199 if chunk_size is None or chunk_size == 1:
196 for job in jobs: 200 for job in jobs: