view tests/memfs.py @ 868:8d25f76fce98

bake: Add ability to specify pipelines to exclude during the bake.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 12 Jun 2017 22:22:19 -0700
parents f987b29d6fab
children
line wrap: on
line source

import os.path
import io
import time
import errno
import random
import codecs
import shutil
import mock
from piecrust import RESOURCES_DIR
from .basefs import TestFileSystemBase


class _MockFsEntry(object):
    def __init__(self, contents):
        self.contents = contents
        self.metadata = {'mtime': time.time()}


class _MockFsEntryWriter(object):
    def __init__(self, entry, mode='rt'):
        self._entry = entry
        self._mode = mode

        if 'b' in mode:
            data = entry.contents
            if isinstance(data, str):
                data = data.encode('utf8')
            self._stream = io.BytesIO(data)
        else:
            self._stream = io.StringIO(entry.contents)

    def __getattr__(self, name):
        return getattr(self._stream, name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        if 'w' in self._mode:
            if 'a' in self._mode:
                self._entry.contents += self._stream.getvalue()
            else:
                self._entry.contents = self._stream.getvalue()
            self._entry.metadata['mtime'] = time.time()
        self._stream.close()


class MemoryFileSystem(TestFileSystemBase):
    def __init__(self):
        self._root = 'root_%d' % random.randrange(1000)
        self._fs = {self._root: {}}

    def path(self, p):
        p = p.replace('\\', '/')
        if p in ['/', '', None]:
            return '/%s' % self._root
        return '/%s/%s' % (self._root, p.lstrip('/'))

    def getStructure(self, path=None):
        root = self._fs[self._root]
        if path:
            root = self._getEntry(self.path(path))
            if root is None:
                raise Exception("No such path: %s" % path)
            if not isinstance(root, dict):
                raise Exception("Path is not a directory: %s" % path)

        res = {}
        for k, v in root.items():
            self._getStructureRecursive(v, res, k)
        return res

    def getFileEntry(self, path):
        entry = self._getEntry(self.path(path))
        if entry is None:
            raise Exception("No such file: %s" % path)
        if not isinstance(entry, _MockFsEntry):
            raise Exception("Path is not a file: %s" % path)
        return entry.contents

    def _getStructureRecursive(self, src, target, name):
        if isinstance(src, _MockFsEntry):
            target[name] = src.contents
            return

        e = {}
        for k, v in src.items():
            self._getStructureRecursive(v, e, k)
        target[name] = e

    def _getEntry(self, path):
        cur = self._fs
        path = path.replace('\\', '/').lstrip('/')
        bits = path.split('/')
        for p in bits:
            try:
                cur = cur[p]
            except KeyError:
                return None
        return cur

    def _createDir(self, path):
        cur = self._fs
        path = path.replace('\\', '/').strip('/')
        bits = path.split('/')
        for b in bits:
            if b not in cur:
                cur[b] = {}
            cur = cur[b]
        return self

    def _createFile(self, path, contents):
        cur = self._fs
        path = path.replace('\\', '/').lstrip('/')
        bits = path.split('/')
        for b in bits[:-1]:
            if b not in cur:
                cur[b] = {}
            cur = cur[b]
        cur[bits[-1]] = _MockFsEntry(contents)
        return self

    def _deleteEntry(self, path):
        parent = self._getEntry(os.path.dirname(path))
        assert parent is not None
        name = os.path.basename(path)
        assert name in parent
        del parent[name]


class MemoryScope(object):
    def __init__(self, fs, open_patches=None):
        self.open_patches = open_patches or []
        self._fs = fs
        self._patchers = []
        self._originals = {}

    @property
    def root(self):
        return self._fs._root

    def __enter__(self):
        self._startMock()
        return self

    def __exit__(self, type, value, traceback):
        self._endMock()

    def _startMock(self):
        # TODO: sadly, there seems to be no way to replace `open` everywhere?
        modules = self.open_patches + [
                '__main__',
                'piecrust.records',
                'jinja2.utils']
        for m in modules:
            self._createMock('%s.open' % m, open, self._open, create=True)

        self._createMock('codecs.open', codecs.open, self._codecsOpen)
        self._createMock('os.listdir', os.listdir, self._listdir)
        self._createMock('os.makedirs', os.makedirs, self._makedirs)
        self._createMock('os.remove', os.remove, self._remove)
        self._createMock('os.rename', os.rename, self._rename)
        self._createMock('os.path.exists', os.path.exists, self._exists)
        self._createMock('os.path.isdir', os.path.isdir, self._isdir)
        self._createMock('os.path.isfile', os.path.isfile, self._isfile)
        self._createMock('os.path.islink', os.path.islink, self._islink)
        self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime)
        self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile)
        self._createMock('shutil.rmtree', shutil.rmtree, self._rmtree)
        for p in self._patchers:
            p.start()

    def _endMock(self):
        for p in self._patchers:
            p.stop()

    def _createMock(self, name, orig, func, **kwargs):
        self._originals[name] = orig
        self._patchers.append(mock.patch(name, func, **kwargs))

    def _doOpen(self, orig_name, path, mode, *args, **kwargs):
        path = os.path.normpath(path)
        if path.startswith(RESOURCES_DIR):
            return self._originals[orig_name](path, mode, *args, **kwargs)

        if 'r' in mode:
            e = self._getFsEntry(path)
        elif 'w' in mode or 'x' in mode or 'a' in mode:
            e = self._getFsEntry(path)
            if e is None:
                contents = ''
                if 'b' in mode:
                    contents = bytes()
                self._fs._createFile(path, contents)
                e = self._getFsEntry(path)
                assert e is not None
            elif 'x' in mode:
                err = IOError("File '%s' already exists" % path)
                err.errno = errno.EEXIST
                raise err
        else:
            err = IOError("Unsupported open mode: %s" % mode)
            err.errno = errno.EINVAL
            raise err

        if e is None:
            err = IOError("No such file: %s" % path)
            err.errno = errno.ENOENT
            raise err
        if not isinstance(e, _MockFsEntry):
            err = IOError("'%s' is not a file %s" % (path, e))
            err.errno = errno.EISDIR
            raise err

        return _MockFsEntryWriter(e, mode)

    def _open(self, path, mode, *args, **kwargs):
        return self._doOpen('__main__.open', path, mode, *args, **kwargs)

    def _codecsOpen(self, path, mode, *args, **kwargs):
        return self._doOpen('codecs.open', path, mode, *args, **kwargs)

    def _listdir(self, path):
        path = os.path.normpath(path)
        if path.startswith(RESOURCES_DIR):
            return self._originals['os.listdir'](path)

        e = self._getFsEntry(path)
        if e is None:
            raise OSError("No such directory: %s" % path)
        if not isinstance(e, dict):
            raise OSError("'%s' is not a directory." % path)
        return list(e.keys())

    def _makedirs(self, path, mode=0o777):
        if not path.replace('\\', '/').startswith('/' + self.root):
            raise Exception("Shouldn't create directory: %s" % path)
        self._fs._createDir(path)

    def _remove(self, path):
        path = os.path.normpath(path)
        self._fs._deleteEntry(path)

    def _exists(self, path):
        path = os.path.normpath(path)
        if path.startswith(RESOURCES_DIR):
            return self._originals['os.path.isdir'](path)
        e = self._getFsEntry(path)
        return e is not None

    def _isdir(self, path):
        path = os.path.normpath(path)
        if path.startswith(RESOURCES_DIR):
            return self._originals['os.path.isdir'](path)
        e = self._getFsEntry(path)
        return e is not None and isinstance(e, dict)

    def _isfile(self, path):
        path = os.path.normpath(path)
        if path.startswith(RESOURCES_DIR):
            return self._originals['os.path.isfile'](path)
        e = self._getFsEntry(path)
        return e is not None and isinstance(e, _MockFsEntry)

    def _islink(self, path):
        path = os.path.normpath(path)
        if path.startswith(RESOURCES_DIR):
            return self._originals['os.path.islink'](path)
        return False

    def _getmtime(self, path):
        path = os.path.normpath(path)
        if path.startswith(RESOURCES_DIR):
            return self._originals['os.path.getmtime'](path)
        e = self._getFsEntry(path)
        if e is None:
            raise OSError("No such file: %s" % path)
        return e.metadata['mtime']

    def _copyfile(self, src, dst):
        src = os.path.normpath(src)
        if src.startswith(RESOURCES_DIR):
            with self._originals['__main__.open'](src, 'r') as fp:
                src_text = fp.read()
        else:
            e = self._getFsEntry(src)
            src_text = e.contents
        if not dst.replace('\\', '/').startswith('/' + self.root):
            raise Exception("Shouldn't copy to: %s" % dst)
        self._fs._createFile(dst, src_text)

    def _rename(self, src, dst):
        src = os.path.normpath(src)
        if src.startswith(RESOURCES_DIR) or dst.startswith(RESOURCES_DIR):
            raise Exception("Shouldn't rename files in the resources path.")
        self._copyfile(src, dst)
        self._remove(src)

    def _rmtree(self, path):
        if not path.replace('\\', '/').startswith('/' + self.root):
            raise Exception("Shouldn't delete trees from: %s" % path)
        e = self._fs._getEntry(os.path.dirname(path))
        del e[os.path.basename(path)]

    def _getFsEntry(self, path):
        return self._fs._getEntry(path)