comparison piecrust/serving.py @ 209:7a5a7a7e8cee

serve: Run the asset pipeline asynchronously. The server now runs a background thread that monitors asset files for changes; when a change is detected, the processing pipeline is run on the parent mount. (A rough sketch of this polling approach follows the changeset metadata below.)
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 22 Jan 2015 22:26:26 -0800
parents e725af1d48fb
children 09e350db7f8f
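
The diff below replaces the synchronous, per-request asset processing with a polling daemon thread (the new ProcessingLoop class at the end of the file). As a rough, self-contained sketch of that polling idea only: the AssetWatcher name, its roots/rebuild/interval parameters, and the demo usage are hypothetical illustrations, not part of PieCrust's API.

import os
import time
import threading


class AssetWatcher(threading.Thread):
    """Hypothetical sketch: poll some directories and invoke a rebuild
    callback when a file is added or modified since the last rebuild."""
    def __init__(self, roots, rebuild, interval=1):
        super().__init__(name='asset-watcher', daemon=True)
        self.roots = roots          # directories to watch
        self.rebuild = rebuild      # callable invoked with the changed root
        self.interval = interval    # seconds between scans
        self._known = set()         # paths seen so far
        self._last_run = 0          # timestamp of the last rebuild

    def _prime(self):
        # Record every existing file so the first scan only reports changes.
        for root in self.roots:
            for dirpath, _, filenames in os.walk(root):
                for name in filenames:
                    self._known.add(os.path.join(dirpath, name))
        self._last_run = time.time()

    def _has_changes(self, root):
        # True if `root` contains a file that is new, or newer than the
        # last rebuild.
        for dirpath, _, filenames in os.walk(root):
            for name in filenames:
                path = os.path.join(dirpath, name)
                if path not in self._known:
                    self._known.add(path)
                    return True
                if os.path.getmtime(path) > self._last_run:
                    return True
        return False

    def run(self):
        self._prime()
        while True:
            for root in self.roots:
                if self._has_changes(root):
                    self._last_run = time.time()
                    self.rebuild(root)
            time.sleep(self.interval)


if __name__ == '__main__':
    # Demo usage (hypothetical): print the root that changed instead of
    # running a real processing pipeline.
    watcher = AssetWatcher(['assets'], lambda root: print('rebuild', root))
    watcher.start()
    while True:
        time.sleep(60)

PieCrust's new ProcessingLoop applies the same pattern to the ProcessorPipeline mounts, re-running the pipeline on the mount root that contains the new or modified file, as shown in the diff below.
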
--- piecrust/serving.py @ 208:989d0abd7c17
+++ piecrust/serving.py @ 209:7a5a7a7e8cee
@@ -1,22 +1,23 @@
+import io
+import os
 import re
 import gzip
 import time
-import os
 import os.path
 import hashlib
 import logging
-import io
-from werkzeug.exceptions import (NotFound, MethodNotAllowed,
-        InternalServerError, HTTPException)
+import threading
+from werkzeug.exceptions import (
+        NotFound, MethodNotAllowed, InternalServerError, HTTPException)
 from werkzeug.serving import run_simple
 from werkzeug.wrappers import Request, Response
 from werkzeug.wsgi import wrap_file
 from jinja2 import FileSystemLoader, Environment
 from piecrust.app import PieCrust
-from piecrust.data.filters import (PaginationFilter, HasFilterClause,
-        IsFilterClause)
+from piecrust.data.filters import (
+        PaginationFilter, HasFilterClause, IsFilterClause)
 from piecrust.environment import StandardEnvironment
 from piecrust.processing.base import ProcessorPipeline
 from piecrust.rendering import PageRenderingContext, render_page
 from piecrust.sources.base import PageFactory, MODE_PARSING
 
@@ -50,33 +51,32 @@
         self.sub_num = sub_num
         self.used_source_names = set()
 
 
 class Server(object):
-    def __init__(self, root_dir, host='localhost', port='8080',
-                 debug=False, use_reloader=False, static_preview=True,
-                 synchronous_asset_pipeline=True):
+    def __init__(self, root_dir, host='localhost', port=8080,
+                 debug=False, use_reloader=False, static_preview=True):
         self.root_dir = root_dir
         self.host = host
-        self.port = port
+        self.port = int(port)
         self.debug = debug
         self.use_reloader = use_reloader
         self.static_preview = static_preview
-        self.synchronous_asset_pipeline = synchronous_asset_pipeline
         self._out_dir = None
-        self._asset_record = None
         self._page_record = None
         self._mimetype_map = load_mimetype_map()
 
     def run(self):
         # Bake all the assets so we know what we have, and so we can serve
         # them to the client. We need a temp app for this.
         app = PieCrust(root_dir=self.root_dir, debug=self.debug)
         self._out_dir = os.path.join(app.cache_dir, 'server')
+        self._page_record = ServeRecord()
+
         pipeline = ProcessorPipeline(app, self._out_dir)
-        self._asset_record = pipeline.run()
-        self._page_record = ServeRecord()
+        loop = ProcessingLoop(pipeline)
+        loop.start()
 
         # Run the WSGI app.
         wsgi_wrapper = WsgiServer(self)
         run_simple(self.host, self.port, wsgi_wrapper,
                    use_debugger=self.debug, use_reloader=self.use_reloader)
@@ -111,102 +111,41 @@
         # See if the requested URL is an asset.
         response = self._try_serve_asset(app, environ, request)
         if response is not None:
             return response(environ, start_response)
 
-        # It's not an asset we know of... let's see if it can be a page asset.
+        # Let's see if it can be a page asset.
         response = self._try_serve_page_asset(app, environ, request)
         if response is not None:
             return response(environ, start_response)
 
         # Nope. Let's see if it's an actual page.
-        # We trap any exception that says "there's no such page" so we can
-        # try another thing before bailing out. But we let any exception
-        # that says "something's wrong" through.
-        exc = None
         try:
             response = self._try_serve_page(app, environ, request)
             return response(environ, start_response)
         except (RouteNotFoundError, SourceNotFoundError) as ex:
-            exc = NotFound(str(ex))
-        except NotFound as ex:
-            exc = ex
+            raise NotFound(str(ex))
         except HTTPException:
             raise
         except Exception as ex:
             if app.debug:
                 logger.exception(ex)
                 raise
             msg = str(ex)
             logger.error(msg)
             raise InternalServerError(msg)
 
-        # Nothing worked so far... let's see if there's a new asset.
-        response = self._try_serve_new_asset(app, environ, request)
-        if response is not None:
-            return response(environ, start_response)
-
-        # Nope. Raise the exception we had in store.
-        raise exc
-
     def _try_serve_asset(self, app, environ, request):
-        logger.debug("Searching %d entries for asset with path: %s" %
-                     (len(self._asset_record.entries), request.path))
         rel_req_path = request.path.lstrip('/').replace('/', os.sep)
-        entry = self._asset_record.findEntry(rel_req_path)
-        if entry is None:
-            # We don't know any asset that could have created this path.
-            # It could be a new asset that the user just created, but we'll
-            # check for that later.
-            # What we can do however is see if there's anything that already
-            # exists there, because it could have been created by a processor
-            # that bypasses structured processing (like e.g. the compass
-            # processor). In that case, just return that file, hoping it will
-            # be up-to-date.
-            full_path = os.path.join(self._out_dir, rel_req_path)
-            try:
-                response = self._make_wrapped_file_response(
-                        environ, full_path)
-                logger.debug("Didn't find record entry, but found existing "
-                             "output file at: %s" % rel_req_path)
-                return response
-            except OSError:
-                pass
-            return None
-
-        # Yep, we know about this URL because we processed an asset that
-        # maps to it... make sure it's up to date by re-processing it
-        # before serving.
-        asset_in_path = entry.path
-        asset_out_path = os.path.join(self._out_dir, rel_req_path)
-
-        if self.synchronous_asset_pipeline:
-            logger.debug("Making sure '%s' is up-to-date." % asset_in_path)
-            pipeline = ProcessorPipeline(app, self._out_dir)
-            r = pipeline.run(asset_in_path, delete=False, save_record=False,
-                             previous_record=self._asset_record)
-            assert len(r.entries) == 1
-            self._asset_record.replaceEntry(r.entries[0])
-
-        return self._make_wrapped_file_response(environ, asset_out_path)
-
-    def _try_serve_new_asset(self, app, environ, request):
-        logger.debug("Searching for a new asset with path: %s" % request.path)
-        pipeline = ProcessorPipeline(app, self._out_dir)
-        r = pipeline.run(new_only=True, delete=False, save_record=False,
-                         previous_record=self._asset_record)
-        for e in r.entries:
-            self._asset_record.addEntry(e)
-
-        rel_req_path = request.path.lstrip('/').replace('/', os.sep)
-        entry = self._asset_record.findEntry(rel_req_path)
-        if entry is None:
-            return None
-
-        asset_out_path = os.path.join(self._out_dir, rel_req_path)
-        logger.debug("Found new asset: %s" % entry.path)
-        return self._make_wrapped_file_response(environ, asset_out_path)
+        full_path = os.path.join(self._out_dir, rel_req_path)
+        try:
+            response = self._make_wrapped_file_response(
+                    environ, full_path)
+            return response
+        except OSError:
+            pass
+        return None
 
     def _try_serve_page_asset(self, app, environ, request):
         if not request.path.startswith('/_asset/'):
             return None
 
@@ -435,5 +374,65 @@
                 if len(tokens) > 1:
                     for t in tokens[1:]:
                         mimetype_map[t] = tokens[0]
     return mimetype_map
 
+
+class ProcessingLoop(threading.Thread):
+    def __init__(self, pipeline):
+        super(ProcessingLoop, self).__init__(
+                name='pipeline-reloader', daemon=True)
+        self.pipeline = pipeline
+        self.interval = 1
+        self._paths = set()
+        self._record = None
+        self._last_bake = 0
+
+    def run(self):
+        # Build the first list of known files and run the pipeline once.
+        app = self.pipeline.app
+        roots = [os.path.join(app.root_dir, r)
+                 for r in self.pipeline.mounts.keys()]
+        for root in roots:
+            for dirpath, dirnames, filenames in os.walk(root):
+                self._paths |= set([os.path.join(dirpath, f)
+                                    for f in filenames])
+        self._last_bake = time.time()
+        self._record = self.pipeline.run(save_record=False)
+
+        while True:
+            for root in roots:
+                # For each mount root we try to find the first new or
+                # modified file. If any, we just run the pipeline on
+                # that mount.
+                found_new_or_modified = False
+                for dirpath, dirnames, filenames in os.walk(root):
+                    for filename in filenames:
+                        path = os.path.join(dirpath, filename)
+                        if path not in self._paths:
+                            logger.debug("Found new asset: %s" % path)
+                            self._paths.add(path)
+                            found_new_or_modified = True
+                            break
+                        if os.path.getmtime(path) > self._last_bake:
+                            logger.debug("Found modified asset: %s" % path)
+                            found_new_or_modified = True
+                            break
+
+                    if found_new_or_modified:
+                        break
+
+                if found_new_or_modified:
+                    self._runPipeline(root)
+
+            time.sleep(self.interval)
+
+    def _runPipeline(self, root):
+        self._last_bake = time.time()
+        try:
+            self._record = self.pipeline.run(
+                    root,
+                    previous_record=self._record,
+                    save_record=False)
+        except:
+            pass
+