changeset 1043:54eb8ad9e809

serve: Use `watchdog` for more efficient monitoring of asset files.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jan 2018 17:24:08 -0800
parents 895f49c9833d
children 7b64eb5bbc81
files piecrust/serving/procloop.py requirements.txt setup.py
diffstat 3 files changed, 214 insertions(+), 65 deletions(-)
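The changeset replaces the previous strategy -- polling every asset file once
per second -- with watchdog's native file-system notifications. For reference,
a minimal sketch of the watchdog pattern the new code builds on (the watched
path below is a placeholder, not from PieCrust):

    import time
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

    class PrintChanges(FileSystemEventHandler):
        # Called for created/modified/moved/deleted events.
        def on_any_event(self, event):
            if event.is_directory:
                return
            print(event.event_type, event.src_path)

    observer = Observer()
    # Watch a directory tree; watchdog picks the best backend
    # (inotify, FSEvents, kqueue, or polling) for the platform.
    observer.schedule(PrintChanges(), '/path/to/assets', recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()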
--- a/piecrust/serving/procloop.py	Sat Jan 20 17:23:34 2018 -0800
+++ b/piecrust/serving/procloop.py	Sat Jan 20 17:24:08 2018 -0800
@@ -81,22 +81,17 @@
         self.last_bake_time = time.time()
 
 
-class ProcessingLoop(threading.Thread):
+class ProcessingLoopBase:
     def __init__(self, appfactory, out_dir):
-        super().__init__(name='pipeline-reloader', daemon=True)
         self.appfactory = appfactory
         self.out_dir = out_dir
         self.last_status_id = 0
-        self.interval = 1
         self._app = None
-        self._proc_infos = None
-        self._last_records = None
-        self._last_config_mtime = 0
         self._obs = []
         self._obs_lock = threading.Lock()
         config_name = (
             THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
-        self._config_path = os.path.join(appfactory.root_dir, config_name)
+        self.config_path = os.path.join(appfactory.root_dir, config_name)
 
     def addObserver(self, obs):
         with self._obs_lock:
@@ -106,79 +101,48 @@
         with self._obs_lock:
             self._obs.remove(obs)
 
-    def run(self):
-        logger.debug("Initializing processing loop with output: %s" %
-                     self.out_dir)
+    def getApp(self):
+        return self._app
+
+    def initialize(self):
+        self._app = self.appfactory.create()
+        self.onInitialize()
+
+    def onInitialize(self):
+        pass
+
+    def start(self):
+        logger.info("Starting processing loop with output: %s" %
+                    self.out_dir)
         try:
-            self._init()
+            self.initialize()
         except Exception as ex:
             logger.error("Error initializing processing loop:")
             logger.exception(ex)
             return
 
         logger.debug("Doing initial processing loop bake...")
-        self._runPipelinesSafe()
-
-        logger.debug("Running processing loop...")
-        self._last_config_mtime = os.path.getmtime(self._config_path)
+        self.runPipelines()
 
-        while True:
-            cur_config_time = os.path.getmtime(self._config_path)
-            if self._last_config_mtime < cur_config_time:
-                logger.info("Site configuration changed, reloading pipeline.")
-                self._last_config_mtime = cur_config_time
-                self._init()
-                self._runPipelines()
-                continue
+        self.onStart()
+
+    def onStart(self):
+        raise NotImplementedError()
 
-            for procinfo in self._proc_infos.values():
-                # For each assets folder we try to find the first new or
-                # modified file. If any, we just run the pipeline on
-                # that source.
-                found_new_or_modified = False
-                for item in procinfo.source.getAllContents():
-                    path = item.spec
-                    if path not in procinfo.paths:
-                        logger.debug("Found new asset: %s" % path)
-                        procinfo.paths.add(path)
-                        found_new_or_modified = True
-                        break
-                    if os.path.getmtime(path) > procinfo.last_bake_time:
-                        logger.debug("Found modified asset: %s" % path)
-                        found_new_or_modified = True
-                        break
-                if found_new_or_modified:
-                    logger.info("change detected, reprocessed '%s'." %
-                                procinfo.source.name)
-                    self._runPipelinesSafe(procinfo.source)
-                    procinfo.last_bake_time = time.time()
+    def getSources(self):
+        for src in self._app.sources:
+            if src.config.get('pipeline') != 'asset':
+                continue
+            yield src
 
-            time.sleep(self.interval)
-
-    def _init(self):
-        self._app = self.appfactory.create()
-        self._last_records = MultiRecord()
-
-        self._proc_infos = {}
-        for src in self._app.sources:
-            if src.config['pipeline'] != 'asset':
-                continue
-
-            procinfo = _AssetProcessingInfo(src)
-            self._proc_infos[src.name] = procinfo
-
-            # Build the list of initial asset files.
-            for item in src.getAllContents():
-                procinfo.paths.add(item.spec)
-
-    def _runPipelinesSafe(self, only_for_source=None):
+    def runPipelines(self, only_for_source=None):
         try:
-            self._runPipelines(only_for_source)
+            self._doRunPipelines(only_for_source)
         except Exception as ex:
             logger.error("Error while running asset pipeline:")
             logger.exception(ex)
 
-    def _runPipelines(self, only_for_source):
+    def _doRunPipelines(self, only_for_source):
         from piecrust.baking.baker import Baker
 
         allowed_sources = None
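The split above turns ProcessingLoopBase into a template: initialize() builds
the app and calls the onInitialize() hook, start() runs the initial bake and
hands off to onStart(), and runPipelines() wraps the actual bake in error
handling. A hypothetical third implementation would only need the two hooks:

    # Hypothetical subclass, not part of this changeset, showing the
    # extension points ProcessingLoopBase exposes.
    class ManualProcessingLoop(ProcessingLoopBase):
        def onInitialize(self):
            # Runs after initialize() has created self._app; build any
            # per-source state here using self.getSources().
            self._source_names = [s.name for s in self.getSources()]

        def onStart(self):
            # Runs after the initial bake; kick off whatever monitoring
            # this implementation uses (must not block start()).
            pass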
@@ -231,3 +195,186 @@
         for obs in observers:
             obs.addBuildEvent(item)
 
+
+try:
+    from watchdog.observers import Observer
+    from watchdog.events import FileSystemEventHandler
+    _has_watchdog = True
+except ImportError:
+    _has_watchdog = False
+
+
+if _has_watchdog:
+    class _AssetFileEventHandler(FileSystemEventHandler):
+        def __init__(self, proc_loop, source):
+            self._proc_loop = proc_loop
+            self._source = source
+
+        def on_any_event(self, event):
+            if event.is_directory:
+                return
+
+            pl = self._proc_loop
+            with pl._lock:
+                pl._ops.append({
+                    'op': 'bake',
+                    'source': self._source,
+                    'path': event.src_path,
+                    'change': event.event_type,
+                    'time': time.time()})
+                pl._event.set()
+
+
+    class _SiteConfigEventHandler(FileSystemEventHandler):
+        def __init__(self, proc_loop, path):
+            self._proc_loop = proc_loop
+            self._path = path
+
+        def on_modified(self, event):
+            if event.src_path != self._path:
+                return
+
+            pl = self._proc_loop
+            with pl._lock:
+                pl._ops.append({'op': 'reinit', 'time': time.time()})
+                pl._event.set()
+
+
+    class WatchdogProcessingLoop(ProcessingLoopBase):
+        def __init__(self, appfactory, out_dir):
+            ProcessingLoopBase.__init__(self, appfactory, out_dir)
+            self._op_thread = threading.Thread(
+                name='watchdog-operations',
+                target=self._runOpThread,
+                daemon=True)
+            self._lock = threading.Lock()
+            self._event = threading.Event()
+            self._ops = []
+            self._last_op_time = 0
+
+        def onStart(self):
+            logger.debug("Running watchdog monitor on:")
+            observer = Observer()
+
+            event_handler = _SiteConfigEventHandler(self, self.config_path)
+            observer.schedule(event_handler, os.path.dirname(self.config_path))
+            logger.debug(" - %s" % self.config_path)
+
+            for src in self.getSources():
+                path = getattr(src, 'fs_endpoint_path', None)
+                if not path:
+                    logger.warning("Skipping source '%s' -- it doesn't have "
+                                   "a file-system endpoint." % src.name)
+                    continue
+                if not os.path.isdir(path):
+                    continue
+
+                logger.debug(" - %s" % path)
+                event_handler = _AssetFileEventHandler(self, src)
+                observer.schedule(event_handler, path, recursive=True)
+
+            observer.start()
+            self._op_thread.start()
+
+        def _runOpThread(self):
+            while not server_shutdown:
+                try:
+                    self._event.wait()
+                    with self._lock:
+                        ops = self._ops
+                        self._ops = []
+                        self._event.clear()
+
+                    orig_len = len(ops)
+                    lot = self._last_op_time
+                    ops = list(filter(lambda o: o['time'] > lot, ops))
+                    logger.debug("Got %d ops, with %d that happened after "
+                                 "our last operation." % (orig_len, len(ops)))
+                    if len(ops) == 0:
+                        continue
+
+                    if any(o['op'] == 'reinit' for o in ops):
+                        logger.info("Site configuration changed, "
+                                    "reloading pipeline.")
+                        self.initialize()
+                        self.runPipelines()
+                        continue
+
+                    sources = set()
+                    ops = list(filter(lambda o: o['op'] == 'bake', ops))
+                    for op in ops:
+                        logger.info("Detected file-system change: "
+                                    "%s [%s]" %
+                                    (op['path'], op['change']))
+                        sources.add(op['source'])
+
+                    logger.debug("Processing: %s" % [s.name for s in sources])
+                    for s in sources:
+                        self.runPipelines(s)
+
+                    self._last_op_time = time.time()
+
+                except (KeyboardInterrupt, SystemExit):
+                    break
+
+    ProcessingLoop = WatchdogProcessingLoop
+
+else:
+    class LegacyProcessingLoop(ProcessingLoopBase, threading.Thread):
+        def __init__(self, appfactory, out_dir):
+            ProcessingLoopBase.__init__(self, appfactory, out_dir)
+            threading.Thread.__init__(self, name='pipeline-reloader',
+                                      daemon=True)
+            self.interval = 1
+            self._proc_infos = None
+            self._last_config_mtime = 0
+
+        def onInitialize(self):
+            self._proc_infos = {}
+            for src in self.getSources():
+                procinfo = _AssetProcessingInfo(src)
+                self._proc_infos[src.name] = procinfo
+
+                # Build the list of initial asset files.
+                for item in src.getAllContents():
+                    procinfo.paths.add(item.spec)
+
+        def onStart(self):
+            self._last_config_mtime = os.path.getmtime(self.config_path)
+            threading.Thread.start(self)
+
+        def run(self):
+            while True:
+                cur_config_time = os.path.getmtime(self.config_path)
+                if self._last_config_mtime < cur_config_time:
+                    logger.info("Site configuration changed, reloading pipeline.")
+                    self._last_config_mtime = cur_config_time
+                    self.initialize()
+                    self.runPipelines()
+                    continue
+
+                for procinfo in self._proc_infos.values():
+                    # For each assets folder we try to find the first new or
+                    # modified file. If any, we just run the pipeline on
+                    # that source.
+                    found_new_or_modified = False
+                    for item in procinfo.source.getAllContents():
+                        path = item.spec
+                        if path not in procinfo.paths:
+                            logger.debug("Found new asset: %s" % path)
+                            procinfo.paths.add(path)
+                            found_new_or_modified = True
+                            break
+                        if os.path.getmtime(path) > procinfo.last_bake_time:
+                            logger.debug("Found modified asset: %s" % path)
+                            found_new_or_modified = True
+                            break
+                    if found_new_or_modified:
+                        logger.info("change detected, reprocessed '%s'." %
+                                    procinfo.source.name)
+                        self.runPipelines(procinfo.source)
+                        procinfo.last_bake_time = time.time()
+
+                time.sleep(self.interval)
+
+    ProcessingLoop = LegacyProcessingLoop
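_runOpThread in the watchdog branch above is a small producer/consumer queue:
the event handlers append operations under a lock and set an event, and the
thread drains the batch, drops anything older than the last processed
operation, and coalesces the rest by source. A stripped-down sketch of the
same lock/event/timestamp pattern (handle() is a placeholder):

    import threading
    import time

    _lock = threading.Lock()
    _event = threading.Event()
    _ops = []
    _last_op_time = 0

    def handle(batch):
        # Placeholder for the actual per-source processing.
        print("processing %d coalesced ops" % len(batch))

    def post(op):
        # Producer side: what the watchdog event handlers do.
        with _lock:
            _ops.append(dict(op, time=time.time()))
            _event.set()

    def worker():
        # Consumer side: what _runOpThread does.
        global _ops, _last_op_time
        while True:
            _event.wait()
            with _lock:
                batch, _ops = _ops, []
                _event.clear()
            # Drop events already covered by the previous run.
            batch = [o for o in batch if o['time'] > _last_op_time]
            if not batch:
                continue
            handle(batch)
            _last_op_time = time.time()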
--- a/requirements.txt	Sat Jan 20 17:23:34 2018 -0800
+++ b/requirements.txt	Sat Jan 20 17:24:08 2018 -0800
@@ -31,4 +31,5 @@
 strict-rfc3339==0.5
 textile==2.2.2
 Unidecode==0.4.18
+watchdog==0.8.3
 Werkzeug==0.12.2
--- a/setup.py	Sat Jan 20 17:23:34 2018 -0800
+++ b/setup.py	Sat Jan 20 17:24:08 2018 -0800
@@ -173,6 +173,7 @@
     'strict-rfc3339>=0.5',
     'textile>=2.2.2',
     'Unidecode>=0.4.18',
+    'watchdog>=0.8.3',
     'Werkzeug>=0.12.2'
 ]
 tests_require = [