Mercurial > piecrust2
view tests/conftest.py @ 411:e7b865f8f335
bake: Enable multiprocess baking.
Baking is now done by running a worker per CPU, and sending jobs to them.
This changes several things across the codebase:
* Ability to not cache things related to pages other than the 'main' page
(i.e. the page at the bottom of the execution stack).
* Decouple the baking process from the bake records, so only the main process
keeps track (and modifies) the bake record.
* Remove the need for 'batch page getters' and loading a page directly from
the page factories.
There are various smaller changes too included here, including support for
scope performance timers that are saved with the bake record and can be
printed out to the console. Yes I got carried away.
For testing, the in-memory 'mock' file-system doesn't work anymore, since
we're spawning processes, so this is replaced by a 'tmpfs' file-system which
is saved in temporary files on disk and deleted after tests have run.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Fri, 12 Jun 2015 17:09:19 -0700 |
parents | a0724af26c12 |
children | 2aa879d63133 |
line wrap: on
line source
import io import sys import pprint import os.path import logging import pytest import yaml import colorama from piecrust.configuration import merge_dicts from .mockutil import mock_fs, mock_fs_scope def pytest_runtest_setup(item): pass def pytest_addoption(parser): parser.addoption( '--log-debug', action='store_true', help="Sets the PieCrust logger to output debug info to stdout.") def pytest_configure(config): if config.getoption('--log-debug'): hdl = logging.StreamHandler(stream=sys.stdout) logging.getLogger('piecrust').addHandler(hdl) logging.getLogger('piecrust').setLevel(logging.DEBUG) def pytest_collect_file(parent, path): if path.ext == '.yaml' and path.basename.startswith("test"): category = os.path.basename(path.dirname) if category == 'bakes': return BakeTestFile(path, parent) elif category == 'cli': return ChefTestFile(path, parent) elif category == 'servings': return ServeTestFile(path, parent) class YamlTestFileBase(pytest.File): def collect(self): spec = yaml.load_all(self.fspath.open()) for i, item in enumerate(spec): name = '%s_%d' % (self.fspath.basename, i) if 'test_name' in item: name += '_%s' % item['test_name'] yield self.__item_class__(name, self, item) class YamlTestItemBase(pytest.Item): def __init__(self, name, parent, spec): super(YamlTestItemBase, self).__init__(name, parent) self.spec = spec def _prepareMockFs(self): fs = mock_fs() # Website config. config = { 'site': { 'default_format': 'none', 'default_page_layout': 'none', 'default_post_layout': 'none'} } test_config = self.spec.get('config') if test_config is not None: merge_dicts(config, test_config) fs.withConfig(config) # Input file-system. input_files = self.spec.get('in') if input_files is not None: _add_mock_files(fs, '/kitchen', input_files) return fs class ChefTestItem(YamlTestItemBase): __initialized_logging__ = False def runtest(self): if not ChefTestItem.__initialized_logging__: colorama.init() hdl = logging.StreamHandler(stream=sys.stdout) logging.getLogger().addHandler(hdl) logging.getLogger().setLevel(logging.INFO) ChefTestItem.__initialized_logging__ = True fs = self._prepareMockFs() argv = self.spec['args'] if isinstance(argv, str): argv = argv.split(' ') expected_code = self.spec.get('code', 0) expected_out = self.spec.get('out', '') with mock_fs_scope(fs): memstream = io.StringIO() hdl = logging.StreamHandler(stream=memstream) logging.getLogger().addHandler(hdl) try: from piecrust.main import PreParsedChefArgs, _run_chef pre_args = PreParsedChefArgs( root=fs.path('/kitchen')) exit_code = _run_chef(pre_args, argv) finally: logging.getLogger().removeHandler(hdl) assert expected_code == exit_code assert expected_out == memstream.getvalue() def reportinfo(self): return self.fspath, 0, "bake: %s" % self.name def repr_failure(self, excinfo): if isinstance(excinfo.value, ExpectedChefOutputError): return ('\n'.join( ['Unexpected command output. Left is expected output, ' 'right is actual output'] + excinfo.value.args[0])) return super(ChefTestItem, self).repr_failure(excinfo) class ExpectedChefOutputError(Exception): pass class ChefTestFile(YamlTestFileBase): __item_class__ = ChefTestItem class BakeTestItem(YamlTestItemBase): def runtest(self): fs = self._prepareMockFs() # Output file-system. expected_output_files = self.spec.get('out') expected_partial_files = self.spec.get('outfiles') # Bake! from piecrust.baking.baker import Baker with mock_fs_scope(fs): out_dir = fs.path('kitchen/_counter') app = fs.getApp() baker = Baker(app, out_dir) record = baker.bake() if not record.success: errors = [] for e in record.entries: errors += e.getAllErrors() raise BakeError(errors) if expected_output_files: actual = fs.getStructure('kitchen/_counter') error = _compare_dicts(expected_output_files, actual) if error: raise ExpectedBakeOutputError(error) if expected_partial_files: keys = list(sorted(expected_partial_files.keys())) for key in keys: try: actual = fs.getFileEntry('kitchen/_counter/' + key.lstrip('/')) except Exception as e: raise ExpectedBakeOutputError([ "Can't access output file %s: %s" % (key, e)]) expected = expected_partial_files[key] # HACK because for some reason PyYAML adds a new line for # those and I have no idea why. actual = actual.rstrip('\n') expected = expected.rstrip('\n') cmpres = _compare_str(expected, actual, key) if cmpres: raise ExpectedBakeOutputError(cmpres) def reportinfo(self): return self.fspath, 0, "bake: %s" % self.name def repr_failure(self, excinfo): if isinstance(excinfo.value, ExpectedBakeOutputError): return ('\n'.join( ['Unexpected bake output. Left is expected output, ' 'right is actual output'] + excinfo.value.args[0])) elif isinstance(excinfo.value, BakeError): return ('\n'.join( ['Errors occured during bake:'] + excinfo.value.args[0])) return super(BakeTestItem, self).repr_failure(excinfo) class BakeError(Exception): pass class ExpectedBakeOutputError(Exception): pass class BakeTestFile(YamlTestFileBase): __item_class__ = BakeTestItem class ServeTestItem(YamlTestItemBase): class _TestApp(object): def __init__(self, server): self.server = server def __call__(self, environ, start_response): return self.server._try_run_request(environ, start_response) def runtest(self): fs = self._prepareMockFs() url = self.spec.get('url') if url is None: raise Exception("Missing URL in test spec.") expected_status = self.spec.get('status', 200) expected_headers = self.spec.get('headers') expected_output = self.spec.get('out') expected_contains = self.spec.get('out_contains') from werkzeug.test import Client from werkzeug.wrappers import BaseResponse from piecrust.serving.server import Server with mock_fs_scope(fs): server = Server(fs.path('/kitchen')) test_app = self._TestApp(server) client = Client(test_app, BaseResponse) resp = client.get(url) assert expected_status == resp.status_code if expected_headers: for k, v in expected_headers.items(): assert v == resp.headers.get(k) actual = resp.data.decode('utf8').rstrip() if expected_output: assert expected_output.rstrip() == actual if expected_contains: assert expected_contains.rstrip() in actual def reportinfo(self): return self.fspath, 0, "serve: %s" % self.name class ServeTestFile(YamlTestFileBase): __item_class__ = ServeTestItem def _add_mock_files(fs, parent_path, spec): for name, subspec in spec.items(): path = os.path.join(parent_path, name) if isinstance(subspec, str): fs.withFile(path, subspec) elif isinstance(subspec, dict): _add_mock_files(fs, path, subspec) def _compare(left, right, path): if type(left) != type(right): return (["Different items: ", "%s: %s" % (path, pprint.pformat(left)), "%s: %s" % (path, pprint.pformat(right))]) if isinstance(left, str): return _compare_str(left, right, path) elif isinstance(left, dict): return _compare_dicts(left, right, path) elif isinstance(left, list): return _compare_lists(left, right, path) elif left != right: return (["Different items: ", "%s: %s" % (path, pprint.pformat(left)), "%s: %s" % (path, pprint.pformat(right))]) def _compare_dicts(left, right, basepath=''): key_diff = set(left.keys()) ^ set(right.keys()) if key_diff: extra_left = set(left.keys()) - set(right.keys()) if extra_left: return (["Left contains more items: "] + ['- %s/%s' % (basepath, k) for k in extra_left]) extra_right = set(right.keys()) - set(left.keys()) if extra_right: return (["Right contains more items: "] + ['- %s/%s' % (basepath, k) for k in extra_right]) return ["Unknown difference"] for key in left.keys(): lv = left[key] rv = right[key] childpath = basepath + '/' + key cmpres = _compare(lv, rv, childpath) if cmpres: return cmpres return None def _compare_lists(left, right, path): for i in range(min(len(left), len(right))): l = left[i] r = right[i] cmpres = _compare(l, r, path) if cmpres: return cmpres if len(left) > len(right): return (["Left '%s' contains more items. First extra item: " % path, left[len(right)]]) if len(right) > len(left): return (["Right '%s' contains more items. First extra item: " % path, right[len(left)]]) return None def _compare_str(left, right, path): if left == right: return None for i in range(min(len(left), len(right))): if left[i] != right[i]: start = max(0, i - 15) l_end = min(len(left), i + 15) r_end = min(len(right), i + 15) l_str = '' l_offset = 0 for j in range(start, l_end): c = repr(left[j]).strip("'") l_str += c if j < i: l_offset += len(c) r_str = '' r_offset = 0 for j in range(start, r_end): c = repr(right[j]).strip("'") r_str += c if j < i: r_offset += len(c) return ["Items '%s' differ at index %d:" % (path, i), '', "Left:", left, '', "Right:", right, '', "Difference:", l_str, (' ' * l_offset + '^'), r_str, (' ' * r_offset + '^')] if len(left) > len(right): return ["Left is longer.", "Left '%s': " % path, left, "Right '%s': " % path, right, "Extra items: %r" % left[len(right):]] if len(right) > len(left): return ["Right is longer.", "Left '%s': " % path, left, "Right '%s': " % path, right, "Extra items: %r" % right[len(left):]]