changeset 239:0e87dc411fe9

Better way to manage updating the wiki.

* Distinguish between updating just a single page and updating the whole wiki.
* Properly resolve a just-updated page, but postpone resolving all the rest if we're using background updates.
* Don't flatten single metas for the web UI.
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 25 Mar 2014 22:09:14 -0700
parents c22cdd051afc
children 68d6524a8333
files wikked/commands/manage.py wikked/commands/web.py wikked/db/base.py wikked/db/sql.py wikked/indexer/base.py wikked/indexer/elastic.py wikked/indexer/whooshidx.py wikked/tasks.py wikked/views/__init__.py wikked/wiki.py
diffstat 10 files changed, 160 insertions(+), 81 deletions(-)
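
This changeset splits the old all-purpose Wiki.update() into two entry points: updatePage() for a single page and updateAll() for the whole wiki. A minimal caller-side sketch of the new API, assuming the usual WikiParameters-based setup (the setup itself, the page URL and the file path are illustrative, not part of this diff):

    # Hypothetical usage sketch; only updatePage()/updateAll() come from this changeset.
    from wikked.wiki import Wiki, WikiParameters

    wiki = Wiki(WikiParameters('/path/to/wiki'))
    wiki.start()

    # Re-read, resolve and re-index a single page that changed on disk.
    wiki.updatePage(url='/sandbox')
    # ...or address it by file path instead.
    wiki.updatePage(path='/path/to/wiki/sandbox.md')

    # Scan the file-system and refresh every page whose cache is out of date.
    wiki.updateAll()
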
--- a/wikked/commands/manage.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/commands/manage.py	Tue Mar 25 22:09:14 2014 -0700
@@ -90,7 +90,7 @@
                 nargs='?')
 
     def run(self, ctx):
-        ctx.wiki.update(path=ctx.args.path)
+        ctx.wiki.updatePage(path=ctx.args.path)
 
         if ctx.args.debug and ctx.args.path:
             page_info = ctx.wiki.fs.getPageInfo(ctx.args.path)
--- a/wikked/commands/web.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/commands/web.py	Tue Mar 25 22:09:14 2014 -0700
@@ -42,7 +42,7 @@
 
         app.wiki_params = ctx.params
         if bool(app.config.get('UPDATE_WIKI_ON_START')):
-            ctx.wiki.update()
+            ctx.wiki.updateAll()
 
         # Run!
         debug_mode = ctx.args.dev or app.config.get('DEBUG', False)
--- a/wikked/db/base.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/db/base.py	Tue Mar 25 22:09:14 2014 -0700
@@ -22,7 +22,10 @@
     def reset(self, page_infos, page_factory):
         pass
 
-    def update(self, page_infos, page_factory, force=False):
+    def updatePage(self, page_info):
+        pass
+
+    def updateAll(self, page_infos, force=False):
         pass
 
     def getPageUrls(self, subdir=None, uncached_only=False):
@@ -47,12 +50,12 @@
             raise PageNotFoundError(url or path)
         return page
 
+    def isPageValid(self, url):
+        return True
+
     def cachePage(self, page):
         pass
 
-    def isCacheValid(self, page):
-        raise NotImplementedError()
-
     def pageExists(self, url=None, path=None):
         raise NotImplementedError()
 
--- a/wikked/db/sql.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/db/sql.py	Tue Mar 25 22:09:14 2014 -0700
@@ -6,7 +6,7 @@
 import datetime
 from sqlalchemy import (
     create_engine,
-    and_, or_,
+    and_,
     Column, Boolean, Integer, DateTime, ForeignKey,
     String, Text, UnicodeText)
 from sqlalchemy.ext.declarative import declarative_base
@@ -15,7 +15,7 @@
     relationship, backref, load_only, subqueryload)
 from sqlalchemy.orm.exc import NoResultFound
 from wikked.db.base import Database
-from wikked.page import Page, PageData
+from wikked.page import Page, PageData, FileSystemPage
 
 
 logger = logging.getLogger(__name__)
@@ -216,7 +216,28 @@
             self._addPage(page)
         self.session.commit()
 
-    def update(self, page_infos, page_factory, force=False):
+    def updatePage(self, page_info):
+        if self._needsSchemaUpdate():
+            raise Exception("This wiki needs a database update. "
+                            "Please run `wk reset`.")
+
+        logger.debug("Updating SQL database for page: %s" % page_info.url)
+
+        db_page = self.session.query(SQLPage).\
+                options(load_only('id', 'url')).\
+                filter(SQLPage.url == page_info.url).\
+                first()
+        if db_page:
+            logger.debug("Removing page '%s' [%d] from SQL database." %
+                    (db_page.url, db_page.id))
+            self.session.delete(db_page)
+            self.session.commit()
+
+        page = FileSystemPage(self.wiki, page_info)
+        self._addPage(page)
+        self.session.commit()
+
+    def updateAll(self, page_infos, force=False):
         if self._needsSchemaUpdate():
             raise Exception("This wiki needs a database upgrade. "
                             "Please run `wk reset`.")
@@ -254,26 +275,11 @@
         for pi in page_infos:
             if (pi.path in to_update or
                     pi.path not in already_added):
-                page = page_factory(pi)
+                page = FileSystemPage(self.wiki, pi)
                 added_db_objs.append(self._addPage(page))
 
         self.session.commit()
 
-        if to_remove or added_db_objs:
-            # If pages have been added/removed/updated, invalidate everything
-            # in the wiki that has includes or queries.
-            db_pages = self.session.query(SQLPage.id, SQLPage.is_ready,
-                                          SQLPage.ready_meta).\
-                join(SQLReadyMeta).\
-                filter(or_(
-                    SQLReadyMeta.name == 'include',
-                    SQLReadyMeta.name == 'query')).\
-                all()
-            for p in db_pages:
-                p.is_ready = False
-
-            self.session.commit()
-
         logger.debug("...done updating SQL database.")
         return [o.id for o in added_db_objs]
 
@@ -305,15 +311,30 @@
         for p in q.all():
             yield SQLDatabasePage(self, p, fields)
 
-    def isCacheValid(self, page):
+    def isPageValid(self, url):
         db_obj = self.session.query(SQLPage).\
-            options(load_only('id', 'path', 'time')).\
-            filter(SQLPage.id == page._id).\
-            one()
+            options(load_only('id', 'url', 'path', 'time')).\
+            filter(SQLPage.url == url).\
+            first()
+        if not db_obj:
+            return False
         path_time = datetime.datetime.fromtimestamp(
             os.path.getmtime(db_obj.path))
         return path_time < db_obj.time
 
+    def invalidateCache(self, ids):
+        if not isinstance(ids, list):
+            ids = list(ids)
+        logger.debug("Invalidating %d page caches in SQL database." % len(ids))
+
+        db_pages = self.session.query(SQLPage).\
+            options(load_only('id', 'url', 'is_ready')).\
+            filter(SQLPage.id.in_(ids)).\
+            all()
+        for p in db_pages:
+            p.is_ready = False
+        self.session.commit()
+
     def cachePage(self, page):
         if not hasattr(page, '_id') or not page._id:
             raise Exception("Given page '%s' has no `_id` attribute set." % page.url)
@@ -392,7 +413,8 @@
                 'local_meta': 'meta',
                 'local_links': 'links',
                 'meta': 'ready_meta',
-                'links': 'ready_links'}
+                'links': 'ready_links',
+                'text': 'ready_text'}
         subqueryfields = {
                 'local_meta': SQLPage.meta,
                 'local_links': SQLPage.links,
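
The new isPageValid() and invalidateCache() methods let callers detect a stale cached page and flag dependent pages for re-resolution. A rough sketch of how the rest of this changeset uses them, assuming wiki is a started Wiki instance (the URL is illustrative):

    # Hypothetical sketch mirroring the auto-reload and dependency-invalidation paths.
    url = '/sandbox'
    if not wiki.db.isPageValid(url):
        # The file on disk is newer than the cached copy: re-read and re-resolve it.
        wiki.updatePage(url=url)

    # Pages that use includes or queries may now render differently, so mark
    # them as not ready; they get re-resolved by the next update pass.
    stale_ids = [p._id for p in wiki.db.getPages(fields=['local_meta'])
                 if p.getLocalMeta('include') or p.getLocalMeta('query')]
    wiki.db.invalidateCache(stale_ids)
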
--- a/wikked/indexer/base.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/indexer/base.py	Tue Mar 25 22:09:14 2014 -0700
@@ -23,7 +23,10 @@
     def reset(self, pages):
         pass
 
-    def update(self, pages):
+    def updatePage(self, page):
+        pass
+
+    def updateAll(self, pages):
         pass
 
     def search(self, query):
--- a/wikked/indexer/elastic.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/indexer/elastic.py	Tue Mar 25 22:09:14 2014 -0700
@@ -95,7 +95,17 @@
         actions = action_maker()
         bulk_index(self.es, actions)
 
-    def update(self, pages):
+    def updatePage(self, page):
+        body = {
+                'fields': ['url'],
+                'query': {'term': {'url': page.url}}}
+        res = self.es.search(index='pages', doc_type='page', body=body)
+        docs = res['hits']['hits']
+        if len(docs) > 0:
+            self.es.delete(index='pages', doc_type='page', id=docs[0]['_id'])
+        self.es.index(index='pages', doc_type='page', body=self._get_body(page))
+
+    def updateAll(self, pages):
         to_reindex = set()
         already_indexed = set()
 
--- a/wikked/indexer/whooshidx.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/indexer/whooshidx.py	Tue Mar 25 22:09:14 2014 -0700
@@ -35,7 +35,14 @@
             self._indexPage(writer, page)
         writer.commit()
 
-    def update(self, pages):
+    def updatePage(self, page):
+        logger.info("Updating index for page: %s" % page.url)
+        writer = self.ix.writer()
+        self._unindexPage(writer, page.url)
+        self._indexPage(writer, page)
+        writer.commit()
+
+    def updateAll(self, pages):
         logger.info("Updating index...")
         to_reindex = set()
         already_indexed = set()
--- a/wikked/tasks.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/tasks.py	Tue Mar 25 22:09:14 2014 -0700
@@ -34,5 +34,5 @@
 @app.task
 def update_wiki(wiki_root):
     with wiki_session(wiki_root) as wiki:
-        wiki.update()
+        wiki._simplePostSetPageUpdate()
 
--- a/wikked/views/__init__.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/views/__init__.py	Tue Mar 25 22:09:14 2014 -0700
@@ -3,7 +3,7 @@
 from flask import g, abort, jsonify
 from flask.ext.login import current_user
 from wikked.fs import PageNotFoundError
-from wikked.utils import split_page_url, flatten_single_metas
+from wikked.utils import split_page_url
 from wikked.web import app
 
 
@@ -34,21 +34,21 @@
 def get_page_or_none(url, convert_url=True, check_perms=DONT_CHECK, force_resolve=False):
     if convert_url:
         url = url_from_viewarg(url)
+
     try:
+        if app.config.get('WIKI_AUTO_RELOAD'):
+            if not g.wiki.db.isPageValid(url):
+                app.logger.info("Page '%s' has changed, reloading." % url)
+                g.wiki.updatePage(url=url)
+            else:
+                app.logger.debug("Page '%s' is up to date." % url)
+        elif force_resolve:
+            g.wiki.resolve(only_urls=[url], force=True)
+
         page = g.wiki.getPage(url)
     except PageNotFoundError:
         return None
 
-    if app.config.get('WIKI_AUTO_RELOAD'):
-        if not g.wiki.db.isCacheValid(page):
-            app.logger.info("Page '%s' has changed, reloading." % url)
-            g.wiki.update(url)
-        else:
-            app.logger.debug("Page '%s' is up to date." % url)
-    elif force_resolve:
-        g.wiki._cachePages([url], force_resolve=True)
-        page = g.wiki.getPage(url)
-
     if check_perms == CHECK_FOR_READ and not is_page_readable(page):
         abort(401)
     elif check_perms == CHECK_FOR_WRITE and not is_page_writable(page):
@@ -78,7 +78,6 @@
         meta = dict(page.getLocalMeta())
     else:
         meta = dict(page.getMeta())
-    flatten_single_metas(meta)
     meta['title'] = page.title
     meta['url'] = urllib.quote(page.url.encode('utf-8'))
     for name in COERCE_META:
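
The request-time reload branch added to get_page_or_none() above only runs when the Flask setting WIKI_AUTO_RELOAD is truthy; otherwise an explicit force_resolve falls back to wiki.resolve(only_urls=[url], force=True). A minimal sketch of enabling auto-reload in a development config (the settings file itself is an assumption, not shown in this diff):

    # Hypothetical Flask settings; the flag is read via app.config.get('WIKI_AUTO_RELOAD').
    WIKI_AUTO_RELOAD = True
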
--- a/wikked/wiki.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/wiki.py	Tue Mar 25 22:09:14 2014 -0700
@@ -126,18 +126,6 @@
             for name, val in self.config.items('ignore'):
                 yield val
 
-    def getPageUpdater(self):
-        if self._page_updater is None:
-            if self.config.getboolean('wiki', 'async_updates'):
-                logger.debug("Setting up asynchronous updater.")
-                from tasks import update_wiki
-                self._page_updater = lambda wiki, url: update_wiki.delay(
-                    self.root)
-            else:
-                logger.debug("Setting up simple updater.")
-                self._page_updater = lambda wiki, url: wiki.update(url)
-        return self._page_updater
-
     def tryAddFormatter(self, formatters, module_name, module_func,
                         extensions):
         try:
@@ -198,7 +186,8 @@
         self.scm = parameters.scm_factory(for_init)
         self.auth = parameters.auth_factory()
 
-        self._updateSetPage = parameters.getPageUpdater()
+        async_updates = parameters.config.getboolean('wiki', 'async_updates')
+        self._postSetPageUpdate = self._getPostSetPageUpdater(async_updates)
 
     @property
     def root(self):
@@ -212,7 +201,7 @@
             o.start(self)
 
         if update:
-            self.update()
+            self.updateAll()
 
     def init(self):
         """ Creates a new wiki at the specified root directory.
@@ -225,9 +214,13 @@
             o.postInit()
 
     def stop(self):
+        """ De-initializes the wiki and its sub-systems.
+        """
         self.db.close()
 
     def reset(self):
+        """ Clears all the cached data and rebuilds it from scratch.
+        """
         logger.info("Resetting wiki data...")
         page_infos = self.fs.getPageInfos()
         factory = lambda pi: FileSystemPage(self, pi)
@@ -236,6 +229,9 @@
         self.index.reset(self.getPages())
 
     def resolve(self, only_urls=None, force=False, parallel=False):
+        """ Compute the final info (text, meta, links) of all or a subset of
+            the pages, and caches it in the DB.
+        """
         logger.debug("Resolving pages...")
         if only_urls:
             page_urls = only_urls
@@ -246,24 +242,47 @@
         s = ResolveScheduler(self, page_urls)
         s.run(num_workers)
 
-    def update(self, url=None, path=None):
-        logger.info("Updating pages...")
-        factory = lambda pi: FileSystemPage(self, pi)
-        if url or path:
-            if url and path:
-                raise Exception("Can't specify both an URL and a path.")
-            if path:
-                page_info = self.fs.getPageInfo(path)
-            else:
-                page_info = self.fs.findPageInfo(url)
-            self.db.update([page_info], factory, force=True)
-            self.resolve(only_urls=[page_info.url])
-            self.index.update([self.getPage(page_info.url)])
+    def updatePage(self, url=None, path=None):
+        """ Completely updates a single page, i.e. read it from the file-system
+            and have it fully resolved and cached in the DB.
+        """
+        if url and path:
+            raise Exception("Can't specify both an URL and a path.")
+        logger.info("Updating page: %s" % (url or path))
+        if path:
+            page_info = self.fs.getPageInfo(path)
         else:
-            page_infos = self.fs.getPageInfos()
-            self.db.update(page_infos, factory)
-            self.resolve()
-            self.index.update(self.getPages())
+            page_info = self.fs.findPageInfo(url)
+        self.db.updatePage(page_info)
+        self.resolve(only_urls=[page_info.url])
+        self.index.updatePage(self.db.getPage(
+            page_info.url,
+            fields=['url', 'path', 'title', 'text']))
+
+        # Invalidate pages that use includes or queries, since they may depend on this one.
+        logger.info("Handling dependencies...")
+        invalidate_ids = []
+        db_pages = self.db.getPages(fields=['local_meta'])
+        for p in db_pages:
+            if p.getLocalMeta('include') or p.getLocalMeta('query'):
+                invalidate_ids.append(p._id)
+        self.db.invalidateCache(invalidate_ids)
+
+        # Re-resolve the invalidated pages, either inline or in the background.
+        self._postSetPageUpdate(self)
+
+    def updateAll(self):
+        """ Completely updates all pages, i.e. read them from the file-system
+            and have them fully resolved and cached in the DB.
+            This function will check for timestamps to only update pages that
+            need it.
+        """
+        logger.info("Updating all pages...")
+        page_infos = self.fs.getPageInfos()
+        self.db.updateAll(page_infos)
+        self.resolve()
+        self.index.updateAll(self.db.getPages(
+            fields=['url', 'path', 'title', 'text']))
 
     def getPageUrls(self, subdir=None):
         """ Returns all the page URLs in the wiki, or in the given
@@ -283,7 +302,7 @@
         """
         return self.db.getPage(url)
 
-    def setPage(self, url, page_fields, do_update=True):
+    def setPage(self, url, page_fields):
         """ Updates or creates a page for a given URL.
         """
         # Validate the parameters.
@@ -307,8 +326,7 @@
         self.scm.commit([page_info.path], commit_meta)
 
         # Update the DB and index with the new/modified page.
-        if do_update:
-            self._updateSetPage(self, url)
+        self.updatePage(path=page_info.path)
 
     def revertPage(self, url, page_fields):
         """ Reverts the page with the given URL to an older revision.
@@ -338,7 +356,7 @@
         self.scm.commit([path], commit_meta)
 
         # Update the DB and index with the modified page.
-        self.update(url)
+        self.updatePage(url)
 
     def pageExists(self, url):
         """ Returns whether a page exists at the given URL.
@@ -365,6 +383,23 @@
             endpoints[ep.name] = ep
         return endpoints
 
+    def _getPostSetPageUpdater(self, async):
+        if async:
+            logger.debug("Setting up asynchronous updater.")
+            from tasks import update_wiki
+            return lambda wiki: update_wiki.delay(self.root)
+        else:
+            logger.debug("Setting up simple updater.")
+            return lambda wiki: wiki._simplePostSetPageUpdate()
+
+    def _simplePostSetPageUpdate(self):
+        page_urls = self.db.getPageUrls(uncached_only=True)
+        self.resolve(only_urls=page_urls)
+        pages = [self.db.getPage(url=pu,
+                                 fields=['url', 'path', 'title', 'text'])
+                 for pu in page_urls]
+        self.index.updateAll(pages)
+
 
 def reloader_stat_loop(wiki, interval=1):
     mtimes = {}
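
To summarize the new save path in wiki.py: setPage() writes and commits the file, updatePage() immediately resolves and indexes that one page and invalidates pages that use includes or queries, and _postSetPageUpdate then resolves everything else, either inline or through the Celery task depending on the async_updates setting. A condensed, hypothetical outline of that flow (field names and values are illustrative):

    # Hypothetical outline of the post-save flow introduced by this changeset.
    wiki.setPage('/sandbox', {'text': "Hello!",
                              'author': 'someone',
                              'message': 'test edit'})
    # -> updatePage(path=...): db.updatePage() + resolve([url]) + index.updatePage()
    # -> db.invalidateCache(ids of pages using includes or queries)
    # -> _postSetPageUpdate(wiki):
    #      async_updates on:  tasks.update_wiki.delay(wiki.root)   # resolve the rest in a worker
    #      async_updates off: wiki._simplePostSetPageUpdate()      # resolve the rest right away
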