Mercurial > jouvence
diff fontaine/parser.py @ 1:74b83e3d921e
Add more states, add more tests.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 02 Jan 2017 21:54:59 -0800 |
parents | 243401c49520 |
children | 59fe8cb6190d |
line wrap: on
line diff
--- a/fontaine/parser.py Mon Jan 02 12:30:49 2017 -0800 +++ b/fontaine/parser.py Mon Jan 02 21:54:59 2017 -0800 @@ -7,6 +7,10 @@ class FontaineState: can_merge = False + needs_pending_empty_lines = True + + def __init__(self): + self.has_pending_empty_line = False def match(self, fp, ctx): return False @@ -14,10 +18,18 @@ def consume(self, fp, ctx): raise NotImplementedError() + def merge(self): + pass + def exit(self, ctx): pass +class _PassThroughState(FontaineState): + def consume(self, fp, ctx): + return ANY_STATE + + class FontaineParserError(Exception): def __init__(self, line_no, message): super().__init__("Error line %d: %s" % (line_no, message)) @@ -39,42 +51,34 @@ self._cur_key = None self._cur_val = None - def consume(self, fp, ctx): - line = fp.readline() - if not line: - return EOF_STATE + def match(self, fp, ctx): + line = fp.peekline() + return RE_TITLE_KEY_VALUE.match(line) - if RE_EMPTY_LINE.match(line): - self._commit(ctx) - # Finished with the page title, now move on to the first scene. - # However, if we never had any page title, go back to the beginning - # so we don't consume anybody else's empty lines. - if len(ctx.document.title_values) == 0: - fp.seek0() - return ANY_STATE + def consume(self, fp, ctx): + while True: + line = fp.readline() + if not line: + return EOF_STATE - m = RE_TITLE_KEY_VALUE.match(line) - if m: - # Commit current value, start new one. - self._commit(ctx) - self._cur_key = m.group('key') - self._cur_val = line[m.end():].strip() - else: - if self._cur_val is None: - if len(ctx.document.title_values) == 0: - # Early exit because there's no title page. - # Go back to the beginning so we don't consume somebody's - # first line of text. - fp.seek0() - return ANY_STATE + m = RE_TITLE_KEY_VALUE.match(line) + if m: + # Commit current value, start new one. + self._commit(ctx) + self._cur_key = m.group('key') + self._cur_val = line[m.end():].strip() + else: + # Keep accumulating the value of one of the title page's + # values. + self._cur_val += line.strip() - raise FontaineParserError( - fp.line_no, - "Page title needs to be followed by 2 empty lines.") + if RE_EMPTY_LINE.match(fp.peekline()): + self._commit(ctx) + # Finished with the page title, now move on to the first scene. + self.has_pending_empty_line = True + break - # Keep accumulating the value of one of the title page's values. - self._cur_val += line.strip() - return True + return ANY_STATE def exit(self, ctx): self._commit(ctx) @@ -92,25 +96,30 @@ class _SceneHeaderState(FontaineState): def match(self, fp, ctx): - lines = fp.peeklines(2) + lines = fp.peeklines(3) return ( RE_EMPTY_LINE.match(lines[0]) and - RE_SCENE_HEADER_PATTERN.match(lines[1])) + RE_SCENE_HEADER_PATTERN.match(lines[1]) and + RE_EMPTY_LINE.match(lines[2])) def consume(self, fp, ctx): fp.readline() # Get past the blank line. line = fp.readline().rstrip('\r\n') line = line.lstrip('.') # In case it was forced. ctx.document.addScene(line) + self.has_pending_empty_line = True return ANY_STATE class _ActionState(FontaineState): can_merge = True + needs_pending_empty_lines = False def __init__(self): super().__init__() self.text = '' + self._to_merge = None + self._was_merged = False def match(self, fp, ctx): return True @@ -123,21 +132,85 @@ return EOF_STATE if is_first_line: - line = line.lstrip('!') + line = line.lstrip('!') # In case it was forced. is_first_line = False + # If the next line is empty, strip the carriage return from + # the line we just got because it's probably gonna be the + # last one. + if RE_EMPTY_LINE.match(fp.peekline()): + stripped_line = line.rstrip("\r\n") + self.text += stripped_line + self._to_merge = line[len(stripped_line):] + break + # ...otherwise, add the line with in full. self.text += line - if RE_EMPTY_LINE.match(fp.peekline()): - break + return ANY_STATE - return ANY_STATE + def merge(self): + # Put back the stuff we stripped from what we thought was the + # last line. + self.text += self._to_merge + self._was_merged = True def exit(self, ctx): ctx.document.lastScene().addAction(self.text) -RE_CHARACTER_LINE = re.compile(r"^[A-Z\-]+\s*(\(.*\))?$", re.M) +RE_CENTERED_LINE = re.compile(r"^\s*>\s*.*\s*<\s*$", re.M) + + +class _CenteredActionState(FontaineState): + def __init__(self): + super().__init__() + self.text = '' + self._aborted = False + + def match(self, fp, ctx): + lines = fp.peeklines(2) + return ( + RE_EMPTY_LINE.match(lines[0]) and + RE_CENTERED_LINE.match(lines[1])) + + def consume(self, fp, ctx): + snapshot = fp.snapshot() + fp.readline() # Get past the empty line. + while True: + line = fp.readline() + if not line: + return EOF_STATE + + clean_line = line.rstrip('\r\n') + eol = line[len(clean_line):] + + clean_line = clean_line.strip() + if clean_line[0] != '>' or clean_line[-1] != '<': + # The whole paragraph must have `>` and `<` wrappers, so + # if we detect a line that doesn't have them, we make this + # paragraph be a normal action instead. + fp.restore(snapshot) + self.has_pending_empty_line = True + self._aborted = True + return _ActionState() + else: + # Remove wrapping `>`/`<`, and spaces. + clean_line = clean_line[1:-1].strip() + + if RE_EMPTY_LINE.match(fp.peekline()): + self.text += clean_line + self.has_pending_empty_line = True + break + self.text += clean_line + eol + + return ANY_STATE + + def exit(self, ctx): + if not self._aborted: + ctx.document.lastScene().addCenteredAction(self.text) + + +RE_CHARACTER_LINE = re.compile(r"^\s*[A-Z\-]+\s*(\(.*\))?$", re.M) class _CharacterState(FontaineState): @@ -150,6 +223,7 @@ def consume(self, fp, ctx): fp.readline() # Get past the empty line. line = fp.readline().rstrip('\r\n') + line = line.lstrip() # Remove indenting. line = line.lstrip('@') # In case it was forced. ctx.document.lastScene().addCharacter(line) return [_ParentheticalState, _DialogState] @@ -166,9 +240,15 @@ return RE_PARENTHETICAL_LINE.match(line) def consume(self, fp, ctx): - line = fp.readline().rstrip('\r\n') + line = fp.readline().lstrip().rstrip('\r\n') ctx.document.lastScene().addParenthetical(line) - return [_DialogState, _CharacterState, _ActionState] + + next_line = fp.peekline() + if not RE_EMPTY_LINE.match(next_line): + return _DialogState() + + self.has_pending_empty_line = True + return ANY_STATE class _DialogState(FontaineState): @@ -177,6 +257,8 @@ self.text = '' def match(self, fp, ctx): + # We only get here from a `_CharacterState` or `_ParentheticalState` + # so we just need to check there's some text. line = fp.peekline() return not RE_EMPTY_LINE.match(line) @@ -185,9 +267,23 @@ line = fp.readline() if not line: return EOF_STATE + + line = line.lstrip() # Remove indenting. + + # Next we could be either continuing the dialog line, going to + # a parenthetical, or exiting dialog altogether. + next_line = fp.peekline() + + if RE_PARENTHETICAL_LINE.match(next_line): + self.text += line.rstrip('\r\n') + return _ParentheticalState() + + if RE_EMPTY_LINE.match(next_line): + self.text += line.rstrip('\r\n') + self.has_pending_empty_line = True + break self.text += line - if RE_EMPTY_LINE.match(fp.peekline()): - break + return ANY_STATE def exit(self, ctx): @@ -195,11 +291,80 @@ class _LyricsState(FontaineState): - pass + def __init__(self): + super().__init__() + self.text = '' + self._aborted = False + + # No `match` method, this can only be forced. + # (see `_ForcedParagraphStates`) + + def consume(self, fp, ctx): + snapshot = fp.snapshot() + fp.readline() # Get past the empty line. + while True: + line = fp.readline() + if not line: + return EOF_STATE + + if line.startswith('~'): + line = line.lstrip('~') + else: + logger.debug("Rolling back lyrics into action paragraph.") + fp.restore(snapshot) + self.has_pending_empty_line = True + self._aborted = True + return _ActionState() + + if RE_EMPTY_LINE.match(fp.peekline()): + self.text += line.rstrip('\r\n') + self.has_pending_empty_line = True + break + self.text += line + + return ANY_STATE + + def exit(self, ctx): + if not self._aborted: + ctx.document.lastScene().addLyrics(self.text) + + +RE_TRANSITION_LINE = re.compile(r"^\s*[^a-z]+TO\:$", re.M) class _TransitionState(FontaineState): - pass + def match(self, fp, ctx): + lines = fp.peeklines(3) + return ( + RE_EMPTY_LINE.match(lines[0]) and + RE_TRANSITION_LINE.match(lines[1]) and + RE_EMPTY_LINE.match(lines[2])) + + def consume(self, fp, ctx): + fp.readline() # Get past the empty line. + line = fp.readline().lstrip().rstrip('\r\n') + line = line.lstrip('>') # In case it was forced. + ctx.document.lastScene().addTransition(line) + self.has_pending_empty_line = True + + +RE_PAGE_BREAK_LINE = re.compile(r"^\=\=\=+$", re.M) + + +class _PageBreakState(FontaineState): + def match(self, fp, ctx): + lines = fp.peeklines(3) + return ( + RE_EMPTY_LINE.match(lines[0]) and + RE_PAGE_BREAK_LINE.match(lines[1]) and + RE_EMPTY_LINE.match(lines[2])) + + def consume(self, fp, ctx): + fp.readline() + fp.readline() + ctx.document.lastScene().addPageBreak() + self.has_pending_empty_line = True + return ANY_STATE class _ForcedParagraphStates(FontaineState): @@ -214,24 +379,41 @@ def __init__(self): super().__init__() self._state_cls = None + self._consume_empty_line = False def match(self, fp, ctx): lines = fp.peeklines(2) + symbol = lines[1][:1] if (RE_EMPTY_LINE.match(lines[0]) and - lines[1][:1] in self.STATE_SYMBOLS): - self._state_cls = self.STATE_SYMBOLS[lines[1][:1]] + symbol in self.STATE_SYMBOLS): + # Special case: don't force a transition state if it's + # really some centered text. + if symbol == '>' and RE_CENTERED_LINE.match(lines[1]): + return False + + self._state_cls = self.STATE_SYMBOLS[symbol] + + # Special case: for forced action paragraphs, don't leave + # the blank line there. + if symbol == '!': + self._consume_empty_line = True + return True return False def consume(self, fp, ctx): + if self._consume_empty_line: + fp.readline() return self._state_cls() -STATES = [ +ROOT_STATES = [ _ForcedParagraphStates, # Must be first. _SceneHeaderState, _CharacterState, _TransitionState, + _PageBreakState, + _CenteredActionState, _ActionState, # Must be last. ] @@ -240,25 +422,21 @@ def __init__(self, fp): self.line_no = 1 self._fp = fp - - def read(self, size=-1): - return self._doRead(size, True) - - def read1(self): - return self.read(1) - - def peek1(self): - pos = self._fp.tell() - c = self._doRead(1, False) - self._fp.seek(pos) - return c + self._blankAt0 = False def readline(self, size=-1): + if self._blankAt0: + self._blankAt0 = False + return '\n' + data = self._fp.readline(size) self.line_no += 1 return data def peekline(self): + if self._blankAt0: + return '\n' + pos = self._fp.tell() line = self._fp.readline() self._fp.seek(pos) @@ -267,16 +445,29 @@ def peeklines(self, count): pos = self._fp.tell() lines = [] + if self._blankAt0: + lines.append('\n') + count -= 1 for i in range(count): lines.append(self._fp.readline()) self._fp.seek(pos) return lines - def seek0(self): - self._fp.seek(0) - self.line_no = 1 + def snapshot(self): + return (self._fp.tell(), self._blankAt0, self.line_no) + + def restore(self, snapshot): + self._fp.seek(snapshot[0]) + self._blankAt0 = snapshot[1] + self.line_no = snapshot[2] - def _doRead(self, size, advance_line_no): + def _addBlankAt0(self): + if self._fp.tell() != 0: + raise Exception( + "Can't add blank line at 0 if reading has started.") + self._blankAt0 = True + + def _read(self, size, advance_line_no): data = self._fp.read(size) if advance_line_no: self.line_no += data.count('\n') @@ -294,7 +485,24 @@ return self.fp.line_no def run(self): + # Start with the page title... unless it doesn't match, in which + # case we start with a "pass through" state that will just return + # `ANY_STATE` so we can start matching stuff. self.state = _TitlePageState() + if not self.state.match(self.fp, self): + logger.debug("No title page value found on line 1, " + "using pass-through state with added blank line.") + self.state = _PassThroughState() + if not RE_EMPTY_LINE.match(self.fp.peekline()): + # Add a fake empty line at the beginning of the text if + # there's not one already. This makes state matching easier. + self.fp._addBlankAt0() + # Make this added empty line "pending" so if the first line + # is an action paragraph, it doesn't include it. + self.state.has_pending_empty_line = True + + # Start parsing! Here we try to do a mostly-forward-only parser with + # non overlapping regexes to make it decently fast. while True: logger.debug("State '%s' consuming from '%s'..." % (self.state.__class__.__name__, self.fp.peekline())) @@ -313,17 +521,13 @@ "states, or `EOF_STATE` if they reached the end of the " "file.") - if res is True: - # State continues to consume. - continue - - if res is ANY_STATE or isinstance(res, list): + elif res is ANY_STATE or isinstance(res, list): # State wants to exit, we need to figure out what is the # next state. pos = self.fp._fp.tell() next_states = res if next_states is ANY_STATE: - next_states = STATES + next_states = ROOT_STATES logger.debug("Trying to match next state from: %s" % [t.__name__ for t in next_states]) for sc in next_states: @@ -337,31 +541,43 @@ else: raise Exception("Can't match following state after: %s" % self.state) + + # Handle the current state before we move on to the new one. if self.state: if type(self.state) == type(res) and self.state.can_merge: # Don't switch states if the next state is the same # type and that type supports merging. + self.state.merge() continue self.state.exit(self) + if (self.state.has_pending_empty_line and + not res.needs_pending_empty_lines): + logger.debug("Skipping pending blank line from %s" % + self.state.__class__.__name__) + self.fp.readline() self.state = res - continue - if isinstance(res, FontaineState): + elif isinstance(res, FontaineState): # State wants to exit, wants a specific state to be next. if self.state: self.state.exit(self) + if (self.state.has_pending_empty_line and + not res.needs_pending_empty_lines): + logger.debug("Skipping pending blank line from %s" % + self.state.__class__.__name__) + self.fp.readline() self.state = res - continue - if res is EOF_STATE: + elif res is EOF_STATE: # Reached end of file. if self.state: self.state.exit(self) break - raise Exception("Unsupported state result: %s" % res) + else: + raise Exception("Unsupported state result: %s" % res) class FontaineParser: