Mercurial > piecrust2
comparison tests/mockutil.py @ 182:a54d3c0b5f4a
tests: Patch `os.path.exists` and improve patching for `open`.
You can specify additional modules for which to patch `open`.
Also, it was incorrectly updating the opened file, even when it was opened
for read only. Now it only updates the contents if the file was opened for
write, and supports appending to the end.
Last, it supports opening text files in binary mode.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 04 Jan 2015 14:55:41 -0800 |
parents | d356f6178623 |
children | 1d44d8bd93e2 |
comparison
equal
deleted
inserted
replaced
181:d356f6178623 | 182:a54d3c0b5f4a |
---|---|
40 self.contents = contents | 40 self.contents = contents |
41 self.metadata = {'mtime': time.time()} | 41 self.metadata = {'mtime': time.time()} |
42 | 42 |
43 | 43 |
44 class _MockFsEntryWriter(object): | 44 class _MockFsEntryWriter(object): |
45 def __init__(self, entry): | 45 def __init__(self, entry, mode='rt'): |
46 self._entry = entry | 46 self._entry = entry |
47 if isinstance(entry.contents, str): | 47 self._mode = mode |
48 | |
49 if 'b' in mode: | |
50 data = entry.contents | |
51 if isinstance(data, str): | |
52 data = data.encode('utf8') | |
53 self._stream = io.BytesIO(data) | |
54 else: | |
48 self._stream = io.StringIO(entry.contents) | 55 self._stream = io.StringIO(entry.contents) |
49 elif isinstance(entry.contents, bytes): | |
50 self._stream = io.BytesIO(entry.contents) | |
51 else: | |
52 raise Exception("Unexpected entry contents: %s" % type(entry.contents)) | |
53 | 56 |
54 def __getattr__(self, name): | 57 def __getattr__(self, name): |
55 return getattr(self._stream, name) | 58 return getattr(self._stream, name) |
56 | 59 |
57 def __enter__(self): | 60 def __enter__(self): |
58 return self | 61 return self |
59 | 62 |
60 def __exit__(self, exc_type, exc_value, exc_tb): | 63 def __exit__(self, exc_type, exc_value, exc_tb): |
61 self._entry.contents = self._stream.getvalue() | 64 if 'w' in self._mode: |
62 self._entry.metadata['mtime'] = time.time() | 65 if 'a' in self._mode: |
66 self._entry.contents += self._stream.getvalue() | |
67 else: | |
68 self._entry.contents = self._stream.getvalue() | |
69 self._entry.metadata['mtime'] = time.time() | |
63 self._stream.close() | 70 self._stream.close() |
64 | 71 |
65 | 72 |
66 class mock_fs(object): | 73 class mock_fs(object): |
67 def __init__(self, default_spec=True): | 74 def __init__(self, default_spec=True): |
192 assert name in parent | 199 assert name in parent |
193 del parent[name] | 200 del parent[name] |
194 | 201 |
195 | 202 |
196 class mock_fs_scope(object): | 203 class mock_fs_scope(object): |
197 def __init__(self, fs): | 204 def __init__(self, fs, open_patches=None): |
205 self.open_patches = open_patches or [] | |
198 self._fs = fs | 206 self._fs = fs |
199 self._patchers = [] | 207 self._patchers = [] |
200 self._originals = {} | 208 self._originals = {} |
201 | 209 |
202 @property | 210 @property |
210 def __exit__(self, type, value, traceback): | 218 def __exit__(self, type, value, traceback): |
211 self._endMock() | 219 self._endMock() |
212 | 220 |
213 def _startMock(self): | 221 def _startMock(self): |
214 # TODO: sadly, there seems to be no way to replace `open` everywhere? | 222 # TODO: sadly, there seems to be no way to replace `open` everywhere? |
215 self._createMock('__main__.open', open, self._open, create=True) | 223 modules = self.open_patches + ['__main__', 'piecrust.records'] |
216 self._createMock('piecrust.records.open', open, self._open, create=True) | 224 for m in modules: |
225 self._createMock('%s.open' % m, open, self._open, create=True) | |
226 | |
217 self._createMock('codecs.open', codecs.open, self._codecsOpen) | 227 self._createMock('codecs.open', codecs.open, self._codecsOpen) |
218 self._createMock('os.listdir', os.listdir, self._listdir) | 228 self._createMock('os.listdir', os.listdir, self._listdir) |
219 self._createMock('os.makedirs', os.makedirs, self._makedirs) | 229 self._createMock('os.makedirs', os.makedirs, self._makedirs) |
220 self._createMock('os.remove', os.remove, self._remove) | 230 self._createMock('os.remove', os.remove, self._remove) |
231 self._createMock('os.path.exists', os.path.exists, self._exists) | |
221 self._createMock('os.path.isdir', os.path.isdir, self._isdir) | 232 self._createMock('os.path.isdir', os.path.isdir, self._isdir) |
222 self._createMock('os.path.isfile', os.path.isfile, self._isfile) | 233 self._createMock('os.path.isfile', os.path.isfile, self._isfile) |
223 self._createMock('os.path.islink', os.path.islink, self._islink) | 234 self._createMock('os.path.islink', os.path.islink, self._islink) |
224 self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime) | 235 self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime) |
225 self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile) | 236 self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile) |
234 def _createMock(self, name, orig, func, **kwargs): | 245 def _createMock(self, name, orig, func, **kwargs): |
235 self._originals[name] = orig | 246 self._originals[name] = orig |
236 self._patchers.append(mock.patch(name, func, **kwargs)) | 247 self._patchers.append(mock.patch(name, func, **kwargs)) |
237 | 248 |
238 def _doOpen(self, orig_name, path, mode, *args, **kwargs): | 249 def _doOpen(self, orig_name, path, mode, *args, **kwargs): |
250 print("Opening %s" % path) | |
239 path = os.path.normpath(path) | 251 path = os.path.normpath(path) |
240 if path.startswith(resources_path): | 252 if path.startswith(resources_path): |
241 return self._originals[orig_name](path, mode, *args, **kwargs) | 253 return self._originals[orig_name](path, mode, *args, **kwargs) |
242 | 254 |
243 if 'r' in mode: | 255 if 'r' in mode: |
244 e = self._getFsEntry(path) | 256 e = self._getFsEntry(path) |
245 elif 'w' in mode: | 257 elif 'w' in mode or 'x' in mode or 'a' in mode: |
246 e = self._getFsEntry(path) | 258 e = self._getFsEntry(path) |
247 if e is None: | 259 if e is None: |
248 contents = '' | 260 contents = '' |
249 if 'b' in mode: | 261 if 'b' in mode: |
250 contents = bytes() | 262 contents = bytes() |
251 self._fs._createFile(path, contents) | 263 self._fs._createFile(path, contents) |
252 e = self._getFsEntry(path) | 264 e = self._getFsEntry(path) |
253 assert e is not None | 265 assert e is not None |
266 elif 'x' in mode: | |
267 raise OSError("File '%s' already exists" % path) | |
254 else: | 268 else: |
255 raise OSError("Unsupported open mode: %s" % mode) | 269 raise OSError("Unsupported open mode: %s" % mode) |
256 | 270 |
257 if e is None: | 271 if e is None: |
258 raise OSError("No such file: %s" % path) | 272 raise OSError("No such file: %s" % path) |
259 if not isinstance(e, _MockFsEntry): | 273 if not isinstance(e, _MockFsEntry): |
260 raise OSError("'%s' is not a file %s" % (path, e)) | 274 raise OSError("'%s' is not a file %s" % (path, e)) |
261 if 'b' in mode: | 275 |
262 assert isinstance(e.contents, bytes) | 276 return _MockFsEntryWriter(e, mode) |
263 return _MockFsEntryWriter(e) | |
264 assert isinstance(e.contents, str) | |
265 return _MockFsEntryWriter(e) | |
266 | 277 |
267 def _open(self, path, mode, *args, **kwargs): | 278 def _open(self, path, mode, *args, **kwargs): |
268 return self._doOpen('__main__.open', path, mode, *args, **kwargs) | 279 return self._doOpen('__main__.open', path, mode, *args, **kwargs) |
269 | 280 |
270 def _codecsOpen(self, path, mode, *args, **kwargs): | 281 def _codecsOpen(self, path, mode, *args, **kwargs): |
288 self._fs._createDir(path) | 299 self._fs._createDir(path) |
289 | 300 |
290 def _remove(self, path): | 301 def _remove(self, path): |
291 path = os.path.normpath(path) | 302 path = os.path.normpath(path) |
292 self._fs._deleteEntry(path) | 303 self._fs._deleteEntry(path) |
304 | |
305 def _exists(self, path): | |
306 print("Checking for %s" % path) | |
307 path = os.path.normpath(path) | |
308 if path.startswith(resources_path): | |
309 return self._originals['os.path.isdir'](path) | |
310 e = self._getFsEntry(path) | |
311 return e is not None | |
293 | 312 |
294 def _isdir(self, path): | 313 def _isdir(self, path): |
295 path = os.path.normpath(path) | 314 path = os.path.normpath(path) |
296 if path.startswith(resources_path): | 315 if path.startswith(resources_path): |
297 return self._originals['os.path.isdir'](path) | 316 return self._originals['os.path.isdir'](path) |