Mercurial > wikked
view wikked/fs.py @ 221:4306c6b56b30
Some optimizations for `wk update`:
- don't always load/format the text contents.
- better naming for a few functions.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 03 Mar 2014 22:03:06 -0800 |
parents | fba20c625fb3 |
children | a978ecf99408 |
line wrap: on
line source
"""File-system layer of the wiki.

Maps page URLs to file-system paths (and back), and scans the wiki root
for page files.
"""
import os
import os.path
import re
import string
import codecs
import fnmatch
import logging
import itertools
from utils import (PageNotFoundError, NamespaceNotFoundError,
        split_page_url)


# Name of the directory, directly under the wiki root, that holds the
# per-endpoint sub-trees (`<root>/_meta/<endpoint>/...`).
META_ENDPOINT = '_meta'


logger = logging.getLogger(__name__)


# Characters accepted in the relative path of a page that is about to be
# created: word characters, space, `.`, `-`, parentheses, square brackets
# and `/`. (Raw-string spelling of the exact same pattern as before.)
valid_filename_pattern = re.compile(r'^[\w \.\-\(\)\[\]\/]+$', re.UNICODE)


class PageInfo(object):
    """ A page URL paired with the path of the file backing it.

        The file's text is only read, as UTF-8, the first time `content`
        is accessed; the result is then cached on the instance.
    """
    def __init__(self, url, path):
        self.url = url
        self.path = path
        self._content = None

    @property
    def content(self):
        """ The UTF-8 decoded text of the file at `self.path`. """
        if self._content is None:
            with codecs.open(self.path, 'r', encoding='utf-8') as f:
                self._content = f.read()
        return self._content


class FileSystem(object):
    """ A class responsible for mapping page URLs to
        file-system paths, and for scanning the file-system
        to list existing pages.
    """
    def __init__(self, root, config):
        self.root = unicode(root)

        # Both are filled in by `start()`. Until then, `None` means
        # "nothing excluded" and "any extension accepted" respectively.
        self.excluded = None
        self.page_extensions = None
        self.default_extension = config.get('wiki', 'default_extension')

    def start(self, wiki):
        """ Collects, from `wiki`, the known page extensions and the
            special files that must be ignored when scanning.
        """
        self.page_extensions = list(set(
            itertools.chain.from_iterable(wiki.formatters.itervalues())))

        excluded = []
        excluded += wiki.getSpecialFilenames()
        excluded += wiki.scm.getSpecialFilenames()
        # Exclusions are `fnmatch` patterns anchored at the wiki root.
        self.excluded = [os.path.join(self.root, e) for e in excluded]

    def init(self, wiki):
        pass

    def postInit(self):
        pass

    def getPageInfos(self, subdir=None):
        """ Walks the wiki root (or the namespace `subdir`, if given) and
            yields a `PageInfo` for every file recognized as a page.

            Raises whatever `getPhysicalNamespacePath` raises when
            `subdir` can't be resolved.
        """
        basepath = self.root
        if subdir is not None:
            basepath = self.getPhysicalNamespacePath(subdir)

        logger.debug("Scanning for pages in: %s", basepath)
        for dirpath, dirnames, filenames in os.walk(basepath):
            # Prune excluded directories *in place* so that `os.walk`
            # doesn't descend into them.
            dirnames[:] = [
                d for d in dirnames
                if not self._isExcluded(os.path.join(dirpath, d))]

            for filename in filenames:
                path = os.path.join(dirpath, filename)
                page_info = self.getPageInfo(path)
                if page_info is not None:
                    yield page_info

    def getPageInfo(self, path):
        """ Returns the `PageInfo` for the file at `path`, or `None` if
            the file is excluded or isn't recognized as a page.
        """
        logger.debug("Reading page info from: %s", path)
        if not isinstance(path, unicode):
            path = unicode(path)
        if self._isExcluded(path):
            return None
        return self._getPageInfo(path)

    def findPageInfo(self, url):
        """ Returns the `PageInfo` of the existing page at `url`.

            Raises `PageNotFoundError` if no file matches, and
            `ValueError` if the URL is malformed.
        """
        logger.debug("Searching for page: %s", url)
        path = self.getPhysicalPagePath(url)
        return PageInfo(url, path)

    def setPage(self, url, content):
        """ Writes `content`, UTF-8 encoded, to the file for `url`,
            creating parent directories as needed, and returns the
            matching `PageInfo`.

            Raises `ValueError` if the URL can't be used as a file name.
        """
        path = self.getPhysicalPagePath(url, make_new=True)
        logger.debug("Saving page '%s' to: %s", url, path)
        dirname = os.path.dirname(path)
        if not os.path.isdir(dirname):
            os.makedirs(dirname, 0o775)
        with codecs.open(path, 'w', encoding='utf-8') as f:
            f.write(content)
        return PageInfo(url, path)

    def pageExists(self, url):
        """ Returns whether a file exists for the page at `url`.

            Malformed URLs still raise `ValueError`.
        """
        logger.debug("Searching for page: %s", url)
        try:
            self.getPhysicalPagePath(url)
            return True
        except PageNotFoundError:
            return False

    def getPhysicalPagePath(self, url, make_new=False):
        """ Returns the path of the file for the page at `url`. """
        return self._getPhysicalPath(url, is_file=True, make_new=make_new)

    def getPhysicalNamespacePath(self, url, make_new=False):
        """ Returns the path of the directory for the namespace `url`. """
        return self._getPhysicalPath(url, is_file=False, make_new=make_new)

    def _isExcluded(self, path):
        """ Returns whether `path` matches one of the exclusion patterns.

            Nothing is excluded as long as `start()` hasn't run.
        """
        return any(fnmatch.fnmatch(path, e) for e in (self.excluded or ()))

    def _getPageInfo(self, path):
        """ Builds the `PageInfo` for the file at `path`, deriving the
            page URL from its location relative to the wiki root.

            Returns `None` for files without an extension, files whose
            extension isn't a known page extension, and files sitting
            directly in the meta directory (they belong to no endpoint).
        """
        meta = None
        rel_path = os.path.relpath(path, self.root)
        meta_prefix = META_ENDPOINT + os.sep
        if rel_path.startswith(meta_prefix):
            # `_meta/<endpoint>/<page path>`: the first component after
            # the meta directory is the endpoint name.
            parts = rel_path[len(meta_prefix):].split(os.sep, 1)
            if len(parts) != 2:
                return None
            meta, rel_path = parts

        name, ext = os.path.splitext(rel_path)
        ext = ext.lstrip('.')
        name = name.replace(os.sep, '/')
        if not ext:
            return None
        if self.page_extensions is not None and \
                ext not in self.page_extensions:
            return None

        url = '/' + name
        if meta:
            url = u"%s:/%s" % (meta.lower(), name)
        return PageInfo(url, path)

    def _getPhysicalPath(self, url, is_file=True, make_new=False):
        """ Resolves `url` to a file-system path.

            With `make_new`, the path is only built (using the default
            extension), nothing has to exist on disk. Otherwise an
            existing file (`is_file`) or directory is looked up.

            Raises `ValueError` for malformed URLs, `PageNotFoundError`
            for a missing page and `NamespaceNotFoundError` for a missing
            namespace.
        """
        endpoint, url = split_page_url(url)
        if not url.startswith('/'):
            raise ValueError("Page URLs need to be absolute: " + url)
        if '..' in url:
            raise ValueError("Page URLs can't contain '..': " + url)

        # Find the root directory in which we'll be searching for the
        # page file.
        root = self.root
        if endpoint:
            root = os.path.join(self.root, META_ENDPOINT, endpoint)

        # Make the URL into a relative file-system path.
        url_path = url[1:].replace('/', os.sep)
        if url_path.startswith(os.sep):
            raise ValueError("Page URLs can only have one slash at the "
                    "beginning. Got: %s" % url)

        # If we want a non-existing file's path, just build that.
        # NOTE(review): the extension is appended even when `is_file` is
        # False -- confirm that's intended for namespaces.
        if make_new:
            if (url_path.endswith(os.sep) or
                    not valid_filename_pattern.match(url_path)):
                raise ValueError("Invalid URL: %s" % url_path)
            return os.path.join(root, url_path + '.' + self.default_extension)

        # Find the right file-system entry for this URL.
        url_path = os.path.join(root, url_path)
        if not is_file:
            if os.path.isdir(url_path):
                return url_path
            self._throwNotFoundError(url, root, is_file)

        dirname, basename = os.path.split(url_path)
        if basename == '':
            raise ValueError("Invalid URL: %s" % url_path)
        if not os.path.isdir(dirname):
            self._throwNotFoundError(url, root, is_file)

        # Only the first entry of the walk is needed: the files sitting
        # directly in `dirname`. The default covers a walk that yields
        # nothing at all.
        filenames = next(os.walk(dirname), (dirname, [], []))[2]
        for filename in filenames:
            # The URL carries no extension, so match on the bare name.
            name, ext = os.path.splitext(filename)
            if name == basename:
                return os.path.join(dirname, filename)
        self._throwNotFoundError(url, root, is_file)

    def _throwNotFoundError(self, url, searched, is_file):
        """ Raises `PageNotFoundError` or `NamespaceNotFoundError`,
            depending on `is_file`. Never returns.
        """
        if is_file:
            raise PageNotFoundError("No such page '%s' in: %s" %
                    (url, searched))
        else:
            raise NamespaceNotFoundError("No such namespace '%s' in: %s" %
                    (url, searched))