changeset 451:838f3964f400

bake: Optimize the bake by no longer using custom classes to pass job data between processes. See the previous changeset about pickling performance between processes. Jobs and results are now plain standard structures, serialized with the new `fastpickle` when needed.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 06 Jul 2015 21:30:49 -0700
parents 298f8f46432a
children 55026b7bb1bf
files piecrust/baking/baker.py piecrust/baking/worker.py piecrust/workerpool.py
diffstat 3 files changed, 89 insertions(+), 111 deletions(-)
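
(For readers skimming the diff below, here is a minimal, illustrative sketch of the pattern this changeset adopts. It is not part of the commit; names such as `fac`, `tax_info`, `route_metadata`, `prev_entry` and `record` stand in for values the baker already has in hand. Page factories are flattened into picklable dicts with `save_factory()` and rebuilt in the worker with `load_factory()`, and every job is a plain dict with a `'type'` and a `'job'` payload.)

    # Sketch only: the dict-based job format introduced by this changeset.
    def save_factory(fac):
        # Flatten a PageFactory into plain, picklable values.
        return {
                'source_name': fac.source.name,
                'rel_path': fac.rel_path,
                'metadata': fac.metadata}

    def load_factory(app, info):
        # Rebuild the PageFactory inside the worker process
        # (PageFactory and JOB_BAKE come from the existing piecrust code).
        source = app.getSource(info['source_name'])
        return PageFactory(source, info['rel_path'], info['metadata'])

    # A JOB_BAKE job, as queued by the baker:
    job = {
            'type': JOB_BAKE,
            'job': {
                    'factory_info': save_factory(fac),
                    'taxonomy_info': tax_info,
                    'route_metadata': route_metadata,
                    'prev_entry': prev_entry,
                    'dirty_source_names': record.dirty_source_names}}
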
--- a/piecrust/baking/baker.py	Mon Jul 06 21:29:17 2015 -0700
+++ b/piecrust/baking/baker.py	Mon Jul 06 21:30:49 2015 -0700
@@ -1,15 +1,12 @@
-import copy
 import time
 import os.path
-import queue
 import hashlib
 import logging
 import multiprocessing
 from piecrust.baking.records import (
         BakeRecordEntry, TransitionalBakeRecord, TaxonomyInfo)
 from piecrust.baking.worker import (
-        BakeWorkerJob, LoadJobPayload, RenderFirstSubJobPayload,
-        BakeJobPayload,
+        save_factory,
         JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE)
 from piecrust.chefutil import (
         format_timed_scope, format_timed)
@@ -212,12 +209,15 @@
     def _loadRealmPages(self, record, pool, factories):
         def _handler(res):
             # Create the record entry for this page.
-            record_entry = BakeRecordEntry(res.source_name, res.path)
-            record_entry.config = res.config
-            if res.errors:
-                record_entry.errors += res.errors
+            # This will also update the `dirty_source_names` for the record
+            # as we add page files whose last modification times are later
+            # than the last bake.
+            record_entry = BakeRecordEntry(res['source_name'], res['path'])
+            record_entry.config = res['config']
+            if res['errors']:
+                record_entry.errors += res['errors']
                 record.current.success = False
-                self._logErrors(res.path, res.errors)
+                self._logErrors(res['path'], res['errors'])
             record.addEntry(record_entry)
 
         logger.debug("Loading %d realm pages..." % len(factories))
@@ -226,19 +226,22 @@
                                 level=logging.DEBUG, colored=False,
                                 timer_env=self.app.env,
                                 timer_category='LoadJob'):
-            jobs = [
-                BakeWorkerJob(JOB_LOAD, LoadJobPayload(fac))
-                for fac in factories]
+            jobs = []
+            for fac in factories:
+                job = {
+                        'type': JOB_LOAD,
+                        'job': save_factory(fac)}
+                jobs.append(job)
             ar = pool.queueJobs(jobs, handler=_handler)
             ar.wait()
 
     def _renderRealmPages(self, record, pool, factories):
         def _handler(res):
-            entry = record.getCurrentEntry(res.path)
-            if res.errors:
-                entry.errors += res.errors
+            entry = record.getCurrentEntry(res['path'])
+            if res['errors']:
+                entry.errors += res['errors']
                 record.current.success = False
-                self._logErrors(res.path, res.errors)
+                self._logErrors(res['path'], res['errors'])
 
         logger.debug("Rendering %d realm pages..." % len(factories))
         with format_timed_scope(logger,
@@ -273,9 +276,9 @@
                     continue
 
                 # All good, queue the job.
-                job = BakeWorkerJob(
-                        JOB_RENDER_FIRST,
-                        RenderFirstSubJobPayload(fac))
+                job = {
+                        'type': JOB_RENDER_FIRST,
+                        'job': save_factory(fac)}
                 jobs.append(job)
 
             ar = pool.queueJobs(jobs, handler=_handler)
@@ -283,16 +286,15 @@
 
     def _bakeRealmPages(self, record, pool, realm, factories):
         def _handler(res):
-            entry = record.getCurrentEntry(res.path, res.taxonomy_info)
-            entry.subs = res.sub_entries
-            if res.errors:
-                entry.errors += res.errors
-                self._logErrors(res.path, res.errors)
+            entry = record.getCurrentEntry(res['path'], res['taxonomy_info'])
+            entry.subs = res['sub_entries']
+            if res['errors']:
+                entry.errors += res['errors']
+                self._logErrors(res['path'], res['errors'])
             if entry.has_any_error:
                 record.current.success = False
-            if entry.was_any_sub_baked:
+            if entry.subs and entry.was_any_sub_baked:
                 record.current.baked_count[realm] += 1
-                record.dirty_source_names.add(entry.source_name)
 
         logger.debug("Baking %d realm pages..." % len(factories))
         with format_timed_scope(logger,
@@ -388,10 +390,10 @@
 
     def _bakeTaxonomyBuckets(self, record, pool, buckets):
         def _handler(res):
-            entry = record.getCurrentEntry(res.path, res.taxonomy_info)
-            entry.subs = res.sub_entries
-            if res.errors:
-                entry.errors += res.errors
+            entry = record.getCurrentEntry(res['path'], res['taxonomy_info'])
+            entry.subs = res['sub_entries']
+            if res['errors']:
+                entry.errors += res['errors']
             if entry.has_any_error:
                 record.current.success = False
 
@@ -503,11 +505,16 @@
             cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN
             return None
 
-        job = BakeWorkerJob(
-                JOB_BAKE,
-                BakeJobPayload(fac, route_metadata, prev_entry,
-                               record.dirty_source_names,
-                               tax_info))
+        job = {
+                'type': JOB_BAKE,
+                'job': {
+                        'factory_info': save_factory(fac),
+                        'taxonomy_info': tax_info,
+                        'route_metadata': route_metadata,
+                        'prev_entry': prev_entry,
+                        'dirty_source_names': record.dirty_source_names
+                        }
+                }
         return job
 
     def _handleDeletetions(self, record):
--- a/piecrust/baking/worker.py	Mon Jul 06 21:29:17 2015 -0700
+++ b/piecrust/baking/worker.py	Mon Jul 06 21:30:49 2015 -0700
@@ -49,9 +49,9 @@
         app.env.stepTimerSince("BakeWorkerInit", self.work_start_time)
 
     def process(self, job):
-        handler = self.job_handlers[job.job_type]
+        handler = self.job_handlers[job['type']]
         with self.app.env.timerScope(type(handler).__name__):
-            return handler.handleJob(job)
+            return handler.handleJob(job['job'])
 
     def getReport(self):
         self.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
@@ -64,12 +64,6 @@
 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3)
 
 
-class BakeWorkerJob(object):
-    def __init__(self, job_type, payload):
-        self.job_type = job_type
-        self.payload = payload
-
-
 class JobHandler(object):
     def __init__(self, app, ctx):
         self.app = app
@@ -87,72 +81,35 @@
     return errors
 
 
-class PageFactoryInfo(object):
-    def __init__(self, fac):
-        self.source_name = fac.source.name
-        self.rel_path = fac.rel_path
-        self.metadata = fac.metadata
-
-    def build(self, app):
-        source = app.getSource(self.source_name)
-        return PageFactory(source, self.rel_path, self.metadata)
-
-
-class LoadJobPayload(object):
-    def __init__(self, fac):
-        self.factory_info = PageFactoryInfo(fac)
-
-
-class LoadJobResult(object):
-    def __init__(self, source_name, path):
-        self.source_name = source_name
-        self.path = path
-        self.config = None
-        self.errors = None
+def save_factory(fac):
+    return {
+            'source_name': fac.source.name,
+            'rel_path': fac.rel_path,
+            'metadata': fac.metadata}
 
 
-class RenderFirstSubJobPayload(object):
-    def __init__(self, fac):
-        self.factory_info = PageFactoryInfo(fac)
-
-
-class RenderFirstSubJobResult(object):
-    def __init__(self, path):
-        self.path = path
-        self.errors = None
-
-
-class BakeJobPayload(object):
-    def __init__(self, fac, route_metadata, previous_entry,
-                 dirty_source_names, tax_info=None):
-        self.factory_info = PageFactoryInfo(fac)
-        self.route_metadata = route_metadata
-        self.previous_entry = previous_entry
-        self.dirty_source_names = dirty_source_names
-        self.taxonomy_info = tax_info
-
-
-class BakeJobResult(object):
-    def __init__(self, path, tax_info=None):
-        self.path = path
-        self.taxonomy_info = tax_info
-        self.sub_entries = None
-        self.errors = None
+def load_factory(app, info):
+    source = app.getSource(info['source_name'])
+    return PageFactory(source, info['rel_path'], info['metadata'])
 
 
 class LoadJobHandler(JobHandler):
     def handleJob(self, job):
         # Just make sure the page has been cached.
-        fac = job.payload.factory_info.build(self.app)
+        fac = load_factory(self.app, job)
         logger.debug("Loading page: %s" % fac.ref_spec)
-        result = LoadJobResult(fac.source.name, fac.path)
+        result = {
+                'source_name': fac.source.name,
+                'path': fac.path,
+                'config': None,
+                'errors': None}
         try:
             page = fac.buildPage()
             page._load()
-            result.config = page.config.getAll()
+            result['config'] = page.config.getAll()
         except Exception as ex:
             logger.debug("Got loading error. Sending it to master.")
-            result.errors = _get_errors(ex)
+            result['errors'] = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
         return result
@@ -161,7 +118,7 @@
 class RenderFirstSubJobHandler(JobHandler):
     def handleJob(self, job):
         # Render the segments for the first sub-page of this page.
-        fac = job.payload.factory_info.build(self.app)
+        fac = load_factory(self.app, job)
 
         # These things should be OK as they're checked upstream by the baker.
         route = self.app.getRoute(fac.source.name, fac.metadata,
@@ -173,13 +130,15 @@
         qp = QualifiedPage(page, route, route_metadata)
         ctx = PageRenderingContext(qp)
 
-        result = RenderFirstSubJobResult(fac.path)
+        result = {
+                'path': fac.path,
+                'errors': None}
         logger.debug("Preparing page: %s" % fac.ref_spec)
         try:
             render_page_segments(ctx)
         except Exception as ex:
             logger.debug("Got rendering error. Sending it to master.")
-            result.errors = _get_errors(ex)
+            result['errors'] = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
         return result
@@ -192,10 +151,10 @@
 
     def handleJob(self, job):
         # Actually bake the page and all its sub-pages to the output folder.
-        fac = job.payload.factory_info.build(self.app)
+        fac = load_factory(self.app, job['factory_info'])
 
-        route_metadata = job.payload.route_metadata
-        tax_info = job.payload.taxonomy_info
+        route_metadata = job['route_metadata']
+        tax_info = job['taxonomy_info']
         if tax_info is not None:
             route = self.app.getTaxonomyRoute(tax_info.taxonomy_name,
                                               tax_info.source_name)
@@ -207,18 +166,22 @@
         page = fac.buildPage()
         qp = QualifiedPage(page, route, route_metadata)
 
-        result = BakeJobResult(fac.path, tax_info)
-        previous_entry = job.payload.previous_entry
-        dirty_source_names = job.payload.dirty_source_names
+        result = {
+                'path': fac.path,
+                'taxonomy_info': tax_info,
+                'sub_entries': None,
+                'errors': None}
+        previous_entry = job['prev_entry']
+        dirty_source_names = job['dirty_source_names']
         logger.debug("Baking page: %s" % fac.ref_spec)
         try:
             sub_entries = self.page_baker.bake(
                     qp, previous_entry, dirty_source_names, tax_info)
-            result.sub_entries = sub_entries
+            result['sub_entries'] = sub_entries
 
         except BakingError as ex:
             logger.debug("Got baking error. Sending it to master.")
-            result.errors = _get_errors(ex)
+            result['errors'] = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
 
--- a/piecrust/workerpool.py	Mon Jul 06 21:29:17 2015 -0700
+++ b/piecrust/workerpool.py	Mon Jul 06 21:30:49 2015 -0700
@@ -3,6 +3,7 @@
 import logging
 import threading
 import multiprocessing
+from piecrust.fastpickle import pickle, unpickle
 
 
 logger = logging.getLogger(__name__)
@@ -75,6 +76,7 @@
             put(rep)
             break
 
+        task_data = unpickle(task_data)
         try:
             res = (task_type, True, wid, w.process(task_data))
         except Exception as e:
@@ -101,7 +103,8 @@
 
 
 class WorkerPool(object):
-    def __init__(self, worker_class, worker_count=None, initargs=()):
+    def __init__(self, worker_class, worker_count=None, initargs=(),
+                 wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
         self._task_queue = multiprocessing.SimpleQueue()
@@ -122,6 +125,7 @@
             worker_params = _WorkerParams(
                     i, self._task_queue, self._result_queue,
                     worker_class, initargs,
+                    wrap_exception=wrap_exception,
                     is_profiling=is_profiling)
             w = multiprocessing.Process(target=worker_func,
                                         args=(worker_params,))
@@ -161,7 +165,8 @@
 
         self._listener = res
         for job in jobs:
-            self._quick_put((TASK_JOB, job))
+            job_data = pickle(job)
+            self._quick_put((TASK_JOB, job_data))
 
         return res
 
@@ -209,8 +214,11 @@
             try:
                 if success and pool._callback:
                     pool._callback(data)
-                elif not success and pool._error_callback:
-                    pool._error_callback(data)
+                elif not success:
+                    if pool._error_callback:
+                        pool._error_callback(data)
+                    else:
+                        logger.error(data)
             except Exception as ex:
                 logger.exception(ex)
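
(End note, not part of the changeset: a minimal usage sketch of the new serialization boundary in piecrust/workerpool.py, assuming `piecrust` is importable and that `fastpickle.pickle()`/`unpickle()` round-trip plain standard structures as the diff above suggests. The source name and page path are hypothetical.)

    from piecrust.fastpickle import pickle, unpickle
    from piecrust.baking.worker import JOB_LOAD

    # A load job is now a plain dict built from save_factory()'s output.
    job = {'type': JOB_LOAD,
           'job': {'source_name': 'pages',    # hypothetical source
                   'rel_path': 'index.md',    # hypothetical page file
                   'metadata': {}}}

    task_data = pickle(job)          # master process, in WorkerPool.queueJobs()
    restored = unpickle(task_data)   # worker process, before handing the job to the worker
    assert restored == job           # plain structures survive the round-trip
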