changeset 240:68d6524a8333

Improve support for async wiki updates. The wiki can now be properly updated in the background using Celery after a page has been edited by a user. The Flask configuration is passed on to Celery, and the background updater function is only set when running the Flask application (not when running updates via `wk`, for instance).
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 28 Mar 2014 22:02:59 -0700
parents 0e87dc411fe9
children 3580410865fe
files backend.py wikked/commands/web.py wikked/resources/defaults.cfg wikked/tasks.py wikked/web.py wikked/wiki.py wikked/wsgiutil.py
diffstat 7 files changed, 49 insertions(+), 28 deletions(-) [+]
line wrap: on
line diff
--- a/backend.py	Tue Mar 25 22:09:14 2014 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,5 +0,0 @@
-from wikked.tasks import app
-
-if __name__ == '__main__':
-    app.start()
-
--- a/wikked/commands/web.py	Tue Mar 25 22:09:14 2014 -0700
+++ b/wikked/commands/web.py	Fri Mar 28 22:02:59 2014 -0700
@@ -40,7 +40,7 @@
             app.config['DEV_ASSETS'] = True
         app.config['WIKI_AUTO_RELOAD'] = True
 
-        app.wiki_params = ctx.params
+        app.set_wiki_params(ctx.params)
         if bool(app.config.get('UPDATE_WIKI_ON_START')):
             ctx.wiki.updateAll()
 
--- a/wikked/resources/defaults.cfg	Tue Mar 25 22:09:14 2014 -0700
+++ b/wikked/resources/defaults.cfg	Fri Mar 28 22:02:59 2014 -0700
@@ -7,7 +7,6 @@
 indexer=whoosh
 database=sql
 database_url=sqlite:///%(root)s/.wiki/wiki.db
-async_updates=False
 
 [endpoint:special]
 query=False
--- a/wikked/tasks.py	Tue Mar 25 22:09:14 2014 -0700
+++ b/wikked/tasks.py	Fri Mar 28 22:02:59 2014 -0700
@@ -1,17 +1,13 @@
 import logging
 from celery import Celery
-from wiki import Wiki, WikiParameters
+from wikked.wiki import Wiki, WikiParameters
 
 
 logger = logging.getLogger(__name__)
 
 
-#TODO: Make those settings configurable!
-app = Celery(
-        'wikked',
-        broker='amqp://',
-        backend='amqp://',
-        include=['wikked.tasks'])
+logger.debug("Creating Celery application...")
+celery_app = Celery('wikked', include=['wikked.tasks'])
 
 
 class wiki_session(object):
@@ -31,8 +27,8 @@
         return False
 
 
-@app.task
+@celery_app.task
 def update_wiki(wiki_root):
     with wiki_session(wiki_root) as wiki:
-        wiki._postSetPageUpdate()
+        wiki.updateAll()
 
--- a/wikked/web.py	Tue Mar 25 22:09:14 2014 -0700
+++ b/wikked/web.py	Fri Mar 28 22:02:59 2014 -0700
@@ -22,7 +22,8 @@
 app.config.setdefault('WIKI_ROOT', None)
 app.config.setdefault('UPDATE_WIKI_ON_START', True)
 app.config.setdefault('WIKI_AUTO_RELOAD', False)
-app.config.setdefault('SYNCHRONOUS_UPDATE', True)
+app.config.setdefault('WIKI_ASYNC_UPDATE', False)
+app.config.setdefault('BROKER_URL', 'amqp://')
 
 
 # Find the wiki root, and further configure the app if there's a
@@ -56,6 +57,17 @@
     l = logging.getLogger('sqlalchemy')
     l.setLevel(logging.DEBUG)
 
+app.logger.debug("Creating Flask application...")
+
+
+def set_app_wiki_params(params):
+    app.wiki_params = params
+    if app.wiki_updater is not None:
+        app.wiki_params.wiki_updater = app.wiki_updater
+
+app.set_wiki_params = set_app_wiki_params
+app.wiki_updater = None
+
 
 # Set the wiki as a request global, and open/close the database.
 # NOTE: this must happen before the login extension is registered
@@ -113,3 +125,17 @@
 import wikked.views.special
 import wikked.views.admin
 
+
+# Async wiki update.
+if app.config['WIKI_ASYNC_UPDATE']:
+    app.logger.debug("Will use Celery tasks to update the wiki...")
+    from wikked.tasks import celery_app, update_wiki
+
+    # Configure Celery.
+    celery_app.conf.update(app.config)
+
+    # Make the wiki use the background update task.
+    def async_updater(wiki):
+        update_wiki.delay(wiki.root)
+    app.wiki_updater = async_updater
+
--- a/wikked/wiki.py	Tue Mar 25 22:09:14 2014 -0700
+++ b/wikked/wiki.py	Fri Mar 28 22:02:59 2014 -0700
@@ -33,10 +33,10 @@
             root = os.getcwd()
         self.root = root
         self.formatters = self.getFormatters()
+        self.wiki_updater = self.getWikiUpdater()
         self._config = None
         self._index_factory = None
         self._scm_factory = None
-        self._page_updater = None
 
     @property
     def config(self):
@@ -135,6 +135,9 @@
         except ImportError:
             pass
 
+    def getWikiUpdater(self):
+        return lambda wiki: wiki.updateAll()
+
     def _loadConfig(self):
         # Merge the default settings with any settings provided by
         # the local config file(s).
@@ -186,8 +189,7 @@
         self.scm = parameters.scm_factory(for_init)
         self.auth = parameters.auth_factory()
 
-        async_updates = parameters.config.getboolean('wiki', 'async_updates')
-        self._postSetPageUpdate = self._getPostSetPageUpdater(async_updates)
+        self._wiki_updater = parameters.wiki_updater
 
     @property
     def root(self):
@@ -268,9 +270,6 @@
                 invalidate_ids.append(p._id)
         self.db.invalidateCache(invalidate_ids)
 
-        # Update all the other pages.
-        self._postSetPageUpdate(self)
-
     def updateAll(self):
         """ Completely updates all pages, i.e. read them from the file-system
             and have them fully resolved and cached in the DB.
@@ -328,6 +327,9 @@
         # Update the DB and index with the new/modified page.
         self.updatePage(path=page_info.path)
 
+        # Update all the other pages.
+        self._wiki_updater(self)
+
     def revertPage(self, url, page_fields):
         """ Reverts the page with the given URL to an older revision.
         """
@@ -358,6 +360,9 @@
         # Update the DB and index with the modified page.
         self.updatePage(url)
 
+        # Update all the other pages.
+        self._wiki_updater(self)
+
     def pageExists(self, url):
         """ Returns whether a page exists at the given URL.
         """
@@ -383,16 +388,16 @@
             endpoints[ep.name] = ep
         return endpoints
 
-    def _getPostSetPageUpdater(self, async):
+    def _setupPostSetPageUpdater(self, async):
         if async:
             logger.debug("Setting up asynchronous updater.")
             from tasks import update_wiki
-            return lambda wiki: update_wiki.delay(self.root)
+            self._postSetPageUpdate = lambda wiki: update_wiki.delay(self.root)
         else:
             logger.debug("Setting up simple updater.")
-            return lambda wiki: wiki._simplePostSetPageUpdate()
+            self._postSetPageUpdate = lambda wiki: wiki._simplePostSetPageUpdate()
 
-    def _simplePostSetPageUpdate(self):
+    def _simpleWikiUpdater(self):
         page_urls = self.db.getPageUrls(uncached_only=True)
         self.resolve(only_urls=page_urls)
         pages = [self.db.getPage(url=pu,
--- a/wikked/wsgiutil.py	Tue Mar 25 22:09:14 2014 -0700
+++ b/wikked/wsgiutil.py	Fri Mar 28 22:02:59 2014 -0700
@@ -10,7 +10,7 @@
     logging.basicConfig(stream=sys.stderr)
 
     from wikked.web import app
-    app.wiki_params = WikiParameters(wiki_root)
+    app.set_wiki_params(WikiParameters(wiki_root))
 
     if log_file is not None:
         h = logging.handlers.RotatingFileHandler(log_file, maxBytes=4096)