Mercurial > jouvence
view fontaine/parser.py @ 10:2cea36073188
Move core CLI tool code into the package.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 04 Jan 2017 08:46:27 -0800 |
parents | 59fe8cb6190d |
children |
line wrap: on
line source
"""State-machine parser that turns screenplay text into a document.

The syntax handled here (INT./EXT. scene headers, ``!``/``@``/``~``/``.``/``>``
forced elements, ``> centered <`` text, ``===`` page breaks) matches the
Fountain screenplay markup.  The parser is a mostly forward-only state
machine: each state consumes the lines of one kind of paragraph and then tells
the machine which state(s) may follow.
"""
import re
import logging
from .document import TYPE_ACTION


logger = logging.getLogger(__name__)


class FontaineState:
    """Base class for the parser states.

    A state is first asked whether it `match`es the upcoming lines (without
    consuming them), then asked to `consume` them, and finally notified
    through `exit` when the machine moves on to another state.
    """

    def __init__(self):
        pass

    def match(self, fp, ctx):
        """Return a truthy value if this state can handle the next lines.

        Implementations only peek at `fp`.  The base implementation never
        matches.
        """
        return False

    def consume(self, fp, ctx):
        """Read this state's lines from `fp`.

        Must return `ANY_STATE`, a list of candidate state classes, a state
        instance, or `EOF_STATE`.
        """
        raise NotImplementedError()

    def exit(self, ctx, next_state):
        """Hook called by the state machine right before leaving this state."""
        pass


class _PassThroughState(FontaineState):
    """Initial state used when there is no title page: consumes nothing."""

    def consume(self, fp, ctx):
        return ANY_STATE


class FontaineParserError(Exception):
    """Parsing error whose message is prefixed with the offending line."""

    def __init__(self, line_no, message):
        super().__init__("Error line %d: %s" % (line_no, message))


# Sentinels returned by `FontaineState.consume`.
ANY_STATE = object()  # Any of the `ROOT_STATES` may follow.
EOF_STATE = object()  # The end of the input was reached.


# NOTE: `RE_EMPTY_LINE` only matches a truly empty line (or the empty string
# returned at end of file); a line made of spaces does not match it.
RE_EMPTY_LINE = re.compile(r"^$", re.M)
RE_BLANK_LINE = re.compile(r"^\s*$", re.M)

RE_TITLE_KEY_VALUE = re.compile(r"^(?P<key>[\w\s\-]+)\s*:\s*")


class _TitlePageState(FontaineState):
    """Reads the `key: value` pairs of the title page.

    Values may span several lines; everything up to the first empty line is
    stored in `ctx.document.title_values`, keyed by the lower-cased key.
    """

    def __init__(self):
        super().__init__()
        self._cur_key = None
        self._cur_val = None

    def match(self, fp, ctx):
        line = fp.peekline()
        return RE_TITLE_KEY_VALUE.match(line)

    def consume(self, fp, ctx):
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE

            m = RE_TITLE_KEY_VALUE.match(line)
            if m:
                # Commit current value, start new one.
                self._commit(ctx)
                self._cur_key = m.group('key').lower()
                self._cur_val = line[m.end():]
            else:
                # Keep accumulating the value of one of the title page's
                # values.
                self._cur_val += line.lstrip()

            if RE_EMPTY_LINE.match(fp.peekline()):
                self._commit(ctx)
                # Finished with the page title, now move on to the first
                # scene.
                break

        return ANY_STATE

    def exit(self, ctx, next_state):
        # Covers the case where `consume` returned early on end of file.
        self._commit(ctx)

    def _commit(self, ctx):
        """Store the pending key/value pair, if any, and reset it."""
        if self._cur_key is not None:
            val = self._cur_val.rstrip('\r\n')
            ctx.document.title_values[self._cur_key] = val
            self._cur_key = None
            self._cur_val = None


RE_SCENE_HEADER_PATTERN = re.compile(
    r"^(int|ext|est|int/ext|int./ext|i/e)[\s\.]", re.I)


class _SceneHeaderState(FontaineState):
    """Scene header: a matching line surrounded by two empty lines."""

    def match(self, fp, ctx):
        lines = fp.peeklines(3)
        return (
            RE_EMPTY_LINE.match(lines[0]) and
            RE_SCENE_HEADER_PATTERN.match(lines[1]) and
            RE_EMPTY_LINE.match(lines[2]))

    def consume(self, fp, ctx):
        fp.readline()  # Get past the blank line.
        line = fp.readline().rstrip('\r\n')
        line = line.lstrip('.')  # In case it was forced.
        ctx.document.addScene(line)
        return ANY_STATE


class _ActionState(FontaineState):
    """Catch-all state: accumulates lines until the next empty line."""

    def __init__(self):
        super().__init__()
        self.text = ''

    def match(self, fp, ctx):
        return True

    def consume(self, fp, ctx):
        is_first_line = True
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE

            if is_first_line:
                # Ignore the fake blank line at 0 if it's there.
                if fp.line_no == 0:
                    continue

                line = line.lstrip('!')  # In case it was forced.
                is_first_line = False

            # If the next line is empty, strip the line ending from the
            # line we just got because it's probably gonna be the last one.
            if RE_EMPTY_LINE.match(fp.peekline()):
                self.text += line.rstrip("\r\n")
                break
            # ...otherwise, add the line in full.
            self.text += line

        return ANY_STATE

    def exit(self, ctx, next_state):
        # Merge with a directly preceding action paragraph, if there is one.
        last_para = ctx.document.lastParagraph()
        if last_para and last_para.type == TYPE_ACTION:
            last_para.text += '\n' + self.text
        else:
            ctx.document.lastScene().addAction(self.text)


RE_CENTERED_LINE = re.compile(r"^\s*>\s*.*\s*<\s*$", re.M)


class _CenteredActionState(FontaineState):
    """Centered action: a paragraph whose lines are all wrapped in `>`/`<`."""

    def __init__(self):
        super().__init__()
        self.text = ''
        self._aborted = False

    def match(self, fp, ctx):
        lines = fp.peeklines(2)
        return (
            RE_EMPTY_LINE.match(lines[0]) and
            RE_CENTERED_LINE.match(lines[1]))

    def consume(self, fp, ctx):
        snapshot = fp.snapshot()
        fp.readline()  # Get past the empty line.
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE

            clean_line = line.rstrip('\r\n')
            eol = line[len(clean_line):]

            clean_line = clean_line.strip()
            # `startswith`/`endswith` rather than indexing: a line made only
            # of whitespace is not "empty" for `RE_EMPTY_LINE`, so it can
            # reach this point as an empty string.
            if not (clean_line.startswith('>') and
                    clean_line.endswith('<')):
                # The whole paragraph must have `>` and `<` wrappers, so
                # if we detect a line that doesn't have them, we make this
                # paragraph be a normal action instead.
                fp.restore(snapshot)
                self._aborted = True
                return _ActionState()

            # Remove wrapping `>`/`<`, and spaces.
            clean_line = clean_line[1:-1].strip()

            if RE_EMPTY_LINE.match(fp.peekline()):
                self.text += clean_line
                break
            self.text += clean_line + eol

        return ANY_STATE

    def exit(self, ctx, next_state):
        if not self._aborted:
            ctx.document.lastScene().addCenteredAction(self.text)


RE_CHARACTER_LINE = re.compile(r"^\s*[A-Z][A-Z\-\._\s]+\s*(\(.*\))?$", re.M)


class _CharacterState(FontaineState):
    """Character cue: an upper-case line after an empty line, followed by
    a non-empty line."""

    def match(self, fp, ctx):
        lines = fp.peeklines(3)
        return (RE_EMPTY_LINE.match(lines[0]) and
                RE_CHARACTER_LINE.match(lines[1]) and
                not RE_EMPTY_LINE.match(lines[2]))

    def consume(self, fp, ctx):
        fp.readline()  # Get past the empty line.
        line = fp.readline().rstrip('\r\n')
        line = line.lstrip()  # Remove indenting.
        line = line.lstrip('@')  # In case it was forced.
        ctx.document.lastScene().addCharacter(line)
        return [_ParentheticalState, _DialogState]


RE_PARENTHETICAL_LINE = re.compile(r"^\s*\(.*\)\s*$", re.M)


class _ParentheticalState(FontaineState):
    """A single `(parenthetical)` line inside a dialog block."""

    def match(self, fp, ctx):
        # We only get here from a `_CharacterState` so we know the previous
        # one is already that.
        line = fp.peekline()
        return RE_PARENTHETICAL_LINE.match(line)

    def consume(self, fp, ctx):
        line = fp.readline().lstrip().rstrip('\r\n')
        ctx.document.lastScene().addParenthetical(line)

        next_line = fp.peekline()
        if not RE_EMPTY_LINE.match(next_line):
            return _DialogState()

        return ANY_STATE


class _DialogState(FontaineState):
    """Dialog text, up to the next parenthetical or empty line."""

    def __init__(self):
        super().__init__()
        self.text = ''

    def match(self, fp, ctx):
        # We only get here from a `_CharacterState` or `_ParentheticalState`
        # so we just need to check there's some text.
        line = fp.peekline()
        return not RE_EMPTY_LINE.match(line)

    def consume(self, fp, ctx):
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE

            line = line.lstrip()  # Remove indenting.

            # Next we could be either continuing the dialog line, going to
            # a parenthetical, or exiting dialog altogether.
            next_line = fp.peekline()

            if RE_PARENTHETICAL_LINE.match(next_line):
                self.text += line.rstrip('\r\n')
                return _ParentheticalState()

            if RE_EMPTY_LINE.match(next_line):
                self.text += line.rstrip('\r\n')
                break
            self.text += line

        return ANY_STATE

    def exit(self, ctx, next_state):
        ctx.document.lastScene().addDialog(self.text.rstrip('\r\n'))


class _LyricsState(FontaineState):
    """Lyrics: a paragraph whose lines all start with `~`."""

    def __init__(self):
        super().__init__()
        self.text = ''
        self._aborted = False

    # No `match` method, this can only be forced.
    # (see `_ForcedParagraphStates`)

    def consume(self, fp, ctx):
        snapshot = fp.snapshot()
        fp.readline()  # Get past the empty line.
        while True:
            line = fp.readline()
            if not line:
                return EOF_STATE

            if line.startswith('~'):
                line = line.lstrip('~')
            else:
                logger.debug("Rolling back lyrics into action paragraph.")
                fp.restore(snapshot)
                self._aborted = True
                return _ActionState()

            if RE_EMPTY_LINE.match(fp.peekline()):
                self.text += line.rstrip('\r\n')
                break
            self.text += line

        return ANY_STATE

    def exit(self, ctx, next_state):
        if not self._aborted:
            ctx.document.lastScene().addLyrics(self.text)


RE_TRANSITION_LINE = re.compile(r"^\s*[^a-z]+TO\:$", re.M)


class _TransitionState(FontaineState):
    """Transition: a line ending in `TO:` surrounded by two empty lines."""

    def match(self, fp, ctx):
        lines = fp.peeklines(3)
        return (
            RE_EMPTY_LINE.match(lines[0]) and
            RE_TRANSITION_LINE.match(lines[1]) and
            RE_EMPTY_LINE.match(lines[2]))

    def consume(self, fp, ctx):
        fp.readline()  # Get past the empty line.
        line = fp.readline().lstrip().rstrip('\r\n')
        line = line.lstrip('>')  # In case it was forced.
        ctx.document.lastScene().addTransition(line)
        return ANY_STATE


RE_PAGE_BREAK_LINE = re.compile(r"^\=\=\=+$", re.M)


class _PageBreakState(FontaineState):
    """Page break: a line of three or more `=` between two empty lines."""

    def match(self, fp, ctx):
        lines = fp.peeklines(3)
        return (
            RE_EMPTY_LINE.match(lines[0]) and
            RE_PAGE_BREAK_LINE.match(lines[1]) and
            RE_EMPTY_LINE.match(lines[2]))

    def consume(self, fp, ctx):
        # Skip both the empty line and the page break line itself.
        fp.readline()
        fp.readline()
        ctx.document.lastScene().addPageBreak()
        return ANY_STATE


class _ForcedParagraphStates(FontaineState):
    """Dispatches to the state forced by a paragraph's leading symbol."""

    STATE_SYMBOLS = {
        '.': _SceneHeaderState,
        '!': _ActionState,
        '@': _CharacterState,
        '~': _LyricsState,
        '>': _TransitionState
    }

    def __init__(self):
        super().__init__()
        self._state_cls = None
        self._consume_empty_line = False

    def match(self, fp, ctx):
        lines = fp.peeklines(2)
        # Slicing (not indexing) so an empty string at end of file is safe.
        symbol = lines[1][:1]
        if (RE_EMPTY_LINE.match(lines[0]) and
                symbol in self.STATE_SYMBOLS):
            # Special case: don't force a transition state if it's
            # really some centered text.
            if symbol == '>' and RE_CENTERED_LINE.match(lines[1]):
                return False

            self._state_cls = self.STATE_SYMBOLS[symbol]

            # Special case: for forced action paragraphs, don't leave
            # the blank line there.
            if symbol == '!':
                self._consume_empty_line = True

            return True
        return False

    def consume(self, fp, ctx):
        if self._consume_empty_line:
            fp.readline()
        return self._state_cls()


class _EmptyLineState(FontaineState):
    """Consumes one empty line and records it as action text on exit."""

    def __init__(self):
        super().__init__()
        self.line_count = 0

    def match(self, fp, ctx):
        return RE_EMPTY_LINE.match(fp.peekline())

    def consume(self, fp, ctx):
        fp.readline()
        if fp.line_no > 1:  # Don't take into account the fake blank at 0
            self.line_count += 1
        return ANY_STATE

    def exit(self, ctx, next_state):
        if self.line_count > 0:
            text = self.line_count * '\n'
            last_para = ctx.document.lastParagraph()
            if last_para and last_para.type == TYPE_ACTION:
                last_para.text += text
            else:
                # A new action paragraph gets one newline less than counted.
                ctx.document.lastScene().addAction(text[1:])


# States tried, in order, when a state returns `ANY_STATE`.
ROOT_STATES = [
    _ForcedParagraphStates,  # Must be first.
    _SceneHeaderState,
    _CharacterState,
    _TransitionState,
    _PageBreakState,
    _CenteredActionState,
    _EmptyLineState,   # Must be second to last.
    _ActionState,  # Must be last.
]


class _PeekableFile:
    """Wraps a seekable text file with line peeking and line counting.

    It can also pretend there is an extra blank line before the real content
    (see `_addBlankAt0`).
    """

    def __init__(self, fp):
        self.line_no = 1
        self._fp = fp
        self._blankAt0 = False

    def readline(self):
        """Consume and return the next line ('' at end of file)."""
        if self._blankAt0:
            # Hand out the fake blank line first; it counts as line 0.
            self._blankAt0 = False
            self.line_no = 0
            return '\n'

        data = self._fp.readline()
        self.line_no += 1
        return data

    def peekline(self):
        """Return the next line without consuming it."""
        if self._blankAt0:
            return '\n'

        pos = self._fp.tell()
        line = self._fp.readline()
        self._fp.seek(pos)
        return line

    def peeklines(self, count):
        """Return the next `count` lines without consuming them.

        Lines past the end of the file come back as empty strings.
        """
        pos = self._fp.tell()
        lines = []
        if self._blankAt0:
            lines.append('\n')
            count -= 1
        for _ in range(count):
            lines.append(self._fp.readline())
        self._fp.seek(pos)
        return lines

    def snapshot(self):
        """Return an opaque token capturing the current read position."""
        return (self._fp.tell(), self._blankAt0, self.line_no)

    def restore(self, snapshot):
        """Rewind to a position previously returned by `snapshot`."""
        self._fp.seek(snapshot[0])
        self._blankAt0 = snapshot[1]
        self.line_no = snapshot[2]

    def _addBlankAt0(self):
        """Make the next read return a fake blank line numbered 0."""
        if self._fp.tell() != 0:
            raise Exception(
                "Can't add blank line at 0 if reading has started.")
        self._blankAt0 = True
        self.line_no = 0


class _FontaineStateMachine:
    """Drives the parser states over a file and fills in a document.

    An instance is passed to the states as their `ctx` argument; they reach
    the document being built through its `document` attribute.
    """

    def __init__(self, fp, doc):
        self.fp = _PeekableFile(fp)
        self.state = None
        self.document = doc

    @property
    def line_no(self):
        return self.fp.line_no

    def run(self):
        """Parse the whole file.

        Raises `FontaineParserError` if a state returns `None`, and a plain
        `Exception` if no following state matches or a state returns an
        unsupported value.
        """
        # Start with the page title... unless it doesn't match, in which
        # case we start with a "pass through" state that will just return
        # `ANY_STATE` so we can start matching stuff.
        self.state = _TitlePageState()
        if not self.state.match(self.fp, self):
            logger.debug("No title page value found on line 1, "
                         "using pass-through state with added blank line.")
            self.state = _PassThroughState()
            if not RE_EMPTY_LINE.match(self.fp.peekline()):
                # Add a fake empty line at the beginning of the text if
                # there's not one already. This makes state matching easier.
                self.fp._addBlankAt0()

        # Start parsing! Here we try to do a mostly-forward-only parser with
        # non overlapping regexes to make it decently fast.
        while True:
            logger.debug("State '%s' consuming from '%s'...",
                         self.state.__class__.__name__, self.fp.peekline())
            res = self.state.consume(self.fp, self)

            # See if we reached the end of the file.
            if not self.fp.peekline():
                logger.debug("Reached end of line... ending parsing.")
                res = EOF_STATE

            # Figure out what to do next...

            if res is None:
                raise FontaineParserError(
                    self.line_no,
                    "State '%s' returned a `None` result. "
                    "States need to return `ANY_STATE`, one or more specific "
                    "states, or `EOF_STATE` if they reached the end of the "
                    "file." % self.state.__class__.__name__)

            elif res is ANY_STATE or isinstance(res, list):
                # State wants to exit, we need to figure out what is the
                # next state.
                pos = self.fp._fp.tell()
                next_states = res
                if next_states is ANY_STATE:
                    next_states = ROOT_STATES
                logger.debug("Trying to match next state from: %s",
                             [t.__name__ for t in next_states])
                for sc in next_states:
                    s = sc()
                    if s.match(self.fp, self):
                        logger.debug("Matched state %s",
                                     s.__class__.__name__)
                        self.fp._fp.seek(pos)
                        res = s
                        break
                else:
                    raise Exception("Can't match following state after: %s" %
                                    self.state)

                # Handle the current state before we move on to the new one.
                if self.state:
                    self.state.exit(self, res)
                self.state = res

            elif isinstance(res, FontaineState):
                # State wants to exit, wants a specific state to be next.
                if self.state:
                    self.state.exit(self, res)
                self.state = res

            elif res is EOF_STATE:
                # Reached end of file.
                if self.state:
                    self.state.exit(self, res)
                break

            else:
                raise Exception("Unsupported state result: %s" % res)


class FontaineParser:
    """Public entry point: parses screenplay text into a document."""

    def __init__(self):
        pass

    def parse(self, filein):
        """Parse `filein` and return the resulting document.

        `filein` is either a path (`str`), which is opened for reading, or an
        already opened text file object.  The file object must support
        `readline`, `tell` and `seek`, since the parser peeks ahead.
        """
        if isinstance(filein, str):
            with open(filein, 'r') as fp:
                return self._doParse(fp)
        else:
            # `filein` is the file object here: no `fp` name exists in this
            # branch.
            return self._doParse(filein)

    def parseString(self, text):
        """Parse screenplay text held in a string and return the document."""
        import io
        with io.StringIO(text) as fp:
            return self._doParse(fp)

    def _doParse(self, fp):
        """Run the state machine over `fp` and return the new document."""
        from .document import FontaineDocument
        doc = FontaineDocument()
        machine = _FontaineStateMachine(fp, doc)
        machine.run()
        return doc