changeset 460:55fc8918cb75

bake: Use batched jobs in the worker pool.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 11 Jul 2015 00:45:35 -0700
parents 2ef04e16f0b9
children b015e38d4ee1
files piecrust/workerpool.py
diffstat 1 file changed, 37 insertions(+), 15 deletions(-)
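The change, in brief: instead of pushing one pickled job per queue message, queueJobs() now packs jobs into tuples and sends each tuple as a single TASK_BATCH message, cutting per-message queue overhead. The default batch size is derived from the job count (max(1, job_count // 50)), so e.g. 1,000 jobs travel as 50 batches of 20. A minimal, standalone sketch of the chunking pattern, with a hypothetical chunk_jobs() helper standing in for the loop the patch inlines in queueJobs():

    import itertools

    def chunk_jobs(jobs, chunk_size):
        # Yield tuples of at most chunk_size jobs, mirroring the
        # islice loop queueJobs() uses to build TASK_BATCH messages.
        it = iter(jobs)
        while True:
            batch = tuple(itertools.islice(it, chunk_size))
            if not batch:
                break
            yield batch

    # Default sizing: 1000 jobs -> max(1, 1000 // 50) == 20 per batch.
    assert [len(b) for b in chunk_jobs(range(1000), 20)] == [20] * 50
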
--- a/piecrust/workerpool.py	Sat Jul 11 00:44:58 2015 -0700
+++ b/piecrust/workerpool.py	Sat Jul 11 00:45:35 2015 -0700
@@ -1,6 +1,7 @@
 import os
 import sys
 import logging
+import itertools
 import threading
 import multiprocessing
 from piecrust.fastpickle import pickle, unpickle
@@ -21,7 +22,8 @@
 
 
 TASK_JOB = 0
-TASK_END = 1
+TASK_BATCH = 1
+TASK_END = 2
 
 
 def worker_func(params):
@@ -82,16 +84,21 @@
             put(rep)
             break
 
-        task_data = unpickle(task_data)
-        try:
-            res = (task_type, True, wid, w.process(task_data))
-        except Exception as e:
-            if params.wrap_exception:
-                e = multiprocessing.ExceptionWithTraceback(e, e.__traceback__)
-            res = (task_type, False, wid, e)
-        put(res)
+        if task_type == TASK_JOB:
+            task_data = (task_data,)
 
-        completed += 1
+        for t in task_data:
+            td = unpickle(t)
+            try:
+                res = (TASK_JOB, True, wid, w.process(td))
+            except Exception as e:
+                if params.wrap_exception:
+                    e = multiprocessing.ExceptionWithTraceback(
+                            e, e.__traceback__)
+                res = (TASK_JOB, False, wid, e)
+            put(res)
+
+            completed += 1
 
     logger.debug("Worker %d completed %d tasks." % (wid, completed))
 
@@ -152,7 +159,7 @@
         self._callback = callback
         self._error_callback = error_callback
 
-    def queueJobs(self, jobs, handler=None):
+    def queueJobs(self, jobs, handler=None, chunk_size=None):
         if self._closed:
             raise Exception("This worker pool has been closed.")
         if self._listener is not None:
@@ -166,16 +173,31 @@
 
         if not hasattr(jobs, '__len__'):
             jobs = list(jobs)
+        job_count = len(jobs)
 
-        res = AsyncResult(self, len(jobs))
+        res = AsyncResult(self, job_count)
         if res._count == 0:
             res._event.set()
             return res
 
         self._listener = res
-        for job in jobs:
-            job_data = pickle(job)
-            self._quick_put((TASK_JOB, job_data))
+
+        if chunk_size is None:
+            chunk_size = max(1, job_count // 50)
+            logger.debug("Using chunk size of %d" % chunk_size)
+
+        if chunk_size == 1:
+            for job in jobs:
+                job_data = pickle(job)
+                self._quick_put((TASK_JOB, job_data))
+        else:
+            it = iter(jobs)
+            while True:
+                batch = tuple([pickle(i)
+                               for i in itertools.islice(it, chunk_size)])
+                if not batch:
+                    break
+                self._quick_put((TASK_BATCH, batch))
 
         return res
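
On the worker side, a plain TASK_JOB payload is wrapped in a 1-tuple so that single jobs and batches share one processing loop, with one result reported per job either way. A toy model of that dispatch (process_message() and process_one are hypothetical names; the real loop also unpickles each item, wraps exceptions, and puts each result on the queue):

    TASK_JOB = 0
    TASK_BATCH = 1

    def process_message(task_type, task_data, process_one):
        # Normalize a single job to a batch of one, then process every
        # item the same way, collecting one result per job.
        if task_type == TASK_JOB:
            task_data = (task_data,)
        return [process_one(item) for item in task_data]

    assert process_message(TASK_JOB, 3, lambda x: x * 2) == [6]
    assert process_message(TASK_BATCH, (1, 2), lambda x: x * 2) == [2, 4]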
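
For callers the change is backwards compatible: the new chunk_size parameter defaults to None, which triggers the job-count-based sizing, while chunk_size=1 keeps the old one-message-per-job behavior. A usage sketch, assuming an already-constructed pool and job list (neither is shown in this diff):

    res = pool.queueJobs(jobs)  # batched: roughly 50 queue messages total
    # ...or force the pre-change behavior:
    # res = pool.queueJobs(jobs, chunk_size=1)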