Mercurial > wikked
view wikked/scm.py @ 125:886f36b05e5f
More optimized `watch` mode for `grunt`.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 24 Nov 2013 14:04:05 -0800 |
parents | 6c261cb94631 |
children | 9079fb01abb8 |
line wrap: on
line source
import re import os import os.path import time import logging import tempfile import subprocess try: import pygit2 SUPPORTS_GIT = True except ImportError: SUPPORTS_GIT = False STATE_COMMITTED = 0 STATE_MODIFIED = 1 STATE_NEW = 2 STATE_NAMES = ['committed', 'modified', 'new'] ACTION_ADD = 0 ACTION_DELETE = 1 ACTION_EDIT = 2 ACTION_NAMES = ['add', 'delete', 'edit'] class SourceControl(object): def __init__(self, logger=None): self.logger = logger if logger is None: self.logger = logging.getLogger('wikked.scm') def initRepo(self): raise NotImplementedError() def getSpecialFilenames(self): raise NotImplementedError() def getHistory(self, path=None): raise NotImplementedError() def getState(self, path): raise NotImplementedError() def getRevision(self, path, rev): raise NotImplementedError() def diff(self, path, rev1, rev2): raise NotImplementedError() def commit(self, paths, op_meta): raise NotImplementedError() def revert(self, paths=None): raise NotImplementedError() class Revision(object): def __init__(self, rev_id=-1): self.rev_id = rev_id self.rev_name = rev_id self.author = None self.timestamp = 0 self.description = None self.files = [] @property def is_local(self): return self.rev_id == -1 @property def is_committed(self): return self.rev_id != -1 class MercurialBaseSourceControl(SourceControl): def __init__(self, root, logger=None): SourceControl.__init__(self, logger) self.root = root self.actions = { 'A': ACTION_ADD, 'R': ACTION_DELETE, 'M': ACTION_EDIT } def initRepo(self): # Make a Mercurial repo if there's none. if not os.path.isdir(os.path.join(self.root, '.hg')): self.logger.info("Creating Mercurial repository at: " + self.root) self._run('init', self.root, norepo=True) # Create a `.hgignore` file is there's none. ignore_path = os.path.join(self.root, '.hgignore') if not os.path.isfile(ignore_path): self.logger.info("Creating `.hgignore` file.") with open(ignore_path, 'w') as f: f.write('.wiki') self._run('add', ignore_path) self._run('commit', ignore_path, '-m', 'Created .hgignore.') def getSpecialFilenames(self): specials = ['.hg', '.hgignore', '.hgtags'] return [os.path.join(self.root, d) for d in specials] def _run(self, cmd, *args, **kwargs): exe = [self.hg] if 'norepo' not in kwargs or not kwargs['norepo']: exe += ['-R', self.root] exe.append(cmd) exe += args self.logger.debug("Running Mercurial: " + str(exe)) return subprocess.check_output(exe) class MercurialSourceControl(MercurialBaseSourceControl): def __init__(self, root, logger=None): MercurialBaseSourceControl.__init__(self, root, logger) self.hg = 'hg' self.log_style = os.path.join(os.path.dirname(__file__), 'resources', 'hg_log.style') def getHistory(self, path=None): if path is not None: st_out = self._run('status', path) if len(st_out) > 0 and st_out[0] == '?': return [] log_args = [] if path is not None: log_args.append(path) log_args += ['--style', self.log_style] log_out = self._run('log', *log_args) revisions = [] for group in log_out.split("$$$\n"): if group == '': continue revisions.append(self._parseRevision(group)) return revisions def getState(self, path): st_out = self._run('status', path) if len(st_out) > 0: if st_out[0] == '?' or st_out[0] == 'A': return STATE_NEW if st_out[0] == 'M': return STATE_MODIFIED return STATE_COMMITTED def getRevision(self, path, rev): cat_out = self._run('cat', '-r', rev, path) return cat_out def diff(self, path, rev1, rev2): if rev2 is None: diff_out = self._run('diff', '-c', rev1, '--git', path) else: diff_out = self._run('diff', '-r', rev1, '-r', rev2, '--git', path) return diff_out def commit(self, paths, op_meta): if 'message' not in op_meta or not op_meta['message']: raise ValueError("No commit message specified.") # Check if any of those paths needs to be added. st_out = self._run('status', *paths) add_paths = [] for line in st_out.splitlines(): if line[0] == '?': add_paths.append(line[2:]) if len(add_paths) > 0: self._run('add', *paths) # Create a temp file with the commit message. f, temp = tempfile.mkstemp() with os.fdopen(f, 'w') as fd: fd.write(op_meta['message']) # Commit and clean up the temp file. try: commit_args = list(paths) + ['-l', temp] if 'author' in op_meta: commit_args += ['-u', op_meta['author']] self._run('commit', *commit_args) finally: os.remove(temp) def revert(self, paths=None): if paths is not None: self._run('revert', '-C', paths) else: self._run('revert', '-a', '-C') def _parseRevision(self, group): lines = group.split("\n") m = re.match(r'(\d+) ([0-9a-f]+) \[([^\]]+)\] ([^ ]+)', lines[0]) if m is None: raise Exception('Error parsing history from Mercurial, got: ' + lines[0]) rev = Revision() rev.rev_id = int(m.group(1)) rev.rev_name = rev.rev_id[:12] rev.rev_hash = m.group(2) rev.author = m.group(3) rev.timestamp = float(m.group(4)) i = 1 rev.description = '' while lines[i] != '---': if i > 1: rev.description += "\n" rev.description += lines[i] i += 1 rev.files = [] for j in range(i + 1, len(lines)): if lines[j] == '': continue rev.files.append({ 'path': lines[j][2:], 'action': self.actions[lines[j][0]] }) return rev class MercurialCommandServerSourceControl(MercurialBaseSourceControl): def __init__(self, root, logger=None): MercurialBaseSourceControl.__init__(self, root, logger) import hglib self.client = hglib.open(self.root) def getHistory(self, path=None): if path is not None: rel_path = os.path.relpath(path, self.root) status = self.client.status(include=[rel_path]) if len(status) > 0 and status[0] == '?': return [] needs_files = False if path is not None: repo_revs = self.client.log(files=[path], follow=True) else: needs_files = True repo_revs = self.client.log(follow=True) revisions = [] for rev in repo_revs: r = Revision(rev.node) r.rev_name = rev.node[:12] r.author = unicode(rev.author) r.timestamp = time.mktime(rev.date.timetuple()) r.description = unicode(rev.desc) if needs_files: rev_statuses = self.client.status(change=rev.node) for rev_status in rev_statuses: r.files.append({ 'path': rev_status[1].decode('utf-8', 'replace'), 'action': self.actions[rev_status[0]] }) revisions.append(r) return revisions def getState(self, path): rel_path = os.path.relpath(path, self.root) statuses = self.client.status(include=[rel_path]) if len(statuses) == 0: return STATE_COMMITTED status = statuses[0] if status[0] == '?' or status[0] == 'A': return STATE_NEW if status[0] == 'M': return STATE_MODIFIED raise Exception("Unsupported status: %s" % status) def getRevision(self, path, rev): rel_path = os.path.relpath(path, self.root) return self.client.cat([rel_path], rev=rev) def diff(self, path, rev1, rev2): rel_path = os.path.relpath(path, self.root) if rev2 is None: return self.client.diff(files=[rel_path], change=rev1, git=True) return self.client.diff(files=[rel_path], revs=[rev1, rev2], git=True) def commit(self, paths, op_meta): if 'message' not in op_meta or not op_meta['message']: raise ValueError("No commit message specified.") # Get repo-relative paths. rel_paths = [os.path.relpath(p, self.root) for p in paths] # Check if any of those paths needs to be added. status = self.client.status(unknown=True) add_paths = [] for s in status: if s[1] in rel_paths: add_paths.append(s[1]) if len(add_paths) > 0: self.client.add(files=add_paths) # Commit! if 'author' in op_meta: self.client.commit(include=rel_paths, message=op_meta['message'], user=op_meta['author']) else: self.client.commit(include=rel_paths, message=op_meta['message']) def revert(self, paths=None): if paths is not None: rel_paths = [os.path.relpath(p, self.root) for p in paths] self.client.revert(files=rel_paths, nobackup=True) else: self.client.revert(all=True, nobackup=True) class GitBaseSourceControl(SourceControl): def __init__(self, root, logger=None): SourceControl.__init__(self, logger) self.root = root def initRepo(self): # Make a Git repo if there's none. if not os.path.isdir(os.path.join(self.root, '.git')): self.logger.info("Creating Git repository at: " + self.root) self._initRepo(self.root) # Create a `.gitignore` file there's none. ignore_path = os.path.join(self.root, '.gitignore') if not os.path.isfile(ignore_path): self.logger.info("Creating `.gitignore` file.") with open(ignore_path, 'w') as f: f.write('.wiki') self._add(ignore_path) self._commit('Created .gitignore.', [ignore_path]) def getSpecialFilenames(self): specials = ['.git', '.gitignore'] return [os.path.join(self.root, d) for d in specials] def getState(self, path): return self._status(path) def _run(self, cmd, *args, **kwargs): exe = [self.git] if 'norepo' not in kwargs or not kwargs['norepo']: exe.append('--git-dir="%s"' % self.root) exe.append(cmd) exe += args self.logger.debug("Running Git: " + str(exe)) return subprocess.check_output(exe) class GitLibSourceControl(GitBaseSourceControl): def __init__(self, root, logger=None): if not SUPPORTS_GIT: raise Exception("Can't support Git because pygit2 is not available.") GitBaseSourceControl.__init__(self, root, logger) def initRepo(self): GitBaseSourceControl.initRepo(self) self.repo = pygit2.Repository(self.root) def _initRepo(self, path): pygit2.init_repository(path, False) def _add(self, paths): pass def _commit(self, message, paths): pass def _status(self, path): flags = self.repo.status_file(self._getRepoPath(path)) if flags == pygit2.GIT_STATUS_CURRENT: return STATE_COMMITTED if (flags & pygit2.GIT_STATUS_WT_MODIFIED or flags & pygit2.GIT_STATUS_INDEX_MODIFIED): return STATE_MODIFIED if (flags & pygit2.GIT_STATUS_WT_NEW or flags & pygit2.GIT_STATUS_INDEX_NEW): return STATE_NEW raise Exception("Unsupported status flag combination: %s" % flags) def _getRepoPath(self, path): return os.path.relpath(path, self.root).replace('\\', '/')