view wikked/wiki.py @ 49:fb6ae96756c1

Added unit tests. Refactored core APIs to make them more testable. Removed unused stuff like caching the configuration in the SQL database. Fixed the web bootstrap. Some cosmetic changes to be PEP8 compliant.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 28 Jan 2013 23:13:04 -0800
parents 86ee1b696070
children 350f7f084028
line wrap: on
line source

import os
import os.path
import time
import logging
import itertools
from ConfigParser import SafeConfigParser
import markdown
import textile
import creole
from page import Page, DatabasePage
from fs import FileSystem
from db import SQLiteDatabase, conn_scope
from scm import MercurialSourceControl
from indexer import WhooshWikiIndex
from auth import UserManager


def passthrough_formatter(text):
    """ Formatter that performs no conversion at all: the text it is
        given is handed back exactly as received.
    """
    return text


class InitializationError(Exception):
    """ Error raised when something goes wrong while the wiki is
        being initialized.
    """


class WikiParameters(object):
    """ Describes how a wiki should be set up: where its files live and
        how each of its sub-systems gets created.
    """
    def __init__(self, root=None):
        """ Builds the parameters for a wiki rooted at `root`. When no
            root is given, the current working directory is used.
        """
        self.root = os.getcwd() if root is None else root

        # Maps each formatter callable to the file extensions it handles.
        self.formatters = {
            markdown.markdown: ['md', 'mdown', 'markdown'],
            textile.textile: ['tl', 'text', 'textile'],
            creole.creole2html: ['cr', 'creole'],
            passthrough_formatter: ['txt', 'html']
        }

        # The configuration file sits at the root; the search index and
        # the database both live under the `.wiki` directory.
        wiki_dir = os.path.join(self.root, '.wiki')
        self.config_path = os.path.join(self.root, '.wikirc')
        self.index_path = os.path.join(wiki_dir, 'index')
        self.db_path = os.path.join(wiki_dir, 'wiki.db')

        self.page_factory = DatabasePage.factory

    def logger_factory(self):
        """ Returns the `logger` attribute if one was set on this object
            (and is truthy), otherwise the 'wikked.wiki' logger.
        """
        custom_logger = getattr(self, 'logger', None)
        return custom_logger or logging.getLogger('wikked.wiki')

    def config_factory(self):
        """ Opens the configuration file and returns the file object.
            Closing it is up to the caller.
        """
        return open(self.config_path)

    def fs_factory(self, config):
        """ Creates the file-system layer for the wiki root.
        """
        log = self.logger_factory()
        return FileSystem(self.root, slugify=Page.title_to_url, logger=log)

    def index_factory(self, config):
        """ Creates the search index stored at `index_path`.
        """
        log = self.logger_factory()
        return WhooshWikiIndex(self.index_path, logger=log)

    def db_factory(self, config):
        """ Creates the database stored at `db_path`.
        """
        log = self.logger_factory()
        return SQLiteDatabase(self.db_path, logger=log)

    def scm_factory(self, config):
        """ Creates the source-control layer named by the `scm` option of
            the `wiki` config section. Only 'hg' is supported; any other
            value raises `InitializationError`.
        """
        scm_type = config.get('wiki', 'scm')
        if scm_type != 'hg':
            raise InitializationError("No such source control: " + scm_type)
        return MercurialSourceControl(self.root, logger=self.logger_factory())

    def getSpecialFilenames(self):
        """ Yields the paths that belong to the wiki itself: the config
            file and the `.wiki` directory.
        """
        for special in (self.config_path, os.path.join(self.root, '.wiki')):
            yield special


class Wiki(object):
    """ The wiki class! This is where the magic happens.
    """
    def __init__(self, parameters):
        """ Creates a new wiki instance. It won't be fully functional
            until you call `start`, which does the actual initialization.
            This gives you a chance to customize a few more things before
            getting started.

            `parameters` supplies the logger, the configuration, the
            formatters and the factories for every sub-system. Raises
            `ValueError` if it is `None`.
        """
        if parameters is None:
            raise ValueError("No parameters were given to the wiki.")

        self.logger = parameters.logger_factory()
        self.logger.debug("Initializing wiki.")

        self.config = self._loadConfig(parameters)

        self.formatters = parameters.formatters
        # Use the page factory configured on the parameters, so callers can
        # swap in their own page type. Fall back to `DatabasePage` when the
        # parameters object doesn't provide one.
        page_factory = getattr(parameters, 'page_factory', None)
        if page_factory is None:
            page_factory = DatabasePage.factory
        self.page_factory = page_factory

        self.fs = parameters.fs_factory(self.config)
        self.index = parameters.index_factory(self.config)
        self.db = parameters.db_factory(self.config)
        self.scm = parameters.scm_factory(self.config)

        self.auth = UserManager(self.config, logger=self.logger)

        # Every extension known to a formatter is a valid page extension.
        self.fs.page_extensions = list(set(
            itertools.chain(*self.formatters.itervalues())))
        # Keep the wiki's own files and the source-control files out of
        # the page listing.
        self.fs.excluded += parameters.getSpecialFilenames()
        self.fs.excluded += self.scm.getSpecialFilenames()

    def start(self, update=True):
        """ Properly initializes the wiki and all its sub-systems.
            If `update` is true, the DB is then refreshed from the pages
            found on the file-system, and the index from the pages in
            the DB.
        """
        self.scm.initRepo()
        self.index.initIndex()
        self.db.initDb()

        if update:
            with conn_scope(self.db):
                # The DB is what's being rebuilt here, so pages must be
                # listed and created without going through it.
                self.db.update(self.getPages(from_db=False, factory=Page.factory))
                self.index.update(self.getPages())

    def stop(self):
        """ Closes the database.
        """
        self.db.close()

    def getPageUrls(self, subdir=None, from_db=True):
        """ Returns all the page URLs in the wiki, or in the given
            sub-directory.
            By default, it queries the DB, but it can query the file-system
            directly if `from_db` is `False`.
        """
        if from_db:
            for url in self.db.getPageUrls(subdir):
                yield url
        else:
            for info in self.fs.getPageInfos(subdir):
                yield info['url']

    def getPages(self, subdir=None, from_db=True, factory=None):
        """ Gets all the pages in the wiki, or in the given sub-directory.
            By default it will use the DB to fetch the list of pages, but it
            can scan the file-system directly if `from_db` is `False`. If
            that's the case, it's probably a good idea to provide a custom
            `factory` for creating `Page` instances, since by default it will
            use the wiki's page factory (`DatabasePage` unless the parameters
            said otherwise), which uses the DB to load its information.
        """
        if factory is None:
            factory = self.page_factory
        for url in self.getPageUrls(subdir, from_db):
            yield factory(self, url)

    def getPage(self, url, factory=None):
        """ Gets the page for a given URL, created with `factory` or, if
            none is given, with the wiki's page factory.
        """
        if factory is None:
            factory = self.page_factory
        return factory(self, url)

    def setPage(self, url, page_fields):
        """ Updates or creates a page for a given URL.
            `page_fields` must contain an `author` and a `message`,
            otherwise a `ValueError` is raised. If it also contains a
            `text`, that text is written to the page's file and committed
            to source-control. The DB and the index are updated for the
            page in every case.
        """
        # Validate the parameters.
        if 'author' not in page_fields:
            raise ValueError(
                "No author specified for editing page '%s'." % url)
        if 'message' not in page_fields:
            raise ValueError(
                "No commit message specified for editing page '%s'." % url)

        # Save the new/modified text.
        do_commit = False
        path = self.fs.getPhysicalPagePath(url)
        if 'text' in page_fields:
            self.fs.setPage(path, page_fields['text'])
            do_commit = True

        # Commit the file to the source-control.
        if do_commit:
            commit_meta = {
                    'author': page_fields['author'],
                    'message': page_fields['message']
                    }
            self.scm.commit([path], commit_meta)

        # Update the DB and index with the new/modified page.
        self.db.update([self.getPage(url)])
        self.index.update([self.getPage(url)])

    def pageExists(self, url, from_db=True):
        """ Returns whether a page exists at the given URL.
            By default it will query the DB, but it can query the underlying
            file-system directly if `from_db` is `False`.
        """
        if from_db:
            return self.db.pageExists(url)
        return self.fs.pageExists(url)

    def getHistory(self):
        """ Shorthand method to get the history from the source-control.
        """
        return self.scm.getHistory()

    def _loadConfig(self, parameters):
        """ Builds the wiki configuration: the default settings found in
            `resources/defaults.cfg` next to this module are read first,
            then the settings provided by `parameters.config_factory()`
            are read on top of them.
        """
        # Merge the default settings with any settings provided by
        # the parameters.
        default_config_path = os.path.join(
            os.path.dirname(__file__), 'resources', 'defaults.cfg')
        config = SafeConfigParser()
        with open(default_config_path) as default_fp:
            config.readfp(default_fp)

        # The object returned by the factory may be any file-like object,
        # not necessarily a context manager, so close it explicitly. Do it
        # in a `finally` so it is released even when parsing fails.
        fp = parameters.config_factory()
        try:
            config.readfp(fp)
        finally:
            fp.close()

        return config


def reloader_stat_loop(wiki, interval=1):
    """ Polls the files of the wiki's pages forever, printing a message
        each time one of them has a more recent modification time than
        the last one recorded for it. This function never returns.

        `wiki` must expose `fs.getPageInfos()`, whose items carry the
        file path under the `path` key. `interval` is the number of
        seconds to sleep between two scans.
    """
    mtimes = {}
    while True:
        for page_info in wiki.fs.getPageInfos():
            path = page_info['path']
            try:
                mtime = os.stat(path).st_mtime
            except OSError:
                # The file can't be stat'ed right now; ignore it for
                # this scan.
                continue

            old_time = mtimes.get(path)
            if old_time is None:
                # First time this file is seen: just record its time.
                mtimes[path] = mtime
            elif mtime > old_time:
                # Single-argument parenthesised form: prints the same
                # text whether `print` is a statement or a function.
                print("Change detected in '%s'." % path)
                # Remember the new time, otherwise the same change would
                # be reported again on every following scan.
                mtimes[path] = mtime
        time.sleep(interval)