changeset 370:9eb314a48fd9

reset: Basic support for multi-threading.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 24 Sep 2015 20:10:44 -0700
parents 8b55ceaf99dd
children 4b3f867a98b0
files wikked/commands/manage.py wikked/db/sql.py wikked/scheduler.py wikked/wiki.py
diffstat 4 files changed, 80 insertions(+), 47 deletions(-)
--- a/wikked/commands/manage.py	Tue Sep 22 08:21:10 2015 -0700
+++ b/wikked/commands/manage.py	Thu Sep 24 20:10:44 2015 -0700
@@ -63,18 +63,24 @@
         super(ResetCommand, self).__init__()
         self.name = 'reset'
         self.description = ("Re-generates the database and the full-text "
-                "search index.")
+                            "search index.")
 
     def setupParser(self, parser):
-        parser.add_argument('--indexonly',
+        parser.add_argument(
+                '--single-threaded',
+                help="Run in single-threaded mode",
+                action='store_true')
+        parser.add_argument(
+                '--index-only',
                 help="Only reset the full-text search index",
                 action='store_true')
 
     def run(self, ctx):
-        if ctx.args.indexonly:
+        parallel = not ctx.args.single_threaded
+        if ctx.args.index_only:
             ctx.wiki.index.reset(ctx.wiki.getPages())
         else:
-            ctx.wiki.reset()
+            ctx.wiki.reset(parallel=parallel)
 
 
 @register_command
@@ -86,15 +92,21 @@
                 "index with any changed/new files.")
 
     def setupParser(self, parser):
-        parser.add_argument('path',
+        parser.add_argument(
+                'path',
                 help="The path to a page to update specifically",
                 nargs='?')
+        parser.add_argument(
+                '--single-threaded',
+                help="Run in single-threaded mode",
+                action='store_true')
 
     def run(self, ctx):
         if ctx.args.path:
             ctx.wiki.updatePage(path=ctx.args.path)
         else:
-            ctx.wiki.updateAll()
+            parallel = not ctx.args.single_threaded
+            ctx.wiki.updateAll(parallel=parallel)
 
         if ctx.args.debug and ctx.args.path:
             page_info = ctx.wiki.fs.getPageInfo(ctx.args.path)
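
Note: the manage.py changes above use a common argparse pattern: an opt-out flag
(--single-threaded) stored as a boolean via action='store_true', then inverted into
a `parallel` keyword for the wiki API. A minimal standalone sketch of that pattern;
the parser here is a hypothetical stand-in, not wikked code:

    import argparse

    # Opt-out flag: absent means parallel, present means single-threaded.
    parser = argparse.ArgumentParser()
    parser.add_argument(
            '--single-threaded',
            help="Run in single-threaded mode",
            action='store_true')

    args = parser.parse_args(['--single-threaded'])
    parallel = not args.single_threaded   # flag on => parallel off
    assert parallel is False
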
--- a/wikked/db/sql.py	Tue Sep 22 08:21:10 2015 -0700
+++ b/wikked/db/sql.py	Thu Sep 24 20:10:44 2015 -0700
@@ -173,9 +173,10 @@
         running. This makes it possible to reuse the same engine and
         session factory.
     """
-    def __init__(self, engine_url, scopefunc=None):
+    def __init__(self, engine_url, scopefunc=None, connect_args=None):
         logger.debug("Creating SQL state.")
         self.engine_url = engine_url
+        self.connect_args = connect_args or {}
         self._engine = None
         self._engine_lock = threading.Lock()
         self.session = scoped_session(
@@ -191,8 +192,10 @@
                 if self._engine is None:
                     logger.debug("Creating SQL engine with URL: %s" %
                                  self.engine_url)
-                    self._engine = create_engine(self.engine_url,
-                                                 convert_unicode=True)
+                    self._engine = create_engine(
+                            self.engine_url,
+                            connect_args=self.connect_args,
+                            convert_unicode=True)
         return self._engine
 
     def close(self, exception=None):
@@ -222,13 +225,14 @@
     """ The embedded state, used by default in command line wikis.
     """
     def __init__(self, engine_url):
-        super(_EmbeddedSQLState, self).__init__(engine_url)
+        super(_EmbeddedSQLState, self).__init__(
+                engine_url, connect_args={'check_same_thread': False})
 
 
 class SQLDatabase(Database):
     """ A database cache based on SQL.
     """
-    schema_version = 7
+    schema_version = 8
 
     def __init__(self, config):
         Database.__init__(self)
@@ -269,7 +273,8 @@
     def _getState(self):
         """ If no state has been specified yet, use the default
             embedded one (which means no sharing of engines or session
-            factories with any other wikis. """
+            factories with any other wiki instances).
+        """
         if self._state is not None:
             return self._state
         with self._state_lock:
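
Note: the connect_args plumbing above exists so the embedded state can pass
check_same_thread=False to SQLite. By default the sqlite3 driver raises an error
when a connection created on one thread is used from another; with the resolve
scheduler's worker threads now sharing the embedded engine, that check must be
disabled, while thread safety is preserved by scoped_session (one session per
thread) and by funneling all writes through the main thread (see scheduler.py
below). A minimal sketch, assuming a hypothetical on-disk SQLite URL and the
SQLAlchemy API of this changeset's era (convert_unicode was later removed):

    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session, sessionmaker

    engine = create_engine(
            'sqlite:///wiki.db',                          # hypothetical path
            connect_args={'check_same_thread': False},    # allow cross-thread use
            convert_unicode=True)

    # scoped_session keys sessions on thread identity by default, so each
    # worker thread still gets its own Session object.
    Session = scoped_session(sessionmaker(bind=engine))
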
--- a/wikked/scheduler.py	Tue Sep 22 08:21:10 2015 -0700
+++ b/wikked/scheduler.py	Thu Sep 24 20:10:44 2015 -0700
@@ -25,6 +25,7 @@
         self._pages_meta = None
 
         self._queue = None
+        self._results = None
         self._pool = None
         self._done = False
 
@@ -45,15 +46,22 @@
         return self._pages_meta
 
     def run(self, num_workers=1):
-        logger.info("Running resolve scheduler "
-                     "(%d workers)" % num_workers)
+        logger.info("Running resolve scheduler (%d workers)" % num_workers)
 
         if num_workers > 1:
             # Multi-threaded resolving.
+            logger.debug("Main thread is %d" % threading.get_ident())
+
             self._done = False
             self._queue = Queue()
+            self._results = Queue()
+
+            self.getPagesMeta()
+
+            job_count = 0
             for url in self.page_urls:
                 self._queue.put_nowait(JobDesc(url))
+                job_count += 1
 
             self._pool = []
             for i in range(num_workers):
@@ -62,7 +70,22 @@
 
             for thread in self._pool:
                 thread.start()
-            self._queue.join()
+
+            while job_count > 0:
+                try:
+                    url, page, exc = self._results.get(True, 10)
+                except Empty:
+                    logger.error("Resolve workers timed out, still have %d "
+                                 "jobs to go." % job_count)
+                    return
+
+                job_count -= 1
+                if page:
+                    self.wiki.db.cachePage(page)
+                if exc:
+                    logger.error("Error resolving page: %s" % url)
+                    logger.exception(exc)
+
             logger.debug("Queue is empty... terminating workers.")
             self._done = True
 
@@ -73,8 +96,10 @@
             # Single-threaded resolving.
             for url in self.page_urls:
                 page = self.getPage(url)
-                r = PageResolver(page, page_getter=self.getPage,
-                                 pages_meta_getter=self.getPagesMeta)
+                r = PageResolver(
+                        page,
+                        page_getter=self.getPage,
+                        pages_meta_getter=self.getPagesMeta)
                 runner = PageResolverRunner(page, r)
                 runner.run(raise_on_failure=True)
                 self.wiki.db.cachePage(page)
@@ -128,40 +153,28 @@
     def isDone(self):
         return self.scheduler._done
 
-    def getPage(self, url):
-        return self.scheduler.getPage(url)
-
-    def getPagesMeta(self):
-        return self.scheduler.getPagesMeta()
-
     def getJob(self):
         return self.scheduler._queue.get(True, 0.5)
 
-    def cachePage(self, page):
-        self.scheduler.wiki.db.cachePage(page)
-
-    def finishJob(self, exception=None):
-        self.scheduler.wiki.db.close(commit=True, exception=exception)
+    def sendResult(self, url, page, exception):
+        res = (url, page, exception)
+        self.scheduler._results.put_nowait(res)
         self.scheduler._queue.task_done()
 
-    def finishWorker(self):
-        self.scheduler.wiki.db.close(commit=True, exception=None)
-
 
 class JobWorker(threading.Thread):
     def __init__(self, wid, ctx):
-        super(JobWorker, self).__init__()
+        super(JobWorker, self).__init__(daemon=True)
         self.wid = wid
         self.ctx = ctx
 
     def run(self):
+        logger.debug("Starting worker on thread %d" % threading.get_ident())
         try:
             self._unsafeRun()
         except Exception as ex:
             logger.exception(ex)
             logger.critical("Aborting resolver worker.")
-        finally:
-            self.ctx.finishWorker()
 
     def _unsafeRun(self):
         while True:
@@ -175,20 +188,22 @@
             before = datetime.datetime.now()
 
             try:
-                page = self.ctx.getPage(job.url)
-                r = PageResolver(page, page_getter=self.ctx.getPage,
-                                 pages_meta_getter=self.ctx.getPagesMeta)
+                page = self.ctx.scheduler.getPage(job.url)
+                r = PageResolver(
+                        page,
+                        page_getter=self.ctx.scheduler.getPage,
+                        pages_meta_getter=self.ctx.scheduler.getPagesMeta)
                 runner = PageResolverRunner(page, r)
                 runner.run(raise_on_failure=self.ctx.abort_on_failure)
-                self.ctx.cachePage(page)
+                self.ctx.sendResult(job.url, page, None)
             except Exception as ex:
                 logger.exception(ex)
                 logger.error("Error resolving page: %s" % job.url)
-                self.ctx.finishJob(exception=ex)
+                self.ctx.sendResult(job.url, None, ex)
                 return
 
-            self.ctx.finishJob()
             after = datetime.datetime.now()
             delta = after - before
-            logger.debug("[%d] %s done in %fs" % (self.wid, job.url,
-                    delta.total_seconds()))
+            logger.debug("[%d] %s done in %fs" % (
+                    self.wid, job.url, delta.total_seconds()))
+
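
Note: the key design change above is that workers no longer commit pages to the
database themselves (the old cachePage/finishJob path). Instead they push
(url, page, exception) tuples onto a results Queue, and the main thread drains
that queue and performs every cachePage() call, so SQLite writes stay on a single
thread. Combined with daemon=True workers and the 10-second get() timeout, a hung
worker can no longer block process exit. A minimal standalone sketch of the
pattern; the job contents and the resolve step are illustrative stand-ins, not
wikked code:

    import threading
    from queue import Queue, Empty

    jobs, results = Queue(), Queue()

    def worker():
        # Workers never touch shared state directly; they only report back.
        while True:
            try:
                url = jobs.get(True, 0.5)
            except Empty:
                return
            try:
                page = url.upper()                  # stand-in for PageResolver work
                results.put_nowait((url, page, None))
            except Exception as ex:
                results.put_nowait((url, None, ex))
            finally:
                jobs.task_done()

    urls = ['/foo', '/bar']
    for u in urls:
        jobs.put_nowait(u)
    for _ in range(2):
        threading.Thread(target=worker, daemon=True).start()

    # Only the main thread consumes results (and, in wikked, writes to the DB).
    for _ in urls:
        url, page, exc = results.get(True, 10)
        print(url, page, exc)
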
--- a/wikked/wiki.py	Tue Sep 22 08:21:10 2015 -0700
+++ b/wikked/wiki.py	Thu Sep 24 20:10:44 2015 -0700
@@ -3,6 +3,7 @@
 import time
 import logging
 import importlib
+import multiprocessing
 from configparser import SafeConfigParser, NoOptionError
 from wikked.page import FileSystemPage
 from wikked.fs import FileSystem
@@ -242,13 +243,13 @@
         """
         self.db.close(exception)
 
-    def reset(self):
+    def reset(self, parallel=False):
         """ Clears all the cached data and rebuilds it from scratch.
         """
         logger.info("Resetting wiki data...")
         page_infos = self.fs.getPageInfos()
         self.db.reset(page_infos)
-        self.resolve(force=True)
+        self.resolve(force=True, parallel=parallel)
         self.index.reset(self.getPages())
 
     def resolve(self, only_urls=None, force=False, parallel=False):
@@ -261,7 +262,7 @@
         else:
             page_urls = self.db.getPageUrls(uncached_only=(not force))
 
-        num_workers = 4 if parallel else 1
+        num_workers = multiprocessing.cpu_count() if parallel else 1
         s = ResolveScheduler(self, page_urls)
         s.run(num_workers)
 
@@ -282,7 +283,7 @@
             page_info.url,
             fields=['url', 'path', 'title', 'text']))
 
-    def updateAll(self):
+    def updateAll(self, parallel=False):
         """ Completely updates all pages, i.e. read them from the file-system
             and have them fully resolved and cached in the DB.
             This function will check for timestamps to only update pages that
@@ -291,7 +292,7 @@
         logger.info("Updating all pages...")
         page_infos = self.fs.getPageInfos()
         self.db.updateAll(page_infos)
-        self.resolve()
+        self.resolve(parallel=parallel)
         self.index.updateAll(self.db.getPages(
             fields=['url', 'path', 'title', 'text']))
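
Note: on the API side, reset() and updateAll() now accept a `parallel` keyword,
and resolve() sizes its worker pool from multiprocessing.cpu_count() instead of a
hard-coded 4. A short sketch of the worker-count logic; the usage comments assume
an already-constructed Wiki instance, which this changeset does not show:

    import multiprocessing

    parallel = True
    num_workers = multiprocessing.cpu_count() if parallel else 1
    print("Resolving with %d workers" % num_workers)

    # Given an existing Wiki instance `wiki`, the new entry points would be:
    #   wiki.reset(parallel=True)       # full rebuild, multi-threaded resolve
    #   wiki.updateAll(parallel=True)   # incremental update, multi-threaded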