comparison piecrust/serving.py @ 219:d7a548ebcd58

serve: Add server sent events for showing pipeline errors in the debug window. The server can now run an endpoint that streams pipeline status, including errors or "fixed" statuses. As a result, I had to investigate using event-loop based server alternatives, before I figured out the correct flag to set in Werkzeug. Support for Gunicorn is therefore now possible, although disabled by default. I will come in handy though when proper support for CMS-mode is enabled.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 02 Feb 2015 08:34:44 -0800
parents 09e350db7f8f
children 7decf00eee47
comparison
equal deleted inserted replaced
218:10f24c62b05b 219:d7a548ebcd58
1 import io 1 import io
2 import os 2 import os
3 import re 3 import re
4 import json
4 import gzip 5 import gzip
5 import time 6 import time
7 import queue
6 import os.path 8 import os.path
7 import hashlib 9 import hashlib
8 import logging 10 import logging
9 import threading 11 import threading
10 from werkzeug.exceptions import ( 12 from werkzeug.exceptions import (
11 NotFound, MethodNotAllowed, InternalServerError, HTTPException) 13 NotFound, MethodNotAllowed, InternalServerError, HTTPException)
12 from werkzeug.serving import run_simple
13 from werkzeug.wrappers import Request, Response 14 from werkzeug.wrappers import Request, Response
14 from werkzeug.wsgi import wrap_file 15 from werkzeug.wsgi import ClosingIterator, wrap_file
15 from jinja2 import FileSystemLoader, Environment 16 from jinja2 import FileSystemLoader, Environment
16 from piecrust.app import PieCrust 17 from piecrust.app import PieCrust
17 from piecrust.data.filters import ( 18 from piecrust.data.filters import (
18 PaginationFilter, HasFilterClause, IsFilterClause) 19 PaginationFilter, HasFilterClause, IsFilterClause)
19 from piecrust.environment import StandardEnvironment 20 from piecrust.environment import StandardEnvironment
50 self.uri = uri 51 self.uri = uri
51 self.sub_num = sub_num 52 self.sub_num = sub_num
52 self.used_source_names = set() 53 self.used_source_names = set()
53 54
54 55
56 class WsgiServerWrapper(object):
57 def __init__(self, server):
58 self.server = server
59
60 def __call__(self, environ, start_response):
61 return self.server._run_request(environ, start_response)
62
63
55 class Server(object): 64 class Server(object):
56 def __init__(self, root_dir, host='localhost', port=8080, 65 def __init__(self, root_dir,
57 debug=False, use_reloader=False, static_preview=True): 66 debug=False, use_reloader=False, static_preview=True):
58 self.root_dir = root_dir 67 self.root_dir = root_dir
59 self.host = host
60 self.port = int(port)
61 self.debug = debug 68 self.debug = debug
62 self.use_reloader = use_reloader 69 self.use_reloader = use_reloader
63 self.static_preview = static_preview 70 self.static_preview = static_preview
64 self._out_dir = None 71 self._out_dir = None
65 self._page_record = None 72 self._page_record = None
73 self._proc_loop = None
66 self._mimetype_map = load_mimetype_map() 74 self._mimetype_map = load_mimetype_map()
67 75
68 def run(self): 76 def getWsgiApp(self):
69 # Bake all the assets so we know what we have, and so we can serve 77 # Bake all the assets so we know what we have, and so we can serve
70 # them to the client. We need a temp app for this. 78 # them to the client. We need a temp app for this.
71 app = PieCrust(root_dir=self.root_dir, debug=self.debug) 79 app = PieCrust(root_dir=self.root_dir, debug=self.debug)
72 self._out_dir = os.path.join(app.cache_dir, 'server') 80 self._out_dir = os.path.join(app.cache_dir, 'server')
73 self._page_record = ServeRecord() 81 self._page_record = ServeRecord()
79 # but if we're using Werkzeug's reloader, then it won't be the 87 # but if we're using Werkzeug's reloader, then it won't be the
80 # first time we get there... it will only be the correct process 88 # first time we get there... it will only be the correct process
81 # the second time, when the reloading process is spawned, with the 89 # the second time, when the reloading process is spawned, with the
82 # `WERKZEUG_RUN_MAIN` variable set. 90 # `WERKZEUG_RUN_MAIN` variable set.
83 pipeline = ProcessorPipeline(app, self._out_dir) 91 pipeline = ProcessorPipeline(app, self._out_dir)
84 loop = ProcessingLoop(pipeline) 92 self._proc_loop = ProcessingLoop(pipeline)
85 loop.start() 93 self._proc_loop.start()
86 94
87 # Run the WSGI app. 95 # Run the WSGI app.
88 wsgi_wrapper = WsgiServer(self) 96 wsgi_wrapper = WsgiServerWrapper(self)
89 run_simple(self.host, self.port, wsgi_wrapper, 97 return wsgi_wrapper
90 use_debugger=self.debug, use_reloader=self.use_reloader)
91 98
92 def _run_request(self, environ, start_response): 99 def _run_request(self, environ, start_response):
93 try: 100 try:
94 return self._try_run_request(environ, start_response) 101 return self._try_run_request(environ, start_response)
95 except Exception as ex: 102 except Exception as ex:
101 request = Request(environ) 108 request = Request(environ)
102 109
103 # We don't support anything else than GET requests since we're 110 # We don't support anything else than GET requests since we're
104 # previewing something that will be static later. 111 # previewing something that will be static later.
105 if self.static_preview and request.method != 'GET': 112 if self.static_preview and request.method != 'GET':
106 logger.error("Only GET requests are allowed, got %s" % request.method) 113 logger.error("Only GET requests are allowed, got %s" %
114 request.method)
107 raise MethodNotAllowed() 115 raise MethodNotAllowed()
116
117 # Handle special requests right away.
118 response = self._try_special_request(environ, request)
119 if response is not None:
120 return response(environ, start_response)
121
122 # Also handle requests to a pipeline-built asset right away.
123 response = self._try_serve_asset(environ, request)
124 if response is not None:
125 return response(environ, start_response)
108 126
109 # Create the app for this request. 127 # Create the app for this request.
110 rq_debug = ('!debug' in request.args) 128 rq_debug = ('!debug' in request.args)
111 app = PieCrust(root_dir=self.root_dir, debug=(self.debug or rq_debug)) 129 app = PieCrust(root_dir=self.root_dir, debug=(self.debug or rq_debug))
112 app.config.set('site/root', '/') 130 app.config.set('site/root', '/')
113 app.config.set('site/pretty_urls', True) 131 app.config.set('site/pretty_urls', True)
114 app.config.set('server/is_serving', True) 132 app.config.set('server/is_serving', True)
115 133
116 # We'll serve page assets directly from where they are. 134 # We'll serve page assets directly from where they are.
117 app.env.base_asset_url_format = '/_asset/%path%' 135 app.env.base_asset_url_format = '/_asset/%path%'
118
119 # See if the requested URL is an asset.
120 response = self._try_serve_asset(app, environ, request)
121 if response is not None:
122 return response(environ, start_response)
123 136
124 # Let's see if it can be a page asset. 137 # Let's see if it can be a page asset.
125 response = self._try_serve_page_asset(app, environ, request) 138 response = self._try_serve_page_asset(app, environ, request)
126 if response is not None: 139 if response is not None:
127 return response(environ, start_response) 140 return response(environ, start_response)
140 raise 153 raise
141 msg = str(ex) 154 msg = str(ex)
142 logger.error(msg) 155 logger.error(msg)
143 raise InternalServerError(msg) 156 raise InternalServerError(msg)
144 157
145 def _try_serve_asset(self, app, environ, request): 158 def _try_special_request(self, environ, request):
159 static_mount = '/__piecrust_static/'
160 if request.path.startswith(static_mount):
161 rel_req_path = request.path[len(static_mount):]
162 mount = os.path.join(
163 os.path.dirname(__file__),
164 'resources', 'server')
165 full_path = os.path.join(mount, rel_req_path)
166 try:
167 response = self._make_wrapped_file_response(
168 environ, full_path)
169 return response
170 except OSError:
171 pass
172
173 debug_mount = '/__piecrust_debug/'
174 if request.path.startswith(debug_mount):
175 rel_req_path = request.path[len(debug_mount):]
176 if rel_req_path == 'pipeline_status':
177 provider = PipelineStatusServerSideEventProducer(
178 self._proc_loop.status_queue)
179 it = ClosingIterator(provider.run(), [provider.close])
180 response = Response(it)
181 response.headers['Cache-Control'] = 'no-cache'
182 if 'text/event-stream' in request.accept_mimetypes:
183 response.mimetype = 'text/event-stream'
184 response.direct_passthrough = True
185 response.implicit_sequence_conversion = False
186 return response
187
188 return None
189
190 def _try_serve_asset(self, environ, request):
146 rel_req_path = request.path.lstrip('/').replace('/', os.sep) 191 rel_req_path = request.path.lstrip('/').replace('/', os.sep)
147 full_path = os.path.join(self._out_dir, rel_req_path) 192 full_path = os.path.join(self._out_dir, rel_req_path)
148 try: 193 try:
149 response = self._make_wrapped_file_response( 194 response = self._make_wrapped_file_response(
150 environ, full_path) 195 environ, full_path)
321 response = Response(template.render(context), mimetype='text/html') 366 response = Response(template.render(context), mimetype='text/html')
322 response.status_code = code 367 response.status_code = code
323 return response(environ, start_response) 368 return response(environ, start_response)
324 369
325 370
326 class WsgiServer(object):
327 def __init__(self, server):
328 self.server = server
329
330 def __call__(self, environ, start_response):
331 return self.server._run_request(environ, start_response)
332
333
334 class RouteNotFoundError(Exception): 371 class RouteNotFoundError(Exception):
335 pass 372 pass
336 373
337 374
338 class SourceNotFoundError(Exception): 375 class SourceNotFoundError(Exception):
383 for t in tokens[1:]: 420 for t in tokens[1:]:
384 mimetype_map[t] = tokens[0] 421 mimetype_map[t] = tokens[0]
385 return mimetype_map 422 return mimetype_map
386 423
387 424
425 class PipelineStatusServerSideEventProducer(object):
426 def __init__(self, status_queue):
427 self.status_queue = status_queue
428 self.interval = 2
429 self.timeout = 60*10
430 self._start_time = 0
431
432 def run(self):
433 logger.debug("Starting pipeline status SSE.")
434 self._start_time = time.time()
435
436 outstr = 'event: ping\ndata: started\n\n'
437 yield bytes(outstr, 'utf8')
438
439 count = 0
440 while True:
441 if time.time() > self.timeout + self._start_time:
442 logger.debug("Closing pipeline status SSE, timeout reached.")
443 outstr = 'event: pipeline_timeout\ndata: bye\n\n'
444 yield bytes(outstr, 'utf8')
445 break
446
447 try:
448 logger.debug("Polling pipeline status queue...")
449 count += 1
450 data = self.status_queue.get(True, self.interval)
451 except queue.Empty:
452 if count < 3:
453 continue
454 data = {'type': 'ping', 'message': 'ping'}
455 count = 0
456
457 event_type = data['type']
458 outstr = 'event: %s\ndata: %s\n\n' % (
459 event_type, json.dumps(data))
460 logger.debug("Sending pipeline status SSE.")
461 yield bytes(outstr, 'utf8')
462
463 def close(self):
464 logger.debug("Closing pipeline status SSE.")
465
466
388 class ProcessingLoop(threading.Thread): 467 class ProcessingLoop(threading.Thread):
389 def __init__(self, pipeline): 468 def __init__(self, pipeline):
390 super(ProcessingLoop, self).__init__( 469 super(ProcessingLoop, self).__init__(
391 name='pipeline-reloader', daemon=True) 470 name='pipeline-reloader', daemon=True)
392 self.pipeline = pipeline 471 self.pipeline = pipeline
472 self.status_queue = queue.Queue()
393 self.interval = 1 473 self.interval = 1
394 self._paths = set() 474 self._paths = set()
395 self._record = None 475 self._record = None
396 self._last_bake = 0 476 self._last_bake = 0
397 477
439 try: 519 try:
440 self._record = self.pipeline.run( 520 self._record = self.pipeline.run(
441 root, 521 root,
442 previous_record=self._record, 522 previous_record=self._record,
443 save_record=False) 523 save_record=False)
524
525 # Update the status queue.
526 # (we need to clear it because there may not be a consumer
527 # on the other side, if the user isn't running with the
528 # debug window active)
529 while True:
530 try:
531 self.status_queue.get_nowait()
532 except queue.Empty:
533 break
534
535 if self._record.success:
536 item = {
537 'type': 'pipeline_success'}
538 self.status_queue.put_nowait(item)
539 else:
540 item = {
541 'type': 'pipeline_error',
542 'assets': []}
543 for entry in self._record.entries:
544 if entry.errors:
545 asset_item = {
546 'path': entry.rel_input,
547 'errors': list(entry.errors)}
548 item['assets'].append(asset_item)
549 self.status_queue.put_nowait(item)
444 except: 550 except:
445 pass 551 pass
446 552