diff piecrust/serving.py @ 219:d7a548ebcd58

serve: Add server sent events for showing pipeline errors in the debug window. The server can now run an endpoint that streams pipeline status, including errors or "fixed" statuses. As a result, I had to investigate using event-loop based server alternatives, before I figured out the correct flag to set in Werkzeug. Support for Gunicorn is therefore now possible, although disabled by default. I will come in handy though when proper support for CMS-mode is enabled.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 02 Feb 2015 08:34:44 -0800
parents 09e350db7f8f
children 7decf00eee47
line wrap: on
line diff
--- a/piecrust/serving.py	Sat Jan 31 17:36:01 2015 -0800
+++ b/piecrust/serving.py	Mon Feb 02 08:34:44 2015 -0800
@@ -1,17 +1,18 @@
 import io
 import os
 import re
+import json
 import gzip
 import time
+import queue
 import os.path
 import hashlib
 import logging
 import threading
 from werkzeug.exceptions import (
         NotFound, MethodNotAllowed, InternalServerError, HTTPException)
-from werkzeug.serving import run_simple
 from werkzeug.wrappers import Request, Response
-from werkzeug.wsgi import wrap_file
+from werkzeug.wsgi import ClosingIterator, wrap_file
 from jinja2 import FileSystemLoader, Environment
 from piecrust.app import PieCrust
 from piecrust.data.filters import (
@@ -52,20 +53,27 @@
         self.used_source_names = set()
 
 
+class WsgiServerWrapper(object):
+    def __init__(self, server):
+        self.server = server
+
+    def __call__(self, environ, start_response):
+        return self.server._run_request(environ, start_response)
+
+
 class Server(object):
-    def __init__(self, root_dir, host='localhost', port=8080,
+    def __init__(self, root_dir,
                  debug=False, use_reloader=False, static_preview=True):
         self.root_dir = root_dir
-        self.host = host
-        self.port = int(port)
         self.debug = debug
         self.use_reloader = use_reloader
         self.static_preview = static_preview
         self._out_dir = None
         self._page_record = None
+        self._proc_loop = None
         self._mimetype_map = load_mimetype_map()
 
-    def run(self):
+    def getWsgiApp(self):
         # Bake all the assets so we know what we have, and so we can serve
         # them to the client. We need a temp app for this.
         app = PieCrust(root_dir=self.root_dir, debug=self.debug)
@@ -81,13 +89,12 @@
             # the second time, when the reloading process is spawned, with the
             # `WERKZEUG_RUN_MAIN` variable set.
             pipeline = ProcessorPipeline(app, self._out_dir)
-            loop = ProcessingLoop(pipeline)
-            loop.start()
+            self._proc_loop = ProcessingLoop(pipeline)
+            self._proc_loop.start()
 
         # Run the WSGI app.
-        wsgi_wrapper = WsgiServer(self)
-        run_simple(self.host, self.port, wsgi_wrapper,
-                   use_debugger=self.debug, use_reloader=self.use_reloader)
+        wsgi_wrapper = WsgiServerWrapper(self)
+        return wsgi_wrapper
 
     def _run_request(self, environ, start_response):
         try:
@@ -103,9 +110,20 @@
         # We don't support anything else than GET requests since we're
         # previewing something that will be static later.
         if self.static_preview and request.method != 'GET':
-            logger.error("Only GET requests are allowed, got %s" % request.method)
+            logger.error("Only GET requests are allowed, got %s" %
+                         request.method)
             raise MethodNotAllowed()
 
+        # Handle special requests right away.
+        response = self._try_special_request(environ, request)
+        if response is not None:
+            return response(environ, start_response)
+
+        # Also handle requests to a pipeline-built asset right away.
+        response = self._try_serve_asset(environ, request)
+        if response is not None:
+            return response(environ, start_response)
+
         # Create the app for this request.
         rq_debug = ('!debug' in request.args)
         app = PieCrust(root_dir=self.root_dir, debug=(self.debug or rq_debug))
@@ -116,11 +134,6 @@
         # We'll serve page assets directly from where they are.
         app.env.base_asset_url_format = '/_asset/%path%'
 
-        # See if the requested URL is an asset.
-        response = self._try_serve_asset(app, environ, request)
-        if response is not None:
-            return response(environ, start_response)
-
         # Let's see if it can be a page asset.
         response = self._try_serve_page_asset(app, environ, request)
         if response is not None:
@@ -142,7 +155,39 @@
             logger.error(msg)
             raise InternalServerError(msg)
 
-    def _try_serve_asset(self, app, environ, request):
+    def _try_special_request(self, environ, request):
+        static_mount = '/__piecrust_static/'
+        if request.path.startswith(static_mount):
+            rel_req_path = request.path[len(static_mount):]
+            mount = os.path.join(
+                    os.path.dirname(__file__),
+                    'resources', 'server')
+            full_path = os.path.join(mount, rel_req_path)
+            try:
+                response = self._make_wrapped_file_response(
+                        environ, full_path)
+                return response
+            except OSError:
+                pass
+
+        debug_mount = '/__piecrust_debug/'
+        if request.path.startswith(debug_mount):
+            rel_req_path = request.path[len(debug_mount):]
+            if rel_req_path == 'pipeline_status':
+                provider = PipelineStatusServerSideEventProducer(
+                        self._proc_loop.status_queue)
+                it = ClosingIterator(provider.run(), [provider.close])
+                response = Response(it)
+                response.headers['Cache-Control'] = 'no-cache'
+                if 'text/event-stream' in request.accept_mimetypes:
+                    response.mimetype = 'text/event-stream'
+                response.direct_passthrough = True
+                response.implicit_sequence_conversion = False
+                return response
+
+        return None
+
+    def _try_serve_asset(self, environ, request):
         rel_req_path = request.path.lstrip('/').replace('/', os.sep)
         full_path = os.path.join(self._out_dir, rel_req_path)
         try:
@@ -323,14 +368,6 @@
         return response(environ, start_response)
 
 
-class WsgiServer(object):
-    def __init__(self, server):
-        self.server = server
-
-    def __call__(self, environ, start_response):
-        return self.server._run_request(environ, start_response)
-
-
 class RouteNotFoundError(Exception):
     pass
 
@@ -385,11 +422,54 @@
     return mimetype_map
 
 
+class PipelineStatusServerSideEventProducer(object):
+    def __init__(self, status_queue):
+        self.status_queue = status_queue
+        self.interval = 2
+        self.timeout = 60*10
+        self._start_time = 0
+
+    def run(self):
+        logger.debug("Starting pipeline status SSE.")
+        self._start_time = time.time()
+
+        outstr = 'event: ping\ndata: started\n\n'
+        yield bytes(outstr, 'utf8')
+
+        count = 0
+        while True:
+            if time.time() > self.timeout + self._start_time:
+                logger.debug("Closing pipeline status SSE, timeout reached.")
+                outstr = 'event: pipeline_timeout\ndata: bye\n\n'
+                yield bytes(outstr, 'utf8')
+                break
+
+            try:
+                logger.debug("Polling pipeline status queue...")
+                count += 1
+                data = self.status_queue.get(True, self.interval)
+            except queue.Empty:
+                if count < 3:
+                    continue
+                data = {'type': 'ping', 'message': 'ping'}
+                count = 0
+
+            event_type = data['type']
+            outstr = 'event: %s\ndata: %s\n\n' % (
+                    event_type, json.dumps(data))
+            logger.debug("Sending pipeline status SSE.")
+            yield bytes(outstr, 'utf8')
+
+    def close(self):
+        logger.debug("Closing pipeline status SSE.")
+
+
 class ProcessingLoop(threading.Thread):
     def __init__(self, pipeline):
         super(ProcessingLoop, self).__init__(
                 name='pipeline-reloader', daemon=True)
         self.pipeline = pipeline
+        self.status_queue = queue.Queue()
         self.interval = 1
         self._paths = set()
         self._record = None
@@ -441,6 +521,32 @@
                     root,
                     previous_record=self._record,
                     save_record=False)
+
+            # Update the status queue.
+            # (we need to clear it because there may not be a consumer
+            #  on the other side, if the user isn't running with the
+            #  debug window active)
+            while True:
+                try:
+                    self.status_queue.get_nowait()
+                except queue.Empty:
+                    break
+
+            if self._record.success:
+                item = {
+                        'type': 'pipeline_success'}
+                self.status_queue.put_nowait(item)
+            else:
+                item = {
+                        'type': 'pipeline_error',
+                        'assets': []}
+                for entry in self._record.entries:
+                    if entry.errors:
+                        asset_item = {
+                                'path': entry.rel_input,
+                                'errors': list(entry.errors)}
+                        item['assets'].append(asset_item)
+                self.status_queue.put_nowait(item)
         except:
             pass