piecrust2: comparison of piecrust/workerpool.py @ 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
alternatives to pickle (see the sketch after this list).
- Make sources have the basic requirement to be able to find a content item
from an item spec (path).
- Make Hoedown the default Markdown formatter.
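A minimal sketch (mine, not from the changeset) of why plain dicts travel better than exception objects once you swap serializers: pickle handles arbitrary classes, but back ends like json and msgpack only handle basic types.

```python
import json
import pickle


class WorkerError(Exception):
    pass


# An exception instance survives pickle...
err = WorkerError("boom")
assert isinstance(pickle.loads(pickle.dumps(err)), WorkerError)

# ...but json refuses it outright.
try:
    json.dumps(err)
except TypeError as ex:
    print(ex)  # Object of type WorkerError is not JSON serializable

# A plain dict, like the new _get_worker_exception_data() payload,
# round-trips through both back ends unchanged.
payload = {'wid': 3, 'type': 'WorkerError', 'value': 'boom'}
assert json.loads(json.dumps(payload)) == payload
assert pickle.loads(pickle.dumps(payload)) == payload
```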
author   | Ludovic Chabant <ludovic@chabant.com>
date     | Sun, 19 Nov 2017 14:29:17 -0800
parents  | 45ad976712ec
children | 1857dbd4580f
| 988:f83ae0a5d793 | 989:8adc27285d93 |
|---|---|
9 from piecrust.environment import ExecutionStats | 9 from piecrust.environment import ExecutionStats |
10 | 10 |
11 | 11 |
12 logger = logging.getLogger(__name__) | 12 logger = logging.getLogger(__name__) |
13 | 13 |
14 use_fastqueue = False | 14 use_fastqueue = True |
15 | |
16 use_fastpickle = False | 15 use_fastpickle = False |
17 use_msgpack = False | 16 use_msgpack = False |
18 use_marshall = False | 17 use_marshall = False |
| 18 use_json = False |
19 | 19 |
20 | 20 |
21 class IWorker(object): | 21 class IWorker(object): |
22 """ Interface for a pool worker. | 22 """ Interface for a pool worker. |
23 """ | 23 """ |
32 | 32 |
33 def shutdown(self): | 33 def shutdown(self): |
34 pass | 34 pass |
35 | 35 |
36 | 36 |
37 class WorkerExceptionData: | 37 def _get_worker_exception_data(wid): |
38 def __init__(self, wid): | 38 t, v, tb = sys.exc_info() |
39 super().__init__() | 39 return { |
40 self.wid = wid | 40 'wid': wid, |
41 t, v, tb = sys.exc_info() | 41 'type': str(t), |
42 self.type = t | 42 'value': '\n'.join(_get_errors(v)), |
43 self.value = '\n'.join(_get_errors(v)) | 43 'traceback': ''.join(traceback.format_exception(t, v, tb)) |
44 self.traceback = ''.join(traceback.format_exception(t, v, tb)) | 44 } |
45 | |
46 def __str__(self): | |
47 return str(self.value) | |
48 | 45 |
49 | 46 |
50 def _get_errors(ex): | 47 def _get_errors(ex): |
51 errors = [] | 48 errors = [] |
52 while ex is not None: | 49 while ex is not None: |
55 ex = ex.__cause__ | 52 ex = ex.__cause__ |
56 return errors | 53 return errors |
57 | 54 |
58 | 55 |
59 TASK_JOB = 0 | 56 TASK_JOB = 0 |
60 TASK_END = 1 | 57 TASK_JOB_BATCH = 1 |
| 58 TASK_END = 2 |
61 _TASK_ABORT_WORKER = 10 | 59 _TASK_ABORT_WORKER = 10 |
62 _CRITICAL_WORKER_ERROR = 11 | 60 _CRITICAL_WORKER_ERROR = 11 |
63 | 61 |
64 | 62 |
65 def worker_func(params): | 63 def worker_func(params): |
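worker_func, whose updated body appears in the next hunk, pulls tuple envelopes off the shared queue. A sketch of the shapes implied by the right-hand column (the job payload here is a hypothetical stand-in; field order matches the diff):

```python
TASK_JOB = 0
TASK_JOB_BATCH = 1
TASK_END = 2

job = {'path': 'pages/foo.md'}  # hypothetical job payload

# Tasks pushed to workers are (task_type, payload) pairs; a batch just
# carries a list of jobs instead of a single one.
single = (TASK_JOB, job)
batch = (TASK_JOB_BATCH, [job, job])
end = (TASK_END, None)

# Results sent back are (task_type, wid, result_list), where each entry
# pairs the original job data with its output and a success flag.
ok = (TASK_JOB_BATCH, 3, [(job, {'rendered': '...'}, True)])
```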
167 task = get() | 165 task = get() |
168 time_in_get += (time.perf_counter() - get_start_time) | 166 time_in_get += (time.perf_counter() - get_start_time) |
169 | 167 |
170 task_type, task_data = task | 168 task_type, task_data = task |
171 | 169 |
172 # Job task... just do it. | 170 # Job task(s)... just do it. |
173 if task_type == TASK_JOB: | 171 if task_type == TASK_JOB or task_type == TASK_JOB_BATCH: |
174 try: | 172 |
175 res = (task_type, task_data, True, wid, w.process(task_data)) | 173 task_data_list = task_data |
176 except Exception as e: | 174 if task_type == TASK_JOB: |
177 logger.debug( | 175 task_data_list = [task_data] |
178 "Error processing job, sending exception to main process:") | 176 |
179 logger.debug(traceback.format_exc()) | 177 result_list = [] |
180 we = WorkerExceptionData(wid) | 178 |
181 res = (task_type, task_data, False, wid, we) | 179 for td in task_data_list: |
182 | 180 try: |
| 181 res = w.process(td) |
| 182 result_list.append((td, res, True)) |
| 183 except Exception as e: |
| 184 logger.debug( |
| 185 "Error processing job, sending exception to main process:") |
| 186 logger.debug(traceback.format_exc()) |
| 187 we = _get_worker_exception_data(wid) |
| 188 res = (td, we, False) |
| 189 result_list.append((td, res, False)) |
| 190 |
| 191 res = (task_type, wid, result_list) |
183 put_start_time = time.perf_counter() | 192 put_start_time = time.perf_counter() |
184 put(res) | 193 put(res) |
185 time_in_put += (time.perf_counter() - put_start_time) | 194 time_in_put += (time.perf_counter() - put_start_time) |
186 | 195 |
187 completed += 1 | 196 completed += len(task_data_list) |
188 | 197 |
189 # End task... gather stats to send back to the main process. | 198 # End task... gather stats to send back to the main process. |
190 elif task_type == TASK_END: | 199 elif task_type == TASK_END: |
191 logger.debug("Worker %d got end task, exiting." % wid) | 200 logger.debug("Worker %d got end task, exiting." % wid) |
192 stats.registerTimer('WorkerTaskGet', time=time_in_get) | 201 stats.registerTimer('WorkerTaskGet', time=time_in_get) |
193 stats.registerTimer('WorkerResultPut', time=time_in_put) | 202 stats.registerTimer('WorkerResultPut', time=time_in_put) |
194 try: | 203 try: |
195 stats.mergeStats(w.getStats()) | 204 stats.mergeStats(w.getStats()) |
196 rep = (task_type, task_data, True, wid, (wid, stats)) | 205 rep = (task_type, wid, [(task_data, (wid, stats), True)]) |
197 except Exception as e: | 206 except Exception as e: |
198 logger.debug( | 207 logger.debug( |
199 "Error getting report, sending exception to main process:") | 208 "Error getting report, sending exception to main process:") |
200 logger.debug(traceback.format_exc()) | 209 logger.debug(traceback.format_exc()) |
201 we = WorkerExceptionData(wid) | 210 we = _get_worker_exception_data(wid) |
202 rep = (task_type, task_data, False, wid, (wid, we)) | 211 rep = (task_type, wid, [(task_data, (wid, we), False)]) |
203 put(rep) | 212 put(rep) |
204 break | 213 break |
205 | 214 |
206 # Emergency abort. | 215 # Emergency abort. |
207 elif task_type == _TASK_ABORT_WORKER: | 216 elif task_type == _TASK_ABORT_WORKER: |
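The wrap-a-single-job-into-a-list normalization above lets one loop serve both task types while isolating per-job failures. Roughly, stripped of timing and logging (a sketch, not the real worker_func):

```python
TASK_JOB, TASK_JOB_BATCH = 0, 1


def handle_task(task, wid, process):
    # Normalize: a single job becomes a one-element batch.
    task_type, task_data = task
    jobs = task_data if task_type == TASK_JOB_BATCH else [task_data]

    # One failed job doesn't abort the rest of the batch; its exception
    # data rides back in the same result list.
    results = []
    for job in jobs:
        try:
            results.append((job, process(job), True))
        except Exception as ex:
            results.append((job, {'wid': wid, 'value': str(ex)}, False))
    return (task_type, wid, results)


# Job 0 fails (division by zero) but jobs 1 and 2 still complete.
print(handle_task((TASK_JOB_BATCH, [1, 2, 0]), 3, lambda j: 10 // j))
```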
300 if new_job_count > 0: | 309 if new_job_count > 0: |
301 with self._lock_jobs_left: | 310 with self._lock_jobs_left: |
302 self._jobs_left += new_job_count | 311 self._jobs_left += new_job_count |
303 | 312 |
304 self._event.clear() | 313 self._event.clear() |
305 for job in jobs: | 314 bs = self._batch_size |
306 self._quick_put((TASK_JOB, job)) | 315 if not bs: |
| 316 for job in jobs: |
| 317 self._quick_put((TASK_JOB, job)) |
| 318 else: |
| 319 cur_offset = 0 |
| 320 while cur_offset < new_job_count: |
| 321 next_batch_idx = min(cur_offset + bs, new_job_count) |
| 322 job_batch = jobs[cur_offset:next_batch_idx] |
| 323 self._quick_put((TASK_JOB_BATCH, job_batch)) |
| 324 cur_offset = next_batch_idx |
307 else: | 325 else: |
308 with self._lock_jobs_left: | 326 with self._lock_jobs_left: |
309 done = (self._jobs_left == 0) | 327 done = (self._jobs_left == 0) |
310 if done: | 328 if done: |
311 self._event.set() | 329 self._event.set() |
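The producer-side slicing in set_jobs is a plain fixed-size chunker. Extracted on its own (same arithmetic as the loop above, hypothetical names):

```python
def split_batches(jobs, batch_size):
    # Yield fixed-size slices of jobs; the final slice may be shorter.
    offset = 0
    while offset < len(jobs):
        next_idx = min(offset + batch_size, len(jobs))
        yield jobs[offset:next_idx]
        offset = next_idx


assert list(split_batches(list(range(7)), 3)) == [[0, 1, 2], [3, 4, 5], [6]]
```

Fewer, larger queue puts is the point: each put pays its serialization and locking overhead once per batch instead of once per job.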
386 | 404 |
387 if res is None: | 405 if res is None: |
388 logger.debug("Result handler exiting.") | 406 logger.debug("Result handler exiting.") |
389 return | 407 return |
390 | 408 |
391 task_type, task_data, success, wid, data = res | 409 task_type, wid, res_data_list = res |
392 try: | 410 for res_data in res_data_list: |
393 if success: | 411 try: |
394 if pool._callback: | 412 task_data, data, success = res_data |
395 pool._callback(task_data, data, userdata) | 413 if success: |
396 else: | 414 if pool._callback: |
397 if task_type == _CRITICAL_WORKER_ERROR: | 415 pool._callback(task_data, data, userdata) |
398 logger.error(data) | |
399 do_continue = pool._onResultHandlerCriticalError(wid) | |
400 if not do_continue: | |
401 logger.debug("Aborting result handling thread.") | |
402 return | |
403 else: | 416 else: |
404 if pool._error_callback: | 417 if task_type == _CRITICAL_WORKER_ERROR: |
405 pool._error_callback(task_data, data, userdata) | 418 logger.error(data) |
| 419 do_continue = pool._onResultHandlerCriticalError(wid) |
| 420 if not do_continue: |
| 421 logger.debug("Aborting result handling thread.") |
| 422 return |
406 else: | 423 else: |
407 logger.error( | 424 if pool._error_callback: |
408 "Worker %d failed to process a job:" % wid) | 425 pool._error_callback(task_data, data, userdata) |
409 logger.error(data) | 426 else: |
410 except Exception as ex: | 427 logger.error( |
411 logger.exception(ex) | 428 "Worker %d failed to process a job:" % wid) |
| 429 logger.error(data) |
| 430 except Exception as ex: |
| 431 logger.exception(ex) |
412 | 432 |
413 if task_type == TASK_JOB: | 433 if task_type == TASK_JOB: |
414 pool._onTaskDone() | 434 pool._onTaskDone() |
415 | 435 |
416 | 436 |
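On the main-process side, each envelope now fans out into per-job callback calls. Schematically (the callbacks here are stand-ins; the real handler also special-cases _CRITICAL_WORKER_ERROR):

```python
def dispatch(envelope, callback, error_callback, userdata=None):
    # Unpack the batched result envelope and route every entry to the
    # success or error callback, mirroring the handler loop above.
    task_type, wid, result_list = envelope
    for task_data, data, success in result_list:
        if success:
            callback(task_data, data, userdata)
        else:
            error_callback(task_data, data, userdata)


dispatch((0, 3, [('job-a', 'ok', True), ('job-b', 'boom', False)]),
         lambda job, data, ud: print('done:', job, data),
         lambda job, data, ud: print('failed:', job, data))
```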
520 return marshal.load(buf) | 540 return marshal.load(buf) |
521 | 541 |
522 _pickle = _pickle_marshal | 542 _pickle = _pickle_marshal |
523 _unpickle = _unpickle_marshal | 543 _unpickle = _unpickle_marshal |
524 | 544 |
| 545 elif use_json: |
| 546 import json |
| 547 |
| 548 class _BufferWrapper: |
| 549 def __init__(self, buf): |
| 550 self._buf = buf |
| 551 |
| 552 def write(self, data): |
| 553 self._buf.write(data.encode('utf8')) |
| 554 |
| 555 def read(self): |
| 556 return self._buf.read().decode('utf8') |
| 557 |
| 558 def _pickle_json(obj, buf): |
| 559 buf = _BufferWrapper(buf) |
| 560 json.dump(obj, buf, indent=None, separators=(',', ':')) |
| 561 |
| 562 def _unpickle_json(buf, bufsize): |
| 563 buf = _BufferWrapper(buf) |
| 564 return json.load(buf) |
| 565 |
| 566 _pickle = _pickle_json |
| 567 _unpickle = _unpickle_json |
| 568 |
525 else: | 569 else: |
526 import pickle | 570 import pickle |
527 | 571 |
528 def _pickle_default(obj, buf): | 572 def _pickle_default(obj, buf): |
529 pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL) | 573 pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL) |
531 def _unpickle_default(buf, bufsize): | 575 def _unpickle_default(buf, bufsize): |
532 return pickle.load(buf) | 576 return pickle.load(buf) |
533 | 577 |
534 _pickle = _pickle_default | 578 _pickle = _pickle_default |
535 _unpickle = _unpickle_default | 579 _unpickle = _unpickle_default |
536 |
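All the serializer branches reduce to one _pickle/_unpickle pair chosen at import time. A condensed, runnable sketch of the JSON branch, including the text-to-bytes shim it needs over a binary buffer (the demo harness is mine):

```python
import io
import json


class _BufferWrapper:
    # json writes str, but the IPC buffer is binary: encode and decode
    # at the boundary, as in the diff's JSON branch.
    def __init__(self, buf):
        self._buf = buf

    def write(self, data):
        self._buf.write(data.encode('utf8'))

    def read(self):
        return self._buf.read().decode('utf8')


def _pickle_json(obj, buf):
    json.dump(obj, _BufferWrapper(buf), indent=None, separators=(',', ':'))


def _unpickle_json(buf, bufsize):
    return json.load(_BufferWrapper(buf))


# Round-trip through an in-memory binary buffer.
buf = io.BytesIO()
_pickle_json({'wid': 1, 'ok': True}, buf)
buf.seek(0)
assert _unpickle_json(buf, None) == {'wid': 1, 'ok': True}
```

This only works because the worker-pool messages are now plain tuples, lists, and dicts; class instances like the old WorkerExceptionData would have broken every non-pickle branch.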