comparison tests/mockutil.py @ 182:a54d3c0b5f4a

tests: Patch `os.path.exists` and improve patching for `open`. You can specify additional modules for which to patch `open`. Also, it was incorrectly updating the opened file, even when it was opened for read only. Now it only updates the contents if the file was opened for write, and supports appending to the end. Last, it supports opening text files in binary mode.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jan 2015 14:55:41 -0800
parents d356f6178623
children 1d44d8bd93e2
comparison
equal deleted inserted replaced
181:d356f6178623 182:a54d3c0b5f4a
40 self.contents = contents 40 self.contents = contents
41 self.metadata = {'mtime': time.time()} 41 self.metadata = {'mtime': time.time()}
42 42
43 43
44 class _MockFsEntryWriter(object): 44 class _MockFsEntryWriter(object):
45 def __init__(self, entry): 45 def __init__(self, entry, mode='rt'):
46 self._entry = entry 46 self._entry = entry
47 if isinstance(entry.contents, str): 47 self._mode = mode
48
49 if 'b' in mode:
50 data = entry.contents
51 if isinstance(data, str):
52 data = data.encode('utf8')
53 self._stream = io.BytesIO(data)
54 else:
48 self._stream = io.StringIO(entry.contents) 55 self._stream = io.StringIO(entry.contents)
49 elif isinstance(entry.contents, bytes):
50 self._stream = io.BytesIO(entry.contents)
51 else:
52 raise Exception("Unexpected entry contents: %s" % type(entry.contents))
53 56
54 def __getattr__(self, name): 57 def __getattr__(self, name):
55 return getattr(self._stream, name) 58 return getattr(self._stream, name)
56 59
57 def __enter__(self): 60 def __enter__(self):
58 return self 61 return self
59 62
60 def __exit__(self, exc_type, exc_value, exc_tb): 63 def __exit__(self, exc_type, exc_value, exc_tb):
61 self._entry.contents = self._stream.getvalue() 64 if 'w' in self._mode:
62 self._entry.metadata['mtime'] = time.time() 65 if 'a' in self._mode:
66 self._entry.contents += self._stream.getvalue()
67 else:
68 self._entry.contents = self._stream.getvalue()
69 self._entry.metadata['mtime'] = time.time()
63 self._stream.close() 70 self._stream.close()
64 71
65 72
66 class mock_fs(object): 73 class mock_fs(object):
67 def __init__(self, default_spec=True): 74 def __init__(self, default_spec=True):
192 assert name in parent 199 assert name in parent
193 del parent[name] 200 del parent[name]
194 201
195 202
196 class mock_fs_scope(object): 203 class mock_fs_scope(object):
197 def __init__(self, fs): 204 def __init__(self, fs, open_patches=None):
205 self.open_patches = open_patches or []
198 self._fs = fs 206 self._fs = fs
199 self._patchers = [] 207 self._patchers = []
200 self._originals = {} 208 self._originals = {}
201 209
202 @property 210 @property
210 def __exit__(self, type, value, traceback): 218 def __exit__(self, type, value, traceback):
211 self._endMock() 219 self._endMock()
212 220
213 def _startMock(self): 221 def _startMock(self):
214 # TODO: sadly, there seems to be no way to replace `open` everywhere? 222 # TODO: sadly, there seems to be no way to replace `open` everywhere?
215 self._createMock('__main__.open', open, self._open, create=True) 223 modules = self.open_patches + ['__main__', 'piecrust.records']
216 self._createMock('piecrust.records.open', open, self._open, create=True) 224 for m in modules:
225 self._createMock('%s.open' % m, open, self._open, create=True)
226
217 self._createMock('codecs.open', codecs.open, self._codecsOpen) 227 self._createMock('codecs.open', codecs.open, self._codecsOpen)
218 self._createMock('os.listdir', os.listdir, self._listdir) 228 self._createMock('os.listdir', os.listdir, self._listdir)
219 self._createMock('os.makedirs', os.makedirs, self._makedirs) 229 self._createMock('os.makedirs', os.makedirs, self._makedirs)
220 self._createMock('os.remove', os.remove, self._remove) 230 self._createMock('os.remove', os.remove, self._remove)
231 self._createMock('os.path.exists', os.path.exists, self._exists)
221 self._createMock('os.path.isdir', os.path.isdir, self._isdir) 232 self._createMock('os.path.isdir', os.path.isdir, self._isdir)
222 self._createMock('os.path.isfile', os.path.isfile, self._isfile) 233 self._createMock('os.path.isfile', os.path.isfile, self._isfile)
223 self._createMock('os.path.islink', os.path.islink, self._islink) 234 self._createMock('os.path.islink', os.path.islink, self._islink)
224 self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime) 235 self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime)
225 self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile) 236 self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile)
234 def _createMock(self, name, orig, func, **kwargs): 245 def _createMock(self, name, orig, func, **kwargs):
235 self._originals[name] = orig 246 self._originals[name] = orig
236 self._patchers.append(mock.patch(name, func, **kwargs)) 247 self._patchers.append(mock.patch(name, func, **kwargs))
237 248
238 def _doOpen(self, orig_name, path, mode, *args, **kwargs): 249 def _doOpen(self, orig_name, path, mode, *args, **kwargs):
250 print("Opening %s" % path)
239 path = os.path.normpath(path) 251 path = os.path.normpath(path)
240 if path.startswith(resources_path): 252 if path.startswith(resources_path):
241 return self._originals[orig_name](path, mode, *args, **kwargs) 253 return self._originals[orig_name](path, mode, *args, **kwargs)
242 254
243 if 'r' in mode: 255 if 'r' in mode:
244 e = self._getFsEntry(path) 256 e = self._getFsEntry(path)
245 elif 'w' in mode: 257 elif 'w' in mode or 'x' in mode or 'a' in mode:
246 e = self._getFsEntry(path) 258 e = self._getFsEntry(path)
247 if e is None: 259 if e is None:
248 contents = '' 260 contents = ''
249 if 'b' in mode: 261 if 'b' in mode:
250 contents = bytes() 262 contents = bytes()
251 self._fs._createFile(path, contents) 263 self._fs._createFile(path, contents)
252 e = self._getFsEntry(path) 264 e = self._getFsEntry(path)
253 assert e is not None 265 assert e is not None
266 elif 'x' in mode:
267 raise OSError("File '%s' already exists" % path)
254 else: 268 else:
255 raise OSError("Unsupported open mode: %s" % mode) 269 raise OSError("Unsupported open mode: %s" % mode)
256 270
257 if e is None: 271 if e is None:
258 raise OSError("No such file: %s" % path) 272 raise OSError("No such file: %s" % path)
259 if not isinstance(e, _MockFsEntry): 273 if not isinstance(e, _MockFsEntry):
260 raise OSError("'%s' is not a file %s" % (path, e)) 274 raise OSError("'%s' is not a file %s" % (path, e))
261 if 'b' in mode: 275
262 assert isinstance(e.contents, bytes) 276 return _MockFsEntryWriter(e, mode)
263 return _MockFsEntryWriter(e)
264 assert isinstance(e.contents, str)
265 return _MockFsEntryWriter(e)
266 277
267 def _open(self, path, mode, *args, **kwargs): 278 def _open(self, path, mode, *args, **kwargs):
268 return self._doOpen('__main__.open', path, mode, *args, **kwargs) 279 return self._doOpen('__main__.open', path, mode, *args, **kwargs)
269 280
270 def _codecsOpen(self, path, mode, *args, **kwargs): 281 def _codecsOpen(self, path, mode, *args, **kwargs):
288 self._fs._createDir(path) 299 self._fs._createDir(path)
289 300
290 def _remove(self, path): 301 def _remove(self, path):
291 path = os.path.normpath(path) 302 path = os.path.normpath(path)
292 self._fs._deleteEntry(path) 303 self._fs._deleteEntry(path)
304
305 def _exists(self, path):
306 print("Checking for %s" % path)
307 path = os.path.normpath(path)
308 if path.startswith(resources_path):
309 return self._originals['os.path.isdir'](path)
310 e = self._getFsEntry(path)
311 return e is not None
293 312
294 def _isdir(self, path): 313 def _isdir(self, path):
295 path = os.path.normpath(path) 314 path = os.path.normpath(path)
296 if path.startswith(resources_path): 315 if path.startswith(resources_path):
297 return self._originals['os.path.isdir'](path) 316 return self._originals['os.path.isdir'](path)