comparison tests/mock.py @ 225:ebb12ff21cb2

Updated unit tests to be able to run.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 12 Mar 2014 23:02:40 -0700
parents fd6eccb24882
children 03e3e793fa22
comparison
equal deleted inserted replaced
224:d45450a0256a 225:ebb12ff21cb2
1 import re 1 import os
2 import os.path 2 import os.path
3 import types 3 import types
4 import codecs 4 import codecs
5 import logging 5 import logging
6 import StringIO 6 import StringIO
7 from wikked.page import Page 7 from collections import deque
8 from wikked.fs import PageInfo, PageNotFoundError 8 from contextlib import closing
9 from wikked.db import Database 9 from ConfigParser import SafeConfigParser
10 from wikked.indexer import WikiIndex 10 from wikked.fs import FileSystem
11 from wikked.scm import SourceControl 11 from wikked.db.base import Database
12 from wikked.utils import title_to_url 12 from wikked.indexer.base import WikiIndex
13 13 from wikked.scm.base import SourceControl
14 14 from wikked.wiki import WikiParameters, passthrough_formatter
15 class MockWikiParameters(object): 15
16 def __init__(self): 16
17 self.formatters = { 17 logger = logging.getLogger(__name__)
18 self._passthrough: ['txt', 'html'] 18
19
20 class MockWikiParameters(WikiParameters):
21 def __init__(self, root=None):
22 super(MockWikiParameters, self).__init__(root)
23 self.config_text = ""
24 self.mock_fs = None
25 self.mock_index = None
26 self.mock_db = None
27 self.mock_scm = None
28
29 def fs_factory(self):
30 if self.mock_fs is False:
31 return super(MockWikiParameters, self).fs_factory()
32 return self.mock_fs or MockFileSystem(self.root, self.config)
33
34 def index_factory(self):
35 if self.mock_index is False:
36 return super(MockWikiParameters, self).index_factory()
37 return self.mock_index or MockWikiIndex()
38
39 def db_factory(self):
40 if self.mock_db is False:
41 return super(MockWikiParameters, self).db_factory()
42 return self.mock_db or MockDatabase()
43
44 def scm_factory(self, for_init=False):
45 if self.mock_scm is False:
46 return super(MockWikiParameters, self).scm_factory(for_init)
47 return self.mock_scm or MockSourceControl()
48
49 def getFormatters(self):
50 formatters = {
51 passthrough_formatter: ['txt', 'html']
19 } 52 }
20 53 return formatters
21 self.config_text = "" 54
22 self.special_filenames = [] 55 def getPageUpdater(self):
23 self.use_db = False 56 return lambda wiki, url: wiki.update(url, cache_ext_data=True)
24 57
25 self.logger_factory = lambda: logging.getLogger('wikked.tests') 58 def _loadConfig(self):
26 self.page_factory = lambda wiki, url: MockPage(wiki, url) 59 default_config_path = os.path.join(
27 self.config_factory = lambda: StringIO.StringIO(self.config_text) 60 os.path.dirname(__file__), '..', 'wikked', 'resources', 'defaults.cfg')
28 self.fs_factory = lambda cfg: MockFileSystem() 61
29 self.index_factory = lambda cfg: MockWikiIndex() 62 config = SafeConfigParser()
30 self.db_factory = lambda cfg: MockDatabase() 63 config.readfp(open(default_config_path))
31 self.scm_factory = lambda cfg: MockSourceControl() 64 config.set('wiki', 'root', '/fake/root')
32 65 if self.config_text:
33 def getSpecialFilenames(self): 66 with closing(StringIO.StringIO(self.config_text)) as conf:
34 return self.special_filenames 67 config.readfp(conf)
35 68
36 def _passthrough(self, text): 69 return config
37 return text 70
38 71
39 72 def mock_os_walk(root_dir, root_node):
40 class MockPage(Page): 73 queue = deque()
41 def __init__(self, wiki, url): 74 queue.appendleft((root_dir, root_node))
42 Page.__init__(self, wiki, url) 75 while len(queue) > 0:
43 76 cur_dir, cur_node = queue.pop()
44 77
45 class MockDatabase(Database): 78 dirnames = []
46 def __init__(self, content=None, logger=None): 79 filenames = []
47 Database.__init__(self, logger) 80 for name, child in cur_node.iteritems():
48 self.content = content 81 if isinstance(child, dict):
49 self.conn = None 82 dirnames.append(name)
50 self._open_count = 0 83 else:
51 84 filenames.append(name)
52 def initDb(self): 85 yield cur_dir, dirnames, filenames
53 pass 86 for name in dirnames:
54 87 fullname = os.path.join(cur_dir, name)
55 def open(self): 88 queue.appendleft((fullname, cur_node[name]))
56 self._open_count += 1 89
57 self.conn = 'MOCK_CONNECTION' 90
58 91 class MockFileSystem(FileSystem):
59 def close(self): 92 def __init__(self, root, config, structure=None):
60 self._open_count -= 1 93 super(MockFileSystem, self).__init__(root, config)
61 if self._open_count < 0:
62 raise Exception(
63 "The database was closed more times than it was open.")
64 elif self._open_count == 0:
65 self.conn = None
66
67 def reset(self, pages):
68 pass
69
70 def update(self, pages):
71 pass
72
73 def getPageUrls(self, subdir=None):
74 return []
75
76 def getPages(self, subdir=None):
77 return []
78
79 def getPage(self, url):
80 return None
81
82 def pageExists(self, url):
83 return False
84
85 def getLinksTo(self, url):
86 return []
87
88
89 class MockFileSystem():
90 def __init__(self, structure=None, logger=None):
91 if not structure: 94 if not structure:
92 structure = [] 95 self.structure = {}
93 self.structure = structure 96 else:
94 self.logger = logger 97 self.structure = MockFileSystem.flat_to_nested(structure)
95 self.excluded = []
96 98
97 def getPageInfos(self, subdir=None): 99 def getPageInfos(self, subdir=None):
98 node = self._getNode(subdir) 100 def tmp_walk(path):
99 if node is None: 101 node = self._getNode(path)
100 raise PageNotFoundError() 102 return mock_os_walk(path, node)
101 for n in self._getChildren(node): 103
102 yield self._getPageInfo(n) 104 orig_walk = os.walk
103 105 os.walk = tmp_walk
104 def getPageInfo(self, path): 106 try:
107 gen = super(MockFileSystem, self).getPageInfos(subdir)
108 return list(gen)
109 finally:
110 os.walk = orig_walk
111
112 def setPage(self, url, content):
113 raise NotImplementedError()
114
115 def _getPageInfo(self, path):
116 pi = super(MockFileSystem, self)._getPageInfo(path)
105 node = self._getNode(path) 117 node = self._getNode(path)
106 if node is None: 118 if node is not None:
107 raise PageNotFoundError() 119 pi._content = node
108 return self._getPageInfo(node) 120 else:
109 121 raise Exception("Can't find node: %s" % path)
110 def getPage(self, url): 122 return pi
111 path = self._getPath(url, True)
112 node = self._getNode(path)
113 if node is None:
114 raise PageNotFoundError()
115 return self._getPageInfo(node, True)
116
117 def setPage(self, path, content):
118 raise NotImplementedError()
119
120 def pageExists(self, url):
121 try:
122 self._getPath(url, True)
123 return True
124 except PageNotFoundError:
125 return False
126
127 def getPhysicalNamespacePath(self, url):
128 raise NotImplementedError()
129
130 def _getPageInfo(self, node, with_content=False):
131 path_split = os.path.splitext(node['path'])
132 url = title_to_url(path_split[0])
133 info = PageInfo(url, node['path'])
134 if with_content:
135 info.content = node['content']
136 return info
137 123
138 def _getNode(self, path): 124 def _getNode(self, path):
139 node = self.structure 125 node = self.structure
140 if path: 126 path = path.lstrip('/')
127 if path != '':
141 for n in path.split('/'): 128 for n in path.split('/'):
142 if n not in node: 129 if n not in node:
143 return None 130 return None
144 node = node[n] 131 node = node[n]
145 else: 132 return node
146 path = '' 133
147 if isinstance(node, types.StringTypes): 134 def _getPhysicalPath(self, url, is_file=True, make_new=False):
148 return {'type': 'file', 'path': path, 'content': node} 135 def tmp_walk(path):
149 return {'type': 'dir', 'path': path, 'content': node} 136 node = self._getNode(path)
150 137 return mock_os_walk(path, node)
151 def _getChildren(self, node): 138
152 if node['type'] != 'dir': 139 orig_walk = os.walk
153 raise Exception("'%s' is not a directory." % node['path']) 140 os.walk = tmp_walk
154 for name in node['content']: 141 try:
155 child_path = os.path.join(node['path'], name) 142 return super(MockFileSystem, self)._getPhysicalPath(url, is_file,
156 child = node['content'][name] 143 make_new)
157 if isinstance(child, types.StringTypes): 144 finally:
158 yield { 145 os.walk = orig_walk
159 'type': 'file', 146
160 'path': child_path, 147 @staticmethod
161 'content': child 148 def flat_to_nested(flat):
162 } 149 nested = {}
163 else: 150 for k, v in flat.iteritems():
164 for c in self._getChildren({ 151 bits = k.lstrip('/').split('/')
165 'type': 'dir', 152 cur = nested
166 'path': child_path, 153 for i, b in enumerate(bits):
167 'content': child 154 if i < len(bits) - 1:
168 }): 155 if b not in cur:
169 yield c 156 cur[b] = {}
170 157 cur = cur[b]
171 def _getPath(self, url, is_file):
172 path = ''
173 current = self.structure
174 parts = unicode(url).lower().split('/')
175 for i, part in enumerate(parts):
176 for name in current:
177 name_slug = title_to_url(name)
178 if is_file and i == len(parts) - 1:
179 if re.match(r"%s\.[a-z]+" % re.escape(part), name_slug):
180 current = current[name]
181 path = os.path.join(path, name)
182 break
183 else: 158 else:
184 if name_slug == part: 159 cur[b] = v
185 current = current[name] 160 return nested
186 path = os.path.join(path, name) 161
187 break
188 else:
189 # Failed to find a part of the URL.
190 raise PageNotFoundError("No such page: " + url)
191 return path
192 162
193 @staticmethod 163 @staticmethod
194 def save_structure(path, structure): 164 def save_structure(path, structure):
195 if not os.path.isdir(path): 165 if not os.path.isdir(path):
196 os.makedirs(path) 166 os.makedirs(path)
201 f.write(structure[node]) 171 f.write(structure[node])
202 else: 172 else:
203 MockFileSystem.save_structure(node_path, structure[node]) 173 MockFileSystem.save_structure(node_path, structure[node])
204 174
205 175
176 class MockDatabase(Database):
177 def __init__(self, content=None):
178 super(MockDatabase, self).__init__()
179 self.content = content
180
181 def getPageUrls(self, subdir=None, uncached_only=False):
182 return []
183
184 def getPages(self, subdir=None, meta_query=None, uncached_only=False,
185 fields=None):
186 return []
187
188 def isCacheValid(self, page):
189 return False
190
191 def pageExists(self, url=None, path=None):
192 return False
193
194 def getLinksTo(self, url):
195 return []
196
197 def _getPageByUrl(self, url, fields):
198 return None
199
200 def _getPageByPath(self, path, fields):
201 return None
202
203
206 class MockWikiIndex(WikiIndex): 204 class MockWikiIndex(WikiIndex):
207 def __init__(self, logger=None): 205 def __init__(self):
208 WikiIndex.__init__(self, logger) 206 super(MockWikiIndex, self).__init__()
209
210 def initIndex(self):
211 pass
212
213 def reset(self, pages):
214 pass
215
216 def update(self, pages):
217 pass
218 207
219 def search(self, query): 208 def search(self, query):
220 # url, title, content_highlights 209 # url, title, content_highlights
221 return None 210 return None
222 211
223 212
224 class MockSourceControl(SourceControl): 213 class MockSourceControl(SourceControl):
225 def __init__(self, logger=None): 214 def __init__(self):
226 SourceControl.__init__(self, logger) 215 super(MockSourceControl, self).__init__()
227
228 def initRepo(self):
229 pass
230 216
231 def getSpecialFilenames(self): 217 def getSpecialFilenames(self):
232 return [] 218 return []
233 219
234 def getHistory(self, path=None): 220 def getHistory(self, path=None):
240 def getRevision(self, path, rev): 226 def getRevision(self, path, rev):
241 raise NotImplementedError() 227 raise NotImplementedError()
242 228
243 def diff(self, path, rev1, rev2): 229 def diff(self, path, rev1, rev2):
244 raise NotImplementedError() 230 raise NotImplementedError()
245
246 def commit(self, paths, op_meta):
247 raise NotImplementedError()
248
249 def revert(self, paths=None):
250 raise NotImplementedError()