# HG changeset patch # User Ludovic Chabant # Date 1422894884 28800 # Node ID d7a548ebcd58a008ca0cb21eb394b4a84f89ead6 # Parent 10f24c62b05b8a341bd151fcb0ff54557199992b serve: Add server sent events for showing pipeline errors in the debug window. The server can now run an endpoint that streams pipeline status, including errors or "fixed" statuses. As a result, I had to investigate using event-loop based server alternatives, before I figured out the correct flag to set in Werkzeug. Support for Gunicorn is therefore now possible, although disabled by default. I will come in handy though when proper support for CMS-mode is enabled. diff -r 10f24c62b05b -r d7a548ebcd58 piecrust/commands/builtin/serving.py --- a/piecrust/commands/builtin/serving.py Sat Jan 31 17:36:01 2015 -0800 +++ b/piecrust/commands/builtin/serving.py Mon Feb 02 08:34:44 2015 -0800 @@ -13,25 +13,73 @@ self.description = "Runs a local web server to serve your website." def setupParser(self, parser, app): - parser.add_argument('-p', '--port', + parser.add_argument( + '-p', '--port', help="The port for the web server", default=8080) - parser.add_argument('-a', '--address', + parser.add_argument( + '-a', '--address', help="The host for the web server", default='localhost') - parser.add_argument('--use-reloader', + parser.add_argument( + '--use-reloader', help="Restart the server when PieCrust code changes", action='store_true') - parser.add_argument('--use-debugger', + parser.add_argument( + '--use-debugger', help="Show the debugger when an error occurs", action='store_true') + parser.add_argument( + '--wsgi', + help="The WSGI server implementation to use", + choices=['werkzeug', 'gunicorn'], + default='werkzeug') def run(self, ctx): + host = ctx.args.address + port = int(ctx.args.port) + debug = ctx.args.debug or ctx.args.use_debugger + server = Server( ctx.app.root_dir, - host=ctx.args.address, - port=ctx.args.port, - debug=(ctx.args.debug or ctx.args.use_debugger), + debug=debug, use_reloader=ctx.args.use_reloader) - server.run() + app = server.getWsgiApp() + + if ctx.args.wsgi == 'werkzeug': + from werkzeug.serving import run_simple + run_simple(host, port, app, + threaded=True, + use_debugger=debug, + use_reloader=ctx.args.use_reloader) + + elif ctx.args.wsgi == 'gunicorn': + from gunicorn.app.base import BaseApplication + + class PieCrustGunicornApplication(BaseApplication): + def __init__(self, app, options): + self.app = app + self.options = options + super(PieCrustGunicornApplication, self).__init__() + def load_config(self): + for k, v in self.options.items(): + if k in self.cfg.settings and v is not None: + self.cfg.set(k, v) + + def load(self): + return self.app + + options = { + 'bind': '%s:%s' % (host, port), + 'accesslog': '-', + 'worker_class': 'gaiohttp', + 'workers': 2, + 'timeout': 999999} + if debug: + options['loglevel'] = 'debug' + if ctx.args.use_reloader: + options['reload'] = True + app_wrapper = PieCrustGunicornApplication(app, options) + app_wrapper.run() + diff -r 10f24c62b05b -r d7a548ebcd58 piecrust/data/debug.py --- a/piecrust/data/debug.py Sat Jan 31 17:36:01 2015 -0800 +++ b/piecrust/data/debug.py Mon Feb 02 08:34:44 2015 -0800 @@ -14,12 +14,11 @@ # CSS for the debug window. -CSS_DEBUGINFO = """ +CSS_DEBUGWINDOW = """ text-align: left; +font-family: serif; font-style: normal; -padding: 1em; -background: #a42; -color: #fff; +font-weight: normal; position: fixed; width: 50%; bottom: 0; @@ -29,6 +28,17 @@ box-shadow: 0 0 10px #633; """ +CSS_PIPELINESTATUS = """ +background: #fff; +color: #a22; +""" + +CSS_DEBUGINFO = """ +padding: 1em; +background: #a42; +color: #fff; +""" + # HTML elements. CSS_P = 'margin: 0; padding: 0;' CSS_A = 'color: #fff; text-decoration: none;' @@ -62,10 +72,16 @@ def _do_build_debug_info(page, data, output): app = page.app - print('
' % CSS_DEBUGINFO, file=output) + print('
' % CSS_DEBUGWINDOW, + file=output) - print('
', file=output) - print('

PieCrust %s — ' % (CSS_P, APP_VERSION), file=output) + print('

' % + CSS_PIPELINESTATUS, file=output) + print('
', file=output) + + print('
' % CSS_DEBUGINFO, file=output) + print('

PieCrust %s — ' % + (CSS_P, APP_VERSION), file=output) # If we have some execution info in the environment, # add more information. @@ -92,7 +108,7 @@ print('

', file=output) if data: - print('
', file=output) + print('
' % CSS_DEBUGINFO, file=output) print(('

', + file=output) + class DebugDataRenderer(object): MAX_VALUE_LENGTH = 150 diff -r 10f24c62b05b -r d7a548ebcd58 piecrust/resources/server/piecrust-debug-info.js --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/piecrust/resources/server/piecrust-debug-info.js Mon Feb 02 08:34:44 2015 -0800 @@ -0,0 +1,45 @@ +var eventSource = new EventSource("/__piecrust_debug/pipeline_status"); + +//window.onbeforeunload = function(e) { +// console.log("Disconnecting SSE.", e); +// eventSource.close(); +//}; + +eventSource.onerror = function(e) { + console.log("Error with SSE, closing.", e); + eventSource.close(); +}; + +eventSource.addEventListener('pipeline_success', function(e) { + var placeholder = document.getElementById('piecrust-debug-info-pipeline-status'); + //if (placeholder.firstChild !== null) + placeholder.removeChild(placeholder.firstChild); +}); + +eventSource.addEventListener('pipeline_error', function(e) { + var obj = JSON.parse(e.data); + + var outer = document.createElement('div'); + outer.style = 'padding: 1em;'; + for (var i = 0; i < obj.assets.length; ++i) { + var item = obj.assets[i]; + var markup = ( + '

Error processing: ' + + item.path + '

\n' + + '
    '); + for (var j = 0; j < item.errors.length; ++j) { + markup += ( + '
  • ' + + item.errors[j] + + '
  • \n'); + } + markup += '
\n'; + var entry = document.createElement('div'); + entry.innerHTML = markup; + outer.appendChild(entry); + } + + var placeholder = document.getElementById('piecrust-debug-info-pipeline-status'); + placeholder.appendChild(outer); +}); + diff -r 10f24c62b05b -r d7a548ebcd58 piecrust/serving.py --- a/piecrust/serving.py Sat Jan 31 17:36:01 2015 -0800 +++ b/piecrust/serving.py Mon Feb 02 08:34:44 2015 -0800 @@ -1,17 +1,18 @@ import io import os import re +import json import gzip import time +import queue import os.path import hashlib import logging import threading from werkzeug.exceptions import ( NotFound, MethodNotAllowed, InternalServerError, HTTPException) -from werkzeug.serving import run_simple from werkzeug.wrappers import Request, Response -from werkzeug.wsgi import wrap_file +from werkzeug.wsgi import ClosingIterator, wrap_file from jinja2 import FileSystemLoader, Environment from piecrust.app import PieCrust from piecrust.data.filters import ( @@ -52,20 +53,27 @@ self.used_source_names = set() +class WsgiServerWrapper(object): + def __init__(self, server): + self.server = server + + def __call__(self, environ, start_response): + return self.server._run_request(environ, start_response) + + class Server(object): - def __init__(self, root_dir, host='localhost', port=8080, + def __init__(self, root_dir, debug=False, use_reloader=False, static_preview=True): self.root_dir = root_dir - self.host = host - self.port = int(port) self.debug = debug self.use_reloader = use_reloader self.static_preview = static_preview self._out_dir = None self._page_record = None + self._proc_loop = None self._mimetype_map = load_mimetype_map() - def run(self): + def getWsgiApp(self): # Bake all the assets so we know what we have, and so we can serve # them to the client. We need a temp app for this. app = PieCrust(root_dir=self.root_dir, debug=self.debug) @@ -81,13 +89,12 @@ # the second time, when the reloading process is spawned, with the # `WERKZEUG_RUN_MAIN` variable set. pipeline = ProcessorPipeline(app, self._out_dir) - loop = ProcessingLoop(pipeline) - loop.start() + self._proc_loop = ProcessingLoop(pipeline) + self._proc_loop.start() # Run the WSGI app. - wsgi_wrapper = WsgiServer(self) - run_simple(self.host, self.port, wsgi_wrapper, - use_debugger=self.debug, use_reloader=self.use_reloader) + wsgi_wrapper = WsgiServerWrapper(self) + return wsgi_wrapper def _run_request(self, environ, start_response): try: @@ -103,9 +110,20 @@ # We don't support anything else than GET requests since we're # previewing something that will be static later. if self.static_preview and request.method != 'GET': - logger.error("Only GET requests are allowed, got %s" % request.method) + logger.error("Only GET requests are allowed, got %s" % + request.method) raise MethodNotAllowed() + # Handle special requests right away. + response = self._try_special_request(environ, request) + if response is not None: + return response(environ, start_response) + + # Also handle requests to a pipeline-built asset right away. + response = self._try_serve_asset(environ, request) + if response is not None: + return response(environ, start_response) + # Create the app for this request. rq_debug = ('!debug' in request.args) app = PieCrust(root_dir=self.root_dir, debug=(self.debug or rq_debug)) @@ -116,11 +134,6 @@ # We'll serve page assets directly from where they are. app.env.base_asset_url_format = '/_asset/%path%' - # See if the requested URL is an asset. - response = self._try_serve_asset(app, environ, request) - if response is not None: - return response(environ, start_response) - # Let's see if it can be a page asset. response = self._try_serve_page_asset(app, environ, request) if response is not None: @@ -142,7 +155,39 @@ logger.error(msg) raise InternalServerError(msg) - def _try_serve_asset(self, app, environ, request): + def _try_special_request(self, environ, request): + static_mount = '/__piecrust_static/' + if request.path.startswith(static_mount): + rel_req_path = request.path[len(static_mount):] + mount = os.path.join( + os.path.dirname(__file__), + 'resources', 'server') + full_path = os.path.join(mount, rel_req_path) + try: + response = self._make_wrapped_file_response( + environ, full_path) + return response + except OSError: + pass + + debug_mount = '/__piecrust_debug/' + if request.path.startswith(debug_mount): + rel_req_path = request.path[len(debug_mount):] + if rel_req_path == 'pipeline_status': + provider = PipelineStatusServerSideEventProducer( + self._proc_loop.status_queue) + it = ClosingIterator(provider.run(), [provider.close]) + response = Response(it) + response.headers['Cache-Control'] = 'no-cache' + if 'text/event-stream' in request.accept_mimetypes: + response.mimetype = 'text/event-stream' + response.direct_passthrough = True + response.implicit_sequence_conversion = False + return response + + return None + + def _try_serve_asset(self, environ, request): rel_req_path = request.path.lstrip('/').replace('/', os.sep) full_path = os.path.join(self._out_dir, rel_req_path) try: @@ -323,14 +368,6 @@ return response(environ, start_response) -class WsgiServer(object): - def __init__(self, server): - self.server = server - - def __call__(self, environ, start_response): - return self.server._run_request(environ, start_response) - - class RouteNotFoundError(Exception): pass @@ -385,11 +422,54 @@ return mimetype_map +class PipelineStatusServerSideEventProducer(object): + def __init__(self, status_queue): + self.status_queue = status_queue + self.interval = 2 + self.timeout = 60*10 + self._start_time = 0 + + def run(self): + logger.debug("Starting pipeline status SSE.") + self._start_time = time.time() + + outstr = 'event: ping\ndata: started\n\n' + yield bytes(outstr, 'utf8') + + count = 0 + while True: + if time.time() > self.timeout + self._start_time: + logger.debug("Closing pipeline status SSE, timeout reached.") + outstr = 'event: pipeline_timeout\ndata: bye\n\n' + yield bytes(outstr, 'utf8') + break + + try: + logger.debug("Polling pipeline status queue...") + count += 1 + data = self.status_queue.get(True, self.interval) + except queue.Empty: + if count < 3: + continue + data = {'type': 'ping', 'message': 'ping'} + count = 0 + + event_type = data['type'] + outstr = 'event: %s\ndata: %s\n\n' % ( + event_type, json.dumps(data)) + logger.debug("Sending pipeline status SSE.") + yield bytes(outstr, 'utf8') + + def close(self): + logger.debug("Closing pipeline status SSE.") + + class ProcessingLoop(threading.Thread): def __init__(self, pipeline): super(ProcessingLoop, self).__init__( name='pipeline-reloader', daemon=True) self.pipeline = pipeline + self.status_queue = queue.Queue() self.interval = 1 self._paths = set() self._record = None @@ -441,6 +521,32 @@ root, previous_record=self._record, save_record=False) + + # Update the status queue. + # (we need to clear it because there may not be a consumer + # on the other side, if the user isn't running with the + # debug window active) + while True: + try: + self.status_queue.get_nowait() + except queue.Empty: + break + + if self._record.success: + item = { + 'type': 'pipeline_success'} + self.status_queue.put_nowait(item) + else: + item = { + 'type': 'pipeline_error', + 'assets': []} + for entry in self._record.entries: + if entry.errors: + asset_item = { + 'path': entry.rel_input, + 'errors': list(entry.errors)} + item['assets'].append(asset_item) + self.status_queue.put_nowait(item) except: pass