Mercurial > jouvence
changeset 0:243401c49520
Initial commit.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 02 Jan 2017 12:30:49 -0800 |
parents | |
children | 74b83e3d921e |
files | .hgignore dev-requirements.txt fontaine/__init__.py fontaine/document.py fontaine/parser.py tests/__init__.py tests/conftest.py tests/test_action.yaml tests/test_character.yaml tests/test_parenthetical.yaml tests/test_titlepage.yaml |
diffstat | 9 files changed, 709 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.hgignore Mon Jan 02 12:30:49 2017 -0800 @@ -0,0 +1,4 @@ +syntax:glob +__pycache__ +.cache +*.pyc
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dev-requirements.txt Mon Jan 02 12:30:49 2017 -0800 @@ -0,0 +1,3 @@ +py==1.4.32 +pytest==3.0.5 +PyYAML==3.12
# fontaine/document.py
"""In-memory model of a parsed script: a document made of scenes, each
holding an ordered list of typed paragraphs (scene elements)."""
import sys


class FontaineDocument:
    """A parsed script: title-page values plus an ordered list of scenes."""

    def __init__(self):
        # Mapping of title-page keys to their text values.
        self.title_values = {}
        # `FontaineScene` objects, in order of appearance.
        self.scenes = []

    def addScene(self, header=None):
        """Append a new scene and return it.

        The scene's header is only set when `header` is truthy; otherwise
        it keeps its default of `None`.
        """
        scene = FontaineScene()
        if header:
            scene.header = header
        self.scenes.append(scene)
        return scene

    def lastScene(self, auto_create=True):
        """Return the most recently added scene.

        When there is no scene yet, one is created and returned if
        `auto_create` is true; otherwise `None` is returned.
        """
        try:
            return self.scenes[-1]
        except IndexError:
            if auto_create:
                return self.addScene()
            return None

    def lastParagraph(self):
        """Return the last paragraph of the last scene, or `None` if there
        is no scene or the last scene is empty. Never creates a scene."""
        scene = self.lastScene(False)
        if scene:
            return scene.lastParagraph()
        return None


class FontaineScene:
    """A scene: an optional header and a list of paragraphs.

    Paragraphs are added through dynamically resolved `addXxx(text)`
    methods, where `Xxx` names one of the module-level `TYPE_XXX`
    constants (for example `addAction`, `addDialog`).
    """

    def __init__(self):
        self.header = None
        self.paragraphs = []
        # Cache of generated adder functions, keyed by the name suffix.
        self._adders = {}

    def __getattr__(self, name):
        """Resolve `addXxx` attributes into paragraph-appending functions.

        Raises:
            AttributeError: if `name` does not start with `add`, or if no
                `TYPE_XXX` constant matches the suffix. The message carries
                the requested attribute name.
        """
        if not name.startswith('add'):
            raise AttributeError(name)

        add_type_name = name[3:]
        try:
            return self._adders[add_type_name]
        except KeyError:
            pass

        module = sys.modules[__name__]
        try:
            add_type = getattr(module, 'TYPE_%s' % add_type_name.upper())
        except AttributeError:
            # Report the attribute the caller asked for, not the internal
            # constant name that was looked up on the module.
            raise AttributeError(name) from None

        def _type_adder(_text):
            self.paragraphs.append(FontaineSceneElement(add_type, _text))

        self._adders[add_type_name] = _type_adder
        return _type_adder

    def lastParagraph(self):
        """Return the last paragraph of this scene, or `None` if empty."""
        try:
            return self.paragraphs[-1]
        except IndexError:
            return None


class FontaineSceneElement:
    """One paragraph of a scene: a `TYPE_*` constant plus its text."""

    def __init__(self, el_type, text):
        self.type = el_type
        self.text = text

    def __str__(self):
        return '%s: %s' % (
            _scene_element_type_str(self.type),
            _ellipsis(self.text, 15))

    def __repr__(self):
        return '%s(%r, %r)' % (type(self).__name__, self.type, self.text)


TYPE_ACTION = 0
TYPE_CHARACTER = 1
TYPE_DIALOG = 2
TYPE_PARENTHETICAL = 3


def _scene_element_type_str(t):
    """Return the upper-case display name of a `TYPE_*` constant.

    Raises:
        NotImplementedError: if `t` is not one of the known types.
    """
    known = (
        (TYPE_ACTION, 'ACTION'),
        (TYPE_CHARACTER, 'CHARACTER'),
        (TYPE_DIALOG, 'DIALOG'),
        (TYPE_PARENTHETICAL, 'PARENTHETICAL'),
    )
    for type_id, label in known:
        if t == type_id:
            return label
    raise NotImplementedError()


def _ellipsis(text, length):
    """Shorten `text` to at most `length` characters.

    Text that already fits is returned unchanged. Longer text is cut and
    suffixed with `...`. When `length` is below 3 there is no room for the
    suffix, so the text is simply truncated (a negative slice would
    otherwise produce a result longer than the limit).
    """
    if len(text) <= length:
        return text
    if length < 3:
        return text[:max(length, 0)]
    return text[:length - 3] + '...'
# fontaine/parser.py
"""Line-oriented state-machine parser that fills a document with
title-page values and scene paragraphs."""
import re
import logging


logger = logging.getLogger(__name__)


class FontaineState:
    """Base class for parser states.

    Subclasses override `match` to tell whether they can handle the
    upcoming lines, `consume` to read them, and optionally `exit` to flush
    what they accumulated into the document.
    """

    # When true, a state of the same type matched right after this one is
    # not instantiated; this instance keeps consuming instead.
    can_merge = False

    def match(self, fp, ctx):
        """Return a truthy value if this state can handle what comes next.

        Implementations only peek at `fp`; the state machine restores the
        read position after a successful match.
        """
        return False

    def consume(self, fp, ctx):
        """Read from `fp` and return what should happen next.

        Valid results are `True` (call `consume` again), `ANY_STATE`, a
        list of candidate state classes, a state instance, or `EOF_STATE`.
        """
        raise NotImplementedError()

    def exit(self, ctx):
        """Called when the state machine leaves this state."""
        pass


class FontaineParserError(Exception):
    """Parsing error carrying the line number in its message."""

    def __init__(self, line_no, message):
        super().__init__("Error line %d: %s" % (line_no, message))


# Sentinels returned by `consume`: try every known state / input exhausted.
ANY_STATE = object()
EOF_STATE = object()


RE_EMPTY_LINE = re.compile(r"^$", re.M)
RE_BLANK_LINE = re.compile(r"^\s*$", re.M)

RE_TITLE_KEY_VALUE = re.compile(r"^(?P<key>[\w\s\-]+)\s*:")


class _TitlePageState(FontaineState):
    """Reads `Key: value` lines at the top of the input, one line per
    `consume` call, into `ctx.document.title_values`."""

    def __init__(self):
        super().__init__()
        self._cur_key = None
        self._cur_val = None

    def consume(self, fp, ctx):
        line = fp.readline()
        if not line:
            return EOF_STATE

        if RE_EMPTY_LINE.match(line):
            self._commit(ctx)
            # Finished with the page title, now move on to the first scene.
            # However, if we never had any page title, go back to the
            # beginning so we don't consume anybody else's empty lines.
            if not ctx.document.title_values:
                fp.seek0()
            return ANY_STATE

        m = RE_TITLE_KEY_VALUE.match(line)
        if m:
            # Commit current value, start new one.
            self._commit(ctx)
            self._cur_key = m.group('key')
            self._cur_val = line[m.end():].strip()
            return True

        if self._cur_val is None:
            if not ctx.document.title_values:
                # Early exit because there's no title page.
                # Go back to the beginning so we don't consume somebody's
                # first line of text.
                fp.seek0()
                return ANY_STATE

            raise FontaineParserError(
                fp.line_no,
                "Page title needs to be followed by 2 empty lines.")

        # Keep accumulating the value of one of the title page's values.
        self._cur_val += line.strip()
        return True

    def exit(self, ctx):
        self._commit(ctx)

    def _commit(self, ctx):
        """Store the pending key/value pair, if any, and reset it."""
        if self._cur_key is not None:
            ctx.document.title_values[self._cur_key] = self._cur_val
            self._cur_key = None
            self._cur_val = None


RE_SCENE_HEADER_PATTERN = re.compile(
    r"^(int|ext|est|int/ext|int./ext|i/e)[\s\.]", re.I)


class _SceneHeaderState(FontaineState):
    """An empty line followed by a scene header line; starts a new scene."""

    def match(self, fp, ctx):
        lines = fp.peeklines(2)
        return (
            RE_EMPTY_LINE.match(lines[0]) and
            RE_SCENE_HEADER_PATTERN.match(lines[1]))

    def consume(self, fp, ctx):
        fp.readline()  # Get past the blank line.
        line = fp.readline().rstrip('\r\n')
        line = line.lstrip('.')  # In case it was forced.
        ctx.document.addScene(line)
        return ANY_STATE


class _ActionState(FontaineState):
    """Fallback state: accumulates lines until the next line is empty and
    adds them as one action paragraph on exit."""

    can_merge = True

    def __init__(self):
        super().__init__()
        self.text = ''

    def match(self, fp, ctx):
        return True

    def consume(self, fp, ctx):
        is_first_line = True
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE

            if is_first_line:
                # Drop the marker in case the action was forced.
                line = line.lstrip('!')
                is_first_line = False

            self.text += line

            if RE_EMPTY_LINE.match(fp.peekline()):
                break

        return ANY_STATE

    def exit(self, ctx):
        ctx.document.lastScene().addAction(self.text)


RE_CHARACTER_LINE = re.compile(r"^[A-Z\-]+\s*(\(.*\))?$", re.M)


class _CharacterState(FontaineState):
    """An empty line, then a character line, then a non-empty line."""

    def match(self, fp, ctx):
        lines = fp.peeklines(3)
        return (RE_EMPTY_LINE.match(lines[0]) and
                RE_CHARACTER_LINE.match(lines[1]) and
                not RE_EMPTY_LINE.match(lines[2]))

    def consume(self, fp, ctx):
        fp.readline()  # Get past the empty line.
        line = fp.readline().rstrip('\r\n')
        line = line.lstrip('@')  # In case it was forced.
        ctx.document.lastScene().addCharacter(line)
        return [_ParentheticalState, _DialogState]


RE_PARENTHETICAL_LINE = re.compile(r"^\s*\(.*\)\s*$", re.M)


class _ParentheticalState(FontaineState):
    """A single line wrapped in parentheses."""

    def match(self, fp, ctx):
        # We only get here from a `_CharacterState` so we know the previous
        # one is already that.
        line = fp.peekline()
        return RE_PARENTHETICAL_LINE.match(line)

    def consume(self, fp, ctx):
        line = fp.readline().rstrip('\r\n')
        ctx.document.lastScene().addParenthetical(line)
        return [_DialogState, _CharacterState, _ActionState]


class _DialogState(FontaineState):
    """Accumulates lines until the next line is empty and adds them as one
    dialog paragraph (without trailing line breaks) on exit."""

    def __init__(self):
        super().__init__()
        self.text = ''

    def match(self, fp, ctx):
        line = fp.peekline()
        return not RE_EMPTY_LINE.match(line)

    def consume(self, fp, ctx):
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE
            self.text += line
            if RE_EMPTY_LINE.match(fp.peekline()):
                break
        return ANY_STATE

    def exit(self, ctx):
        ctx.document.lastScene().addDialog(self.text.rstrip('\r\n'))


class _LyricsState(FontaineState):
    # Placeholder: inherits the base behavior (never matches, cannot
    # consume).
    pass


class _TransitionState(FontaineState):
    # Placeholder: inherits the base behavior (never matches, cannot
    # consume).
    pass


class _ForcedParagraphStates(FontaineState):
    """Dispatches to a specific state when the line after an empty line
    starts with one of the forcing symbols."""

    STATE_SYMBOLS = {
        '.': _SceneHeaderState,
        '!': _ActionState,
        '@': _CharacterState,
        '~': _LyricsState,
        '>': _TransitionState
    }

    def __init__(self):
        super().__init__()
        self._state_cls = None

    def match(self, fp, ctx):
        lines = fp.peeklines(2)
        symbol = lines[1][:1]
        if RE_EMPTY_LINE.match(lines[0]) and symbol in self.STATE_SYMBOLS:
            self._state_cls = self.STATE_SYMBOLS[symbol]
            return True
        return False

    def consume(self, fp, ctx):
        # Consumes nothing itself; hands over to the forced state, which
        # strips the symbol.
        return self._state_cls()


STATES = [
    _ForcedParagraphStates,  # Must be first.
    _SceneHeaderState,
    _CharacterState,
    _TransitionState,
    _ActionState,  # Must be last.
]


class _PeekableFile:
    """Wrapper around a seekable text file adding look-ahead helpers and a
    running line counter."""

    def __init__(self, fp):
        self.line_no = 1
        self._fp = fp

    def read(self, size=-1):
        return self._doRead(size, True)

    def read1(self):
        return self.read(1)

    def peek1(self):
        pos = self._fp.tell()
        c = self._doRead(1, False)
        self._fp.seek(pos)
        return c

    def readline(self, size=-1):
        data = self._fp.readline(size)
        # Nothing was read at end of file, so there is no line to count.
        if data:
            self.line_no += 1
        return data

    def peekline(self):
        """Return the next line without moving the read position."""
        pos = self._fp.tell()
        line = self._fp.readline()
        self._fp.seek(pos)
        return line

    def peeklines(self, count):
        """Return the next `count` lines without moving the read position.

        Lines past the end of the file come back as empty strings.
        """
        pos = self._fp.tell()
        lines = [self._fp.readline() for _ in range(count)]
        self._fp.seek(pos)
        return lines

    def seek0(self):
        """Rewind to the start of the file and reset the line counter."""
        self._fp.seek(0)
        self.line_no = 1

    def _doRead(self, size, advance_line_no):
        data = self._fp.read(size)
        if advance_line_no:
            self.line_no += data.count('\n')
        return data


class _FontaineStateMachine:
    """Drives the states over the input; also serves as the `ctx` object
    handed to them (exposing `document` and `line_no`)."""

    def __init__(self, fp, doc):
        self.fp = _PeekableFile(fp)
        self.state = None
        self.document = doc

    @property
    def line_no(self):
        return self.fp.line_no

    def run(self):
        """Parse the whole input, starting with the title page state."""
        self.state = _TitlePageState()
        while True:
            logger.debug("State '%s' consuming from '%s'...",
                         self.state.__class__.__name__, self.fp.peekline())
            res = self.state.consume(self.fp, self)

            # See if we reached the end of the file.
            if not self.fp.peekline():
                logger.debug("Reached end of file... ending parsing.")
                res = EOF_STATE

            # Figure out what to do next...

            if res is None:
                raise Exception(
                    "States need to return `ANY_STATE`, one or more specific "
                    "states, or `EOF_STATE` if they reached the end of the "
                    "file.")

            if res is True:
                # State continues to consume.
                continue

            if res is ANY_STATE or isinstance(res, list):
                # State wants to exit, we need to figure out what is the
                # next state.
                pos = self.fp._fp.tell()
                next_states = res
                if next_states is ANY_STATE:
                    next_states = STATES
                logger.debug("Trying to match next state from: %s",
                             [t.__name__ for t in next_states])
                for sc in next_states:
                    s = sc()
                    if s.match(self.fp, self):
                        logger.debug("Matched state %s",
                                     s.__class__.__name__)
                        self.fp._fp.seek(pos)
                        res = s
                        break
                else:
                    raise Exception("Can't match following state after: %s" %
                                    self.state)
                if self.state:
                    if (type(self.state) is type(res) and
                            self.state.can_merge):
                        # Don't switch states if the next state is the same
                        # type and that type supports merging.
                        continue

                    self.state.exit(self)

                self.state = res
                continue

            if isinstance(res, FontaineState):
                # State wants to exit, wants a specific state to be next.
                if self.state:
                    self.state.exit(self)
                self.state = res
                continue

            if res is EOF_STATE:
                # Reached end of file.
                if self.state:
                    self.state.exit(self)
                break

            raise Exception("Unsupported state result: %s" % res)


class FontaineParser:
    """Public entry point: parses a file or a string into a document."""

    def __init__(self):
        pass

    def parse(self, filein):
        """Parse `filein`, either a path (`str`) or an already open,
        seekable text file object, and return the resulting document."""
        if isinstance(filein, str):
            with open(filein, 'r') as fp:
                return self._doParse(fp)
        # Already a file object: parse it directly.
        return self._doParse(filein)

    def parseString(self, text):
        """Parse script text held in memory and return the document."""
        import io
        with io.StringIO(text) as fp:
            return self._doParse(fp)

    def _doParse(self, fp):
        from .document import FontaineDocument
        doc = FontaineDocument()
        machine = _FontaineStateMachine(fp, doc)
        machine.run()
        return doc
# tests/conftest.py
"""pytest plumbing that turns `test*.yaml` files into parser tests.

Each YAML document in such a file is one test with an `in` text, an
expected `out` list, and optionally the expected `title` values.
"""
import sys
import logging
import yaml
import pytest
from fontaine.document import (
    FontaineSceneElement,
    TYPE_ACTION, TYPE_CHARACTER, TYPE_DIALOG, TYPE_PARENTHETICAL)
from fontaine.parser import FontaineParser, FontaineParserError


def pytest_addoption(parser):
    parser.addoption(
        '--log-debug',
        action='store_true',
        help="Sets the Fontaine logger to output debug info to stdout.")


def pytest_configure(config):
    if config.getoption('--log-debug'):
        hdl = logging.StreamHandler(stream=sys.stdout)
        fontaine_logger = logging.getLogger('fontaine')
        fontaine_logger.addHandler(hdl)
        fontaine_logger.setLevel(logging.DEBUG)


def pytest_collect_file(parent, path):
    """Collect every `.yaml` file whose name starts with `test`."""
    if path.ext == '.yaml' and path.basename.startswith("test"):
        return FontaineScriptTestFile(path, parent)
    return None


def assert_scenes(actual, scenes):
    """Compare parsed scenes against `[header, paragraph, ...]` lists."""
    assert len(actual) == len(scenes)
    for a, e in zip(actual, scenes):
        assert_scene(a, e[0], e[1:])


def assert_scene(actual, header, paragraphs):
    """Compare one scene; the header is only checked when not `None`."""
    if header is not None:
        assert actual.header == header
    assert len(actual.paragraphs) == len(paragraphs)
    for a, e in zip(actual.paragraphs, paragraphs):
        assert_paragraph(a, e)


def assert_paragraph(actual, expected):
    """Compare one paragraph; a plain string stands for an action."""
    if isinstance(expected, str):
        assert isinstance(actual, FontaineSceneElement)
        assert actual.type == TYPE_ACTION
        assert actual.text == expected
    elif isinstance(expected, FontaineSceneElement):
        assert isinstance(actual, FontaineSceneElement)
        assert actual.type == expected.type
        assert actual.text == expected.text
    else:
        raise NotImplementedError("Don't know what this is: %s" % expected)


def _c(name):
    return FontaineSceneElement(TYPE_CHARACTER, name)


def _p(text):
    return FontaineSceneElement(TYPE_PARENTHETICAL, text)


def _d(text):
    return FontaineSceneElement(TYPE_DIALOG, text)


class FontaineScriptTestFile(pytest.File):
    """A YAML file in which every document is one test item."""

    def collect(self):
        # Load everything while the file is open so the handle is closed
        # deterministically. `safe_load_all` avoids the unsafe default
        # loader of `yaml.load_all`.
        with self.fspath.open(encoding='utf8') as fp:
            spec = list(yaml.safe_load_all(fp))
        for i, item in enumerate(spec):
            name = '%s_%d' % (self.fspath.basename, i)
            if 'test_name' in item:
                name += '_%s' % item['test_name']
            yield FontaineScriptTestItem(name, self, item)


class FontaineScriptTestItem(pytest.Item):
    """One test: parse `in` and compare against `out` (and `title`)."""

    def __init__(self, name, parent, spec):
        super().__init__(name, parent)
        self.spec = spec

    def reportinfo(self):
        return self.fspath, 0, "fontaine script test: %s" % self.name

    def runtest(self):
        intext = self.spec.get('in')
        expected = self.spec.get('out')
        title = self.spec.get('title')
        if intext is None or expected is None:
            raise Exception("No 'in' or 'out' specified.")

        parser = FontaineParser()
        doc = parser.parseString(intext)
        if title is not None:
            assert title == doc.title_values
        assert_scenes(doc.scenes, make_scenes(expected))

    def repr_failure(self, excinfo):
        if isinstance(excinfo.value, FontaineParserError):
            return ('\n'.join(
                ['Parser error:', str(excinfo.value)]))
        return super().repr_failure(excinfo)


def make_scenes(spec):
    """Build expected scenes from a list of prefixed strings.

    The first character of each item selects its kind: `.` scene header,
    `!` action, `@` character, `=` dialog, `_` parenthetical. The result is
    a list of `[header, paragraph, ...]` lists, where the header is `None`
    for paragraphs that come before any header.

    Raises:
        Exception: if `spec` is not a list or an item has an unknown prefix.
    """
    if not isinstance(spec, list):
        raise Exception("Script specs must be lists.")

    out = []
    cur_header = None
    cur_paras = []

    for item in spec:
        token = item[:1]
        if token == '.':
            if cur_header or cur_paras:
                out.append([cur_header] + cur_paras)
            cur_header = item[1:]
            # Start the new scene with no paragraphs; otherwise those of
            # the previous scene would be repeated in this one.
            cur_paras = []
        elif token == '!':
            cur_paras.append(item[1:])
        elif token == '@':
            cur_paras.append(_c(item[1:]))
        elif token == '=':
            cur_paras.append(_d(item[1:]))
        elif token == '_':
            cur_paras.append(_p(item[1:]))
        else:
            raise Exception("Unknown token: %s" % token)
    if cur_header or cur_paras:
        out.append([cur_header] + cur_paras)
    return out
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_action.yaml Mon Jan 02 12:30:49 2017 -0800 @@ -0,0 +1,16 @@ +--- +in: "This is a simple action." +out: + - "!This is a simple action." +--- +in: "This is a multi\nline\naction." +out: + - "!This is a multi\nline\naction." +--- +in: "This is a multi\n\nparagraph\n\n\naction." +out: + - "!This is a multi\n\nparagraph\n\n\naction." +--- +in: "!EXT. ACTION. FORCED." +out: + - "!EXT. ACTION. FORCED."
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_character.yaml Mon Jan 02 12:30:49 2017 -0800 @@ -0,0 +1,20 @@ +--- +in: "\nSTEEL\nThe man's a myth!" +out: + - '@STEEL' + - "=The man's a myth!" +--- +in: "\nMOM (O. S.)\nLuke! Come down for supper!" +out: + - '@MOM (O. S.)' + - "=Luke! Come down for supper!" +--- +in: "\nHANS (on the radio)\nWhat was it you said?" +out: + - '@HANS (on the radio)' + - "=What was it you said?" +--- +in: "\n@McCLANE\nYippie ki-yay!" +out: + - '@McCLANE' + - "=Yippie ki-yay!"
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_parenthetical.yaml Mon Jan 02 12:30:49 2017 -0800 @@ -0,0 +1,10 @@ +--- +in: | + + STEEL + (starting the engine) + So much for retirement! +out: + - '@STEEL' + - '_(starting the engine)' + - '=So much for retirement!'
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_titlepage.yaml Mon Jan 02 12:30:49 2017 -0800 @@ -0,0 +1,37 @@ +--- +in: "" +title: {} +out: [] +--- +in: "\n" +title: {} +out: + - "!\n" +--- +in: "\n\n" +title: {} +out: + - "!\n\n" +--- +in: "\n\n\n" +title: {} +out: + - "!\n\n\n" +--- +in: | + Title: This simple test + Author: Ludovic +title: + Title: "This simple test" + Author: "Ludovic" +out: [] +--- +in: | + Title: This simple test + + It doesn't have much. +title: + Title: "This simple test" +out: + - "!It doesn't have much.\n" +