Mercurial > wikked
diff tests/mock.py @ 225:ebb12ff21cb2
Updated unit tests to be able to run.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 12 Mar 2014 23:02:40 -0700 |
parents | fd6eccb24882 |
children | 03e3e793fa22 |
line wrap: on
line diff
--- a/tests/mock.py Mon Mar 10 16:47:21 2014 -0700 +++ b/tests/mock.py Wed Mar 12 23:02:40 2014 -0700 @@ -1,194 +1,164 @@ -import re +import os import os.path import types import codecs import logging import StringIO -from wikked.page import Page -from wikked.fs import PageInfo, PageNotFoundError -from wikked.db import Database -from wikked.indexer import WikiIndex -from wikked.scm import SourceControl -from wikked.utils import title_to_url +from collections import deque +from contextlib import closing +from ConfigParser import SafeConfigParser +from wikked.fs import FileSystem +from wikked.db.base import Database +from wikked.indexer.base import WikiIndex +from wikked.scm.base import SourceControl +from wikked.wiki import WikiParameters, passthrough_formatter -class MockWikiParameters(object): - def __init__(self): - self.formatters = { - self._passthrough: ['txt', 'html'] - } - - self.config_text = "" - self.special_filenames = [] - self.use_db = False - - self.logger_factory = lambda: logging.getLogger('wikked.tests') - self.page_factory = lambda wiki, url: MockPage(wiki, url) - self.config_factory = lambda: StringIO.StringIO(self.config_text) - self.fs_factory = lambda cfg: MockFileSystem() - self.index_factory = lambda cfg: MockWikiIndex() - self.db_factory = lambda cfg: MockDatabase() - self.scm_factory = lambda cfg: MockSourceControl() - - def getSpecialFilenames(self): - return self.special_filenames - - def _passthrough(self, text): - return text - - -class MockPage(Page): - def __init__(self, wiki, url): - Page.__init__(self, wiki, url) +logger = logging.getLogger(__name__) -class MockDatabase(Database): - def __init__(self, content=None, logger=None): - Database.__init__(self, logger) - self.content = content - self.conn = None - self._open_count = 0 +class MockWikiParameters(WikiParameters): + def __init__(self, root=None): + super(MockWikiParameters, self).__init__(root) + self.config_text = "" + self.mock_fs = None + self.mock_index = None + self.mock_db = None + self.mock_scm = None - def initDb(self): - pass + def fs_factory(self): + if self.mock_fs is False: + return super(MockWikiParameters, self).fs_factory() + return self.mock_fs or MockFileSystem(self.root, self.config) - def open(self): - self._open_count += 1 - self.conn = 'MOCK_CONNECTION' + def index_factory(self): + if self.mock_index is False: + return super(MockWikiParameters, self).index_factory() + return self.mock_index or MockWikiIndex() + + def db_factory(self): + if self.mock_db is False: + return super(MockWikiParameters, self).db_factory() + return self.mock_db or MockDatabase() - def close(self): - self._open_count -= 1 - if self._open_count < 0: - raise Exception( - "The database was closed more times than it was open.") - elif self._open_count == 0: - self.conn = None + def scm_factory(self, for_init=False): + if self.mock_scm is False: + return super(MockWikiParameters, self).scm_factory(for_init) + return self.mock_scm or MockSourceControl() - def reset(self, pages): - pass - - def update(self, pages): - pass + def getFormatters(self): + formatters = { + passthrough_formatter: ['txt', 'html'] + } + return formatters - def getPageUrls(self, subdir=None): - return [] + def getPageUpdater(self): + return lambda wiki, url: wiki.update(url, cache_ext_data=True) - def getPages(self, subdir=None): - return [] + def _loadConfig(self): + default_config_path = os.path.join( + os.path.dirname(__file__), '..', 'wikked', 'resources', 'defaults.cfg') - def getPage(self, url): - return None + config = SafeConfigParser() + config.readfp(open(default_config_path)) + config.set('wiki', 'root', '/fake/root') + if self.config_text: + with closing(StringIO.StringIO(self.config_text)) as conf: + config.readfp(conf) - def pageExists(self, url): - return False - - def getLinksTo(self, url): - return [] + return config -class MockFileSystem(): - def __init__(self, structure=None, logger=None): +def mock_os_walk(root_dir, root_node): + queue = deque() + queue.appendleft((root_dir, root_node)) + while len(queue) > 0: + cur_dir, cur_node = queue.pop() + + dirnames = [] + filenames = [] + for name, child in cur_node.iteritems(): + if isinstance(child, dict): + dirnames.append(name) + else: + filenames.append(name) + yield cur_dir, dirnames, filenames + for name in dirnames: + fullname = os.path.join(cur_dir, name) + queue.appendleft((fullname, cur_node[name])) + + +class MockFileSystem(FileSystem): + def __init__(self, root, config, structure=None): + super(MockFileSystem, self).__init__(root, config) if not structure: - structure = [] - self.structure = structure - self.logger = logger - self.excluded = [] + self.structure = {} + else: + self.structure = MockFileSystem.flat_to_nested(structure) def getPageInfos(self, subdir=None): - node = self._getNode(subdir) - if node is None: - raise PageNotFoundError() - for n in self._getChildren(node): - yield self._getPageInfo(n) + def tmp_walk(path): + node = self._getNode(path) + return mock_os_walk(path, node) - def getPageInfo(self, path): - node = self._getNode(path) - if node is None: - raise PageNotFoundError() - return self._getPageInfo(node) + orig_walk = os.walk + os.walk = tmp_walk + try: + gen = super(MockFileSystem, self).getPageInfos(subdir) + return list(gen) + finally: + os.walk = orig_walk - def getPage(self, url): - path = self._getPath(url, True) - node = self._getNode(path) - if node is None: - raise PageNotFoundError() - return self._getPageInfo(node, True) - - def setPage(self, path, content): + def setPage(self, url, content): raise NotImplementedError() - def pageExists(self, url): - try: - self._getPath(url, True) - return True - except PageNotFoundError: - return False - - def getPhysicalNamespacePath(self, url): - raise NotImplementedError() - - def _getPageInfo(self, node, with_content=False): - path_split = os.path.splitext(node['path']) - url = title_to_url(path_split[0]) - info = PageInfo(url, node['path']) - if with_content: - info.content = node['content'] - return info + def _getPageInfo(self, path): + pi = super(MockFileSystem, self)._getPageInfo(path) + node = self._getNode(path) + if node is not None: + pi._content = node + else: + raise Exception("Can't find node: %s" % path) + return pi def _getNode(self, path): node = self.structure - if path: + path = path.lstrip('/') + if path != '': for n in path.split('/'): if n not in node: return None node = node[n] - else: - path = '' - if isinstance(node, types.StringTypes): - return {'type': 'file', 'path': path, 'content': node} - return {'type': 'dir', 'path': path, 'content': node} + return node + + def _getPhysicalPath(self, url, is_file=True, make_new=False): + def tmp_walk(path): + node = self._getNode(path) + return mock_os_walk(path, node) + + orig_walk = os.walk + os.walk = tmp_walk + try: + return super(MockFileSystem, self)._getPhysicalPath(url, is_file, + make_new) + finally: + os.walk = orig_walk - def _getChildren(self, node): - if node['type'] != 'dir': - raise Exception("'%s' is not a directory." % node['path']) - for name in node['content']: - child_path = os.path.join(node['path'], name) - child = node['content'][name] - if isinstance(child, types.StringTypes): - yield { - 'type': 'file', - 'path': child_path, - 'content': child - } - else: - for c in self._getChildren({ - 'type': 'dir', - 'path': child_path, - 'content': child - }): - yield c + @staticmethod + def flat_to_nested(flat): + nested = {} + for k, v in flat.iteritems(): + bits = k.lstrip('/').split('/') + cur = nested + for i, b in enumerate(bits): + if i < len(bits) - 1: + if b not in cur: + cur[b] = {} + cur = cur[b] + else: + cur[b] = v + return nested - def _getPath(self, url, is_file): - path = '' - current = self.structure - parts = unicode(url).lower().split('/') - for i, part in enumerate(parts): - for name in current: - name_slug = title_to_url(name) - if is_file and i == len(parts) - 1: - if re.match(r"%s\.[a-z]+" % re.escape(part), name_slug): - current = current[name] - path = os.path.join(path, name) - break - else: - if name_slug == part: - current = current[name] - path = os.path.join(path, name) - break - else: - # Failed to find a part of the URL. - raise PageNotFoundError("No such page: " + url) - return path @staticmethod def save_structure(path, structure): @@ -203,18 +173,37 @@ MockFileSystem.save_structure(node_path, structure[node]) -class MockWikiIndex(WikiIndex): - def __init__(self, logger=None): - WikiIndex.__init__(self, logger) +class MockDatabase(Database): + def __init__(self, content=None): + super(MockDatabase, self).__init__() + self.content = content + + def getPageUrls(self, subdir=None, uncached_only=False): + return [] + + def getPages(self, subdir=None, meta_query=None, uncached_only=False, + fields=None): + return [] + + def isCacheValid(self, page): + return False - def initIndex(self): - pass + def pageExists(self, url=None, path=None): + return False + + def getLinksTo(self, url): + return [] - def reset(self, pages): - pass + def _getPageByUrl(self, url, fields): + return None - def update(self, pages): - pass + def _getPageByPath(self, path, fields): + return None + + +class MockWikiIndex(WikiIndex): + def __init__(self): + super(MockWikiIndex, self).__init__() def search(self, query): # url, title, content_highlights @@ -222,11 +211,8 @@ class MockSourceControl(SourceControl): - def __init__(self, logger=None): - SourceControl.__init__(self, logger) - - def initRepo(self): - pass + def __init__(self): + super(MockSourceControl, self).__init__() def getSpecialFilenames(self): return [] @@ -242,9 +228,3 @@ def diff(self, path, rev1, rev2): raise NotImplementedError() - - def commit(self, paths, op_meta): - raise NotImplementedError() - - def revert(self, paths=None): - raise NotImplementedError()