changeset 182:a54d3c0b5f4a

tests: Patch `os.path.exists` and improve patching for `open`. You can specify additional modules for which to patch `open`. Also, it was incorrectly updating the opened file, even when it was opened for read only. Now it only updates the contents if the file was opened for write, and supports appending to the end. Last, it supports opening text files in binary mode.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jan 2015 14:55:41 -0800
parents d356f6178623
children fff195335d0a
files tests/mockutil.py
diffstat 1 files changed, 36 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/tests/mockutil.py	Sun Jan 04 14:53:13 2015 -0800
+++ b/tests/mockutil.py	Sun Jan 04 14:55:41 2015 -0800
@@ -42,14 +42,17 @@
 
 
 class _MockFsEntryWriter(object):
-    def __init__(self, entry):
+    def __init__(self, entry, mode='rt'):
         self._entry = entry
-        if isinstance(entry.contents, str):
+        self._mode = mode
+
+        if 'b' in mode:
+            data = entry.contents
+            if isinstance(data, str):
+                data = data.encode('utf8')
+            self._stream = io.BytesIO(data)
+        else:
             self._stream = io.StringIO(entry.contents)
-        elif isinstance(entry.contents, bytes):
-            self._stream = io.BytesIO(entry.contents)
-        else:
-            raise Exception("Unexpected entry contents: %s" % type(entry.contents))
 
     def __getattr__(self, name):
         return getattr(self._stream, name)
@@ -58,8 +61,12 @@
         return self
 
     def __exit__(self, exc_type, exc_value, exc_tb):
-        self._entry.contents = self._stream.getvalue()
-        self._entry.metadata['mtime'] = time.time()
+        if 'w' in self._mode:
+            if 'a' in self._mode:
+                self._entry.contents += self._stream.getvalue()
+            else:
+                self._entry.contents = self._stream.getvalue()
+            self._entry.metadata['mtime'] = time.time()
         self._stream.close()
 
 
@@ -194,7 +201,8 @@
 
 
 class mock_fs_scope(object):
-    def __init__(self, fs):
+    def __init__(self, fs, open_patches=None):
+        self.open_patches = open_patches or []
         self._fs = fs
         self._patchers = []
         self._originals = {}
@@ -212,12 +220,15 @@
 
     def _startMock(self):
         # TODO: sadly, there seems to be no way to replace `open` everywhere?
-        self._createMock('__main__.open', open, self._open, create=True)
-        self._createMock('piecrust.records.open', open, self._open, create=True)
+        modules = self.open_patches + ['__main__', 'piecrust.records']
+        for m in modules:
+            self._createMock('%s.open' % m, open, self._open, create=True)
+
         self._createMock('codecs.open', codecs.open, self._codecsOpen)
         self._createMock('os.listdir', os.listdir, self._listdir)
         self._createMock('os.makedirs', os.makedirs, self._makedirs)
         self._createMock('os.remove', os.remove, self._remove)
+        self._createMock('os.path.exists', os.path.exists, self._exists)
         self._createMock('os.path.isdir', os.path.isdir, self._isdir)
         self._createMock('os.path.isfile', os.path.isfile, self._isfile)
         self._createMock('os.path.islink', os.path.islink, self._islink)
@@ -236,13 +247,14 @@
         self._patchers.append(mock.patch(name, func, **kwargs))
 
     def _doOpen(self, orig_name, path, mode, *args, **kwargs):
+        print("Opening %s" % path)
         path = os.path.normpath(path)
         if path.startswith(resources_path):
             return self._originals[orig_name](path, mode, *args, **kwargs)
 
         if 'r' in mode:
             e = self._getFsEntry(path)
-        elif 'w' in mode:
+        elif 'w' in mode or 'x' in mode or 'a' in mode:
             e = self._getFsEntry(path)
             if e is None:
                 contents = ''
@@ -251,6 +263,8 @@
                 self._fs._createFile(path, contents)
                 e = self._getFsEntry(path)
                 assert e is not None
+            elif 'x' in mode:
+                raise OSError("File '%s' already exists" % path)
         else:
             raise OSError("Unsupported open mode: %s" % mode)
 
@@ -258,11 +272,8 @@
             raise OSError("No such file: %s" % path)
         if not isinstance(e, _MockFsEntry):
             raise OSError("'%s' is not a file %s" % (path, e))
-        if 'b' in mode:
-            assert isinstance(e.contents, bytes)
-            return _MockFsEntryWriter(e)
-        assert isinstance(e.contents, str)
-        return _MockFsEntryWriter(e)
+
+        return _MockFsEntryWriter(e, mode)
 
     def _open(self, path, mode, *args, **kwargs):
         return self._doOpen('__main__.open', path, mode, *args, **kwargs)
@@ -291,6 +302,14 @@
         path = os.path.normpath(path)
         self._fs._deleteEntry(path)
 
+    def _exists(self, path):
+        print("Checking for %s" % path)
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
+            return self._originals['os.path.isdir'](path)
+        e = self._getFsEntry(path)
+        return e is not None
+
     def _isdir(self, path):
         path = os.path.normpath(path)
         if path.startswith(resources_path):