view piecrust/templating/_inukshukext.py @ 1188:a7c43131d871

bake: Fix file write flushing problem with Python 3.8+ Writing the cache files fails in Python 3.8 because it looks like flushing behaviour has changed. We need to explicitly flush. And even then, in very rare occurrences, it looks like it can still run into racing conditions, so we do a very hacky and ugly "retry" loop when fetching cached data :(
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 15 Jun 2021 22:36:23 -0700
parents 10fd55b9ccfb
children
line wrap: on
line source

import io
import re
import time
from inukshuk.ext import Extension, StatementNode
from inukshuk.ext.core import filter_make_xml_date, filter_safe
from inukshuk.lexer import (
    TOKEN_ID_STRING_SINGLE_QUOTES, TOKEN_ID_STRING_DOUBLE_QUOTES)
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexers import get_lexer_by_name, guess_lexer
from piecrust.data.paginator import Paginator
from piecrust.rendering import format_text


class PieCrustExtension(Extension):
    def __init__(self, app):
        self.app = app

    def setupEngine(self, engine):
        engine.piecrust_app = self.app
        engine.piecrust_cache = {}

    def getGlobals(self):
        return {
            'highlight_css': get_highlight_css}

    def getFilters(self):
        return {
            'paginate': self._paginate,
            'formatwith': self._formatWith,
            'markdown': lambda v: self._formatWith(v, 'markdown'),
            'textile': lambda v: self._formatWith(v, 'textile'),
            'nocache': add_no_cache_parameter,
            'stripoutertag': strip_outer_tag,
            'stripslash': strip_slash,
            'atomdate': filter_make_xml_date,
            'raw': filter_safe
        }

    def getTests(self):
        return {}

    def getStatementNodes(self):
        return [
            PieCrustHighlightStatementNode,
            PieCrustGeshiStatementNode,
            PieCrustCacheStatementNode,
            PieCrustFormatStatementNode]

    def _paginate(self, value, items_per_page=5):
        ctx = self.app.env.render_ctx_stack.current_ctx
        if ctx is None or ctx.page is None:
            raise Exception("Can't paginate when no page has been pushed "
                            "on the execution stack.")
        return Paginator(value, ctx.page,
                         sub_num=ctx.sub_num,
                         items_per_page=items_per_page)

    def _formatWith(self, value, format_name):
        return format_text(self.app, format_name, value)


def add_no_cache_parameter(value, param_name='t', param_value=None):
    if not param_value:
        param_value = time.time()
    if '?' in value:
        value += '&'
    else:
        value += '?'
    value += '%s=%s' % (param_name, param_value)
    return value


def strip_outer_tag(value, tag=None):
    tag_pattern = '[a-z]+[a-z0-9]*'
    if tag is not None:
        tag_pattern = re.escape(tag)
    pat = r'^\<' + tag_pattern + r'\>(.*)\</' + tag_pattern + '>$'
    m = re.match(pat, value)
    if m:
        return m.group(1)
    return value


def strip_slash(value):
    return value.rstrip('/')


class PieCrustFormatStatementNode(StatementNode):
    name = 'pcformat'
    compiler_imports = ['import io',
                        'from piecrust.rendering import format_text']

    def __init__(self):
        super().__init__()
        self.format = None

    def parse(self, parser):
        self.format = parser.expectIdentifier()
        parser.skipWhitespace()
        parser.expectStatementEnd()
        parser.parseUntilStatement(self, ['endpcformat'])
        parser.expectIdentifier('endpcformat')

    def render(self, ctx, data, out):
        with io.StringIO() as tmp:
            inner_out = tmp.write
            for c in self.children:
                c.render(ctx, data, inner_out)

            text = format_text(ctx.engine.piecrust_app, self.format,
                               tmp.getvalue(), exact_format=True)
            out(text)

    def compile(self, ctx, out):
        out.indent().write('with io.StringIO() as tmp:\n')
        out.push(False)
        out.indent().write('prev_out_write = out_write\n')
        out.indent().write('out_write = tmp.write\n')
        for c in self.children:
            c.compile(ctx, out)
        out.indent().write('out_write = prev_out_write\n')
        out.indent().write(
            'text = format_text(ctx_engine.piecrust_app, %s, tmp.getvalue(), '
            'exact_format=True)\n' % repr(self.format))
        out.indent().write('out_write(text)\n')
        out.pull()


class PieCrustHighlightStatementNode(StatementNode):
    name = 'highlight'
    endname = 'endhighlight'
    compiler_imports = [
        'from pygments import highlight',
        'from pygments.formatters import HtmlFormatter',
        'from pygments.lexers import get_lexer_by_name, guess_lexer']

    def __init__(self):
        super().__init__()
        self.lang = None

    def parse(self, parser):
        self.lang = parser.expectAny([TOKEN_ID_STRING_SINGLE_QUOTES,
                                      TOKEN_ID_STRING_DOUBLE_QUOTES])
        parser.skipWhitespace()
        parser.expectStatementEnd()

        parser.parseUntilStatement(self, self.endname)
        parser.expectIdentifier(self.endname)

    def render(self, ctx, data, out):
        with io.StringIO() as tmp:
            inner_out = tmp.write
            for c in self.children:
                c.render(ctx, data, inner_out)

            raw_text = tmp.getvalue()

        if self.lang is None:
            lexer = guess_lexer(raw_text)
        else:
            lexer = get_lexer_by_name(self.lang, stripall=False)

        formatter = HtmlFormatter()
        code = highlight(raw_text, lexer, formatter)
        out(code)

    def compile(self, ctx, out):
        out.indent().write('with io.StringIO() as tmp:\n')
        out.push(False)
        out.indent().write('prev_out_write = out_write\n')
        out.indent().write('out_write = tmp.write\n')
        for c in self.children:
            c.compile(ctx, out)
        out.indent().write('out_write = prev_out_write\n')
        out.indent().write('raw_text = tmp.getvalue()\n')
        out.pull()
        if self.lang is None:
            out.indent().write('lexer = guess_lexer(raw_text)\n')
        else:
            out.indent().write(
                'lexer = get_lexer_by_name(%s, stripall=False)\n' %
                repr(self.lang))
        out.indent().write('formatter = HtmlFormatter()\n')
        out.indent().write('code = highlight(raw_text, lexer, formatter)\n')
        out.indent().write('out_write(code)\n')


class PieCrustGeshiStatementNode(PieCrustHighlightStatementNode):
    name = 'geshi'
    endname = 'endgeshi'


def get_highlight_css(style_name='default', class_name='.highlight'):
    return HtmlFormatter(style=style_name).get_style_defs(class_name)


class PieCrustCacheStatementNode(StatementNode):
    name = 'pccache'
    compiler_imports = ['import io']

    def __init__(self):
        super().__init__()
        self.cache_key = None

    def parse(self, parser):
        self.cache_key = parser.expectString()
        parser.skipWhitespace()
        parser.expectStatementEnd()

        parser.parseUntilStatement(self, 'endpccache')
        parser.expectIdentifier('endpccache')

    def render(self, ctx, data, out):
        raise Exception("No implemented")

        # exc_stack = ctx.engine.piecrust_app.env.exec_info_stack
        # render_ctx = exc_stack.current_page_info.render_ctx
        # rdr_pass = render_ctx.current_pass_info

        # pair = ctx.engine.piecrust_cache.get(self.cache_key)
        # if pair is not None:
        #     rdr_pass.used_source_names.update(pair[1])
        #     return pair[0]

        # prev_used = rdr_pass.used_source_names.copy()

        # with io.StringIO() as tmp:
        #     inner_out = tmp.write
        #     for c in self.children:
        #         c.render(ctx, data, inner_out)

        #     raw_text = tmp.getvalue()

        # after_used = rdr_pass.used_source_names.copy()
        # used_delta = after_used.difference(prev_used)
        # ctx.engine.piecrust_cache[self.cache_key] = (raw_text, used_delta)

        # return raw_text

    def compile(self, ctx, out):
        out.indent().write(
            'ctx_stack = ctx.engine.piecrust_app.env.render_ctx_stack\n')
        out.indent().write(
            'render_ctx = ctx_stack.current_ctx\n')
        out.indent().write(
            'rdr_pass = render_ctx.current_pass_info\n')

        pair_var = ctx.varname('pair')
        out.indent().write(
            '%s = ctx.engine.piecrust_cache.get(%s)\n' %
            (pair_var, repr(self.cache_key)))
        out.indent().write(
            'if %s is not None:\n' % pair_var)
        out.push().write(
            'rdr_pass.used_source_names.update(%s[1])\n' % pair_var)
        out.indent().write('out_write(%s[0])\n' % pair_var)
        out.pull()
        out.indent().write('else:\n')

        tmp_var = ctx.varname('tmp')
        prev_used_var = ctx.varname('prev_used')
        prev_out_write_var = ctx.varname('prev_out_write')
        prev_out_write_escaped_var = ctx.varname('prev_out_write_escaped')

        out.push().write(
            '%s = rdr_pass.used_source_names.copy()\n' % prev_used_var)
        out.indent().write(
            'with io.StringIO() as %s:\n' % tmp_var)
        out.push().write(
            '%s = out_write\n' % prev_out_write_var)
        out.indent().write(
            '%s = out_write_escaped\n' % prev_out_write_escaped_var)
        out.indent().write(
            'out_write = %s.write\n' % tmp_var)
        out.indent().write(
            'out_write_escaped = ctx.engine._getWriteEscapeFunc(out_write)\n')
        for c in self.children:
            c.compile(ctx, out)

        out.indent().write(
            'out_write_escaped = %s\n' % prev_out_write_escaped_var)
        out.indent().write(
            'out_write = %s\n' % prev_out_write_var)
        out.indent().write(
            'raw_text = %s.getvalue()\n' % tmp_var)
        out.pull()

        out.indent().write(
            'after_used = rdr_pass.used_source_names.copy()\n')
        out.indent().write(
            'used_delta = after_used.difference(%s)\n' % prev_used_var)
        out.indent().write(
            'ctx.engine.piecrust_cache[%s] = (raw_text, used_delta)\n' %
            repr(self.cache_key))
        out.indent().write('out_write(raw_text)\n')
        out.pull()