view piecrust/pipelines/page.py @ 1051:971b4d67e82a

serve: Fix problems with assets disappearing between servings. When an asset file changes, its source's pipeline is re-run, but this used to create a bake record containing only that pipeline's output; the other pipelines' outputs were then incorrectly considered empty, so their previously baked files were removed as stray. Now we copy over the bake records of the pipelines we don't run.
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 26 Jan 2018 18:05:02 -0800
parents d85de09f40c7
children 3bcb2d446397

import copy
import time
import logging
from piecrust.pipelines.base import (
    ContentPipeline, create_job, content_item_from_job)
from piecrust.pipelines._pagebaker import PageBaker, get_output_path
from piecrust.pipelines._pagerecords import (
    PagePipelineRecordEntry,
    add_page_job_result, merge_job_result_into_record_entry)
from piecrust.sources.base import AbortedSourceUseError


logger = logging.getLogger(__name__)


class PagePipeline(ContentPipeline):
    PIPELINE_NAME = 'page'
    RECORD_ENTRY_CLASS = PagePipelineRecordEntry
    PASS_NUM = [0, 1, 2]
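    # The page pipeline runs in three passes: pass 0 loads every page in
    # the source, pass 1 renders the pages that changed, and pass 2
    # re-renders pages that depend on sources baked this time around.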

    def __init__(self, source, ppctx):
        super().__init__(source, ppctx)
        self._pagebaker = None
        self._stats = source.app.env.stats
        self._draft_setting = self.app.config['baker/no_bake_setting']

    def initialize(self):
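        # The stats may already have been registered by another pipeline
        # instance, hence raise_if_registered=False.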
        stats = self._stats
        stats.registerCounter('SourceUseAbortions', raise_if_registered=False)
        stats.registerManifest('SourceUseAbortions', raise_if_registered=False)

        self._pagebaker = PageBaker(self.app,
                                    self.ctx.out_dir,
                                    force=self.ctx.force)
        self._pagebaker.startWriterQueue()

    def createJobs(self, ctx):
        pass_num = ctx.pass_num
        if pass_num == 0:
            return self._createLoadJobs(ctx)
        if pass_num == 1:
            return self._createSecondPassJobs(ctx)
        if pass_num == 2:
            return self._createThirdPassJobs(ctx)
        raise Exception("Unexpected pipeline pass: %d" % pass_num)

    def _createLoadJobs(self, ctx):
        # Here we load all the pages in the source, making sure they all
        # have a valid cache for their configuration and contents.
        jobs = []
        for item in self.source.getAllContents():
            jobs.append(create_job(self, item.spec))
        if len(jobs) > 0:
            return jobs
        return None

    def _createSecondPassJobs(self, ctx):
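        # Create render jobs for the pages loaded during pass 0, skipping
        # drafts, deleted pages, and pages overridden by another pipeline.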
        jobs = []

        app = self.app
        out_dir = self.ctx.out_dir
        uri_getter = self.source.route.getUri
        pretty_urls = app.config.get('site/pretty_urls')

        used_paths = _get_used_paths_from_records(
            ctx.record_histories.current.records)
        history = ctx.record_histories.getHistory(ctx.record_name).copy()
        history.build()

        pass_num = ctx.pass_num
        record = ctx.current_record
        record.user_data['dirty_source_names'] = set()

        for prev, cur in history.diffs:
            # Ignore pages that disappeared since last bake.
            if cur is None:
                continue

            # Skip draft pages.
            if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT:
                continue

            # For pages that are known to use other sources, we make a
            # dummy job that is passed straight on to the next step.
            if prev:
                usn1, usn2 = prev.getAllUsedSourceNames()
                if usn1 or usn2:
                    jobs.append(create_job(self, cur.item_spec,
                                           pass_num=pass_num,
                                           uses_sources=True))
                    continue

            # Check if this item has been overridden by a previous pipeline
            # run... for instance, we could be the pipeline for a "theme
            # pages" source, and some of our pages have been overridden by
            # a user page that writes out to the same URL.
            uri = uri_getter(cur.route_params)
            path = get_output_path(app, out_dir, uri, pretty_urls)
            override = used_paths.get(path)
            if override is not None:
                override_source_name, override_entry = override
                override_source = app.getSource(override_source_name)
                msg = ("Page '%s' would get baked to '%s' "
                       "but is overridden by '%s'." %
                       (cur.item_spec, path, override_entry.item_spec))
                if override_source.config['realm'] == \
                        self.source.config['realm']:
                    logger.error(msg)
                else:
                    logger.debug(msg)

                cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
                continue

            # Nope, all good, let's create a job for this item.
            jobs.append(create_job(self, cur.item_spec,
                                   pass_num=pass_num))

        if len(jobs) > 0:
            return jobs
        return None

    def _createThirdPassJobs(self, ctx):
        # Get the list of all sources that had anything baked.
        dirty_source_names = set()
        all_records = ctx.record_histories.current.records
        for rec in all_records:
            rec_dsn = rec.user_data.get('dirty_source_names')
            if rec_dsn:
                dirty_source_names |= rec_dsn

        # Now look at what we baked for our own source during the render
        # pass. For anything that wasn't baked (i.e. it was considered
        # 'up to date'), check the records from last time: if they say the
        # page was using a source that is now "dirty", we force-bake it.
        #
        # The common example for this is a blog index page which hasn't been
        # touched, but needs to be re-baked because someone added or edited
        # a post.
        jobs = []
        pass_num = ctx.pass_num
        history = ctx.record_histories.getHistory(ctx.record_name).copy()
        history.build()
        for prev, cur in history.diffs:
            if not cur:
                continue
            if cur.was_any_sub_baked:
                continue
            if prev:
                if any(usn in dirty_source_names
                       for usn in prev.getAllUsedSourceNames()[0]):
                    jobs.append(create_job(self, prev.item_spec,
                                           pass_num=pass_num,
                                           force_bake=True))
                else:
                    # This page uses other sources, but no source was dirty
                    # this time around (it was a null build, maybe). We
                    # don't have any work to do, but we need to carry over
                    # any information we have, otherwise the post bake step
                    # will think we need to delete last bake's outputs.
                    cur.subs = copy.deepcopy(prev.subs)

        if len(jobs) > 0:
            return jobs
        return None

    def handleJobResult(self, result, ctx):
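        # Results from pass 0 create brand new record entries; results
        # from later passes are merged into those existing entries.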
        pass_num = ctx.pass_num

        if pass_num == 0:
            # Just went through a "load page" job. Let's create a record
            # entry with the information we got from the worker.
            new_entry = self.createRecordEntry(result['item_spec'])
            new_entry.flags = result['flags']
            new_entry.config = result['config']
            new_entry.route_params = result['route_params']
            new_entry.timestamp = result['timestamp']
            ctx.record.addEntry(new_entry)
        else:
            # Update the entry with the new information.
            existing = ctx.record_entry
            if not result.get('postponed', False):
                merge_job_result_into_record_entry(existing, result)
            if existing.was_any_sub_baked:
                ctx.record.user_data['dirty_source_names'].add(
                    self.source.name)

    def run(self, job, ctx, result):
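        # Dispatch to the right step: pass 0 loads pages, pass 1 does a
        # conditional render first and then a full render for postponed
        # jobs, and pass 2 always does a full render.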
        pass_num = job.get('pass_num', 0)
        step_num = job.get('step_num', 0)

        if pass_num == 0:
            if step_num == 0:
                return self._loadPage(job, ctx, result)

        elif pass_num == 1:
            if step_num == 0:
                return self._renderOrPostpone(job, ctx, result)
            elif step_num == 1:
                return self._renderAlways(job, ctx, result)

        elif pass_num == 2:
            if step_num == 0:
                return self._renderAlways(job, ctx, result)

        raise Exception("Unexpected pipeline pass/step: %d/%d" %
                        (pass_num, step_num))

    def getDeletions(self, ctx):
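        # Yield the output paths of pages (or sub-pages) that existed in
        # the previous bake but not in this one.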
        for prev, cur in ctx.record_history.diffs:
            if prev and not cur:
                for sub in prev.subs:
                    yield (sub['out_path'], 'previous source file was removed')
            elif prev and cur:
                prev_out_paths = [o['out_path'] for o in prev.subs]
                cur_out_paths = [o['out_path'] for o in cur.subs]
                diff = set(prev_out_paths) - set(cur_out_paths)
                for p in diff:
                    yield (p, 'source file changed outputs')

    def collapseRecords(self, ctx):
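        # Nothing to collapse for page records.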
        pass

    def shutdown(self):
        self._pagebaker.stopWriterQueue()

    def _loadPage(self, job, ctx, result):
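        # Load the page so that its config and contents get cached, and
        # report its config, route parameters and timestamp back to the
        # main process, along with 'modified' and 'draft' flags.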
        content_item = content_item_from_job(self, job)
        page = self.app.getPage(self.source, content_item)

        result['flags'] = PagePipelineRecordEntry.FLAG_NONE
        result['config'] = page.config.getAll()
        result['route_params'] = content_item.metadata['route_params']
        result['timestamp'] = page.datetime.timestamp()

        if page.was_modified:
            result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
        if page.config.get(self._draft_setting):
            result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT

    def _renderOrPostpone(self, job, ctx, result):
        # See if we should immediately kick this job off to the next step.
        if job.get('uses_sources', False):
            result['postponed'] = True
            result['next_step_job'] = create_job(self, job['job_spec'][1])
            return

        # Here our job is to render the page's segments so that they're
        # cached in memory and on disk... unless we detect that the page
        # is using some other sources, in which case we abort and we'll try
        # again on the second pass.
        content_item = content_item_from_job(self, job)
        logger.debug("Conditional render for: %s" % content_item.spec)
        page = self.app.getPage(self.source, content_item)
        if page.config.get(self._draft_setting):
            raise Exception("Shouldn't have a draft page in a render job!")

        prev_entry = ctx.previous_entry

        env = self.app.env
        env.abort_source_use = True
        add_page_job_result(result)
        try:
            rdr_subs = self._pagebaker.bake(page, prev_entry)
            result['subs'] = rdr_subs
        except AbortedSourceUseError:
            logger.debug("Page was aborted for using source: %s" %
                         content_item.spec)
            result['flags'] |= \
                PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
            env.stats.stepCounter("SourceUseAbortions")
            env.stats.addManifestEntry("SourceUseAbortions", content_item.spec)
            result['next_step_job'] = create_job(self, content_item.spec)
        finally:
            env.abort_source_use = False

    def _renderAlways(self, job, ctx, result):
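        # Unconditionally render the page's sub-pages, forcing the bake
        # if the job requires it.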
        content_item = content_item_from_job(self, job)
        logger.debug("Full render for: %s" % content_item.spec)
        page = self.app.getPage(self.source, content_item)
        prev_entry = ctx.previous_entry
        rdr_subs = self._pagebaker.bake(page, prev_entry,
                                        force=job.get('force_bake'))

        add_page_job_result(result)
        result['subs'] = rdr_subs


def _get_used_paths_from_records(records):
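    # Map every output path found in the given records to the
    # (source name, record entry) pair that produced it.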
    used_paths = {}
    for rec in records:
        src_name = rec.name.split('@')[0]
        for e in rec.getEntries():
            paths = e.getAllOutputPaths()
            if paths is not None:
                for p in paths:
                    used_paths[p] = (src_name, e)
    return used_paths