view fontaine/parser.py @ 0:243401c49520

Initial commit.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 02 Jan 2017 12:30:49 -0800
parents
children 74b83e3d921e
line wrap: on
line source

import re
import logging


logger = logging.getLogger(__name__)


class FontaineState:
    can_merge = False

    def match(self, fp, ctx):
        return False

    def consume(self, fp, ctx):
        raise NotImplementedError()

    def exit(self, ctx):
        pass


class FontaineParserError(Exception):
    def __init__(self, line_no, message):
        super().__init__("Error line %d: %s" % (line_no, message))


ANY_STATE = object()
EOF_STATE = object()


RE_EMPTY_LINE = re.compile(r"^$", re.M)
RE_BLANK_LINE = re.compile(r"^\s*$", re.M)

RE_TITLE_KEY_VALUE = re.compile(r"^(?P<key>[\w\s\-]+)\s*:")


class _TitlePageState(FontaineState):
    def __init__(self):
        super().__init__()
        self._cur_key = None
        self._cur_val = None

    def consume(self, fp, ctx):
        line = fp.readline()
        if not line:
            return EOF_STATE

        if RE_EMPTY_LINE.match(line):
            self._commit(ctx)
            # Finished with the page title, now move on to the first scene.
            # However, if we never had any page title, go back to the beginning
            # so we don't consume anybody else's empty lines.
            if len(ctx.document.title_values) == 0:
                fp.seek0()
            return ANY_STATE

        m = RE_TITLE_KEY_VALUE.match(line)
        if m:
            # Commit current value, start new one.
            self._commit(ctx)
            self._cur_key = m.group('key')
            self._cur_val = line[m.end():].strip()
        else:
            if self._cur_val is None:
                if len(ctx.document.title_values) == 0:
                    # Early exit because there's no title page.
                    # Go back to the beginning so we don't consume somebody's
                    # first line of text.
                    fp.seek0()
                    return ANY_STATE

                raise FontaineParserError(
                    fp.line_no,
                    "Page title needs to be followed by 2 empty lines.")

            # Keep accumulating the value of one of the title page's values.
            self._cur_val += line.strip()
        return True

    def exit(self, ctx):
        self._commit(ctx)

    def _commit(self, ctx):
        if self._cur_key is not None:
            ctx.document.title_values[self._cur_key] = self._cur_val
            self._cur_key = None
            self._cur_val = None


RE_SCENE_HEADER_PATTERN = re.compile(
    r"^(int|ext|est|int/ext|int./ext|i/e)[\s\.]", re.I)


class _SceneHeaderState(FontaineState):
    def match(self, fp, ctx):
        lines = fp.peeklines(2)
        return (
            RE_EMPTY_LINE.match(lines[0]) and
            RE_SCENE_HEADER_PATTERN.match(lines[1]))

    def consume(self, fp, ctx):
        fp.readline()  # Get past the blank line.
        line = fp.readline().rstrip('\r\n')
        line = line.lstrip('.')  # In case it was forced.
        ctx.document.addScene(line)
        return ANY_STATE


class _ActionState(FontaineState):
    can_merge = True

    def __init__(self):
        super().__init__()
        self.text = ''

    def match(self, fp, ctx):
        return True

    def consume(self, fp, ctx):
        is_first_line = True
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE

            if is_first_line:
                line = line.lstrip('!')
                is_first_line = False

            self.text += line

            if RE_EMPTY_LINE.match(fp.peekline()):
                break

        return ANY_STATE

    def exit(self, ctx):
        ctx.document.lastScene().addAction(self.text)


RE_CHARACTER_LINE = re.compile(r"^[A-Z\-]+\s*(\(.*\))?$", re.M)


class _CharacterState(FontaineState):
    def match(self, fp, ctx):
        lines = fp.peeklines(3)
        return (RE_EMPTY_LINE.match(lines[0]) and
                RE_CHARACTER_LINE.match(lines[1]) and
                not RE_EMPTY_LINE.match(lines[2]))

    def consume(self, fp, ctx):
        fp.readline()  # Get past the empty line.
        line = fp.readline().rstrip('\r\n')
        line = line.lstrip('@')  # In case it was forced.
        ctx.document.lastScene().addCharacter(line)
        return [_ParentheticalState, _DialogState]


RE_PARENTHETICAL_LINE = re.compile(r"^\s*\(.*\)\s*$", re.M)


class _ParentheticalState(FontaineState):
    def match(self, fp, ctx):
        # We only get here from a `_CharacterState` so we know the previous
        # one is already that.
        line = fp.peekline()
        return RE_PARENTHETICAL_LINE.match(line)

    def consume(self, fp, ctx):
        line = fp.readline().rstrip('\r\n')
        ctx.document.lastScene().addParenthetical(line)
        return [_DialogState, _CharacterState, _ActionState]


class _DialogState(FontaineState):
    def __init__(self):
        super().__init__()
        self.text = ''

    def match(self, fp, ctx):
        line = fp.peekline()
        return not RE_EMPTY_LINE.match(line)

    def consume(self, fp, ctx):
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE
            self.text += line
            if RE_EMPTY_LINE.match(fp.peekline()):
                break
        return ANY_STATE

    def exit(self, ctx):
        ctx.document.lastScene().addDialog(self.text.rstrip('\r\n'))


class _LyricsState(FontaineState):
    pass


class _TransitionState(FontaineState):
    pass


class _ForcedParagraphStates(FontaineState):
    STATE_SYMBOLS = {
        '.': _SceneHeaderState,
        '!': _ActionState,
        '@': _CharacterState,
        '~': _LyricsState,
        '>': _TransitionState
    }

    def __init__(self):
        super().__init__()
        self._state_cls = None

    def match(self, fp, ctx):
        lines = fp.peeklines(2)
        if (RE_EMPTY_LINE.match(lines[0]) and
                lines[1][:1] in self.STATE_SYMBOLS):
            self._state_cls = self.STATE_SYMBOLS[lines[1][:1]]
            return True
        return False

    def consume(self, fp, ctx):
        return self._state_cls()


STATES = [
    _ForcedParagraphStates,  # Must be first.
    _SceneHeaderState,
    _CharacterState,
    _TransitionState,
    _ActionState,  # Must be last.
]


class _PeekableFile:
    def __init__(self, fp):
        self.line_no = 1
        self._fp = fp

    def read(self, size=-1):
        return self._doRead(size, True)

    def read1(self):
        return self.read(1)

    def peek1(self):
        pos = self._fp.tell()
        c = self._doRead(1, False)
        self._fp.seek(pos)
        return c

    def readline(self, size=-1):
        data = self._fp.readline(size)
        self.line_no += 1
        return data

    def peekline(self):
        pos = self._fp.tell()
        line = self._fp.readline()
        self._fp.seek(pos)
        return line

    def peeklines(self, count):
        pos = self._fp.tell()
        lines = []
        for i in range(count):
            lines.append(self._fp.readline())
        self._fp.seek(pos)
        return lines

    def seek0(self):
        self._fp.seek(0)
        self.line_no = 1

    def _doRead(self, size, advance_line_no):
        data = self._fp.read(size)
        if advance_line_no:
            self.line_no += data.count('\n')
        return data


class _FontaineStateMachine:
    def __init__(self, fp, doc):
        self.fp = _PeekableFile(fp)
        self.state = None
        self.document = doc

    @property
    def line_no(self):
        return self.fp.line_no

    def run(self):
        self.state = _TitlePageState()
        while True:
            logger.debug("State '%s' consuming from '%s'..." %
                         (self.state.__class__.__name__, self.fp.peekline()))
            res = self.state.consume(self.fp, self)

            # See if we reached the end of the file.
            if not self.fp.peekline():
                logger.debug("Reached end of line... ending parsing.")
                res = EOF_STATE

            # Figure out what to do next...

            if res is None:
                raise Exception(
                    "States need to return `ANY_STATE`, one or more specific "
                    "states, or `EOF_STATE` if they reached the end of the "
                    "file.")

            if res is True:
                # State continues to consume.
                continue

            if res is ANY_STATE or isinstance(res, list):
                # State wants to exit, we need to figure out what is the
                # next state.
                pos = self.fp._fp.tell()
                next_states = res
                if next_states is ANY_STATE:
                    next_states = STATES
                logger.debug("Trying to match next state from: %s" %
                             [t.__name__ for t in next_states])
                for sc in next_states:
                    s = sc()
                    if s.match(self.fp, self):
                        logger.debug("Matched state %s" %
                                     s.__class__.__name__)
                        self.fp._fp.seek(pos)
                        res = s
                        break
                else:
                    raise Exception("Can't match following state after: %s" %
                                    self.state)
                if self.state:
                    if type(self.state) == type(res) and self.state.can_merge:
                        # Don't switch states if the next state is the same
                        # type and that type supports merging.
                        continue

                    self.state.exit(self)

                self.state = res
                continue

            if isinstance(res, FontaineState):
                # State wants to exit, wants a specific state to be next.
                if self.state:
                    self.state.exit(self)
                self.state = res
                continue

            if res is EOF_STATE:
                # Reached end of file.
                if self.state:
                    self.state.exit(self)
                break

            raise Exception("Unsupported state result: %s" % res)


class FontaineParser:
    def __init__(self):
        pass

    def parse(self, filein):
        if isinstance(filein, str):
            with open(filein, 'r') as fp:
                return self._doParse(fp)
        else:
            return self._doParse(fp)

    def parseString(self, text):
        import io
        with io.StringIO(text) as fp:
            return self._doParse(fp)

    def _doParse(self, fp):
        from .document import FontaineDocument
        doc = FontaineDocument()
        machine = _FontaineStateMachine(fp, doc)
        machine.run()
        return doc