import os.path
import logging
import datetime
import urllib.parse
from wikked.utils import (
        get_absolute_url, PageNotFoundError, split_page_url, is_endpoint_url)
from wikked.web import app


logger = logging.getLogger(__name__)


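# Access modes for the check_perms argument of get_page_or_raise.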
CHECK_FOR_READ = 1
CHECK_FOR_WRITE = 2


class CircularRedirectError(Exception):
    def __init__(self, url, visited):
        super().__init__(
                "Circular redirect detected at '%s' "
                "after visiting: %s" % (url, visited))


class RedirectNotFound(Exception):
    def __init__(self, url, not_found):
        super().__init__(
                "Redirect target '%s' not found while resolving '%s'." %
                (not_found, url))


class PermissionError(Exception):
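    """Raised when the user isn't allowed to read or write a page.

    Note that this shadows Python's built-in PermissionError within
    this module.
    """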
    pass


def url_from_viewarg(url):
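    """Turn a URL from a Flask view argument into a wiki URL by making
    it absolute, leaving endpoint URLs (e.g. 'category:/...')
    untouched.
    """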
    if is_endpoint_url(url):
        return url
    return '/' + url


def split_url_from_viewarg(url):
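    """Unquote a URL from a Flask view argument and split it into an
    (endpoint, path) tuple; pages outside any endpoint get a None
    endpoint and an absolute path.
    """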
    url = urllib.parse.unquote(url)
    endpoint, path = split_page_url(url)
    if endpoint:
        return (endpoint, path)
    return (None, '/' + path)


def get_page_or_raise(wiki, url, fields=None,
                      check_perms=None):
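    """Return the page at the given URL, raising PageNotFoundError if
    it doesn't exist.

    When WIKI_AUTO_RELOAD is set, the page is re-read from disk if its
    source file is newer than the cache. When WIKI_ASYNC_UPDATE is off,
    an unresolved page is resolved (and re-indexed) on the spot, since
    no background job will do it. If check_perms is a (user, mode)
    tuple, PermissionError is raised when the user lacks access.
    """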
    auto_reload = app.config.get('WIKI_AUTO_RELOAD', False)
    if auto_reload and fields is not None:
        if 'path' not in fields:
            fields.append('path')
        if 'cache_time' not in fields:
            fields.append('cache_time')

    async_update = app.config.get('WIKI_ASYNC_UPDATE', False)
    if not async_update and fields is not None:
        if 'is_resolved' not in fields:
            fields.append('is_resolved')

    page = wiki.getPage(url, fields=fields)

    if auto_reload:
        path_time = datetime.datetime.fromtimestamp(
                os.path.getmtime(page.path))
        if path_time >= page.cache_time:
            logger.info("Page '%s' has changed, reloading." % url)
            wiki.updatePage(path=page.path)
            page = wiki.getPage(url, fields=fields)

    if not async_update:
        if not page.is_resolved:
            logger.info("Page '%s' was not resolved, resolving now." % url)
            wiki.resolve(only_urls=[url])
            wiki.index.updatePage(wiki.db.getPage(
                url, fields=['url', 'path', 'title', 'text']))
            page = wiki.getPage(url, fields=fields)

    if check_perms is not None:
        user, mode = check_perms
        if mode == CHECK_FOR_READ and not is_page_readable(page, user):
            raise PermissionError()
        elif mode == CHECK_FOR_WRITE and not is_page_writable(page, user):
            raise PermissionError()

    return page


def get_page_or_none(wiki, url, **kwargs):
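    """Like get_page_or_raise, but return None for missing pages."""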
    try:
        return get_page_or_raise(wiki, url, **kwargs)
    except PageNotFoundError:
        return None


def is_page_readable(page, user):
    return page.wiki.auth.isPageReadable(page, user)


def is_page_writable(page, user):
    return page.wiki.auth.isPageWritable(page, user)


def get_page_meta(page, local_only=False):
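    """Return a page's metadata as a dictionary, with the title and
    quoted URL added, and known fields run through COERCE_META.
    """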
    if local_only:
        meta = dict(page.getLocalMeta() or {})
    else:
        meta = dict(page.getMeta() or {})
    meta['title'] = page.title
    meta['url'] = urllib.parse.quote(page.url.encode('utf-8'))
    for name in COERCE_META:
        if name in meta:
            meta[name] = COERCE_META[name](meta[name])
    return meta


def get_category_meta(category):
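    """Turn a list of category names into dictionaries with a quoted
    'category:' URL and a display name.
    """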
    result = []
    for item in category:
        result.append({
            'url': 'category:/' + urllib.parse.quote(item.encode('utf-8')),
            'name': item
            })
    return result


def get_redirect_target(wiki, path, fields=None,
                        check_perms=None, first_only=False):
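    """Follow 'redirect' metadata from path until a page with no
    redirect is found, returning that page along with the list of URLs
    visited along the way.

    Raises RedirectNotFound if a page in the chain is missing, and
    CircularRedirectError if the chain loops. With first_only, the
    first redirecting page is returned and its target is only appended
    to the visited list.
    """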
    page = None
    orig_path = path
    visited_paths = []

    while True:
        page = get_page_or_none(
                wiki, path,
                fields=fields,
                check_perms=check_perms)
        if page is None:
            raise RedirectNotFound(orig_path, path)

        visited_paths.append(path)
        redirect_meta = page.getMeta('redirect')
        if redirect_meta is None:
            break

        path = get_absolute_url(path, redirect_meta)
        if first_only:
            visited_paths.append(path)
            break

        if path in visited_paths:
            raise CircularRedirectError(path, visited_paths)

    return page, visited_paths


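# Converters that get_page_meta applies to known metadata fields.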
COERCE_META = {
    'category': get_category_meta
    }


def get_or_build_pagelist(wiki, list_name, builder, fields=None,
                          build_inline=True):
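    """Return the cached page list with the given name, rebuilding it
    via builder() when it is missing or invalidated, if build_inline
    is true.
    """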
    # If the wiki is using background jobs, we can accept invalidated
    # lists... it just means that a background job is hopefully still
    # catching up.
    # Otherwise, everything is synchronous and we have to build the
    # list ourselves if it's missing.
    page_list = wiki.db.getPageListOrNone(list_name, fields=fields,
                                          valid_only=build_inline)
    if page_list is None and build_inline:
        logger.info("Regenerating list: %s" % list_name)
        page_list = builder()
        wiki.db.addPageList(list_name, page_list)

    return page_list


def get_generic_pagelist_builder(wiki, filter_func, fields=None):
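    """Return a builder suitable for get_or_build_pagelist that
    resolves the wiki and keeps the pages for which filter_func
    returns True.
    """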
    fields = fields or ['url', 'title', 'meta']

    def builder():
        # Make sure all pages have been resolved.
        wiki.resolve()

        pages = []
        for page in wiki.getPages(
                no_endpoint_only=True,
                fields=fields):
            try:
                if filter_func(page):
                    pages.append(page)
            except Exception as e:
                logger.error("Error while inspecting page: %s" % page.url)
                logger.error("   %s" % e)
        return pages

    return builder