piecrust2: comparison of piecrust/serving/procloop.py @ 1043:54eb8ad9e809

serve: Use `watchdog` for more efficient monitoring of asset files.
author:   Ludovic Chabant <ludovic@chabant.com>
date:     Sat, 20 Jan 2018 17:24:08 -0800
parents:  89d94955b818
children: 2f39ffa601a9
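For context before the diff: the changeset switches asset monitoring from a polling loop to the watchdog library, whose core pattern is an Observer thread dispatching file-system events to FileSystemEventHandler subclasses scheduled on watched directories. A minimal standalone sketch of that pattern (illustrative only, not code from this changeset; the handler name and path are hypothetical):

    import time

    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler


    class PrintingHandler(FileSystemEventHandler):
        # Hypothetical handler: just report what watchdog saw.
        def on_any_event(self, event):
            if event.is_directory:
                return
            print("%s: %s" % (event.event_type, event.src_path))


    observer = Observer()
    # Watch a directory tree; watchdog picks a native backend (inotify,
    # FSEvents, etc.) and falls back to polling when none is available.
    observer.schedule(PrintingHandler(), '/tmp/assets', recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    finally:
        observer.stop()
        observer.join()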
comparing 1042:895f49c9833d (parent) with 1043:54eb8ad9e809 (this changeset):

--- a/piecrust/serving/procloop.py
+++ b/piecrust/serving/procloop.py
@@ -79,108 +79,72 @@
         self.source = source
         self.paths = set()
         self.last_bake_time = time.time()
 
 
-class ProcessingLoop(threading.Thread):
+class ProcessingLoopBase:
     def __init__(self, appfactory, out_dir):
-        super().__init__(name='pipeline-reloader', daemon=True)
         self.appfactory = appfactory
         self.out_dir = out_dir
         self.last_status_id = 0
-        self.interval = 1
         self._app = None
-        self._proc_infos = None
-        self._last_records = None
-        self._last_config_mtime = 0
         self._obs = []
         self._obs_lock = threading.Lock()
         config_name = (
             THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
-        self._config_path = os.path.join(appfactory.root_dir, config_name)
+        self.config_path = os.path.join(appfactory.root_dir, config_name)
 
     def addObserver(self, obs):
         with self._obs_lock:
             self._obs.append(obs)
 
     def removeObserver(self, obs):
         with self._obs_lock:
             self._obs.remove(obs)
 
-    def run(self):
-        logger.debug("Initializing processing loop with output: %s" %
-                     self.out_dir)
+    def getApp(self):
+        return self._app
+
+    def initialize(self):
+        self._app = self.appfactory.create()
+        self.onInitialize()
+
+    def onInitialize(self):
+        pass
+
+    def start(self):
+        logger.info("Starting processing loop with output: %s" %
+                    self.out_dir)
         try:
-            self._init()
+            self.initialize()
         except Exception as ex:
             logger.error("Error initializing processing loop:")
             logger.exception(ex)
             return
 
         logger.debug("Doing initial processing loop bake...")
-        self._runPipelinesSafe()
+        # self.runPipelines()
 
-        logger.debug("Running processing loop...")
-        self._last_config_mtime = os.path.getmtime(self._config_path)
-
-        while True:
-            cur_config_time = os.path.getmtime(self._config_path)
-            if self._last_config_mtime < cur_config_time:
-                logger.info("Site configuration changed, reloading pipeline.")
-                self._last_config_mtime = cur_config_time
-                self._init()
-                self._runPipelines()
-                continue
-
-            for procinfo in self._proc_infos.values():
-                # For each assets folder we try to find the first new or
-                # modified file. If any, we just run the pipeline on
-                # that source.
-                found_new_or_modified = False
-                for item in procinfo.source.getAllContents():
-                    path = item.spec
-                    if path not in procinfo.paths:
-                        logger.debug("Found new asset: %s" % path)
-                        procinfo.paths.add(path)
-                        found_new_or_modified = True
-                        break
-                    if os.path.getmtime(path) > procinfo.last_bake_time:
-                        logger.debug("Found modified asset: %s" % path)
-                        found_new_or_modified = True
-                        break
-                if found_new_or_modified:
-                    logger.info("change detected, reprocessed '%s'." %
-                                procinfo.source.name)
-                    self._runPipelinesSafe(procinfo.source)
-                    procinfo.last_bake_time = time.time()
-
-            time.sleep(self.interval)
-
-    def _init(self):
-        self._app = self.appfactory.create()
-        self._last_records = MultiRecord()
-
-        self._proc_infos = {}
-        for src in self._app.sources:
-            if src.config['pipeline'] != 'asset':
-                continue
-
-            procinfo = _AssetProcessingInfo(src)
-            self._proc_infos[src.name] = procinfo
-
-            # Build the list of initial asset files.
-            for item in src.getAllContents():
-                procinfo.paths.add(item.spec)
-
-    def _runPipelinesSafe(self, only_for_source=None):
+        self.onStart()
+
+    def onStart(self):
+        raise NotImplementedError()
+
+    def getSources(self):
+        for src in self._app.sources:
+            if src.config.get('pipeline') != 'asset':
+                continue
+            yield src
+
+    def runPipelines(self, only_for_source=None):
         try:
-            self._runPipelines(only_for_source)
+            self._doRunPipelines(only_for_source)
         except Exception as ex:
             logger.error("Error while running asset pipeline:")
             logger.exception(ex)
 
-    def _runPipelines(self, only_for_source):
+    def _doRunPipelines(self, only_for_source):
         from piecrust.baking.baker import Baker
 
         allowed_sources = None
         if only_for_source:
             allowed_sources = [only_for_source.name]
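Hunk 1 above turns the old self-contained ProcessingLoop thread into an abstract ProcessingLoopBase with onInitialize()/onStart() hooks; hunk 2 below adds the two implementations selected at import time. A condensed view of the resulting template-method structure (a heavily simplified restatement of the diff, not additional code):

    class ProcessingLoopBase:
        def initialize(self):
            self._app = self.appfactory.create()
            self.onInitialize()      # optional subclass hook

        def start(self):
            self.initialize()
            self.onStart()           # required subclass hook

        def onStart(self):
            raise NotImplementedError()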
@@ -229,5 +193,188 @@
         with self._obs_lock:
             observers = list(self._obs)
         for obs in observers:
             obs.addBuildEvent(item)
 
+
+try:
+    from watchdog.observers import Observer
+    from watchdog.events import FileSystemEventHandler
+    _has_watchdog = True
+except ImportError:
+    _has_watchdog = False
+
+
+if _has_watchdog:
+    class _AssetFileEventHandler(FileSystemEventHandler):
+        def __init__(self, proc_loop, source):
+            self._proc_loop = proc_loop
+            self._source = source
+
+        def on_any_event(self, event):
+            if event.is_directory:
+                return
+
+            pl = self._proc_loop
+            with pl._lock:
+                pl._ops.append({
+                    'op': 'bake',
+                    'source': self._source,
+                    'path': event.src_path,
+                    'change': event.event_type,
+                    'time': time.time()})
+                pl._event.set()
+
+
+    class _SiteConfigEventHandler(FileSystemEventHandler):
+        def __init__(self, proc_loop, path):
+            self._proc_loop = proc_loop
+            self._path = path
+
+        def on_modified(self, event):
+            if event.src_path != self._path:
+                return
+
+            pl = self._proc_loop
+            with pl._lock:
+                pl._ops.append({'op': 'reinit'})
+                pl._event.set()
+
+
+    class WatchdogProcessingLoop(ProcessingLoopBase):
+        def __init__(self, appfactory, out_dir):
+            ProcessingLoopBase.__init__(self, appfactory, out_dir)
+            self._op_thread = threading.Thread(
+                name='watchdog-operations',
+                target=self._runOpThread,
+                daemon=True)
+            self._lock = threading.Lock()
+            self._event = threading.Event()
+            self._ops = []
+            self._last_op_time = 0
+
+        def onStart(self):
+            logger.debug("Running watchdog monitor on:")
+            observer = Observer()
+
+            event_handler = _SiteConfigEventHandler(self, self.config_path)
+            observer.schedule(event_handler, os.path.dirname(self.config_path))
+            logger.debug(" - %s" % self.config_path)
+
+            for src in self.getSources():
+                path = getattr(src, 'fs_endpoint_path', None)
+                if not path:
+                    logger.warn("Skipping source '%s' -- it doesn't have "
+                                "a file-system endpoint." % src.name)
+                    continue
+                if not os.path.isdir(path):
+                    continue
+
+                logger.debug(" - %s" % path)
+                event_handler = _AssetFileEventHandler(self, src)
+                observer.schedule(event_handler, path, recursive=True)
+
+            observer.start()
+            self._op_thread.start()
+
+        def _runOpThread(self):
+            while not server_shutdown:
+                try:
+                    self._event.wait()
+                    with self._lock:
+                        ops = self._ops
+                        self._ops = []
+                        self._event.clear()
+
+                    orig_len = len(ops)
+                    lot = self._last_op_time
+                    ops = list(filter(lambda o: o['time'] > lot, ops))
+                    logger.debug("Got %d ops, with %d that happened after "
+                                 "our last operation." % (orig_len, len(ops)))
+                    if len(ops) == 0:
+                        continue
+
+                    if any(filter(lambda o: o['op'] == 'reinit', ops)):
+                        logger.info("Site configuration changed, "
+                                    "reloading pipeline.")
+                        self.initialize()
+                        self.runPipelines()
+                        continue
+
+                    sources = set()
+                    ops = list(filter(lambda o: o['op'] == 'bake', ops))
+                    for op in ops:
+                        logger.info("Detected file-system change: "
+                                    "%s [%s]" %
+                                    (op['path'], op['change']))
+                        sources.add(op['source'])
+
+                    logger.debug("Processing: %s" % [s.name for s in sources])
+                    for s in sources:
+                        self.runPipelines(s)
+
+                    self._last_op_time = time.time()
+
+                except (KeyboardInterrupt, SystemExit):
+                    break
+
+    ProcessingLoop = WatchdogProcessingLoop
+
+else:
+    class LegacyProcessingLoop(ProcessingLoopBase, threading.Thread):
+        def __init__(self, appfactory, out_dir):
+            ProcessingLoopBase.__init__(self, appfactory, out_dir)
+            threading.Thread.__init__(self, name='pipeline-reloader',
+                                      daemon=True)
+            self.interval = 1
+            self._proc_infos = None
+            self._last_config_mtime = 0
+
+        def onInitialize(self):
+            self._proc_infos = {}
+            for src in self.getSources():
+                procinfo = _AssetProcessingInfo(src)
+                self._proc_infos[src.name] = procinfo
+
+                # Build the list of initial asset files.
+                for item in src.getAllContents():
+                    procinfo.paths.add(item.spec)
+
+        def onStart(self):
+            self._last_config_mtime = os.path.getmtime(self.config_path)
+            threading.Thread.start(self)
+
+        def run(self):
+            while True:
+                cur_config_time = os.path.getmtime(self.config_path)
+                if self._last_config_mtime < cur_config_time:
+                    logger.info("Site configuration changed, reloading pipeline.")
+                    self._last_config_mtime = cur_config_time
+                    self.initialize()
+                    self.runPipelines()
+                    continue
+
+                for procinfo in self._proc_infos.values():
+                    # For each assets folder we try to find the first new or
+                    # modified file. If any, we just run the pipeline on
+                    # that source.
+                    found_new_or_modified = False
+                    for item in procinfo.source.getAllContents():
+                        path = item.spec
+                        if path not in procinfo.paths:
+                            logger.debug("Found new asset: %s" % path)
+                            procinfo.paths.add(path)
+                            found_new_or_modified = True
+                            break
+                        if os.path.getmtime(path) > procinfo.last_bake_time:
+                            logger.debug("Found modified asset: %s" % path)
+                            found_new_or_modified = True
+                            break
+                    if found_new_or_modified:
+                        logger.info("change detected, reprocessed '%s'." %
+                                    procinfo.source.name)
+                        self.runPipelines(procinfo.source)
+                        procinfo.last_bake_time = time.time()
+
+                time.sleep(self.interval)
+
+    ProcessingLoop = LegacyProcessingLoop
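Note that the watchdog handlers above never run the pipeline themselves; they only enqueue small op dicts under a lock and set an event, while the single watchdog-operations thread drains the queue, drops ops that predate the last completed batch, and reprocesses each affected source once. A standalone sketch of that coalescing pattern under the same locking scheme (hypothetical names; process_source stands in for runPipelines):

    import threading
    import time


    class OpQueue:
        # Coalesces bursts of change notifications so each source is
        # processed at most once per drained batch.
        def __init__(self, process_source):
            self._process_source = process_source
            self._lock = threading.Lock()
            self._event = threading.Event()
            self._ops = []
            self._last_op_time = 0

        def push(self, source):
            # Called from watchdog handler threads.
            with self._lock:
                self._ops.append({'source': source, 'time': time.time()})
                self._event.set()

        def run(self):
            # Called on a dedicated worker thread.
            while True:
                self._event.wait()
                with self._lock:
                    ops, self._ops = self._ops, []
                    self._event.clear()
                # Ignore notifications already covered by the previous
                # batch (e.g. events fired while that batch was running).
                ops = [o for o in ops if o['time'] > self._last_op_time]
                for source in {o['source'] for o in ops}:
                    self._process_source(source)
                self._last_op_time = time.time()

Filtering on the ops' timestamps rather than clearing the queue wholesale means events that arrive during a long pipeline run are kept for the next batch instead of being lost.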