comparison piecrust/workerpool.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item from an item spec (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents 45ad976712ec
children 1857dbd4580f
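The "simple objects" point shows up directly in the diff below: the WorkerExceptionData class is replaced by a plain dict built in _get_worker_exception_data(), so error reports can cross the process boundary through serializers other than pickle. A minimal standalone sketch of that idea (the 'value' field is simplified here compared to the real helper):

import json
import sys
import traceback


def worker_exception_data(wid):
    # Describe the current exception with nothing but strings and ints,
    # so json/msgpack/marshal can serialize it as easily as pickle.
    t, v, tb = sys.exc_info()
    return {
        'wid': wid,
        'type': str(t),
        'value': str(v),
        'traceback': ''.join(traceback.format_exception(t, v, tb)),
    }


try:
    raise ValueError("boom")
except ValueError:
    data = worker_exception_data(wid=3)

# A plain dict round-trips through any of the supported serializers.
print(json.dumps(data)[:60])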
--- piecrust/workerpool.py (988:f83ae0a5d793)
+++ piecrust/workerpool.py (989:8adc27285d93)
@@ -9,15 +9,15 @@
 from piecrust.environment import ExecutionStats
 
 
 logger = logging.getLogger(__name__)
 
-use_fastqueue = False
-
+use_fastqueue = True
 use_fastpickle = False
 use_msgpack = False
 use_marshall = False
+use_json = False
 
 
 class IWorker(object):
     """ Interface for a pool worker.
     """
@@ -32,21 +32,18 @@
 
     def shutdown(self):
         pass
 
 
-class WorkerExceptionData:
-    def __init__(self, wid):
-        super().__init__()
-        self.wid = wid
-        t, v, tb = sys.exc_info()
-        self.type = t
-        self.value = '\n'.join(_get_errors(v))
-        self.traceback = ''.join(traceback.format_exception(t, v, tb))
-
-    def __str__(self):
-        return str(self.value)
+def _get_worker_exception_data(wid):
+    t, v, tb = sys.exc_info()
+    return {
+        'wid': wid,
+        'type': str(t),
+        'value': '\n'.join(_get_errors(v)),
+        'traceback': ''.join(traceback.format_exception(t, v, tb))
+    }
 
 
 def _get_errors(ex):
     errors = []
     while ex is not None:
@@ -55,11 +52,12 @@
         ex = ex.__cause__
     return errors
 
 
 TASK_JOB = 0
-TASK_END = 1
+TASK_JOB_BATCH = 1
+TASK_END = 2
 _TASK_ABORT_WORKER = 10
 _CRITICAL_WORKER_ERROR = 11
 
 
 def worker_func(params):
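With TASK_JOB_BATCH added, a worker's input queue can carry either a single job or a list of jobs under one task tuple. A small sketch of the normalization the worker loop performs (the job payloads here are made up):

TASK_JOB = 0
TASK_JOB_BATCH = 1
TASK_END = 2


def jobs_in_task(task):
    # A TASK_JOB carries one job, a TASK_JOB_BATCH carries a list of jobs;
    # either way the worker ends up iterating over a list.
    task_type, task_data = task
    if task_type == TASK_JOB:
        return [task_data]
    if task_type == TASK_JOB_BATCH:
        return list(task_data)
    return []


assert jobs_in_task((TASK_JOB, 'job-a')) == ['job-a']
assert jobs_in_task((TASK_JOB_BATCH, ['job-a', 'job-b'])) == ['job-a', 'job-b']
assert jobs_in_task((TASK_END, None)) == []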
@@ -167,41 +165,52 @@
         task = get()
         time_in_get += (time.perf_counter() - get_start_time)
 
         task_type, task_data = task
 
-        # Job task... just do it.
-        if task_type == TASK_JOB:
-            try:
-                res = (task_type, task_data, True, wid, w.process(task_data))
-            except Exception as e:
-                logger.debug(
-                    "Error processing job, sending exception to main process:")
-                logger.debug(traceback.format_exc())
-                we = WorkerExceptionData(wid)
-                res = (task_type, task_data, False, wid, we)
-
+        # Job task(s)... just do it.
+        if task_type == TASK_JOB or task_type == TASK_JOB_BATCH:
+
+            task_data_list = task_data
+            if task_type == TASK_JOB:
+                task_data_list = [task_data]
+
+            result_list = []
+
+            for td in task_data_list:
+                try:
+                    res = w.process(td)
+                    result_list.append((td, res, True))
+                except Exception as e:
+                    logger.debug(
+                        "Error processing job, sending exception to main process:")
+                    logger.debug(traceback.format_exc())
+                    we = _get_worker_exception_data(wid)
+                    res = (td, we, False)
+                    result_list.append((td, res, False))
+
+            res = (task_type, wid, result_list)
             put_start_time = time.perf_counter()
             put(res)
             time_in_put += (time.perf_counter() - put_start_time)
 
-            completed += 1
+            completed += len(task_data_list)
 
         # End task... gather stats to send back to the main process.
         elif task_type == TASK_END:
             logger.debug("Worker %d got end task, exiting." % wid)
             stats.registerTimer('WorkerTaskGet', time=time_in_get)
             stats.registerTimer('WorkerResultPut', time=time_in_put)
             try:
                 stats.mergeStats(w.getStats())
-                rep = (task_type, task_data, True, wid, (wid, stats))
+                rep = (task_type, wid, [(task_data, (wid, stats), True)])
             except Exception as e:
                 logger.debug(
                     "Error getting report, sending exception to main process:")
                 logger.debug(traceback.format_exc())
-                we = WorkerExceptionData(wid)
-                rep = (task_type, task_data, False, wid, (wid, we))
+                we = _get_worker_exception_data(wid)
+                rep = (task_type, wid, [(task_data, (wid, we), False)])
             put(rep)
             break
 
         # Emergy abort.
         elif task_type == _TASK_ABORT_WORKER:
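The worker now answers with one message per task (or batch) rather than one per job: a flat (task_type, wid, result_list) tuple whose result_list holds (job, result_or_error, success) entries. A hedged sketch of that reply shape, with a made-up process() stand-in:

def process_batch(task_type, wid, jobs, process):
    # Build the reply tuple the way the new worker loop does: all results
    # for a batch travel back through a single put() call.
    result_list = []
    for job in jobs:
        try:
            result_list.append((job, process(job), True))
        except Exception as ex:
            result_list.append((job, {'wid': wid, 'error': str(ex)}, False))
    return (task_type, wid, result_list)


reply = process_batch(1, 0, ['a', 'b'], lambda job: job.upper())
assert reply == (1, 0, [('a', 'A', True), ('b', 'B', True)])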
@@ -300,12 +309,21 @@
         if new_job_count > 0:
             with self._lock_jobs_left:
                 self._jobs_left += new_job_count
 
             self._event.clear()
-            for job in jobs:
-                self._quick_put((TASK_JOB, job))
+            bs = self._batch_size
+            if not bs:
+                for job in jobs:
+                    self._quick_put((TASK_JOB, job))
+            else:
+                cur_offset = 0
+                while cur_offset < new_job_count:
+                    next_batch_idx = min(cur_offset + bs, new_job_count)
+                    job_batch = jobs[cur_offset:next_batch_idx]
+                    self._quick_put((TASK_JOB_BATCH, job_batch))
+                    cur_offset = next_batch_idx
         else:
             with self._lock_jobs_left:
                 done = (self._jobs_left == 0)
             if done:
                 self._event.set()
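On the queueing side, jobs are sliced into batches of at most _batch_size before being pushed to the workers. The same slicing in isolation (batch size and job values are arbitrary here):

def split_into_batches(jobs, batch_size):
    # Mirror of the slicing loop above: consecutive slices of at most
    # batch_size jobs; a falsy batch size means "no batching".
    if not batch_size:
        return [[job] for job in jobs]
    batches = []
    cur_offset = 0
    while cur_offset < len(jobs):
        next_batch_idx = min(cur_offset + batch_size, len(jobs))
        batches.append(jobs[cur_offset:next_batch_idx])
        cur_offset = next_batch_idx
    return batches


assert split_into_batches(list('abcdefg'), 3) == [['a', 'b', 'c'], ['d', 'e', 'f'], ['g']]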
@@ -386,31 +404,33 @@
 
         if res is None:
             logger.debug("Result handler exiting.")
             return
 
-        task_type, task_data, success, wid, data = res
-        try:
-            if success:
-                if pool._callback:
-                    pool._callback(task_data, data, userdata)
-            else:
-                if task_type == _CRITICAL_WORKER_ERROR:
-                    logger.error(data)
-                    do_continue = pool._onResultHandlerCriticalError(wid)
-                    if not do_continue:
-                        logger.debug("Aborting result handling thread.")
-                        return
-                else:
-                    if pool._error_callback:
-                        pool._error_callback(task_data, data, userdata)
-                    else:
-                        logger.error(
-                            "Worker %d failed to process a job:" % wid)
-                        logger.error(data)
-        except Exception as ex:
-            logger.exception(ex)
+        task_type, wid, res_data_list = res
+        for res_data in res_data_list:
+            try:
+                task_data, data, success = res_data
+                if success:
+                    if pool._callback:
+                        pool._callback(task_data, data, userdata)
+                else:
+                    if task_type == _CRITICAL_WORKER_ERROR:
+                        logger.error(data)
+                        do_continue = pool._onResultHandlerCriticalError(wid)
+                        if not do_continue:
+                            logger.debug("Aborting result handling thread.")
+                            return
+                    else:
+                        if pool._error_callback:
+                            pool._error_callback(task_data, data, userdata)
+                        else:
+                            logger.error(
+                                "Worker %d failed to process a job:" % wid)
+                            logger.error(data)
+            except Exception as ex:
+                logger.exception(ex)
 
         if task_type == TASK_JOB:
             pool._onTaskDone()
 
 
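The result handler now unpacks a list of per-job results from each message, but still invokes the pool callbacks once per job, so callers see the same callback(job, data, userdata) shape whether or not batching is enabled. A hypothetical pair of callbacks with that signature:

def on_result(job, data, userdata):
    # Success callback: called once per job, even for batched results.
    print("done:", job, "->", data)


def on_error(job, data, userdata):
    # Error callback: data is the worker's exception dict.
    print("failed:", job, data)


# What the handler loop effectively does for each (job, data, success) entry:
res_data_list = [('a.md', 'rendered a', True),
                 ('b.md', {'value': 'boom'}, False)]
for job, data, success in res_data_list:
    (on_result if success else on_error)(job, data, None)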
@@ -520,17 +540,40 @@
         return marshal.load(buf)
 
     _pickle = _pickle_marshal
     _unpickle = _unpickle_marshal
 
+elif use_json:
+    import json
+
+    class _BufferWrapper:
+        def __init__(self, buf):
+            self._buf = buf
+
+        def write(self, data):
+            self._buf.write(data.encode('utf8'))
+
+        def read(self):
+            return self._buf.read().decode('utf8')
+
+    def _pickle_json(obj, buf):
+        buf = _BufferWrapper(buf)
+        json.dump(obj, buf, indent=None, separators=(',', ':'))
+
+    def _unpickle_json(buf, bufsize):
+        buf = _BufferWrapper(buf)
+        return json.load(buf)
+
+    _pickle = _pickle_json
+    _unpickle = _unpickle_json
+
 else:
     import pickle
 
     def _pickle_default(obj, buf):
         pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)
 
     def _unpickle_default(buf, bufsize):
         return pickle.load(buf)
 
     _pickle = _pickle_default
     _unpickle = _unpickle_default
-
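All of the serializer backends in this file expose the same _pickle(obj, buf) / _unpickle(buf, bufsize) pair over a binary buffer, which is what makes the flags at the top of the module swappable. The new JSON variant, exercised standalone on an in-memory buffer (the exercise code is mine; the wrapper mirrors the diff):

import io
import json


class BufferWrapper:
    # Adapts a binary buffer to the text interface json.dump/json.load expect.
    def __init__(self, buf):
        self._buf = buf

    def write(self, data):
        self._buf.write(data.encode('utf8'))

    def read(self):
        return self._buf.read().decode('utf8')


def pickle_json(obj, buf):
    json.dump(obj, BufferWrapper(buf), indent=None, separators=(',', ':'))


def unpickle_json(buf, bufsize):
    return json.load(BufferWrapper(buf))


buf = io.BytesIO()
pickle_json({'wid': 3, 'ok': True}, buf)
buf.seek(0)
assert unpickle_json(buf, None) == {'wid': 3, 'ok': True}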