view piecrust/baking/worker.py @ 411:e7b865f8f335

bake: Enable multiprocess baking. Baking is now done by running a worker per CPU, and sending jobs to them. This changes several things across the codebase: * Ability to not cache things related to pages other than the 'main' page (i.e. the page at the bottom of the execution stack). * Decouple the baking process from the bake records, so only the main process keeps track of (and modifies) the bake record. * Remove the need for 'batch page getters' and loading a page directly from the page factories. There are also various smaller changes included here, including support for scope performance timers that are saved with the bake record and can be printed out to the console. Yes, I got carried away. For testing, the in-memory 'mock' file-system doesn't work anymore, since we're spawning processes, so this is replaced by a 'tmpfs' file-system which is saved in temporary files on disk and deleted after tests have run.
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 12 Jun 2015 17:09:19 -0700
parents
children 0e9a94b7fdfa
line wrap: on
line source

import time
import copy
import queue
import logging
from piecrust.app import PieCrust
from piecrust.baking.single import PageBaker, BakingError
from piecrust.rendering import (
        QualifiedPage, PageRenderingContext, render_page_segments)
from piecrust.sources.base import PageFactory


# Module-level logger, named after this module; used by the worker and by
# every job handler below for their debug output.
logger = logging.getLogger(__name__)


def worker_func(wid, ctx):
    """Entry point of a bake worker: build a `BakeWorker` and run it.

    `wid` is the worker's numeric ID (it is formatted with `%d` in the log
    message) and `ctx` is handed to `BakeWorker` unchanged.
    """
    boot_message = "Worker %d booting up..." % wid
    logger.debug(boot_message)
    BakeWorker(wid, ctx).run()


class BakeWorkerContext:
    """Bundle of settings and shared objects given to a bake worker.

    Attributes (all stored exactly as passed in):

    * `root_dir`: root directory the worker creates its `PieCrust` app from.
    * `out_dir`: output directory handed to the page baker.
    * `work_queue`: queue the worker pulls jobs from (`get`/`task_done`).
    * `results`: queue the worker and its handlers post results to.
    * `abort_event`: event polled by the worker loop, and set by a worker
      when a job handler raises.
    * `force`: flag forwarded to the page baker.
    * `debug`: flag forwarded to the worker's `PieCrust` app.
    """
    def __init__(self, root_dir, out_dir,
                 work_queue, results, abort_event,
                 force=False, debug=False):
        # Locations.
        self.root_dir, self.out_dir = root_dir, out_dir
        # Communication channels shared with the other side.
        self.work_queue, self.results = work_queue, results
        self.abort_event = abort_event
        # Options.
        self.force, self.debug = force, debug


# Job type identifiers (0, 1, 2); a job's `job_type` is one of these and
# selects the handler that processes it.
JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(3)


class BakeWorkerJob:
    """A unit of work for a bake worker.

    `job_type` is the key used to look up the job's handler, and `payload`
    is the handler-specific data that handler reads from the job.
    """
    def __init__(self, job_type, payload):
        self.job_type, self.payload = job_type, payload


class BakeWorker(object):
    """Pulls jobs from the shared work queue and runs them until aborted.

    Attributes:
        wid: numeric ID of this worker (used in log messages and in the
            name of its timer).
        ctx: the `BakeWorkerContext` carrying settings, queues and the
            abort event.
        success: `True` until a job handler raises, then `False`.
        abort_exception: the exception that made this worker abort, or
            `None` if there was none.
    """
    def __init__(self, wid, ctx):
        self.wid = wid
        self.ctx = ctx
        # Defined up-front so they can always be read, even when `run()`
        # never hits an error.
        self.success = True
        self.abort_exception = None

    def run(self):
        """Run the worker loop.

        Creates an app local to this worker and one handler per job type,
        then processes jobs from `ctx.work_queue` until `ctx.abort_event`
        is set. If a handler raises, the abort event is set, the exception
        is recorded on this worker, and the loop ends. In all cases the
        app's timers are posted to `ctx.results` before returning.
        """
        logger.debug("Working %d initializing..." % self.wid)
        work_start_time = time.perf_counter()

        # Create the app local to this worker.
        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
        app.env.fs_cache_only_for_main_page = True
        app.env.registerTimer("Worker_%d" % self.wid)
        app.env.registerTimer("JobReceive")

        # Create the job handlers, and one timer per handler class.
        job_handlers = {
                JOB_LOAD: LoadJobHandler(app, self.ctx),
                JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
                JOB_BAKE: BakeJobHandler(app, self.ctx)}
        for jh in job_handlers.values():
            app.env.registerTimer(type(jh).__name__)

        # Start working!
        while not self.ctx.abort_event.is_set():
            try:
                # Short timeout so the abort event is re-checked regularly
                # even when the queue is empty.
                with app.env.timerScope('JobReceive'):
                    job = self.ctx.work_queue.get(True, 0.01)
            except queue.Empty:
                continue

            try:
                handler = job_handlers[job.job_type]
                with app.env.timerScope(type(handler).__name__):
                    handler.handleJob(job)
            except Exception as ex:
                self.ctx.abort_event.set()
                self.abort_exception = ex
                self.success = False
                logger.debug("[%d] Critical error, aborting." % self.wid)
                # The debug flag lives directly on the context; the context
                # has no `app` attribute, so reading `ctx.app.debug` here
                # would raise and hide the real error.
                if self.ctx.debug:
                    logger.exception(ex)
                break
            finally:
                # Every job taken from the queue is marked done, whether
                # it succeeded or not.
                self.ctx.work_queue.task_done()

        # Send our timers to the main process before exiting.
        app.env.stepTimer("Worker_%d" % self.wid,
                          time.perf_counter() - work_start_time)
        self.ctx.results.put_nowait(app.env._timers)


class JobHandler:
    """Base class for objects that process one type of job.

    Keeps the `app` and worker context `ctx` it was created with;
    subclasses implement `handleJob`.
    """
    def __init__(self, app, ctx):
        self.app, self.ctx = app, ctx

    def handleJob(self, job):
        """Process `job`. Always raises `NotImplementedError` here."""
        raise NotImplementedError()


def _get_errors(ex):
    """Return the messages of `ex` and of every exception it was raised from.

    Follows the `__cause__` chain starting at `ex`, outermost first, and
    returns `str()` of each exception. Returns an empty list if `ex` is
    `None`.
    """
    def _cause_chain(err):
        # Yield `err`, then its cause, then the cause's cause, and so on.
        while err is not None:
            yield err
            err = err.__cause__

    return [str(err) for err in _cause_chain(ex)]


class PageFactoryInfo:
    """Description of a page factory that refers to its source by name.

    Stores the factory's source name, relative path and metadata, so that
    an equivalent `PageFactory` can be created against a given app with
    `build()`.
    """
    def __init__(self, fac):
        origin = fac.source
        self.source_name = origin.name
        self.rel_path = fac.rel_path
        self.metadata = fac.metadata

    def build(self, app):
        """Create a `PageFactory` using the source named `source_name`
        as returned by `app.getSource`."""
        return PageFactory(
                app.getSource(self.source_name),
                self.rel_path,
                self.metadata)


class LoadJobPayload:
    """Payload of a load job: a `PageFactoryInfo` for the page factory."""
    def __init__(self, fac):
        info = PageFactoryInfo(fac)
        self.factory_info = info


class LoadJobResult:
    """Result of a load job.

    `config` and `errors` start as `None`; the load handler sets `config`
    when loading works, or `errors` (a list of messages) when it fails.
    """
    def __init__(self, source_name, path):
        self.source_name, self.path = source_name, path
        self.config = self.errors = None


class RenderFirstSubJobPayload:
    """Payload of a render-first-sub-page job: a `PageFactoryInfo` for
    the page factory."""
    def __init__(self, fac):
        info = PageFactoryInfo(fac)
        self.factory_info = info


class RenderFirstSubJobResult:
    """Result of rendering the first sub-page of a page.

    Everything but `path` starts out empty (`None`, or `False` for
    `pagination_has_more`) and is filled in by the job handler.
    """
    def __init__(self, path):
        self.path = path
        # Filled in on success.
        self.used_assets = self.used_pagination = None
        self.pagination_has_more = False
        # Filled in on failure with a list of error messages.
        self.errors = None


class BakeJobPayload:
    """Payload of a bake job.

    Wraps the page factory in a `PageFactoryInfo` and stores the other
    arguments as given; `tax_info` is kept as `taxonomy_info`.
    """
    def __init__(self, fac, route_metadata, previous_entry,
                 first_render_info, dirty_source_names, tax_info=None):
        self.factory_info = PageFactoryInfo(fac)
        self.route_metadata, self.previous_entry = (
                route_metadata, previous_entry)
        self.dirty_source_names = dirty_source_names
        self.first_render_info = first_render_info
        self.taxonomy_info = tax_info


class BakeJobResult:
    """Result of a bake job.

    `bake_info` and `errors` start as `None`; the bake handler sets
    `bake_info` on success, or `errors` (a list of messages) on a baking
    error.
    """
    def __init__(self, path, tax_info=None):
        self.path, self.taxonomy_info = path, tax_info
        self.bake_info = self.errors = None


class LoadJobHandler(JobHandler):
    """Handler for `JOB_LOAD` jobs."""
    def handleJob(self, job):
        """Load the job's page and post a `LoadJobResult` to the results.

        The result carries the page's config on success, or the list of
        error messages if building or loading the page raised.
        """
        factory = job.payload.factory_info.build(self.app)
        logger.debug("Loading page: %s" % factory.ref_spec)
        outcome = LoadJobResult(factory.source.name, factory.path)
        try:
            # Build and load the page, then grab its config.
            loaded_page = factory.buildPage()
            loaded_page._load()
            outcome.config = loaded_page.config.get()
        except Exception as err:
            outcome.errors = _get_errors(err)

        self.ctx.results.put_nowait(outcome)


class RenderFirstSubJobHandler(JobHandler):
    """Handler for `JOB_RENDER_FIRST` jobs."""
    def handleJob(self, job):
        """Render the segments of the page's first sub-page and post a
        `RenderFirstSubJobResult` to the results.

        On success the result records the used assets and whether
        pagination was used (and has more items); if rendering raises, it
        records the error messages instead.
        """
        factory = job.payload.factory_info.build(self.app)

        # The route is expected to exist: this is checked upstream by the
        # baker.
        route = self.app.getRoute(factory.source.name, factory.metadata,
                                  skip_taxonomies=True)
        assert route is not None

        # Render against a private copy of the factory's metadata.
        page = factory.buildPage()
        metadata_copy = copy.deepcopy(factory.metadata)
        render_ctx = PageRenderingContext(
                QualifiedPage(page, route, metadata_copy))

        outcome = RenderFirstSubJobResult(factory.path)
        logger.debug("Preparing page: %s" % factory.ref_spec)
        try:
            render_page_segments(render_ctx)
            outcome.used_assets = render_ctx.used_assets
            outcome.used_pagination = render_ctx.used_pagination is not None
            if outcome.used_pagination:
                outcome.pagination_has_more = (
                        render_ctx.used_pagination.has_more)
        except Exception as err:
            logger.debug("Got rendering error. Sending it to master.")
            outcome.errors = _get_errors(err)

        self.ctx.results.put_nowait(outcome)


class BakeJobHandler(JobHandler):
    """Handler for `JOB_BAKE` jobs; owns the `PageBaker` doing the work."""
    def __init__(self, app, ctx):
        super().__init__(app, ctx)
        self.page_baker = PageBaker(app, ctx.out_dir, ctx.force)

    def handleJob(self, job):
        """Bake the job's page and post a `BakeJobResult` to the results.

        The route comes from the taxonomy info when the payload has one,
        and from the page's source otherwise. Only `BakingError` is
        turned into error messages on the result; any other exception
        propagates to the caller.
        """
        payload = job.payload
        factory = payload.factory_info.build(self.app)

        route_metadata = payload.route_metadata
        tax_info = payload.taxonomy_info
        if tax_info is None:
            route = self.app.getRoute(factory.source.name, route_metadata,
                                      skip_taxonomies=True)
        else:
            route = self.app.getTaxonomyRoute(tax_info.taxonomy_name,
                                              tax_info.source_name)
        assert route is not None

        outcome = BakeJobResult(factory.path, tax_info)
        prev_entry = payload.previous_entry
        first_render = payload.first_render_info
        dirty_sources = payload.dirty_source_names
        logger.debug("Baking page: %s" % factory.ref_spec)
        try:
            outcome.bake_info = self.page_baker.bake(
                    factory, route, route_metadata,
                    prev_entry, first_render,
                    dirty_sources, tax_info)
        except BakingError as err:
            logger.debug("Got baking error. Sending it to master.")
            outcome.errors = _get_errors(err)

        self.ctx.results.put_nowait(outcome)