view piecrust/baking/worker.py @ 415:0e9a94b7fdfa

bake: Improve bake record information. * Store things in the bake record that require less interaction between the master process and the workers. For instance, don't store the paginator object in the render pass info -- instead, just store whether pagination was used, and whether it had more items. * Simplify information passing between workers and bake passes by saving the rendering info to the JSON cache. This means the "render first sub" job doesn't have to return anything except errors now. * Add more performance counter info.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jun 2015 19:23:16 -0700
parents e7b865f8f335
children 4a43d7015b75
line wrap: on
line source

import time
import copy
import queue
import logging
from piecrust.app import PieCrust
from piecrust.baking.single import PageBaker, BakingError
from piecrust.rendering import (
        QualifiedPage, PageRenderingContext, render_page_segments)
from piecrust.sources.base import PageFactory


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def worker_func(wid, ctx):
    """Entry point for a worker: build a `BakeWorker` and run it.

    `wid` is the worker's numeric ID (it is formatted with `%d`), and `ctx`
    is the context object handed over to the `BakeWorker`.
    """
    logger.debug("Worker %d booting up..." % wid)
    BakeWorker(wid, ctx).run()


class BakeWorkerContext(object):
    """Plain holder for the settings and shared objects given to a worker.

    Every constructor argument is stored unchanged on the attribute of the
    same name; `force` and `debug` default to `False`.
    """
    def __init__(self, root_dir, out_dir,
                 work_queue, results, abort_event,
                 force=False, debug=False):
        # Directories: the app root and the bake output folder.
        self.root_dir, self.out_dir = root_dir, out_dir
        # Objects shared with other processes: jobs come in through
        # `work_queue`, results go out through `results`, and
        # `abort_event` signals that workers should stop.
        self.work_queue, self.results = work_queue, results
        self.abort_event = abort_event
        # Behaviour flags.
        self.force, self.debug = force, debug


# Identifiers for the kinds of job a worker can be handed.
JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = 0, 1, 2


class BakeWorkerJob(object):
    """A unit of work: a job type paired with that job's payload."""
    def __init__(self, job_type, payload):
        # `job_type` is what the worker uses to look up the handler.
        self.job_type, self.payload = job_type, payload


class BakeWorker(object):
    """A single bake worker, processing jobs from the shared work queue.

    The worker creates its own `PieCrust` app and keeps pulling jobs from
    `ctx.work_queue` until `ctx.abort_event` is set, either from outside or
    by the worker itself after a job handler let an exception escape.
    """
    def __init__(self, wid, ctx):
        self.wid = wid
        self.ctx = ctx

    def run(self):
        """Run the job loop, then post errors and timers to `ctx.results`.

        An exception escaping a job handler sets the abort event, ends this
        worker's loop, and is posted as a `{'type': 'error', ...}` message
        carrying the error messages. Once the loop is over, the app's
        timers are posted as a `{'type': 'timers', ...}` message.
        """
        logger.debug("Worker %d initializing..." % self.wid)
        work_start_time = time.perf_counter()
        worker_timer_name = "Worker_%d" % self.wid

        # Create the app local to this worker.
        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
        app.env.fs_cache_only_for_main_page = True
        app.env.registerTimer(worker_timer_name)
        app.env.registerTimer("JobReceive")

        # Create the job handlers, with one timer per handler class.
        job_handlers = {
                JOB_LOAD: LoadJobHandler(app, self.ctx),
                JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
                JOB_BAKE: BakeJobHandler(app, self.ctx)}
        for jh in job_handlers.values():
            app.env.registerTimer(type(jh).__name__)

        # Start working!
        aborted_with_exception = None
        while not self.ctx.abort_event.is_set():
            try:
                # Use a short timeout so the abort event gets re-checked
                # regularly even when no job is available.
                with app.env.timerScope('JobReceive'):
                    job = self.ctx.work_queue.get(True, 0.01)
            except queue.Empty:
                continue

            try:
                handler = job_handlers[job.job_type]
                with app.env.timerScope(type(handler).__name__):
                    handler.handleJob(job)
            except Exception as ex:
                # Anything a handler didn't deal with itself is critical:
                # flag the abort and stop this worker.
                self.ctx.abort_event.set()
                aborted_with_exception = ex
                logger.debug("[%d] Critical error, aborting." % self.wid)
                if self.ctx.debug:
                    logger.exception(ex)
                break
            finally:
                # Mark the job as done whether it succeeded or not.
                self.ctx.work_queue.task_done()

        if aborted_with_exception is not None:
            msgs = _get_errors(aborted_with_exception)
            self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})

        # Send our timers to the main process before exiting.
        app.env.stepTimer(worker_timer_name,
                          time.perf_counter() - work_start_time)
        self.ctx.results.put_nowait({
                'type': 'timers', 'data': app.env._timers})


class JobHandler(object):
    """Base class for job handlers; subclasses override `handleJob`."""
    def __init__(self, app, ctx):
        # Keep the app and the worker context around for `handleJob`.
        self.app, self.ctx = app, ctx

    def handleJob(self, job):
        """Process one job. The base implementation always raises."""
        raise NotImplementedError()


def _get_errors(ex):
    """Return the messages of `ex` and of every exception in its cause chain.

    The chain is followed through `__cause__`, outermost exception first.
    Passing `None` gives an empty list.
    """
    def _walk_causes(err):
        while err is not None:
            yield err
            err = err.__cause__

    return [str(err) for err in _walk_causes(ex)]


class PageFactoryInfo(object):
    """The pieces of a page factory needed to rebuild it later.

    Stores the factory's source name (rather than the source object), its
    relative path and its metadata.
    """
    def __init__(self, fac):
        self.source_name = fac.source.name
        self.rel_path = fac.rel_path
        self.metadata = fac.metadata

    def build(self, app):
        """Return a new `PageFactory`, with the source looked up in `app`."""
        return PageFactory(
                app.getSource(self.source_name),
                self.rel_path,
                self.metadata)


class LoadJobPayload(object):
    """Payload of a load job: identifies the page to load."""
    def __init__(self, fac):
        # Keep a `PageFactoryInfo` built from the factory, not the factory
        # itself.
        info = PageFactoryInfo(fac)
        self.factory_info = info


class LoadJobResult(object):
    """Result of a load job for one page.

    `config` and `errors` start out as `None` and are filled in by
    whoever processed the job.
    """
    def __init__(self, source_name, path):
        self.source_name, self.path = source_name, path
        self.config = self.errors = None


class RenderFirstSubJobPayload(object):
    """Payload of a "render first sub-page" job: identifies the page."""
    def __init__(self, fac):
        # Keep a `PageFactoryInfo` built from the factory, not the factory
        # itself.
        info = PageFactoryInfo(fac)
        self.factory_info = info


class RenderFirstSubJobResult(object):
    """Result of a "render first sub-page" job.

    Carries the page's path, plus `errors` which starts out as `None`.
    """
    def __init__(self, path):
        self.path, self.errors = path, None


class BakeJobPayload(object):
    """Payload of a bake job.

    Holds a `PageFactoryInfo` for the page, along with the route metadata,
    the previous entry, the dirty source names and the optional taxonomy
    info, all stored as given.
    """
    def __init__(self, fac, route_metadata, previous_entry,
                 dirty_source_names, tax_info=None):
        self.factory_info = PageFactoryInfo(fac)
        self.route_metadata, self.previous_entry = (
                route_metadata, previous_entry)
        self.dirty_source_names = dirty_source_names
        # Note the attribute is named differently from the argument.
        self.taxonomy_info = tax_info


class BakeJobResult(object):
    """Result of a bake job for one page.

    `sub_entries` and `errors` start out as `None` and are filled in by
    whoever processed the job.
    """
    def __init__(self, path, tax_info=None):
        # Note the attribute is named differently from the argument.
        self.path, self.taxonomy_info = path, tax_info
        self.sub_entries = self.errors = None


class LoadJobHandler(JobHandler):
    """Handler for load jobs: loads a page and reports its config."""
    def handleJob(self, job):
        """Load the job's page and post a `LoadJobResult` to the results.

        Any exception raised while loading is caught and reported through
        the result's `errors` instead of being propagated.
        """
        # Just make sure the page has been cached.
        factory = job.payload.factory_info.build(self.app)
        logger.debug("Loading page: %s" % factory.ref_spec)
        outcome = LoadJobResult(factory.source.name, factory.path)
        try:
            loaded_page = factory.buildPage()
            loaded_page._load()
            outcome.config = loaded_page.config.get()
        except Exception as err:
            logger.debug("Got loading error. Sending it to master.")
            outcome.errors = _get_errors(err)
            # Only log the full exception in debug mode.
            if self.ctx.debug:
                logger.exception(err)

        self.ctx.results.put_nowait(outcome)


class RenderFirstSubJobHandler(JobHandler):
    """Handler for "render first sub-page" jobs."""
    def handleJob(self, job):
        """Render the page's segments and post a `RenderFirstSubJobResult`.

        Any exception raised while rendering is caught and reported through
        the result's `errors` instead of being propagated.
        """
        # Render the segments for the first sub-page of this page.
        factory = job.payload.factory_info.build(self.app)

        # These things should be OK as they're checked upstream by the baker.
        page_route = self.app.getRoute(
                factory.source.name, factory.metadata,
                skip_taxonomies=True)
        assert page_route is not None

        # The qualified page gets its own deep copy of the factory metadata.
        page = factory.buildPage()
        qualified = QualifiedPage(
                page, page_route, copy.deepcopy(factory.metadata))
        render_ctx = PageRenderingContext(qualified)

        outcome = RenderFirstSubJobResult(factory.path)
        logger.debug("Preparing page: %s" % factory.ref_spec)
        try:
            render_page_segments(render_ctx)
        except Exception as err:
            logger.debug("Got rendering error. Sending it to master.")
            outcome.errors = _get_errors(err)
            # Only log the full exception in debug mode.
            if self.ctx.debug:
                logger.exception(err)

        self.ctx.results.put_nowait(outcome)


class BakeJobHandler(JobHandler):
    """Handler for bake jobs; owns the `PageBaker` doing the actual work."""
    def __init__(self, app, ctx):
        super().__init__(app, ctx)
        self.page_baker = PageBaker(app, ctx.out_dir, ctx.force)

    def handleJob(self, job):
        """Bake the job's page and post a `BakeJobResult` to the results.

        Only `BakingError` is caught and reported through the result's
        `errors`; any other exception propagates to the caller.
        """
        # Actually bake the page and all its sub-pages to the output folder.
        payload = job.payload
        factory = payload.factory_info.build(self.app)

        route_metadata = payload.route_metadata
        tax_info = payload.taxonomy_info
        # Without taxonomy info the page uses its normal route, otherwise
        # the taxonomy's route.
        if tax_info is None:
            route = self.app.getRoute(factory.source.name, route_metadata,
                                      skip_taxonomies=True)
        else:
            route = self.app.getTaxonomyRoute(tax_info.taxonomy_name,
                                              tax_info.source_name)
        assert route is not None

        outcome = BakeJobResult(factory.path, tax_info)
        prev_entry = payload.previous_entry
        dirty_names = payload.dirty_source_names
        logger.debug("Baking page: %s" % factory.ref_spec)
        try:
            outcome.sub_entries = self.page_baker.bake(
                    factory, route, route_metadata, prev_entry,
                    dirty_names, tax_info)
        except BakingError as err:
            logger.debug("Got baking error. Sending it to master.")
            outcome.errors = _get_errors(err)
            # Only log the full exception in debug mode.
            if self.ctx.debug:
                logger.exception(err)

        self.ctx.results.put_nowait(outcome)