diff tests/mockutil.py @ 85:3471ffa059b2

Add a `BakeScheduler` to handle build dependencies. Add unit-tests.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 03 Sep 2014 17:27:50 -0700
parents 485682a6de50
children 133845647083
line wrap: on
line diff
--- a/tests/mockutil.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/tests/mockutil.py	Wed Sep 03 17:27:50 2014 -0700
@@ -22,13 +22,31 @@
     return app
 
 
-def with_mock_fs_app(f):
-    @functools.wraps(f)
-    def wrapper(app, *args, **kwargs):
-        with mock_fs_scope(app):
-            real_app = app.getApp()
-            return f(real_app, *args, **kwargs)
-    return wrapper
+class _MockFsEntry(object):
+    def __init__(self, contents):
+        self.contents = contents
+        self.metadata = {'mtime': time.time()}
+
+
+class _MockFsEntryWriter(object):
+    def __init__(self, entry):
+        self._entry = entry
+        if isinstance(entry.contents, str):
+            self._stream = io.StringIO(entry.contents)
+        elif isinstance(entry.contents, bytes):
+            self._stream = io.BytesIO(entry.contents)
+        else:
+            raise Exception("Unexpected entry contents: %s" % type(entry.contents))
+
+    def __getattr__(self, name):
+        return getattr(self._stream, name)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        self._entry.contents = self._stream.getvalue()
+        self._stream.close()
 
 
 class mock_fs(object):
@@ -46,9 +64,9 @@
             return '/%s' % self._root
         return '/%s/%s' % (self._root, p.lstrip('/'))
 
-    def getApp(self):
+    def getApp(self, cache=True):
         root_dir = self.path('/kitchen')
-        return PieCrust(root_dir, cache=False)
+        return PieCrust(root_dir, cache=cache, debug=True)
 
     def withDir(self, path):
         path = path.replace('\\', '/')
@@ -112,8 +130,8 @@
         return res
 
     def _getStructureRecursive(self, src, target, name):
-        if isinstance(src, tuple):
-            target[name] = src[0]
+        if isinstance(src, _MockFsEntry):
+            target[name] = src.contents
             return
 
         e = {}
@@ -134,7 +152,8 @@
 
     def _createDir(self, path):
         cur = self._fs
-        bits = path.strip('/').split('/')
+        path = path.replace('\\', '/').strip('/')
+        bits = path.split('/')
         for b in bits:
             if b not in cur:
                 cur[b] = {}
@@ -143,14 +162,22 @@
 
     def _createFile(self, path, contents):
         cur = self._fs
-        bits = path.strip('/').split('/')
+        path = path.replace('\\', '/').lstrip('/')
+        bits = path.split('/')
         for b in bits[:-1]:
             if b not in cur:
                 cur[b] = {}
             cur = cur[b]
-        cur[bits[-1]] = (contents, {'mtime': time.time()})
+        cur[bits[-1]] = _MockFsEntry(contents)
         return self
 
+    def _deleteEntry(self, path):
+        parent = self._getEntry(os.path.dirname(path))
+        assert parent is not None
+        name = os.path.basename(path)
+        assert name in parent
+        del parent[name]
+
 
 class mock_fs_scope(object):
     def __init__(self, fs):
@@ -171,6 +198,8 @@
 
     def _startMock(self):
         self._createMock('__main__.open', open, self._open, create=True)
+        # TODO: WTF, apparently the previous one doesn't really work?
+        self._createMock('piecrust.records.open', open, self._open, create=True)
         self._createMock('codecs.open', codecs.open, self._codecsOpen)
         self._createMock('os.listdir', os.listdir, self._listdir)
         self._createMock('os.makedirs', os.makedirs, self._makedirs)
@@ -179,6 +208,7 @@
         self._createMock('os.path.islink', os.path.islink, self._islink)
         self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime)
         self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile)
+        self._createMock('shutil.rmtree', shutil.rmtree, self._rmtree)
         for p in self._patchers:
             p.start()
 
@@ -190,31 +220,46 @@
         self._originals[name] = orig
         self._patchers.append(mock.patch(name, func, **kwargs))
 
-    def _open(self, path, *args, **kwargs):
+    def _doOpen(self, orig_name, path, mode, *args, **kwargs):
         path = os.path.normpath(path)
         if path.startswith(resources_path):
-            return self._originals['__main__.open'](path, **kwargs)
-        e = self._getFsEntry(path)
+            return self._originals[orig_name](path, mode, *args, **kwargs)
+
+        if 'r' in mode:
+            e = self._getFsEntry(path)
+        elif 'w' in mode:
+            e = self._getFsEntry(path)
+            if e is None:
+                contents = ''
+                if 'b' in mode:
+                    contents = bytes()
+                self._fs._createFile(path, contents)
+                e = self._getFsEntry(path)
+                assert e is not None
+        else:
+            raise OSError("Unsupported open mode: %s" % mode)
+
         if e is None:
             raise OSError("No such file: %s" % path)
-        if not isinstance(e, tuple):
-            raise OSError("'%s' is not a file" % path)
-        return io.StringIO(e[0])
+        if not isinstance(e, _MockFsEntry):
+            raise OSError("'%s' is not a file %s" % (path, e))
+        if 'b' in mode:
+            assert isinstance(e.contents, bytes)
+            return _MockFsEntryWriter(e)
+        assert isinstance(e.contents, str)
+        return _MockFsEntryWriter(e)
 
-    def _codecsOpen(self, path, *args, **kwargs):
+    def _open(self, path, mode, *args, **kwargs):
+        return self._doOpen('__main__.open', path, mode, *args, **kwargs)
+
+    def _codecsOpen(self, path, mode, *args, **kwargs):
+        return self._doOpen('codecs.open', path, mode, *args, **kwargs)
+
+    def _listdir(self, path):
         path = os.path.normpath(path)
         if path.startswith(resources_path):
-            return self._originals['codecs.open'](path, *args, **kwargs)
-        e = self._getFsEntry(path)
-        if e is None:
-            raise OSError("No such file: %s" % path)
-        if not isinstance(e, tuple):
-            raise OSError("'%s' is not a file" % path)
-        return io.StringIO(e[0])
+            return self._originals['os.listdir'](path)
 
-    def _listdir(self, path):
-        if not path.startswith('/' + self.root):
-            return self._originals['os.listdir'](path)
         e = self._getFsEntry(path)
         if e is None:
             raise OSError("No such directory: %s" % path)
@@ -222,47 +267,58 @@
             raise OSError("'%s' is not a directory." % path)
         return list(e.keys())
 
-    def _makedirs(self, path, mode):
-        if not path.startswith('/' + self.root):
+    def _makedirs(self, path, mode=0o777):
+        if not path.replace('\\', '/').startswith('/' + self.root):
             raise Exception("Shouldn't create directory: %s" % path)
         self._fs._createDir(path)
 
     def _isdir(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.isdir'](path)
         e = self._getFsEntry(path)
         return e is not None and isinstance(e, dict)
 
     def _isfile(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.isfile'](path)
         e = self._getFsEntry(path)
-        return e is not None and isinstance(e, tuple)
+        return e is not None and isinstance(e, _MockFsEntry)
 
     def _islink(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.islink'](path)
         return False
 
     def _getmtime(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.getmtime'](path)
         e = self._getFsEntry(path)
         if e is None:
             raise OSError("No such file: %s" % path)
-        return e[1]['mtime']
+        return e.metadata['mtime']
 
     def _copyfile(self, src, dst):
-        if not src.startswith('/' + self.root):
-            with open(src, 'r') as fp:
+        src = os.path.normpath(src)
+        if src.startswith(resources_path):
+            with self._originals['__main__.open'](src, 'r') as fp:
                 src_text = fp.read()
         else:
             e = self._getFsEntry(src)
-            src_text = e[0]
-        if not dst.startswith('/' + self.root):
+            src_text = e.contents
+        if not dst.replace('\\', '/').startswith('/' + self.root):
             raise Exception("Shouldn't copy to: %s" % dst)
         self._fs._createFile(dst, src_text)
 
+    def _rmtree(self, path):
+        if not path.replace('\\', '/').startswith('/' + self.root):
+            raise Exception("Shouldn't delete trees from: %s" % path)
+        e = self._fs._getEntry(os.path.dirname(path))
+        del e[os.path.basename(path)]
+
     def _getFsEntry(self, path):
         return self._fs._getEntry(path)