Mercurial > piecrust2
diff tests/mockutil.py @ 85:3471ffa059b2
Add a `BakeScheduler` to handle build dependencies. Add unit-tests.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 03 Sep 2014 17:27:50 -0700 |
parents | 485682a6de50 |
children | 133845647083 |
line wrap: on
line diff
--- a/tests/mockutil.py Wed Sep 03 17:26:38 2014 -0700 +++ b/tests/mockutil.py Wed Sep 03 17:27:50 2014 -0700 @@ -22,13 +22,31 @@ return app -def with_mock_fs_app(f): - @functools.wraps(f) - def wrapper(app, *args, **kwargs): - with mock_fs_scope(app): - real_app = app.getApp() - return f(real_app, *args, **kwargs) - return wrapper +class _MockFsEntry(object): + def __init__(self, contents): + self.contents = contents + self.metadata = {'mtime': time.time()} + + +class _MockFsEntryWriter(object): + def __init__(self, entry): + self._entry = entry + if isinstance(entry.contents, str): + self._stream = io.StringIO(entry.contents) + elif isinstance(entry.contents, bytes): + self._stream = io.BytesIO(entry.contents) + else: + raise Exception("Unexpected entry contents: %s" % type(entry.contents)) + + def __getattr__(self, name): + return getattr(self._stream, name) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._entry.contents = self._stream.getvalue() + self._stream.close() class mock_fs(object): @@ -46,9 +64,9 @@ return '/%s' % self._root return '/%s/%s' % (self._root, p.lstrip('/')) - def getApp(self): + def getApp(self, cache=True): root_dir = self.path('/kitchen') - return PieCrust(root_dir, cache=False) + return PieCrust(root_dir, cache=cache, debug=True) def withDir(self, path): path = path.replace('\\', '/') @@ -112,8 +130,8 @@ return res def _getStructureRecursive(self, src, target, name): - if isinstance(src, tuple): - target[name] = src[0] + if isinstance(src, _MockFsEntry): + target[name] = src.contents return e = {} @@ -134,7 +152,8 @@ def _createDir(self, path): cur = self._fs - bits = path.strip('/').split('/') + path = path.replace('\\', '/').strip('/') + bits = path.split('/') for b in bits: if b not in cur: cur[b] = {} @@ -143,14 +162,22 @@ def _createFile(self, path, contents): cur = self._fs - bits = path.strip('/').split('/') + path = path.replace('\\', '/').lstrip('/') + bits = path.split('/') for b in bits[:-1]: if b not in cur: cur[b] = {} cur = cur[b] - cur[bits[-1]] = (contents, {'mtime': time.time()}) + cur[bits[-1]] = _MockFsEntry(contents) return self + def _deleteEntry(self, path): + parent = self._getEntry(os.path.dirname(path)) + assert parent is not None + name = os.path.basename(path) + assert name in parent + del parent[name] + class mock_fs_scope(object): def __init__(self, fs): @@ -171,6 +198,8 @@ def _startMock(self): self._createMock('__main__.open', open, self._open, create=True) + # TODO: WTF, apparently the previous one doesn't really work? + self._createMock('piecrust.records.open', open, self._open, create=True) self._createMock('codecs.open', codecs.open, self._codecsOpen) self._createMock('os.listdir', os.listdir, self._listdir) self._createMock('os.makedirs', os.makedirs, self._makedirs) @@ -179,6 +208,7 @@ self._createMock('os.path.islink', os.path.islink, self._islink) self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime) self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile) + self._createMock('shutil.rmtree', shutil.rmtree, self._rmtree) for p in self._patchers: p.start() @@ -190,31 +220,46 @@ self._originals[name] = orig self._patchers.append(mock.patch(name, func, **kwargs)) - def _open(self, path, *args, **kwargs): + def _doOpen(self, orig_name, path, mode, *args, **kwargs): path = os.path.normpath(path) if path.startswith(resources_path): - return self._originals['__main__.open'](path, **kwargs) - e = self._getFsEntry(path) + return self._originals[orig_name](path, mode, *args, **kwargs) + + if 'r' in mode: + e = self._getFsEntry(path) + elif 'w' in mode: + e = self._getFsEntry(path) + if e is None: + contents = '' + if 'b' in mode: + contents = bytes() + self._fs._createFile(path, contents) + e = self._getFsEntry(path) + assert e is not None + else: + raise OSError("Unsupported open mode: %s" % mode) + if e is None: raise OSError("No such file: %s" % path) - if not isinstance(e, tuple): - raise OSError("'%s' is not a file" % path) - return io.StringIO(e[0]) + if not isinstance(e, _MockFsEntry): + raise OSError("'%s' is not a file %s" % (path, e)) + if 'b' in mode: + assert isinstance(e.contents, bytes) + return _MockFsEntryWriter(e) + assert isinstance(e.contents, str) + return _MockFsEntryWriter(e) - def _codecsOpen(self, path, *args, **kwargs): + def _open(self, path, mode, *args, **kwargs): + return self._doOpen('__main__.open', path, mode, *args, **kwargs) + + def _codecsOpen(self, path, mode, *args, **kwargs): + return self._doOpen('codecs.open', path, mode, *args, **kwargs) + + def _listdir(self, path): path = os.path.normpath(path) if path.startswith(resources_path): - return self._originals['codecs.open'](path, *args, **kwargs) - e = self._getFsEntry(path) - if e is None: - raise OSError("No such file: %s" % path) - if not isinstance(e, tuple): - raise OSError("'%s' is not a file" % path) - return io.StringIO(e[0]) + return self._originals['os.listdir'](path) - def _listdir(self, path): - if not path.startswith('/' + self.root): - return self._originals['os.listdir'](path) e = self._getFsEntry(path) if e is None: raise OSError("No such directory: %s" % path) @@ -222,47 +267,58 @@ raise OSError("'%s' is not a directory." % path) return list(e.keys()) - def _makedirs(self, path, mode): - if not path.startswith('/' + self.root): + def _makedirs(self, path, mode=0o777): + if not path.replace('\\', '/').startswith('/' + self.root): raise Exception("Shouldn't create directory: %s" % path) self._fs._createDir(path) def _isdir(self, path): - if not path.startswith('/' + self.root): + path = os.path.normpath(path) + if path.startswith(resources_path): return self._originals['os.path.isdir'](path) e = self._getFsEntry(path) return e is not None and isinstance(e, dict) def _isfile(self, path): - if not path.startswith('/' + self.root): + path = os.path.normpath(path) + if path.startswith(resources_path): return self._originals['os.path.isfile'](path) e = self._getFsEntry(path) - return e is not None and isinstance(e, tuple) + return e is not None and isinstance(e, _MockFsEntry) def _islink(self, path): - if not path.startswith('/' + self.root): + path = os.path.normpath(path) + if path.startswith(resources_path): return self._originals['os.path.islink'](path) return False def _getmtime(self, path): - if not path.startswith('/' + self.root): + path = os.path.normpath(path) + if path.startswith(resources_path): return self._originals['os.path.getmtime'](path) e = self._getFsEntry(path) if e is None: raise OSError("No such file: %s" % path) - return e[1]['mtime'] + return e.metadata['mtime'] def _copyfile(self, src, dst): - if not src.startswith('/' + self.root): - with open(src, 'r') as fp: + src = os.path.normpath(src) + if src.startswith(resources_path): + with self._originals['__main__.open'](src, 'r') as fp: src_text = fp.read() else: e = self._getFsEntry(src) - src_text = e[0] - if not dst.startswith('/' + self.root): + src_text = e.contents + if not dst.replace('\\', '/').startswith('/' + self.root): raise Exception("Shouldn't copy to: %s" % dst) self._fs._createFile(dst, src_text) + def _rmtree(self, path): + if not path.replace('\\', '/').startswith('/' + self.root): + raise Exception("Shouldn't delete trees from: %s" % path) + e = self._fs._getEntry(os.path.dirname(path)) + del e[os.path.basename(path)] + def _getFsEntry(self, path): return self._fs._getEntry(path)