piecrust2
changeset 460:55fc8918cb75
bake: Use batched jobs in the worker pool.
author       Ludovic Chabant <ludovic@chabant.com>
date         Sat, 11 Jul 2015 00:45:35 -0700
parents      2ef04e16f0b9
children     b015e38d4ee1
files        piecrust/workerpool.py
diffstat     1 files changed, 37 insertions(+), 15 deletions(-)
--- a/piecrust/workerpool.py	Sat Jul 11 00:44:58 2015 -0700
+++ b/piecrust/workerpool.py	Sat Jul 11 00:45:35 2015 -0700
@@ -1,6 +1,7 @@
 import os
 import sys
 import logging
+import itertools
 import threading
 import multiprocessing
 from piecrust.fastpickle import pickle, unpickle
@@ -21,7 +22,8 @@
 
 
 TASK_JOB = 0
-TASK_END = 1
+TASK_BATCH = 1
+TASK_END = 2
 
 
 def worker_func(params):
@@ -82,16 +84,21 @@
             put(rep)
             break
 
-        task_data = unpickle(task_data)
-        try:
-            res = (task_type, True, wid, w.process(task_data))
-        except Exception as e:
-            if params.wrap_exception:
-                e = multiprocessing.ExceptionWithTraceback(e, e.__traceback__)
-            res = (task_type, False, wid, e)
-        put(res)
+        if task_type == TASK_JOB:
+            task_data = (task_data,)
 
-        completed += 1
+        for t in task_data:
+            td = unpickle(t)
+            try:
+                res = (TASK_JOB, True, wid, w.process(td))
+            except Exception as e:
+                if params.wrap_exception:
+                    e = multiprocessing.ExceptionWithTraceback(
+                            e, e.__traceback__)
+                res = (TASK_JOB, False, wid, e)
+            put(res)
+
+            completed += 1
 
     logger.debug("Worker %d completed %d tasks." % (wid, completed))
 
@@ -152,7 +159,7 @@
         self._callback = callback
         self._error_callback = error_callback
 
-    def queueJobs(self, jobs, handler=None):
+    def queueJobs(self, jobs, handler=None, chunk_size=None):
         if self._closed:
             raise Exception("This worker pool has been closed.")
         if self._listener is not None:
@@ -166,16 +173,31 @@
 
         if not hasattr(jobs, '__len__'):
            jobs = list(jobs)
+        job_count = len(jobs)
 
-        res = AsyncResult(self, len(jobs))
+        res = AsyncResult(self, job_count)
         if res._count == 0:
             res._event.set()
             return res
 
         self._listener = res
-        for job in jobs:
-            job_data = pickle(job)
-            self._quick_put((TASK_JOB, job_data))
+
+        if chunk_size is None:
+            chunk_size = max(1, job_count // 50)
+            logger.debug("Using chunk size of %d" % chunk_size)
+
+        if chunk_size is None or chunk_size == 1:
+            for job in jobs:
+                job_data = pickle(job)
+                self._quick_put((TASK_JOB, job_data))
+        else:
+            it = iter(jobs)
+            while True:
+                batch = tuple([pickle(i)
+                               for i in itertools.islice(it, chunk_size)])
+                if not batch:
+                    break
+                self._quick_put((TASK_BATCH, batch))
 
         return res
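
The change has two halves: the producer (queueJobs) slices the job list into chunks with itertools.islice and sends each chunk as a single TASK_BATCH message, while the worker wraps a plain TASK_JOB payload in a one-element tuple so both task types go through the same per-job loop. The following is a minimal standalone sketch of that pattern, not piecrust code: the names iter_batches, enqueue, handle, process, and put are illustrative stand-ins, and pickling is omitted for brevity.

import itertools

# Sketch of the batching pattern used by the changeset above.
# (Names are hypothetical; this is not the piecrust API.)

TASK_JOB = 0
TASK_BATCH = 1


def iter_batches(jobs, chunk_size):
    # Slice any iterable into tuples of at most `chunk_size` items.
    it = iter(jobs)
    while True:
        batch = tuple(itertools.islice(it, chunk_size))
        if not batch:
            break
        yield batch


def enqueue(jobs, put, chunk_size=1):
    # Producer side: send jobs one by one, or as batches when
    # chunk_size is greater than one.
    if chunk_size <= 1:
        for job in jobs:
            put((TASK_JOB, job))
    else:
        for batch in iter_batches(jobs, chunk_size):
            put((TASK_BATCH, batch))


def handle(task, process, put):
    # Worker side: normalize a single job into a one-element tuple
    # so single jobs and batches share the same processing loop.
    task_type, task_data = task
    if task_type == TASK_JOB:
        task_data = (task_data,)
    for job in task_data:
        put(process(job))


if __name__ == '__main__':
    queue = []
    enqueue(range(7), queue.append, chunk_size=3)
    # queue is now:
    # [(TASK_BATCH, (0, 1, 2)), (TASK_BATCH, (3, 4, 5)), (TASK_BATCH, (6,))]
    results = []
    for task in queue:
        handle(task, lambda j: j * j, results.append)
    print(results)  # [0, 1, 4, 9, 16, 25, 36]

Batching trades nothing in per-job work for far fewer queue puts and gets, which is why queueJobs defaults the chunk size to roughly one fiftieth of the job count (max(1, job_count // 50)) when the caller does not pass chunk_size explicitly.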