Mercurial > piecrust2
diff piecrust/appconfig.py @ 584:9ccc933ac2c7
internal: Refactor the app configuration class.
* Moved to its own module.
* More extensible validation.
* Allow easier setup of defaults so `showconfig` shows more useful stuff.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Fri, 01 Jan 2016 23:18:26 -0800 |
parents | |
children | 25df894f9ab9 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/piecrust/appconfig.py Fri Jan 01 23:18:26 2016 -0800 @@ -0,0 +1,504 @@ +import re +import os.path +import copy +import json +import urllib +import logging +import hashlib +import collections +import yaml +from piecrust import ( + APP_VERSION, CACHE_VERSION, + DEFAULT_FORMAT, DEFAULT_TEMPLATE_ENGINE, DEFAULT_POSTS_FS, + DEFAULT_DATE_FORMAT, DEFAULT_THEME_SOURCE) +from piecrust.cache import NullCache +from piecrust.configuration import ( + Configuration, ConfigurationError, ConfigurationLoader, + merge_dicts, visit_dict) +from piecrust.sources.base import REALM_USER, REALM_THEME + + +logger = logging.getLogger(__name__) + + +class VariantNotFoundError(Exception): + def __init__(self, variant_path, message=None): + super(VariantNotFoundError, self).__init__( + message or ("No such configuration variant: %s" % + variant_path)) + + +class PieCrustConfiguration(Configuration): + def __init__(self, paths=None, cache=None, values=None, validate=True): + super(PieCrustConfiguration, self).__init__(values, validate) + self.paths = paths + self.cache = cache or NullCache() + self.fixups = [] + + def applyVariant(self, variant_path, raise_if_not_found=True): + variant = self.get(variant_path) + if variant is None: + if raise_if_not_found: + raise VariantNotFoundError(variant_path) + return + if not isinstance(variant, dict): + raise VariantNotFoundError( + variant_path, + "Configuration variant '%s' is not an array. " + "Check your configuration file." % variant_path) + self.merge(variant) + + def _load(self): + if self.paths is None: + self._values = self._validateAll({}) + return + + path_times = [os.path.getmtime(p) for p in self.paths] + + cache_key_hash = hashlib.md5( + ("version=%s&cache=%d" % ( + APP_VERSION, CACHE_VERSION)).encode('utf8')) + for p in self.paths: + cache_key_hash.update(("&path=%s" % p).encode('utf8')) + cache_key = cache_key_hash.hexdigest() + + if self.cache.isValid('config.json', path_times): + logger.debug("Loading configuration from cache...") + config_text = self.cache.read('config.json') + self._values = json.loads( + config_text, + object_pairs_hook=collections.OrderedDict) + + actual_cache_key = self._values.get('__cache_key') + if actual_cache_key == cache_key: + self._values['__cache_valid'] = True + return + logger.debug("Outdated cache key '%s' (expected '%s')." % ( + actual_cache_key, cache_key)) + + logger.debug("Loading configuration from: %s" % self.paths) + values = {} + for i, p in enumerate(self.paths): + with open(p, 'r', encoding='utf-8') as fp: + loaded_values = yaml.load( + fp.read(), + Loader=ConfigurationLoader) + if loaded_values is None: + loaded_values = {} + for fixup in self.fixups: + fixup(i, loaded_values) + merge_dicts(values, loaded_values) + + for fixup in self.fixups: + fixup(len(self.paths), values) + + self._values = self._validateAll(values) + + logger.debug("Caching configuration...") + self._values['__cache_key'] = cache_key + config_text = json.dumps(self._values) + self.cache.write('config.json', config_text) + + self._values['__cache_valid'] = False + + def _validateAll(self, values): + if values is None: + values = {} + + # Add the loaded values to the default configuration. + values = merge_dicts(copy.deepcopy(default_configuration), values) + + # Figure out if we need to generate the configuration for the + # default content model. + sitec = values.setdefault('site', {}) + if ( + ('sources' not in sitec and + 'routes' not in sitec and + 'taxonomies' not in sitec) or + sitec.get('use_default_content')): + logger.debug("Generating default content model...") + values = self._generateDefaultContentModel(values) + + # Add a section for our cached information. + cachec = collections.OrderedDict() + values['__cache'] = cachec + cache_writer = _ConfigCacheWriter(cachec) + globs = globals() + + def _visitor(path, val, parent_val, parent_key): + callback_name = '_validate_' + path.replace('/', '_') + callback = globs.get(callback_name) + if callback: + val2 = callback(val, values, cache_writer) + if val2 is None: + raise Exception("Validator '%s' isn't returning a " + "coerced value." % callback_name) + parent_val[parent_key] = val2 + + visit_dict(values, _visitor) + + return values + + def _generateDefaultContentModel(self, values): + dcmcopy = copy.deepcopy(default_content_model_base) + values = merge_dicts(dcmcopy, values) + + dcm = get_default_content_model(values) + values = merge_dicts(dcm, values) + + blogsc = values['site'].get('blogs') + if blogsc is None: + blogsc = ['posts'] + values['site']['blogs'] = blogsc + + is_only_blog = (len(blogsc) == 1) + for blog_name in blogsc: + blog_cfg = get_default_content_model_for_blog( + blog_name, is_only_blog, values) + values = merge_dicts(blog_cfg, values) + + return values + + +class _ConfigCacheWriter(object): + def __init__(self, cache_dict): + self._cache_dict = cache_dict + + def write(self, name, val): + logger.debug("Caching configuration item '%s' = %s" % (name, val)) + self._cache_dict[name] = val + + +default_configuration = collections.OrderedDict({ + 'site': collections.OrderedDict({ + 'title': "Untitled PieCrust website", + 'root': '/', + 'default_format': DEFAULT_FORMAT, + 'default_template_engine': DEFAULT_TEMPLATE_ENGINE, + 'enable_gzip': True, + 'pretty_urls': False, + 'trailing_slash': False, + 'date_format': DEFAULT_DATE_FORMAT, + 'auto_formats': collections.OrderedDict([ + ('html', ''), + ('md', 'markdown'), + ('textile', 'textile')]), + 'default_auto_format': 'md', + 'pagination_suffix': '/%num%', + 'slugify_mode': 'encode', + 'themes_sources': [DEFAULT_THEME_SOURCE], + 'cache_time': 28800, + 'enable_debug_info': True, + 'show_debug_info': False, + 'use_default_content': True + }), + 'baker': collections.OrderedDict({ + 'no_bake_setting': 'draft' + }) + }) + + +default_content_model_base = collections.OrderedDict({ + 'site': collections.OrderedDict({ + 'posts_fs': DEFAULT_POSTS_FS, + 'date_format': DEFAULT_DATE_FORMAT, + 'default_page_layout': 'default', + 'default_post_layout': 'post', + 'post_url': '%year%/%month%/%day%/%slug%', + 'tag_url': 'tag/%tag%', + 'category_url': '%category%', + 'posts_per_page': 5 + }) + }) + + +def get_default_content_model(values): + default_layout = values['site']['default_page_layout'] + return collections.OrderedDict({ + 'site': collections.OrderedDict({ + 'sources': collections.OrderedDict({ + 'pages': { + 'type': 'default', + 'ignore_missing_dir': True, + 'data_endpoint': 'site.pages', + 'default_layout': default_layout, + 'item_name': 'page' + } + }), + 'routes': [ + { + 'url': '/%path:slug%', + 'source': 'pages', + 'func': 'pcurl(slug)' + } + ], + 'taxonomies': collections.OrderedDict({ + 'tags': { + 'multiple': True, + 'term': 'tag' + }, + 'categories': { + 'term': 'category' + } + }) + }) + }) + + +def get_default_content_model_for_blog(blog_name, is_only_blog, values): + posts_fs = values['site']['posts_fs'] + blog_cfg = values.get(blog_name, {}) + + if is_only_blog: + url_prefix = '' + tax_page_prefix = '' + fs_endpoint = 'posts' + data_endpoint = 'blog' + item_name = 'post' + else: + url_prefix = blog_name + '/' + tax_page_prefix = blog_name + '/' + fs_endpoint = 'posts/%s' % blog_name + data_endpoint = blog_name + item_name = '%s-post' % blog_name + + items_per_page = blog_cfg.get( + 'posts_per_page', values['site']['posts_per_page']) + date_format = blog_cfg.get( + 'date_format', values['site']['date_format']) + default_layout = blog_cfg.get( + 'default_layout', values['site']['default_post_layout']) + + post_url = '/' + blog_cfg.get( + 'post_url', + url_prefix + values['site']['post_url']).lstrip('/') + tag_url = '/' + blog_cfg.get( + 'tag_url', + url_prefix + values['site']['tag_url']).lstrip('/') + category_url = '/' + blog_cfg.get( + 'category_url', + url_prefix + values['site']['category_url']).lstrip('/') + + return collections.OrderedDict({ + 'site': collections.OrderedDict({ + 'sources': collections.OrderedDict({ + blog_name: collections.OrderedDict({ + 'type': 'posts/%s' % posts_fs, + 'fs_endpoint': fs_endpoint, + 'data_endpoint': data_endpoint, + 'item_name': item_name, + 'ignore_missing_dir': True, + 'data_type': 'blog', + 'items_per_page': items_per_page, + 'date_format': date_format, + 'default_layout': default_layout, + 'taxonomy_pages': collections.OrderedDict({ + 'tags': ('pages:%s_tag.%%ext%%;' + 'theme_pages:_tag.%%ext%%' % + tax_page_prefix), + 'categories': ('pages:%s_category.%%ext%%;' + 'theme_pages:_category.%%ext%%' % + tax_page_prefix) + }) + }) + }), + 'routes': [ + { + 'url': post_url, + 'source': blog_name, + 'func': 'pcposturl(year,month,day,slug)' + }, + { + 'url': tag_url, + 'source': blog_name, + 'taxonomy': 'tags', + 'func': 'pctagurl(tag)' + }, + { + 'url': category_url, + 'source': blog_name, + 'taxonomy': 'categories', + 'func': 'pccaturl(category)' + } + ] + }) + }) + + +# Configuration value validators. +# +# Make sure we have basic site stuff. +def _validate_site(v, values, cache): + sources = v.get('sources') + if not sources: + raise ConfigurationError("No sources were defined.") + routes = v.get('routes') + if not routes: + raise ConfigurationError("No routes were defined.") + taxonomies = v.get('taxonomies') + if taxonomies is None: + v['taxonomies'] = {} + return v + +# Make sure the site root starts and ends with a slash. +def _validate_site_root(v, values, cache): + if not v.startswith('/'): + raise ConfigurationError("The `site/root` setting must start " + "with a slash.") + root_url = urllib.parse.quote(v.rstrip('/') + '/') + return root_url + + +# Cache auto-format regexes, check that `.html` is in there. +def _validate_site_auto_formats(v, values, cache): + if not isinstance(v, dict): + raise ConfigurationError("The 'site/auto_formats' setting must be " + "a dictionary.") + + v.setdefault('html', values['site']['default_format']) + auto_formats_re = r"\.(%s)$" % ( + '|'.join( + [re.escape(i) for i in list(v.keys())])) + cache.write('auto_formats_re', auto_formats_re) + return v + + +# Check that the default auto-format is known. +def _validate_site_default_auto_format(v, values, cache): + if v not in values['site']['auto_formats']: + raise ConfigurationError( + "Default auto-format '%s' is not declared." % v) + return v + + +# Cache pagination suffix regex and format. +def _validate_site_pagination_suffix(v, values, cache): + if len(v) == 0 or v[0] != '/': + raise ConfigurationError("The 'site/pagination_suffix' setting " + "must start with a slash.") + if '%num%' not in v: + raise ConfigurationError("The 'site/pagination_suffix' setting " + "must contain the '%num%' placeholder.") + + pgn_suffix_fmt = v.replace('%num%', '%(num)d') + cache.write('pagination_suffix_format', pgn_suffix_fmt) + + pgn_suffix_re = re.escape(v) + pgn_suffix_re = (pgn_suffix_re.replace("\\%num\\%", "(?P<num>\\d+)") + + '$') + cache.write('pagination_suffix_re', pgn_suffix_re) + return v + + +# Make sure theme sources is a list. +def _validate_site_theme_sources(v, values, cache): + if not isinstance(v, list): + v = [v] + return v + + +def _validate_site_sources(v, values, cache): + # Basic checks. + if not v: + raise ConfigurationError("There are no sources defined.") + if not isinstance(v, dict): + raise ConfigurationError("The 'site/sources' setting must be a " + "dictionary.") + + # Add the theme page source if no sources were defined in the theme + # configuration itself. + has_any_theme_source = False + for sn, sc in v.items(): + if sc.get('realm') == REALM_THEME: + has_any_theme_source = True + break + if not has_any_theme_source: + v['theme_pages'] = { + 'theme_source': True, + 'fs_endpoint': 'pages', + 'data_endpoint': 'site/pages', + 'item_name': 'page', + 'realm': REALM_THEME} + values['site']['routes'].append({ + 'url': '/%path:slug%', + 'source': 'theme_pages', + 'func': 'pcurl(slug)'}) + + # Sources have the `default` scanner by default, duh. Also, a bunch + # of other default values for other configuration stuff. + for sn, sc in v.items(): + if not isinstance(sc, dict): + raise ConfigurationError("All sources in 'site/sources' must " + "be dictionaries.") + sc.setdefault('type', 'default') + sc.setdefault('fs_endpoint', sn) + sc.setdefault('ignore_missing_dir', False) + sc.setdefault('data_endpoint', sn) + sc.setdefault('data_type', 'iterator') + sc.setdefault('item_name', sn) + sc.setdefault('items_per_page', 5) + sc.setdefault('date_format', DEFAULT_DATE_FORMAT) + sc.setdefault('realm', REALM_USER) + + return v + + +def _validate_site_routes(v, values, cache): + if not v: + raise ConfigurationError("There are no routes defined.") + if not isinstance(v, list): + raise ConfigurationError("The 'site/routes' setting must be a " + "list.") + + # Check routes are referencing correct sources, have default + # values, etc. + for rc in v: + if not isinstance(rc, dict): + raise ConfigurationError("All routes in 'site/routes' must be " + "dictionaries.") + rc_url = rc.get('url') + if not rc_url: + raise ConfigurationError("All routes in 'site/routes' must " + "have an 'url'.") + if rc_url[0] != '/': + raise ConfigurationError("Route URLs must start with '/'.") + if rc.get('source') is None: + raise ConfigurationError("Routes must specify a source.") + if rc['source'] not in list(values['site']['sources'].keys()): + raise ConfigurationError("Route is referencing unknown " + "source: %s" % rc['source']) + rc.setdefault('taxonomy', None) + rc.setdefault('page_suffix', '/%num%') + + return v + + +def _validate_site_taxonomies(v, values, cache): + for tn, tc in v.items(): + tc.setdefault('multiple', False) + tc.setdefault('term', tn) + tc.setdefault('page', '_%s.%%ext%%' % tc['term']) + + # Validate endpoints, and make sure the theme has a default source. + reserved_endpoints = set(['piecrust', 'site', 'page', 'route', + 'assets', 'pagination', 'siblings', + 'family']) + for name, src in values['site']['sources'].items(): + endpoint = src['data_endpoint'] + if endpoint in reserved_endpoints: + raise ConfigurationError( + "Source '%s' is using a reserved endpoint name: %s" % + (name, endpoint)) + + return v + + +def _validate_site_plugins(v, values, cache): + if isinstance(v, str): + v = v.split(',') + elif not isinstance(v, list): + raise ConfigurationError( + "The 'site/plugins' setting must be an array, or a " + "comma-separated list.") + return v +