Mercurial > piecrust2
diff piecrust/serving.py @ 219:d7a548ebcd58
serve: Add server sent events for showing pipeline errors in the debug window.
The server can now run an endpoint that streams pipeline status, including
errors or "fixed" statuses.
As a result, I had to investigate using event-loop based server alternatives,
before I figured out the correct flag to set in Werkzeug. Support for Gunicorn
is therefore now possible, although disabled by default. I will come in handy
though when proper support for CMS-mode is enabled.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 02 Feb 2015 08:34:44 -0800 |
parents | 09e350db7f8f |
children | 7decf00eee47 |
line wrap: on
line diff
--- a/piecrust/serving.py Sat Jan 31 17:36:01 2015 -0800 +++ b/piecrust/serving.py Mon Feb 02 08:34:44 2015 -0800 @@ -1,17 +1,18 @@ import io import os import re +import json import gzip import time +import queue import os.path import hashlib import logging import threading from werkzeug.exceptions import ( NotFound, MethodNotAllowed, InternalServerError, HTTPException) -from werkzeug.serving import run_simple from werkzeug.wrappers import Request, Response -from werkzeug.wsgi import wrap_file +from werkzeug.wsgi import ClosingIterator, wrap_file from jinja2 import FileSystemLoader, Environment from piecrust.app import PieCrust from piecrust.data.filters import ( @@ -52,20 +53,27 @@ self.used_source_names = set() +class WsgiServerWrapper(object): + def __init__(self, server): + self.server = server + + def __call__(self, environ, start_response): + return self.server._run_request(environ, start_response) + + class Server(object): - def __init__(self, root_dir, host='localhost', port=8080, + def __init__(self, root_dir, debug=False, use_reloader=False, static_preview=True): self.root_dir = root_dir - self.host = host - self.port = int(port) self.debug = debug self.use_reloader = use_reloader self.static_preview = static_preview self._out_dir = None self._page_record = None + self._proc_loop = None self._mimetype_map = load_mimetype_map() - def run(self): + def getWsgiApp(self): # Bake all the assets so we know what we have, and so we can serve # them to the client. We need a temp app for this. app = PieCrust(root_dir=self.root_dir, debug=self.debug) @@ -81,13 +89,12 @@ # the second time, when the reloading process is spawned, with the # `WERKZEUG_RUN_MAIN` variable set. pipeline = ProcessorPipeline(app, self._out_dir) - loop = ProcessingLoop(pipeline) - loop.start() + self._proc_loop = ProcessingLoop(pipeline) + self._proc_loop.start() # Run the WSGI app. - wsgi_wrapper = WsgiServer(self) - run_simple(self.host, self.port, wsgi_wrapper, - use_debugger=self.debug, use_reloader=self.use_reloader) + wsgi_wrapper = WsgiServerWrapper(self) + return wsgi_wrapper def _run_request(self, environ, start_response): try: @@ -103,9 +110,20 @@ # We don't support anything else than GET requests since we're # previewing something that will be static later. if self.static_preview and request.method != 'GET': - logger.error("Only GET requests are allowed, got %s" % request.method) + logger.error("Only GET requests are allowed, got %s" % + request.method) raise MethodNotAllowed() + # Handle special requests right away. + response = self._try_special_request(environ, request) + if response is not None: + return response(environ, start_response) + + # Also handle requests to a pipeline-built asset right away. + response = self._try_serve_asset(environ, request) + if response is not None: + return response(environ, start_response) + # Create the app for this request. rq_debug = ('!debug' in request.args) app = PieCrust(root_dir=self.root_dir, debug=(self.debug or rq_debug)) @@ -116,11 +134,6 @@ # We'll serve page assets directly from where they are. app.env.base_asset_url_format = '/_asset/%path%' - # See if the requested URL is an asset. - response = self._try_serve_asset(app, environ, request) - if response is not None: - return response(environ, start_response) - # Let's see if it can be a page asset. response = self._try_serve_page_asset(app, environ, request) if response is not None: @@ -142,7 +155,39 @@ logger.error(msg) raise InternalServerError(msg) - def _try_serve_asset(self, app, environ, request): + def _try_special_request(self, environ, request): + static_mount = '/__piecrust_static/' + if request.path.startswith(static_mount): + rel_req_path = request.path[len(static_mount):] + mount = os.path.join( + os.path.dirname(__file__), + 'resources', 'server') + full_path = os.path.join(mount, rel_req_path) + try: + response = self._make_wrapped_file_response( + environ, full_path) + return response + except OSError: + pass + + debug_mount = '/__piecrust_debug/' + if request.path.startswith(debug_mount): + rel_req_path = request.path[len(debug_mount):] + if rel_req_path == 'pipeline_status': + provider = PipelineStatusServerSideEventProducer( + self._proc_loop.status_queue) + it = ClosingIterator(provider.run(), [provider.close]) + response = Response(it) + response.headers['Cache-Control'] = 'no-cache' + if 'text/event-stream' in request.accept_mimetypes: + response.mimetype = 'text/event-stream' + response.direct_passthrough = True + response.implicit_sequence_conversion = False + return response + + return None + + def _try_serve_asset(self, environ, request): rel_req_path = request.path.lstrip('/').replace('/', os.sep) full_path = os.path.join(self._out_dir, rel_req_path) try: @@ -323,14 +368,6 @@ return response(environ, start_response) -class WsgiServer(object): - def __init__(self, server): - self.server = server - - def __call__(self, environ, start_response): - return self.server._run_request(environ, start_response) - - class RouteNotFoundError(Exception): pass @@ -385,11 +422,54 @@ return mimetype_map +class PipelineStatusServerSideEventProducer(object): + def __init__(self, status_queue): + self.status_queue = status_queue + self.interval = 2 + self.timeout = 60*10 + self._start_time = 0 + + def run(self): + logger.debug("Starting pipeline status SSE.") + self._start_time = time.time() + + outstr = 'event: ping\ndata: started\n\n' + yield bytes(outstr, 'utf8') + + count = 0 + while True: + if time.time() > self.timeout + self._start_time: + logger.debug("Closing pipeline status SSE, timeout reached.") + outstr = 'event: pipeline_timeout\ndata: bye\n\n' + yield bytes(outstr, 'utf8') + break + + try: + logger.debug("Polling pipeline status queue...") + count += 1 + data = self.status_queue.get(True, self.interval) + except queue.Empty: + if count < 3: + continue + data = {'type': 'ping', 'message': 'ping'} + count = 0 + + event_type = data['type'] + outstr = 'event: %s\ndata: %s\n\n' % ( + event_type, json.dumps(data)) + logger.debug("Sending pipeline status SSE.") + yield bytes(outstr, 'utf8') + + def close(self): + logger.debug("Closing pipeline status SSE.") + + class ProcessingLoop(threading.Thread): def __init__(self, pipeline): super(ProcessingLoop, self).__init__( name='pipeline-reloader', daemon=True) self.pipeline = pipeline + self.status_queue = queue.Queue() self.interval = 1 self._paths = set() self._record = None @@ -441,6 +521,32 @@ root, previous_record=self._record, save_record=False) + + # Update the status queue. + # (we need to clear it because there may not be a consumer + # on the other side, if the user isn't running with the + # debug window active) + while True: + try: + self.status_queue.get_nowait() + except queue.Empty: + break + + if self._record.success: + item = { + 'type': 'pipeline_success'} + self.status_queue.put_nowait(item) + else: + item = { + 'type': 'pipeline_error', + 'assets': []} + for entry in self._record.entries: + if entry.errors: + asset_item = { + 'path': entry.rel_input, + 'errors': list(entry.errors)} + item['assets'].append(asset_item) + self.status_queue.put_nowait(item) except: pass