Mercurial > piecrust2
view piecrust/sources/autoconfig.py @ 1188:a7c43131d871
bake: Fix file write flushing problem with Python 3.8+
Writing the cache files fails in Python 3.8 because it looks like flushing
behaviour has changed. We need to explicitly flush. And even then, in very
rare occurrences, it looks like it can still run into racing conditions,
so we do a very hacky and ugly "retry" loop when fetching cached data :(
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Tue, 15 Jun 2021 22:36:23 -0700 |
parents | 3d71cd95f90a |
children |
line wrap: on
line source
import re import os import os.path import logging from piecrust.configuration import ConfigurationError from piecrust.sources.base import ContentItem from piecrust.sources.default import DefaultContentSource logger = logging.getLogger(__name__) class AutoConfigContentSourceBase(DefaultContentSource): """ Base class for content sources that automatically apply configuration settings to their generated pages based on those pages' paths. """ def __init__(self, app, name, config): super().__init__(app, name, config) config.setdefault('data_type', 'page_iterator') self.capture_mode = config.get('capture_mode', 'path') if self.capture_mode not in ['path', 'dirname', 'filename']: raise ConfigurationError("Capture mode in source '%s' must be " "one of: path, dirname, filename" % name) def _finalizeContent(self, parent_group, items, groups): super()._finalizeContent(parent_group, items, groups) # If `capture_mode` is `dirname`, we don't need to recompute it # for each filename, so we do it here. if self.capture_mode == 'dirname': rel_dirpath = '.' if parent_group is not None: rel_dirpath = os.path.relpath(parent_group.spec, self.fs_endpoint_path) config = self._extractConfigFragment(rel_dirpath) for i in items: # Compute the config for the other capture modes. if self.capture_mode == 'path': rel_path = os.path.relpath(i.spec, self.fs_endpoint_path) config = self._extractConfigFragment(rel_path) elif self.capture_mode == 'filename': fname = os.path.basename(i.spec) config = self._extractConfigFragment(fname) # Set the config on the content item's metadata. i.metadata.setdefault('config', {}).update(config) def _extractConfigFragment(self, rel_path): raise NotImplementedError() class AutoConfigContentSource(AutoConfigContentSourceBase): """ Content source that extracts configuration settings from the sub-folders each page resides in. This is ideal for setting tags or categories on pages based on the folders they're in. """ SOURCE_NAME = 'autoconfig' def __init__(self, app, name, config): config['capture_mode'] = 'dirname' super().__init__(app, name, config) self.setting_name = config.get('setting_name', name) self.only_single_values = config.get('only_single_values', False) self.collapse_single_values = config.get('collapse_single_values', False) def _extractConfigFragment(self, rel_path): if rel_path == '.': values = [] else: values = rel_path.split(os.sep) if self.only_single_values: if len(values) > 1: raise Exception("Only one folder level is allowed for pages " "in source '%s'." % self.name) elif len(values) == 1: values = values[0] else: values = None if self.collapse_single_values: if len(values) == 1: values = values[0] elif len(values) == 0: values = None return {self.setting_name: values} def findContentFromRoute(self, route_params): # Pages from this source are effectively flattened, so we need to # find pages using a brute-force kinda way. route_slug = route_params.get('slug', '') if not route_slug: route_slug = '_index' for dirpath, dirnames, filenames in os.walk(self.fs_endpoint_path): for f in filenames: slug, _ = os.path.splitext(f) if slug == route_slug: path = os.path.join(dirpath, f) metadata = self._createItemMetadata(path) path = os.path.join(dirpath, f) rel_path = os.path.relpath(path, self.fs_endpoint_path) config = self._extractConfigFragment(rel_path) metadata.setdefault('config', {}).update(config) return ContentItem(path, metadata) return None def _makeSlug(self, path): slug = super()._makeSlug(path) return os.path.basename(slug) class OrderedContentSource(AutoConfigContentSourceBase): """ A content source that assigns an "order" to its pages based on a numerical prefix in their filename. Page iterators will automatically sort pages using that order. """ SOURCE_NAME = 'ordered' re_pattern = re.compile(r'(^|[/\\])(?P<num>\d+)_') def __init__(self, app, name, config): config['capture_mode'] = 'path' super().__init__(app, name, config) self.setting_name = config.get('setting_name', 'order') self.default_value = config.get('default_value', 0) def findContentFromRoute(self, route_params): uri_path = route_params.get('slug', '') if uri_path == '': uri_path = '_index' path = self.fs_endpoint_path uri_parts = uri_path.split('/') for i, p in enumerate(uri_parts): if i == len(uri_parts) - 1: # Last part, this is the filename. We need to check for either # the name, or the name with the prefix, but also handle a # possible extension. p_pat = r'(\d+_)?' + re.escape(p) _, ext = os.path.splitext(uri_path) if ext == '': p_pat += r'\.[\w\d]+' found = False for name in os.listdir(path): if re.match(p_pat, name): path = os.path.join(path, name) found = True break if not found: return None else: # Find each sub-directory. It can either be a directory with # the name itself, or the name with a number prefix. p_pat = r'(\d+_)?' + re.escape(p) + '$' found = False for name in os.listdir(path): if re.match(p_pat, name): path = os.path.join(path, name) found = True break if not found: return None metadata = self._createItemMetadata(path) rel_path = os.path.relpath(path, self.fs_endpoint_path) config = self._extractConfigFragment(rel_path) metadata.setdefault('config', {}).update(config) return ContentItem(path, metadata) def getSorterIterator(self, it): accessor = self.getSettingAccessor() return OrderTrailSortIterator(it, self.setting_name + '_trail', value_accessor=accessor) def _finalizeContent(self, parent_group, items, groups): super()._finalizeContent(parent_group, items, groups) sn = self.setting_name items.sort(key=lambda i: i.metadata['config'][sn]) def _extractConfigFragment(self, rel_path): values = [] for m in self.re_pattern.finditer(rel_path): val = int(m.group('num')) values.append(val) if len(values) == 0: values.append(self.default_value) return { self.setting_name: values[-1], self.setting_name + '_trail': values} def _makeSlug(self, path): slug = super()._makeSlug(path) slug = self.re_pattern.sub(r'\1', slug) if slug == '_index': slug = '' return slug class OrderTrailSortIterator(object): def __init__(self, it, trail_name, value_accessor): self.it = it self.trail_name = trail_name self.value_accessor = value_accessor def __iter__(self): return iter(sorted(self.it, key=self._key_getter)) def _key_getter(self, item): values = self.value_accessor(item, self.trail_name) key = ''.join(map(lambda v: str(v), values)) return key