comparison piecrust/serving/procloop.py @ 1043:54eb8ad9e809

serve: Use `watchdog` for a more efficient monitoring of assets files.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 20 Jan 2018 17:24:08 -0800
parents 89d94955b818
children 2f39ffa601a9
comparison
equal deleted inserted replaced
1042:895f49c9833d 1043:54eb8ad9e809
79 self.source = source 79 self.source = source
80 self.paths = set() 80 self.paths = set()
81 self.last_bake_time = time.time() 81 self.last_bake_time = time.time()
82 82
83 83
84 class ProcessingLoop(threading.Thread): 84 class ProcessingLoopBase:
85 def __init__(self, appfactory, out_dir): 85 def __init__(self, appfactory, out_dir):
86 super().__init__(name='pipeline-reloader', daemon=True)
87 self.appfactory = appfactory 86 self.appfactory = appfactory
88 self.out_dir = out_dir 87 self.out_dir = out_dir
89 self.last_status_id = 0 88 self.last_status_id = 0
90 self.interval = 1
91 self._app = None 89 self._app = None
92 self._proc_infos = None
93 self._last_records = None
94 self._last_config_mtime = 0
95 self._obs = [] 90 self._obs = []
96 self._obs_lock = threading.Lock() 91 self._obs_lock = threading.Lock()
97 config_name = ( 92 config_name = (
98 THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH) 93 THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
99 self._config_path = os.path.join(appfactory.root_dir, config_name) 94 self.config_path = os.path.join(appfactory.root_dir, config_name)
100 95
101 def addObserver(self, obs): 96 def addObserver(self, obs):
102 with self._obs_lock: 97 with self._obs_lock:
103 self._obs.append(obs) 98 self._obs.append(obs)
104 99
105 def removeObserver(self, obs): 100 def removeObserver(self, obs):
106 with self._obs_lock: 101 with self._obs_lock:
107 self._obs.remove(obs) 102 self._obs.remove(obs)
108 103
109 def run(self): 104 def getApp(self):
110 logger.debug("Initializing processing loop with output: %s" % 105 return self._app
111 self.out_dir) 106
107 def initialize(self):
108 self._app = self.appfactory.create()
109 self.onInitialize()
110
111 def onInitialize(self):
112 pass
113
114 def start(self):
115 logger.info("Starting processing loop with output: %s" %
116 self.out_dir)
112 try: 117 try:
113 self._init() 118 self.initialize()
114 except Exception as ex: 119 except Exception as ex:
115 logger.error("Error initializing processing loop:") 120 logger.error("Error initializing processing loop:")
116 logger.exception(ex) 121 logger.exception(ex)
117 return 122 return
118 123
119 logger.debug("Doing initial processing loop bake...") 124 logger.debug("Doing initial processing loop bake...")
120 self._runPipelinesSafe() 125 # self.runPipelines()
121 126
122 logger.debug("Running processing loop...") 127 self.onStart()
123 self._last_config_mtime = os.path.getmtime(self._config_path) 128
124 129 def onStart(self):
125 while True: 130 raise NotImplementedError()
126 cur_config_time = os.path.getmtime(self._config_path) 131
127 if self._last_config_mtime < cur_config_time: 132 def getSources(self):
128 logger.info("Site configuration changed, reloading pipeline.") 133 for src in self._app.sources:
129 self._last_config_mtime = cur_config_time 134 if src.config.get('pipeline') != 'asset':
130 self._init()
131 self._runPipelines()
132 continue 135 continue
133 136 yield src
134 for procinfo in self._proc_infos.values(): 137
135 # For each assets folder we try to find the first new or 138 def runPipelines(self, only_for_source=None):
136 # modified file. If any, we just run the pipeline on
137 # that source.
138 found_new_or_modified = False
139 for item in procinfo.source.getAllContents():
140 path = item.spec
141 if path not in procinfo.paths:
142 logger.debug("Found new asset: %s" % path)
143 procinfo.paths.add(path)
144 found_new_or_modified = True
145 break
146 if os.path.getmtime(path) > procinfo.last_bake_time:
147 logger.debug("Found modified asset: %s" % path)
148 found_new_or_modified = True
149 break
150 if found_new_or_modified:
151 logger.info("change detected, reprocessed '%s'." %
152 procinfo.source.name)
153 self._runPipelinesSafe(procinfo.source)
154 procinfo.last_bake_time = time.time()
155
156 time.sleep(self.interval)
157
158 def _init(self):
159 self._app = self.appfactory.create()
160 self._last_records = MultiRecord()
161
162 self._proc_infos = {}
163 for src in self._app.sources:
164 if src.config['pipeline'] != 'asset':
165 continue
166
167 procinfo = _AssetProcessingInfo(src)
168 self._proc_infos[src.name] = procinfo
169
170 # Build the list of initial asset files.
171 for item in src.getAllContents():
172 procinfo.paths.add(item.spec)
173
174 def _runPipelinesSafe(self, only_for_source=None):
175 try: 139 try:
176 self._runPipelines(only_for_source) 140 self._doRunPipelines(only_for_source)
177 except Exception as ex: 141 except Exception as ex:
178 logger.error("Error while running asset pipeline:") 142 logger.error("Error while running asset pipeline:")
179 logger.exception(ex) 143 logger.exception(ex)
180 144
181 def _runPipelines(self, only_for_source): 145 def _doRunPipelines(self, only_for_source):
182 from piecrust.baking.baker import Baker 146 from piecrust.baking.baker import Baker
183 147
184 allowed_sources = None 148 allowed_sources = None
185 if only_for_source: 149 if only_for_source:
186 allowed_sources = [only_for_source.name] 150 allowed_sources = [only_for_source.name]
229 with self._obs_lock: 193 with self._obs_lock:
230 observers = list(self._obs) 194 observers = list(self._obs)
231 for obs in observers: 195 for obs in observers:
232 obs.addBuildEvent(item) 196 obs.addBuildEvent(item)
233 197
198
199 try:
200 from watchdog.observers import Observer
201 from watchdog.events import FileSystemEventHandler
202 _has_watchdog = True
203 except ImportError:
204 _has_watchdog = False
205
206
if _has_watchdog:
    class _AssetFileEventHandler(FileSystemEventHandler):
        """Queues a `bake` operation for an asset source whenever a file
        under that source's directory changes.
        """
        def __init__(self, proc_loop, source):
            super().__init__()
            self._proc_loop = proc_loop
            self._source = source

        def on_any_event(self, event):
            if event.is_directory:
                return
            # Newer `watchdog` releases can also report read-only accesses.
            # Those don't change the asset (and baking reads asset files),
            # so they must not queue another bake.
            if event.event_type in ('opened', 'closed_no_write'):
                return

            self._proc_loop._queueOp({
                'op': 'bake',
                'source': self._source,
                'path': event.src_path,
                'change': event.event_type})


    class _SiteConfigEventHandler(FileSystemEventHandler):
        """Queues a `reinit` operation when the site configuration file
        located at `path` is written.

        The handler is scheduled on the configuration file's parent
        directory, so events for any other file are ignored.
        """
        def __init__(self, proc_loop, path):
            super().__init__()
            self._proc_loop = proc_loop
            self._path = path

        def on_modified(self, event):
            self._onPathChanged(event.src_path)

        def on_created(self, event):
            # Some editors save by deleting and re-creating the file.
            self._onPathChanged(event.src_path)

        def on_moved(self, event):
            # Others write a temporary file and rename it over the original,
            # in which case the config path is the move's destination.
            self._onPathChanged(event.dest_path)

        def _onPathChanged(self, path):
            if path != self._path:
                return
            self._proc_loop._queueOp({'op': 'reinit'})


    class WatchdogProcessingLoop(ProcessingLoopBase):
        """Processing loop driven by `watchdog` file-system notifications.

        Event handlers (running on the observer's threads) queue operation
        dicts with `_queueOp`. A dedicated daemon thread drains the queue
        and either reloads the whole app (`reinit`) or re-runs the asset
        pipeline for each changed source (`bake`).
        """
        def __init__(self, appfactory, out_dir):
            ProcessingLoopBase.__init__(self, appfactory, out_dir)
            self._op_thread = threading.Thread(
                name='watchdog-operations',
                target=self._runOpThread,
                daemon=True)
            self._lock = threading.Lock()
            self._event = threading.Event()
            self._ops = []
            self._last_op_time = 0
            self._observer = None

        def _queueOp(self, op):
            """Queue operation dict `op` and wake up the operations thread.

            Every op is stamped with a `'time'` key (unless it already has
            one) because `_runOpThread` filters all ops on it.
            """
            op.setdefault('time', time.time())
            with self._lock:
                self._ops.append(op)
                self._event.set()

        def onStart(self):
            logger.debug("Running watchdog monitor on:")
            observer = Observer()

            event_handler = _SiteConfigEventHandler(self, self.config_path)
            observer.schedule(event_handler, os.path.dirname(self.config_path))
            logger.debug(" - %s" % self.config_path)

            # NOTE(review): this set of watched directories is computed only
            # once; a later `reinit` does not reschedule the observer.
            for src in self.getSources():
                path = getattr(src, 'fs_endpoint_path', None)
                if not path:
                    logger.warning("Skipping source '%s' -- it doesn't have "
                                   "a file-system endpoint." % src.name)
                    continue
                if not os.path.isdir(path):
                    continue

                logger.debug(" - %s" % path)
                event_handler = _AssetFileEventHandler(self, src)
                observer.schedule(event_handler, path, recursive=True)

            observer.start()
            # Keep a reference so the observer can be reached/stopped later.
            self._observer = observer
            self._op_thread.start()

        def _runOpThread(self):
            while not server_shutdown:
                try:
                    self._event.wait()
                    # Swap the queue and clear the event under the same lock
                    # the producers use, so no wake-up is lost.
                    with self._lock:
                        ops = self._ops
                        self._ops = []
                        self._event.clear()

                    orig_len = len(ops)
                    lot = self._last_op_time
                    ops = [o for o in ops if o['time'] > lot]
                    logger.debug("Got %d ops, with %d that happened after "
                                 "our last operation." % (orig_len, len(ops)))
                    if not ops:
                        continue

                    if any(o['op'] == 'reinit' for o in ops):
                        self._reinitialize()
                    else:
                        self._bakeChangedSources(ops)

                    # Ops stamped before this point are dropped by the filter
                    # above on the next pass -- presumably because the work
                    # just done already covered them.
                    self._last_op_time = time.time()

                except (KeyboardInterrupt, SystemExit):
                    break
                except Exception as ex:
                    # Log and keep going: if this thread dies, file changes
                    # silently stop being processed.
                    logger.error("Error in watchdog operations thread:")
                    logger.exception(ex)

        def _reinitialize(self):
            """Recreate the app and re-run all pipelines; log init errors."""
            logger.info("Site configuration changed, "
                        "reloading pipeline.")
            try:
                self.initialize()
            except Exception as ex:
                # Typically a broken config edit; wait for the next one.
                logger.error("Error initializing processing loop:")
                logger.exception(ex)
                return
            self.runPipelines()

        def _bakeChangedSources(self, ops):
            """Re-run the pipeline once for each source touched by `ops`."""
            sources = set()
            for op in ops:
                if op['op'] != 'bake':
                    continue
                logger.info("Detected file-system change: "
                            "%s [%s]" %
                            (op['path'], op['change']))
                sources.add(op['source'])

            logger.debug("Processing: %s" % [s.name for s in sources])
            for s in sources:
                self.runPipelines(s)

    ProcessingLoop = WatchdogProcessingLoop

else:
    class LegacyProcessingLoop(ProcessingLoopBase, threading.Thread):
        """Fallback processing loop that polls the asset sources every
        `interval` seconds when `watchdog` isn't available.
        """
        def __init__(self, appfactory, out_dir):
            ProcessingLoopBase.__init__(self, appfactory, out_dir)
            threading.Thread.__init__(self, name='pipeline-reloader',
                                      daemon=True)
            self.interval = 1
            self._proc_infos = None
            self._last_config_mtime = 0

        def onInitialize(self):
            self._proc_infos = {}
            for src in self.getSources():
                procinfo = _AssetProcessingInfo(src)
                self._proc_infos[src.name] = procinfo

                # Build the list of initial asset files.
                for item in src.getAllContents():
                    procinfo.paths.add(item.spec)

        def onStart(self):
            self._last_config_mtime = os.path.getmtime(self.config_path)
            # `start` resolves to ProcessingLoopBase.start, so call the
            # Thread implementation explicitly to spawn the polling thread.
            threading.Thread.start(self)

        def run(self):
            while True:
                if self._reloadIfConfigChanged():
                    continue

                for procinfo in self._proc_infos.values():
                    # For each assets folder, look for the first new or
                    # modified file. If any, re-run the pipeline on that
                    # source only.
                    if self._hasNewOrModifiedAsset(procinfo):
                        logger.info("change detected, reprocessed '%s'." %
                                    procinfo.source.name)
                        self.runPipelines(procinfo.source)
                        procinfo.last_bake_time = time.time()

                time.sleep(self.interval)

        def _reloadIfConfigChanged(self):
            """Reload the app if the config file's mtime moved forward.

            Returns True when a change was detected (whether or not the
            reload succeeded), False otherwise.
            """
            try:
                cur_config_time = os.path.getmtime(self.config_path)
            except OSError:
                # The file can briefly disappear while an editor saves it;
                # check again on the next tick.
                return False
            if cur_config_time <= self._last_config_mtime:
                return False

            logger.info("Site configuration changed, reloading pipeline.")
            self._last_config_mtime = cur_config_time
            try:
                self.initialize()
            except Exception as ex:
                logger.error("Error initializing processing loop:")
                logger.exception(ex)
            else:
                self.runPipelines()
            return True

        def _hasNewOrModifiedAsset(self, procinfo):
            """Return True on the first asset that is new or was modified
            since `procinfo.last_bake_time`; new paths are recorded.
            """
            for item in procinfo.source.getAllContents():
                path = item.spec
                if path not in procinfo.paths:
                    logger.debug("Found new asset: %s" % path)
                    procinfo.paths.add(path)
                    return True
                try:
                    mtime = os.path.getmtime(path)
                except OSError:
                    # Deleted between listing and stat'ing it.
                    continue
                if mtime > procinfo.last_bake_time:
                    logger.debug("Found modified asset: %s" % path)
                    return True
            return False

    ProcessingLoop = LegacyProcessingLoop