changeset 0:243401c49520

Initial commit.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 02 Jan 2017 12:30:49 -0800
parents
children 74b83e3d921e
files .hgignore dev-requirements.txt fontaine/__init__.py fontaine/document.py fontaine/parser.py tests/__init__.py tests/conftest.py tests/test_action.yaml tests/test_character.yaml tests/test_parenthetical.yaml tests/test_titlepage.yaml
diffstat 9 files changed, 709 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.hgignore	Mon Jan 02 12:30:49 2017 -0800
@@ -0,0 +1,4 @@
+syntax:glob
+__pycache__
+.cache
+*.pyc
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dev-requirements.txt	Mon Jan 02 12:30:49 2017 -0800
@@ -0,0 +1,3 @@
+py==1.4.32
+pytest==3.0.5
+PyYAML==3.12
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/fontaine/document.py	Mon Jan 02 12:30:49 2017 -0800
@@ -0,0 +1,97 @@
+import sys
+
+
+class FontaineDocument:
+    # Parsed document: title page values plus an ordered list of scenes.
+    def __init__(self):
+        self.scenes = []
+        self.title_values = {}
+
+    def addScene(self, header=None):
+        # Append a new scene and return it; a falsy header is not applied.
+        scene = FontaineScene()
+        if header:
+            scene.header = header
+        self.scenes.append(scene)
+        return scene
+
+    def lastScene(self, auto_create=True):
+        # Most recent scene; with none yet, create one or return None.
+        if self.scenes:
+            return self.scenes[-1]
+        return self.addScene() if auto_create else None
+
+    def lastParagraph(self):
+        # Never creates a scene: None when no scene or paragraph exists.
+        scene = self.lastScene(False)
+        if not scene:
+            return None
+        return scene.lastParagraph()
+
+
+class FontaineScene:  # One scene: optional header plus its paragraphs.
+    def __init__(self):
+        self.header = None
+        self.paragraphs = []
+        self._adders = {}  # Cache of generated adders, keyed by type name.
+
+    def __getattr__(self, name):
+        # Synthesizes `add<Type>(text)` methods (e.g. `addAction`) from the
+        # module-level `TYPE_<TYPE>` constants; only runs for missing names.
+        if not name.startswith('add'):
+            # Name the attribute so the error is not an empty message.
+            raise AttributeError(name)
+        add_type_name = name[3:]
+        try:
+            adder = self._adders[add_type_name]
+        except KeyError:
+            module = sys.modules[__name__]
+            add_type = getattr(module, 'TYPE_%s' % add_type_name.upper())
+
+            def _type_adder(_text):
+                self.paragraphs.append(
+                    FontaineSceneElement(add_type, _text))
+
+            adder = _type_adder
+            self._adders[add_type_name] = adder
+        return adder
+
+    def lastParagraph(self):
+        # Last paragraph added to this scene, or None when there is none.
+        paragraphs = self.paragraphs
+        return paragraphs[-1] if paragraphs else None
+
+
+class FontaineSceneElement:  # A typed paragraph inside a scene.
+    def __init__(self, el_type, text):
+        self.text = text
+        self.type = el_type
+
+    def __str__(self):
+        # '<TYPE NAME>: <text shortened to at most 15 characters>'
+        label = _scene_element_type_str(self.type)
+        return '%s: %s' % (label, _ellipsis(self.text, 15))
+
+
+TYPE_ACTION = 0  # Scene element type ids; looked up by name as TYPE_<X>.
+TYPE_CHARACTER = 1
+TYPE_DIALOG = 2
+TYPE_PARENTHETICAL = 3
+
+
+def _scene_element_type_str(t):
+    # Upper-case display name for a TYPE_* constant; unknown values raise.
+    # Checked in order with `==`, exactly like a chain of `if` tests.
+    labels = ((TYPE_ACTION, 'ACTION'), (TYPE_CHARACTER, 'CHARACTER'),
+              (TYPE_DIALOG, 'DIALOG'),
+              (TYPE_PARENTHETICAL, 'PARENTHETICAL'))
+    for type_id, label in labels:
+        if t == type_id:
+            return label
+    raise NotImplementedError()
+
+
+def _ellipsis(text, length):
+    if len(text) <= length:
+        return text  # Short enough: returned unchanged.
+    return text[:length - 3] + '...'
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/fontaine/parser.py	Mon Jan 02 12:30:49 2017 -0800
@@ -0,0 +1,388 @@
+import re
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class FontaineState:  # Base class for the parser's states.
+    can_merge = False  # Consecutive same-type states merge when True.
+
+    def match(self, fp, ctx):  # Whether this state applies next.
+        return False
+
+    def consume(self, fp, ctx):  # Read input; return the next-state hint.
+        raise NotImplementedError()
+
+    def exit(self, ctx):  # Hook called when the machine leaves the state.
+        pass
+
+
+class FontaineParserError(Exception):  # Parse error tagged with a line.
+    def __init__(self, line_no, message):
+        super().__init__("Error line %d: %s" % (line_no, message))
+
+
+ANY_STATE = object()  # consume() result: pick the next state from STATES.
+EOF_STATE = object()  # consume() result: input exhausted, stop parsing.
+
+
+RE_EMPTY_LINE = re.compile(r"^$", re.M)  # Nothing before the newline.
+RE_BLANK_LINE = re.compile(r"^\s*$", re.M)  # Whitespace-only line.
+
+RE_TITLE_KEY_VALUE = re.compile(r"^(?P<key>[\w\s\-]+)\s*:")  # 'Key:' prefix.
+
+
+class _TitlePageState(FontaineState):  # Parses leading `Key: value` lines.
+    def __init__(self):
+        super().__init__()
+        self._cur_key = None  # Key being accumulated, if any.
+        self._cur_val = None  # Text gathered so far for `_cur_key`.
+
+    def consume(self, fp, ctx):  # Handles one line per call.
+        line = fp.readline()
+        if not line:
+            return EOF_STATE
+
+        if RE_EMPTY_LINE.match(line):
+            self._commit(ctx)
+            # Finished with the page title, now move on to the first scene.
+            # However, if we never had any page title, go back to the beginning
+            # so we don't consume anybody else's empty lines.
+            if len(ctx.document.title_values) == 0:
+                fp.seek0()
+            return ANY_STATE
+
+        m = RE_TITLE_KEY_VALUE.match(line)
+        if m:
+            # Commit current value, start new one.
+            self._commit(ctx)
+            self._cur_key = m.group('key')
+            self._cur_val = line[m.end():].strip()
+        else:
+            if self._cur_val is None:
+                if len(ctx.document.title_values) == 0:
+                    # Early exit because there's no title page.
+                    # Go back to the beginning so we don't consume somebody's
+                    # first line of text.
+                    fp.seek0()
+                    return ANY_STATE
+
+                raise FontaineParserError(
+                    fp.line_no,
+                    "Page title needs to be followed by 2 empty lines.")
+
+            # Keep accumulating the value of one of the title page's values.
+            self._cur_val += line.strip()  # No separator is inserted.
+        return True  # Stay in this state; consume() is called again.
+
+    def exit(self, ctx):  # Flush a pair still pending on exit.
+        self._commit(ctx)
+
+    def _commit(self, ctx):  # Store the pending pair, then reset.
+        if self._cur_key is not None:
+            ctx.document.title_values[self._cur_key] = self._cur_val
+            self._cur_key = None
+            self._cur_val = None
+
+
+RE_SCENE_HEADER_PATTERN = re.compile(  # Dot in `int./ext` is literal.
+    r"^(int|ext|est|int/ext|int\./ext|i/e)[\s\.]", re.I)
+
+
+class _SceneHeaderState(FontaineState):
+    def match(self, fp, ctx):
+        # An empty line immediately followed by a scene heading line.
+        empty, heading = fp.peeklines(2)
+        return (RE_EMPTY_LINE.match(empty) and
+                RE_SCENE_HEADER_PATTERN.match(heading))
+
+    def consume(self, fp, ctx):
+        fp.readline()  # Skip the empty line that precedes the heading.
+        heading = fp.readline().rstrip('\r\n')
+        # A leading '.' only marks a forced heading; drop it.
+        ctx.document.addScene(heading.lstrip('.'))
+        return ANY_STATE
+
+
+class _ActionState(FontaineState):  # Collects lines of action text.
+    can_merge = True  # Back-to-back action blocks share one paragraph.
+
+    def __init__(self):
+        super().__init__()
+        self.text = ''  # Raw text gathered so far, newlines included.
+
+    def match(self, fp, ctx):  # Catch-all: accepts any input.
+        return True
+
+    def consume(self, fp, ctx):  # Reads up to the next empty line or EOF.
+        is_first_line = True
+        while True:
+            line = fp.readline()
+            if not line:
+                return EOF_STATE
+
+            if is_first_line:
+                line = line.lstrip('!')  # Drop the forced-action marker.
+                is_first_line = False
+
+            self.text += line
+
+            if RE_EMPTY_LINE.match(fp.peekline()):  # Peek, don't consume.
+                break
+
+        return ANY_STATE
+
+    def exit(self, ctx):  # Emit the gathered text as one action paragraph.
+        ctx.document.lastScene().addAction(self.text)
+
+
+RE_CHARACTER_LINE = re.compile(r"^[A-Z\-]+\s*(\(.*\))?$", re.M)  # CUE (ext)
+
+
+class _CharacterState(FontaineState):
+    def match(self, fp, ctx):  # Empty line, cue line, then non-empty line.
+        before, cue, after = fp.peeklines(3)
+        return (RE_EMPTY_LINE.match(before) and
+                RE_CHARACTER_LINE.match(cue) and
+                not RE_EMPTY_LINE.match(after))
+
+    def consume(self, fp, ctx):
+        fp.readline()  # Skip the empty line that precedes the cue.
+        cue = fp.readline().rstrip('\r\n')
+        # A leading '@' only marks a forced character; drop it.
+        ctx.document.lastScene().addCharacter(cue.lstrip('@'))
+        return [_ParentheticalState, _DialogState]
+
+
+RE_PARENTHETICAL_LINE = re.compile(r"^\s*\(.*\)\s*$", re.M)  # "(text)"
+
+
+class _ParentheticalState(FontaineState):
+    def match(self, fp, ctx):
+        # Only offered as a follow-up by `_CharacterState.consume`, so the
+        # preceding element is known; just test the upcoming line's shape.
+        upcoming = fp.peekline()
+        return RE_PARENTHETICAL_LINE.match(upcoming)
+
+    def consume(self, fp, ctx):
+        text = fp.readline().rstrip('\r\n')
+        ctx.document.lastScene().addParenthetical(text)
+        return [_DialogState, _CharacterState, _ActionState]
+
+
+class _DialogState(FontaineState):  # Collects the lines of one speech.
+    def __init__(self):
+        super().__init__()
+        self.text = ''  # Raw dialog text gathered so far.
+
+    def match(self, fp, ctx):  # Any non-empty upcoming line is dialog.
+        line = fp.peekline()
+        return not RE_EMPTY_LINE.match(line)
+
+    def consume(self, fp, ctx):  # Reads up to the next empty line or EOF.
+        while True:
+            line = fp.readline()
+            if not line:
+                return EOF_STATE
+            self.text += line
+            if RE_EMPTY_LINE.match(fp.peekline()):  # Peek, don't consume.
+                break
+        return ANY_STATE
+
+    def exit(self, ctx):  # Emit the text minus trailing line breaks.
+        ctx.document.lastScene().addDialog(self.text.rstrip('\r\n'))
+
+
+class _LyricsState(FontaineState):  # Stub: inherits base behavior only.
+    pass
+
+
+class _TransitionState(FontaineState):  # Stub: inherits base behavior.
+    pass
+
+
+class _ForcedParagraphStates(FontaineState):  # Dispatch on a marker char.
+    STATE_SYMBOLS = {  # First character of a line -> state it forces.
+        '.': _SceneHeaderState,
+        '!': _ActionState,
+        '@': _CharacterState,
+        '~': _LyricsState,
+        '>': _TransitionState
+    }
+
+    def __init__(self):
+        super().__init__()
+        self._state_cls = None  # Set by match(), instantiated by consume().
+
+    def match(self, fp, ctx):  # Empty line, then a line with a marker.
+        lines = fp.peeklines(2)
+        if (RE_EMPTY_LINE.match(lines[0]) and
+                lines[1][:1] in self.STATE_SYMBOLS):
+            self._state_cls = self.STATE_SYMBOLS[lines[1][:1]]
+            return True
+        return False
+
+    def consume(self, fp, ctx):  # Reads nothing; hands over a new state.
+        return self._state_cls()
+
+
+STATES = [  # Tried in order when a state returns ANY_STATE.
+    _ForcedParagraphStates,  # Must be first.
+    _SceneHeaderState,
+    _CharacterState,
+    _TransitionState,
+    _ActionState,  # Must be last.
+]
+
+
+class _PeekableFile:  # Seekable text file wrapper: peeking + line count.
+    def __init__(self, fp):
+        self.line_no = 1  # 1-based number of the next line to be read.
+        self._fp = fp
+
+    def read(self, size=-1):  # Consuming read; keeps `line_no` in sync.
+        return self._doRead(size, True)
+
+    def read1(self):
+        return self.read(1)
+
+    def peek1(self):  # Next character, without consuming it.
+        pos = self._fp.tell()
+        c = self._doRead(1, False)
+        self._fp.seek(pos)
+        return c
+
+    def readline(self, size=-1):
+        data = self._fp.readline(size)
+        self.line_no += data.count('\n')  # No advance at EOF/partial line.
+        return data
+
+    def peekline(self):  # Next line ('' at EOF), without consuming it.
+        pos = self._fp.tell()
+        line = self._fp.readline()
+        self._fp.seek(pos)
+        return line
+
+    def peeklines(self, count):
+        # Next `count` lines without consuming them; lines past the end
+        # of the file come back as ''.
+        pos = self._fp.tell()
+        lines = [self._fp.readline() for _ in range(count)]
+        self._fp.seek(pos)
+        return lines
+
+    def seek0(self):  # Rewind to the start and reset the line counter.
+        self._fp.seek(0)
+        self.line_no = 1
+
+    def _doRead(self, size, advance_line_no):
+        data = self._fp.read(size)
+        if advance_line_no:
+            self.line_no += data.count('\n')
+        return data
+
+
+class _FontaineStateMachine:  # Drives the states over the input file.
+    def __init__(self, fp, doc):
+        self.fp = _PeekableFile(fp)
+        self.state = None
+        self.document = doc
+
+    @property
+    def line_no(self):
+        return self.fp.line_no
+
+    def run(self):  # Parse the whole input into `self.document`.
+        self.state = _TitlePageState()
+        while True:
+            logger.debug("State '%s' consuming from '%s'...",
+                         self.state.__class__.__name__, self.fp.peekline())
+            res = self.state.consume(self.fp, self)
+
+            # See if we reached the end of the file.
+            if not self.fp.peekline():
+                logger.debug("Reached end of file... ending parsing.")
+                res = EOF_STATE
+
+            # Figure out what to do next...
+
+            if res is None:
+                raise Exception(
+                    "States need to return `ANY_STATE`, one or more specific "
+                    "states, or `EOF_STATE` if they reached the end of the "
+                    "file.")
+
+            if res is True:
+                # State continues to consume.
+                continue
+
+            if res is ANY_STATE or isinstance(res, list):
+                # State wants to exit, we need to figure out what is the
+                # next state.
+                pos = self.fp._fp.tell()
+                next_states = res
+                if next_states is ANY_STATE:
+                    next_states = STATES
+                logger.debug("Trying to match next state from: %s",
+                             [t.__name__ for t in next_states])
+                for sc in next_states:
+                    s = sc()
+                    if s.match(self.fp, self):
+                        logger.debug("Matched state %s",
+                                     s.__class__.__name__)
+                        self.fp._fp.seek(pos)
+                        res = s
+                        break
+                else:
+                    raise Exception("Can't match following state after: %s" %
+                                    self.state)
+                if self.state:
+                    if type(self.state) == type(res) and self.state.can_merge:
+                        # Don't switch states if the next state is the same
+                        # type and that type supports merging.
+                        continue
+
+                    self.state.exit(self)
+
+                self.state = res
+                continue
+
+            if isinstance(res, FontaineState):
+                # State wants to exit, wants a specific state to be next.
+                if self.state:
+                    self.state.exit(self)
+                self.state = res
+                continue
+
+            if res is EOF_STATE:
+                # Reached end of file.
+                if self.state:
+                    self.state.exit(self)
+                break
+
+            raise Exception("Unsupported state result: %s" % res)
+
+
+class FontaineParser:  # Public entry point: builds a FontaineDocument.
+    def __init__(self):
+        pass
+
+    def parse(self, filein):  # `filein`: a path (str) or an open file.
+        if isinstance(filein, str):
+            with open(filein, 'r') as fp:
+                return self._doParse(fp)
+        # Otherwise it is an open file (must support tell()/seek()).
+        return self._doParse(filein)
+
+    def parseString(self, text):  # Parse text held in memory.
+        import io
+        with io.StringIO(text) as fp:
+            return self._doParse(fp)
+
+    def _doParse(self, fp):  # Run the state machine over `fp`.
+        from .document import FontaineDocument
+        doc = FontaineDocument()
+        machine = _FontaineStateMachine(fp, doc)
+        machine.run()
+        return doc
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/conftest.py	Mon Jan 02 12:30:49 2017 -0800
@@ -0,0 +1,134 @@
+import sys
+import logging
+import yaml
+import pytest
+from fontaine.document import (
+    FontaineSceneElement,
+    TYPE_ACTION, TYPE_CHARACTER, TYPE_DIALOG, TYPE_PARENTHETICAL)
+from fontaine.parser import FontaineParser, FontaineParserError
+
+
+def pytest_addoption(parser):  # Registers the --log-debug flag.
+    parser.addoption(
+            '--log-debug',
+            action='store_true',
+            help="Sets the Fontaine logger to output debug info to stdout.")
+
+
+def pytest_configure(config):
+    if config.getoption('--log-debug'):  # Mirror debug logs to stdout.
+        log = logging.getLogger('fontaine')
+        log.addHandler(logging.StreamHandler(stream=sys.stdout))
+        log.setLevel(logging.DEBUG)
+
+
+def pytest_collect_file(parent, path):
+    # Collect only `test*.yaml` files; for anything else return None.
+    is_spec = path.ext == '.yaml' and path.basename.startswith("test")
+    return FontaineScriptTestFile(path, parent) if is_spec else None
+
+
+def assert_scenes(actual, scenes):  # scenes: [[header, *paragraphs], ...]
+    assert len(actual) == len(scenes)
+    for got, want in zip(actual, scenes):
+        assert_scene(got, want[0], want[1:])
+
+
+def assert_scene(actual, header, paragraphs):
+    if header is not None:  # None means: don't check the header.
+        assert actual.header == header
+    assert len(actual.paragraphs) == len(paragraphs)
+    for got, want in zip(actual.paragraphs, paragraphs):
+        assert_paragraph(got, want)
+
+
+def assert_paragraph(actual, expected):
+    # A plain string is shorthand for an action paragraph with that text.
+    if isinstance(expected, str):
+        want_type, want_text = TYPE_ACTION, expected
+    elif isinstance(expected, FontaineSceneElement):
+        want_type, want_text = expected.type, expected.text
+    else:
+        raise NotImplementedError("Don't know what this is: %s" % expected)
+    assert isinstance(actual, FontaineSceneElement)
+    assert actual.type == want_type
+    assert actual.text == want_text
+
+
+def _c(name):  # Expected character element.
+    return FontaineSceneElement(TYPE_CHARACTER, name)
+
+
+def _p(text):  # Expected parenthetical element.
+    return FontaineSceneElement(TYPE_PARENTHETICAL, text)
+
+
+def _d(text):  # Expected dialog element.
+    return FontaineSceneElement(TYPE_DIALOG, text)
+
+
+class FontaineScriptTestFile(pytest.File):  # One item per YAML document.
+    def collect(self):  # Safe loader (plain data); `with` closes the file.
+        with self.fspath.open(encoding='utf8') as fp:
+            for i, item in enumerate(yaml.safe_load_all(fp)):
+                name = '%s_%d' % (self.fspath.basename, i)
+                if 'test_name' in item:
+                    name += '_%s' % item['test_name']
+                yield FontaineScriptTestItem(name, self, item)
+
+
+class FontaineScriptTestItem(pytest.Item):  # One spec: 'in' -> 'out'.
+    def __init__(self, name, parent, spec):
+        super().__init__(name, parent)
+        self.spec = spec  # Mapping with 'in', 'out', optional 'title'.
+
+    def reportinfo(self):
+        return self.fspath, 0, "fontaine script test: %s" % self.name
+
+    def runtest(self):  # Parse 'in', compare the result against 'out'.
+        intext = self.spec.get('in')
+        expected = self.spec.get('out')
+        title = self.spec.get('title')
+        if intext is None or expected is None:
+            raise Exception("No 'in' or 'out' specified.")
+
+        parser = FontaineParser()
+        doc = parser.parseString(intext)
+        if title is not None:  # Title values are checked only when given.
+            assert title == doc.title_values
+        assert_scenes(doc.scenes, make_scenes(expected))
+
+    def repr_failure(self, excinfo):  # Short report for parser errors.
+        if isinstance(excinfo.value, FontaineParserError):
+            return ('\n'.join(
+                ['Parser error:', str(excinfo.value)]))
+        return super().repr_failure(excinfo)
+
+
+def make_scenes(spec):
+    # Flat tokens ('.' header, '!' '@' '=' '_' paragraphs) -> scene lists.
+    if not isinstance(spec, list):
+        raise Exception("Script specs must be lists.")
+
+    out = []
+    cur_header, cur_paras = None, []
+    for item in spec:
+        token = item[:1]
+        if token == '.':
+            if cur_header or cur_paras:
+                out.append([cur_header] + cur_paras)
+            # Start the new scene empty so earlier paragraphs don't leak in.
+            cur_header, cur_paras = item[1:], []
+        elif token == '!':
+            cur_paras.append(item[1:])
+        elif token == '@':
+            cur_paras.append(_c(item[1:]))
+        elif token == '=':
+            cur_paras.append(_d(item[1:]))
+        elif token == '_':
+            cur_paras.append(_p(item[1:]))
+        else:
+            raise Exception("Unknown token: %s" % token)
+    if cur_header or cur_paras:
+        out.append([cur_header] + cur_paras)
+    return out
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test_action.yaml	Mon Jan 02 12:30:49 2017 -0800
@@ -0,0 +1,16 @@
+---
+in: "This is a simple action."
+out: 
+    - "!This is a simple action."
+---
+in: "This is a multi\nline\naction."
+out:
+    - "!This is a multi\nline\naction."
+---
+in: "This is a multi\n\nparagraph\n\n\naction."
+out: 
+    - "!This is a multi\n\nparagraph\n\n\naction."
+---
+in: "!EXT. ACTION. FORCED."
+out: 
+    - "!EXT. ACTION. FORCED."
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test_character.yaml	Mon Jan 02 12:30:49 2017 -0800
@@ -0,0 +1,20 @@
+---
+in: "\nSTEEL\nThe man's a myth!"
+out:
+    - '@STEEL'
+    - "=The man's a myth!"
+---
+in: "\nMOM (O. S.)\nLuke! Come down for supper!"
+out:
+    - '@MOM (O. S.)'
+    - "=Luke! Come down for supper!"
+---
+in: "\nHANS (on the radio)\nWhat was it you said?"
+out:
+    - '@HANS (on the radio)'
+    - "=What was it you said?"
+---
+in: "\n@McCLANE\nYippie ki-yay!"
+out:
+    - '@McCLANE'
+    - "=Yippie ki-yay!"
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test_parenthetical.yaml	Mon Jan 02 12:30:49 2017 -0800
@@ -0,0 +1,10 @@
+---
+in: |
+
+    STEEL
+    (starting the engine)
+    So much for retirement!
+out: 
+    - '@STEEL'
+    - '_(starting the engine)'
+    - '=So much for retirement!'
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test_titlepage.yaml	Mon Jan 02 12:30:49 2017 -0800
@@ -0,0 +1,37 @@
+---
+in: ""
+title: {}
+out: []
+---
+in: "\n"
+title: {}
+out:
+    - "!\n"
+---
+in: "\n\n"
+title: {}
+out:
+    - "!\n\n"
+---
+in: "\n\n\n"
+title: {}
+out:
+    - "!\n\n\n"
+---
+in: |
+    Title: This simple test
+    Author: Ludovic
+title:
+    Title: "This simple test"
+    Author: "Ludovic"
+out: []
+---
+in: |
+    Title: This simple test
+
+    It doesn't have much.
+title:
+    Title: "This simple test"
+out:
+    - "!It doesn't have much.\n"
+