comparison piecrust/workerpool.py @ 460:55fc8918cb75

bake: Use batched jobs in the worker pool.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 11 Jul 2015 00:45:35 -0700
parents 8351a77e13f5
children b015e38d4ee1
comparing 459:2ef04e16f0b9 with 460:55fc8918cb75
@@ -1,8 +1,9 @@
 import os
 import sys
 import logging
+import itertools
 import threading
 import multiprocessing
 from piecrust.fastpickle import pickle, unpickle
 
 
@@ -19,11 +20,12 @@
     def getReport(self):
         return None
 
 
 TASK_JOB = 0
-TASK_END = 1
+TASK_BATCH = 1
+TASK_END = 2
 
 
 def worker_func(params):
     if params.is_profiling:
         try:
@@ -80,20 +82,25 @@
                         e, e.__traceback__)
             rep = (task_type, False, wid, (wid, e))
             put(rep)
             break
 
-        task_data = unpickle(task_data)
-        try:
-            res = (task_type, True, wid, w.process(task_data))
-        except Exception as e:
-            if params.wrap_exception:
-                e = multiprocessing.ExceptionWithTraceback(e, e.__traceback__)
-            res = (task_type, False, wid, e)
-        put(res)
-
-        completed += 1
+        if task_type == TASK_JOB:
+            task_data = (task_data,)
+
+        for t in task_data:
+            td = unpickle(t)
+            try:
+                res = (TASK_JOB, True, wid, w.process(td))
+            except Exception as e:
+                if params.wrap_exception:
+                    e = multiprocessing.ExceptionWithTraceback(
+                        e, e.__traceback__)
+                res = (TASK_JOB, False, wid, e)
+            put(res)
+
+            completed += 1
 
     logger.debug("Worker %d completed %d tasks." % (wid, completed))
 
 
 class _WorkerParams(object):
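The worker-side change above boils down to one trick: a plain TASK_JOB payload is wrapped in a 1-tuple so that single jobs and TASK_BATCH tuples flow through the same processing loop. A minimal standalone sketch of that pattern (the handle helper and sample values are hypothetical, not piecrust code):

    TASK_JOB = 0
    TASK_BATCH = 1

    def handle(task_type, task_data, process):
        # Wrap a lone job so both task types are iterated the same way.
        if task_type == TASK_JOB:
            task_data = (task_data,)
        return [process(item) for item in task_data]

    print(handle(TASK_JOB, 3, lambda x: x * 2))            # [6]
    print(handle(TASK_BATCH, (1, 2, 3), lambda x: x * 2))  # [2, 4, 6]
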
@@ -150,11 +157,11 @@
 
     def setHandler(self, callback=None, error_callback=None):
         self._callback = callback
         self._error_callback = error_callback
 
-    def queueJobs(self, jobs, handler=None):
+    def queueJobs(self, jobs, handler=None, chunk_size=None):
         if self._closed:
             raise Exception("This worker pool has been closed.")
         if self._listener is not None:
             raise Exception("A previous job queue has not finished yet.")
 
@@ -164,20 +171,35 @@
         if handler is not None:
             self.setHandler(handler)
 
         if not hasattr(jobs, '__len__'):
             jobs = list(jobs)
-
-        res = AsyncResult(self, len(jobs))
+        job_count = len(jobs)
+
+        res = AsyncResult(self, job_count)
         if res._count == 0:
             res._event.set()
             return res
 
         self._listener = res
-        for job in jobs:
-            job_data = pickle(job)
-            self._quick_put((TASK_JOB, job_data))
+
+        if chunk_size is None:
+            chunk_size = max(1, job_count // 50)
+            logger.debug("Using chunk size of %d" % chunk_size)
+
+        if chunk_size is None or chunk_size == 1:
+            for job in jobs:
+                job_data = pickle(job)
+                self._quick_put((TASK_JOB, job_data))
+        else:
+            it = iter(jobs)
+            while True:
+                batch = tuple([pickle(i)
+                               for i in itertools.islice(it, chunk_size)])
+                if not batch:
+                    break
+                self._quick_put((TASK_BATCH, batch))
 
         return res
 
     def close(self):
         if self._listener is not None:
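On the producer side, queueJobs now pickles jobs in groups of chunk_size, defaulting to max(1, job_count // 50), i.e. aiming at roughly fifty batches per queue, presumably to cut the number of messages crossing the multiprocessing queue. A standalone sketch of the islice-based batching, with a hypothetical batch_jobs helper that is not part of piecrust:

    import itertools

    def batch_jobs(jobs, chunk_size):
        # Yield tuples of at most chunk_size jobs, like the loop in queueJobs.
        it = iter(jobs)
        while True:
            batch = tuple(itertools.islice(it, chunk_size))
            if not batch:
                break
            yield batch

    jobs = list(range(120))
    chunk_size = max(1, len(jobs) // 50)   # -> 2, same default as the patch
    batches = list(batch_jobs(jobs, chunk_size))
    print(len(batches), len(batches[0]))   # -> 60 2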