changeset 552:9612cfc6455a

serve: Rewrite of the Server-Sent Event code for build notifications. At the moment the server monitors the asset directories, and notifies the browser when an asset has changed and has been re-processed. * Fix issues around long-running requests/threads which mess up the ability to shutdown the server correctly with `CTRL-C` (see comments in code). * Move the notification queue to each SSE producer, to support having multiple pages open in a browser. * Add JS/CSS for showing quick notifications about re-processed assets. * Add support for hot-reloading CSS and pictures that have been re-processed.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 08 Aug 2015 16:12:04 -0700
parents f2b875ecc940
children cc6f3dbe3048
files piecrust/resources/server/piecrust-debug-info.css piecrust/resources/server/piecrust-debug-info.js piecrust/serving/procloop.py piecrust/serving/server.py piecrust/serving/wrappers.py
diffstat 5 files changed, 444 insertions(+), 98 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/resources/server/piecrust-debug-info.css	Sat Aug 08 16:12:04 2015 -0700
@@ -0,0 +1,32 @@
+@keyframes slideNotification {
+    0% { opacity: 1; }
+    100% { opacity: 0; right: -11em; }
+}
+
+.piecrust-debug-notifications {
+    font-size: 1rem;
+}
+
+.piecrust-debug-notification {
+    padding: 0.5em;
+    position: fixed;
+    bottom: 0.5em;
+    right: 0.5em;
+    width: 30%;
+    min-width: 10em;
+    border-radius: 5px;
+    animation-name: slideNotification;
+    animation-delay: 1s;
+    animation-duration: 1s;
+}
+
+.piecrust-debug-notification-success {
+    background-color: #4BB025;
+    border: 2px solid #3AA014;
+}
+
+.piecrust-debug-notification-error {
+    background-color: #920C0C;
+    border: 2px solid #810B0B;
+}
+
--- a/piecrust/resources/server/piecrust-debug-info.js	Sat Aug 08 15:55:24 2015 -0700
+++ b/piecrust/resources/server/piecrust-debug-info.js	Sat Aug 08 16:12:04 2015 -0700
@@ -1,45 +1,284 @@
+///////////////////////////////////////////////////////////////////////////////
+// PieCrust debug info and features
+//
+// This stuff is injected by PieCrust's preview server and shouldn't show up
+// in production. It should all be self-contained in this one file.
+///////////////////////////////////////////////////////////////////////////////
+
 var eventSource = new EventSource("/__piecrust_debug/pipeline_status");
 
-//window.onbeforeunload = function(e) {
-//    console.log("Disconnecting SSE.", e);
-//    eventSource.close();
-//};
+if (eventSource != null) {
+    eventSource.onerror = function(e) {
+        console.log("Error with SSE, closing.", e);
+        eventSource.close();
+    };
+
+    eventSource.addEventListener('ping', function(e) {
+    });
+
+    eventSource.addEventListener('pipeline_success', function(e) {
+        var obj = JSON.parse(e.data);
+        console.log("Got pipeline success", obj);
+
+        // Check which assets were processed, and whether they are referenced
+        // by the current page for the usual use-cases.
+        for (var i = 0; i < obj.assets.length; ++i) {
+            a = obj.assets[i];
+            if (assetReloader.reloadAsset(a)) {
+                notification.flashSuccess("Reloaded " + a);
+            }
+        }
+    });
+
+    eventSource.addEventListener('pipeline_error', function(e) {
+        var obj = JSON.parse(e.data);
+        console.log("Got pipeline error", obj);
+
+        var outer = document.createElement('div');
+        outer.style = 'padding: 1em;';
+        for (var i = 0; i < obj.assets.length; ++i) {
+            var item = obj.assets[i];
+            var markup = (
+                '<p>Error processing: <span style="font-family: monospace;">' +
+                item.path + '</span></p>\n' +
+                '<ul>');
+            for (var j = 0; j < item.errors.length; ++j) {
+                markup += (
+                    '<li style="font-family: monospace;">' + 
+                    item.errors[j] +
+                    '</li>\n');
+            }
+            markup += '</ul>\n';
+            var entry = document.createElement('div');
+            entry.innerHTML = markup;
+            outer.appendChild(entry);
+        }
+
+        var placeholder = document.getElementById('piecrust-debug-info-pipeline-status');
+        placeholder.appendChild(outer);
+    });
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+NotificationArea = function() {
+    var area = document.createElement('div');
+    area.id = 'piecrust-debug-notifications';
+    area.className = 'piecrust-debug-notifications';
+    document.querySelector('body').appendChild(area);
+
+    this._area = area;
+    this._lastId = 0;
+};
 
-eventSource.onerror = function(e) {
-    console.log("Error with SSE, closing.", e);
-    eventSource.close();
+NotificationArea.prototype.flashSuccess = function(msg) {
+    this.flashMessage(msg, 'success');
+};
+
+NotificationArea.prototype.flashError = function(msg) {
+    this.flashMessage(msg, 'error');
+};
+
+NotificationArea.prototype.flashMessage = function(msg, css_class) {
+    this._lastId += 1;
+    var thisId = this._lastId;
+    this._area.insertAdjacentHTML(
+            'afterbegin',
+            '<div id="piecrust-debug-notification-msg' + thisId + '" ' +
+                  'class="piecrust-debug-notification ' +
+                       'piecrust-debug-notification-' + css_class + '">' +
+                msg + '</div>');
+
+    window.setTimeout(this._discardNotification, 2000, thisId);
+};
+
+NotificationArea.prototype._discardNotification = function(noteId) {
+    var added = document.querySelector('#piecrust-debug-notification-msg' + noteId);
+    added.remove();
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+function _get_extension(name) {
+    var ext = null;
+    var dotIdx = name.lastIndexOf('.');
+    if (dotIdx > 0)
+        ext = name.substr(dotIdx + 1);
+    return ext;
+}
+
+function _get_basename(name) {
+    var filename = name;
+    var slashIdx = name.lastIndexOf('/');
+    if (slashIdx > 0)
+        filename = name.substr(slashIdx + 1);
+    return filename;
+}
+
+var _regex_cache_bust = /\?\d+$/;
+
+function _is_path_match(path1, path2) {
+    path1 = path1.replace(_regex_cache_bust, '');
+    console.log("Matching:", path1, path2)
+    return path1.endsWith(path2);
+};
+
+function _add_cache_bust(path, cache_bust) {
+    path = path.replace(_regex_cache_bust, '');
+    return path + cache_bust;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+AssetReloader = function() {
+    this._imgExts = ['jpg', 'jpeg', 'png', 'gif', 'svg'];
+    this._imgReloader = new ImageReloader();
+    this._cssReloader = new CssReloader();
 };
 
-eventSource.addEventListener('pipeline_success', function(e) {
-    var placeholder = document.getElementById('piecrust-debug-info-pipeline-status');
-    //if (placeholder.firstChild !== null)
-    placeholder.removeChild(placeholder.firstChild);
-});
-
-eventSource.addEventListener('pipeline_error', function(e) {
-    var obj = JSON.parse(e.data);
+AssetReloader.prototype.reloadAsset = function(name) {
+    var ext = _get_extension(name);
+    var filename = _get_basename(name);
 
-    var outer = document.createElement('div');
-    outer.style = 'padding: 1em;';
-    for (var i = 0; i < obj.assets.length; ++i) {
-        var item = obj.assets[i];
-        var markup = (
-            '<p>Error processing: <span style="font-family: monospace;">' +
-            item.path + '</span></p>\n' +
-            '<ul>');
-        for (var j = 0; j < item.errors.length; ++j) {
-            markup += (
-                '<li style="font-family: monospace;">' + 
-                item.errors[j] +
-                '</li>\n');
-        }
-        markup += '</ul>\n';
-        var entry = document.createElement('div');
-        entry.innerHTML = markup;
-        outer.appendChild(entry);
+    if (ext == 'css') {
+        return this._cssReloader.reloadStylesheet(filename);
+    }
+    if (this._imgExts.indexOf(ext) >= 0) {
+        return this._imgReloader.reloadImage(filename);
     }
 
-    var placeholder = document.getElementById('piecrust-debug-info-pipeline-status');
-    placeholder.appendChild(outer);
-});
+    console.log("Don't know how to reload", filename);
+    return false;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+CssReloader = function() {
+};
+
+CssReloader.prototype.reloadStylesheet = function(name) {
+    var result = false;
+    var sheets = document.styleSheets;
+    var cacheBust = '?' + new Date().getTime();
+    for (var i = 0; i < sheets.length; ++i) {
+        var sheet = sheets[i];
+        if (_is_path_match(sheet.href, name)) {
+            sheet.ownerNode.href = _add_cache_bust(sheet.href, cacheBust);
+            result = true;
+        }
+    }
+    return result;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+ImageReloader = function() {
+    this._imgStyles = [
+          { selector: 'background', styleNames: ['backgroundImage'] },
+        ];
+    this._regexCssUrl = /\burl\s*\(([^)]+)\)/;
+};
+
+ImageReloader.prototype.reloadImage = function(name) {
+    var result = false;
+    var imgs = document.images;
+    var cacheBust = '?' + new Date().getTime();
+    for (var i = 0; i < imgs.length; ++i) {
+        var img = imgs[i];
+        if (_is_path_match(img.src, name)) {
+            img.src = _add_cache_bust(img.src, cacheBust);
+            result = true;
+        }
+    }
+    for (var i = 0; i < this._imgStyles.length; ++i) {
+        var imgInfo = this._imgStyles[i];
+        var domImgs = document.querySelectorAll(
+                "[style*=" + imgInfo.selector + "]");
+        for (var j = 0; j < domImgs.length; ++j) {
+            var img = domImgs[j];
+            result |= this._reloadStyleImage(img.style, imgInfo.styleNames,
+                                             name, cacheBust);
+        }
+    }
+    for (var i = 0; i < document.styleSheets.length; ++i) {
+        var styleSheet = document.styleSheets[i];
+        result |= this._reloadStylesheetImage(styleSheet, name, cacheBust);
+    }
+    return result;
+};
 
+ImageReloader.prototype._reloadStyleImage = function(style, styleNames, path,
+                                                     cacheBust) {
+    var result = false;
+    for (var i = 0; i < styleNames.length; ++i) {
+        var value = style[styleNames[i]];
+        if ((typeof value) == 'string') {
+            m = this._regexCssUrl.exec(value);
+            if (m != null) {
+                var m_clean = m[1].replace(/^['"]/, '');
+                m_clean = m_clean.replace(/['"]$/, '');
+                if (_is_path_match(m_clean, path)) {
+                    m_clean = _add_cache_bust(m_clean, cacheBust);
+                    style[styleNames[i]] = 'url("' + m_clean + '")';
+                    result = true;
+                }
+            }
+        }
+    }
+    return result;
+};
+
+ImageReloader.prototype._reloadStylesheetImage = function(styleSheet, path,
+                                                          cacheBust) {
+    try {
+        var rules = styleSheet.cssRules;
+    } catch (e) {
+        // Things like remote CSS stylesheets (e.g. a Google Fonts ones)
+        // will triger a SecurityException here, so just ignore that.
+        return;
+    }
+
+    var result = false;
+    for (var i = 0; i < rules.length; ++i) {
+        var rule = rules[i];
+        switch (rule.type) {
+            case CSSRule.IMPORT_RULE:
+                result |= this._reloadStylesheetImage(rule.styleSheet, path,
+                                                      cacheBust);
+                break;
+            case CSSRule.MEDIA_RULE:
+                result |= this._reloadStylesheetImage(rule, path, cacheBust);
+                break;
+            case CSSRule.STYLE_RULE:
+                for (var j = 0; j < this._imgStyles.length; ++j) {
+                    var imgInfo = this._imgStyles[j];
+                    result |= this._reloadStyleImage(
+                            rule.style, imgInfo.styleNames, path, cacheBust);
+                }
+                break;
+        }
+    }
+    return result;
+};
+
+///////////////////////////////////////////////////////////////////////////////
+
+var notification = new NotificationArea();
+var assetReloader = new AssetReloader();
+
+window.onload = function() {
+    var cacheBust = '?' + new Date().getTime();
+
+    var style = document.createElement('link');
+    style.rel = 'stylesheet';
+    style.type = 'text/css';
+    style.href = '/__piecrust_static/piecrust-debug-info.css' + cacheBust;
+    document.head.appendChild(style);
+};
+
+
+window.onbeforeunload = function(e) {
+    if (eventSource != null)
+        eventSource.close();
+};
+
--- a/piecrust/serving/procloop.py	Sat Aug 08 15:55:24 2015 -0700
+++ b/piecrust/serving/procloop.py	Sat Aug 08 16:12:04 2015 -0700
@@ -4,58 +4,71 @@
 import json
 import queue
 import logging
+import itertools
 import threading
 
 
 logger = logging.getLogger(__name__)
 
-
-_sse_abort = threading.Event()
+# This flag is for cancelling all long running requests like SSEs.
+server_shutdown = False
 
 
-class PipelineStatusServerSideEventProducer(object):
-    def __init__(self, status_queue):
-        self.status_queue = status_queue
-        self.interval = 2
-        self.timeout = 60*10
+class PipelineStatusServerSentEventProducer(object):
+    """ The producer for Server-Sent Events (SSE) notifying the front-end
+        about useful things like assets having been re-processed in the
+        background.
+        Each has its own queue because the user could have multiple pages
+        open, each having to display notifications coming from the server.
+    """
+    def __init__(self, proc_loop):
+        self._proc_loop = proc_loop
+        self._queue = queue.Queue()
         self._start_time = 0
+        self._poll_interval = 0.5
+        self._ping_interval = 30
+        self._time_between_pings = 0
+        self._running = 0
+
+    def addBuildEvent(self, item):
+        self._queue.put_nowait(item)
 
     def run(self):
         logger.debug("Starting pipeline status SSE.")
+        self._proc_loop.addObserver(self)
         self._start_time = time.time()
+        self._running = 1
 
         outstr = 'event: ping\ndata: started\n\n'
         yield bytes(outstr, 'utf8')
 
-        count = 0
-        while True:
-            if time.time() > self.timeout + self._start_time:
-                logger.debug("Closing pipeline status SSE, timeout reached.")
-                outstr = 'event: pipeline_timeout\ndata: bye\n\n'
-                yield bytes(outstr, 'utf8')
-                break
-
-            if _sse_abort.is_set():
-                break
-
+        while self._running == 1 and not server_shutdown:
             try:
-                logger.debug("Polling pipeline status queue...")
-                count += 1
-                data = self.status_queue.get(True, self.interval)
+                # We use a short poll interval (less than a second) because
+                # we need to catch `server_shutdown` going `True` as soon as
+                # possible to exit this thread when the user hits `CTRL+C`.
+                data = self._queue.get(True, self._poll_interval)
             except queue.Empty:
-                if count < 3:
-                    continue
-                data = {'type': 'ping', 'message': 'ping'}
-                count = 0
+                # Not exact timing but close enough.
+                self._time_between_pings += self._poll_interval
+                if self._time_between_pings >= self._ping_interval:
+                    self._time_between_pings = 0
+                    logger.debug("Sending ping/heartbeat event.")
+                    outstr = 'event: ping\ndata: 1\n\n'
+                    yield bytes(outstr, 'utf8')
+                continue
 
-            event_type = data['type']
-            outstr = 'event: %s\ndata: %s\n\n' % (
-                    event_type, json.dumps(data))
             logger.debug("Sending pipeline status SSE.")
+            outstr = (('event: %s\n' % data['type']) +
+                      ('id: %s\n' % data['id']) +
+                      ('data: %s\n\n' % json.dumps(data)))
+            self._queue.task_done()
             yield bytes(outstr, 'utf8')
 
     def close(self):
         logger.debug("Closing pipeline status SSE.")
+        self._proc_loop.removeObserver(self)
+        self._running = 2
 
 
 class ProcessingLoop(threading.Thread):
@@ -63,11 +76,21 @@
         super(ProcessingLoop, self).__init__(
                 name='pipeline-reloader', daemon=True)
         self.pipeline = pipeline
-        self.status_queue = queue.Queue()
+        self.last_status_id = 0
         self.interval = 1
         self._paths = set()
         self._record = None
         self._last_bake = 0
+        self._obs = []
+        self._obs_lock = threading.Lock()
+
+    def addObserver(self, obs):
+        with self._obs_lock:
+            self._obs.append(obs)
+
+    def removeObserver(self, obs):
+        with self._obs_lock:
+            self._obs.remove(obs)
 
     def run(self):
         # Build the first list of known files and run the pipeline once.
@@ -116,22 +139,25 @@
                     previous_record=self._record,
                     save_record=False)
 
-            # Update the status queue.
-            # (we need to clear it because there may not be a consumer
-            #  on the other side, if the user isn't running with the
-            #  debug window active)
-            while True:
-                try:
-                    self.status_queue.get_nowait()
-                except queue.Empty:
-                    break
+            status_id = self.last_status_id + 1
+            self.last_status_id += 1
 
             if self._record.success:
+                changed = filter(
+                        lambda i: not i.was_collapsed_from_last_run,
+                        self._record.entries)
+                changed = itertools.chain.from_iterable(
+                        map(lambda i: i.rel_outputs, changed))
+                changed = list(changed)
                 item = {
-                        'type': 'pipeline_success'}
-                self.status_queue.put_nowait(item)
+                        'id': status_id,
+                        'type': 'pipeline_success',
+                        'assets': changed}
+
+                self._notifyObservers(item)
             else:
                 item = {
+                        'id': status_id,
                         'type': 'pipeline_error',
                         'assets': []}
                 for entry in self._record.entries:
@@ -140,7 +166,14 @@
                                 'path': entry.rel_input,
                                 'errors': list(entry.errors)}
                         item['assets'].append(asset_item)
-                self.status_queue.put_nowait(item)
-        except:
-            pass
+
+                self._notifyObservers(item)
+        except Exception as ex:
+            logger.exception(ex)
 
+    def _notifyObservers(self, item):
+        with self._obs_lock:
+            observers = list(self._obs)
+        for obs in observers:
+            obs.addBuildEvent(item)
+
--- a/piecrust/serving/server.py	Sat Aug 08 15:55:24 2015 -0700
+++ b/piecrust/serving/server.py	Sat Aug 08 16:12:04 2015 -0700
@@ -110,13 +110,19 @@
 
     def _run_request(self, environ, start_response):
         try:
-            return self._try_run_request(environ, start_response)
+            response = self._try_run_request(environ)
+            if isinstance(response, tuple):
+                response, close_func = response
+                return ClosingIterator(response(environ, start_response),
+                                       [close_func])
+            else:
+                return response(environ, start_response)
         except Exception as ex:
             if self.debug:
                 raise
             return self._handle_error(ex, environ, start_response)
 
-    def _try_run_request(self, environ, start_response):
+    def _try_run_request(self, environ):
         request = Request(environ)
 
         # We don't support anything else than GET requests since we're
@@ -129,12 +135,12 @@
         # Handle special requests right away.
         response = self._try_special_request(environ, request)
         if response is not None:
-            return response(environ, start_response)
+            return response
 
         # Also handle requests to a pipeline-built asset right away.
         response = self._try_serve_asset(environ, request)
         if response is not None:
-            return response(environ, start_response)
+            return response
 
         # Create the app for this request.
         app = PieCrust(root_dir=self.root_dir, debug=self.debug)
@@ -153,12 +159,12 @@
         # Let's see if it can be a page asset.
         response = self._try_serve_page_asset(app, environ, request)
         if response is not None:
-            return response(environ, start_response)
+            return response
 
         # Nope. Let's see if it's an actual page.
         try:
             response = self._try_serve_page(app, environ, request)
-            return response(environ, start_response)
+            return response
         except (RouteNotFoundError, SourceNotFoundError) as ex:
             raise NotFound() from ex
         except HTTPException:
@@ -187,19 +193,24 @@
         debug_mount = '/__piecrust_debug/'
         if request.path.startswith(debug_mount):
             rel_req_path = request.path[len(debug_mount):]
+            if rel_req_path == 'werkzeug_shutdown':
+                shutdown_func = environ.get('werkzeug.server.shutdown')
+                if shutdown_func is None:
+                    raise RuntimeError('Not running with the Werkzeug Server')
+                shutdown_func()
+                return Response("Server shutting down...")
+
             if rel_req_path == 'pipeline_status':
                 from piecrust.serving.procloop import (
-                        PipelineStatusServerSideEventProducer)
-                provider = PipelineStatusServerSideEventProducer(
-                        self._proc_loop.status_queue)
-                it = ClosingIterator(provider.run(), [provider.close])
-                response = Response(it)
+                        PipelineStatusServerSentEventProducer)
+                provider = PipelineStatusServerSentEventProducer(
+                        self._proc_loop)
+                it = provider.run()
+                response = Response(it, mimetype='text/event-stream')
                 response.headers['Cache-Control'] = 'no-cache'
-                if 'text/event-stream' in request.accept_mimetypes:
-                    response.mimetype = 'text/event-stream'
-                response.direct_passthrough = True
-                response.implicit_sequence_conversion = False
-                return response
+                response.headers['Last-Event-ID'] = \
+                    self._proc_loop.last_status_id
+                return response, provider.close
 
         return None
 
--- a/piecrust/serving/wrappers.py	Sat Aug 08 15:55:24 2015 -0700
+++ b/piecrust/serving/wrappers.py	Sat Aug 08 16:12:04 2015 -0700
@@ -1,6 +1,11 @@
 import os
+import logging
+import threading
+import urllib.request
 from piecrust.serving.server import Server
-from piecrust.serving.procloop import _sse_abort
+
+
+logger = logging.getLogger(__name__)
 
 
 def run_werkzeug_server(root_dir, host, port,
@@ -22,13 +27,39 @@
                                debug=debug_piecrust,
                                sub_cache_dir=sub_cache_dir,
                                run_sse_check=_run_sse_check)
-    try:
+
+    # We need to run Werkzeug in a background thread because we may have some
+    # SSE responses running. In theory we should be using a proper async
+    # server for this kind of stuff, but I'd rather avoid additional
+    # dependencies on stuff that's not necessarily super portable.
+    # Anyway we run the server in multi-threading mode, but the request
+    # threads are not set to `daemon` mode (and there's no way to set that
+    # flag without re-implementing `run_simple` apparently). So instead we
+    # run the server in a background thread so we keep the main thread to
+    # ourselves here, which means we can trap `KeyboardInterrupt`, and set
+    # a global flag that will kill all the long-running SSE threads and make
+    # this whole thing exit cleanly and properly (hopefully).
+    def _inner():
         run_simple(host, port, app,
                    threaded=True,
                    use_debugger=use_debugger,
                    use_reloader=use_reloader)
+
+    t = threading.Thread(name='WerkzeugServer', target=_inner)
+    t.start()
+    try:
+        while t.is_alive():
+            t.join(0.5)
+    except KeyboardInterrupt:
+        shutdown_url = 'http://%s:%s/__piecrust_debug/werkzeug_shutdown' % (
+                host, port)
+        logger.info("")
+        logger.info("Shutting down server...")
+        urllib.request.urlopen(shutdown_url)
     finally:
-        _sse_abort.set()
+        logger.debug("Terminating push notifications...")
+        from piecrust.serving import procloop
+        procloop.server_shutdown = True
 
 
 def run_gunicorn_server(root_dir,