piecrust2: diff piecrust/workerpool.py @ 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item from an item spec (path).
- Make Hoedown the default Markdown formatter.
author    Ludovic Chabant <ludovic@chabant.com>
date      Sun, 19 Nov 2017 14:29:17 -0800
parents   45ad976712ec
children  1857dbd4580f
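The "simple objects" bullet in the commit message is what most of the churn below implements: the WorkerExceptionData class becomes a plain dict built by _get_worker_exception_data(), and worker results become flat tuples and lists. Plain structures like these can round-trip through any of the serializers the module can switch between (pickle, marshal, msgpack, or the new JSON path), whereas a custom class only survives pickle. A minimal sketch of that round trip, using only the standard library and made-up field values:

import json

# A payload shaped like the dict _get_worker_exception_data() returns;
# the field values here are invented for illustration.
exc_data = {
    'wid': 3,
    'type': "<class 'ValueError'>",
    'value': 'bad page config',
    'traceback': 'Traceback (most recent call last): ...',
}

# Same compact separators as the new use_json code path below.
encoded = json.dumps(exc_data, indent=None, separators=(',', ':'))
assert json.loads(encoded) == exc_data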
--- a/piecrust/workerpool.py	Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/workerpool.py	Sun Nov 19 14:29:17 2017 -0800
@@ -11,11 +11,11 @@
 
 logger = logging.getLogger(__name__)
 
-use_fastqueue = False
-
+use_fastqueue = True
 use_fastpickle = False
 use_msgpack = False
 use_marshall = False
+use_json = False
 
 
 class IWorker(object):
@@ -34,17 +34,14 @@
         pass
 
 
-class WorkerExceptionData:
-    def __init__(self, wid):
-        super().__init__()
-        self.wid = wid
-        t, v, tb = sys.exc_info()
-        self.type = t
-        self.value = '\n'.join(_get_errors(v))
-        self.traceback = ''.join(traceback.format_exception(t, v, tb))
-
-    def __str__(self):
-        return str(self.value)
+def _get_worker_exception_data(wid):
+    t, v, tb = sys.exc_info()
+    return {
+        'wid': wid,
+        'type': str(t),
+        'value': '\n'.join(_get_errors(v)),
+        'traceback': ''.join(traceback.format_exception(t, v, tb))
+    }
 
 
 def _get_errors(ex):
@@ -57,7 +54,8 @@
 
 
 TASK_JOB = 0
-TASK_END = 1
+TASK_JOB_BATCH = 1
+TASK_END = 2
 
 _TASK_ABORT_WORKER = 10
 _CRITICAL_WORKER_ERROR = 11
@@ -169,22 +167,33 @@
 
         task_type, task_data = task
 
-        # Job task... just do it.
-        if task_type == TASK_JOB:
-            try:
-                res = (task_type, task_data, True, wid, w.process(task_data))
-            except Exception as e:
-                logger.debug(
-                    "Error processing job, sending exception to main process:")
-                logger.debug(traceback.format_exc())
-                we = WorkerExceptionData(wid)
-                res = (task_type, task_data, False, wid, we)
+        # Job task(s)... just do it.
+        if task_type == TASK_JOB or task_type == TASK_JOB_BATCH:
+
+            task_data_list = task_data
+            if task_type == TASK_JOB:
+                task_data_list = [task_data]
+
+            result_list = []
+            for td in task_data_list:
+                try:
+                    res = w.process(td)
+                    result_list.append((td, res, True))
+                except Exception as e:
+                    logger.debug(
+                        "Error processing job, sending exception to main process:")
+                    logger.debug(traceback.format_exc())
+                    we = _get_worker_exception_data(wid)
+                    res = (td, we, False)
+                    result_list.append((td, res, False))
+
+            res = (task_type, wid, result_list)
 
             put_start_time = time.perf_counter()
             put(res)
             time_in_put += (time.perf_counter() - put_start_time)
 
-            completed += 1
+            completed += len(task_data_list)
 
         # End task... gather stats to send back to the main process.
         elif task_type == TASK_END:
@@ -193,13 +202,13 @@
             stats.registerTimer('WorkerResultPut', time=time_in_put)
             try:
                 stats.mergeStats(w.getStats())
-                rep = (task_type, task_data, True, wid, (wid, stats))
+                rep = (task_type, wid, [(task_data, (wid, stats), True)])
             except Exception as e:
                 logger.debug(
                     "Error getting report, sending exception to main process:")
                 logger.debug(traceback.format_exc())
-                we = WorkerExceptionData(wid)
-                rep = (task_type, task_data, False, wid, (wid, we))
+                we = _get_worker_exception_data(wid)
+                rep = (task_type, wid, [(task_data, (wid, we), False)])
 
             put(rep)
             break
@@ -302,8 +311,17 @@
                 self._jobs_left += new_job_count
 
             self._event.clear()
-            for job in jobs:
-                self._quick_put((TASK_JOB, job))
+            bs = self._batch_size
+            if not bs:
+                for job in jobs:
+                    self._quick_put((TASK_JOB, job))
+            else:
+                cur_offset = 0
+                while cur_offset < new_job_count:
+                    next_batch_idx = min(cur_offset + bs, new_job_count)
+                    job_batch = jobs[cur_offset:next_batch_idx]
+                    self._quick_put((TASK_JOB_BATCH, job_batch))
+                    cur_offset = next_batch_idx
         else:
             with self._lock_jobs_left:
                 done = (self._jobs_left == 0)
@@ -388,27 +406,29 @@
             logger.debug("Result handler exiting.")
            return
 
-        task_type, task_data, success, wid, data = res
-        try:
-            if success:
-                if pool._callback:
-                    pool._callback(task_data, data, userdata)
-            else:
-                if task_type == _CRITICAL_WORKER_ERROR:
-                    logger.error(data)
-                    do_continue = pool._onResultHandlerCriticalError(wid)
-                    if not do_continue:
-                        logger.debug("Aborting result handling thread.")
-                        return
+        task_type, wid, res_data_list = res
+        for res_data in res_data_list:
+            try:
+                task_data, data, success = res_data
+                if success:
+                    if pool._callback:
+                        pool._callback(task_data, data, userdata)
                 else:
-                    if pool._error_callback:
-                        pool._error_callback(task_data, data, userdata)
+                    if task_type == _CRITICAL_WORKER_ERROR:
+                        logger.error(data)
+                        do_continue = pool._onResultHandlerCriticalError(wid)
+                        if not do_continue:
+                            logger.debug("Aborting result handling thread.")
+                            return
                     else:
-                        logger.error(
-                            "Worker %d failed to process a job:" % wid)
-                        logger.error(data)
-        except Exception as ex:
-            logger.exception(ex)
+                        if pool._error_callback:
+                            pool._error_callback(task_data, data, userdata)
+                        else:
+                            logger.error(
+                                "Worker %d failed to process a job:" % wid)
+                            logger.error(data)
+            except Exception as ex:
+                logger.exception(ex)
 
         if task_type == TASK_JOB:
             pool._onTaskDone()
@@ -522,6 +542,30 @@
     _pickle = _pickle_marshal
     _unpickle = _unpickle_marshal
 
+elif use_json:
+    import json
+
+    class _BufferWrapper:
+        def __init__(self, buf):
+            self._buf = buf
+
+        def write(self, data):
+            self._buf.write(data.encode('utf8'))
+
+        def read(self):
+            return self._buf.read().decode('utf8')
+
+    def _pickle_json(obj, buf):
+        buf = _BufferWrapper(buf)
+        json.dump(obj, buf, indent=None, separators=(',', ':'))
+
+    def _unpickle_json(buf, bufsize):
+        buf = _BufferWrapper(buf)
+        return json.load(buf)
+
+    _pickle = _pickle_json
+    _unpickle = _unpickle_json
+
 else:
     import pickle
 
@@ -533,4 +577,3 @@
 
     _pickle = _pickle_default
    _unpickle = _unpickle_default
-
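For reference, the queueJobs() change above replaces one TASK_JOB put per job with one TASK_JOB_BATCH put per slice of the job list whenever a batch size is set, trading many small queue messages for fewer, larger ones. A standalone sketch of the same slicing pattern; batch_tasks() and the sample jobs are names made up for illustration, while in the diff the loop runs inline in queueJobs() against self._batch_size:

TASK_JOB = 0
TASK_JOB_BATCH = 1


def batch_tasks(jobs, batch_size):
    # No batch size: one TASK_JOB per job, exactly as before the change.
    if not batch_size:
        for job in jobs:
            yield (TASK_JOB, job)
        return
    # Otherwise: one TASK_JOB_BATCH per slice of at most batch_size jobs.
    cur_offset = 0
    while cur_offset < len(jobs):
        next_batch_idx = min(cur_offset + batch_size, len(jobs))
        yield (TASK_JOB_BATCH, jobs[cur_offset:next_batch_idx])
        cur_offset = next_batch_idx


# Five jobs with a batch size of 2 become three queue puts instead of five.
print(list(batch_tasks(['a', 'b', 'c', 'd', 'e'], 2)))
# -> [(1, ['a', 'b']), (1, ['c', 'd']), (1, ['e'])]

On the worker side, the diff normalizes a single TASK_JOB back into a one-element list, so both task types flow through the same processing loop and come back as a single (task_type, wid, result_list) message.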