piecrust2
changeset 691:9ae9390192da
bake: Use standard pickle and queue for now to fix some small issues.
* JSON leads to some problems with integers as keys.
* Add some stats to the baking process.
author   | Ludovic Chabant <ludovic@chabant.com>
date     | Mon, 21 Mar 2016 22:28:57 -0700
parents  | f7207f4dab82
children | c11a4339fccb
files    | piecrust/baking/baker.py piecrust/baking/records.py piecrust/baking/worker.py piecrust/processing/worker.py piecrust/workerpool.py
diffstat | 5 files changed, 58 insertions(+), 28 deletions(-)
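For context on the first bullet above: JSON objects only allow string keys, so round-tripping a dict keyed by integers (as the bake records do with realm IDs) silently turns the keys into strings, while pickle preserves them. A minimal illustration:

```python
import json
import pickle

counts = {0: 12, 1: 3}  # e.g. realm ID -> page count

# JSON coerces integer keys to strings on the way out...
roundtripped = json.loads(json.dumps(counts))
print(roundtripped)        # {'0': 12, '1': 3}
print(0 in roundtripped)   # False -- lookups by int now miss

# ...while pickle preserves the original key types.
restored = pickle.loads(pickle.dumps(counts))
print(0 in restored)       # True
```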
```diff
--- a/piecrust/baking/baker.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/baking/baker.py	Mon Mar 21 22:28:57 2016 -0700
@@ -191,6 +191,7 @@
         start_time = time.perf_counter()
         try:
             record.current.baked_count[realm] = 0
+            record.current.total_baked_count[realm] = 0
 
             all_factories = []
             for source in srclist:
@@ -203,10 +204,12 @@
             self._bakeRealmPages(record, pool, realm, all_factories)
         finally:
             page_count = record.current.baked_count[realm]
+            total_page_count = record.current.total_baked_count[realm]
             logger.info(format_timed(
                     start_time,
-                    "baked %d %s pages." %
-                    (page_count, REALM_NAMES[realm].lower())))
+                    "baked %d %s pages (%d total)." %
+                    (page_count, REALM_NAMES[realm].lower(),
+                     total_page_count)))
 
     def _loadRealmPages(self, record, pool, factories):
         def _handler(res):
@@ -297,6 +300,7 @@
                     record.current.success = False
                 if entry.subs and entry.was_any_sub_baked:
                     record.current.baked_count[realm] += 1
+                    record.current.total_baked_count[realm] += len(entry.subs)
 
         logger.debug("Baking %d realm pages..." % len(factories))
         with format_timed_scope(logger,
```
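The two counters the baker now reports differ in granularity: `baked_count` counts source pages that had at least one sub-page re-baked, while the new `total_baked_count` counts every sub-page those pages produced (paginated pages expand into several outputs). A toy version of the accounting, with a hypothetical entry shape:

```python
# Hypothetical record entries: each baked page may expand into several
# sub-pages (pagination), and only some pages may need re-baking at all.
entries = [
    {'subs': ['/blog/', '/blog/2/', '/blog/3/'], 'was_any_sub_baked': True},
    {'subs': ['/about/'], 'was_any_sub_baked': False},  # up to date, skipped
]

baked_count = 0        # pages with at least one freshly baked sub-page
total_baked_count = 0  # every sub-page belonging to those pages

for entry in entries:
    if entry['subs'] and entry['was_any_sub_baked']:
        baked_count += 1
        total_baked_count += len(entry['subs'])

print(baked_count, total_baked_count)  # 1 3
```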
```diff
--- a/piecrust/baking/records.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/baking/records.py	Mon Mar 21 22:28:57 2016 -0700
@@ -21,13 +21,14 @@
 
 
 class BakeRecord(Record):
-    RECORD_VERSION = 16
+    RECORD_VERSION = 17
 
     def __init__(self):
         super(BakeRecord, self).__init__()
         self.out_dir = None
         self.bake_time = None
         self.baked_count = {}
+        self.total_baked_count = {}
         self.success = True
```
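Bumping `RECORD_VERSION` is what invalidates bake records saved by older versions, now that the record gained a field and the pickler changed. A sketch of the usual version-gating pattern (the `load_record` helper and `record_version` attribute are illustrative, not PieCrust's actual `Record` API):

```python
import pickle

RECORD_VERSION = 17

def load_record(path):
    """Load a previous bake record, or return None to force a full rebake."""
    try:
        with open(path, 'rb') as fp:
            record = pickle.load(fp)
    except (OSError, pickle.UnpicklingError):
        return None  # missing or unreadable: start fresh
    # A version mismatch means the record's fields or on-disk format
    # changed underneath us, so the old data cannot be trusted.
    if getattr(record, 'record_version', None) != RECORD_VERSION:
        return None
    return record
```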
```diff
--- a/piecrust/baking/worker.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/baking/worker.py	Mon Mar 21 22:28:57 2016 -0700
@@ -71,10 +71,11 @@
             with self.ctx.app.env.timerScope(type(handler).__name__):
                 return handler.handleJob(job['job'])
 
-    def getReport(self):
+    def getReport(self, pool_reports):
         self.ctx.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
                                         self.work_start_time)
         data = self.ctx.app.env.getStats()
+        data.timers.update(pool_reports)
         return {
                 'type': 'stats',
                 'data': data}
```
```diff
--- a/piecrust/processing/worker.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/processing/worker.py	Mon Mar 21 22:28:57 2016 -0700
@@ -135,7 +135,7 @@
 
         return result
 
-    def getReport(self):
+    def getReport(self, pool_reports):
         # Invoke post-processors.
         pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir,
                                        self.ctx.tmp_dir, self.ctx.force)
@@ -145,6 +145,7 @@
         self.app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid,
                                     self.work_start_time)
         data = self.app.env.getStats()
+        data.timers.update(pool_reports)
         return {
                 'type': 'stats',
                 'data': data}
```
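With the new `pool_reports` parameter, both worker types fold the pool's own queue timings (collected in `piecrust/workerpool.py` below) into their stats before reporting, so queue overhead shows up alongside the other timers. A rough sketch, assuming a stats object with a `timers` dict like the one `getStats()` returns:

```python
class WorkerStats:
    """Stand-in for the stats object returned by app.env.getStats()."""
    def __init__(self):
        self.timers = {}

def get_report(stats, pool_reports):
    # Timings measured by the pool loop itself (queue get/put) are
    # merged into the worker's own timer map, so one report covers both.
    stats.timers.update(pool_reports)
    return {'type': 'stats', 'data': stats}

stats = WorkerStats()
stats.timers['BakeWorker_0_Total'] = 12.5
report = get_report(stats, {'WorkerTaskGet': 0.8, 'WorkerResultPut': 0.1})
print(report['data'].timers)
```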
```diff
--- a/piecrust/workerpool.py	Mon Mar 21 19:16:54 2016 -0700
+++ b/piecrust/workerpool.py	Mon Mar 21 22:28:57 2016 -0700
@@ -1,11 +1,13 @@
+import io
 import os
 import sys
+import time
 import zlib
+import pickle
 import logging
 import itertools
 import threading
 import multiprocessing
-from piecrust.fastpickle import pickle, unpickle
 
 
 logger = logging.getLogger(__name__)
@@ -18,7 +20,7 @@
     def process(self, job):
         raise NotImplementedError()
 
-    def getReport(self):
+    def getReport(self, pool_reports):
         return None
 
@@ -72,18 +74,25 @@
     put = params.outqueue.put
 
     completed = 0
+    time_in_get = 0
+    time_in_put = 0
     while True:
+        get_start_time = time.perf_counter()
         try:
             task = get()
         except (EOFError, OSError):
             logger.debug("Worker %d encountered connection problem." % wid)
             break
+        time_in_get += (time.perf_counter() - get_start_time)
 
         task_type, task_data = task
         if task_type == TASK_END:
             logger.debug("Worker %d got end task, exiting." % wid)
+            wprep = {
+                    'WorkerTaskGet': time_in_get,
+                    'WorkerResultPut': time_in_put}
             try:
-                rep = (task_type, True, wid, (wid, w.getReport()))
+                rep = (task_type, True, wid, (wid, w.getReport(wprep)))
             except Exception as e:
                 logger.debug("Error getting report: %s" % e)
                 if params.wrap_exception:
@@ -104,7 +113,10 @@
                 e = multiprocessing.ExceptionWithTraceback(
                         e, e.__traceback__)
             res = (TASK_JOB, False, wid, e)
+
+        put_start_time = time.perf_counter()
         put(res)
+        time_in_put += (time.perf_counter() - put_start_time)
 
         completed += 1
 
@@ -129,7 +141,7 @@
                  wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
-        use_fastqueue = True
+        use_fastqueue = False
         if use_fastqueue:
             self._task_queue = FastQueue()
             self._result_queue = FastQueue()
@@ -324,36 +336,47 @@
 
 
 class FastQueue(object):
-    def __init__(self, compress=False):
+    def __init__(self):
         self._reader, self._writer = multiprocessing.Pipe(duplex=False)
         self._rlock = multiprocessing.Lock()
         self._wlock = multiprocessing.Lock()
-        self._compress = compress
+        self._rbuf = io.BytesIO()
+        self._rbuf.truncate(256)
+        self._wbuf = io.BytesIO()
+        self._wbuf.truncate(256)
 
     def __getstate__(self):
-        return (self._reader, self._writer, self._rlock, self._wlock,
-                self._compress)
+        return (self._reader, self._writer, self._rlock, self._wlock)
 
     def __setstate__(self, state):
-        (self._reader, self._writer, self._rlock, self._wlock,
-            self._compress) = state
+        (self._reader, self._writer, self._rlock, self._wlock) = state
 
     def get(self):
         with self._rlock:
-            raw = self._reader.recv_bytes()
-            if self._compress:
-                data = zlib.decompress(raw)
-            else:
-                data = raw
-            obj = unpickle(data)
-            return obj
+            try:
+                with self._rbuf.getbuffer() as b:
+                    self._reader.recv_bytes_into(b)
+            except multiprocessing.BufferTooShort as e:
+                self._rbuf.truncate(len(e.args[0]) * 2)
+                self._rbuf.seek(0)
+                self._rbuf.write(e.args[0])
+
+        self._rbuf.seek(0)
+        return self._unpickle(self._rbuf)
 
     def put(self, obj):
-        data = pickle(obj)
-        if self._compress:
-            raw = zlib.compress(data)
-        else:
-            raw = data
+        self._wbuf.seek(0)
+        self._pickle(obj, self._wbuf)
+        size = self._wbuf.tell()
+
+        self._wbuf.seek(0)
         with self._wlock:
-            self._writer.send_bytes(raw)
+            with self._wbuf.getbuffer() as b:
+                self._writer.send_bytes(b, 0, size)
+
+    def _pickle(self, obj, buf):
+        pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)
+
+    def _unpickle(self, buf):
+        return pickle.load(buf)
+
```
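The `pool_reports` values come from bracketing the queue operations with `time.perf_counter()` and accumulating per-worker totals that are handed to `getReport()` when the end task arrives. A stripped-down, single-process sketch of that instrumentation pattern (the `None` sentinel and `queue.Queue` are simplifications of the pool's task protocol):

```python
import time
import queue

def worker_loop(inq, outq, process):
    """Drain inq, processing items and timing queue overhead separately."""
    time_in_get = 0.0
    time_in_put = 0.0
    while True:
        t0 = time.perf_counter()
        task = inq.get()
        time_in_get += time.perf_counter() - t0

        if task is None:  # end-of-work sentinel
            # Report how long this worker spent blocked on the queues.
            return {'WorkerTaskGet': time_in_get,
                    'WorkerResultPut': time_in_put}

        res = process(task)

        t0 = time.perf_counter()
        outq.put(res)
        time_in_put += time.perf_counter() - t0

inq, outq = queue.Queue(), queue.Queue()
for i in range(3):
    inq.put(i)
inq.put(None)
print(worker_loop(inq, outq, lambda x: x * x))
```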
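The reworked `FastQueue` also shows a reusable-buffer receive pattern worth noting: it pickles into a persistent `BytesIO` and reads with `Connection.recv_bytes_into()`, growing the buffer when `multiprocessing.BufferTooShort` arrives with the oversized payload attached to the exception. A self-contained sketch of the same idea over a bare `Pipe` (the closure-based `put`/`get` are illustrative, not PieCrust's class):

```python
import io
import pickle
import multiprocessing

def make_fast_pipe():
    """One-way pipe that reuses a growable BytesIO as the receive buffer."""
    reader, writer = multiprocessing.Pipe(duplex=False)
    rbuf = io.BytesIO()
    rbuf.truncate(256)  # start with a small fixed-size buffer

    def put(obj):
        writer.send_bytes(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))

    def get():
        try:
            # Receive directly into the reusable buffer -- no new bytes
            # object is allocated when the message fits.
            with rbuf.getbuffer() as b:
                reader.recv_bytes_into(b)
        except multiprocessing.BufferTooShort as e:
            # The complete message rides along on the exception; grow
            # the buffer so the next message of this size fits too.
            rbuf.truncate(len(e.args[0]) * 2)
            rbuf.seek(0)
            rbuf.write(e.args[0])
        rbuf.seek(0)
        return pickle.load(rbuf)

    return put, get

put, get = make_fast_pipe()
put({'realm': 0, 'pages': list(range(100))})
print(get()['realm'])  # 0
```

Using `getbuffer()` inside a `with` block matters here: the memoryview must be released before `truncate()` can resize the underlying `BytesIO`.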