view tests/test_pipelines_asset.py @ 975:a0a62d0da723

internal: Check that the `Assetor` has an asset URL format to work with.
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 17 Oct 2017 01:08:56 -0700
parents 72f17534d58e
children 45ad976712ec
line wrap: on
line source

import time
import os.path
import shutil
import inspect
import pytest
from piecrust.pipelines.asset import get_filtered_processors
from piecrust.pipelines.records import MultiRecord
from piecrust.processing.base import SimpleFileProcessor
from .mockutil import mock_fs, mock_fs_scope


class FooProcessor(SimpleFileProcessor):
    """Test processor that prefixes a file's text with its upper-cased name.

    Args:
        exts: indexable pair whose first item is mapped to the second in
            the dictionary given to `SimpleFileProcessor`; the first item
            is also used as `PROCESSOR_NAME`.  Defaults to
            ``('foo', 'foo')``.
        open_func: callable used in place of the built-in `open` for
            reading the input and writing the output.
    """

    def __init__(self, exts=None, open_func=None):
        # The default has to be an indexable pair.  A set literal such as
        # ``{'foo', 'foo'}`` collapses to ``{'foo'}`` and cannot be
        # subscripted, which made ``exts[0]`` raise a TypeError.
        exts = exts or ('foo', 'foo')
        super(FooProcessor, self).__init__({exts[0]: exts[1]})
        self.PROCESSOR_NAME = exts[0]
        self.open_func = open_func or open

    def _doProcess(self, in_path, out_path):
        """Write ``"<NAME>: <input text>"`` to `out_path` and return True."""
        with self.open_func(in_path, 'r') as f:
            text = f.read()
        with self.open_func(out_path, 'w') as f:
            f.write("%s: %s" % (self.PROCESSOR_NAME.upper(), text))
        return True


class NoopProcessor(SimpleFileProcessor):
    """Test processor that copies files verbatim and records each input.

    `exts` is an indexable pair: the first item is mapped to the second in
    the dictionary given to `SimpleFileProcessor`, and the first item is
    also used as `PROCESSOR_NAME`.
    """

    def __init__(self, exts):
        src_ext, dst_ext = exts[0], exts[1]
        super().__init__({src_ext: dst_ext})
        self.PROCESSOR_NAME = src_ext
        # Input paths seen by `_doProcess`, in call order.
        self.processed = []

    def _doProcess(self, in_path, out_path):
        """Record `in_path`, copy it to `out_path`, and return True."""
        # Record first so the path is kept even if the copy fails.
        self.processed.append(in_path)
        shutil.copyfile(in_path, out_path)
        return True


def _get_test_fs(processors=None):
    """Build a mock file-system with a 'counter' directory and asset config.

    `processors` is stored under `pipelines/asset/processors` in the
    configuration; when None, the value 'copy' is used.
    """
    asset_config = {
        'processors': 'copy' if processors is None else processors}
    fs = mock_fs()
    fs = fs.withDir('counter')
    return fs.withConfig({'pipelines': {'asset': asset_config}})


def _create_test_plugin(fs, *, foo_exts=None, noop_exts=None):
    """Add a generated `foonoop` plugin module to the mock kitchen.

    The module text is assembled from the source of `FooProcessor` and
    `NoopProcessor` (read with `inspect`), followed by a plugin class whose
    `getProcessors` yields instances built from the given arguments.

    Args:
        fs: mock file-system builder; only its `withFile` method is used.
        foo_exts: value passed (through `repr`) to `FooProcessor`.
        noop_exts: value passed (through `repr`) to `NoopProcessor`.  When
            None, no `NoopProcessor` is yielded, because that class
            indexes its argument and would raise a TypeError on None.
    """
    src = [
        # The copied NoopProcessor source calls shutil.copyfile, so the
        # generated module has to import shutil itself.
        'import shutil',
        'from piecrust.plugins.base import PieCrustPlugin',
        'from piecrust.processing.base import SimpleFileProcessor']

    # Copy each processor class verbatim, separated by a blank line.
    for proc_class in (FooProcessor, NoopProcessor):
        proc_lines, _ = inspect.getsourcelines(proc_class)
        src.append('')
        src.extend(line.rstrip('\n') for line in proc_lines)

    src += [
        '',
        'class FooNoopPlugin(PieCrustPlugin):',
        '    def getProcessors(self):',
        '        yield FooProcessor(%s)' % repr(foo_exts)]
    if noop_exts is not None:
        src.append('        yield NoopProcessor(%s)' % repr(noop_exts))
    src += [
        '',
        '__piecrust_plugin__ = FooNoopPlugin']

    # NOTE(review): the source is handed over as a list of lines, exactly
    # as before -- confirm that `withFile` accepts a list rather than a
    # single string.
    fs.withFile('kitchen/plugins/foonoop.py', src)


def _bake_assets(fs):
    """Run `chef bake -p asset` through the mock file-system `fs`."""
    chef_args = ('bake', '-p', 'asset')
    fs.runChef(*chef_args)


def test_empty():
    """With no assets, 'counter' is empty both before and after a bake."""
    fs = _get_test_fs()
    with mock_fs_scope(fs):
        assert {} == fs.getStructure('counter')
        _bake_assets(fs)
        assert {} == fs.getStructure('counter')


def test_one_file():
    """A single asset file shows up unchanged in 'counter' after a bake."""
    fs = _get_test_fs()
    fs = fs.withFile('kitchen/assets/something.html', 'A test file.')
    with mock_fs_scope(fs):
        assert {} == fs.getStructure('counter')
        _bake_assets(fs)
        baked = {'something.html': 'A test file.'}
        assert baked == fs.getStructure('counter')


def test_one_level_dirtyness():
    """An output file is rewritten only when its source asset changes."""
    fs = _get_test_fs().withFile('kitchen/assets/blah.foo', 'A test file.')

    def baked_mtime():
        # Modification time of the baked output file.
        return os.path.getmtime(fs.path('/counter/blah.foo'))

    with mock_fs_scope(fs):
        # First bake: the output is created just now.
        _bake_assets(fs)
        first_output = {'blah.foo': 'A test file.'}
        assert first_output == fs.getStructure('counter')
        mtime = baked_mtime()
        assert abs(time.time() - mtime) <= 2

        # Second bake with nothing changed: the output is left untouched.
        time.sleep(1)
        _bake_assets(fs)
        assert first_output == fs.getStructure('counter')
        assert mtime == baked_mtime()

        # Third bake after editing the source: the output is refreshed.
        time.sleep(1)
        fs.withFile('kitchen/assets/blah.foo', 'A new test file.')
        _bake_assets(fs)
        second_output = {'blah.foo': 'A new test file.'}
        assert second_output == fs.getStructure('counter')
        assert mtime < baked_mtime()


def test_two_levels_dirtyness():
    """Same dirtiness checks, with the plugin's 'foo' -> 'bar' processor."""
    fs = _get_test_fs().withFile('kitchen/assets/blah.foo', 'A test file.')
    _create_test_plugin(fs, foo_exts=('foo', 'bar'))

    def baked_mtime():
        # Modification time of the processed output file.
        return os.path.getmtime(fs.path('/counter/blah.bar'))

    with mock_fs_scope(fs):
        # First bake: the processed output is created just now.
        _bake_assets(fs)
        first_output = {'blah.bar': 'FOO: A test file.'}
        assert first_output == fs.getStructure('counter')
        mtime = baked_mtime()
        assert abs(time.time() - mtime) <= 2

        # Second bake with nothing changed: the output is left untouched.
        time.sleep(1)
        _bake_assets(fs)
        assert first_output == fs.getStructure('counter')
        assert mtime == baked_mtime()

        # Third bake after editing the source: the output is refreshed.
        time.sleep(1)
        fs.withFile('kitchen/assets/blah.foo', 'A new test file.')
        _bake_assets(fs)
        second_output = {'blah.bar': 'FOO: A new test file.'}
        assert second_output == fs.getStructure('counter')
        assert mtime < baked_mtime()


def test_removed():
    """Deleting a source asset removes its baked copy on the next bake."""
    fs = (_get_test_fs()
          .withFile('kitchen/assets/blah1.foo', 'A test file.')
          .withFile('kitchen/assets/blah2.foo', 'Ooops'))
    with mock_fs_scope(fs):
        # Both assets exist in the kitchen and are baked to 'counter'.
        expected = {
            'blah1.foo': 'A test file.',
            'blah2.foo': 'Ooops'}
        assert expected == fs.getStructure('kitchen/assets')
        _bake_assets(fs)
        assert expected == fs.getStructure('counter')

        # Remove one source file, then bake again.
        time.sleep(1)
        os.remove(fs.path('/kitchen/assets/blah2.foo'))
        expected = {
            'blah1.foo': 'A test file.'}
        assert expected == fs.getStructure('kitchen/assets')
        # The bake must run against the mock file-system; the literal 1
        # passed here before has no `runChef` method.
        _bake_assets(fs)
        assert expected == fs.getStructure('counter')


def test_record_version_change():
    """Bumping `MultiRecord.RECORD_VERSION` makes the next bake rewrite."""
    fs = _get_test_fs().withFile('kitchen/assets/blah.foo', 'A test file.')
    _create_test_plugin(fs, foo_exts=('foo', 'foo'))

    def baked_mtime():
        # Modification time of the baked output file.
        return os.path.getmtime(fs.path('/counter/blah.foo'))

    with mock_fs_scope(fs):
        # First bake creates the output.
        _bake_assets(fs)
        assert os.path.exists(fs.path('/counter/blah.foo')) is True
        mtime = baked_mtime()

        # A second bake with the same record version leaves it alone.
        time.sleep(1)
        _bake_assets(fs)
        assert mtime == baked_mtime()

        # Bump the version for one bake only; always undo the bump so the
        # class attribute is back to its value for later tests.
        time.sleep(1)
        MultiRecord.RECORD_VERSION += 1
        try:
            _bake_assets(fs)
            assert mtime < baked_mtime()
        finally:
            MultiRecord.RECORD_VERSION -= 1


@pytest.mark.parametrize('patterns, expected', [
    # Each case: ignore patterns, then the expected 'counter' structure.
    (
        ['_'],
        {'something.html': 'A test file.'},
    ),
    (
        ['html'],
        {},
    ),
    (
        ['/^_/'],
        {
            'something.html': 'A test file.',
            'foo': {'_important.html': 'Important!'},
        },
    ),
])
def test_ignore_pattern(patterns, expected):
    """Assets matching the configured `ignore` patterns are not baked."""
    fs = _get_test_fs()
    fs = fs.withFile('kitchen/assets/something.html', 'A test file.')
    fs = fs.withFile('kitchen/assets/_hidden.html', 'Shhh')
    fs = fs.withFile('kitchen/assets/foo/_important.html', 'Important!')
    ignore_config = {'pipelines': {'asset': {'ignore': patterns}}}
    fs.withConfig(ignore_config)
    with mock_fs_scope(fs):
        assert {} == fs.getStructure('counter')
        _bake_assets(fs)
        assert expected == fs.getStructure('counter')


@pytest.mark.parametrize('names, expected', [
    # Each case: filter expression, then the processor names it selects.
    (
        'all',
        ['cleancss', 'compass', 'copy', 'concat', 'less', 'requirejs',
         'sass', 'sitemap', 'uglifyjs', 'pygments_style'],
    ),
    (
        'all -sitemap',
        ['cleancss', 'copy', 'compass', 'concat', 'less', 'requirejs',
         'sass', 'uglifyjs', 'pygments_style'],
    ),
    (
        '-sitemap -less -sass all',
        ['cleancss', 'copy', 'compass', 'concat', 'requirejs', 'uglifyjs',
         'pygments_style'],
    ),
    (
        'copy',
        ['copy'],
    ),
    (
        'less sass',
        ['less', 'sass'],
    ),
])
def test_filter_processor(names, expected):
    """`get_filtered_processors` selects the named set, in any order."""
    fs = mock_fs().withConfig()
    with mock_fs_scope(fs):
        all_processors = fs.getApp().plugin_loader.getProcessors()
        selected = get_filtered_processors(all_processors, names)
        selected_names = sorted(p.PROCESSOR_NAME for p in selected)
        # Compare sorted lists so that ordering does not matter.
        assert selected_names == sorted(expected)