piecrust2: changeset 1043:54eb8ad9e809
serve: Use `watchdog` for more efficient monitoring of asset files.
author       Ludovic Chabant <ludovic@chabant.com>
date         Sat, 20 Jan 2018 17:24:08 -0800
parents      895f49c9833d
children     7b64eb5bbc81
files        piecrust/serving/procloop.py requirements.txt setup.py
diffstat     3 files changed, 214 insertions(+), 65 deletions(-)
--- a/piecrust/serving/procloop.py	Sat Jan 20 17:23:34 2018 -0800
+++ b/piecrust/serving/procloop.py	Sat Jan 20 17:24:08 2018 -0800
@@ -81,22 +81,17 @@
         self.last_bake_time = time.time()
 
 
-class ProcessingLoop(threading.Thread):
+class ProcessingLoopBase:
     def __init__(self, appfactory, out_dir):
-        super().__init__(name='pipeline-reloader', daemon=True)
         self.appfactory = appfactory
         self.out_dir = out_dir
         self.last_status_id = 0
-        self.interval = 1
         self._app = None
-        self._proc_infos = None
-        self._last_records = None
-        self._last_config_mtime = 0
         self._obs = []
         self._obs_lock = threading.Lock()
         config_name = (
             THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
-        self._config_path = os.path.join(appfactory.root_dir, config_name)
+        self.config_path = os.path.join(appfactory.root_dir, config_name)
 
     def addObserver(self, obs):
         with self._obs_lock:
@@ -106,79 +101,48 @@
         with self._obs_lock:
             self._obs.remove(obs)
 
-    def run(self):
-        logger.debug("Initializing processing loop with output: %s" %
-                     self.out_dir)
+    def getApp(self):
+        return self._app
+
+    def initialize(self):
+        self._app = self.appfactory.create()
+        self.onInitialize()
+
+    def onInitialize(self):
+        pass
+
+    def start(self):
+        logger.info("Starting processing loop with output: %s" %
+                    self.out_dir)
         try:
-            self._init()
+            self.initialize()
         except Exception as ex:
            logger.error("Error initializing processing loop:")
            logger.exception(ex)
            return
 
         logger.debug("Doing initial processing loop bake...")
-        self._runPipelinesSafe()
-
-        logger.debug("Running processing loop...")
-        self._last_config_mtime = os.path.getmtime(self._config_path)
+        # self.runPipelines()
 
-        while True:
-            cur_config_time = os.path.getmtime(self._config_path)
-            if self._last_config_mtime < cur_config_time:
-                logger.info("Site configuration changed, reloading pipeline.")
-                self._last_config_mtime = cur_config_time
-                self._init()
-                self._runPipelines()
-                continue
+        self.onStart()
+
+    def onStart(self):
+        raise NotImplementedError()
 
-            for procinfo in self._proc_infos.values():
-                # For each assets folder we try to find the first new or
-                # modified file. If any, we just run the pipeline on
-                # that source.
-                found_new_or_modified = False
-                for item in procinfo.source.getAllContents():
-                    path = item.spec
-                    if path not in procinfo.paths:
-                        logger.debug("Found new asset: %s" % path)
-                        procinfo.paths.add(path)
-                        found_new_or_modified = True
-                        break
-                    if os.path.getmtime(path) > procinfo.last_bake_time:
-                        logger.debug("Found modified asset: %s" % path)
-                        found_new_or_modified = True
-                        break
-                if found_new_or_modified:
-                    logger.info("change detected, reprocessed '%s'." %
-                                procinfo.source.name)
-                    self._runPipelinesSafe(procinfo.source)
-                    procinfo.last_bake_time = time.time()
+    def getSources(self):
+        for src in self._app.sources:
+            if src.config.get('pipeline') != 'asset':
+                continue
+            yield src
 
-            time.sleep(self.interval)
-
-    def _init(self):
-        self._app = self.appfactory.create()
-        self._last_records = MultiRecord()
-
-        self._proc_infos = {}
-        for src in self._app.sources:
-            if src.config['pipeline'] != 'asset':
-                continue
-
-            procinfo = _AssetProcessingInfo(src)
-            self._proc_infos[src.name] = procinfo
-
-            # Build the list of initial asset files.
-            for item in src.getAllContents():
-                procinfo.paths.add(item.spec)
-
-    def _runPipelinesSafe(self, only_for_source=None):
+    def runPipelines(self, only_for_source=None):
         try:
-            self._runPipelines(only_for_source)
+            self._doRunPipelines(only_for_source)
         except Exception as ex:
             logger.error("Error while running asset pipeline:")
             logger.exception(ex)
 
-    def _runPipelines(self, only_for_source):
+    def _doRunPipelines(self, only_for_source):
         from piecrust.baking.baker import Baker
 
         allowed_sources = None
@@ -231,3 +195,186 @@
 
         for obs in observers:
             obs.addBuildEvent(item)
+
+try:
+    from watchdog.observers import Observer
+    from watchdog.events import FileSystemEventHandler
+    _has_watchdog = True
+except ImportError:
+    _has_watchdog = False
+
+
+if _has_watchdog:
+    class _AssetFileEventHandler(FileSystemEventHandler):
+        def __init__(self, proc_loop, source):
+            self._proc_loop = proc_loop
+            self._source = source
+
+        def on_any_event(self, event):
+            if event.is_directory:
+                return
+
+            pl = self._proc_loop
+            with pl._lock:
+                pl._ops.append({
+                    'op': 'bake',
+                    'source': self._source,
+                    'path': event.src_path,
+                    'change': event.event_type,
+                    'time': time.time()})
+                pl._event.set()
+
+
+    class _SiteConfigEventHandler(FileSystemEventHandler):
+        def __init__(self, proc_loop, path):
+            self._proc_loop = proc_loop
+            self._path = path
+
+        def on_modified(self, event):
+            if event.src_path != self._path:
+                return
+
+            pl = self._proc_loop
+            with pl._lock:
+                pl._ops.append({'op': 'reinit'})
+                pl._event.set()
+
+
+    class WatchdogProcessingLoop(ProcessingLoopBase):
+        def __init__(self, appfactory, out_dir):
+            ProcessingLoopBase.__init__(self, appfactory, out_dir)
+            self._op_thread = threading.Thread(
+                name='watchdog-operations',
+                target=self._runOpThread,
+                daemon=True)
+            self._lock = threading.Lock()
+            self._event = threading.Event()
+            self._ops = []
+            self._last_op_time = 0
+
+        def onStart(self):
+            logger.debug("Running watchdog monitor on:")
+            observer = Observer()
+
+            event_handler = _SiteConfigEventHandler(self, self.config_path)
+            observer.schedule(event_handler, os.path.dirname(self.config_path))
+            logger.debug(" - %s" % self.config_path)
+
+            for src in self.getSources():
+                path = getattr(src, 'fs_endpoint_path', None)
+                if not path:
+                    logger.warn("Skipping source '%s' -- it doesn't have "
+                                "a file-system endpoint." % src.name)
+                    continue
+                if not os.path.isdir(path):
+                    continue
+
+                logger.debug(" - %s" % path)
+                event_handler = _AssetFileEventHandler(self, src)
+                observer.schedule(event_handler, path, recursive=True)
+
+            observer.start()
+            self._op_thread.start()
+
+        def _runOpThread(self):
+            while not server_shutdown:
+                try:
+                    self._event.wait()
+                    with self._lock:
+                        ops = self._ops
+                        self._ops = []
+                        self._event.clear()
+
+                    orig_len = len(ops)
+                    lot = self._last_op_time
+                    ops = list(filter(lambda o: o['time'] > lot, ops))
+                    logger.debug("Got %d ops, with %d that happened after "
+                                 "our last operation." % (orig_len, len(ops)))
+                    if len(ops) == 0:
+                        continue
+
+                    if any(filter(lambda o: o['op'] == 'reinit', ops)):
+                        logger.info("Site configuration changed, "
+                                    "reloading pipeline.")
+                        self.initialize()
+                        self.runPipelines()
+                        continue
+
+                    sources = set()
+                    ops = list(filter(lambda o: o['op'] == 'bake', ops))
+                    for op in ops:
+                        logger.info("Detected file-system change: "
+                                    "%s [%s]" %
+                                    (op['path'], op['change']))
+                        sources.add(op['source'])
+
+                    logger.debug("Processing: %s" % [s.name for s in sources])
+                    for s in sources:
+                        self.runPipelines(s)
+
+                    self._last_op_time = time.time()
+
+                except (KeyboardInterrupt, SystemExit):
+                    break
+
+    ProcessingLoop = WatchdogProcessingLoop
+
+else:
+    class LegacyProcessingLoop(ProcessingLoopBase, threading.Thread):
+        def __init__(self, appfactory, out_dir):
+            ProcessingLoopBase.__init__(self, appfactory, out_dir)
+            threading.Thread.__init__(self, name='pipeline-reloader',
+                                      daemon=True)
+            self.interval = 1
+            self._proc_infos = None
+            self._last_config_mtime = 0
+
+        def onInitialize(self):
+            self._proc_infos = {}
+            for src in self.getSources():
+                procinfo = _AssetProcessingInfo(src)
+                self._proc_infos[src.name] = procinfo
+
+                # Build the list of initial asset files.
+                for item in src.getAllContents():
+                    procinfo.paths.add(item.spec)
+
+        def onStart(self):
+            self._last_config_mtime = os.path.getmtime(self.config_path)
+            threading.Thread.start(self)
+
+        def run(self):
+            while True:
+                cur_config_time = os.path.getmtime(self.config_path)
+                if self._last_config_mtime < cur_config_time:
+                    logger.info("Site configuration changed, reloading pipeline.")
+                    self._last_config_mtime = cur_config_time
+                    self.initialize()
+                    self.runPipelines()
+                    continue
+
+                for procinfo in self._proc_infos.values():
+                    # For each assets folder we try to find the first new or
+                    # modified file. If any, we just run the pipeline on
+                    # that source.
+                    found_new_or_modified = False
+                    for item in procinfo.source.getAllContents():
+                        path = item.spec
+                        if path not in procinfo.paths:
+                            logger.debug("Found new asset: %s" % path)
+                            procinfo.paths.add(path)
+                            found_new_or_modified = True
+                            break
+                        if os.path.getmtime(path) > procinfo.last_bake_time:
+                            logger.debug("Found modified asset: %s" % path)
+                            found_new_or_modified = True
+                            break
+                    if found_new_or_modified:
+                        logger.info("change detected, reprocessed '%s'." %
+                                    procinfo.source.name)
+                        self.runPipelines(procinfo.source)
+                        procinfo.last_bake_time = time.time()
+
+                time.sleep(self.interval)
+
+    ProcessingLoop = LegacyProcessingLoop
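The heart of the change is the new WatchdogProcessingLoop: instead of waking up every second and stat-ing every asset file, it lets watchdog's observer thread push events onto a shared list, and a worker thread drains that list in batches. For readers unfamiliar with watchdog, the following is a minimal standalone sketch of that queue-and-drain pattern; the 'assets' directory and the process() callback are hypothetical stand-ins for illustration, not code from this changeset.

    # Sketch of the queue-and-drain pattern used by WatchdogProcessingLoop,
    # under the assumptions stated above.
    import threading
    import time

    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler


    class _QueueingHandler(FileSystemEventHandler):
        # watchdog invokes these callbacks from its own observer thread,
        # so the shared list must be guarded by a lock.
        def __init__(self, ops, lock, wakeup):
            super().__init__()
            self._ops = ops
            self._lock = lock
            self._wakeup = wakeup

        def on_any_event(self, event):
            if event.is_directory:
                return
            with self._lock:
                self._ops.append(
                    (event.src_path, event.event_type, time.time()))
            self._wakeup.set()


    def watch_and_process(path, process):
        ops = []
        lock = threading.Lock()
        wakeup = threading.Event()

        observer = Observer()
        observer.schedule(_QueueingHandler(ops, lock, wakeup), path,
                          recursive=True)
        observer.start()
        try:
            while True:
                # Block until at least one event arrives, then drain the
                # whole queue in one go so a burst of saves (e.g. an editor
                # writing a temp file, then renaming it) triggers a single
                # rebuild instead of several.
                wakeup.wait()
                with lock:
                    batch = list(ops)
                    del ops[:]
                    wakeup.clear()
                process(batch)
        except KeyboardInterrupt:
            observer.stop()
            observer.join()


    if __name__ == '__main__':
        # Hypothetical usage: print each batch of changes under ./assets.
        watch_and_process('assets', print)

The batching matters because file-system watchers typically report several events for one logical save. The changeset's _runOpThread adds one refinement on top of this: it drops queued events whose timestamp is older than _last_op_time, so changes already covered by the previous pipeline run are not processed twice.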