# HG changeset patch
# User Ludovic Chabant
# Date 1436243449 25200
# Node ID 838f3964f400bffe024f2819ca491c3101ffced2
# Parent 298f8f46432a3357c9a81095681e869084ec4a75
bake: Optimize the bake by not using custom classes for passing info.

See previous changeset about pickling performance between processes. Now
just use plain standard structures, or the new `fastpickle` when needed.

diff -r 298f8f46432a -r 838f3964f400 piecrust/baking/baker.py
--- a/piecrust/baking/baker.py  Mon Jul 06 21:29:17 2015 -0700
+++ b/piecrust/baking/baker.py  Mon Jul 06 21:30:49 2015 -0700
@@ -1,15 +1,12 @@
-import copy
 import time
 import os.path
-import queue
 import hashlib
 import logging
 import multiprocessing
 from piecrust.baking.records import (
         BakeRecordEntry, TransitionalBakeRecord, TaxonomyInfo)
 from piecrust.baking.worker import (
-        BakeWorkerJob, LoadJobPayload, RenderFirstSubJobPayload,
-        BakeJobPayload,
+        save_factory,
         JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE)
 from piecrust.chefutil import (
         format_timed_scope, format_timed)
@@ -212,12 +209,15 @@
     def _loadRealmPages(self, record, pool, factories):
         def _handler(res):
             # Create the record entry for this page.
-            record_entry = BakeRecordEntry(res.source_name, res.path)
-            record_entry.config = res.config
-            if res.errors:
-                record_entry.errors += res.errors
+            # This will also update the `dirty_source_names` for the record
+            # as we add page files whose last modification times are later
+            # than the last bake.
+            record_entry = BakeRecordEntry(res['source_name'], res['path'])
+            record_entry.config = res['config']
+            if res['errors']:
+                record_entry.errors += res['errors']
                 record.current.success = False
-                self._logErrors(res.path, res.errors)
+                self._logErrors(res['path'], res['errors'])
             record.addEntry(record_entry)
 
         logger.debug("Loading %d realm pages..." % len(factories))
@@ -226,19 +226,22 @@
                                 level=logging.DEBUG, colored=False,
                                 timer_env=self.app.env,
                                 timer_category='LoadJob'):
-            jobs = [
-                    BakeWorkerJob(JOB_LOAD, LoadJobPayload(fac))
-                    for fac in factories]
+            jobs = []
+            for fac in factories:
+                job = {
+                        'type': JOB_LOAD,
+                        'job': save_factory(fac)}
+                jobs.append(job)
             ar = pool.queueJobs(jobs, handler=_handler)
             ar.wait()
 
     def _renderRealmPages(self, record, pool, factories):
         def _handler(res):
-            entry = record.getCurrentEntry(res.path)
-            if res.errors:
-                entry.errors += res.errors
+            entry = record.getCurrentEntry(res['path'])
+            if res['errors']:
+                entry.errors += res['errors']
                 record.current.success = False
-                self._logErrors(res.path, res.errors)
+                self._logErrors(res['path'], res['errors'])
 
         logger.debug("Rendering %d realm pages..." % len(factories))
         with format_timed_scope(logger,
@@ -273,9 +276,9 @@
                     continue
 
                 # All good, queue the job.
-                job = BakeWorkerJob(
-                        JOB_RENDER_FIRST,
-                        RenderFirstSubJobPayload(fac))
+                job = {
+                        'type': JOB_RENDER_FIRST,
+                        'job': save_factory(fac)}
                 jobs.append(job)
 
             ar = pool.queueJobs(jobs, handler=_handler)
@@ -283,16 +286,15 @@
 
     def _bakeRealmPages(self, record, pool, realm, factories):
         def _handler(res):
-            entry = record.getCurrentEntry(res.path, res.taxonomy_info)
-            entry.subs = res.sub_entries
-            if res.errors:
-                entry.errors += res.errors
-                self._logErrors(res.path, res.errors)
+            entry = record.getCurrentEntry(res['path'], res['taxonomy_info'])
+            entry.subs = res['sub_entries']
+            if res['errors']:
+                entry.errors += res['errors']
+                self._logErrors(res['path'], res['errors'])
             if entry.has_any_error:
                 record.current.success = False
-            if entry.was_any_sub_baked:
+            if entry.subs and entry.was_any_sub_baked:
                 record.current.baked_count[realm] += 1
-                record.dirty_source_names.add(entry.source_name)
 
         logger.debug("Baking %d realm pages..." % len(factories))
         with format_timed_scope(logger,
@@ -388,10 +390,10 @@
 
     def _bakeTaxonomyBuckets(self, record, pool, buckets):
         def _handler(res):
-            entry = record.getCurrentEntry(res.path, res.taxonomy_info)
-            entry.subs = res.sub_entries
-            if res.errors:
-                entry.errors += res.errors
+            entry = record.getCurrentEntry(res['path'], res['taxonomy_info'])
+            entry.subs = res['sub_entries']
+            if res['errors']:
+                entry.errors += res['errors']
             if entry.has_any_error:
                 record.current.success = False
 
@@ -503,11 +505,16 @@
                 cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN
             return None
 
-        job = BakeWorkerJob(
-                JOB_BAKE,
-                BakeJobPayload(fac, route_metadata, prev_entry,
-                               record.dirty_source_names,
-                               tax_info))
+        job = {
+                'type': JOB_BAKE,
+                'job': {
+                        'factory_info': save_factory(fac),
+                        'taxonomy_info': tax_info,
+                        'route_metadata': route_metadata,
+                        'prev_entry': prev_entry,
+                        'dirty_source_names': record.dirty_source_names
+                        }
+                }
         return job
 
     def _handleDeletetions(self, record):
diff -r 298f8f46432a -r 838f3964f400 piecrust/baking/worker.py
--- a/piecrust/baking/worker.py  Mon Jul 06 21:29:17 2015 -0700
+++ b/piecrust/baking/worker.py  Mon Jul 06 21:30:49 2015 -0700
@@ -49,9 +49,9 @@
         app.env.stepTimerSince("BakeWorkerInit", self.work_start_time)
 
     def process(self, job):
-        handler = self.job_handlers[job.job_type]
+        handler = self.job_handlers[job['type']]
         with self.app.env.timerScope(type(handler).__name__):
-            return handler.handleJob(job)
+            return handler.handleJob(job['job'])
 
     def getReport(self):
         self.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
@@ -64,12 +64,6 @@
 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3)
 
 
-class BakeWorkerJob(object):
-    def __init__(self, job_type, payload):
-        self.job_type = job_type
-        self.payload = payload
-
-
 class JobHandler(object):
     def __init__(self, app, ctx):
         self.app = app
@@ -87,72 +81,35 @@
         return errors
 
 
-class PageFactoryInfo(object):
-    def __init__(self, fac):
-        self.source_name = fac.source.name
-        self.rel_path = fac.rel_path
-        self.metadata = fac.metadata
-
-    def build(self, app):
-        source = app.getSource(self.source_name)
-        return PageFactory(source, self.rel_path, self.metadata)
-
-
-class LoadJobPayload(object):
-    def __init__(self, fac):
-        self.factory_info = PageFactoryInfo(fac)
-
-
-class LoadJobResult(object):
-    def __init__(self, source_name, path):
-        self.source_name = source_name
-        self.path = path
-        self.config = None
-        self.errors = None
+def save_factory(fac):
+    return {
+            'source_name': fac.source.name,
+            'rel_path': fac.rel_path,
+            'metadata': fac.metadata}
 
 
-class RenderFirstSubJobPayload(object):
-    def __init__(self, fac):
-        self.factory_info = PageFactoryInfo(fac)
-
-
-class RenderFirstSubJobResult(object):
-    def __init__(self, path):
-        self.path = path
-        self.errors = None
-
-
-class BakeJobPayload(object):
-    def __init__(self, fac, route_metadata, previous_entry,
-                 dirty_source_names, tax_info=None):
-        self.factory_info = PageFactoryInfo(fac)
-        self.route_metadata = route_metadata
-        self.previous_entry = previous_entry
-        self.dirty_source_names = dirty_source_names
-        self.taxonomy_info = tax_info
-
-
-class BakeJobResult(object):
-    def __init__(self, path, tax_info=None):
-        self.path = path
-        self.taxonomy_info = tax_info
-        self.sub_entries = None
-        self.errors = None
+def load_factory(app, info):
+    source = app.getSource(info['source_name'])
+    return PageFactory(source, info['rel_path'], info['metadata'])
 
 
 class LoadJobHandler(JobHandler):
     def handleJob(self, job):
         # Just make sure the page has been cached.
-        fac = job.payload.factory_info.build(self.app)
+        fac = load_factory(self.app, job)
         logger.debug("Loading page: %s" % fac.ref_spec)
-        result = LoadJobResult(fac.source.name, fac.path)
+        result = {
+                'source_name': fac.source.name,
+                'path': fac.path,
+                'config': None,
+                'errors': None}
         try:
             page = fac.buildPage()
             page._load()
-            result.config = page.config.getAll()
+            result['config'] = page.config.getAll()
         except Exception as ex:
             logger.debug("Got loading error. Sending it to master.")
-            result.errors = _get_errors(ex)
+            result['errors'] = _get_errors(ex)
            if self.ctx.debug:
                 logger.exception(ex)
         return result
@@ -161,7 +118,7 @@
 class RenderFirstSubJobHandler(JobHandler):
     def handleJob(self, job):
         # Render the segments for the first sub-page of this page.
-        fac = job.payload.factory_info.build(self.app)
+        fac = load_factory(self.app, job)
 
         # These things should be OK as they're checked upstream by the baker.
         route = self.app.getRoute(fac.source.name, fac.metadata,
@@ -173,13 +130,15 @@
         qp = QualifiedPage(page, route, route_metadata)
         ctx = PageRenderingContext(qp)
 
-        result = RenderFirstSubJobResult(fac.path)
+        result = {
+                'path': fac.path,
+                'errors': None}
         logger.debug("Preparing page: %s" % fac.ref_spec)
         try:
             render_page_segments(ctx)
         except Exception as ex:
             logger.debug("Got rendering error. Sending it to master.")
-            result.errors = _get_errors(ex)
+            result['errors'] = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
         return result
@@ -192,10 +151,10 @@
 
     def handleJob(self, job):
         # Actually bake the page and all its sub-pages to the output folder.
-        fac = job.payload.factory_info.build(self.app)
+        fac = load_factory(self.app, job['factory_info'])
 
-        route_metadata = job.payload.route_metadata
-        tax_info = job.payload.taxonomy_info
+        route_metadata = job['route_metadata']
+        tax_info = job['taxonomy_info']
         if tax_info is not None:
             route = self.app.getTaxonomyRoute(tax_info.taxonomy_name,
                                               tax_info.source_name)
@@ -207,18 +166,22 @@
         page = fac.buildPage()
         qp = QualifiedPage(page, route, route_metadata)
 
-        result = BakeJobResult(fac.path, tax_info)
-        previous_entry = job.payload.previous_entry
-        dirty_source_names = job.payload.dirty_source_names
+        result = {
+                'path': fac.path,
+                'taxonomy_info': tax_info,
+                'sub_entries': None,
+                'errors': None}
+        previous_entry = job['prev_entry']
+        dirty_source_names = job['dirty_source_names']
 
         logger.debug("Baking page: %s" % fac.ref_spec)
         try:
             sub_entries = self.page_baker.bake(
                     qp, previous_entry, dirty_source_names, tax_info)
-            result.sub_entries = sub_entries
+            result['sub_entries'] = sub_entries
         except BakingError as ex:
             logger.debug("Got baking error. Sending it to master.")
-            result.errors = _get_errors(ex)
+            result['errors'] = _get_errors(ex)
             if self.ctx.debug:
                 logger.exception(ex)
 
diff -r 298f8f46432a -r 838f3964f400 piecrust/workerpool.py
--- a/piecrust/workerpool.py  Mon Jul 06 21:29:17 2015 -0700
+++ b/piecrust/workerpool.py  Mon Jul 06 21:30:49 2015 -0700
@@ -3,6 +3,7 @@
 import logging
 import threading
 import multiprocessing
+from piecrust.fastpickle import pickle, unpickle
 
 
 logger = logging.getLogger(__name__)
@@ -75,6 +76,7 @@
             put(rep)
             break
 
+        task_data = unpickle(task_data)
         try:
             res = (task_type, True, wid, w.process(task_data))
         except Exception as e:
@@ -101,7 +103,8 @@
 
 
 class WorkerPool(object):
-    def __init__(self, worker_class, worker_count=None, initargs=()):
+    def __init__(self, worker_class, worker_count=None, initargs=(),
+                 wrap_exception=False):
         worker_count = worker_count or os.cpu_count() or 1
 
         self._task_queue = multiprocessing.SimpleQueue()
@@ -122,6 +125,7 @@
             worker_params = _WorkerParams(
                     i, self._task_queue, self._result_queue,
                     worker_class, initargs,
+                    wrap_exception=wrap_exception,
                    is_profiling=is_profiling)
             w = multiprocessing.Process(target=worker_func,
                                         args=(worker_params,))
@@ -161,7 +165,8 @@
         self._listener = res
 
         for job in jobs:
-            self._quick_put((TASK_JOB, job))
+            job_data = pickle(job)
+            self._quick_put((TASK_JOB, job_data))
 
         return res
 
@@ -209,8 +214,11 @@
                 try:
                     if success and pool._callback:
                         pool._callback(data)
-                    elif not success and pool._error_callback:
-                        pool._error_callback(data)
+                    elif not success:
+                        if pool._error_callback:
+                            pool._error_callback(data)
+                        else:
+                            logger.error(data)
                 except Exception as ex:
                     logger.exception(ex)
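Note (editor's illustration, not part of the patch): the changeset swaps the BakeWorkerJob/*Payload/*Result classes for plain dicts because plain structures are cheaper to serialize when jobs and results cross the process boundary. The standalone sketch below shows the general effect using only the standard pickle module and a hypothetical JobPayload class as a stand-in for the old payload objects; actual gains in PieCrust depend on fastpickle and the real job contents.

    # Hypothetical stand-ins; this is not PieCrust code.
    import pickle
    import timeit


    class JobPayload(object):
        # Mimics the old class-based payload style.
        def __init__(self, source_name, rel_path, metadata):
            self.source_name = source_name
            self.rel_path = rel_path
            self.metadata = metadata


    def dump_class():
        return pickle.dumps(JobPayload('pages', 'foo/bar.md', {'slug': 'bar'}))


    def dump_dict():
        # The new style: the same information as a plain structure.
        return pickle.dumps({
            'source_name': 'pages',
            'rel_path': 'foo/bar.md',
            'metadata': {'slug': 'bar'}})


    if __name__ == '__main__':
        print('class:', len(dump_class()), 'bytes,',
              timeit.timeit(dump_class, number=100000), 's')
        print('dict: ', len(dump_dict()), 'bytes,',
              timeit.timeit(dump_dict, number=100000), 's')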
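Note (editor's illustration, not part of the patch): in workerpool.py the master now pickles each job before putting it on the task queue, and the worker unpickles it just before calling process(); failed results fall back to logger.error when no error callback is set. The sketch below mirrors that serialize-at-the-boundary shape with the standard library only; the queue layout, the 'end' sentinel and the trivial worker are simplified stand-ins rather than PieCrust's actual protocol.

    import multiprocessing
    import pickle


    def worker_func(task_queue, result_queue):
        while True:
            task_type, task_data = task_queue.get()
            if task_type == 'end':
                break
            # Jobs arrive as opaque bytes; deserialize before processing.
            job = pickle.loads(task_data)
            result_queue.put({'path': job['path'], 'errors': None})


    if __name__ == '__main__':
        tasks = multiprocessing.SimpleQueue()
        results = multiprocessing.SimpleQueue()
        p = multiprocessing.Process(target=worker_func, args=(tasks, results))
        p.start()
        # The master serializes each plain-dict job before queueing it.
        tasks.put(('job', pickle.dumps({'type': 0, 'path': 'foo/bar.md'})))
        print(results.get())
        tasks.put(('end', None))
        p.join()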