view piecrust/pipelines/page.py @ 1188:a7c43131d871

bake: Fix file write flushing problem with Python 3.8+

Writing the cache files fails in Python 3.8 because the flushing behaviour appears to have changed: we need to flush explicitly. Even then, on very rare occasions, it can still run into race conditions, so we do a very hacky and ugly "retry" loop when fetching cached data :(
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 15 Jun 2021 22:36:23 -0700
parents 5f97b5b59dfe
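
The fix described in the commit message doesn't live in this file, but the pattern is roughly the following. This is only a minimal sketch with hypothetical helper names (write_cache_file, read_cache_file), not the actual piecrust cache code: write with an explicit flush (and fsync), and wrap reads in a small retry loop to paper over the rare race.

    import os
    import pickle
    import time

    def write_cache_file(path, data):
        # Hypothetical helper: flush explicitly (and fsync) so the data
        # is really on disk before another bake worker reads it back.
        with open(path, 'wb') as fp:
            pickle.dump(data, fp)
            fp.flush()
            os.fsync(fp.fileno())

    def read_cache_file(path, retries=3, delay=0.1):
        # Hypothetical helper: very hacky "retry" loop, since in rare
        # cases the file may not be fully readable yet.
        for attempt in range(retries):
            try:
                with open(path, 'rb') as fp:
                    return pickle.load(fp)
            except (FileNotFoundError, EOFError, pickle.UnpicklingError):
                if attempt == retries - 1:
                    raise
                time.sleep(delay)

Note that fp.flush() only pushes Python's buffer down to the OS; os.fsync() additionally asks the OS to commit the data to disk.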

import copy
import logging
from piecrust.pipelines.base import (
    ContentPipeline, create_job, content_item_from_job)
from piecrust.pipelines._pagebaker import PageBaker, get_output_path
from piecrust.pipelines._pagerecords import (
    PagePipelineRecordEntry, SubPageFlags)
from piecrust.rendering import RenderingContext, render_page_segments
from piecrust.sources.base import AbortedSourceUseError


logger = logging.getLogger(__name__)


class PagePipeline(ContentPipeline):
    PIPELINE_NAME = 'page'
    RECORD_ENTRY_CLASS = PagePipelineRecordEntry
    PASS_NUM = [0, 1, 2]
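    # The pipeline runs in three passes (see createJobs below):
    #   pass 0 ("load"): load every page and cache its config/contents,
    #   pass 1 ("render"): render page content segments,
    #   pass 2 ("layout"): render layouts and write the output files.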

    def __init__(self, source, ppctx):
        super().__init__(source, ppctx)
        self._pagebaker = None
        self._stats = source.app.env.stats
        self._draft_setting = self.app.config['baker/no_bake_setting']

    def initialize(self):
        stats = self._stats
        stats.registerCounter('SourceUseAbortions', raise_if_registered=False)
        stats.registerManifest('SourceUseAbortions', raise_if_registered=False)

        self._pagebaker = PageBaker(self.app,
                                    self.ctx.out_dir,
                                    force=self.ctx.force)
        self._pagebaker.startWriterQueue()

    def createJobs(self, ctx):
        pass_num = ctx.pass_num
        if pass_num == 0:
            ctx.current_record.user_data['dirty_source_names'] = set()
            return self._createLoadJobs(ctx), "load"
        if pass_num == 1:
            return self._createSegmentJobs(ctx), "render"
        if pass_num == 2:
            return self._createLayoutJobs(ctx), "layout"
        raise Exception("Unexpected pipeline pass: %d" % pass_num)

    def _createLoadJobs(self, ctx):
        # Here we load all the pages in the source, making sure they all
        # have a valid cache for their configuration and contents.
        jobs = []
        for item in self.source.getAllContents():
            jobs.append(create_job(self, item.spec))
        if len(jobs) > 0:
            return jobs
        return None

    def _createSegmentJobs(self, ctx):
        jobs = []

        app = self.app
        pass_num = ctx.pass_num
        out_dir = self.ctx.out_dir
        uri_getter = self.source.route.getUri
        pretty_urls = app.config.get('site/pretty_urls')

        history = ctx.record_histories.getHistory(ctx.record_name).copy()
        history.build()

        cur_rec_used_paths = {}
        history.current.user_data['used_paths'] = cur_rec_used_paths
        all_records = ctx.record_histories.current.records

        for prev, cur in history.diffs:
            # Ignore pages that disappeared since last bake.
            if cur is None:
                continue

            # Skip draft pages.
            if cur.hasFlag(PagePipelineRecordEntry.FLAG_IS_DRAFT):
                continue

            # Skip pages that haven't changed since last bake.
            if (prev and not cur.hasFlag(
                    PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED)):
                continue

            # Pages that are known to use other sources in their own
            # content segments (we don't care about the layout yet) get
            # postponed to the next pipeline pass right away, because they
            # might need populated render caches for those sources' pages.
            if prev:
                usn1, _ = prev.getAllUsedSourceNames()
                if usn1:
                    logger.debug("Postponing: %s" % cur.item_spec)
                    cur.flags |= \
                        PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
                    continue

            # Check if this item has been overridden by a previous pipeline
            # run... for instance, we could be the pipeline for a "theme pages"
            # source, and some of our pages have been overridden by a user
            # page that writes out to the same URL.
            uri = uri_getter(cur.route_params)
            out_path = get_output_path(app, out_dir, uri, pretty_urls)
            override = _find_used_path_spec(all_records, out_path)
            if override is not None:
                override_source_name, override_entry_spec = override
                override_source = app.getSource(override_source_name)
                if override_source.config['realm'] == \
                        self.source.config['realm']:
                    logger.error(
                        "Page '%s' would get baked to '%s' "
                        "but is overridden by '%s'." %
                        (cur.item_spec, out_path, override_entry_spec))
                else:
                    logger.debug(
                        "Page '%s' would get baked to '%s' "
                        "but is overridden by '%s'." %
                        (cur.item_spec, out_path, override_entry_spec))

                cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
                continue

            # Nope, all good, let's create a job for this item.
            cur.flags |= PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED
            cur_rec_used_paths[out_path] = cur.item_spec

            jobs.append(create_job(self, cur.item_spec,
                                   pass_num=pass_num))

        if len(jobs) > 0:
            return jobs
        return None

    def _createLayoutJobs(self, ctx):
        # Get the list of all sources that had anything baked.
        dirty_source_names = set()
        all_records = ctx.record_histories.current.records
        for rec in all_records:
            rec_dsn = rec.user_data.get('dirty_source_names')
            if rec_dsn:
                dirty_source_names |= rec_dsn

        jobs = []
        pass_num = ctx.pass_num
        history = ctx.record_histories.getHistory(ctx.record_name).copy()
        history.build()
        for prev, cur in history.diffs:
            if not cur or cur.hasFlag(PagePipelineRecordEntry.FLAG_OVERRIDEN):
                continue

            do_bake = False
            force_segments = False
            force_layout = False

            # Make sure we bake the layout for pages that got their segments
            # re-rendered.
            if cur.hasFlag(PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
                do_bake = True

            # Now look at the stuff we baked for our own source on the second
            # pass.  For anything that wasn't baked (i.e. it was considered 'up
            # to date') we look at the records from last time, and if they say
            # that some page was using a source that is "dirty", then we force
            # bake it.
            #
            # The common example for this is a blog index page which hasn't
            # been touched, but needs to be re-baked because someone added or
            # edited a post.
            if prev:
                usn1, usn2 = prev.getAllUsedSourceNames()
                force_segments = any(u in dirty_source_names for u in usn1)
                force_layout = any(u in dirty_source_names for u in usn2)

                if force_segments or force_layout:
                    # Yep, we need to force-rebake some aspect of this page.
                    do_bake = True

                elif not do_bake:
                    # This page uses other sources, but no source was dirty
                    # this time around (it was a null build, maybe). We
                    # don't have any work to do, but we need to carry over
                    # any information we have, otherwise the post bake step
                    # will think we need to delete last bake's outputs.
                    cur.subs = copy.deepcopy(prev.subs)
                    for cur_sub in cur.subs:
                        cur_sub['flags'] = \
                            SubPageFlags.FLAG_COLLAPSED_FROM_LAST_RUN

            if do_bake:
                jobs.append(create_job(self, cur.item_spec,
                                       pass_num=pass_num,
                                       force_segments=force_segments,
                                       force_layout=force_layout))

        if len(jobs) > 0:
            return jobs
        return None

    def handleJobResult(self, result, ctx):
        pass_num = ctx.pass_num

        if pass_num == 0:
            # Just went through a "load page" job. Let's create a record
            # entry with the information we got from the worker.
            new_entry = self.createRecordEntry(result['item_spec'])
            new_entry.flags = result['flags']
            new_entry.config = result['config']
            new_entry.route_params = result['route_params']
            new_entry.timestamp = result['timestamp']
            ctx.record.addEntry(new_entry)

            # If this page was modified, flag its entire source as "dirty",
            # so any pages using that source can be re-baked.
            if new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED:
                ctx.record.user_data['dirty_source_names'].add(
                    self.source.name)

        elif pass_num == 1:
            # Just went through the "render segments" job.
            existing = ctx.record_entry
            existing.flags |= result.get('flags',
                                         PagePipelineRecordEntry.FLAG_NONE)

        else:
            # Update the entry with the new information.
            existing = ctx.record_entry
            existing.flags |= result.get('flags',
                                         PagePipelineRecordEntry.FLAG_NONE)
            existing.errors += result.get('errors', [])
            existing.subs += result.get('subs', [])

    def run(self, job, ctx, result):
        pass_num = job.get('pass_num', 0)

        if pass_num == 0:
            return self._loadPage(job, ctx, result)

        elif pass_num == 1:
            return self._renderSegments(job, ctx, result)

        elif pass_num >= 2:
            return self._renderLayout(job, ctx, result)

    def getDeletions(self, ctx):
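        # Yield (output path, reason) tuples for outputs that must be
        # deleted: either the source page was removed, or it no longer
        # produces that output.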
        for prev, cur in ctx.record_history.diffs:
            if prev and not cur:
                for sub in prev.subs:
                    yield (sub['out_path'], 'previous source file was removed')
            elif prev and cur:
                prev_out_paths = [o['out_path'] for o in prev.subs]
                cur_out_paths = [o['out_path'] for o in cur.subs]
                diff = set(prev_out_paths) - set(cur_out_paths)
                for p in diff:
                    yield (p, 'source file changed outputs')

    def collapseRecords(self, ctx):
        pass

    def shutdown(self):
        self._pagebaker.stopWriterQueue()

    def _loadPage(self, job, ctx, result):
        content_item = content_item_from_job(self, job)
        page = self.app.getPage(self.source, content_item)

        result['flags'] = PagePipelineRecordEntry.FLAG_NONE
        result['config'] = page.config.getAll()
        result['route_params'] = content_item.metadata['route_params']
        result['timestamp'] = page.datetime.timestamp()

        if page.was_modified:
            result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
        if page.config.get(self._draft_setting):
            result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT

    def _renderSegments(self, job, ctx, result):
        # Here our job is to render the page's segments so that they're
        # cached in memory and on disk... unless we detect that the page
        # is using some other sources, in which case we abort, and the
        # segments will be rendered during the layout pass instead.
        content_item = content_item_from_job(self, job)
        logger.debug("Render segments for: %s" % content_item.spec)
        page = self.app.getPage(self.source, content_item)
        if page.config.get(self._draft_setting):
            raise Exception("Shouldn't have a draft page in a render job!")

        env = self.app.env
        env.abort_source_use = True
        try:
            rdr_ctx = RenderingContext(page)
            render_page_segments(rdr_ctx)
        except AbortedSourceUseError:
            logger.debug("Page was aborted for using source: %s" %
                         content_item.spec)
            result['flags'] = \
                PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
            env.stats.stepCounter("SourceUseAbortions")
            env.stats.addManifestEntry("SourceUseAbortions", content_item.spec)
        finally:
            env.abort_source_use = False

    def _renderLayout(self, job, ctx, result):
        content_item = content_item_from_job(self, job)
        logger.debug("Render layout for: %s" % content_item.spec)
        page = self.app.getPage(self.source, content_item)
        prev_entry = ctx.previous_entry
        rdr_subs = self._pagebaker.bake(
            page, prev_entry,
            force_segments=job.get('force_segments'),
            force_layout=job.get('force_layout'))
        result['subs'] = rdr_subs


def _find_used_path_spec(records, path):
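    # Look through all current pipeline records for one that already
    # claimed this output path; return (source name, entry spec) for
    # the claiming page, or None if the path is unclaimed.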
    for rec in records:
        up = rec.user_data.get('used_paths')
        if up is not None:
            entry_spec = up.get(path)
            if entry_spec is not None:
                src_name = rec.name.split('@')[0]
                return (src_name, entry_spec)
    return None