Mercurial > wikked
view wikked/scm.py @ 19:884eb6c8edf0
Added "wiki history" special page:
- Added ability to get the whole repo history from the `scm`.
- Added ability to get pages changed in a revision.
- Added "wiki history" page (not complete).
- Better formatting for revision dates.
Fixed bugs with revision and page diff views:
- Using IDs instead of hashes if possible
- Using query parameters instead of path components.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Thu, 03 Jan 2013 08:00:24 -0800 |
parents | 793234411100 |
children | 60691eefbf67 |
line wrap: on
line source
import re import os import os.path import logging import tempfile import subprocess STATE_COMMITTED = 0 STATE_MODIFIED = 1 STATE_NEW = 2 STATE_NAMES = ['committed', 'modified', 'new'] ACTION_ADD = 0 ACTION_DELETE = 1 ACTION_EDIT = 2 ACTION_NAMES = ['add', 'delete', 'edit'] class SourceControl(object): def __init__(self, root, logger=None): self.root = root self.logger = logger if logger is None: self.logger = logging.getLogger('wikked.scm') def getSpecialDirs(self): raise NotImplementedError() def getHistory(self, path=None): raise NotImplementedError() def getState(self, path): raise NotImplementedError() def getRevision(self, path, rev): raise NotImplementedError() def diff(self, path, rev1, rev2): raise NotImplementedError() def commit(self, paths, op_meta): raise NotImplementedError() def revert(self, paths=None): raise NotImplementedError() class Revision(object): def __init__(self, rev_id=-1): self.rev_id = rev_id self.author = None self.timestamp = 0 self.description = None self.files = [] @property def is_local(self): return self.rev_id == -1 @property def is_committed(self): return self.rev_id != -1 class MercurialSourceControl(SourceControl): def __init__(self, root, logger=None): SourceControl.__init__(self, root, logger) self.hg = 'hg' if not os.path.isdir(os.path.join(root, '.hg')): self._run('init', root, norepo=True) ignore_path = os.path.join(root, '.hgignore') if not os.path.isfile(ignore_path): with open(ignore_path, 'w') as f: f.write('.cache') self._run('add', ignore_path) self._run('commit', ignore_path, '-m', 'Created .hgignore.') self.log_style = os.path.join(os.path.dirname(__file__), 'resources', 'hg_log.style') self.actions = { 'A': ACTION_ADD, 'R': ACTION_DELETE, 'M': ACTION_EDIT } def getSpecialDirs(self): specials = [ '.hg', '.hgignore', '.hgtags' ] return [ os.path.join(self.root, d) for d in specials ] def getHistory(self, path=None): if path is not None: st_out = self._run('status', path) if len(st_out) > 0 and st_out[0] == '?': return [ Revision() ] log_args = [] if path is not None: log_args.append(path) log_args += ['--style', self.log_style] log_out = self._run('log', *log_args) revisions = [] for group in log_out.split("$$$\n"): if group == '': continue revisions.append(self._parseRevision(group)) return revisions def getState(self, path): st_out = self._run('status', path) if len(st_out) > 0: if st_out[0] == '?' or st_out[0] == 'A': return STATE_NEW if st_out[0] == 'M': return STATE_MODIFIED return STATE_COMMITTED def getRevision(self, path, rev): cat_out = self._run('cat', '-r', rev, path) return cat_out def diff(self, path, rev1, rev2): if rev2 is None: diff_out = self._run('diff', '-c', rev1, '--git', path); else: diff_out = self._run('diff', '-r', rev1, '-r', rev2, '--git', path) return diff_out def commit(self, paths, op_meta): if 'message' not in op_meta or not op_meta['message']: raise ValueError("No commit message specified.") # Check if any of those paths needs to be added. st_out = self._run('status', *paths) add_paths = [] for line in st_out.splitlines(): if line[0] == '?': add_paths.append(line[2:]) if len(add_paths) > 0: self._run('add', *paths) # Create a temp file with the commit message. f, temp = tempfile.mkstemp() with os.fdopen(f, 'w') as fd: fd.write(op_meta['message']) # Commit and clean up the temp file. try: commit_args = list(paths) + [ '-l', temp ] if 'author' in op_meta: commit_args += [ '-u', op_meta['author'] ] self._run('commit', *commit_args) finally: os.remove(temp) def revert(self, paths=None): if paths is not None: self._run('revert', '-C', paths) else: self._run('revert', '-a', '-C') def _parseRevision(self, group): lines = group.split("\n") m = re.match(r'(\d+) ([0-9a-f]+) \[([^\]]+)\] ([^ ]+)', lines[0]) if m is None: raise Exception('Error parsing history from Mercurial, got: ' + lines[0]) rev = Revision() rev.rev_id = int(m.group(1)) rev.rev_hash = m.group(2) rev.author = m.group(3) rev.timestamp = float(m.group(4)) i = 1 rev.description = '' while lines[i] != '---': if i > 1: rev.description += "\n" rev.description += lines[i] i += 1 rev.files = [] for j in range(i + 1, len(lines)): if lines[j] == '': continue rev.files.append({ 'path': lines[j][2:], 'action': self.actions[lines[j][0]] }) return rev def _run(self, cmd, *args, **kwargs): exe = [ self.hg ] if 'norepo' not in kwargs or not kwargs['norepo']: exe += [ '-R', self.root ] exe.append(cmd) exe += args self.logger.debug("Running Mercurial: " + str(exe)) return subprocess.check_output(exe)