Mercurial > piecrust2
comparison piecrust/workerpool.py @ 460:55fc8918cb75
bake: Use batched jobs in the worker pool.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sat, 11 Jul 2015 00:45:35 -0700 |
parents | 8351a77e13f5 |
children | b015e38d4ee1 |
comparison
equal
deleted
inserted
replaced
459:2ef04e16f0b9 | 460:55fc8918cb75 |
---|---|
1 import os | 1 import os |
2 import sys | 2 import sys |
3 import logging | 3 import logging |
| 4 import itertools |
4 import threading | 5 import threading |
5 import multiprocessing | 6 import multiprocessing |
6 from piecrust.fastpickle import pickle, unpickle | 7 from piecrust.fastpickle import pickle, unpickle |
7 | 8 |
8 | 9 |
19 def getReport(self): | 20 def getReport(self): |
20 return None | 21 return None |
21 | 22 |
22 | 23 |
23 TASK_JOB = 0 | 24 TASK_JOB = 0 |
24 TASK_END = 1 | 25 TASK_BATCH = 1 |
| 26 TASK_END = 2 |
25 | 27 |
26 | 28 |
27 def worker_func(params): | 29 def worker_func(params): |
28 if params.is_profiling: | 30 if params.is_profiling: |
29 try: | 31 try: |
80 e, e.__traceback__) | 82 e, e.__traceback__) |
81 rep = (task_type, False, wid, (wid, e)) | 83 rep = (task_type, False, wid, (wid, e)) |
82 put(rep) | 84 put(rep) |
83 break | 85 break |
84 | 86 |
85 task_data = unpickle(task_data) | 87 if task_type == TASK_JOB: |
86 try: | 88 task_data = (task_data,) |
87 res = (task_type, True, wid, w.process(task_data)) | 89 |
88 except Exception as e: | 90 for t in task_data: |
89 if params.wrap_exception: | 91 td = unpickle(t) |
90 e = multiprocessing.ExceptionWithTraceback(e, e.__traceback__) | 92 try: |
91 res = (task_type, False, wid, e) | 93 res = (TASK_JOB, True, wid, w.process(td)) |
92 put(res) | 94 except Exception as e: |
93 | 95 if params.wrap_exception: |
94 completed += 1 | 96 e = multiprocessing.ExceptionWithTraceback( |
| 97 e, e.__traceback__) |
| 98 res = (TASK_JOB, False, wid, e) |
| 99 put(res) |
| 100 |
| 101 completed += 1 |
95 | 102 |
96 logger.debug("Worker %d completed %d tasks." % (wid, completed)) | 103 logger.debug("Worker %d completed %d tasks." % (wid, completed)) |
97 | 104 |
98 | 105 |
99 class _WorkerParams(object): | 106 class _WorkerParams(object): |
150 | 157 |
151 def setHandler(self, callback=None, error_callback=None): | 158 def setHandler(self, callback=None, error_callback=None): |
152 self._callback = callback | 159 self._callback = callback |
153 self._error_callback = error_callback | 160 self._error_callback = error_callback |
154 | 161 |
155 def queueJobs(self, jobs, handler=None): | 162 def queueJobs(self, jobs, handler=None, chunk_size=None): |
156 if self._closed: | 163 if self._closed: |
157 raise Exception("This worker pool has been closed.") | 164 raise Exception("This worker pool has been closed.") |
158 if self._listener is not None: | 165 if self._listener is not None: |
159 raise Exception("A previous job queue has not finished yet.") | 166 raise Exception("A previous job queue has not finished yet.") |
160 | 167 |
164 if handler is not None: | 171 if handler is not None: |
165 self.setHandler(handler) | 172 self.setHandler(handler) |
166 | 173 |
167 if not hasattr(jobs, '__len__'): | 174 if not hasattr(jobs, '__len__'): |
168 jobs = list(jobs) | 175 jobs = list(jobs) |
169 | 176 job_count = len(jobs) |
170 res = AsyncResult(self, len(jobs)) | 177 |
| 178 res = AsyncResult(self, job_count) |
171 if res._count == 0: | 179 if res._count == 0: |
172 res._event.set() | 180 res._event.set() |
173 return res | 181 return res |
174 | 182 |
175 self._listener = res | 183 self._listener = res |
176 for job in jobs: | 184 |
177 job_data = pickle(job) | 185 if chunk_size is None: |
178 self._quick_put((TASK_JOB, job_data)) | 186 chunk_size = max(1, job_count // 50) |
| 187 logger.debug("Using chunk size of %d" % chunk_size) |
| 188 |
| 189 if chunk_size is None or chunk_size == 1: |
| 190 for job in jobs: |
| 191 job_data = pickle(job) |
| 192 self._quick_put((TASK_JOB, job_data)) |
| 193 else: |
| 194 it = iter(jobs) |
| 195 while True: |
| 196 batch = tuple([pickle(i) |
| 197 for i in itertools.islice(it, chunk_size)]) |
| 198 if not batch: |
| 199 break |
| 200 self._quick_put((TASK_BATCH, batch)) |
179 | 201 |
180 return res | 202 return res |
181 | 203 |
182 def close(self): | 204 def close(self): |
183 if self._listener is not None: | 205 if self._listener is not None: |