Mercurial > piecrust2
view piecrust/templating/jinja/extensions.py @ 1188:a7c43131d871
bake: Fix file write flushing problem with Python 3.8+
Writing the cache files fails in Python 3.8 because it looks like flushing
behaviour has changed. We need to explicitly flush. And even then, in very
rare occurrences, it looks like it can still run into racing conditions,
so we do a very hacky and ugly "retry" loop when fetching cached data :(
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Tue, 15 Jun 2021 22:36:23 -0700 |
parents | 1857dbd4580f |
children |
line wrap: on
line source
from jinja2.ext import Extension, Markup from jinja2.lexer import Token, describe_token from jinja2.nodes import CallBlock, Const from compressinja.html import HtmlCompressor, StreamProcessContext from piecrust.rendering import format_text class PieCrustFormatExtension(Extension): tags = set(['pcformat']) def __init__(self, environment): super(PieCrustFormatExtension, self).__init__(environment) def parse(self, parser): lineno = next(parser.stream).lineno args = [parser.parse_expression()] body = parser.parse_statements(['name:endpcformat'], drop_needle=True) return CallBlock(self.call_method('_formatTimed', args), [], [], body).set_lineno(lineno) def _formatTimed(self, format_name, caller=None): with self.environment.app.env.stats.timerScope( 'JinjaTemplateEngine_extensions'): return self._format(format_name, caller) def _format(self, format_name, caller=None): body = caller() text = format_text(self.environment.app, format_name, Markup(body.rstrip()).unescape(), exact_format=True) return text class PieCrustHighlightExtension(Extension): tags = set(['highlight', 'geshi']) def __init__(self, environment): super(PieCrustHighlightExtension, self).__init__(environment) def parse(self, parser): lineno = next(parser.stream).lineno # Extract the language name. args = [parser.parse_expression()] # Extract optional arguments. kwarg_names = {'line_numbers': 0, 'use_classes': 0, 'class': 1, 'id': 1} kwargs = {} while not parser.stream.current.test('block_end'): name = parser.stream.expect('name') if name.value not in kwarg_names: raise Exception("'%s' is not a valid argument for the code " "highlighting tag." % name.value) if kwarg_names[name.value] == 0: kwargs[name.value] = Const(True) elif parser.stream.skip_if('assign'): kwargs[name.value] = parser.parse_expression() # body of the block body = parser.parse_statements(['name:endhighlight', 'name:endgeshi'], drop_needle=True) return CallBlock(self.call_method('_highlightTimed', args, kwargs), [], [], body).set_lineno(lineno) def _highlightTimed(self, lang, line_numbers=False, use_classes=False, css_class=None, css_id=None, caller=None): with self.environment.app.env.stats.timerScope( 'JinjaTemplateEngine_extensions'): return self._highlight(lang, line_numbers, use_classes, css_class, css_id, caller) def _highlight(self, lang, line_numbers=False, use_classes=False, css_class=None, css_id=None, caller=None): from pygments import highlight from pygments.formatters import HtmlFormatter from pygments.lexers import get_lexer_by_name, guess_lexer # Try to be mostly compatible with Jinja2-highlight's settings. body = caller() if lang is None: lexer = guess_lexer(body) else: lexer = get_lexer_by_name(lang, stripall=False) if css_class is None: try: css_class = self.environment.jinja2_highlight_cssclass except AttributeError: pass if css_class is not None: formatter = HtmlFormatter(cssclass=css_class, linenos=line_numbers) else: formatter = HtmlFormatter(linenos=line_numbers) code = highlight(Markup(body.rstrip()).unescape(), lexer, formatter) return code def get_highlight_css(style_name='default', class_name='.highlight'): from pygments.formatters import HtmlFormatter return HtmlFormatter(style=style_name).get_style_defs(class_name) class PieCrustCacheExtension(Extension): tags = set(['pccache', 'cache']) def __init__(self, environment): super(PieCrustCacheExtension, self).__init__(environment) environment.extend( piecrust_cache_prefix='', piecrust_cache={} ) def parse(self, parser): # the first token is the token that started the tag. In our case # we only listen to ``'pccache'`` so this will be a name token with # `pccache` as value. We get the line number so that we can give # that line number to the nodes we create by hand. lineno = next(parser.stream).lineno # now we parse a single expression that is used as cache key. args = [parser.parse_expression()] # now we parse the body of the cache block up to `endpccache` and # drop the needle (which would always be `endpccache` in that case) body = parser.parse_statements(['name:endpccache', 'name:endcache'], drop_needle=True) # now return a `CallBlock` node that calls our _renderCache # helper method on this extension. return CallBlock(self.call_method('_renderCacheTimed', args), [], [], body).set_lineno(lineno) def _renderCacheTimed(self, name, caller): with self.environment.app.env.stats.timerScope( 'JinjaTemplateEngine_extensions'): return self._renderCache(name, caller) def _renderCache(self, name, caller): key = self.environment.piecrust_cache_prefix + name rcs = self.environment.app.env.render_ctx_stack ctx = rcs.current_ctx # try to load the block from the cache # if there is no fragment in the cache, render it and store # it in the cache. pair = self.environment.piecrust_cache.get(key) if pair is not None: for usn in pair[1]: ctx.addUsedSource(usn) return pair[0] prev_used = set(ctx.current_used_source_names) rv = caller() after_used = set(ctx.current_used_source_names) used_delta = after_used.difference(prev_used) self.environment.piecrust_cache[key] = (rv, used_delta) return rv class PieCrustSpacelessExtension(HtmlCompressor): """ A re-implementation of `SelectiveHtmlCompressor` so that we can both use `strip` or `spaceless` in templates. """ def filter_stream(self, stream): ctx = StreamProcessContext(stream) strip_depth = 0 while 1: if stream.current.type == 'block_begin': for tk in ['strip', 'spaceless']: change = self._processToken(ctx, stream, tk) if change != 0: strip_depth += change if strip_depth < 0: ctx.fail('Unexpected tag end%s' % tk) break if strip_depth > 0 and stream.current.type == 'data': ctx.token = stream.current value = self.normalize(ctx) yield Token(stream.current.lineno, 'data', value) else: yield stream.current next(stream) def _processToken(self, ctx, stream, test_token): change = 0 if (stream.look().test('name:%s' % test_token) or stream.look().test('name:end%s' % test_token)): stream.skip() if stream.current.value == test_token: change = 1 else: change = -1 stream.skip() if stream.current.type != 'block_end': ctx.fail('expected end of block, got %s' % describe_token(stream.current)) stream.skip() return change