Mercurial > piecrust2
comparison piecrust/serving.py @ 219:d7a548ebcd58
serve: Add server-sent events for showing pipeline errors in the debug window.
The server can now run an endpoint that streams pipeline status, including
errors or "fixed" statuses.
As a result, I had to investigate using event-loop based server alternatives,
before I figured out the correct flag to set in Werkzeug. Support for Gunicorn
is therefore now possible, although disabled by default. It will come in handy,
though, when proper support for CMS-mode is enabled.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 02 Feb 2015 08:34:44 -0800 |
parents | 09e350db7f8f |
children | 7decf00eee47 |
comparison
equal
deleted
inserted
replaced
218:10f24c62b05b | 219:d7a548ebcd58 |
---|---|
1 import io | 1 import io |
2 import os | 2 import os |
3 import re | 3 import re |
4 import json | |
4 import gzip | 5 import gzip |
5 import time | 6 import time |
7 import queue | |
6 import os.path | 8 import os.path |
7 import hashlib | 9 import hashlib |
8 import logging | 10 import logging |
9 import threading | 11 import threading |
10 from werkzeug.exceptions import ( | 12 from werkzeug.exceptions import ( |
11 NotFound, MethodNotAllowed, InternalServerError, HTTPException) | 13 NotFound, MethodNotAllowed, InternalServerError, HTTPException) |
12 from werkzeug.serving import run_simple | |
13 from werkzeug.wrappers import Request, Response | 14 from werkzeug.wrappers import Request, Response |
14 from werkzeug.wsgi import wrap_file | 15 from werkzeug.wsgi import ClosingIterator, wrap_file |
15 from jinja2 import FileSystemLoader, Environment | 16 from jinja2 import FileSystemLoader, Environment |
16 from piecrust.app import PieCrust | 17 from piecrust.app import PieCrust |
17 from piecrust.data.filters import ( | 18 from piecrust.data.filters import ( |
18 PaginationFilter, HasFilterClause, IsFilterClause) | 19 PaginationFilter, HasFilterClause, IsFilterClause) |
19 from piecrust.environment import StandardEnvironment | 20 from piecrust.environment import StandardEnvironment |
50 self.uri = uri | 51 self.uri = uri |
51 self.sub_num = sub_num | 52 self.sub_num = sub_num |
52 self.used_source_names = set() | 53 self.used_source_names = set() |
53 | 54 |
54 | 55 |
56 class WsgiServerWrapper(object): | |
57 def __init__(self, server): | |
58 self.server = server | |
59 | |
60 def __call__(self, environ, start_response): | |
61 return self.server._run_request(environ, start_response) | |
62 | |
63 | |
55 class Server(object): | 64 class Server(object): |
56 def __init__(self, root_dir, host='localhost', port=8080, | 65 def __init__(self, root_dir, |
57 debug=False, use_reloader=False, static_preview=True): | 66 debug=False, use_reloader=False, static_preview=True): |
58 self.root_dir = root_dir | 67 self.root_dir = root_dir |
59 self.host = host | |
60 self.port = int(port) | |
61 self.debug = debug | 68 self.debug = debug |
62 self.use_reloader = use_reloader | 69 self.use_reloader = use_reloader |
63 self.static_preview = static_preview | 70 self.static_preview = static_preview |
64 self._out_dir = None | 71 self._out_dir = None |
65 self._page_record = None | 72 self._page_record = None |
73 self._proc_loop = None | |
66 self._mimetype_map = load_mimetype_map() | 74 self._mimetype_map = load_mimetype_map() |
67 | 75 |
68 def run(self): | 76 def getWsgiApp(self): |
69 # Bake all the assets so we know what we have, and so we can serve | 77 # Bake all the assets so we know what we have, and so we can serve |
70 # them to the client. We need a temp app for this. | 78 # them to the client. We need a temp app for this. |
71 app = PieCrust(root_dir=self.root_dir, debug=self.debug) | 79 app = PieCrust(root_dir=self.root_dir, debug=self.debug) |
72 self._out_dir = os.path.join(app.cache_dir, 'server') | 80 self._out_dir = os.path.join(app.cache_dir, 'server') |
73 self._page_record = ServeRecord() | 81 self._page_record = ServeRecord() |
79 # but if we're using Werkzeug's reloader, then it won't be the | 87 # but if we're using Werkzeug's reloader, then it won't be the |
80 # first time we get there... it will only be the correct process | 88 # first time we get there... it will only be the correct process |
81 # the second time, when the reloading process is spawned, with the | 89 # the second time, when the reloading process is spawned, with the |
82 # `WERKZEUG_RUN_MAIN` variable set. | 90 # `WERKZEUG_RUN_MAIN` variable set. |
83 pipeline = ProcessorPipeline(app, self._out_dir) | 91 pipeline = ProcessorPipeline(app, self._out_dir) |
84 loop = ProcessingLoop(pipeline) | 92 self._proc_loop = ProcessingLoop(pipeline) |
85 loop.start() | 93 self._proc_loop.start() |
86 | 94 |
87 # Run the WSGI app. | 95 # Run the WSGI app. |
88 wsgi_wrapper = WsgiServer(self) | 96 wsgi_wrapper = WsgiServerWrapper(self) |
89 run_simple(self.host, self.port, wsgi_wrapper, | 97 return wsgi_wrapper |
90 use_debugger=self.debug, use_reloader=self.use_reloader) | |
91 | 98 |
92 def _run_request(self, environ, start_response): | 99 def _run_request(self, environ, start_response): |
93 try: | 100 try: |
94 return self._try_run_request(environ, start_response) | 101 return self._try_run_request(environ, start_response) |
95 except Exception as ex: | 102 except Exception as ex: |
101 request = Request(environ) | 108 request = Request(environ) |
102 | 109 |
103 # We don't support anything else than GET requests since we're | 110 # We don't support anything else than GET requests since we're |
104 # previewing something that will be static later. | 111 # previewing something that will be static later. |
105 if self.static_preview and request.method != 'GET': | 112 if self.static_preview and request.method != 'GET': |
106 logger.error("Only GET requests are allowed, got %s" % request.method) | 113 logger.error("Only GET requests are allowed, got %s" % |
114 request.method) | |
107 raise MethodNotAllowed() | 115 raise MethodNotAllowed() |
116 | |
117 # Handle special requests right away. | |
118 response = self._try_special_request(environ, request) | |
119 if response is not None: | |
120 return response(environ, start_response) | |
121 | |
122 # Also handle requests to a pipeline-built asset right away. | |
123 response = self._try_serve_asset(environ, request) | |
124 if response is not None: | |
125 return response(environ, start_response) | |
108 | 126 |
109 # Create the app for this request. | 127 # Create the app for this request. |
110 rq_debug = ('!debug' in request.args) | 128 rq_debug = ('!debug' in request.args) |
111 app = PieCrust(root_dir=self.root_dir, debug=(self.debug or rq_debug)) | 129 app = PieCrust(root_dir=self.root_dir, debug=(self.debug or rq_debug)) |
112 app.config.set('site/root', '/') | 130 app.config.set('site/root', '/') |
113 app.config.set('site/pretty_urls', True) | 131 app.config.set('site/pretty_urls', True) |
114 app.config.set('server/is_serving', True) | 132 app.config.set('server/is_serving', True) |
115 | 133 |
116 # We'll serve page assets directly from where they are. | 134 # We'll serve page assets directly from where they are. |
117 app.env.base_asset_url_format = '/_asset/%path%' | 135 app.env.base_asset_url_format = '/_asset/%path%' |
118 | |
119 # See if the requested URL is an asset. | |
120 response = self._try_serve_asset(app, environ, request) | |
121 if response is not None: | |
122 return response(environ, start_response) | |
123 | 136 |
124 # Let's see if it can be a page asset. | 137 # Let's see if it can be a page asset. |
125 response = self._try_serve_page_asset(app, environ, request) | 138 response = self._try_serve_page_asset(app, environ, request) |
126 if response is not None: | 139 if response is not None: |
127 return response(environ, start_response) | 140 return response(environ, start_response) |
140 raise | 153 raise |
141 msg = str(ex) | 154 msg = str(ex) |
142 logger.error(msg) | 155 logger.error(msg) |
143 raise InternalServerError(msg) | 156 raise InternalServerError(msg) |
144 | 157 |
145 def _try_serve_asset(self, app, environ, request): | 158 def _try_special_request(self, environ, request): |
159 static_mount = '/__piecrust_static/' | |
160 if request.path.startswith(static_mount): | |
161 rel_req_path = request.path[len(static_mount):] | |
162 mount = os.path.join( | |
163 os.path.dirname(__file__), | |
164 'resources', 'server') | |
165 full_path = os.path.join(mount, rel_req_path) | |
166 try: | |
167 response = self._make_wrapped_file_response( | |
168 environ, full_path) | |
169 return response | |
170 except OSError: | |
171 pass | |
172 | |
173 debug_mount = '/__piecrust_debug/' | |
174 if request.path.startswith(debug_mount): | |
175 rel_req_path = request.path[len(debug_mount):] | |
176 if rel_req_path == 'pipeline_status': | |
177 provider = PipelineStatusServerSideEventProducer( | |
178 self._proc_loop.status_queue) | |
179 it = ClosingIterator(provider.run(), [provider.close]) | |
180 response = Response(it) | |
181 response.headers['Cache-Control'] = 'no-cache' | |
182 if 'text/event-stream' in request.accept_mimetypes: | |
183 response.mimetype = 'text/event-stream' | |
184 response.direct_passthrough = True | |
185 response.implicit_sequence_conversion = False | |
186 return response | |
187 | |
188 return None | |
189 | |
190 def _try_serve_asset(self, environ, request): | |
146 rel_req_path = request.path.lstrip('/').replace('/', os.sep) | 191 rel_req_path = request.path.lstrip('/').replace('/', os.sep) |
147 full_path = os.path.join(self._out_dir, rel_req_path) | 192 full_path = os.path.join(self._out_dir, rel_req_path) |
148 try: | 193 try: |
149 response = self._make_wrapped_file_response( | 194 response = self._make_wrapped_file_response( |
150 environ, full_path) | 195 environ, full_path) |
321 response = Response(template.render(context), mimetype='text/html') | 366 response = Response(template.render(context), mimetype='text/html') |
322 response.status_code = code | 367 response.status_code = code |
323 return response(environ, start_response) | 368 return response(environ, start_response) |
324 | 369 |
325 | 370 |
326 class WsgiServer(object): | |
327 def __init__(self, server): | |
328 self.server = server | |
329 | |
330 def __call__(self, environ, start_response): | |
331 return self.server._run_request(environ, start_response) | |
332 | |
333 | |
334 class RouteNotFoundError(Exception): | 371 class RouteNotFoundError(Exception): |
335 pass | 372 pass |
336 | 373 |
337 | 374 |
338 class SourceNotFoundError(Exception): | 375 class SourceNotFoundError(Exception): |
383 for t in tokens[1:]: | 420 for t in tokens[1:]: |
384 mimetype_map[t] = tokens[0] | 421 mimetype_map[t] = tokens[0] |
385 return mimetype_map | 422 return mimetype_map |
386 | 423 |
387 | 424 |
425 class PipelineStatusServerSideEventProducer(object): | |
426 def __init__(self, status_queue): | |
427 self.status_queue = status_queue | |
428 self.interval = 2 | |
429 self.timeout = 60*10 | |
430 self._start_time = 0 | |
431 | |
432 def run(self): | |
433 logger.debug("Starting pipeline status SSE.") | |
434 self._start_time = time.time() | |
435 | |
436 outstr = 'event: ping\ndata: started\n\n' | |
437 yield bytes(outstr, 'utf8') | |
438 | |
439 count = 0 | |
440 while True: | |
441 if time.time() > self.timeout + self._start_time: | |
442 logger.debug("Closing pipeline status SSE, timeout reached.") | |
443 outstr = 'event: pipeline_timeout\ndata: bye\n\n' | |
444 yield bytes(outstr, 'utf8') | |
445 break | |
446 | |
447 try: | |
448 logger.debug("Polling pipeline status queue...") | |
449 count += 1 | |
450 data = self.status_queue.get(True, self.interval) | |
451 except queue.Empty: | |
452 if count < 3: | |
453 continue | |
454 data = {'type': 'ping', 'message': 'ping'} | |
455 count = 0 | |
456 | |
457 event_type = data['type'] | |
458 outstr = 'event: %s\ndata: %s\n\n' % ( | |
459 event_type, json.dumps(data)) | |
460 logger.debug("Sending pipeline status SSE.") | |
461 yield bytes(outstr, 'utf8') | |
462 | |
463 def close(self): | |
464 logger.debug("Closing pipeline status SSE.") | |
465 | |
466 | |
388 class ProcessingLoop(threading.Thread): | 467 class ProcessingLoop(threading.Thread): |
389 def __init__(self, pipeline): | 468 def __init__(self, pipeline): |
390 super(ProcessingLoop, self).__init__( | 469 super(ProcessingLoop, self).__init__( |
391 name='pipeline-reloader', daemon=True) | 470 name='pipeline-reloader', daemon=True) |
392 self.pipeline = pipeline | 471 self.pipeline = pipeline |
472 self.status_queue = queue.Queue() | |
393 self.interval = 1 | 473 self.interval = 1 |
394 self._paths = set() | 474 self._paths = set() |
395 self._record = None | 475 self._record = None |
396 self._last_bake = 0 | 476 self._last_bake = 0 |
397 | 477 |
439 try: | 519 try: |
440 self._record = self.pipeline.run( | 520 self._record = self.pipeline.run( |
441 root, | 521 root, |
442 previous_record=self._record, | 522 previous_record=self._record, |
443 save_record=False) | 523 save_record=False) |
524 | |
525 # Update the status queue. | |
526 # (we need to clear it because there may not be a consumer | |
527 # on the other side, if the user isn't running with the | |
528 # debug window active) | |
529 while True: | |
530 try: | |
531 self.status_queue.get_nowait() | |
532 except queue.Empty: | |
533 break | |
534 | |
535 if self._record.success: | |
536 item = { | |
537 'type': 'pipeline_success'} | |
538 self.status_queue.put_nowait(item) | |
539 else: | |
540 item = { | |
541 'type': 'pipeline_error', | |
542 'assets': []} | |
543 for entry in self._record.entries: | |
544 if entry.errors: | |
545 asset_item = { | |
546 'path': entry.rel_input, | |
547 'errors': list(entry.errors)} | |
548 item['assets'].append(asset_item) | |
549 self.status_queue.put_nowait(item) | |
444 except: | 550 except: |
445 pass | 551 pass |
446 | 552 |