diff tests/mock.py @ 225:ebb12ff21cb2

Updated unit tests to be able to run.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 12 Mar 2014 23:02:40 -0700
parents fd6eccb24882
children 03e3e793fa22
line wrap: on
line diff
--- a/tests/mock.py	Mon Mar 10 16:47:21 2014 -0700
+++ b/tests/mock.py	Wed Mar 12 23:02:40 2014 -0700
@@ -1,194 +1,164 @@
-import re
+import os
 import os.path
 import types
 import codecs
 import logging
 import StringIO
-from wikked.page import Page
-from wikked.fs import PageInfo, PageNotFoundError
-from wikked.db import Database
-from wikked.indexer import WikiIndex
-from wikked.scm import SourceControl
-from wikked.utils import title_to_url
+from collections import deque
+from contextlib import closing
+from ConfigParser import SafeConfigParser
+from wikked.fs import FileSystem
+from wikked.db.base import Database
+from wikked.indexer.base import WikiIndex
+from wikked.scm.base import SourceControl
+from wikked.wiki import WikiParameters, passthrough_formatter
 
 
-class MockWikiParameters(object):
-    def __init__(self):
-        self.formatters = {
-            self._passthrough: ['txt', 'html']
-        }
-
-        self.config_text = ""
-        self.special_filenames = []
-        self.use_db = False
-
-        self.logger_factory = lambda: logging.getLogger('wikked.tests')
-        self.page_factory = lambda wiki, url: MockPage(wiki, url)
-        self.config_factory = lambda: StringIO.StringIO(self.config_text)
-        self.fs_factory = lambda cfg: MockFileSystem()
-        self.index_factory = lambda cfg: MockWikiIndex()
-        self.db_factory = lambda cfg: MockDatabase()
-        self.scm_factory = lambda cfg: MockSourceControl()
-
-    def getSpecialFilenames(self):
-        return self.special_filenames
-
-    def _passthrough(self, text):
-        return text
-
-
-class MockPage(Page):
-    def __init__(self, wiki, url):
-        Page.__init__(self, wiki, url)
+logger = logging.getLogger(__name__)
 
 
-class MockDatabase(Database):
-    def __init__(self, content=None, logger=None):
-        Database.__init__(self, logger)
-        self.content = content
-        self.conn = None
-        self._open_count = 0
+class MockWikiParameters(WikiParameters):
+    def __init__(self, root=None):
+        super(MockWikiParameters, self).__init__(root)
+        self.config_text = ""
+        self.mock_fs = None
+        self.mock_index = None
+        self.mock_db = None
+        self.mock_scm = None
 
-    def initDb(self):
-        pass
+    def fs_factory(self):
+        if self.mock_fs is False:
+            return super(MockWikiParameters, self).fs_factory()
+        return self.mock_fs or MockFileSystem(self.root, self.config)
 
-    def open(self):
-        self._open_count += 1
-        self.conn = 'MOCK_CONNECTION'
+    def index_factory(self):
+        if self.mock_index is False:
+            return super(MockWikiParameters, self).index_factory()
+        return self.mock_index or MockWikiIndex()
+
+    def db_factory(self):
+        if self.mock_db is False:
+            return super(MockWikiParameters, self).db_factory()
+        return self.mock_db or MockDatabase()
 
-    def close(self):
-        self._open_count -= 1
-        if self._open_count < 0:
-            raise Exception(
-                "The database was closed more times than it was open.")
-        elif self._open_count == 0:
-            self.conn = None
+    def scm_factory(self, for_init=False):
+        if self.mock_scm is False:
+            return super(MockWikiParameters, self).scm_factory(for_init)
+        return self.mock_scm or MockSourceControl()
 
-    def reset(self, pages):
-        pass
-
-    def update(self, pages):
-        pass
+    def getFormatters(self):
+        formatters = {
+            passthrough_formatter: ['txt', 'html']
+        }
+        return formatters
 
-    def getPageUrls(self, subdir=None):
-        return []
+    def getPageUpdater(self):
+        return lambda wiki, url: wiki.update(url, cache_ext_data=True)
 
-    def getPages(self, subdir=None):
-        return []
+    def _loadConfig(self):
+        default_config_path = os.path.join(
+            os.path.dirname(__file__), '..', 'wikked', 'resources', 'defaults.cfg')
 
-    def getPage(self, url):
-        return None
+        config = SafeConfigParser()
+        config.readfp(open(default_config_path))
+        config.set('wiki', 'root', '/fake/root')
+        if self.config_text:
+            with closing(StringIO.StringIO(self.config_text)) as conf:
+                config.readfp(conf)
 
-    def pageExists(self, url):
-        return False
-
-    def getLinksTo(self, url):
-        return []
+        return config
 
 
-class MockFileSystem():
-    def __init__(self, structure=None, logger=None):
+def mock_os_walk(root_dir, root_node):
+    queue = deque()
+    queue.appendleft((root_dir, root_node))
+    while len(queue) > 0:
+        cur_dir, cur_node = queue.pop()
+
+        dirnames = []
+        filenames = []
+        for name, child in cur_node.iteritems():
+            if isinstance(child, dict):
+                dirnames.append(name)
+            else:
+                filenames.append(name)
+        yield cur_dir, dirnames, filenames
+        for name in dirnames:
+            fullname = os.path.join(cur_dir, name)
+            queue.appendleft((fullname, cur_node[name]))
+
+
+class MockFileSystem(FileSystem):
+    def __init__(self, root, config, structure=None):
+        super(MockFileSystem, self).__init__(root, config)
         if not structure:
-            structure = []
-        self.structure = structure
-        self.logger = logger
-        self.excluded = []
+            self.structure = {}
+        else:
+            self.structure = MockFileSystem.flat_to_nested(structure)
 
     def getPageInfos(self, subdir=None):
-        node = self._getNode(subdir)
-        if node is None:
-            raise PageNotFoundError()
-        for n in self._getChildren(node):
-            yield self._getPageInfo(n)
+        def tmp_walk(path):
+            node = self._getNode(path)
+            return mock_os_walk(path, node)
 
-    def getPageInfo(self, path):
-        node = self._getNode(path)
-        if node is None:
-            raise PageNotFoundError()
-        return self._getPageInfo(node)
+        orig_walk = os.walk
+        os.walk = tmp_walk
+        try:
+            gen = super(MockFileSystem, self).getPageInfos(subdir)
+            return list(gen)
+        finally:
+            os.walk = orig_walk
 
-    def getPage(self, url):
-        path = self._getPath(url, True)
-        node = self._getNode(path)
-        if node is None:
-            raise PageNotFoundError()
-        return self._getPageInfo(node, True)
-
-    def setPage(self, path, content):
+    def setPage(self, url, content):
         raise NotImplementedError()
 
-    def pageExists(self, url):
-        try:
-            self._getPath(url, True)
-            return True
-        except PageNotFoundError:
-            return False
-
-    def getPhysicalNamespacePath(self, url):
-        raise NotImplementedError()
-
-    def _getPageInfo(self, node, with_content=False):
-        path_split = os.path.splitext(node['path'])
-        url = title_to_url(path_split[0])
-        info = PageInfo(url, node['path'])
-        if with_content:
-            info.content = node['content']
-        return info
+    def _getPageInfo(self, path):
+        pi = super(MockFileSystem, self)._getPageInfo(path)
+        node = self._getNode(path)
+        if node is not None:
+            pi._content = node
+        else:
+            raise Exception("Can't find node: %s" % path)
+        return pi
 
     def _getNode(self, path):
         node = self.structure
-        if path:
+        path = path.lstrip('/')
+        if path != '':
             for n in path.split('/'):
                 if n not in node:
                     return None
                 node = node[n]
-        else:
-            path = ''
-        if isinstance(node, types.StringTypes):
-            return {'type': 'file', 'path': path, 'content': node}
-        return {'type': 'dir', 'path': path, 'content': node}
+        return node
+
+    def _getPhysicalPath(self, url, is_file=True, make_new=False):
+        def tmp_walk(path):
+            node = self._getNode(path)
+            return mock_os_walk(path, node)
+
+        orig_walk = os.walk
+        os.walk = tmp_walk
+        try:
+            return super(MockFileSystem, self)._getPhysicalPath(url, is_file,
+                                                                make_new)
+        finally:
+            os.walk = orig_walk
 
-    def _getChildren(self, node):
-        if node['type'] != 'dir':
-            raise Exception("'%s' is not a directory." % node['path'])
-        for name in node['content']:
-            child_path = os.path.join(node['path'], name)
-            child = node['content'][name]
-            if isinstance(child, types.StringTypes):
-                yield {
-                    'type': 'file',
-                    'path': child_path,
-                    'content': child
-                    }
-            else:
-                for c in self._getChildren({
-                    'type': 'dir',
-                    'path': child_path,
-                    'content': child
-                    }):
-                    yield c
+    @staticmethod
+    def flat_to_nested(flat):
+        nested = {}
+        for k, v in flat.iteritems():
+            bits = k.lstrip('/').split('/')
+            cur = nested
+            for i, b in enumerate(bits):
+                if i < len(bits) - 1:
+                    if b not in cur:
+                        cur[b] = {}
+                    cur = cur[b]
+                else:
+                    cur[b] = v
+        return nested
 
-    def _getPath(self, url, is_file):
-        path = ''
-        current = self.structure
-        parts = unicode(url).lower().split('/')
-        for i, part in enumerate(parts):
-            for name in current:
-                name_slug = title_to_url(name)
-                if is_file and i == len(parts) - 1:
-                    if re.match(r"%s\.[a-z]+" % re.escape(part), name_slug):
-                        current = current[name]
-                        path = os.path.join(path, name)
-                        break
-                else:
-                    if name_slug == part:
-                        current = current[name]
-                        path = os.path.join(path, name)
-                        break
-            else:
-                # Failed to find a part of the URL.
-                raise PageNotFoundError("No such page: " + url)
-        return path
 
     @staticmethod
     def save_structure(path, structure):
@@ -203,18 +173,37 @@
                 MockFileSystem.save_structure(node_path, structure[node])
 
 
-class MockWikiIndex(WikiIndex):
-    def __init__(self, logger=None):
-        WikiIndex.__init__(self, logger)
+class MockDatabase(Database):
+    def __init__(self, content=None):
+        super(MockDatabase, self).__init__()
+        self.content = content
+
+    def getPageUrls(self, subdir=None, uncached_only=False):
+        return []
+
+    def getPages(self, subdir=None, meta_query=None, uncached_only=False,
+                 fields=None):
+        return []
+
+    def isCacheValid(self, page):
+        return False
 
-    def initIndex(self):
-        pass
+    def pageExists(self, url=None, path=None):
+        return False
+
+    def getLinksTo(self, url):
+        return []
 
-    def reset(self, pages):
-        pass
+    def _getPageByUrl(self, url, fields):
+        return None
 
-    def update(self, pages):
-        pass
+    def _getPageByPath(self, path, fields):
+        return None
+
+
+class MockWikiIndex(WikiIndex):
+    def __init__(self):
+        super(MockWikiIndex, self).__init__()
 
     def search(self, query):
         # url, title, content_highlights
@@ -222,11 +211,8 @@
 
 
 class MockSourceControl(SourceControl):
-    def __init__(self, logger=None):
-        SourceControl.__init__(self, logger)
-
-    def initRepo(self):
-        pass
+    def __init__(self):
+        super(MockSourceControl, self).__init__()
 
     def getSpecialFilenames(self):
         return []
@@ -242,9 +228,3 @@
 
     def diff(self, path, rev1, rev2):
         raise NotImplementedError()
-
-    def commit(self, paths, op_meta):
-        raise NotImplementedError()
-
-    def revert(self, paths=None):
-        raise NotImplementedError()