0
|
1 import io
|
|
2 import os
|
|
3 import os.path
|
|
4 import re
|
|
5 import logging
|
|
6 import tempfile
|
|
7 import pytest
|
|
8 import silorider.main
|
|
9
|
|
10
|
|
11 # def pytest_collect_file(parent, path):
|
|
12 # if path.ext == ".html" and path.basename.startswith("feeds"):
|
|
13 # return FeedFile(path, parent)
|
|
14
|
|
15
|
|
# Separator between the two halves of a feed test file: a line that contains
# exactly `---`. re.MULTILINE is required so that `^` and `$` anchor at line
# boundaries; without it the pattern only matches when the whole text is
# `---`, and splitting a real file on it never yields two parts.
re_feed_test_sep = re.compile(r'^---$', re.MULTILINE)
|
|
17
|
|
18
|
|
class FeedFile(pytest.File):
    """Pytest file node that turns one feed test file into a `FeedItem`."""

    def collect(self):
        """Read the file, split it once on the separator and yield one item.

        The part before the separator is passed as the item's input spec and
        the part after it as the output spec (presumably HTML then YAML,
        judging by the variable names -- confirm against the test files).
        Unpacking raises ValueError when the separator is not found.
        """
        with self.fspath.open() as feed_fp:
            contents = feed_fp.read()

        item_name = self.fspath.basename
        # maxsplit=1: only the first separator divides the file.
        pieces = re_feed_test_sep.split(contents, 1)
        html_markup, yaml_markup = pieces
        yield FeedItem(item_name, self, html_markup, yaml_markup)
|
|
27
|
|
28
|
|
class FeedItem(pytest.Item):
    """Pytest item holding the input and output specs of one feed test."""

    def __init__(self, name, parent, in_spec, out_spec):
        """Store both specs after the base class is initialised.

        `name` and `parent` are forwarded to `pytest.Item`; `in_spec` and
        `out_spec` are kept as-is on the instance.
        """
        super().__init__(name, parent)
        self.in_spec, self.out_spec = in_spec, out_spec

    def runtest(self):
        """Do nothing: the item currently performs no check."""
|
|
37
|
|
38
|
|
@pytest.fixture
def cli():
    """Fixture providing a new `CliRunner` instance."""
    runner = CliRunner()
    return runner
|
|
42
|
|
43
|
|
class CliRunner:
    """Test helper that runs the silorider command line in-process.

    The runner accumulates configuration text, pre-execution hooks and
    temporary files, then hands everything to
    `silorider.main._unsafe_main` when `run` is called.
    """

    def __init__(self):
        # Default configuration; `run` writes it to a temporary file and
        # passes that file to the command with `-c`.
        self._cfgtxt = """
[cache]
uri=memory://for_test
"""
        # Callables invoked with the context by the pre-exec hook.
        self._pre_hooks = []
        # Paths of temporary files deleted at the end of `run`.
        self._cleanup = []

    def getFeedPath(self, name):
        """Return the path of `feeds/<name>.html` next to this file."""
        return os.path.join(os.path.dirname(__file__),
                            'feeds',
                            '%s.html' % name)

    def createTempFeed(self, contents):
        """Write `contents` to a new UTF-8 temporary file and return its path.

        The file is registered for deletion at the end of the next `run`.
        """
        tmpfd, tmpname = tempfile.mkstemp()
        # Register the path before writing so that a failed write does not
        # leave an untracked file behind.
        self._cleanup.append(tmpname)
        with os.fdopen(tmpfd, 'w', encoding='utf8') as tmpfp:
            tmpfp.write(contents)
        return tmpname

    def setConfig(self, cfgtxt):
        """Replace the whole configuration text; returns `self`."""
        self._cfgtxt = cfgtxt
        return self

    def appendConfig(self, cfgtxt):
        """Append `cfgtxt` to the configuration text; returns `self`."""
        self._cfgtxt += cfgtxt
        return self

    def appendSiloConfig(self, silo_name, silo_type, **options):
        """Append a `[silo:<silo_name>]` section; returns `self`.

        The section contains a `type=<silo_type>` line followed by one
        `name=value` line per keyword argument.
        """
        lines = ['[silo:%s]\n' % silo_name, 'type=%s\n' % silo_type]
        # `options` is always a dict (possibly empty), so no None check.
        lines.extend('%s=%s\n' % (n, v) for n, v in options.items())
        return self.appendConfig(''.join(lines))

    def preExecHook(self, hook):
        """Register `hook`, called with the context before execution."""
        self._pre_hooks.append(hook)

    def _removeTempFiles(self):
        """Delete every registered temporary file and forget them."""
        print("Cleaning %d temporary files." % len(self._cleanup))
        # Reset the list first so a later call never sees stale paths.
        pending, self._cleanup = self._cleanup, []
        for tmpname in pending:
            try:
                os.remove(tmpname)
            except FileNotFoundError:
                # Already gone (e.g. removed by the test itself): the goal
                # of the cleanup is met, and raising here could mask the
                # real error coming out of `run`.
                pass

    def run(self, *args):
        """Run the silorider main function with `args`.

        If any configuration text is set it is written to a temporary file
        that is passed first on the command line with `-c`. Returns the
        `(ctx, res)` pair received by the post-exec hook, or `(None, None)`
        if that hook was never called.

        The module-level hooks, the log handler and all temporary files are
        released even when the command raises, and the runner can be run
        again afterwards.
        """
        pre_args = []
        if self._cfgtxt:
            tmpfd, tmpcfg = tempfile.mkstemp()
            self._cleanup.append(tmpcfg)
            print("Creating temporary configuration file: %s" % tmpcfg)
            with os.fdopen(tmpfd, 'w') as tmpfp:
                tmpfp.write(self._cfgtxt)
            pre_args = ['-c', tmpcfg]

        captured = io.StringIO()
        handler = logging.StreamHandler(captured)
        handler.setLevel(logging.INFO)
        silorider_logger = logging.getLogger('silorider')

        main_ctx = None
        main_res = None

        def pre_exec_hook(ctx):
            for h in self._pre_hooks:
                h(ctx)

        def post_exec_hook(ctx, res):
            nonlocal main_ctx, main_res
            main_ctx = ctx
            main_res = res

        args = pre_args + list(args)
        print("Running command: %s" % args)

        silorider_logger.addHandler(handler)
        silorider.main.pre_exec_hook = pre_exec_hook
        silorider.main.post_exec_hook = post_exec_hook
        try:
            silorider.main._unsafe_main(args)
        finally:
            # Undo every global side effect, whether or not the command
            # succeeded, so one failing test cannot affect the next one.
            silorider.main.pre_exec_hook = None
            silorider.main.post_exec_hook = None
            silorider_logger.removeHandler(handler)
            self._removeTempFiles()

        return main_ctx, main_res
|
|
130
|
|
131
|
|
@pytest.fixture
def feedutil():
    """Fixture providing a new `FeedUtil` instance."""
    util = FeedUtil()
    return util
|
|
135
|
|
136
|
|
class FeedUtil:
    """Helper for building feed markup strings in tests."""

    def makeFeed(self, *entries):
        """Return an HTML document wrapping each entry in an h-entry article.

        Every string in `entries` is inserted verbatim between an opening
        `<article class="h-entry">` line and a closing `</article>` line.
        """
        pieces = ['<html><body>\n']
        for entry in entries:
            pieces.append('<article class="h-entry">\n')
            pieces.append(entry)
            pieces.append('</article>\n')
        pieces.append('</body></html>')
        return ''.join(pieces)
|