wikked (Mercurial repository)
changeset 370:9eb314a48fd9
reset: Basic support for multi-threading.
| author | Ludovic Chabant <ludovic@chabant.com> |
| --- | --- |
| date | Thu, 24 Sep 2015 20:10:44 -0700 |
| parents | 8b55ceaf99dd |
| children | 4b3f867a98b0 |
| files | wikked/commands/manage.py, wikked/db/sql.py, wikked/scheduler.py, wikked/wiki.py |
| diffstat | 4 files changed, 80 insertions(+), 47 deletions(-) |
--- a/wikked/commands/manage.py  Tue Sep 22 08:21:10 2015 -0700
+++ b/wikked/commands/manage.py  Thu Sep 24 20:10:44 2015 -0700
@@ -63,18 +63,24 @@
         super(ResetCommand, self).__init__()
         self.name = 'reset'
         self.description = ("Re-generates the database and the full-text "
-                "search index.")
+                            "search index.")
 
     def setupParser(self, parser):
-        parser.add_argument('--indexonly',
+        parser.add_argument(
+                '--single-threaded',
+                help="Run in single-threaded mode",
+                action='store_true')
+        parser.add_argument(
+                '--index-only',
                 help="Only reset the full-text search index",
                 action='store_true')
 
     def run(self, ctx):
-        if ctx.args.indexonly:
+        parallel = not ctx.args.single_threaded
+        if ctx.args.index_only:
             ctx.wiki.index.reset(ctx.wiki.getPages())
         else:
-            ctx.wiki.reset()
+            ctx.wiki.reset(parallel=parallel)
 
 
 @register_command
@@ -86,15 +92,21 @@
                 "index with any changed/new files.")
 
     def setupParser(self, parser):
-        parser.add_argument('path',
+        parser.add_argument(
+                'path',
                 help="The path to a page to update specifically",
                 nargs='?')
+        parser.add_argument(
+                '--single-threaded',
+                help="Run in single-threaded mode",
+                action='store_true')
 
     def run(self, ctx):
         if ctx.args.path:
             ctx.wiki.updatePage(path=ctx.args.path)
         else:
-            ctx.wiki.updateAll()
+            parallel = not ctx.args.single_threaded
+            ctx.wiki.updateAll(parallel=parallel)
 
         if ctx.args.debug and ctx.args.path:
             page_info = ctx.wiki.fs.getPageInfo(ctx.args.path)
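The command changes rename `--indexonly` to `--index-only` (existing scripts need updating) and make multi-threading the default for both `reset` and `update`, with `--single-threaded` as the opt-out. The following is a minimal, stand-alone sketch of the same flag handling using plain argparse; the `wiki` object and function names are illustrative, not wikked's actual command classes:

```python
import argparse


def build_reset_parser():
    # Same pattern as the changeset: parallel resolving is the default,
    # and --single-threaded opts back out of it.
    parser = argparse.ArgumentParser(prog='reset')
    parser.add_argument(
            '--single-threaded',
            help="Run in single-threaded mode",
            action='store_true')
    parser.add_argument(
            '--index-only',
            help="Only reset the full-text search index",
            action='store_true')
    return parser


def run_reset(args, wiki):
    # `wiki` is a placeholder for the real Wiki object.
    parallel = not args.single_threaded
    if args.index_only:
        wiki.index.reset(wiki.getPages())
    else:
        wiki.reset(parallel=parallel)
```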
--- a/wikked/db/sql.py  Tue Sep 22 08:21:10 2015 -0700
+++ b/wikked/db/sql.py  Thu Sep 24 20:10:44 2015 -0700
@@ -173,9 +173,10 @@
         running. This makes it possible to reuse the same engine and session
         factory.
     """
-    def __init__(self, engine_url, scopefunc=None):
+    def __init__(self, engine_url, scopefunc=None, connect_args=None):
         logger.debug("Creating SQL state.")
         self.engine_url = engine_url
+        self.connect_args = connect_args or {}
         self._engine = None
         self._engine_lock = threading.Lock()
         self.session = scoped_session(
@@ -191,8 +192,10 @@
             if self._engine is None:
                 logger.debug("Creating SQL engine with URL: %s" %
                              self.engine_url)
-                self._engine = create_engine(self.engine_url,
-                                             convert_unicode=True)
+                self._engine = create_engine(
+                        self.engine_url,
+                        connect_args=self.connect_args,
+                        convert_unicode=True)
         return self._engine
 
     def close(self, exception=None):
@@ -222,13 +225,14 @@
     """ The embedded state, used by default in command line wikis. """
     def __init__(self, engine_url):
-        super(_EmbeddedSQLState, self).__init__(engine_url)
+        super(_EmbeddedSQLState, self).__init__(
+                engine_url, connect_args={'check_same_thread': False})
 
 
 class SQLDatabase(Database):
     """ A database cache based on SQL.
     """
-    schema_version = 7
+    schema_version = 8
 
     def __init__(self, config):
         Database.__init__(self)
@@ -269,7 +273,8 @@
     def _getState(self):
         """ If no state has been specified yet, use the default
             embedded one (which means no sharing of engines or session
-            factories with any other wikis. """
+            factories with any other wiki instances).
+        """
         if self._state is not None:
             return self._state
         with self._state_lock:
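The `check_same_thread` change matters because Python's SQLite driver, by default, refuses to use a connection from any thread other than the one that created it. With resolver worker threads now sharing the embedded engine, `_EmbeddedSQLState` passes `connect_args={'check_same_thread': False}` through to SQLAlchemy; the `schema_version` bump from 7 to 8 presumably forces existing caches to be rebuilt against the new code. A minimal sketch of the underlying SQLAlchemy calls, with a placeholder database URL (this is not wikked's state class, just the same setup in isolation):

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# Placeholder URL; wikked builds its own from the wiki configuration.
engine_url = 'sqlite:///wiki.db'

# SQLite connections normally raise if used from a thread other than the
# one that created them; disabling the check is what lets worker threads
# and the main thread share one engine.
engine = create_engine(
        engine_url,
        connect_args={'check_same_thread': False})
session = scoped_session(sessionmaker(bind=engine))
```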
--- a/wikked/scheduler.py  Tue Sep 22 08:21:10 2015 -0700
+++ b/wikked/scheduler.py  Thu Sep 24 20:10:44 2015 -0700
@@ -25,6 +25,7 @@
         self._pages_meta = None
 
         self._queue = None
+        self._results = None
         self._pool = None
         self._done = False
 
@@ -45,15 +46,22 @@
         return self._pages_meta
 
     def run(self, num_workers=1):
-        logger.info("Running resolve scheduler "
-                    "(%d workers)" % num_workers)
+        logger.info("Running resolve scheduler (%d workers)" % num_workers)
 
         if num_workers > 1:
             # Multi-threaded resolving.
+            logger.debug("Main thread is %d" % threading.get_ident())
+
             self._done = False
             self._queue = Queue()
+            self._results = Queue()
+
+            self.getPagesMeta()
+
+            job_count = 0
             for url in self.page_urls:
                 self._queue.put_nowait(JobDesc(url))
+                job_count += 1
 
             self._pool = []
             for i in range(num_workers):
@@ -62,7 +70,22 @@
 
             for thread in self._pool:
                 thread.start()
-            self._queue.join()
+
+            while job_count > 0:
+                try:
+                    url, page, exc = self._results.get(True, 10)
+                except Empty:
+                    logger.error("Resolve workers timed out, still have %d "
+                                 "jobs to go." % job_count)
+                    return
+
+                job_count -= 1
+                if page:
+                    self.wiki.db.cachePage(page)
+                if exc:
+                    logger.error("Error resolving page: %s" % url)
+                    logger.exception(exc)
+
             logger.debug("Queue is empty... terminating workers.")
             self._done = True
 
@@ -73,8 +96,10 @@
             # Single-threaded resolving.
             for url in self.page_urls:
                 page = self.getPage(url)
-                r = PageResolver(page, page_getter=self.getPage,
-                                 pages_meta_getter=self.getPagesMeta)
+                r = PageResolver(
+                        page,
+                        page_getter=self.getPage,
+                        pages_meta_getter=self.getPagesMeta)
                 runner = PageResolverRunner(page, r)
                 runner.run(raise_on_failure=True)
                 self.wiki.db.cachePage(page)
@@ -128,40 +153,28 @@
     def isDone(self):
         return self.scheduler._done
 
-    def getPage(self, url):
-        return self.scheduler.getPage(url)
-
-    def getPagesMeta(self):
-        return self.scheduler.getPagesMeta()
-
     def getJob(self):
         return self.scheduler._queue.get(True, 0.5)
 
-    def cachePage(self, page):
-        self.scheduler.wiki.db.cachePage(page)
-
-    def finishJob(self, exception=None):
-        self.scheduler.wiki.db.close(commit=True, exception=exception)
+    def sendResult(self, url, page, exception):
+        res = (url, page, exception)
+        self.scheduler._results.put_nowait(res)
         self.scheduler._queue.task_done()
 
-    def finishWorker(self):
-        self.scheduler.wiki.db.close(commit=True, exception=None)
-
 
 class JobWorker(threading.Thread):
     def __init__(self, wid, ctx):
-        super(JobWorker, self).__init__()
+        super(JobWorker, self).__init__(daemon=True)
         self.wid = wid
         self.ctx = ctx
 
     def run(self):
+        logger.debug("Starting worker on thread %d" % threading.get_ident())
         try:
             self._unsafeRun()
         except Exception as ex:
             logger.exception(ex)
             logger.critical("Aborting resolver worker.")
-        finally:
-            self.ctx.finishWorker()
 
     def _unsafeRun(self):
         while True:
@@ -175,20 +188,22 @@
 
             before = datetime.datetime.now()
             try:
-                page = self.ctx.getPage(job.url)
-                r = PageResolver(page, page_getter=self.ctx.getPage,
-                                 pages_meta_getter=self.ctx.getPagesMeta)
+                page = self.ctx.scheduler.getPage(job.url)
+                r = PageResolver(
+                        page,
+                        page_getter=self.ctx.scheduler.getPage,
+                        pages_meta_getter=self.ctx.scheduler.getPagesMeta)
                 runner = PageResolverRunner(page, r)
                 runner.run(raise_on_failure=self.ctx.abort_on_failure)
-                self.ctx.cachePage(page)
+                self.ctx.sendResult(job.url, page, None)
             except Exception as ex:
                 logger.exception(ex)
                 logger.error("Error resolving page: %s" % job.url)
-                self.ctx.finishJob(exception=ex)
+                self.ctx.sendResult(job.url, None, ex)
                 return
-            self.ctx.finishJob()
 
             after = datetime.datetime.now()
             delta = after - before
-            logger.debug("[%d] %s done in %fs" % (self.wid, job.url,
-                         delta.total_seconds()))
+            logger.debug("[%d] %s done in %fs" % (
+                self.wid, job.url, delta.total_seconds()))
+
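The scheduler change is the core of the changeset: workers no longer touch the database themselves (the old `cachePage`/`finishJob`/`finishWorker` context helpers are removed). Instead, each worker pushes a `(url, page, exception)` tuple onto a results queue, and the main thread drains that queue, performs every `cachePage` call, and bails out if workers stall for more than ten seconds. Below is a self-contained sketch of that producer/consumer pattern; `resolve_one` and `cache_page` are stand-ins for wikked's resolver and DB layer, not real wikked APIs:

```python
import logging
import threading
from queue import Queue, Empty

logger = logging.getLogger(__name__)


def resolve_all(urls, resolve_one, cache_page, num_workers=4):
    """Worker threads do the resolving and report back over a results
    queue; the main thread remains the only writer to the database."""
    jobs = Queue()
    results = Queue()
    for url in urls:
        jobs.put_nowait(url)
    job_count = len(urls)

    def worker():
        while True:
            try:
                url = jobs.get(True, 0.5)
            except Empty:
                return  # queue drained; let the thread exit
            try:
                page = resolve_one(url)
                results.put_nowait((url, page, None))
            except Exception as ex:
                # Mirror the changeset: report the failure, stop this worker.
                results.put_nowait((url, None, ex))
                return
            finally:
                jobs.task_done()

    for _ in range(num_workers):
        threading.Thread(target=worker, daemon=True).start()

    # Drain results on the main thread, with the same 10-second timeout
    # on stalled workers as in the changeset.
    while job_count > 0:
        try:
            url, page, exc = results.get(True, 10)
        except Empty:
            logger.error("Workers timed out, %d jobs left", job_count)
            return
        job_count -= 1
        if page is not None:
            cache_page(page)
        if exc is not None:
            logger.error("Error resolving %s: %s", url, exc)
```

Keeping all writes on the main thread sidesteps cross-thread commits entirely, which is presumably why the per-worker `close(commit=True, ...)` calls could be dropped.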
--- a/wikked/wiki.py  Tue Sep 22 08:21:10 2015 -0700
+++ b/wikked/wiki.py  Thu Sep 24 20:10:44 2015 -0700
@@ -3,6 +3,7 @@
 import time
 import logging
 import importlib
+import multiprocessing
 from configparser import SafeConfigParser, NoOptionError
 from wikked.page import FileSystemPage
 from wikked.fs import FileSystem
@@ -242,13 +243,13 @@
         """
         self.db.close(exception)
 
-    def reset(self):
+    def reset(self, parallel=False):
         """ Clears all the cached data and rebuilds it from scratch.
         """
         logger.info("Resetting wiki data...")
         page_infos = self.fs.getPageInfos()
         self.db.reset(page_infos)
-        self.resolve(force=True)
+        self.resolve(force=True, parallel=parallel)
         self.index.reset(self.getPages())
 
     def resolve(self, only_urls=None, force=False, parallel=False):
@@ -261,7 +262,7 @@
         else:
             page_urls = self.db.getPageUrls(uncached_only=(not force))
 
-        num_workers = 4 if parallel else 1
+        num_workers = multiprocessing.cpu_count() if parallel else 1
 
         s = ResolveScheduler(self, page_urls)
         s.run(num_workers)
@@ -282,7 +283,7 @@
             page_info.url,
             fields=['url', 'path', 'title', 'text']))
 
-    def updateAll(self):
+    def updateAll(self, parallel=False):
         """ Completely updates all pages, i.e. read them from the
             file-system and have them fully resolved and cached in the DB.
             This function will check for timestamps to only update pages that
@@ -291,7 +292,7 @@
         logger.info("Updating all pages...")
         page_infos = self.fs.getPageInfos()
         self.db.updateAll(page_infos)
-        self.resolve()
+        self.resolve(parallel=parallel)
         self.index.updateAll(self.db.getPages(
             fields=['url', 'path', 'title', 'text']))
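Finally, `wiki.py` threads the new `parallel` flag from `reset()` and `updateAll()` down into `resolve()`, and replaces the hard-coded four workers with one per CPU core. A small sketch of the worker-count decision, plus illustrative call sites matching the new signatures (the `wiki` calls are examples, not part of the changeset):

```python
import multiprocessing


def pick_worker_count(parallel=False):
    # Same logic as the updated Wiki.resolve(): scale with the machine's
    # core count when parallel resolving is requested, otherwise stay
    # single-threaded.
    return multiprocessing.cpu_count() if parallel else 1


# Illustrative call sites for the new keyword argument:
#   wiki.reset(parallel=True)        # full rebuild using all cores
#   wiki.updateAll(parallel=False)   # what --single-threaded maps to
```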