Mercurial > piecrust2
comparison tests/mockutil.py @ 85:3471ffa059b2
Add a `BakeScheduler` to handle build dependencies. Add unit-tests.
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Wed, 03 Sep 2014 17:27:50 -0700 |
| parents | 485682a6de50 |
| children | 133845647083 |
comparison
equal
deleted
inserted
replaced
| 84:b3ce11b2cf36 | 85:3471ffa059b2 |
|---|---|
| 20 app = mock.MagicMock(spec=PieCrust) | 20 app = mock.MagicMock(spec=PieCrust) |
| 21 app.config = PieCrustConfiguration() | 21 app.config = PieCrustConfiguration() |
| 22 return app | 22 return app |
| 23 | 23 |
| 24 | 24 |
| 25 def with_mock_fs_app(f): | 25 class _MockFsEntry(object): |
| 26 @functools.wraps(f) | 26 def __init__(self, contents): |
| 27 def wrapper(app, *args, **kwargs): | 27 self.contents = contents |
| 28 with mock_fs_scope(app): | 28 self.metadata = {'mtime': time.time()} |
| 29 real_app = app.getApp() | 29 |
| 30 return f(real_app, *args, **kwargs) | 30 |
| 31 return wrapper | 31 class _MockFsEntryWriter(object): |
| 32 def __init__(self, entry): | |
| 33 self._entry = entry | |
| 34 if isinstance(entry.contents, str): | |
| 35 self._stream = io.StringIO(entry.contents) | |
| 36 elif isinstance(entry.contents, bytes): | |
| 37 self._stream = io.BytesIO(entry.contents) | |
| 38 else: | |
| 39 raise Exception("Unexpected entry contents: %s" % type(entry.contents)) | |
| 40 | |
| 41 def __getattr__(self, name): | |
| 42 return getattr(self._stream, name) | |
| 43 | |
| 44 def __enter__(self): | |
| 45 return self | |
| 46 | |
| 47 def __exit__(self, exc_type, exc_value, exc_tb): | |
| 48 self._entry.contents = self._stream.getvalue() | |
| 49 self._stream.close() | |
| 32 | 50 |
| 33 | 51 |
| 34 class mock_fs(object): | 52 class mock_fs(object): |
| 35 def __init__(self, default_spec=True): | 53 def __init__(self, default_spec=True): |
| 36 self._root = 'root_%d' % random.randrange(1000) | 54 self._root = 'root_%d' % random.randrange(1000) |
| 44 p = p.replace('\\', '/') | 62 p = p.replace('\\', '/') |
| 45 if p in ['/', '', None]: | 63 if p in ['/', '', None]: |
| 46 return '/%s' % self._root | 64 return '/%s' % self._root |
| 47 return '/%s/%s' % (self._root, p.lstrip('/')) | 65 return '/%s/%s' % (self._root, p.lstrip('/')) |
| 48 | 66 |
| 49 def getApp(self): | 67 def getApp(self, cache=True): |
| 50 root_dir = self.path('/kitchen') | 68 root_dir = self.path('/kitchen') |
| 51 return PieCrust(root_dir, cache=False) | 69 return PieCrust(root_dir, cache=cache, debug=True) |
| 52 | 70 |
| 53 def withDir(self, path): | 71 def withDir(self, path): |
| 54 path = path.replace('\\', '/') | 72 path = path.replace('\\', '/') |
| 55 path = path.lstrip('/') | 73 path = path.lstrip('/') |
| 56 path = '/%s/%s' % (self._root, path) | 74 path = '/%s/%s' % (self._root, path) |
| 110 for k, v in root.items(): | 128 for k, v in root.items(): |
| 111 self._getStructureRecursive(v, res, k) | 129 self._getStructureRecursive(v, res, k) |
| 112 return res | 130 return res |
| 113 | 131 |
| 114 def _getStructureRecursive(self, src, target, name): | 132 def _getStructureRecursive(self, src, target, name): |
| 115 if isinstance(src, tuple): | 133 if isinstance(src, _MockFsEntry): |
| 116 target[name] = src[0] | 134 target[name] = src.contents |
| 117 return | 135 return |
| 118 | 136 |
| 119 e = {} | 137 e = {} |
| 120 for k, v in src.items(): | 138 for k, v in src.items(): |
| 121 self._getStructureRecursive(v, e, k) | 139 self._getStructureRecursive(v, e, k) |
| 132 return None | 150 return None |
| 133 return cur | 151 return cur |
| 134 | 152 |
| 135 def _createDir(self, path): | 153 def _createDir(self, path): |
| 136 cur = self._fs | 154 cur = self._fs |
| 137 bits = path.strip('/').split('/') | 155 path = path.replace('\\', '/').strip('/') |
| 156 bits = path.split('/') | |
| 138 for b in bits: | 157 for b in bits: |
| 139 if b not in cur: | 158 if b not in cur: |
| 140 cur[b] = {} | 159 cur[b] = {} |
| 141 cur = cur[b] | 160 cur = cur[b] |
| 142 return self | 161 return self |
| 143 | 162 |
| 144 def _createFile(self, path, contents): | 163 def _createFile(self, path, contents): |
| 145 cur = self._fs | 164 cur = self._fs |
| 146 bits = path.strip('/').split('/') | 165 path = path.replace('\\', '/').lstrip('/') |
| 166 bits = path.split('/') | |
| 147 for b in bits[:-1]: | 167 for b in bits[:-1]: |
| 148 if b not in cur: | 168 if b not in cur: |
| 149 cur[b] = {} | 169 cur[b] = {} |
| 150 cur = cur[b] | 170 cur = cur[b] |
| 151 cur[bits[-1]] = (contents, {'mtime': time.time()}) | 171 cur[bits[-1]] = _MockFsEntry(contents) |
| 152 return self | 172 return self |
| 173 | |
| 174 def _deleteEntry(self, path): | |
| 175 parent = self._getEntry(os.path.dirname(path)) | |
| 176 assert parent is not None | |
| 177 name = os.path.basename(path) | |
| 178 assert name in parent | |
| 179 del parent[name] | |
| 153 | 180 |
| 154 | 181 |
| 155 class mock_fs_scope(object): | 182 class mock_fs_scope(object): |
| 156 def __init__(self, fs): | 183 def __init__(self, fs): |
| 157 self._fs = fs | 184 self._fs = fs |
| 169 def __exit__(self, type, value, traceback): | 196 def __exit__(self, type, value, traceback): |
| 170 self._endMock() | 197 self._endMock() |
| 171 | 198 |
| 172 def _startMock(self): | 199 def _startMock(self): |
| 173 self._createMock('__main__.open', open, self._open, create=True) | 200 self._createMock('__main__.open', open, self._open, create=True) |
| 201 # TODO: WTF, apparently the previous one doesn't really work? | |
| 202 self._createMock('piecrust.records.open', open, self._open, create=True) | |
| 174 self._createMock('codecs.open', codecs.open, self._codecsOpen) | 203 self._createMock('codecs.open', codecs.open, self._codecsOpen) |
| 175 self._createMock('os.listdir', os.listdir, self._listdir) | 204 self._createMock('os.listdir', os.listdir, self._listdir) |
| 176 self._createMock('os.makedirs', os.makedirs, self._makedirs) | 205 self._createMock('os.makedirs', os.makedirs, self._makedirs) |
| 177 self._createMock('os.path.isdir', os.path.isdir, self._isdir) | 206 self._createMock('os.path.isdir', os.path.isdir, self._isdir) |
| 178 self._createMock('os.path.isfile', os.path.isfile, self._isfile) | 207 self._createMock('os.path.isfile', os.path.isfile, self._isfile) |
| 179 self._createMock('os.path.islink', os.path.islink, self._islink) | 208 self._createMock('os.path.islink', os.path.islink, self._islink) |
| 180 self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime) | 209 self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime) |
| 181 self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile) | 210 self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile) |
| 211 self._createMock('shutil.rmtree', shutil.rmtree, self._rmtree) | |
| 182 for p in self._patchers: | 212 for p in self._patchers: |
| 183 p.start() | 213 p.start() |
| 184 | 214 |
| 185 def _endMock(self): | 215 def _endMock(self): |
| 186 for p in self._patchers: | 216 for p in self._patchers: |
| 188 | 218 |
| 189 def _createMock(self, name, orig, func, **kwargs): | 219 def _createMock(self, name, orig, func, **kwargs): |
| 190 self._originals[name] = orig | 220 self._originals[name] = orig |
| 191 self._patchers.append(mock.patch(name, func, **kwargs)) | 221 self._patchers.append(mock.patch(name, func, **kwargs)) |
| 192 | 222 |
| 193 def _open(self, path, *args, **kwargs): | 223 def _doOpen(self, orig_name, path, mode, *args, **kwargs): |
| 194 path = os.path.normpath(path) | 224 path = os.path.normpath(path) |
| 195 if path.startswith(resources_path): | 225 if path.startswith(resources_path): |
| 196 return self._originals['__main__.open'](path, **kwargs) | 226 return self._originals[orig_name](path, mode, *args, **kwargs) |
| 197 e = self._getFsEntry(path) | 227 |
| 228 if 'r' in mode: | |
| 229 e = self._getFsEntry(path) | |
| 230 elif 'w' in mode: | |
| 231 e = self._getFsEntry(path) | |
| 232 if e is None: | |
| 233 contents = '' | |
| 234 if 'b' in mode: | |
| 235 contents = bytes() | |
| 236 self._fs._createFile(path, contents) | |
| 237 e = self._getFsEntry(path) | |
| 238 assert e is not None | |
| 239 else: | |
| 240 raise OSError("Unsupported open mode: %s" % mode) | |
| 241 | |
| 198 if e is None: | 242 if e is None: |
| 199 raise OSError("No such file: %s" % path) | 243 raise OSError("No such file: %s" % path) |
| 200 if not isinstance(e, tuple): | 244 if not isinstance(e, _MockFsEntry): |
| 201 raise OSError("'%s' is not a file" % path) | 245 raise OSError("'%s' is not a file %s" % (path, e)) |
| 202 return io.StringIO(e[0]) | 246 if 'b' in mode: |
| 203 | 247 assert isinstance(e.contents, bytes) |
| 204 def _codecsOpen(self, path, *args, **kwargs): | 248 return _MockFsEntryWriter(e) |
| 205 path = os.path.normpath(path) | 249 assert isinstance(e.contents, str) |
| 206 if path.startswith(resources_path): | 250 return _MockFsEntryWriter(e) |
| 207 return self._originals['codecs.open'](path, *args, **kwargs) | 251 |
| 208 e = self._getFsEntry(path) | 252 def _open(self, path, mode, *args, **kwargs): |
| 209 if e is None: | 253 return self._doOpen('__main__.open', path, mode, *args, **kwargs) |
| 210 raise OSError("No such file: %s" % path) | 254 |
| 211 if not isinstance(e, tuple): | 255 def _codecsOpen(self, path, mode, *args, **kwargs): |
| 212 raise OSError("'%s' is not a file" % path) | 256 return self._doOpen('codecs.open', path, mode, *args, **kwargs) |
| 213 return io.StringIO(e[0]) | |
| 214 | 257 |
| 215 def _listdir(self, path): | 258 def _listdir(self, path): |
| 216 if not path.startswith('/' + self.root): | 259 path = os.path.normpath(path) |
| 260 if path.startswith(resources_path): | |
| 217 return self._originals['os.listdir'](path) | 261 return self._originals['os.listdir'](path) |
| 262 | |
| 218 e = self._getFsEntry(path) | 263 e = self._getFsEntry(path) |
| 219 if e is None: | 264 if e is None: |
| 220 raise OSError("No such directory: %s" % path) | 265 raise OSError("No such directory: %s" % path) |
| 221 if not isinstance(e, dict): | 266 if not isinstance(e, dict): |
| 222 raise OSError("'%s' is not a directory." % path) | 267 raise OSError("'%s' is not a directory." % path) |
| 223 return list(e.keys()) | 268 return list(e.keys()) |
| 224 | 269 |
| 225 def _makedirs(self, path, mode): | 270 def _makedirs(self, path, mode=0o777): |
| 226 if not path.startswith('/' + self.root): | 271 if not path.replace('\\', '/').startswith('/' + self.root): |
| 227 raise Exception("Shouldn't create directory: %s" % path) | 272 raise Exception("Shouldn't create directory: %s" % path) |
| 228 self._fs._createDir(path) | 273 self._fs._createDir(path) |
| 229 | 274 |
| 230 def _isdir(self, path): | 275 def _isdir(self, path): |
| 231 if not path.startswith('/' + self.root): | 276 path = os.path.normpath(path) |
| 277 if path.startswith(resources_path): | |
| 232 return self._originals['os.path.isdir'](path) | 278 return self._originals['os.path.isdir'](path) |
| 233 e = self._getFsEntry(path) | 279 e = self._getFsEntry(path) |
| 234 return e is not None and isinstance(e, dict) | 280 return e is not None and isinstance(e, dict) |
| 235 | 281 |
| 236 def _isfile(self, path): | 282 def _isfile(self, path): |
| 237 if not path.startswith('/' + self.root): | 283 path = os.path.normpath(path) |
| 284 if path.startswith(resources_path): | |
| 238 return self._originals['os.path.isfile'](path) | 285 return self._originals['os.path.isfile'](path) |
| 239 e = self._getFsEntry(path) | 286 e = self._getFsEntry(path) |
| 240 return e is not None and isinstance(e, tuple) | 287 return e is not None and isinstance(e, _MockFsEntry) |
| 241 | 288 |
| 242 def _islink(self, path): | 289 def _islink(self, path): |
| 243 if not path.startswith('/' + self.root): | 290 path = os.path.normpath(path) |
| 291 if path.startswith(resources_path): | |
| 244 return self._originals['os.path.islink'](path) | 292 return self._originals['os.path.islink'](path) |
| 245 return False | 293 return False |
| 246 | 294 |
| 247 def _getmtime(self, path): | 295 def _getmtime(self, path): |
| 248 if not path.startswith('/' + self.root): | 296 path = os.path.normpath(path) |
| 297 if path.startswith(resources_path): | |
| 249 return self._originals['os.path.getmtime'](path) | 298 return self._originals['os.path.getmtime'](path) |
| 250 e = self._getFsEntry(path) | 299 e = self._getFsEntry(path) |
| 251 if e is None: | 300 if e is None: |
| 252 raise OSError("No such file: %s" % path) | 301 raise OSError("No such file: %s" % path) |
| 253 return e[1]['mtime'] | 302 return e.metadata['mtime'] |
| 254 | 303 |
| 255 def _copyfile(self, src, dst): | 304 def _copyfile(self, src, dst): |
| 256 if not src.startswith('/' + self.root): | 305 src = os.path.normpath(src) |
| 257 with open(src, 'r') as fp: | 306 if src.startswith(resources_path): |
| 307 with self._originals['__main__.open'](src, 'r') as fp: | |
| 258 src_text = fp.read() | 308 src_text = fp.read() |
| 259 else: | 309 else: |
| 260 e = self._getFsEntry(src) | 310 e = self._getFsEntry(src) |
| 261 src_text = e[0] | 311 src_text = e.contents |
| 262 if not dst.startswith('/' + self.root): | 312 if not dst.replace('\\', '/').startswith('/' + self.root): |
| 263 raise Exception("Shouldn't copy to: %s" % dst) | 313 raise Exception("Shouldn't copy to: %s" % dst) |
| 264 self._fs._createFile(dst, src_text) | 314 self._fs._createFile(dst, src_text) |
| 265 | 315 |
| 316 def _rmtree(self, path): | |
| 317 if not path.replace('\\', '/').startswith('/' + self.root): | |
| 318 raise Exception("Shouldn't delete trees from: %s" % path) | |
| 319 e = self._fs._getEntry(os.path.dirname(path)) | |
| 320 del e[os.path.basename(path)] | |
| 321 | |
| 266 def _getFsEntry(self, path): | 322 def _getFsEntry(self, path): |
| 267 return self._fs._getEntry(path) | 323 return self._fs._getEntry(path) |
| 268 | 324 |
