Mercurial > wikked
comparison tests/mock.py @ 225:ebb12ff21cb2
Updated unit tests to be able to run.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 12 Mar 2014 23:02:40 -0700 |
parents | fd6eccb24882 |
children | 03e3e793fa22 |
comparison
equal
deleted
inserted
replaced
224:d45450a0256a | 225:ebb12ff21cb2 |
---|---|
1 import re | 1 import os |
2 import os.path | 2 import os.path |
3 import types | 3 import types |
4 import codecs | 4 import codecs |
5 import logging | 5 import logging |
6 import StringIO | 6 import StringIO |
7 from wikked.page import Page | 7 from collections import deque |
8 from wikked.fs import PageInfo, PageNotFoundError | 8 from contextlib import closing |
9 from wikked.db import Database | 9 from ConfigParser import SafeConfigParser |
10 from wikked.indexer import WikiIndex | 10 from wikked.fs import FileSystem |
11 from wikked.scm import SourceControl | 11 from wikked.db.base import Database |
12 from wikked.utils import title_to_url | 12 from wikked.indexer.base import WikiIndex |
13 | 13 from wikked.scm.base import SourceControl |
14 | 14 from wikked.wiki import WikiParameters, passthrough_formatter |
15 class MockWikiParameters(object): | 15 |
16 def __init__(self): | 16 |
17 self.formatters = { | 17 logger = logging.getLogger(__name__) |
18 self._passthrough: ['txt', 'html'] | 18 |
19 | |
20 class MockWikiParameters(WikiParameters): | |
21 def __init__(self, root=None): | |
22 super(MockWikiParameters, self).__init__(root) | |
23 self.config_text = "" | |
24 self.mock_fs = None | |
25 self.mock_index = None | |
26 self.mock_db = None | |
27 self.mock_scm = None | |
28 | |
29 def fs_factory(self): | |
30 if self.mock_fs is False: | |
31 return super(MockWikiParameters, self).fs_factory() | |
32 return self.mock_fs or MockFileSystem(self.root, self.config) | |
33 | |
34 def index_factory(self): | |
35 if self.mock_index is False: | |
36 return super(MockWikiParameters, self).index_factory() | |
37 return self.mock_index or MockWikiIndex() | |
38 | |
39 def db_factory(self): | |
40 if self.mock_db is False: | |
41 return super(MockWikiParameters, self).db_factory() | |
42 return self.mock_db or MockDatabase() | |
43 | |
44 def scm_factory(self, for_init=False): | |
45 if self.mock_scm is False: | |
46 return super(MockWikiParameters, self).scm_factory(for_init) | |
47 return self.mock_scm or MockSourceControl() | |
48 | |
49 def getFormatters(self): | |
50 formatters = { | |
51 passthrough_formatter: ['txt', 'html'] | |
19 } | 52 } |
20 | 53 return formatters |
21 self.config_text = "" | 54 |
22 self.special_filenames = [] | 55 def getPageUpdater(self): |
23 self.use_db = False | 56 return lambda wiki, url: wiki.update(url, cache_ext_data=True) |
24 | 57 |
25 self.logger_factory = lambda: logging.getLogger('wikked.tests') | 58 def _loadConfig(self): |
26 self.page_factory = lambda wiki, url: MockPage(wiki, url) | 59 default_config_path = os.path.join( |
27 self.config_factory = lambda: StringIO.StringIO(self.config_text) | 60 os.path.dirname(__file__), '..', 'wikked', 'resources', 'defaults.cfg') |
28 self.fs_factory = lambda cfg: MockFileSystem() | 61 |
29 self.index_factory = lambda cfg: MockWikiIndex() | 62 config = SafeConfigParser() |
30 self.db_factory = lambda cfg: MockDatabase() | 63 config.readfp(open(default_config_path)) |
31 self.scm_factory = lambda cfg: MockSourceControl() | 64 config.set('wiki', 'root', '/fake/root') |
32 | 65 if self.config_text: |
33 def getSpecialFilenames(self): | 66 with closing(StringIO.StringIO(self.config_text)) as conf: |
34 return self.special_filenames | 67 config.readfp(conf) |
35 | 68 |
36 def _passthrough(self, text): | 69 return config |
37 return text | 70 |
38 | 71 |
39 | 72 def mock_os_walk(root_dir, root_node): |
40 class MockPage(Page): | 73 queue = deque() |
41 def __init__(self, wiki, url): | 74 queue.appendleft((root_dir, root_node)) |
42 Page.__init__(self, wiki, url) | 75 while len(queue) > 0: |
43 | 76 cur_dir, cur_node = queue.pop() |
44 | 77 |
45 class MockDatabase(Database): | 78 dirnames = [] |
46 def __init__(self, content=None, logger=None): | 79 filenames = [] |
47 Database.__init__(self, logger) | 80 for name, child in cur_node.iteritems(): |
48 self.content = content | 81 if isinstance(child, dict): |
49 self.conn = None | 82 dirnames.append(name) |
50 self._open_count = 0 | 83 else: |
51 | 84 filenames.append(name) |
52 def initDb(self): | 85 yield cur_dir, dirnames, filenames |
53 pass | 86 for name in dirnames: |
54 | 87 fullname = os.path.join(cur_dir, name) |
55 def open(self): | 88 queue.appendleft((fullname, cur_node[name])) |
56 self._open_count += 1 | 89 |
57 self.conn = 'MOCK_CONNECTION' | 90 |
58 | 91 class MockFileSystem(FileSystem): |
59 def close(self): | 92 def __init__(self, root, config, structure=None): |
60 self._open_count -= 1 | 93 super(MockFileSystem, self).__init__(root, config) |
61 if self._open_count < 0: | |
62 raise Exception( | |
63 "The database was closed more times than it was open.") | |
64 elif self._open_count == 0: | |
65 self.conn = None | |
66 | |
67 def reset(self, pages): | |
68 pass | |
69 | |
70 def update(self, pages): | |
71 pass | |
72 | |
73 def getPageUrls(self, subdir=None): | |
74 return [] | |
75 | |
76 def getPages(self, subdir=None): | |
77 return [] | |
78 | |
79 def getPage(self, url): | |
80 return None | |
81 | |
82 def pageExists(self, url): | |
83 return False | |
84 | |
85 def getLinksTo(self, url): | |
86 return [] | |
87 | |
88 | |
89 class MockFileSystem(): | |
90 def __init__(self, structure=None, logger=None): | |
91 if not structure: | 94 if not structure: |
92 structure = [] | 95 self.structure = {} |
93 self.structure = structure | 96 else: |
94 self.logger = logger | 97 self.structure = MockFileSystem.flat_to_nested(structure) |
95 self.excluded = [] | |
96 | 98 |
97 def getPageInfos(self, subdir=None): | 99 def getPageInfos(self, subdir=None): |
98 node = self._getNode(subdir) | 100 def tmp_walk(path): |
99 if node is None: | 101 node = self._getNode(path) |
100 raise PageNotFoundError() | 102 return mock_os_walk(path, node) |
101 for n in self._getChildren(node): | 103 |
102 yield self._getPageInfo(n) | 104 orig_walk = os.walk |
103 | 105 os.walk = tmp_walk |
104 def getPageInfo(self, path): | 106 try: |
107 gen = super(MockFileSystem, self).getPageInfos(subdir) | |
108 return list(gen) | |
109 finally: | |
110 os.walk = orig_walk | |
111 | |
112 def setPage(self, url, content): | |
113 raise NotImplementedError() | |
114 | |
115 def _getPageInfo(self, path): | |
116 pi = super(MockFileSystem, self)._getPageInfo(path) | |
105 node = self._getNode(path) | 117 node = self._getNode(path) |
106 if node is None: | 118 if node is not None: |
107 raise PageNotFoundError() | 119 pi._content = node |
108 return self._getPageInfo(node) | 120 else: |
109 | 121 raise Exception("Can't find node: %s" % path) |
110 def getPage(self, url): | 122 return pi |
111 path = self._getPath(url, True) | |
112 node = self._getNode(path) | |
113 if node is None: | |
114 raise PageNotFoundError() | |
115 return self._getPageInfo(node, True) | |
116 | |
117 def setPage(self, path, content): | |
118 raise NotImplementedError() | |
119 | |
120 def pageExists(self, url): | |
121 try: | |
122 self._getPath(url, True) | |
123 return True | |
124 except PageNotFoundError: | |
125 return False | |
126 | |
127 def getPhysicalNamespacePath(self, url): | |
128 raise NotImplementedError() | |
129 | |
130 def _getPageInfo(self, node, with_content=False): | |
131 path_split = os.path.splitext(node['path']) | |
132 url = title_to_url(path_split[0]) | |
133 info = PageInfo(url, node['path']) | |
134 if with_content: | |
135 info.content = node['content'] | |
136 return info | |
137 | 123 |
138 def _getNode(self, path): | 124 def _getNode(self, path): |
139 node = self.structure | 125 node = self.structure |
140 if path: | 126 path = path.lstrip('/') |
127 if path != '': | |
141 for n in path.split('/'): | 128 for n in path.split('/'): |
142 if n not in node: | 129 if n not in node: |
143 return None | 130 return None |
144 node = node[n] | 131 node = node[n] |
145 else: | 132 return node |
146 path = '' | 133 |
147 if isinstance(node, types.StringTypes): | 134 def _getPhysicalPath(self, url, is_file=True, make_new=False): |
148 return {'type': 'file', 'path': path, 'content': node} | 135 def tmp_walk(path): |
149 return {'type': 'dir', 'path': path, 'content': node} | 136 node = self._getNode(path) |
150 | 137 return mock_os_walk(path, node) |
151 def _getChildren(self, node): | 138 |
152 if node['type'] != 'dir': | 139 orig_walk = os.walk |
153 raise Exception("'%s' is not a directory." % node['path']) | 140 os.walk = tmp_walk |
154 for name in node['content']: | 141 try: |
155 child_path = os.path.join(node['path'], name) | 142 return super(MockFileSystem, self)._getPhysicalPath(url, is_file, |
156 child = node['content'][name] | 143 make_new) |
157 if isinstance(child, types.StringTypes): | 144 finally: |
158 yield { | 145 os.walk = orig_walk |
159 'type': 'file', | 146 |
160 'path': child_path, | 147 @staticmethod |
161 'content': child | 148 def flat_to_nested(flat): |
162 } | 149 nested = {} |
163 else: | 150 for k, v in flat.iteritems(): |
164 for c in self._getChildren({ | 151 bits = k.lstrip('/').split('/') |
165 'type': 'dir', | 152 cur = nested |
166 'path': child_path, | 153 for i, b in enumerate(bits): |
167 'content': child | 154 if i < len(bits) - 1: |
168 }): | 155 if b not in cur: |
169 yield c | 156 cur[b] = {} |
170 | 157 cur = cur[b] |
171 def _getPath(self, url, is_file): | |
172 path = '' | |
173 current = self.structure | |
174 parts = unicode(url).lower().split('/') | |
175 for i, part in enumerate(parts): | |
176 for name in current: | |
177 name_slug = title_to_url(name) | |
178 if is_file and i == len(parts) - 1: | |
179 if re.match(r"%s\.[a-z]+" % re.escape(part), name_slug): | |
180 current = current[name] | |
181 path = os.path.join(path, name) | |
182 break | |
183 else: | 158 else: |
184 if name_slug == part: | 159 cur[b] = v |
185 current = current[name] | 160 return nested |
186 path = os.path.join(path, name) | 161 |
187 break | |
188 else: | |
189 # Failed to find a part of the URL. | |
190 raise PageNotFoundError("No such page: " + url) | |
191 return path | |
192 | 162 |
193 @staticmethod | 163 @staticmethod |
194 def save_structure(path, structure): | 164 def save_structure(path, structure): |
195 if not os.path.isdir(path): | 165 if not os.path.isdir(path): |
196 os.makedirs(path) | 166 os.makedirs(path) |
201 f.write(structure[node]) | 171 f.write(structure[node]) |
202 else: | 172 else: |
203 MockFileSystem.save_structure(node_path, structure[node]) | 173 MockFileSystem.save_structure(node_path, structure[node]) |
204 | 174 |
205 | 175 |
176 class MockDatabase(Database): | |
177 def __init__(self, content=None): | |
178 super(MockDatabase, self).__init__() | |
179 self.content = content | |
180 | |
181 def getPageUrls(self, subdir=None, uncached_only=False): | |
182 return [] | |
183 | |
184 def getPages(self, subdir=None, meta_query=None, uncached_only=False, | |
185 fields=None): | |
186 return [] | |
187 | |
188 def isCacheValid(self, page): | |
189 return False | |
190 | |
191 def pageExists(self, url=None, path=None): | |
192 return False | |
193 | |
194 def getLinksTo(self, url): | |
195 return [] | |
196 | |
197 def _getPageByUrl(self, url, fields): | |
198 return None | |
199 | |
200 def _getPageByPath(self, path, fields): | |
201 return None | |
202 | |
203 | |
206 class MockWikiIndex(WikiIndex): | 204 class MockWikiIndex(WikiIndex): |
207 def __init__(self, logger=None): | 205 def __init__(self): |
208 WikiIndex.__init__(self, logger) | 206 super(MockWikiIndex, self).__init__() |
209 | |
210 def initIndex(self): | |
211 pass | |
212 | |
213 def reset(self, pages): | |
214 pass | |
215 | |
216 def update(self, pages): | |
217 pass | |
218 | 207 |
219 def search(self, query): | 208 def search(self, query): |
220 # url, title, content_highlights | 209 # url, title, content_highlights |
221 return None | 210 return None |
222 | 211 |
223 | 212 |
224 class MockSourceControl(SourceControl): | 213 class MockSourceControl(SourceControl): |
225 def __init__(self, logger=None): | 214 def __init__(self): |
226 SourceControl.__init__(self, logger) | 215 super(MockSourceControl, self).__init__() |
227 | |
228 def initRepo(self): | |
229 pass | |
230 | 216 |
231 def getSpecialFilenames(self): | 217 def getSpecialFilenames(self): |
232 return [] | 218 return [] |
233 | 219 |
234 def getHistory(self, path=None): | 220 def getHistory(self, path=None): |
240 def getRevision(self, path, rev): | 226 def getRevision(self, path, rev): |
241 raise NotImplementedError() | 227 raise NotImplementedError() |
242 | 228 |
243 def diff(self, path, rev1, rev2): | 229 def diff(self, path, rev1, rev2): |
244 raise NotImplementedError() | 230 raise NotImplementedError() |
245 | |
246 def commit(self, paths, op_meta): | |
247 raise NotImplementedError() | |
248 | |
249 def revert(self, paths=None): | |
250 raise NotImplementedError() |