changeset 219:d7a548ebcd58

serve: Add server sent events for showing pipeline errors in the debug window. The server can now run an endpoint that streams pipeline status, including errors or "fixed" statuses. As a result, I had to investigate using event-loop based server alternatives, before I figured out the correct flag to set in Werkzeug. Support for Gunicorn is therefore now possible, although disabled by default. I will come in handy though when proper support for CMS-mode is enabled.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 02 Feb 2015 08:34:44 -0800
parents 10f24c62b05b
children 84e2bc2d16cb
files piecrust/commands/builtin/serving.py piecrust/data/debug.py piecrust/resources/server/piecrust-debug-info.js piecrust/serving.py
diffstat 4 files changed, 260 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/piecrust/commands/builtin/serving.py	Sat Jan 31 17:36:01 2015 -0800
+++ b/piecrust/commands/builtin/serving.py	Mon Feb 02 08:34:44 2015 -0800
@@ -13,25 +13,73 @@
         self.description = "Runs a local web server to serve your website."
 
     def setupParser(self, parser, app):
-        parser.add_argument('-p', '--port',
+        parser.add_argument(
+                '-p', '--port',
                 help="The port for the web server",
                 default=8080)
-        parser.add_argument('-a', '--address',
+        parser.add_argument(
+                '-a', '--address',
                 help="The host for the web server",
                 default='localhost')
-        parser.add_argument('--use-reloader',
+        parser.add_argument(
+                '--use-reloader',
                 help="Restart the server when PieCrust code changes",
                 action='store_true')
-        parser.add_argument('--use-debugger',
+        parser.add_argument(
+                '--use-debugger',
                 help="Show the debugger when an error occurs",
                 action='store_true')
+        parser.add_argument(
+                '--wsgi',
+                help="The WSGI server implementation to use",
+                choices=['werkzeug', 'gunicorn'],
+                default='werkzeug')
 
     def run(self, ctx):
+        host = ctx.args.address
+        port = int(ctx.args.port)
+        debug = ctx.args.debug or ctx.args.use_debugger
+
         server = Server(
                 ctx.app.root_dir,
-                host=ctx.args.address,
-                port=ctx.args.port,
-                debug=(ctx.args.debug or ctx.args.use_debugger),
+                debug=debug,
                 use_reloader=ctx.args.use_reloader)
-        server.run()
+        app = server.getWsgiApp()
+
+        if ctx.args.wsgi == 'werkzeug':
+            from werkzeug.serving import run_simple
+            run_simple(host, port, app,
+                       threaded=True,
+                       use_debugger=debug,
+                       use_reloader=ctx.args.use_reloader)
+
+        elif ctx.args.wsgi == 'gunicorn':
+            from gunicorn.app.base import BaseApplication
+
+            class PieCrustGunicornApplication(BaseApplication):
+                def __init__(self, app, options):
+                    self.app = app
+                    self.options = options
+                    super(PieCrustGunicornApplication, self).__init__()
 
+                def load_config(self):
+                    for k, v in self.options.items():
+                        if k in self.cfg.settings and v is not None:
+                            self.cfg.set(k, v)
+
+                def load(self):
+                    return self.app
+
+            options = {
+                    'bind': '%s:%s' % (host, port),
+                    'accesslog': '-',
+                    'worker_class': 'gaiohttp',
+                    'workers': 2,
+                    'timeout': 999999}
+            if debug:
+                options['loglevel'] = 'debug'
+            if ctx.args.use_reloader:
+                options['reload'] = True
+            app_wrapper = PieCrustGunicornApplication(app, options)
+            app_wrapper.run()
+
--- a/piecrust/data/debug.py	Sat Jan 31 17:36:01 2015 -0800
+++ b/piecrust/data/debug.py	Mon Feb 02 08:34:44 2015 -0800
@@ -14,12 +14,11 @@
 
 
 # CSS for the debug window.
-CSS_DEBUGINFO = """
+CSS_DEBUGWINDOW = """
 text-align: left;
+font-family: serif;
 font-style: normal;
-padding: 1em;
-background: #a42;
-color: #fff;
+font-weight: normal;
 position: fixed;
 width: 50%;
 bottom: 0;
@@ -29,6 +28,17 @@
 box-shadow: 0 0 10px #633;
 """
 
+CSS_PIPELINESTATUS = """
+background: #fff;
+color: #a22;
+"""
+
+CSS_DEBUGINFO = """
+padding: 1em;
+background: #a42;
+color: #fff;
+"""
+
 # HTML elements.
 CSS_P = 'margin: 0; padding: 0;'
 CSS_A = 'color: #fff; text-decoration: none;'
@@ -62,10 +72,16 @@
 def _do_build_debug_info(page, data, output):
     app = page.app
 
-    print('<div id="piecrust-debug-info" style="%s">' % CSS_DEBUGINFO, file=output)
+    print('<div id="piecrust-debug-info" style="%s">' % CSS_DEBUGWINDOW,
+          file=output)
 
-    print('<div>', file=output)
-    print('<p style="%s"><strong>PieCrust %s</strong> &mdash; ' % (CSS_P, APP_VERSION), file=output)
+    print('<div id="piecrust-debug-info-pipeline-status" style="%s">' %
+          CSS_PIPELINESTATUS, file=output)
+    print('</div>', file=output)
+
+    print('<div style="%s">' % CSS_DEBUGINFO, file=output)
+    print('<p style="%s"><strong>PieCrust %s</strong> &mdash; ' %
+          (CSS_P, APP_VERSION), file=output)
 
     # If we have some execution info in the environment,
     # add more information.
@@ -92,7 +108,7 @@
     print('</div>', file=output)
 
     if data:
-        print('<div>', file=output)
+        print('<div style="%s padding-top: 0;">' % CSS_DEBUGINFO, file=output)
         print(('<p style="%s cursor: pointer;" onclick="var l = '
                          'document.getElementById(\'piecrust-debug-details\'); '
                          'if (l.style.display == \'none\') l.style.display = '
@@ -122,6 +138,9 @@
 
     print('</div>', file=output)
 
+    print('<script src="/__piecrust_static/piecrust-debug-info.js"></script>',
+            file=output)
+
 
 class DebugDataRenderer(object):
     MAX_VALUE_LENGTH = 150
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/resources/server/piecrust-debug-info.js	Mon Feb 02 08:34:44 2015 -0800
@@ -0,0 +1,45 @@
+var eventSource = new EventSource("/__piecrust_debug/pipeline_status");
+
+//window.onbeforeunload = function(e) {
+//    console.log("Disconnecting SSE.", e);
+//    eventSource.close();
+//};
+
+eventSource.onerror = function(e) {
+    console.log("Error with SSE, closing.", e);
+    eventSource.close();
+};
+
+eventSource.addEventListener('pipeline_success', function(e) {
+    var placeholder = document.getElementById('piecrust-debug-info-pipeline-status');
+    //if (placeholder.firstChild !== null)
+    placeholder.removeChild(placeholder.firstChild);
+});
+
+eventSource.addEventListener('pipeline_error', function(e) {
+    var obj = JSON.parse(e.data);
+
+    var outer = document.createElement('div');
+    outer.style = 'padding: 1em;';
+    for (var i = 0; i < obj.assets.length; ++i) {
+        var item = obj.assets[i];
+        var markup = (
+            '<p>Error processing: <span style="font-family: monospace;">' +
+            item.path + '</span></p>\n' +
+            '<ul>');
+        for (var j = 0; j < item.errors.length; ++j) {
+            markup += (
+                '<li style="font-family: monospace;">' + 
+                item.errors[j] +
+                '</li>\n');
+        }
+        markup += '</ul>\n';
+        var entry = document.createElement('div');
+        entry.innerHTML = markup;
+        outer.appendChild(entry);
+    }
+
+    var placeholder = document.getElementById('piecrust-debug-info-pipeline-status');
+    placeholder.appendChild(outer);
+});
+
--- a/piecrust/serving.py	Sat Jan 31 17:36:01 2015 -0800
+++ b/piecrust/serving.py	Mon Feb 02 08:34:44 2015 -0800
@@ -1,17 +1,18 @@
 import io
 import os
 import re
+import json
 import gzip
 import time
+import queue
 import os.path
 import hashlib
 import logging
 import threading
 from werkzeug.exceptions import (
         NotFound, MethodNotAllowed, InternalServerError, HTTPException)
-from werkzeug.serving import run_simple
 from werkzeug.wrappers import Request, Response
-from werkzeug.wsgi import wrap_file
+from werkzeug.wsgi import ClosingIterator, wrap_file
 from jinja2 import FileSystemLoader, Environment
 from piecrust.app import PieCrust
 from piecrust.data.filters import (
@@ -52,20 +53,27 @@
         self.used_source_names = set()
 
 
+class WsgiServerWrapper(object):
+    def __init__(self, server):
+        self.server = server
+
+    def __call__(self, environ, start_response):
+        return self.server._run_request(environ, start_response)
+
+
 class Server(object):
-    def __init__(self, root_dir, host='localhost', port=8080,
+    def __init__(self, root_dir,
                  debug=False, use_reloader=False, static_preview=True):
         self.root_dir = root_dir
-        self.host = host
-        self.port = int(port)
         self.debug = debug
         self.use_reloader = use_reloader
         self.static_preview = static_preview
         self._out_dir = None
         self._page_record = None
+        self._proc_loop = None
         self._mimetype_map = load_mimetype_map()
 
-    def run(self):
+    def getWsgiApp(self):
         # Bake all the assets so we know what we have, and so we can serve
         # them to the client. We need a temp app for this.
         app = PieCrust(root_dir=self.root_dir, debug=self.debug)
@@ -81,13 +89,12 @@
             # the second time, when the reloading process is spawned, with the
             # `WERKZEUG_RUN_MAIN` variable set.
             pipeline = ProcessorPipeline(app, self._out_dir)
-            loop = ProcessingLoop(pipeline)
-            loop.start()
+            self._proc_loop = ProcessingLoop(pipeline)
+            self._proc_loop.start()
 
         # Run the WSGI app.
-        wsgi_wrapper = WsgiServer(self)
-        run_simple(self.host, self.port, wsgi_wrapper,
-                   use_debugger=self.debug, use_reloader=self.use_reloader)
+        wsgi_wrapper = WsgiServerWrapper(self)
+        return wsgi_wrapper
 
     def _run_request(self, environ, start_response):
         try:
@@ -103,9 +110,20 @@
         # We don't support anything else than GET requests since we're
         # previewing something that will be static later.
         if self.static_preview and request.method != 'GET':
-            logger.error("Only GET requests are allowed, got %s" % request.method)
+            logger.error("Only GET requests are allowed, got %s" %
+                         request.method)
             raise MethodNotAllowed()
 
+        # Handle special requests right away.
+        response = self._try_special_request(environ, request)
+        if response is not None:
+            return response(environ, start_response)
+
+        # Also handle requests to a pipeline-built asset right away.
+        response = self._try_serve_asset(environ, request)
+        if response is not None:
+            return response(environ, start_response)
+
         # Create the app for this request.
         rq_debug = ('!debug' in request.args)
         app = PieCrust(root_dir=self.root_dir, debug=(self.debug or rq_debug))
@@ -116,11 +134,6 @@
         # We'll serve page assets directly from where they are.
         app.env.base_asset_url_format = '/_asset/%path%'
 
-        # See if the requested URL is an asset.
-        response = self._try_serve_asset(app, environ, request)
-        if response is not None:
-            return response(environ, start_response)
-
         # Let's see if it can be a page asset.
         response = self._try_serve_page_asset(app, environ, request)
         if response is not None:
@@ -142,7 +155,39 @@
             logger.error(msg)
             raise InternalServerError(msg)
 
-    def _try_serve_asset(self, app, environ, request):
+    def _try_special_request(self, environ, request):
+        static_mount = '/__piecrust_static/'
+        if request.path.startswith(static_mount):
+            rel_req_path = request.path[len(static_mount):]
+            mount = os.path.join(
+                    os.path.dirname(__file__),
+                    'resources', 'server')
+            full_path = os.path.join(mount, rel_req_path)
+            try:
+                response = self._make_wrapped_file_response(
+                        environ, full_path)
+                return response
+            except OSError:
+                pass
+
+        debug_mount = '/__piecrust_debug/'
+        if request.path.startswith(debug_mount):
+            rel_req_path = request.path[len(debug_mount):]
+            if rel_req_path == 'pipeline_status':
+                provider = PipelineStatusServerSideEventProducer(
+                        self._proc_loop.status_queue)
+                it = ClosingIterator(provider.run(), [provider.close])
+                response = Response(it)
+                response.headers['Cache-Control'] = 'no-cache'
+                if 'text/event-stream' in request.accept_mimetypes:
+                    response.mimetype = 'text/event-stream'
+                response.direct_passthrough = True
+                response.implicit_sequence_conversion = False
+                return response
+
+        return None
+
+    def _try_serve_asset(self, environ, request):
         rel_req_path = request.path.lstrip('/').replace('/', os.sep)
         full_path = os.path.join(self._out_dir, rel_req_path)
         try:
@@ -323,14 +368,6 @@
         return response(environ, start_response)
 
 
-class WsgiServer(object):
-    def __init__(self, server):
-        self.server = server
-
-    def __call__(self, environ, start_response):
-        return self.server._run_request(environ, start_response)
-
-
 class RouteNotFoundError(Exception):
     pass
 
@@ -385,11 +422,54 @@
     return mimetype_map
 
 
+class PipelineStatusServerSideEventProducer(object):
+    def __init__(self, status_queue):
+        self.status_queue = status_queue
+        self.interval = 2
+        self.timeout = 60*10
+        self._start_time = 0
+
+    def run(self):
+        logger.debug("Starting pipeline status SSE.")
+        self._start_time = time.time()
+
+        outstr = 'event: ping\ndata: started\n\n'
+        yield bytes(outstr, 'utf8')
+
+        count = 0
+        while True:
+            if time.time() > self.timeout + self._start_time:
+                logger.debug("Closing pipeline status SSE, timeout reached.")
+                outstr = 'event: pipeline_timeout\ndata: bye\n\n'
+                yield bytes(outstr, 'utf8')
+                break
+
+            try:
+                logger.debug("Polling pipeline status queue...")
+                count += 1
+                data = self.status_queue.get(True, self.interval)
+            except queue.Empty:
+                if count < 3:
+                    continue
+                data = {'type': 'ping', 'message': 'ping'}
+                count = 0
+
+            event_type = data['type']
+            outstr = 'event: %s\ndata: %s\n\n' % (
+                    event_type, json.dumps(data))
+            logger.debug("Sending pipeline status SSE.")
+            yield bytes(outstr, 'utf8')
+
+    def close(self):
+        logger.debug("Closing pipeline status SSE.")
+
+
 class ProcessingLoop(threading.Thread):
     def __init__(self, pipeline):
         super(ProcessingLoop, self).__init__(
                 name='pipeline-reloader', daemon=True)
         self.pipeline = pipeline
+        self.status_queue = queue.Queue()
         self.interval = 1
         self._paths = set()
         self._record = None
@@ -441,6 +521,32 @@
                     root,
                     previous_record=self._record,
                     save_record=False)
+
+            # Update the status queue.
+            # (we need to clear it because there may not be a consumer
+            #  on the other side, if the user isn't running with the
+            #  debug window active)
+            while True:
+                try:
+                    self.status_queue.get_nowait()
+                except queue.Empty:
+                    break
+
+            if self._record.success:
+                item = {
+                        'type': 'pipeline_success'}
+                self.status_queue.put_nowait(item)
+            else:
+                item = {
+                        'type': 'pipeline_error',
+                        'assets': []}
+                for entry in self._record.entries:
+                    if entry.errors:
+                        asset_item = {
+                                'path': entry.rel_input,
+                                'errors': list(entry.errors)}
+                        item['assets'].append(asset_item)
+                self.status_queue.put_nowait(item)
         except:
             pass