changeset 691:9ae9390192da

bake: Use standard pickle and queue for now to fix some small issues.

* JSON leads to some problems with integers as keys.
* Add some stats to the baking process.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 21 Mar 2016 22:28:57 -0700
parents f7207f4dab82
children c11a4339fccb
files piecrust/baking/baker.py piecrust/baking/records.py piecrust/baking/worker.py piecrust/processing/worker.py piecrust/workerpool.py
diffstat 5 files changed, 58 insertions(+), 28 deletions(-)
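For context, the integer-key issue mentioned in the description is inherent to JSON: object keys are always strings, so a round trip through a JSON-based serializer silently turns dict integer keys (such as the realm indices used by baked_count below) into string keys. A minimal illustration using plain json and the standard pickle this changeset switches to (hypothetical values):

    import json
    import pickle

    record = {1: 'posts', 2: 'pages'}  # integer realm indices as dict keys

    # JSON object keys must be strings, so integer keys do not survive
    # a round trip through a JSON-based serializer.
    assert json.loads(json.dumps(record)) == {'1': 'posts', '2': 'pages'}

    # The standard pickle module preserves key types exactly.
    assert pickle.loads(pickle.dumps(record)) == record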
--- a/piecrust/baking/baker.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/baking/baker.py	Mon Mar 21 22:28:57 2016 -0700
@@ -191,6 +191,7 @@
         start_time = time.perf_counter()
         try:
             record.current.baked_count[realm] = 0
+            record.current.total_baked_count[realm] = 0
 
             all_factories = []
             for source in srclist:
@@ -203,10 +204,12 @@
             self._bakeRealmPages(record, pool, realm, all_factories)
         finally:
             page_count = record.current.baked_count[realm]
+            total_page_count = record.current.total_baked_count[realm]
             logger.info(format_timed(
                     start_time,
-                    "baked %d %s pages." %
-                    (page_count, REALM_NAMES[realm].lower())))
+                    "baked %d %s pages (%d total)." %
+                    (page_count, REALM_NAMES[realm].lower(),
+                        total_page_count)))
 
     def _loadRealmPages(self, record, pool, factories):
         def _handler(res):
@@ -297,6 +300,7 @@
                 record.current.success = False
             if entry.subs and entry.was_any_sub_baked:
                 record.current.baked_count[realm] += 1
+                record.current.total_baked_count[realm] += len(entry.subs)
 
         logger.debug("Baking %d realm pages..." % len(factories))
         with format_timed_scope(logger,
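The baker.py hunks above add a second counter: baked_count tracks source pages that had at least one sub-page baked, while the new total_baked_count also counts every baked sub-page (pagination pages and the like), which is what the reworked log message reports. A minimal sketch of that accounting, with hypothetical entries standing in for the bake record's page entries:

    # Hypothetical bake-record entries; each source page can bake into
    # several sub-pages (e.g. pagination).
    entries = [
        {'subs': ['/', '/2/', '/3/'], 'was_any_sub_baked': True},   # paginated index
        {'subs': ['/about/'], 'was_any_sub_baked': True},           # simple page
        {'subs': ['/draft/'], 'was_any_sub_baked': False},          # up to date, skipped
    ]

    baked_count = 0
    total_baked_count = 0
    for entry in entries:
        if entry['subs'] and entry['was_any_sub_baked']:
            baked_count += 1
            total_baked_count += len(entry['subs'])

    print("baked %d pages (%d total)." % (baked_count, total_baked_count))
    # -> baked 2 pages (4 total).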
--- a/piecrust/baking/records.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/baking/records.py	Mon Mar 21 22:28:57 2016 -0700
@@ -21,13 +21,14 @@
 
 
 class BakeRecord(Record):
-    RECORD_VERSION = 16
+    RECORD_VERSION = 17
 
     def __init__(self):
         super(BakeRecord, self).__init__()
         self.out_dir = None
         self.bake_time = None
         self.baked_count = {}
+        self.total_baked_count = {}
         self.success = True
 
 
--- a/piecrust/baking/worker.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/baking/worker.py	Mon Mar 21 22:28:57 2016 -0700
@@ -71,10 +71,11 @@
         with self.ctx.app.env.timerScope(type(handler).__name__):
             return handler.handleJob(job['job'])
 
-    def getReport(self):
+    def getReport(self, pool_reports):
         self.ctx.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
                                         self.work_start_time)
         data = self.ctx.app.env.getStats()
+        data.timers.update(pool_reports)
         return {
                 'type': 'stats',
                 'data': data}
--- a/piecrust/processing/worker.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/processing/worker.py	Mon Mar 21 22:28:57 2016 -0700
@@ -135,7 +135,7 @@
 
         return result
 
-    def getReport(self):
+    def getReport(self, pool_reports):
         # Invoke post-processors.
         pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir,
                                        self.ctx.tmp_dir, self.ctx.force)
@@ -145,6 +145,7 @@
         self.app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid,
                                     self.work_start_time)
         data = self.app.env.getStats()
+        data.timers.update(pool_reports)
         return {
                 'type': 'stats',
                 'data': data}
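Both getReport() implementations gain a pool_reports argument and fold it into the environment's timers, so the time each worker spends blocked on the task and result queues (measured in _real_worker_func below) lands in the same stats report as the other timers. A minimal sketch of that merge, with a hypothetical stand-in for the object returned by getStats():

    class Stats:
        # Hypothetical stand-in for app.env.getStats(); only the
        # .timers dict matters for this change.
        def __init__(self):
            self.timers = {'BakeWorker_0_Total': 12.5}

    # _real_worker_func accumulates these around its get()/put() calls...
    time_in_get, time_in_put = 1.8, 0.4

    # ...and hands them to getReport() when the end task arrives:
    pool_reports = {'WorkerTaskGet': time_in_get,
                    'WorkerResultPut': time_in_put}

    data = Stats()
    data.timers.update(pool_reports)   # the new line in both workers
    print(sorted(data.timers))
    # -> ['BakeWorker_0_Total', 'WorkerResultPut', 'WorkerTaskGet']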
--- a/piecrust/workerpool.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/workerpool.py	Mon Mar 21 22:28:57 2016 -0700
@@ -1,11 +1,13 @@
+import io
 import os
 import sys
+import time
 import zlib
+import pickle
 import logging
 import itertools
 import threading
 import multiprocessing
-from piecrust.fastpickle import pickle, unpickle
 
 
 logger = logging.getLogger(__name__)
@@ -18,7 +20,7 @@
     def process(self, job):
         raise NotImplementedError()
 
-    def getReport(self):
+    def getReport(self, pool_reports):
         return None
 
 
@@ -72,18 +74,25 @@
     put = params.outqueue.put
 
     completed = 0
+    time_in_get = 0
+    time_in_put = 0
     while True:
+        get_start_time = time.perf_counter()
         try:
             task = get()
         except (EOFError, OSError):
             logger.debug("Worker %d encountered connection problem." % wid)
             break
+        time_in_get += (time.perf_counter() - get_start_time)
 
         task_type, task_data = task
         if task_type == TASK_END:
             logger.debug("Worker %d got end task, exiting." % wid)
+            wprep = {
+                    'WorkerTaskGet': time_in_get,
+                    'WorkerResultPut': time_in_put}
             try:
-                rep = (task_type, True, wid, (wid, w.getReport()))
+                rep = (task_type, True, wid, (wid, w.getReport(wprep)))
             except Exception as e:
                 logger.debug("Error getting report: %s" % e)
                 if params.wrap_exception:
@@ -104,7 +113,10 @@
                     e = multiprocessing.ExceptionWithTraceback(
                             e, e.__traceback__)
                 res = (TASK_JOB, False, wid, e)
+
+            put_start_time = time.perf_counter()
             put(res)
+            time_in_put += (time.perf_counter() - put_start_time)
 
             completed += 1
 
@@ -129,7 +141,7 @@
                  wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
-        use_fastqueue = True
+        use_fastqueue = False
         if use_fastqueue:
             self._task_queue = FastQueue()
             self._result_queue = FastQueue()
@@ -324,36 +336,47 @@
 
 
 class FastQueue(object):
-    def __init__(self, compress=False):
+    def __init__(self):
         self._reader, self._writer = multiprocessing.Pipe(duplex=False)
         self._rlock = multiprocessing.Lock()
         self._wlock = multiprocessing.Lock()
-        self._compress = compress
+        self._rbuf = io.BytesIO()
+        self._rbuf.truncate(256)
+        self._wbuf = io.BytesIO()
+        self._wbuf.truncate(256)
 
     def __getstate__(self):
-        return (self._reader, self._writer, self._rlock, self._wlock,
-                self._compress)
+        return (self._reader, self._writer, self._rlock, self._wlock)
 
     def __setstate__(self, state):
-        (self._reader, self._writer, self._rlock, self._wlock,
-            self._compress) = state
+        (self._reader, self._writer, self._rlock, self._wlock) = state
 
     def get(self):
         with self._rlock:
-            raw = self._reader.recv_bytes()
-        if self._compress:
-            data = zlib.decompress(raw)
-        else:
-            data = raw
-        obj = unpickle(data)
-        return obj
+            try:
+                with self._rbuf.getbuffer() as b:
+                    self._reader.recv_bytes_into(b)
+            except multiprocessing.BufferTooShort as e:
+                self._rbuf.truncate(len(e.args[0]) * 2)
+                self._rbuf.seek(0)
+                self._rbuf.write(e.args[0])
+
+        self._rbuf.seek(0)
+        return self._unpickle(self._rbuf)
 
     def put(self, obj):
-        data = pickle(obj)
-        if self._compress:
-            raw = zlib.compress(data)
-        else:
-            raw = data
+        self._wbuf.seek(0)
+        self._pickle(obj, self._wbuf)
+        size = self._wbuf.tell()
+
+        self._wbuf.seek(0)
         with self._wlock:
-            self._writer.send_bytes(raw)
+            with self._wbuf.getbuffer() as b:
+                self._writer.send_bytes(b, 0, size)
 
+    def _pickle(self, obj, buf):
+        pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)
+
+    def _unpickle(self, buf):
+        return pickle.load(buf)
+
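Even though use_fastqueue is switched off for now, the rewritten FastQueue keeps a reusable read buffer per process: Connection.recv_bytes_into() fills a pre-allocated buffer and raises multiprocessing.BufferTooShort when the message does not fit, with the complete payload still available as e.args[0]. A standalone sketch of that pattern (same-process pipe and a small payload, for brevity):

    import io
    import multiprocessing

    reader, writer = multiprocessing.Pipe(duplex=False)
    buf = io.BytesIO(bytes(16))        # pre-sized read buffer, deliberately small

    writer.send_bytes(b'x' * 64)       # 64 bytes will not fit in 16

    try:
        # The memoryview must be released (hence the 'with') before the
        # BytesIO can be resized again.
        with buf.getbuffer() as b:
            size = reader.recv_bytes_into(b)
    except multiprocessing.BufferTooShort as e:
        payload = e.args[0]            # the whole message was still received
        buf.seek(0)
        buf.write(payload)             # grow the buffer and keep it for reuse
        size = len(payload)

    buf.seek(0)
    print(buf.read(size))              # b'xxx...' (64 bytes)

One subtlety worth noting: io.BytesIO.truncate() only ever shrinks the underlying buffer, so in the diff above it is the write() call in the BufferTooShort handler that actually grows _rbuf; the truncate(256) calls in __init__ do not pre-allocate anything. Pre-sizing with bytes(n), as in this sketch, sidesteps that.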