wikked: changeset 239:0e87dc411fe9
Better way to manage updating the wiki.
* Distinguish between updating just a single page and updating the
whole wiki.
* Properly resolve a just-updated page, but postpone resolving everything
else when background updates are enabled.
* Don't flatten single metas for the web UI.
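
For context, here is a minimal usage sketch of the new split as seen from calling code. The updatePage() and updateAll() names come from the diff below; the way the wiki instance is obtained and the example URL are illustrative assumptions only.

    # Illustrative sketch only: `wiki` is assumed to be a started Wiki instance.
    wiki.updatePage(url='/sandbox')   # re-read, resolve and re-index this one page
    wiki.updateAll()                  # re-scan all pages, skipping unchanged files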
author    Ludovic Chabant <ludovic@chabant.com>
date      Tue, 25 Mar 2014 22:09:14 -0700
parents   c22cdd051afc
children  68d6524a8333
files     wikked/commands/manage.py wikked/commands/web.py wikked/db/base.py wikked/db/sql.py wikked/indexer/base.py wikked/indexer/elastic.py wikked/indexer/whooshidx.py wikked/tasks.py wikked/views/__init__.py wikked/wiki.py
diffstat  10 files changed, 160 insertions(+), 81 deletions(-)
--- a/wikked/commands/manage.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/commands/manage.py	Tue Mar 25 22:09:14 2014 -0700
@@ -90,7 +90,7 @@
                 nargs='?')
 
     def run(self, ctx):
-        ctx.wiki.update(path=ctx.args.path)
+        ctx.wiki.updatePage(path=ctx.args.path)
 
         if ctx.args.debug and ctx.args.path:
             page_info = ctx.wiki.fs.getPageInfo(ctx.args.path)
--- a/wikked/commands/web.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/commands/web.py	Tue Mar 25 22:09:14 2014 -0700
@@ -42,7 +42,7 @@
         app.wiki_params = ctx.params
 
         if bool(app.config.get('UPDATE_WIKI_ON_START')):
-            ctx.wiki.update()
+            ctx.wiki.updateAll()
 
         # Run!
         debug_mode = ctx.args.dev or app.config.get('DEBUG', False)
--- a/wikked/db/base.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/db/base.py	Tue Mar 25 22:09:14 2014 -0700
@@ -22,7 +22,10 @@
     def reset(self, page_infos, page_factory):
         pass
 
-    def update(self, page_infos, page_factory, force=False):
+    def updatePage(self, page_info):
+        pass
+
+    def updateAll(self, page_infos, force=False):
         pass
 
     def getPageUrls(self, subdir=None, uncached_only=False):
@@ -47,12 +50,12 @@
             raise PageNotFoundError(url or path)
         return page
 
+    def isPageValid(self, url):
+        return True
+
     def cachePage(self, page):
         pass
 
-    def isCacheValid(self, page):
-        raise NotImplementedError()
-
     def pageExists(self, url=None, path=None):
         raise NotImplementedError()
--- a/wikked/db/sql.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/db/sql.py	Tue Mar 25 22:09:14 2014 -0700
@@ -6,7 +6,7 @@
 import datetime
 from sqlalchemy import (
     create_engine,
-    and_, or_,
+    and_,
     Column, Boolean, Integer, DateTime, ForeignKey, String, Text,
     UnicodeText)
 from sqlalchemy.ext.declarative import declarative_base
@@ -15,7 +15,7 @@
     relationship, backref, load_only, subqueryload)
 from sqlalchemy.orm.exc import NoResultFound
 from wikked.db.base import Database
-from wikked.page import Page, PageData
+from wikked.page import Page, PageData, FileSystemPage
 
 
 logger = logging.getLogger(__name__)
@@ -216,7 +216,28 @@
             self._addPage(page)
         self.session.commit()
 
-    def update(self, page_infos, page_factory, force=False):
+    def updatePage(self, page_info):
+        if self._needsSchemaUpdate():
+            raise Exception("This wiki needs a database update. "
+                            "Please run `wk reset`.")
+
+        logger.debug("Updating SQL database for page: %s" % page_info.url)
+
+        db_page = self.session.query(SQLPage).\
+                options(load_only('id', 'url')).\
+                filter(SQLPage.url == page_info.url).\
+                first()
+        if db_page:
+            logger.debug("Removing page '%s' [%d] from SQL database." %
+                         (db_page.url, db_page.id))
+            self.session.delete(db_page)
+            self.session.commit()
+
+        page = FileSystemPage(self.wiki, page_info)
+        self._addPage(page)
+        self.session.commit()
+
+    def updateAll(self, page_infos, force=False):
         if self._needsSchemaUpdate():
             raise Exception("This wiki needs a database upgrade. "
                             "Please run `wk reset`.")
@@ -254,26 +275,11 @@
         for pi in page_infos:
             if (pi.path in to_update or
                     pi.path not in already_added):
-                page = page_factory(pi)
+                page = FileSystemPage(self.wiki, pi)
                 added_db_objs.append(self._addPage(page))
 
         self.session.commit()
 
-        if to_remove or added_db_objs:
-            # If pages have been added/removed/updated, invalidate everything
-            # in the wiki that has includes or queries.
-            db_pages = self.session.query(SQLPage.id, SQLPage.is_ready,
-                                          SQLPage.ready_meta).\
-                    join(SQLReadyMeta).\
-                    filter(or_(
-                        SQLReadyMeta.name == 'include',
-                        SQLReadyMeta.name == 'query')).\
-                    all()
-            for p in db_pages:
-                p.is_ready = False
-
-            self.session.commit()
-
         logger.debug("...done updating SQL database.")
 
         return [o.id for o in added_db_objs]
@@ -305,15 +311,30 @@
         for p in q.all():
             yield SQLDatabasePage(self, p, fields)
 
-    def isCacheValid(self, page):
+    def isPageValid(self, url):
         db_obj = self.session.query(SQLPage).\
-                options(load_only('id', 'path', 'time')).\
-                filter(SQLPage.id == page._id).\
-                one()
+                options(load_only('id', 'url', 'path', 'time')).\
+                filter(SQLPage.url == url).\
+                first()
+        if not db_obj:
+            return False
         path_time = datetime.datetime.fromtimestamp(
                 os.path.getmtime(db_obj.path))
         return path_time < db_obj.time
 
+    def invalidateCache(self, ids):
+        if not isinstance(ids, list):
+            ids = list(ids)
+        logger.debug("Invalidating %d page caches in SQL database." % len(ids))
+
+        db_pages = self.session.query(SQLPage).\
+                options(load_only('id', 'url', 'is_ready')).\
+                filter(SQLPage.id.in_(ids)).\
+                all()
+        for p in db_pages:
+            p.is_ready = False
+        self.session.commit()
+
     def cachePage(self, page):
         if not hasattr(page, '_id') or not page._id:
             raise Exception("Given page '%s' has no `_id` attribute set." % page.url)
@@ -392,7 +413,8 @@
                 'local_meta': 'meta',
                 'local_links': 'links',
                 'meta': 'ready_meta',
-                'links': 'ready_links'}
+                'links': 'ready_links',
+                'text': 'ready_text'}
         subqueryfields = {
             'local_meta': SQLPage.meta,
             'local_links': SQLPage.links,
--- a/wikked/indexer/base.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/indexer/base.py	Tue Mar 25 22:09:14 2014 -0700
@@ -23,7 +23,10 @@
     def reset(self, pages):
         pass
 
-    def update(self, pages):
+    def updatePage(self, page):
+        pass
+
+    def updateAll(self, pages):
         pass
 
     def search(self, query):
--- a/wikked/indexer/elastic.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/indexer/elastic.py	Tue Mar 25 22:09:14 2014 -0700
@@ -95,7 +95,17 @@
         actions = action_maker()
         bulk_index(self.es, actions)
 
-    def update(self, pages):
+    def updatePage(self, page):
+        body = {
+            'fields': ['url'],
+            'query': {'term': {'url': page.url}}}
+        docs = self.es.search(index='pages', doc_type='page', body=body)
+        docs = list(docs)
+        if len(docs) > 0:
+            self.es.delete(index='pages', doc_type='page', id=docs[0]['_id'])
+        self.es.index(index='page', doc_type='page', body=self._get_body(page))
+
+    def updateAll(self, pages):
         to_reindex = set()
         already_indexed = set()
--- a/wikked/indexer/whooshidx.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/indexer/whooshidx.py	Tue Mar 25 22:09:14 2014 -0700
@@ -35,7 +35,14 @@
             self._indexPage(writer, page)
         writer.commit()
 
-    def update(self, pages):
+    def updatePage(self, page):
+        logger.info("Updating index for page: %s" % page.url)
+        writer = self.ix.writer()
+        self._unindexPage(writer, page.url)
+        self._indexPage(writer, page)
+        writer.commit()
+
+    def updateAll(self, pages):
         logger.info("Updating index...")
         to_reindex = set()
         already_indexed = set()
--- a/wikked/tasks.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/tasks.py	Tue Mar 25 22:09:14 2014 -0700
@@ -34,5 +34,5 @@
 
 @app.task
 def update_wiki(wiki_root):
     with wiki_session(wiki_root) as wiki:
-        wiki.update()
+        wiki._postSetPageUpdate()
--- a/wikked/views/__init__.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/views/__init__.py	Tue Mar 25 22:09:14 2014 -0700
@@ -3,7 +3,7 @@
 from flask import g, abort, jsonify
 from flask.ext.login import current_user
 from wikked.fs import PageNotFoundError
-from wikked.utils import split_page_url, flatten_single_metas
+from wikked.utils import split_page_url
 from wikked.web import app
 
 
@@ -34,21 +34,21 @@
 def get_page_or_none(url, convert_url=True, check_perms=DONT_CHECK,
                      force_resolve=False):
     if convert_url:
         url = url_from_viewarg(url)
+
     try:
+        if app.config.get('WIKI_AUTO_RELOAD'):
+            if not g.wiki.db.isPageValid(url):
+                app.logger.info("Page '%s' has changed, reloading." % url)
+                g.wiki.updatePage(url=url)
+            else:
+                app.logger.debug("Page '%s' is up to date." % url)
+        elif force_resolve:
+            g.wiki.resolve(only_urls=[url], force=True)
+
         page = g.wiki.getPage(url)
     except PageNotFoundError:
         return None
 
-    if app.config.get('WIKI_AUTO_RELOAD'):
-        if not g.wiki.db.isCacheValid(page):
-            app.logger.info("Page '%s' has changed, reloading." % url)
-            g.wiki.update(url)
-        else:
-            app.logger.debug("Page '%s' is up to date." % url)
-    elif force_resolve:
-        g.wiki._cachePages([url], force_resolve=True)
-        page = g.wiki.getPage(url)
-
     if check_perms == CHECK_FOR_READ and not is_page_readable(page):
         abort(401)
     elif check_perms == CHECK_FOR_WRITE and not is_page_writable(page):
@@ -78,7 +78,6 @@
             meta = dict(page.getLocalMeta())
         else:
             meta = dict(page.getMeta())
-        flatten_single_metas(meta)
         meta['title'] = page.title
         meta['url'] = urllib.quote(page.url.encode('utf-8'))
         for name in COERCE_META:
--- a/wikked/wiki.py	Mon Mar 24 23:05:19 2014 -0700
+++ b/wikked/wiki.py	Tue Mar 25 22:09:14 2014 -0700
@@ -126,18 +126,6 @@
         for name, val in self.config.items('ignore'):
             yield val
 
-    def getPageUpdater(self):
-        if self._page_updater is None:
-            if self.config.getboolean('wiki', 'async_updates'):
-                logger.debug("Setting up asynchronous updater.")
-                from tasks import update_wiki
-                self._page_updater = lambda wiki, url: update_wiki.delay(
-                        self.root)
-            else:
-                logger.debug("Setting up simple updater.")
-                self._page_updater = lambda wiki, url: wiki.update(url)
-        return self._page_updater
-
     def tryAddFormatter(self, formatters, module_name, module_func,
                         extensions):
         try:
@@ -198,7 +186,8 @@
         self.scm = parameters.scm_factory(for_init)
         self.auth = parameters.auth_factory()
 
-        self._updateSetPage = parameters.getPageUpdater()
+        async_updates = parameters.config.getboolean('wiki', 'async_updates')
+        self._postSetPageUpdate = self._getPostSetPageUpdater(async_updates)
 
     @property
     def root(self):
@@ -212,7 +201,7 @@
             o.start(self)
 
         if update:
-            self.update()
+            self.updateAll()
 
     def init(self):
         """ Creates a new wiki at the specified root directory.
@@ -225,9 +214,13 @@
             o.postInit()
 
     def stop(self):
+        """ De-initializes the wiki and its sub-systems.
+        """
         self.db.close()
 
     def reset(self):
+        """ Clears all the cached data and rebuilds it from scratch.
+        """
         logger.info("Resetting wiki data...")
         page_infos = self.fs.getPageInfos()
         factory = lambda pi: FileSystemPage(self, pi)
@@ -236,6 +229,9 @@
         self.index.reset(self.getPages())
 
     def resolve(self, only_urls=None, force=False, parallel=False):
+        """ Compute the final info (text, meta, links) of all or a subset of
+            the pages, and caches it in the DB.
+        """
         logger.debug("Resolving pages...")
         if only_urls:
             page_urls = only_urls
@@ -246,24 +242,47 @@
         s = ResolveScheduler(self, page_urls)
         s.run(num_workers)
 
-    def update(self, url=None, path=None):
-        logger.info("Updating pages...")
-        factory = lambda pi: FileSystemPage(self, pi)
-        if url or path:
-            if url and path:
-                raise Exception("Can't specify both an URL and a path.")
-            if path:
-                page_info = self.fs.getPageInfo(path)
-            else:
-                page_info = self.fs.findPageInfo(url)
-            self.db.update([page_info], factory, force=True)
-            self.resolve(only_urls=[page_info.url])
-            self.index.update([self.getPage(page_info.url)])
+    def updatePage(self, url=None, path=None):
+        """ Completely updates a single page, i.e. read it from the file-system
+            and have it fully resolved and cached in the DB.
+        """
+        if url and path:
+            raise Exception("Can't specify both an URL and a path.")
+        logger.info("Updating page: %s" % (url or path))
+        if path:
+            page_info = self.fs.getPageInfo(path)
         else:
-            page_infos = self.fs.getPageInfos()
-            self.db.update(page_infos, factory)
-            self.resolve()
-            self.index.update(self.getPages())
+            page_info = self.fs.findPageInfo(url)
+        self.db.updatePage(page_info)
+        self.resolve(only_urls=[page_info.url])
+        self.index.updatePage(self.db.getPage(
+                page_info.url,
+                fields=['url', 'path', 'title', 'text']))
+
+        # Invalidate all the appropriate pages.
+        logger.info("Handling dependencies...")
+        invalidate_ids = []
+        db_pages = self.db.getPages(fields=['local_meta'])
+        for p in db_pages:
+            if p.getLocalMeta('include') or p.getLocalMeta('query'):
+                invalidate_ids.append(p._id)
+        self.db.invalidateCache(invalidate_ids)
+
+        # Update all the other pages.
+        self._postSetPageUpdate(self)
+
+    def updateAll(self):
+        """ Completely updates all pages, i.e. read them from the file-system
+            and have them fully resolved and cached in the DB.
+            This function will check for timestamps to only update pages that
+            need it.
+        """
+        logger.info("Updating all pages...")
+        page_infos = self.fs.getPageInfos()
+        self.db.updateAll(page_infos)
+        self.resolve()
+        self.index.updateAll(self.db.getPages(
+                fields=['url', 'path', 'title', 'text']))
 
     def getPageUrls(self, subdir=None):
         """ Returns all the page URLs in the wiki, or in the given
@@ -283,7 +302,7 @@
         """
         return self.db.getPage(url)
 
-    def setPage(self, url, page_fields, do_update=True):
+    def setPage(self, url, page_fields):
         """ Updates or creates a page for a given URL.
         """
         # Validate the parameters.
@@ -307,8 +326,7 @@
             self.scm.commit([page_info.path], commit_meta)
 
         # Update the DB and index with the new/modified page.
-        if do_update:
-            self._updateSetPage(self, url)
+        self.updatePage(path=page_info.path)
 
     def revertPage(self, url, page_fields):
         """ Reverts the page with the given URL to an older revision.
@@ -338,7 +356,7 @@
             self.scm.commit([path], commit_meta)
 
         # Update the DB and index with the modified page.
-        self.update(url)
+        self.updatePage(url)
 
     def pageExists(self, url):
         """ Returns whether a page exists at the given URL.
@@ -365,6 +383,23 @@
             endpoints[ep.name] = ep
         return endpoints
 
+    def _getPostSetPageUpdater(self, async):
+        if async:
+            logger.debug("Setting up asynchronous updater.")
+            from tasks import update_wiki
+            return lambda wiki: update_wiki.delay(self.root)
+        else:
+            logger.debug("Setting up simple updater.")
+            return lambda wiki: wiki._simplePostSetPageUpdate()
+
+    def _simplePostSetPageUpdate(self):
+        page_urls = self.db.getPageUrls(uncached_only=True)
+        self.resolve(only_urls=page_urls)
+        pages = [self.db.getPage(url=pu,
+                                 fields=['url', 'path', 'title', 'text'])
+                 for pu in page_urls]
+        self.index.updateAll(pages)
+
 
 def reloader_stat_loop(wiki, interval=1):
     mtimes = {}