# HG changeset patch # User Ludovic Chabant # Date 1393394912 28800 # Node ID 062d5e6ddb8727feb0f9e65e02a6c567c6ca08f0 # Parent 034a5c2c23a1e9000e852980c2efd868c3b7fc6b Refactoring for better SQL abstraction and performance (hopefully): - Page instances don't keep the SQL object. This makes it simpler to share those instances between threads. - Added a scheduler for running multiple page resolve operations. Sadly, multi-threading doesn't seem to help here. - Updated SQLAlchemy to 0.9.3. diff -r 034a5c2c23a1 -r 062d5e6ddb87 requirements.txt --- a/requirements.txt Mon Feb 24 12:35:47 2014 -0800 +++ b/requirements.txt Tue Feb 25 22:08:32 2014 -0800 @@ -1,7 +1,6 @@ Flask==0.10.1 Flask-Bcrypt==0.5.2 Flask-Login==0.1.3 -Flask-SQLAlchemy==1.0 Flask-Script==0.5.1 Genshi==0.6 GitPython==0.3.2.RC1 @@ -10,7 +9,7 @@ PyMeta==0.5.0 PyYAML==3.10 Pygments==1.5 -SQLAlchemy==0.8.3 +SQLAlchemy==0.9.3 Werkzeug==0.8.3 Whoosh==2.5.5 amqp==1.3.3 @@ -29,6 +28,7 @@ pybars==0.0.4 python-creole==1.0.6 python-hglib==unknown +repoze.lru==0.6 pytz==2013.8 smmap==0.8.2 twill==0.9 diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/commands/manage.py --- a/wikked/commands/manage.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/commands/manage.py Tue Feb 25 22:08:32 2014 -0800 @@ -14,9 +14,6 @@ "search index.") def setupParser(self, parser): - parser.add_argument('--cache', - help="Re-cache all pages", - action='store_true') parser.add_argument('--indexonly', help="Only update the full-text search index", action='store_true') @@ -25,7 +22,7 @@ if ctx.args.indexonly: ctx.wiki.index.reset(ctx.wiki.getPages()) else: - ctx.wiki.reset(cache_ext_data=ctx.args.cache) + ctx.wiki.reset() @register_command @@ -60,7 +57,11 @@ parser.add_argument('-f', '--force', help="Force cache all pages", action='store_true') + parser.add_argument('--parallel', + help="Run the operation with multiple workers in parallel", + action='store_true') def run(self, ctx): - ctx.wiki._cachePages(force_resolve=ctx.args.force) - + ctx.wiki._cachePages( + force_resolve=ctx.args.force, + parallel=ctx.args.parallel) diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/db/base.py --- a/wikked/db/base.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/db/base.py Tue Feb 25 22:08:32 2014 -0800 @@ -7,7 +7,7 @@ def __init__(self): pass - def initDb(self, wiki): + def start(self, wiki): raise NotImplementedError() def close(self): raise NotImplementedError() @@ -19,39 +19,39 @@ def update(self, pages, force=False): raise NotImplementedError() - def getPageUrls(self, subdir=None): + def getPageUrls(self, subdir=None, uncached_only=False): raise NotImplementedError() - def getPages(self, subdir=None, meta_query=None): + def getPages(self, subdir=None, meta_query=None, uncached_only=False, + fields=None): raise NotImplementedError() - def getUncachedPages(self): - raise NotImplementedError() - - def getPage(self, url=None, path=None, raise_if_none=True): + def getPage(self, url=None, path=None, fields=None, raise_if_none=True): if not url and not path: raise ValueError("Either URL or path need to be specified.") if url and path: raise ValueError("Can't specify both URL and path.") if url: - page = self._getPageByUrl(url) + page = self._getPageByUrl(url, fields) elif path: - page = self._getPageByPath(path) + page = self._getPageByPath(path, fields) else: raise NotImplementedError() if page is None and raise_if_none: raise PageNotFoundError(url or path) return page + def cachePage(self, page): + raise NotImplementedError() + def pageExists(self, url=None, path=None): raise NotImplementedError() def getLinksTo(self, url):
raise NotImplementedError() - def _getPageByUrl(self, url): + def _getPageByUrl(self, url, fields): raise NotImplementedError() - def _getPageByPath(self, path): + def _getPageByPath(self, path, fields): raise NotImplementedError() - diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/db/sql.py --- a/wikked/db/sql.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/db/sql.py Tue Feb 25 22:08:32 2014 -0800 @@ -5,17 +5,18 @@ import logging import datetime from sqlalchemy import ( - create_engine, - and_, or_, - Column, Boolean, Integer, String, Text, DateTime, ForeignKey) + create_engine, + and_, or_, + Column, Boolean, Integer, DateTime, ForeignKey, + String, Text, UnicodeText) +from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import ( - scoped_session, sessionmaker, - relationship, backref, defer) -from sqlalchemy.ext.declarative import declarative_base -from base import Database -from wikked.page import Page, FileSystemPage, PageData, PageLoadingError + scoped_session, sessionmaker, + relationship, backref, noload, load_only, subqueryload) +from sqlalchemy.orm.exc import NoResultFound +from wikked.db.base import Database +from wikked.page import Page, PageData from wikked.formatter import SINGLE_METAS -from wikked.utils import PageNotFoundError logger = logging.getLogger(__name__) @@ -29,28 +30,39 @@ id = Column(Integer, primary_key=True) time = Column(DateTime) - url = Column(Text) - path = Column(Text) - title = Column(Text) - raw_text = Column(Text) - formatted_text = Column(Text) + # Most browsers/search engines won't accept URLs longer than ~2000 chars. + url = Column(String(2048)) + # In the spirit of cross-platformness we let Windows' suckiness dictate + # this length. + path = Column(String(260)) + title = Column(UnicodeText) + raw_text = Column(UnicodeText(length=2 ** 31)) + formatted_text = Column(UnicodeText(length=2 ** 31)) - meta = relationship('SQLMeta', order_by='SQLMeta.id', - backref=backref('page'), - cascade='all, delete, delete-orphan') - links = relationship('SQLLink', order_by='SQLLink.id', - backref=backref('source'), - cascade='all, delete, delete-orphan') + meta = relationship( + 'SQLMeta', + order_by='SQLMeta.id', + backref=backref('page'), + cascade='all, delete, delete-orphan') + links = relationship( + 'SQLLink', + order_by='SQLLink.id', + backref=backref('source'), + cascade='all, delete, delete-orphan') - ready_text = Column(Text) + ready_text = Column(UnicodeText(length=2 ** 31)) is_ready = Column(Boolean) - ready_meta = relationship('SQLReadyMeta', order_by='SQLReadyMeta.id', - backref=backref('page'), - cascade='all, delete, delete-orphan') - ready_links = relationship('SQLReadyLink', order_by='SQLReadyLink.id', - backref=backref('source'), - cascade='all, delete, delete-orphan') + ready_meta = relationship( + 'SQLReadyMeta', + order_by='SQLReadyMeta.id', + backref=backref('page'), + cascade='all, delete, delete-orphan') + ready_links = relationship( + 'SQLReadyLink', + order_by='SQLReadyLink.id', + backref=backref('source'), + cascade='all, delete, delete-orphan') class SQLMeta(Base): @@ -58,7 +70,7 @@ id = Column(Integer, primary_key=True) page_id = Column(Integer, ForeignKey('pages.id')) - name = Column(String(128)) + name = Column(String(128), index=True) value = Column(Text) def __init__(self, name=None, value=None): @@ -71,7 +83,7 @@ id = Column(Integer, primary_key=True) page_id = Column(Integer, ForeignKey('pages.id')) - name = Column(String(128)) + name = Column(String(128), index=True) value = Column(Text) def __init__(self,
name=None, value=None): @@ -123,7 +135,7 @@ self._engine = None self._session = None - def initDb(self, wiki): + def start(self, wiki): self.wiki = wiki def createDb(self): @@ -133,12 +145,13 @@ schema_version = self._getSchemaVersion() if schema_version < self.schema_version: logger.debug( - "SQL database is outdated (got version %s), will re-create.", - schema_version) + "SQL database is outdated (got version %s), " + "will re-create.", + schema_version) create_schema = True else: logger.debug( - "SQL database has up-to-date schema.") + "SQL database has up-to-date schema.") else: create_schema = True if create_schema: @@ -153,11 +166,11 @@ @property def session(self): if self._session is None: - logger.info("Opening database from URL: %s" % self.engine_url) + logger.debug("Opening database from URL: %s" % self.engine_url) self._session = scoped_session(sessionmaker( - autocommit=False, - autoflush=False, - bind=self.engine)) + autocommit=False, + autoflush=False, + bind=self.engine)) return self._session def close(self, commit, exception): @@ -182,7 +195,8 @@ logger.debug("Updating SQL database...") page_urls = [p.url for p in pages] db_pages = self.session.query(SQLPage).\ - all() + options(load_only('id', 'url', 'path', 'time')).\ + all() for p in db_pages: if not os.path.isfile(p.path): # File was deleted. @@ -196,14 +210,16 @@ to_remove.append(p) to_update.add(p.path) for p in to_remove: - self._removePage(p) + logger.debug("Removing page '%s' [%d] from SQL database." % + (p.url, p.id)) + self.session.delete(p) self.session.commit() added_db_objs = [] for p in pages: if (p.path in to_update or - p.path not in already_added): + p.path not in already_added): added_db_objs.append(self._addPage(p)) self.session.commit() @@ -211,15 +227,13 @@ if to_remove or added_db_objs: # If pages have been added/removed/updated, invalidate everything # in the wiki that has includes or queries. 
- db_pages = self.session.query(SQLPage).\ - options( - defer(SQLPage.title), - defer(SQLPage.raw_text), - defer(SQLPage.formatted_text), - defer(SQLPage.ready_text)).\ - join(SQLReadyMeta).\ - filter(or_(SQLReadyMeta.name == 'include', SQLReadyMeta.name == 'query')).\ - all() + db_pages = self.session.query(SQLPage).\ + options(load_only('id', 'is_ready')).\ + join(SQLReadyMeta).\ + filter(or_( + SQLReadyMeta.name == 'include', + SQLReadyMeta.name == 'query')).\ + all() for p in db_pages: p.is_ready = False @@ -228,32 +242,77 @@ logger.debug("...done updating SQL database.") return [o.id for o in added_db_objs] - def getPageUrls(self, subdir=None): + def getPageUrls(self, subdir=None, uncached_only=False): q = self.session.query(SQLPage.url) if subdir: subdir = string.rstrip(subdir, '/') + '/%' q = q.filter(SQLPage.url.like(subdir)) - urls = [] + if uncached_only: + q = q.filter(SQLPage.is_ready == False) for p in q.all(): - urls.append(p.url) - return urls + yield p.url - def getPages(self, subdir=None, meta_query=None): + def getPages(self, subdir=None, meta_query=None, uncached_only=False, + fields=None): q = self.session.query(SQLPage) if meta_query: q = q.join(SQLReadyMeta) for name, values in meta_query.iteritems(): for v in values: - q = q.filter(and_(SQLReadyMeta.name == name, SQLReadyMeta.value == v)) + q = q.filter(and_(SQLReadyMeta.name == name, + SQLReadyMeta.value == v)) if subdir: subdir = string.rstrip(subdir, '/') + '/%' q = q.filter(SQLPage.url.like(subdir)) + if uncached_only: + q = q.filter(SQLPage.is_ready == False) + q = self._addFieldOptions(q, fields) for p in q.all(): - yield SQLDatabasePage(self, db_obj=p) + yield SQLDatabasePage(self, p, fields) + + def cachePage(self, page): + if not hasattr(page, '_id') or not page._id: + raise Exception("Given page '%s' has no `_id` attribute set." % page.url) + + logger.debug("Caching extended data for page '%s' [%d]." % (page.url, page._id)) + + try: + db_obj = self.session.query(SQLPage).\ + options(load_only('id', 'url')).\ + options( + subqueryload('ready_meta'), + subqueryload('ready_links')).\ + filter(SQLPage.id == page._id).\ + one() + except NoResultFound as nrf: + logging.exception(nrf) + logging.error("Can't cache page: %s" % page.url) + raise + + db_obj.ready_text = page._data.text + + del db_obj.ready_meta[:] + for name, value in page._data.ext_meta.iteritems(): + if isinstance(value, bool): + value = "" + if isinstance(value, types.StringTypes): + db_obj.ready_meta.append(SQLReadyMeta(name, value)) + else: + for v in value: + db_obj.ready_meta.append(SQLReadyMeta(name, v)) + + del db_obj.ready_links[:] + for link_url in page._data.ext_links: + db_obj.ready_links.append(SQLReadyLink(link_url)) + + db_obj.is_ready = True + + self.session.commit() def pageExists(self, url=None, path=None): - # TODO: replace with an `EXIST` query.
- return self.getPage(url, path, raise_if_none=False) is not None + q = self.session.query(SQLPage.id, SQLPage.url).filter_by(url=url) + res = self.session.query(q.exists()) + return res.scalar() def getLinksTo(self, url): q = self.session.query(SQLReadyLink).\ @@ -263,26 +322,48 @@ for l in q: yield l.source.url - def getUncachedPages(self): + def _getPageByUrl(self, url, fields): q = self.session.query(SQLPage).\ - filter(SQLPage.is_ready == False).\ - all() - for p in q: - yield SQLDatabasePage(self, db_obj=p) + filter(SQLPage.url == url) + q = self._addFieldOptions(q, fields) + page = q.first() + if page is None: + return None + return SQLDatabasePage(self, page, fields) - def _getPageByUrl(self, url): - q = self.session.query(SQLPage).filter_by(url=url) + def _getPageByPath(self, path, fields): + q = self.session.query(SQLPage).\ + filter(SQLPage.path == path) + q = self._addFieldOptions(q, fields) page = q.first() if page is None: return None - return SQLDatabasePage(self, db_obj=page) + return SQLDatabasePage(self, page, fields) + + def _addFieldOptions(self, query, fields): + if fields is None: + return query - def _getPageByPath(self, path): - q = self.session.query(SQLPage).filter_by(path=path) - page = q.first() - if page is None: - return None - return SQLDatabasePage(self, db_obj=page) + fieldnames = { + 'local_meta': 'meta', + 'local_links': 'links', + 'meta': 'ready_meta', + 'links': 'ready_links'} + subqueryfields = { + 'local_meta': SQLPage.meta, + 'local_links': SQLPage.links, + 'meta': SQLPage.ready_meta, + 'links': SQLPage.ready_links} + # Always load the ID. + query = query.options(load_only('id')) + # Load requested fields... some need subqueries. + for f in fields: + col = fieldnames.get(f) or f + query = query.options(load_only(col)) + sqf = subqueryfields.get(f) + if sqf: + query = query.options(subqueryload(sqf)) + return query def _createSchema(self): Base.metadata.drop_all(self.engine) @@ -334,132 +415,67 @@ return po - def _cacheExtendedData(self, page): - logger.info("Caching extended data for page '%s' [%d]." % (page.url, page._id)) - - if not hasattr(page, '_id') or not page._id: - raise Exception("Given page '%s' has no `_id` attribute set." % page.url) - db_obj = self.session.query(SQLPage).filter(SQLPage.id == page._id).one() - - db_obj.ready_text = page._data.text - - del db_obj.ready_meta[:] - for name, value in page._data.ext_meta.iteritems(): - if isinstance(value, bool): - value = "" - if isinstance(value, types.StringTypes): - db_obj.ready_meta.append(SQLReadyMeta(name, value)) - else: - for v in value: - db_obj.ready_meta.append(SQLReadyMeta(name, v)) - - del db_obj.ready_links[:] - for link_url in page._data.ext_links: - db_obj.ready_links.append(SQLReadyLink(link_url)) - - db_obj.is_ready = True - - self.session.commit() - - - def _removePage(self, page): - logger.debug("Removing page '%s' [%d] from SQL database." % - (page.url, page.id)) - self.session.delete(page) - class SQLDatabasePage(Page): """ A page that can load its properties from a database. 
""" - def __init__(self, db, url=None, db_obj=None): - if url and db_obj: - raise Exception("You can't specify both an url and a database object.") - if not url and not db_obj: - raise Exception("You need to specify either a url or a database object.") - - super(SQLDatabasePage, self).__init__(db.wiki, url or db_obj.url) - self._db_obj = db_obj - - @property - def path(self): - if self._db_obj: - return self._db_obj.path - return super(SQLDatabasePage, self).path + def __init__(self, db, db_obj, fields): + data = self._loadFromDbObject(db_obj, fields) + super(SQLDatabasePage, self).__init__(db.wiki, data) @property def _id(self): - if self._db_obj: - return self._db_obj.id - self._ensureData() return self._data._db_id - def _loadData(self): - try: - db_obj = self._db_obj or self.wiki.db.getPage(self.url) - except PageNotFoundError: - raise PageNotFoundError(self.url, "Please run `update` or `reset`.") - data = self._loadFromDbObject(db_obj) - self._db_obj = None - return data - - def _onExtendedDataLoaded(self): - self.wiki.db._cacheExtendedData(self) - - def _loadFromDbObject(self, db_obj, bypass_auto_update=False): - if not bypass_auto_update and self.wiki.db.auto_update: - path_time = datetime.datetime.fromtimestamp( - os.path.getmtime(db_obj.path)) - if path_time >= db_obj.time: - logger.debug( - "Updating database cache for page '%s'." % self.url) - try: - fs_page = FileSystemPage(self.wiki, self.url) - fs_page._ensureData() - added_ids = self.wiki.db.update([fs_page]) - if not added_ids: - raise Exception("Page '%s' has been updated, but the database can't find it." % self.url) - fs_page._data._db_id = added_ids[0] - return fs_page._data - except Exception as e: - msg = "Error updating database cache from the file-system: %s" % e - raise PageLoadingError(msg, e) - + def _loadFromDbObject(self, db_obj, fields): data = PageData() data._db_id = db_obj.id - data.path = db_obj.path - split = os.path.splitext(data.path) - data.filename = split[0] - data.extension = split[1].lstrip('.') - data.title = db_obj.title - data.raw_text = db_obj.raw_text - data.formatted_text = db_obj.formatted_text + if fields is None or 'url' in fields: + data.url = db_obj.url + if fields is None or 'path' in fields: + data.path = db_obj.path + if fields is None or 'title' in fields: + data.title = db_obj.title + if fields is None or 'raw_text' in fields: + data.raw_text = db_obj.raw_text + if fields is None or 'formatted_text' in fields: + data.formatted_text = db_obj.formatted_text - data.local_meta = {} - for m in db_obj.meta: - value = data.local_meta.get(m.name) - if m.name in SINGLE_METAS: - data.local_meta[m.name] = m.value - else: - if value is None: - data.local_meta[m.name] = [m.value] + if fields is None or 'local_meta' in fields: + data.local_meta = {} + for m in db_obj.meta: + value = data.local_meta.get(m.name) + if m.name in SINGLE_METAS: + data.local_meta[m.name] = m.value else: - data.local_meta[m.name].append(m.value) + if value is None: + data.local_meta[m.name] = [m.value] + else: + data.local_meta[m.name].append(m.value) - data.local_links = [l.target_url for l in db_obj.links] + if fields is None or 'local_links' in fields: + data.local_links = [l.target_url for l in db_obj.links] + + if fields is None or ('meta' in fields or 'links' in fields or + 'text' in fields): + if not db_obj.is_ready: + raise Exception( + "Requested extended data for page '%s' " + "but data is not cached." 
% (data.url or data._db_id)) - if db_obj.is_ready and not self._force_resolve: - # If we have extended cache data from the database, we might as - # well load it now too. - data.text = db_obj.ready_text - for m in db_obj.ready_meta: - value = data.ext_meta.get(m.name) - if value is None: - data.ext_meta[m.name] = [m.value] - else: - data.ext_meta[m.name].append(m.value) - data.ext_links = [l.target_url for l in db_obj.ready_links] - # Flag this data as completely loaded. - data.has_extended_data = True + if fields is None or 'text' in fields: + data.text = db_obj.ready_text + + if fields is None or 'meta' in fields: + data.ext_meta = {} + for m in db_obj.ready_meta: + value = data.ext_meta.get(m.name) + if value is None: + data.ext_meta[m.name] = [m.value] + else: + data.ext_meta[m.name].append(m.value) + + if fields is None or 'links' in fields: + data.ext_links = [l.target_url for l in db_obj.ready_links] return data - diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/fs.py --- a/wikked/fs.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/fs.py Tue Feb 25 22:08:32 2014 -0800 @@ -45,7 +45,7 @@ self.page_extensions = None self.default_extension = config.get('wiki', 'default_extension') - def initFs(self, wiki): + def start(self, wiki): self.page_extensions = list(set( itertools.chain(*wiki.formatters.itervalues()))) diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/indexer/base.py --- a/wikked/indexer/base.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/indexer/base.py Tue Feb 25 22:08:32 2014 -0800 @@ -11,7 +11,7 @@ def __init__(self): pass - def initIndex(self, wiki): + def start(self, wiki): raise NotImplementedError() def reset(self, pages): diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/indexer/elastic.py --- a/wikked/indexer/elastic.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/indexer/elastic.py Tue Feb 25 22:08:32 2014 -0800 @@ -15,7 +15,7 @@ def __init__(self): WikiIndex.__init__(self) - def initIndex(self, wiki): + def start(self, wiki): self.es = Elasticsearch() if not self.es.indices.exists('pages'): logger.debug("Creating the `pages` index.") diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/indexer/whooshidx.py --- a/wikked/indexer/whooshidx.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/indexer/whooshidx.py Tue Feb 25 22:08:32 2014 -0800 @@ -17,7 +17,7 @@ def __init__(self): WikiIndex.__init__(self) - def initIndex(self, wiki): + def start(self, wiki): self.store_dir = os.path.join(wiki.root, '.wiki', 'index') if not os.path.isdir(self.store_dir): logger.debug("Creating new index in: " + self.store_dir) @@ -27,7 +27,7 @@ self.ix = open_dir(self.store_dir) def reset(self, pages): - logger.debug("Re-creating new index in: " + self.store_dir) + logger.info("Re-creating new index in: " + self.store_dir) self.ix = create_in(self.store_dir, schema=self._getSchema()) writer = self.ix.writer() for page in pages: @@ -35,7 +35,7 @@ writer.commit() def update(self, pages): - logger.debug("Updating index...") + logger.info("Updating index...") to_reindex = set() already_indexed = set() diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/page.py --- a/wikked/page.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/page.py Tue Feb 25 22:08:32 2014 -0800 @@ -2,9 +2,7 @@ import os.path import re import logging -import jinja2 from formatter import PageFormatter, FormattingContext -from resolver import PageResolver, CircularIncludeError logger = logging.getLogger(__name__) @@ -18,66 +16,65 @@ class PageData(object): def __init__(self): + self.url = None self.path = None self.title = None self.raw_text = None self.formatted_text 
= None + self.local_meta = None + self.local_links = None self.text = None - self.local_meta = {} - self.local_links = [] - self.ext_meta = {} - self.ext_links = [] - self.has_extended_data = False + self.ext_meta = None + self.ext_links = None class Page(object): """ A wiki page. This is a non-functional class, as it doesn't know where to load things from. Use `FileSystemPage` or `DatabasePage` instead. """ - def __init__(self, wiki, url): + def __init__(self, wiki, data): self.wiki = wiki - self.url = url - self._data = None - self._force_resolve = False + self._data = data + + @property + def url(self): + return self._data.url @property def path(self): - self._ensureData() return self._data.path @property def extension(self): - self._ensureData() - return self._data.extension + if self._data.path is None: + raise Exception("The 'path' field was not loaded.") + return os.path.splitext(self._data.path)[1].lstrip('.') @property def filename(self): - self._ensureData() - return self._data.filename + if self._data.path is None: + raise Exception("The 'path' field was not loaded.") + basename = os.path.basename(self._data.path) + return os.path.splitext(basename)[0] @property def title(self): - self._ensureData() return self._data.title @property def raw_text(self): - self._ensureData() return self._data.raw_text @property def text(self): - self._ensureExtendedData() return self._data.text @property def meta(self): - self._ensureExtendedData() return self._data.ext_meta @property def links(self): - self._ensureExtendedData() return self._data.ext_links def getIncomingLinks(self): @@ -96,123 +93,50 @@ return self.wiki.scm.diff(self.path, rev1, rev2) def getFormattedText(self): - self._ensureData() return self._data.formatted_text def getLocalMeta(self): - self._ensureData() return self._data.local_meta def getLocalLinks(self): - self._ensureData() return self._data.local_links - def _ensureData(self): - if self._data is not None: - return - - self._data = self._loadData() - if self._data is not None: - return - - raise PageLoadingError() - - def _loadData(self): - raise NotImplementedError() - - def _onExtendedDataLoading(self): - pass - - def _onExtendedDataLoaded(self): - pass - - def _ensureExtendedData(self, force=False): - if (not force and - self._data is not None and - self._data.has_extended_data): - return - - self._ensureData() - - self._onExtendedDataLoading() - if (self._data.has_extended_data and - not force and - not self._force_resolve): - return - - try: - r = PageResolver(self) - out = r.run() - self._data.text = out.text - self._data.ext_meta = out.meta - self._data.ext_links = out.out_links - self._data.has_extended_data = True - self._onExtendedDataLoaded() - except CircularIncludeError as cie: - template_path = os.path.join( - os.path.dirname(__file__), - 'templates', - 'circular_include_error.html' - ) - with open(template_path, 'r') as f: - env = jinja2.Environment() - template = env.from_string(f.read()) - self._data.text = template.render({ - 'message': str(cie), - 'url_trail': cie.url_trail - }) + def _setExtendedData(self, result): + self._data.text = result.text + self._data.ext_meta = result.meta + self._data.ext_links = result.out_links class FileSystemPage(Page): """ A page that can load its properties directly from the file-system.
""" - def __init__(self, wiki, url=None, page_info=None): - if url and page_info: - raise Exception("You can't specify both an url and a page info.") - if not url and not page_info: - raise Exception("You need to specify either a url or a page info.") - - super(FileSystemPage, self).__init__(wiki, url or page_info.url) - self._page_info = page_info + def __init__(self, wiki, page_info): + data = self._loadFromPageInfo(wiki, page_info) + super(FileSystemPage, self).__init__(wiki, data) - @property - def path(self): - if self._page_info: - return self._page_info.path - return super(FileSystemPage, self).path - - def _loadData(self): - # Get info from the file-system. - page_info = self._page_info or self.wiki.fs.getPage(self.url) - data = self._loadFromPageInfo(page_info) - self._page_info = None - return data - - def _loadFromPageInfo(self, page_info): + def _loadFromPageInfo(self, wiki, page_info): data = PageData() + data.url = page_info.url data.path = page_info.path data.raw_text = page_info.content - split = os.path.splitext(data.path) - data.filename = split[0] - data.extension = split[1].lstrip('.') # Format the page and get the meta properties. - filename = os.path.basename(data.path) - filename_split = os.path.splitext(filename) - ctx = FormattingContext(self.url) - f = PageFormatter(self.wiki) + ctx = FormattingContext(page_info.url) + f = PageFormatter(wiki) data.formatted_text = f.formatText(ctx, data.raw_text) data.local_meta = ctx.meta data.local_links = ctx.out_links # Add some common meta. - data.title = (data.local_meta.get('title') or - re.sub(r'\-', ' ', filename_split[0])) + data.title = data.local_meta.get('title') + if data.title is None: + filename = os.path.basename(data.path) + filename_split = os.path.splitext(filename) + data.title = re.sub(r'\-', ' ', filename_split[0]) return data @staticmethod def fromPageInfos(wiki, page_infos): for p in page_infos: - yield FileSystemPage(wiki, page_info=p) - + yield FileSystemPage(wiki, p) diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/resolver.py --- a/wikked/resolver.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/resolver.py Tue Feb 25 22:08:32 2014 -0800 @@ -107,10 +107,13 @@ '__empty': "
<p>No page matches the query.</p>\n" } - def __init__(self, page, ctx=None, parameters=None): + def __init__(self, page, ctx=None, parameters=None, page_getter=None, + pages_meta_getter=None): self.page = page self.ctx = ctx or ResolveContext(page) self.parameters = parameters + self.page_getter = page_getter or self._getPage + self.pages_meta_getter = pages_meta_getter or self._getPagesMeta self.output = None self.env = None @@ -137,6 +140,15 @@ self.output.text = u'<div class="error">%s</div>
' % e return self.output + def _getPage(self, url): + fields = ['url', 'title', 'path', 'formatted_text', 'local_meta', + 'local_links'] + return self.wiki.db.getPage(url, fields=fields) + + def _getPagesMeta(self): + fields = ['url', 'title', 'local_meta'] + return self.wiki.db.getPages(fields=fields) + def _unsafeRun(self): # Create default parameters. if not self.parameters: @@ -258,12 +270,13 @@ # Re-run the resolver on the included page to get its final # formatted text. try: - page = self.wiki.getPage(include_url) + page = self.page_getter(include_url) except PageNotFoundError: raise IncludeError(include_url, self.page.url, "Page not found") current_url_trail = list(self.ctx.url_trail) self.ctx.url_trail.append(page.url) - child = PageResolver(page, self.ctx, parameters) + child = PageResolver(page, self.ctx, parameters, self.page_getter, + self.pages_meta_getter) child_output = child.run() self.output.add(child_output) self.ctx.url_trail = current_url_trail @@ -295,7 +308,7 @@ # Find pages that match the query, excluding any page # that is in the URL trail. matched_pages = [] - for p in self.wiki.getPages(): + for p in self.pages_meta_getter(): if p.url in self.ctx.url_trail: continue for key, value in meta_query.iteritems(): @@ -315,8 +328,7 @@ for p in matched_pages: tokens = { 'url': p.url, - 'title': p.title - } + 'title': p.title} tokens.update(p.getLocalMeta()) item_url, item_text = self._valueOrPageText(parameters['__item'], with_url=True) text += self._renderTemplate(item_text, tokens, error_url=item_url or self.page.url) @@ -328,7 +340,7 @@ if re.match(r'^\[\[.*\]\]$', value): include_url = value[2:-2] try: - page = self.wiki.getPage(include_url) + page = self.page_getter(include_url) except PageNotFoundError: raise IncludeError(include_url, self.page.url, "Page not found") if with_url: @@ -388,7 +400,7 @@ # Recurse into included pages. for url in included_urls: try: - p = self.wiki.getPage(url) + p = self.page_getter(url) except PageNotFoundError: raise IncludeError(url, page.url, "Page not found") if self._isPageMatch(p, name, value, level + 1): diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/scheduler.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/wikked/scheduler.py Tue Feb 25 22:08:32 2014 -0800 @@ -0,0 +1,194 @@ +import os.path +import logging +import datetime +import threading +import jinja2 +from Queue import Queue, Empty +from repoze.lru import LRUCache +from wikked.resolver import PageResolver, ResolveOutput, CircularIncludeError + + +logger = logging.getLogger(__name__) + + +class ResolveScheduler(object): + """ A class that can resolve multiple pages in a potentially + multi-threaded way. 
+ """ + PAGE_REGISTRY_SIZE = 256 + + def __init__(self, wiki, page_urls, registry_size=None): + self.wiki = wiki + self.page_urls = page_urls + + self._cache = LRUCache(registry_size or self.PAGE_REGISTRY_SIZE) + self._pages_meta = None + + self._queue = None + self._pool = None + self._done = False + + def getPage(self, url): + page = self._cache.get(url) + if page is None: + logger.debug("Caching page in scheduler registry: %s" % url) + fields = ['url', 'title', 'path', 'formatted_text', 'local_meta', + 'local_links'] + page = self.wiki.db.getPage(url, fields=fields) + self._cache.put(url, page) + return page + + def getPagesMeta(self): + if self._pages_meta is None: + fields = ['url', 'title', 'local_meta'] + self._pages_meta = list(self.wiki.db.getPages(fields=fields)) + return self._pages_meta + + def run(self, num_workers=1): + logger.info("Running resolve scheduler " + "(%d workers)" % num_workers) + + if num_workers > 1: + # Multi-threaded resolving. + self._done = False + self._queue = Queue() + for url in self.page_urls: + self._queue.put_nowait(JobDesc(url)) + + self._pool = [] + for i in range(num_workers): + ctx = JobContext(self) + self._pool.append(JobWorker(i, ctx)) + + for thread in self._pool: + thread.start() + self._queue.join() + logger.debug("Queue is empty... terminating workers.") + self._done = True + + for thread in self._pool: + thread.join() + logging.debug("Worker [%d] ended." % thread.wid) + else: + # Single-threaded resolving. + for url in self.page_urls: + page = self.getPage(url) + r = PageResolver(page, page_getter=self.getPage, + pages_meta_getter=self.getPagesMeta) + runner = PageResolverRunner(page, r) + runner.run(raise_on_failure=True) + self.wiki.db.cachePage(page) + + +class PageResolverRunner(object): + """ A class that resolves one page with the option to fail hard or + softly (i.e. raise an exception, or replace the page's text with + the error message). + """ + def __init__(self, page, resolver): + self.page = page + self.resolver = resolver + + def run(self, raise_on_failure=False): + try: + logger.debug("Resolving page: %s" % self.page.url) + result = self.resolver.run() + except CircularIncludeError as cie: + if raise_on_failure: + raise + + # Handle error by printing it in the page's text so the + # user can see it. 
+ template_path = os.path.join( + os.path.dirname(__file__), + 'templates', + 'circular_include_error.html') + with open(template_path, 'r') as f: + env = jinja2.Environment() + template = env.from_string(f.read()) + + result = ResolveOutput() + result.text = template.render({ + 'message': str(cie), + 'url_trail': cie.url_trail}) + + self.page._setExtendedData(result) + + +class JobDesc(object): + def __init__(self, url): + self.url = url + + +class JobContext(object): + def __init__(self, scheduler): + self.scheduler = scheduler + self.abort_on_failure = True + + def isDone(self): + return self.scheduler._done + + def getPage(self, url): + return self.scheduler.getPage(url) + + def getPagesMeta(self): + return self.scheduler.getPagesMeta() + + def getJob(self): + return self.scheduler._queue.get(True, 0.5) + + def cachePage(self, page): + self.scheduler.wiki.db.cachePage(page) + + def finishJob(self, exception=None): + self.scheduler.wiki.db.close(commit=True, exception=exception) + self.scheduler._queue.task_done() + + def finishWorker(self): + self.scheduler.wiki.db.close(commit=True, exception=None) + + +class JobWorker(threading.Thread): + def __init__(self, wid, ctx): + super(JobWorker, self).__init__() + self.wid = wid + self.ctx = ctx + + def run(self): + try: + self._unsafeRun() + except Exception as ex: + logging.exception(ex) + logging.critical("Aborting resolver worker.") + finally: + self.ctx.finishWorker() + + def _unsafeRun(self): + while True: + try: + job = self.ctx.getJob() + except Empty: + if self.ctx.isDone(): + break + continue + logger.debug("[%d] -> %s" % (self.wid, job.url)) + before = datetime.datetime.now() + + try: + page = self.ctx.getPage(job.url) + r = PageResolver(page, page_getter=self.ctx.getPage, + pages_meta_getter=self.ctx.getPagesMeta) + runner = PageResolverRunner(page, r) + runner.run(raise_on_failure=self.ctx.abort_on_failure) + self.ctx.cachePage(page) + except Exception as ex: + logging.exception(ex) + logging.error("Error resolving page: %s" % job.url) + self.ctx.finishJob(exception=ex) + return + + self.ctx.finishJob() + after = datetime.datetime.now() + delta = after - before + logger.debug("[%d] %s done in %fs" % (self.wid, job.url, + delta.total_seconds())) diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/scm/base.py --- a/wikked/scm/base.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/scm/base.py Tue Feb 25 22:08:32 2014 -0800 @@ -16,7 +16,7 @@ def __init__(self): pass - def initRepo(self, wiki): + def start(self, wiki): raise NotImplementedError() def getSpecialFilenames(self): diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/scm/git.py --- a/wikked/scm/git.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/scm/git.py Tue Feb 25 22:08:32 2014 -0800 @@ -21,7 +21,7 @@ SourceControl.__init__(self) self.root = root - def initRepo(self, wiki): + def start(self, wiki): # Make a Git repo if there's none. 
if not os.path.isdir(os.path.join(self.root, '.git')): logger.info("Creating Git repository at: " + self.root) diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/scm/mercurial.py --- a/wikked/scm/mercurial.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/scm/mercurial.py Tue Feb 25 22:08:32 2014 -0800 @@ -26,7 +26,7 @@ 'M': ACTION_EDIT } - def initRepo(self, wiki): + def start(self, wiki): pass def createRepo(self): diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/wiki.py --- a/wikked/wiki.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/wiki.py Tue Feb 25 22:08:32 2014 -0800 @@ -7,6 +7,7 @@ from wikked.page import FileSystemPage from wikked.fs import FileSystem from wikked.auth import UserManager +from wikked.scheduler import ResolveScheduler logger = logging.getLogger(__name__) @@ -85,9 +86,12 @@ # Only create the command server once. import hglib client = hglib.open(self.root) + def impl(): - from wikked.scm.mercurial import MercurialCommandServerSourceControl - return MercurialCommandServerSourceControl(self.root, client) + from wikked.scm.mercurial import ( + MercurialCommandServerSourceControl) + return MercurialCommandServerSourceControl( + self.root, client) self._scm_factory = impl elif scm_type == 'git': def impl(): @@ -95,7 +99,8 @@ return GitLibSourceControl(self.root) self._scm_factory = impl else: - raise InitializationError("No such source control: " + scm_type) + raise InitializationError( + "No such source control: " + scm_type) return self._scm_factory() def auth_factory(self): @@ -103,9 +108,12 @@ def getFormatters(self): formatters = {passthrough_formatter: ['txt', 'html']} - self.tryAddFormatter(formatters, 'markdown', 'markdown', ['md', 'mdown', 'markdown']) - self.tryAddFormatter(formatters, 'textile', 'textile', ['tl', 'text', 'textile']) - self.tryAddFormatter(formatters, 'creole', 'creole2html', ['cr', 'creole']) + self.tryAddFormatter(formatters, 'markdown', 'markdown', + ['md', 'mdown', 'markdown']) + self.tryAddFormatter(formatters, 'textile', 'textile', + ['tl', 'text', 'textile']) + self.tryAddFormatter(formatters, 'creole', 'creole2html', + ['cr', 'creole']) return formatters def getSpecialFilenames(self): @@ -121,13 +129,16 @@ if self.config.getboolean('wiki', 'async_updates'): logger.debug("Setting up asynchronous updater.") from tasks import update_wiki - self._page_updater = lambda wiki, url: update_wiki.delay(self.root) + self._page_updater = lambda wiki, url: update_wiki.delay( + self.root) else: logger.debug("Setting up simple updater.") - self._page_updater = lambda wiki, url: wiki.update(url, cache_ext_data=False) + self._page_updater = lambda wiki, url: wiki.update( + url, cache_ext_data=False) return self._page_updater - def tryAddFormatter(self, formatters, module_name, module_func, extensions): + def tryAddFormatter(self, formatters, module_name, module_func, + extensions): try: module = importlib.import_module(module_name) func = getattr(module, module_func) @@ -172,8 +183,12 @@ self.formatters = parameters.formatters self.special_filenames = parameters.getSpecialFilenames() - self.main_page_url = '/' + parameters.config.get('wiki', 'main_page').strip('/') - self.templates_url = '/' + parameters.config.get('wiki', 'templates_dir').strip('/') + '/' + self.main_page_url = ( + '/' + + parameters.config.get('wiki', 'main_page').strip('/')) + self.templates_url = ( + '/' + + parameters.config.get('wiki', 'templates_dir').strip('/') + '/') self.endpoints = self._createEndpointInfos(parameters.config) self.fs = parameters.fs_factory() @@ -191,10 +206,10 @@ def 
start(self, update=False): """ Properly initializes the wiki and all its sub-systems. """ - self.fs.initFs(self) - self.scm.initRepo(self) - self.index.initIndex(self) - self.db.initDb(self) + self.fs.start(self) + self.scm.start(self) + self.index.start(self) + self.db.start(self) if update: self.update() @@ -202,22 +217,20 @@ def stop(self): self.db.close() - def reset(self, cache_ext_data=True): - logger.debug("Resetting wiki data...") + def reset(self): + logger.info("Resetting wiki data...") page_infos = self.fs.getPageInfos() fs_pages = FileSystemPage.fromPageInfos(self, page_infos) self.db.reset(fs_pages) + self._cachePages(force_resolve=True) self.index.reset(self.getPages()) - if cache_ext_data: - self._cachePages() - def update(self, url=None, cache_ext_data=True): updated_urls = [] - logger.debug("Updating pages...") + logger.info("Updating pages...") if url: page_info = self.fs.getPage(url) - fs_page = FileSystemPage(self, page_info=page_info) + fs_page = FileSystemPage(self, page_info) self.db.update([fs_page], force=True) updated_urls.append(url) self.index.update([self.getPage(url)]) @@ -255,22 +268,21 @@ # Validate the parameters. if 'text' not in page_fields: raise ValueError( - "No text specified for editing page '%s'." % url) + "No text specified for editing page '%s'." % url) if 'author' not in page_fields: raise ValueError( - "No author specified for editing page '%s'." % url) + "No author specified for editing page '%s'." % url) if 'message' not in page_fields: raise ValueError( - "No commit message specified for editing page '%s'." % url) + "No commit message specified for editing page '%s'." % url) # Save the new/modified text. page_info = self.fs.setPage(url, page_fields['text']) # Commit the file to the source-control. commit_meta = { - 'author': page_fields['author'], - 'message': page_fields['message'] - } + 'author': page_fields['author'], + 'message': page_fields['message']} self.scm.commit([page_info.path], commit_meta) # Update the DB and index with the new/modified page. @@ -283,13 +295,13 @@ # Validate the parameters. if 'rev' not in page_fields: raise ValueError( - "No revision specified for reverting page '%s'." % url) + "No revision specified for reverting page '%s'." % url) if 'author' not in page_fields: raise ValueError( - "No author specified for reverting page '%s'." % url) + "No author specified for reverting page '%s'." % url) if 'message' not in page_fields: raise ValueError( - "No commit message specified for reverting page '%s'." % url) + "No commit message specified for reverting page '%s'." % url) # Get the revision. path = self.fs.getPhysicalPagePath(url) @@ -300,9 +312,8 @@ # Commit to source-control. commit_meta = { - 'author': page_fields['author'], - 'message': page_fields['message'] - } + 'author': page_fields['author'], + 'message': page_fields['message']} self.scm.commit([path], commit_meta) # Update the DB and index with the modified page. 
@@ -321,18 +332,17 @@ def getSpecialFilenames(self): return self.special_filenames - def _cachePages(self, only_urls=None, force_resolve=False): + def _cachePages(self, only_urls=None, force_resolve=False, + parallel=False): logger.debug("Caching extended page data...") if only_urls: - for url in only_urls: - page = self.getPage(url) - page._ensureExtendedData(force=force_resolve) - elif force_resolve: - for page in self.db.getPages(): - page._ensureExtendedData(force=True) + page_urls = only_urls else: - for page in self.db.getUncachedPages(): - page._ensureExtendedData() + page_urls = self.db.getPageUrls(uncached_only=(not force_resolve)) + + num_workers = 4 if parallel else 1 + s = ResolveScheduler(self, page_urls) + s.run(num_workers) def _createEndpointInfos(self, config): endpoints = {} @@ -364,4 +374,3 @@ elif mtime > old_time: print "Change detected in '%s'." % path time.sleep(interval) - diff -r 034a5c2c23a1 -r 062d5e6ddb87 wikked/witch.py --- a/wikked/witch.py Mon Feb 24 12:35:47 2014 -0800 +++ b/wikked/witch.py Tue Feb 25 22:08:32 2014 -0800 @@ -64,6 +64,8 @@ root_logger.setLevel(logging.WARNING) elif arg_debug: root_logger.setLevel(logging.DEBUG) + else: + root_logger.setLevel(logging.INFO) if arg_log: from logging.handlers import FileHandler root_logger.addHandler(FileHandler(arg_log))
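For reference, a minimal sketch of how the new ResolveScheduler is driven, mirroring Wiki._cachePages() from this patch. It assumes `wiki` is a Wiki instance on which start() has already been called, and `cache_all_pages` is a hypothetical helper name, not something the patch defines:

    from wikked.scheduler import ResolveScheduler

    def cache_all_pages(wiki, parallel=False):
        # Only resolve pages whose extended data isn't cached yet;
        # passing every URL instead gives the `--force` behavior.
        page_urls = wiki.db.getPageUrls(uncached_only=True)
        scheduler = ResolveScheduler(wiki, page_urls)
        # Per the commit message, extra workers don't actually buy
        # much here, so `parallel` is mostly for experimentation.
        scheduler.run(num_workers=4 if parallel else 1)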
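Likewise, a hypothetical illustration of the fields-based partial loading the patch introduces: callers request only the columns they need, and the returned page objects expose just those attributes. The field names come from SQLDatabase._addFieldOptions, and as above `wiki` is assumed to be a started Wiki instance:

    # Load just enough for a page listing; 'meta' and 'links' would
    # map to the cached ready_meta/ready_links relationships instead.
    fields = ['url', 'title', 'local_meta']
    for page in wiki.db.getPages(fields=fields):
        title = page.title                # loaded
        local_meta = page.getLocalMeta()  # loaded
        # page.extension would raise here: it derives from the
        # 'path' field, which was not requested.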