diff piecrust/workerpool.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item from an item spec (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents 45ad976712ec
children 1857dbd4580f
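
For orientation, a minimal sketch (not part of the changeset) of the two result-message shapes involved: the 5-tuple is what a worker used to send back for each job, the 3-tuple is the batched shape introduced in the hunks below, built only from plain tuples, lists, dicts and strings. The job payloads shown are invented.

    # Illustrative shapes only; the job payloads are invented.
    TASK_JOB = 0
    TASK_JOB_BATCH = 1

    # Old: one message per job.
    #   (task_type, task_data, success, wid, data)
    old_reply = (TASK_JOB, {'path': 'pages/foo.md'}, True, 3, {'ok': True})

    # New: one message per batch of jobs, with the worker id hoisted out of
    # each entry.
    #   (task_type, wid, [(task_data, data, success), ...])
    new_reply = (TASK_JOB_BATCH, 3, [
        ({'path': 'pages/foo.md'}, {'ok': True}, True),
        ({'path': 'pages/bar.md'},
         {'wid': 3, 'type': "<class 'OSError'>", 'value': 'boom',
          'traceback': '...'},
         False),
    ])
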
--- a/piecrust/workerpool.py	Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/workerpool.py	Sun Nov 19 14:29:17 2017 -0800
@@ -11,11 +11,11 @@
 
 logger = logging.getLogger(__name__)
 
-use_fastqueue = False
-
+use_fastqueue = True
 use_fastpickle = False
 use_msgpack = False
 use_marshall = False
+use_json = False
 
 
 class IWorker(object):
@@ -34,17 +34,14 @@
         pass
 
 
-class WorkerExceptionData:
-    def __init__(self, wid):
-        super().__init__()
-        self.wid = wid
-        t, v, tb = sys.exc_info()
-        self.type = t
-        self.value = '\n'.join(_get_errors(v))
-        self.traceback = ''.join(traceback.format_exception(t, v, tb))
-
-    def __str__(self):
-        return str(self.value)
+def _get_worker_exception_data(wid):
+    t, v, tb = sys.exc_info()
+    return {
+        'wid': wid,
+        'type': str(t),
+        'value': '\n'.join(_get_errors(v)),
+        'traceback': ''.join(traceback.format_exception(t, v, tb))
+    }
 
 
 def _get_errors(ex):
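
A small usage sketch (assuming piecrust is importable; the wid value is made up) of the helper that replaces WorkerExceptionData above. Everything in the returned dict is a plain value, which is what lets the error payload travel through json or msgpack as easily as through pickle:

    # Must be called while an exception is being handled, since it reads
    # sys.exc_info() internally.
    from piecrust.workerpool import _get_worker_exception_data

    try:
        raise ValueError("boom")
    except ValueError:
        exc_data = _get_worker_exception_data(wid=3)

    # exc_data is a plain dict, roughly:
    # {'wid': 3, 'type': "<class 'ValueError'>", 'value': 'boom',
    #  'traceback': 'Traceback (most recent call last): ...'}
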
@@ -57,7 +54,8 @@
 
 
 TASK_JOB = 0
-TASK_END = 1
+TASK_JOB_BATCH = 1
+TASK_END = 2
 _TASK_ABORT_WORKER = 10
 _CRITICAL_WORKER_ERROR = 11
 
@@ -169,22 +167,33 @@
 
         task_type, task_data = task
 
-        # Job task... just do it.
-        if task_type == TASK_JOB:
-            try:
-                res = (task_type, task_data, True, wid, w.process(task_data))
-            except Exception as e:
-                logger.debug(
-                    "Error processing job, sending exception to main process:")
-                logger.debug(traceback.format_exc())
-                we = WorkerExceptionData(wid)
-                res = (task_type, task_data, False, wid, we)
+        # Job task(s)... just do it.
+        if task_type == TASK_JOB or task_type == TASK_JOB_BATCH:
+
+            task_data_list = task_data
+            if task_type == TASK_JOB:
+                task_data_list = [task_data]
+
+            result_list = []
 
+            for td in task_data_list:
+                try:
+                    res = w.process(td)
+                    result_list.append((td, res, True))
+                except Exception as e:
+                    logger.debug(
+                        "Error processing job, sending exception to main process:")
+                    logger.debug(traceback.format_exc())
+                    we = _get_worker_exception_data(wid)
+                    res = (td, we, False)
+                    result_list.append(res)
+
+            res = (task_type, wid, result_list)
             put_start_time = time.perf_counter()
             put(res)
             time_in_put += (time.perf_counter() - put_start_time)
 
-            completed += 1
+            completed += len(task_data_list)
 
         # End task... gather stats to send back to the main process.
         elif task_type == TASK_END:
@@ -193,13 +202,13 @@
             stats.registerTimer('WorkerResultPut', time=time_in_put)
             try:
                 stats.mergeStats(w.getStats())
-                rep = (task_type, task_data, True, wid, (wid, stats))
+                rep = (task_type, wid, [(task_data, (wid, stats), True)])
             except Exception as e:
                 logger.debug(
                     "Error getting report, sending exception to main process:")
                 logger.debug(traceback.format_exc())
-                we = WorkerExceptionData(wid)
-                rep = (task_type, task_data, False, wid, (wid, we))
+                we = _get_worker_exception_data(wid)
+                rep = (task_type, wid, [(task_data, (wid, we), False)])
             put(rep)
             break
 
@@ -302,8 +311,17 @@
                 self._jobs_left += new_job_count
 
             self._event.clear()
-            for job in jobs:
-                self._quick_put((TASK_JOB, job))
+            bs = self._batch_size
+            if not bs:
+                for job in jobs:
+                    self._quick_put((TASK_JOB, job))
+            else:
+                cur_offset = 0
+                while cur_offset < new_job_count:
+                    next_batch_idx = min(cur_offset + bs, new_job_count)
+                    job_batch = jobs[cur_offset:next_batch_idx]
+                    self._quick_put((TASK_JOB_BATCH, job_batch))
+                    cur_offset = next_batch_idx
         else:
             with self._lock_jobs_left:
                 done = (self._jobs_left == 0)
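
The batching above slices the incoming job list into chunks of at most self._batch_size jobs and puts one TASK_JOB_BATCH message per chunk. A standalone sketch of the same slicing, with a made-up put function and sizes:

    # Illustrative only; queue_put, the job list and the batch size are
    # stand-ins for self._quick_put, the real jobs and self._batch_size.
    TASK_JOB_BATCH = 1

    def queue_put(task):
        print(task)

    jobs = list(range(10))
    new_job_count = len(jobs)
    bs = 4

    cur_offset = 0
    while cur_offset < new_job_count:
        next_batch_idx = min(cur_offset + bs, new_job_count)
        queue_put((TASK_JOB_BATCH, jobs[cur_offset:next_batch_idx]))
        cur_offset = next_batch_idx
    # Puts three messages on the queue: jobs 0-3, 4-7 and 8-9.
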
@@ -388,27 +406,29 @@
                 logger.debug("Result handler exiting.")
                 return
 
-            task_type, task_data, success, wid, data = res
-            try:
-                if success:
-                    if pool._callback:
-                        pool._callback(task_data, data, userdata)
-                else:
-                    if task_type == _CRITICAL_WORKER_ERROR:
-                        logger.error(data)
-                        do_continue = pool._onResultHandlerCriticalError(wid)
-                        if not do_continue:
-                            logger.debug("Aborting result handling thread.")
-                            return
+            task_type, wid, res_data_list = res
+            for res_data in res_data_list:
+                try:
+                    task_data, data, success = res_data
+                    if success:
+                        if pool._callback:
+                            pool._callback(task_data, data, userdata)
                     else:
-                        if pool._error_callback:
-                            pool._error_callback(task_data, data, userdata)
+                        if task_type == _CRITICAL_WORKER_ERROR:
+                            logger.error(data)
+                            do_continue = pool._onResultHandlerCriticalError(wid)
+                            if not do_continue:
+                                logger.debug("Aborting result handling thread.")
+                                return
                         else:
-                            logger.error(
-                                "Worker %d failed to process a job:" % wid)
-                            logger.error(data)
-            except Exception as ex:
-                logger.exception(ex)
+                            if pool._error_callback:
+                                pool._error_callback(task_data, data, userdata)
+                            else:
+                                logger.error(
+                                    "Worker %d failed to process a job:" % wid)
+                                logger.error(data)
+                except Exception as ex:
+                    logger.exception(ex)
 
             if task_type == TASK_JOB:
                 pool._onTaskDone()
@@ -522,6 +542,30 @@
     _pickle = _pickle_marshal
     _unpickle = _unpickle_marshal
 
+elif use_json:
+    import json
+
+    class _BufferWrapper:
+        def __init__(self, buf):
+            self._buf = buf
+
+        def write(self, data):
+            self._buf.write(data.encode('utf8'))
+
+        def read(self):
+            return self._buf.read().decode('utf8')
+
+    def _pickle_json(obj, buf):
+        buf = _BufferWrapper(buf)
+        json.dump(obj, buf, indent=None, separators=(',', ':'))
+
+    def _unpickle_json(buf, bufsize):
+        buf = _BufferWrapper(buf)
+        return json.load(buf)
+
+    _pickle = _pickle_json
+    _unpickle = _unpickle_json
+
 else:
     import pickle
 
@@ -533,4 +577,3 @@
 
     _pickle = _pickle_default
     _unpickle = _unpickle_default
-
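
With the new use_json flag turned on, queue payloads go through the _pickle_json/_unpickle_json pair added above. A roundtrip sketch of that JSON path over an in-memory buffer; io.BytesIO and the payload are stand-ins used purely for illustration:

    import io
    import json

    # A batched reply like the ones built in the worker loop; contents invented.
    payload = (1, 3, [({'path': 'pages/foo.md'}, {'ok': True}, True)])

    buf = io.BytesIO()
    # Same dump settings as _pickle_json, writing utf8 bytes like _BufferWrapper.
    buf.write(json.dumps(payload, indent=None,
                         separators=(',', ':')).encode('utf8'))
    buf.seek(0)
    restored = json.loads(buf.read().decode('utf8'))

    # JSON has no tuple type, so the reply comes back as nested lists:
    # [1, 3, [[{'path': 'pages/foo.md'}, {'ok': True}, True]]]
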