Mercurial > jouvence
diff fontaine/parser.py @ 0:243401c49520
Initial commit.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 02 Jan 2017 12:30:49 -0800 |
parents | |
children | 74b83e3d921e |
line wrap: on
line diff
"""State-machine parser for screenplay text.

The parser reads a text stream line by line and hands each paragraph to a
small state object (title page, scene header, action, character,
parenthetical, dialog).  Each state stores what it recognises on the
document object it is given through the parsing context.

NOTE(review): the element names suggest the Fountain screenplay markup --
confirm against the project documentation.
"""

import re
import logging


logger = logging.getLogger(__name__)


class FontaineState:
    """Base class for parser states.

    A state is asked whether it `match`es the upcoming lines, then asked to
    `consume` them, and is finally notified through `exit` when the state
    machine moves on to another state.
    """

    # When true, the state machine keeps using the current instance instead
    # of switching to a freshly matched instance of the same type.
    can_merge = False

    def match(self, fp, ctx):
        """Return a truthy value if this state can handle the next lines.

        Implementations are expected to only peek at `fp`.  The base
        implementation never matches.
        """
        return False

    def consume(self, fp, ctx):
        """Read lines from `fp` and tell the state machine what comes next.

        The result must be `True` (keep consuming with this state),
        `ANY_STATE`, a list of candidate state classes, a state instance,
        or `EOF_STATE`.
        """
        raise NotImplementedError()

    def exit(self, ctx):
        """Called when the state machine leaves this state."""
        pass


class FontaineParserError(Exception):
    """Parsing error whose message is prefixed with a line number."""

    def __init__(self, line_no, message):
        super().__init__("Error line %d: %s" % (line_no, message))


# Sentinels returned by `FontaineState.consume`.
ANY_STATE = object()  # Any state from `STATES` may follow.
EOF_STATE = object()  # The end of the input was reached.


RE_EMPTY_LINE = re.compile(r"^$", re.M)
RE_BLANK_LINE = re.compile(r"^\s*$", re.M)

RE_TITLE_KEY_VALUE = re.compile(r"^(?P<key>[\w\s\-]+)\s*:")


class _TitlePageState(FontaineState):
    """Parses the `key: value` lines at the very top of the input."""

    def __init__(self):
        super().__init__()
        # Key/value pair currently being accumulated, if any.
        self._cur_key = None
        self._cur_val = None

    def consume(self, fp, ctx):
        """Consume one line of the title page.

        Returns `EOF_STATE` when there is nothing left to read, `ANY_STATE`
        when the title page is over (or there was none), and `True` while
        more title page lines are expected.

        Raises `FontaineParserError` when a line is neither a key/value
        line nor the continuation of a pending value, although title
        values have already been stored.
        """
        line = fp.readline()
        if not line:
            return EOF_STATE

        if RE_EMPTY_LINE.match(line):
            self._commit(ctx)
            # Finished with the page title, now move on to the first scene.
            # However, if we never had any page title, go back to the
            # beginning so we don't consume anybody else's empty lines.
            if len(ctx.document.title_values) == 0:
                fp.seek0()
            return ANY_STATE

        m = RE_TITLE_KEY_VALUE.match(line)
        if m:
            # Commit current value, start new one.
            self._commit(ctx)
            # NOTE(review): the `key` group also matches whitespace, so a
            # key written as `Title :` keeps its trailing space -- confirm
            # whether keys should be stripped.
            self._cur_key = m.group('key')
            self._cur_val = line[m.end():].strip()
        else:
            if self._cur_val is None:
                if len(ctx.document.title_values) == 0:
                    # Early exit because there's no title page.
                    # Go back to the beginning so we don't consume
                    # somebody's first line of text.
                    fp.seek0()
                    return ANY_STATE

                raise FontaineParserError(
                    fp.line_no,
                    "Page title needs to be followed by 2 empty lines.")

            # Keep accumulating the value of one of the title page's values.
            # NOTE(review): continuation lines are joined without any
            # separator -- confirm this is intended.
            self._cur_val += line.strip()
        return True

    def exit(self, ctx):
        """Store the pending key/value pair, if any, before leaving."""
        self._commit(ctx)

    def _commit(self, ctx):
        """Store the pending key/value pair on the document and reset it."""
        if self._cur_key is not None:
            ctx.document.title_values[self._cur_key] = self._cur_val
            self._cur_key = None
            self._cur_val = None


# The dot in `int./ext` is escaped so that it only matches a literal period.
RE_SCENE_HEADER_PATTERN = re.compile(
    r"^(int|ext|est|int/ext|int\./ext|i/e)[\s\.]", re.I)


class _SceneHeaderState(FontaineState):
    """Parses a scene header: an empty line followed by a header line."""

    def match(self, fp, ctx):
        """Match an empty line followed by a line with a scene prefix."""
        lines = fp.peeklines(2)
        return (
            RE_EMPTY_LINE.match(lines[0]) and
            RE_SCENE_HEADER_PATTERN.match(lines[1]))

    def consume(self, fp, ctx):
        """Add a new scene to the document, named after the header line."""
        fp.readline()  # Get past the blank line.
        line = fp.readline().rstrip('\r\n')
        line = line.lstrip('.')  # In case it was forced.
        ctx.document.addScene(line)
        return ANY_STATE


class _ActionState(FontaineState):
    """Parses action text; this is the fallback state that always matches."""

    can_merge = True

    def __init__(self):
        super().__init__()
        # Raw text accumulated so far, line endings included.
        self.text = ''

    def match(self, fp, ctx):
        """Always match."""
        return True

    def consume(self, fp, ctx):
        """Accumulate lines until the next line is empty.

        Leading `!` characters are removed from the first line read by
        each call.  Returns `EOF_STATE` if the input runs out, `ANY_STATE`
        otherwise.
        """
        is_first_line = True
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE

            if is_first_line:
                line = line.lstrip('!')
                is_first_line = False

            self.text += line

            if RE_EMPTY_LINE.match(fp.peekline()):
                break

        return ANY_STATE

    def exit(self, ctx):
        """Add the accumulated text as an action on the last scene."""
        ctx.document.lastScene().addAction(self.text)


RE_CHARACTER_LINE = re.compile(r"^[A-Z\-]+\s*(\(.*\))?$", re.M)


class _CharacterState(FontaineState):
    """Parses a character cue: empty line, cue line, then a non-empty line."""

    def match(self, fp, ctx):
        """Match an empty line, a character line and a non-empty line."""
        lines = fp.peeklines(3)
        return (RE_EMPTY_LINE.match(lines[0]) and
                RE_CHARACTER_LINE.match(lines[1]) and
                not RE_EMPTY_LINE.match(lines[2]))

    def consume(self, fp, ctx):
        """Add the character to the last scene.

        Returns the states allowed next: a parenthetical or a dialog.
        """
        fp.readline()  # Get past the empty line.
        line = fp.readline().rstrip('\r\n')
        line = line.lstrip('@')  # In case it was forced.
        ctx.document.lastScene().addCharacter(line)
        return [_ParentheticalState, _DialogState]


RE_PARENTHETICAL_LINE = re.compile(r"^\s*\(.*\)\s*$", re.M)


class _ParentheticalState(FontaineState):
    """Parses a line wrapped in parentheses that follows a character cue."""

    def match(self, fp, ctx):
        """Match when the next line is wrapped in parentheses."""
        # We only get here from a `_CharacterState` so we know the previous
        # one is already that.
        line = fp.peekline()
        return RE_PARENTHETICAL_LINE.match(line)

    def consume(self, fp, ctx):
        """Add the parenthetical to the last scene.

        Returns the states allowed next: dialog, character or action.
        """
        line = fp.readline().rstrip('\r\n')
        ctx.document.lastScene().addParenthetical(line)
        return [_DialogState, _CharacterState, _ActionState]


class _DialogState(FontaineState):
    """Parses the lines of dialog that follow a character cue."""

    def __init__(self):
        super().__init__()
        # Raw text accumulated so far, line endings included.
        self.text = ''

    def match(self, fp, ctx):
        """Match when the next line is not empty."""
        line = fp.peekline()
        return not RE_EMPTY_LINE.match(line)

    def consume(self, fp, ctx):
        """Accumulate lines until the next line is empty.

        Returns `EOF_STATE` if the input runs out, `ANY_STATE` otherwise.
        """
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE
            self.text += line
            if RE_EMPTY_LINE.match(fp.peekline()):
                break
        return ANY_STATE

    def exit(self, ctx):
        """Add the accumulated text, minus trailing line endings, as dialog."""
        ctx.document.lastScene().addDialog(self.text.rstrip('\r\n'))


class _LyricsState(FontaineState):
    # NOTE(review): placeholder; it inherits the base `match` (never
    # matches) and `consume` (raises `NotImplementedError`).
    pass


class _TransitionState(FontaineState):
    # NOTE(review): placeholder; it inherits the base `match` (never
    # matches) and `consume` (raises `NotImplementedError`).
    pass


class _ForcedParagraphStates(FontaineState):
    """Dispatches paragraphs whose first character forces their type."""

    # Maps the forcing character to the state class that handles it.
    STATE_SYMBOLS = {
        '.': _SceneHeaderState,
        '!': _ActionState,
        '@': _CharacterState,
        '~': _LyricsState,
        '>': _TransitionState
    }

    def __init__(self):
        super().__init__()
        # State class selected by the last successful `match`.
        self._state_cls = None

    def match(self, fp, ctx):
        """Match an empty line followed by a line starting with a symbol."""
        lines = fp.peeklines(2)
        if (RE_EMPTY_LINE.match(lines[0]) and
                lines[1][:1] in self.STATE_SYMBOLS):
            self._state_cls = self.STATE_SYMBOLS[lines[1][:1]]
            return True
        return False

    def consume(self, fp, ctx):
        """Consume nothing; return an instance of the forced state."""
        return self._state_cls()


# Candidate states tried, in order, when a state returns `ANY_STATE`.
STATES = [
    _ForcedParagraphStates,  # Must be first.
    _SceneHeaderState,
    _CharacterState,
    _TransitionState,
    _ActionState,  # Must be last.
]


class _PeekableFile:
    """Wraps a seekable file object to add look-ahead and a line counter."""

    def __init__(self, fp):
        self.line_no = 1
        self._fp = fp

    def read(self, size=-1):
        """Read up to `size` characters, updating the line counter."""
        return self._doRead(size, True)

    def read1(self):
        """Read a single character."""
        return self.read(1)

    def peek1(self):
        """Return the next character without consuming it."""
        pos = self._fp.tell()
        c = self._doRead(1, False)
        self._fp.seek(pos)
        return c

    def readline(self, size=-1):
        """Read one line and advance the line counter."""
        data = self._fp.readline(size)
        # Nothing is returned at the end of the file; don't let the line
        # counter drift past the last line in that case.
        if data:
            self.line_no += 1
        return data

    def peekline(self):
        """Return the next line without consuming it."""
        pos = self._fp.tell()
        line = self._fp.readline()
        self._fp.seek(pos)
        return line

    def peeklines(self, count):
        """Return the next `count` lines without consuming them.

        Entries past the end of the file are empty strings.
        """
        pos = self._fp.tell()
        lines = [self._fp.readline() for _ in range(count)]
        self._fp.seek(pos)
        return lines

    def seek0(self):
        """Go back to the start of the file and reset the line counter."""
        self._fp.seek(0)
        self.line_no = 1

    def _doRead(self, size, advance_line_no):
        """Read `size` characters, optionally counting the newlines read."""
        data = self._fp.read(size)
        if advance_line_no:
            self.line_no += data.count('\n')
        return data


class _FontaineStateMachine:
    """Runs the parser states over a file, filling in a document.

    The machine itself is passed to the states as their context (`ctx`),
    which gives them access to `document`.
    """

    def __init__(self, fp, doc):
        self.fp = _PeekableFile(fp)
        self.state = None
        self.document = doc

    @property
    def line_no(self):
        """Current value of the wrapped file's line counter."""
        return self.fp.line_no

    def run(self):
        """Parse the whole file, starting with the title page state.

        Raises `Exception` when a state returns `None` or an unsupported
        value, or when no candidate state matches the upcoming lines.
        """
        self.state = _TitlePageState()
        while True:
            logger.debug("State '%s' consuming from '%s'...",
                         self.state.__class__.__name__, self.fp.peekline())
            res = self.state.consume(self.fp, self)

            # See if we reached the end of the file.
            if not self.fp.peekline():
                logger.debug("Reached end of file... ending parsing.")
                res = EOF_STATE

            # Figure out what to do next...

            if res is None:
                raise Exception(
                    "States need to return `ANY_STATE`, one or more specific "
                    "states, or `EOF_STATE` if they reached the end of the "
                    "file.")

            if res is True:
                # State continues to consume.
                continue

            if res is ANY_STATE or isinstance(res, list):
                # State wants to exit, we need to figure out what is the
                # next state.
                pos = self.fp._fp.tell()
                next_states = res
                if next_states is ANY_STATE:
                    next_states = STATES
                logger.debug("Trying to match next state from: %s",
                             [t.__name__ for t in next_states])
                for sc in next_states:
                    s = sc()
                    if s.match(self.fp, self):
                        logger.debug("Matched state %s",
                                     s.__class__.__name__)
                        # Rewind to where the file was before matching.
                        self.fp._fp.seek(pos)
                        res = s
                        break
                else:
                    raise Exception("Can't match following state after: %s" %
                                    self.state)
                if self.state:
                    if (type(self.state) is type(res) and
                            self.state.can_merge):
                        # Don't switch states if the next state is the same
                        # type and that type supports merging.
                        continue

                    self.state.exit(self)

                self.state = res
                continue

            if isinstance(res, FontaineState):
                # State wants to exit, wants a specific state to be next.
                if self.state:
                    self.state.exit(self)
                self.state = res
                continue

            if res is EOF_STATE:
                # Reached end of file.
                if self.state:
                    self.state.exit(self)
                break

            raise Exception("Unsupported state result: %s" % res)


class FontaineParser:
    """Entry point for parsing screenplay text into a `FontaineDocument`."""

    def __init__(self):
        pass

    def parse(self, filein):
        """Parse a file and return the resulting document.

        `filein` is either a path (`str`), which is opened for reading, or
        an already open file object, which is parsed as is and left open.
        """
        if isinstance(filein, str):
            with open(filein, 'r') as fp:
                return self._doParse(fp)
        else:
            # `filein` is the file object itself.
            return self._doParse(filein)

    def parseString(self, text):
        """Parse `text` and return the resulting document."""
        import io
        with io.StringIO(text) as fp:
            return self._doParse(fp)

    def _doParse(self, fp):
        """Run the state machine over `fp` and return the new document."""
        from .document import FontaineDocument
        doc = FontaineDocument()
        machine = _FontaineStateMachine(fp, doc)
        machine.run()
        return doc