Mercurial > piecrust2
view piecrust/sources/fs.py @ 1156:4ea64255eadf
sources: Add ability to force a realm on a source.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Tue, 11 Jun 2019 15:12:47 -0700 |
parents | a6618fdab37e |
children |
line wrap: on
line source
import os.path import re import glob import fnmatch import logging from werkzeug.utils import cached_property from piecrust import osutil from piecrust.routing import RouteParameter from piecrust.sources.base import ( ContentItem, ContentGroup, ContentSource, REL_PARENT_GROUP, REL_LOGICAL_PARENT_ITEM, REL_LOGICAL_CHILD_GROUP) logger = logging.getLogger(__name__) class InvalidFileSystemEndpointError(Exception): def __init__(self, source_name, fs_endpoint): super(InvalidFileSystemEndpointError, self).__init__( "Invalid file-system endpoint for source '%s': %s" % (source_name, fs_endpoint)) def _filter_crap_files(f): return (f[-1] != '~' and # Vim temp files and what-not f not in ['.DS_Store', 'Thumbs.db']) # OSX and Windows bullshit class FSContentSourceBase(ContentSource): """ Implements some basic stuff for a `ContentSource` that stores its items as files on disk. """ def __init__(self, app, name, config): super().__init__(app, name, config) self.fs_endpoint = config.get('fs_endpoint', name) self.fs_endpoint_path = os.path.join(self.root_dir, self.fs_endpoint) @cached_property def root_dir(self): if self.is_theme_source and not self.config.get('force_user_realm'): return self.app.theme_dir return self.app.root_dir def _checkFSEndpoint(self): if not os.path.isdir(self.fs_endpoint_path): if self.config.get('ignore_missing_dir'): return False raise InvalidFileSystemEndpointError(self.name, self.fs_endpoint_path) return True def openItem(self, item, mode='r', **kwargs): for m in 'wxa+': if m in mode: # If opening the file for writing, let's make sure the # directory exists. dirname = os.path.dirname(item.spec) if not os.path.exists(dirname): os.makedirs(dirname, 0o755) break return open(item.spec, mode, **kwargs) def getItemMtime(self, item): return os.path.getmtime(item.spec) def describe(self): return {'endpoint_path': self.fs_endpoint_path} class FSContentSource(FSContentSourceBase): """ Implements a `ContentSource` that simply returns files on disk under a given root directory. """ SOURCE_NAME = 'fs' def __init__(self, app, name, config): super().__init__(app, name, config) config.setdefault('data_type', 'asset_iterator') self._ignore = _parse_patterns(config.get('ignore')) self._filter = _parse_patterns(config.get('filter')) def getContents(self, group): if not self._checkFSEndpoint(): return None parent_path = self.fs_endpoint_path if group is not None: parent_path = group.spec names = filter(_filter_crap_files, osutil.listdir(parent_path)) items = [] groups = [] for name in names: path = os.path.join(parent_path, name) if self._filterPath(path): if os.path.isdir(path): metadata = self._createGroupMetadata(path) groups.append(ContentGroup(path, metadata)) else: metadata = self._createItemMetadata(path) items.append(ContentItem(path, metadata)) self._finalizeContent(group, items, groups) return items + groups def _filterPath(self, path): rel_path = os.path.relpath(path, self.fs_endpoint_path) if self._ignore is not None: if _matches_patterns(self._ignore, rel_path): return False if self._filter is not None: if _matches_patterns(self._filter, rel_path): return True return False return True def _createGroupMetadata(self, path): return {} def _createItemMetadata(self, path): return {} def _finalizeContent(self, parent_group, items, groups): pass def findContentFromSpec(self, spec): if os.path.isdir(spec): metadata = self._createGroupMetadata(spec) return ContentGroup(spec, metadata) elif os.path.isfile(spec): metadata = self._createItemMetadata(spec) return ContentItem(spec, metadata) return None def getRelatedContents(self, item, relationship): if relationship == REL_PARENT_GROUP: parent_dir = os.path.dirname(item.spec) if len(parent_dir) >= len(self.fs_endpoint_path): metadata = self._createGroupMetadata(parent_dir) return ContentGroup(parent_dir, metadata) # Don't return a group for paths that are outside of our # endpoint directory. return None if relationship == REL_LOGICAL_PARENT_ITEM: # If we want the logical parent item of a folder, we find a # page file with the same name as the folder. if not item.is_group: raise ValueError() parent_glob = item.spec.rstrip('/\\') + '.*' for n in glob.iglob(parent_glob): if os.path.isfile(n): metadata = self._createItemMetadata(n) return ContentItem(n, metadata) return None if relationship == REL_LOGICAL_CHILD_GROUP: # If we want the children items of an item, we look for # a directory that has the same name as the item's file. if item.is_group: raise ValueError( "'%s' is a content group and doesn't have a logical " "child. Did you call `family.children` on a group? " "You need to check `is_group` first.") dir_path, _ = os.path.splitext(item.spec) if os.path.isdir(dir_path): metadata = self._createGroupMetadata(dir_path) return ContentGroup(dir_path, metadata) return None return None def getSupportedRouteParameters(self): return [ RouteParameter('path', RouteParameter.TYPE_PATH)] def _parse_patterns(patterns): if not patterns: return None globs = [] regexes = [] for pat in patterns: if len(pat) > 2 and pat[0] == '/' and pat[-1] == '/': regexes.append(re.compile(pat[1:-1])) else: globs.append(pat) return globs, regexes def _matches_patterns(patterns, subj): globs, regexes = patterns if globs: for g in globs: if fnmatch.fnmatch(subj, g): return True if regexes: for r in regexes: if r.search(subj): return True return False