Mercurial > piecrust2
comparison tests/mockutil.py @ 85:3471ffa059b2
Add a `BakeScheduler` to handle build dependencies. Add unit-tests.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 03 Sep 2014 17:27:50 -0700 |
parents | 485682a6de50 |
children | 133845647083 |
comparison
equal
deleted
inserted
replaced
84:b3ce11b2cf36 | 85:3471ffa059b2 |
---|---|
20 app = mock.MagicMock(spec=PieCrust) | 20 app = mock.MagicMock(spec=PieCrust) |
21 app.config = PieCrustConfiguration() | 21 app.config = PieCrustConfiguration() |
22 return app | 22 return app |
23 | 23 |
24 | 24 |
25 def with_mock_fs_app(f): | 25 class _MockFsEntry(object): |
26 @functools.wraps(f) | 26 def __init__(self, contents): |
27 def wrapper(app, *args, **kwargs): | 27 self.contents = contents |
28 with mock_fs_scope(app): | 28 self.metadata = {'mtime': time.time()} |
29 real_app = app.getApp() | 29 |
30 return f(real_app, *args, **kwargs) | 30 |
31 return wrapper | 31 class _MockFsEntryWriter(object): |
32 def __init__(self, entry): | |
33 self._entry = entry | |
34 if isinstance(entry.contents, str): | |
35 self._stream = io.StringIO(entry.contents) | |
36 elif isinstance(entry.contents, bytes): | |
37 self._stream = io.BytesIO(entry.contents) | |
38 else: | |
39 raise Exception("Unexpected entry contents: %s" % type(entry.contents)) | |
40 | |
41 def __getattr__(self, name): | |
42 return getattr(self._stream, name) | |
43 | |
44 def __enter__(self): | |
45 return self | |
46 | |
47 def __exit__(self, exc_type, exc_value, exc_tb): | |
48 self._entry.contents = self._stream.getvalue() | |
49 self._stream.close() | |
32 | 50 |
33 | 51 |
34 class mock_fs(object): | 52 class mock_fs(object): |
35 def __init__(self, default_spec=True): | 53 def __init__(self, default_spec=True): |
36 self._root = 'root_%d' % random.randrange(1000) | 54 self._root = 'root_%d' % random.randrange(1000) |
44 p = p.replace('\\', '/') | 62 p = p.replace('\\', '/') |
45 if p in ['/', '', None]: | 63 if p in ['/', '', None]: |
46 return '/%s' % self._root | 64 return '/%s' % self._root |
47 return '/%s/%s' % (self._root, p.lstrip('/')) | 65 return '/%s/%s' % (self._root, p.lstrip('/')) |
48 | 66 |
49 def getApp(self): | 67 def getApp(self, cache=True): |
50 root_dir = self.path('/kitchen') | 68 root_dir = self.path('/kitchen') |
51 return PieCrust(root_dir, cache=False) | 69 return PieCrust(root_dir, cache=cache, debug=True) |
52 | 70 |
53 def withDir(self, path): | 71 def withDir(self, path): |
54 path = path.replace('\\', '/') | 72 path = path.replace('\\', '/') |
55 path = path.lstrip('/') | 73 path = path.lstrip('/') |
56 path = '/%s/%s' % (self._root, path) | 74 path = '/%s/%s' % (self._root, path) |
110 for k, v in root.items(): | 128 for k, v in root.items(): |
111 self._getStructureRecursive(v, res, k) | 129 self._getStructureRecursive(v, res, k) |
112 return res | 130 return res |
113 | 131 |
114 def _getStructureRecursive(self, src, target, name): | 132 def _getStructureRecursive(self, src, target, name): |
115 if isinstance(src, tuple): | 133 if isinstance(src, _MockFsEntry): |
116 target[name] = src[0] | 134 target[name] = src.contents |
117 return | 135 return |
118 | 136 |
119 e = {} | 137 e = {} |
120 for k, v in src.items(): | 138 for k, v in src.items(): |
121 self._getStructureRecursive(v, e, k) | 139 self._getStructureRecursive(v, e, k) |
132 return None | 150 return None |
133 return cur | 151 return cur |
134 | 152 |
135 def _createDir(self, path): | 153 def _createDir(self, path): |
136 cur = self._fs | 154 cur = self._fs |
137 bits = path.strip('/').split('/') | 155 path = path.replace('\\', '/').strip('/') |
156 bits = path.split('/') | |
138 for b in bits: | 157 for b in bits: |
139 if b not in cur: | 158 if b not in cur: |
140 cur[b] = {} | 159 cur[b] = {} |
141 cur = cur[b] | 160 cur = cur[b] |
142 return self | 161 return self |
143 | 162 |
144 def _createFile(self, path, contents): | 163 def _createFile(self, path, contents): |
145 cur = self._fs | 164 cur = self._fs |
146 bits = path.strip('/').split('/') | 165 path = path.replace('\\', '/').lstrip('/') |
166 bits = path.split('/') | |
147 for b in bits[:-1]: | 167 for b in bits[:-1]: |
148 if b not in cur: | 168 if b not in cur: |
149 cur[b] = {} | 169 cur[b] = {} |
150 cur = cur[b] | 170 cur = cur[b] |
151 cur[bits[-1]] = (contents, {'mtime': time.time()}) | 171 cur[bits[-1]] = _MockFsEntry(contents) |
152 return self | 172 return self |
173 | |
174 def _deleteEntry(self, path): | |
175 parent = self._getEntry(os.path.dirname(path)) | |
176 assert parent is not None | |
177 name = os.path.basename(path) | |
178 assert name in parent | |
179 del parent[name] | |
153 | 180 |
154 | 181 |
155 class mock_fs_scope(object): | 182 class mock_fs_scope(object): |
156 def __init__(self, fs): | 183 def __init__(self, fs): |
157 self._fs = fs | 184 self._fs = fs |
169 def __exit__(self, type, value, traceback): | 196 def __exit__(self, type, value, traceback): |
170 self._endMock() | 197 self._endMock() |
171 | 198 |
172 def _startMock(self): | 199 def _startMock(self): |
173 self._createMock('__main__.open', open, self._open, create=True) | 200 self._createMock('__main__.open', open, self._open, create=True) |
201 # TODO: WTF, apparently the previous one doesn't really work? | |
202 self._createMock('piecrust.records.open', open, self._open, create=True) | |
174 self._createMock('codecs.open', codecs.open, self._codecsOpen) | 203 self._createMock('codecs.open', codecs.open, self._codecsOpen) |
175 self._createMock('os.listdir', os.listdir, self._listdir) | 204 self._createMock('os.listdir', os.listdir, self._listdir) |
176 self._createMock('os.makedirs', os.makedirs, self._makedirs) | 205 self._createMock('os.makedirs', os.makedirs, self._makedirs) |
177 self._createMock('os.path.isdir', os.path.isdir, self._isdir) | 206 self._createMock('os.path.isdir', os.path.isdir, self._isdir) |
178 self._createMock('os.path.isfile', os.path.isfile, self._isfile) | 207 self._createMock('os.path.isfile', os.path.isfile, self._isfile) |
179 self._createMock('os.path.islink', os.path.islink, self._islink) | 208 self._createMock('os.path.islink', os.path.islink, self._islink) |
180 self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime) | 209 self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime) |
181 self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile) | 210 self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile) |
211 self._createMock('shutil.rmtree', shutil.rmtree, self._rmtree) | |
182 for p in self._patchers: | 212 for p in self._patchers: |
183 p.start() | 213 p.start() |
184 | 214 |
185 def _endMock(self): | 215 def _endMock(self): |
186 for p in self._patchers: | 216 for p in self._patchers: |
188 | 218 |
189 def _createMock(self, name, orig, func, **kwargs): | 219 def _createMock(self, name, orig, func, **kwargs): |
190 self._originals[name] = orig | 220 self._originals[name] = orig |
191 self._patchers.append(mock.patch(name, func, **kwargs)) | 221 self._patchers.append(mock.patch(name, func, **kwargs)) |
192 | 222 |
193 def _open(self, path, *args, **kwargs): | 223 def _doOpen(self, orig_name, path, mode, *args, **kwargs): |
194 path = os.path.normpath(path) | 224 path = os.path.normpath(path) |
195 if path.startswith(resources_path): | 225 if path.startswith(resources_path): |
196 return self._originals['__main__.open'](path, **kwargs) | 226 return self._originals[orig_name](path, mode, *args, **kwargs) |
197 e = self._getFsEntry(path) | 227 |
228 if 'r' in mode: | |
229 e = self._getFsEntry(path) | |
230 elif 'w' in mode: | |
231 e = self._getFsEntry(path) | |
232 if e is None: | |
233 contents = '' | |
234 if 'b' in mode: | |
235 contents = bytes() | |
236 self._fs._createFile(path, contents) | |
237 e = self._getFsEntry(path) | |
238 assert e is not None | |
239 else: | |
240 raise OSError("Unsupported open mode: %s" % mode) | |
241 | |
198 if e is None: | 242 if e is None: |
199 raise OSError("No such file: %s" % path) | 243 raise OSError("No such file: %s" % path) |
200 if not isinstance(e, tuple): | 244 if not isinstance(e, _MockFsEntry): |
201 raise OSError("'%s' is not a file" % path) | 245 raise OSError("'%s' is not a file %s" % (path, e)) |
202 return io.StringIO(e[0]) | 246 if 'b' in mode: |
203 | 247 assert isinstance(e.contents, bytes) |
204 def _codecsOpen(self, path, *args, **kwargs): | 248 return _MockFsEntryWriter(e) |
205 path = os.path.normpath(path) | 249 assert isinstance(e.contents, str) |
206 if path.startswith(resources_path): | 250 return _MockFsEntryWriter(e) |
207 return self._originals['codecs.open'](path, *args, **kwargs) | 251 |
208 e = self._getFsEntry(path) | 252 def _open(self, path, mode, *args, **kwargs): |
209 if e is None: | 253 return self._doOpen('__main__.open', path, mode, *args, **kwargs) |
210 raise OSError("No such file: %s" % path) | 254 |
211 if not isinstance(e, tuple): | 255 def _codecsOpen(self, path, mode, *args, **kwargs): |
212 raise OSError("'%s' is not a file" % path) | 256 return self._doOpen('codecs.open', path, mode, *args, **kwargs) |
213 return io.StringIO(e[0]) | |
214 | 257 |
215 def _listdir(self, path): | 258 def _listdir(self, path): |
216 if not path.startswith('/' + self.root): | 259 path = os.path.normpath(path) |
260 if path.startswith(resources_path): | |
217 return self._originals['os.listdir'](path) | 261 return self._originals['os.listdir'](path) |
262 | |
218 e = self._getFsEntry(path) | 263 e = self._getFsEntry(path) |
219 if e is None: | 264 if e is None: |
220 raise OSError("No such directory: %s" % path) | 265 raise OSError("No such directory: %s" % path) |
221 if not isinstance(e, dict): | 266 if not isinstance(e, dict): |
222 raise OSError("'%s' is not a directory." % path) | 267 raise OSError("'%s' is not a directory." % path) |
223 return list(e.keys()) | 268 return list(e.keys()) |
224 | 269 |
225 def _makedirs(self, path, mode): | 270 def _makedirs(self, path, mode=0o777): |
226 if not path.startswith('/' + self.root): | 271 if not path.replace('\\', '/').startswith('/' + self.root): |
227 raise Exception("Shouldn't create directory: %s" % path) | 272 raise Exception("Shouldn't create directory: %s" % path) |
228 self._fs._createDir(path) | 273 self._fs._createDir(path) |
229 | 274 |
230 def _isdir(self, path): | 275 def _isdir(self, path): |
231 if not path.startswith('/' + self.root): | 276 path = os.path.normpath(path) |
277 if path.startswith(resources_path): | |
232 return self._originals['os.path.isdir'](path) | 278 return self._originals['os.path.isdir'](path) |
233 e = self._getFsEntry(path) | 279 e = self._getFsEntry(path) |
234 return e is not None and isinstance(e, dict) | 280 return e is not None and isinstance(e, dict) |
235 | 281 |
236 def _isfile(self, path): | 282 def _isfile(self, path): |
237 if not path.startswith('/' + self.root): | 283 path = os.path.normpath(path) |
284 if path.startswith(resources_path): | |
238 return self._originals['os.path.isfile'](path) | 285 return self._originals['os.path.isfile'](path) |
239 e = self._getFsEntry(path) | 286 e = self._getFsEntry(path) |
240 return e is not None and isinstance(e, tuple) | 287 return e is not None and isinstance(e, _MockFsEntry) |
241 | 288 |
242 def _islink(self, path): | 289 def _islink(self, path): |
243 if not path.startswith('/' + self.root): | 290 path = os.path.normpath(path) |
291 if path.startswith(resources_path): | |
244 return self._originals['os.path.islink'](path) | 292 return self._originals['os.path.islink'](path) |
245 return False | 293 return False |
246 | 294 |
247 def _getmtime(self, path): | 295 def _getmtime(self, path): |
248 if not path.startswith('/' + self.root): | 296 path = os.path.normpath(path) |
297 if path.startswith(resources_path): | |
249 return self._originals['os.path.getmtime'](path) | 298 return self._originals['os.path.getmtime'](path) |
250 e = self._getFsEntry(path) | 299 e = self._getFsEntry(path) |
251 if e is None: | 300 if e is None: |
252 raise OSError("No such file: %s" % path) | 301 raise OSError("No such file: %s" % path) |
253 return e[1]['mtime'] | 302 return e.metadata['mtime'] |
254 | 303 |
255 def _copyfile(self, src, dst): | 304 def _copyfile(self, src, dst): |
256 if not src.startswith('/' + self.root): | 305 src = os.path.normpath(src) |
257 with open(src, 'r') as fp: | 306 if src.startswith(resources_path): |
307 with self._originals['__main__.open'](src, 'r') as fp: | |
258 src_text = fp.read() | 308 src_text = fp.read() |
259 else: | 309 else: |
260 e = self._getFsEntry(src) | 310 e = self._getFsEntry(src) |
261 src_text = e[0] | 311 src_text = e.contents |
262 if not dst.startswith('/' + self.root): | 312 if not dst.replace('\\', '/').startswith('/' + self.root): |
263 raise Exception("Shouldn't copy to: %s" % dst) | 313 raise Exception("Shouldn't copy to: %s" % dst) |
264 self._fs._createFile(dst, src_text) | 314 self._fs._createFile(dst, src_text) |
265 | 315 |
316 def _rmtree(self, path): | |
317 if not path.replace('\\', '/').startswith('/' + self.root): | |
318 raise Exception("Shouldn't delete trees from: %s" % path) | |
319 e = self._fs._getEntry(os.path.dirname(path)) | |
320 del e[os.path.basename(path)] | |
321 | |
266 def _getFsEntry(self, path): | 322 def _getFsEntry(self, path): |
267 return self._fs._getEntry(path) | 323 return self._fs._getEntry(path) |
268 | 324 |