diff piecrust/serving/procloop.py @ 552:9612cfc6455a

serve: Rewrite of the Server-Sent Event code for build notifications. At the moment the server monitors the asset directories, and notifies the browser when an asset has changed and has been re-processed. * Fix issues around long-running requests/threads which mess up the ability to shutdown the server correctly with `CTRL-C` (see comments in code). * Move the notification queue to each SSE producer, to support having multiple pages open in a browser. * Add JS/CSS for showing quick notifications about re-processed assets. * Add support for hot-reloading CSS and pictures that have been re-processed.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 08 Aug 2015 16:12:04 -0700
parents fa3ee8a8ee2d
children cc6f3dbe3048
line wrap: on
line diff
--- a/piecrust/serving/procloop.py	Sat Aug 08 15:55:24 2015 -0700
+++ b/piecrust/serving/procloop.py	Sat Aug 08 16:12:04 2015 -0700
@@ -4,58 +4,71 @@
 import json
 import queue
 import logging
+import itertools
 import threading
 
 
 logger = logging.getLogger(__name__)
 
-
-_sse_abort = threading.Event()
+# This flag is for cancelling all long running requests like SSEs.
+server_shutdown = False
 
 
-class PipelineStatusServerSideEventProducer(object):
-    def __init__(self, status_queue):
-        self.status_queue = status_queue
-        self.interval = 2
-        self.timeout = 60*10
+class PipelineStatusServerSentEventProducer(object):
+    """ The producer for Server-Sent Events (SSE) notifying the front-end
+        about useful things like assets having been re-processed in the
+        background.
+        Each has its own queue because the user could have multiple pages
+        open, each having to display notifications coming from the server.
+    """
+    def __init__(self, proc_loop):
+        self._proc_loop = proc_loop
+        self._queue = queue.Queue()
         self._start_time = 0
+        self._poll_interval = 0.5
+        self._ping_interval = 30
+        self._time_between_pings = 0
+        self._running = 0
+
+    def addBuildEvent(self, item):
+        self._queue.put_nowait(item)
 
     def run(self):
         logger.debug("Starting pipeline status SSE.")
+        self._proc_loop.addObserver(self)
         self._start_time = time.time()
+        self._running = 1
 
         outstr = 'event: ping\ndata: started\n\n'
         yield bytes(outstr, 'utf8')
 
-        count = 0
-        while True:
-            if time.time() > self.timeout + self._start_time:
-                logger.debug("Closing pipeline status SSE, timeout reached.")
-                outstr = 'event: pipeline_timeout\ndata: bye\n\n'
-                yield bytes(outstr, 'utf8')
-                break
-
-            if _sse_abort.is_set():
-                break
-
+        while self._running == 1 and not server_shutdown:
             try:
-                logger.debug("Polling pipeline status queue...")
-                count += 1
-                data = self.status_queue.get(True, self.interval)
+                # We use a short poll interval (less than a second) because
+                # we need to catch `server_shutdown` going `True` as soon as
+                # possible to exit this thread when the user hits `CTRL+C`.
+                data = self._queue.get(True, self._poll_interval)
             except queue.Empty:
-                if count < 3:
-                    continue
-                data = {'type': 'ping', 'message': 'ping'}
-                count = 0
+                # Not exact timing but close enough.
+                self._time_between_pings += self._poll_interval
+                if self._time_between_pings >= self._ping_interval:
+                    self._time_between_pings = 0
+                    logger.debug("Sending ping/heartbeat event.")
+                    outstr = 'event: ping\ndata: 1\n\n'
+                    yield bytes(outstr, 'utf8')
+                continue
 
-            event_type = data['type']
-            outstr = 'event: %s\ndata: %s\n\n' % (
-                    event_type, json.dumps(data))
             logger.debug("Sending pipeline status SSE.")
+            outstr = (('event: %s\n' % data['type']) +
+                      ('id: %s\n' % data['id']) +
+                      ('data: %s\n\n' % json.dumps(data)))
+            self._queue.task_done()
             yield bytes(outstr, 'utf8')
 
     def close(self):
         logger.debug("Closing pipeline status SSE.")
+        self._proc_loop.removeObserver(self)
+        self._running = 2
 
 
 class ProcessingLoop(threading.Thread):
@@ -63,11 +76,21 @@
         super(ProcessingLoop, self).__init__(
                 name='pipeline-reloader', daemon=True)
         self.pipeline = pipeline
-        self.status_queue = queue.Queue()
+        self.last_status_id = 0
         self.interval = 1
         self._paths = set()
         self._record = None
         self._last_bake = 0
+        self._obs = []
+        self._obs_lock = threading.Lock()
+
+    def addObserver(self, obs):
+        with self._obs_lock:
+            self._obs.append(obs)
+
+    def removeObserver(self, obs):
+        with self._obs_lock:
+            self._obs.remove(obs)
 
     def run(self):
         # Build the first list of known files and run the pipeline once.
@@ -116,22 +139,25 @@
                     previous_record=self._record,
                     save_record=False)
 
-            # Update the status queue.
-            # (we need to clear it because there may not be a consumer
-            #  on the other side, if the user isn't running with the
-            #  debug window active)
-            while True:
-                try:
-                    self.status_queue.get_nowait()
-                except queue.Empty:
-                    break
+            status_id = self.last_status_id + 1
+            self.last_status_id += 1
 
             if self._record.success:
+                changed = filter(
+                        lambda i: not i.was_collapsed_from_last_run,
+                        self._record.entries)
+                changed = itertools.chain.from_iterable(
+                        map(lambda i: i.rel_outputs, changed))
+                changed = list(changed)
                 item = {
-                        'type': 'pipeline_success'}
-                self.status_queue.put_nowait(item)
+                        'id': status_id,
+                        'type': 'pipeline_success',
+                        'assets': changed}
+
+                self._notifyObservers(item)
             else:
                 item = {
+                        'id': status_id,
                         'type': 'pipeline_error',
                         'assets': []}
                 for entry in self._record.entries:
@@ -140,7 +166,14 @@
                                 'path': entry.rel_input,
                                 'errors': list(entry.errors)}
                         item['assets'].append(asset_item)
-                self.status_queue.put_nowait(item)
-        except:
-            pass
+
+                self._notifyObservers(item)
+        except Exception as ex:
+            logger.exception(ex)
 
+    def _notifyObservers(self, item):
+        with self._obs_lock:
+            observers = list(self._obs)
+        for obs in observers:
+            obs.addBuildEvent(item)
+