piecrust2
changeset 209:7a5a7a7e8cee
serve: Run the asset pipeline asynchronously.
The server is now running a background thread to monitor changes to asset
files. If a change is detected, the processing pipeline is run on the parent
mount.
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Thu, 22 Jan 2015 22:26:26 -0800 |
| parents | 989d0abd7c17 |
| children | 681da9009290 |
| files | piecrust/serving.py |
| diffstat | 1 files changed, 84 insertions(+), 85 deletions(-) |
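
The commit description above boils down to a simple polling watcher: a daemon thread walks each mounted asset directory, looks for files that are new or modified since the last run, and re-runs processing for the mount that changed. Here is a minimal, self-contained sketch of that pattern; `PollingWatcher` and its `rebuild(root)` callback are illustrative stand-ins, not PieCrust's API. The real implementation is the `ProcessingLoop` class added at the bottom of the diff below.

```python
import os
import time
import threading


class PollingWatcher(threading.Thread):
    """Daemon thread that polls root directories for new or modified files
    and calls `rebuild(root)` for the root that changed. This is a sketch
    of the approach; `rebuild` stands in for re-running the real pipeline."""

    def __init__(self, roots, rebuild, interval=1.0):
        super().__init__(name='asset-watcher', daemon=True)
        self.roots = roots
        self.rebuild = rebuild
        self.interval = interval
        self._known_paths = set()
        self._last_run = 0

    def run(self):
        # Prime the known-file set and do one initial build of everything.
        for root in self.roots:
            self._known_paths |= self._walk(root)
        self._last_run = time.time()
        for root in self.roots:
            self.rebuild(root)

        while True:
            for root in self.roots:
                if self._has_changes(root):
                    # Stamp the time first so edits made during the rebuild
                    # are picked up on the next pass.
                    self._last_run = time.time()
                    self.rebuild(root)
            time.sleep(self.interval)

    def _walk(self, root):
        paths = set()
        for dirpath, _dirnames, filenames in os.walk(root):
            paths |= {os.path.join(dirpath, f) for f in filenames}
        return paths

    def _has_changes(self, root):
        # A file we have never seen, or one modified since the last run,
        # counts as a change for this root.
        for path in self._walk(root):
            if path not in self._known_paths:
                self._known_paths.add(path)
                return True
            try:
                if os.path.getmtime(path) > self._last_run:
                    return True
            except OSError:
                # The file disappeared between the walk and the stat.
                continue
        return False
```

Starting it mirrors the changeset: construct the thread, call `start()`, and let the main thread block on the HTTP server; because it is a daemon thread, it dies with the server process.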
--- a/piecrust/serving.py	Thu Jan 22 22:23:43 2015 -0800
+++ b/piecrust/serving.py	Thu Jan 22 22:26:26 2015 -0800
@@ -1,20 +1,21 @@
+import io
+import os
 import re
 import gzip
 import time
-import os
 import os.path
 import hashlib
 import logging
-import io
-from werkzeug.exceptions import (NotFound, MethodNotAllowed,
-        InternalServerError, HTTPException)
+import threading
+from werkzeug.exceptions import (
+        NotFound, MethodNotAllowed, InternalServerError, HTTPException)
 from werkzeug.serving import run_simple
 from werkzeug.wrappers import Request, Response
 from werkzeug.wsgi import wrap_file
 from jinja2 import FileSystemLoader, Environment
 from piecrust.app import PieCrust
-from piecrust.data.filters import (PaginationFilter, HasFilterClause,
-        IsFilterClause)
+from piecrust.data.filters import (
+        PaginationFilter, HasFilterClause, IsFilterClause)
 from piecrust.environment import StandardEnvironment
 from piecrust.processing.base import ProcessorPipeline
 from piecrust.rendering import PageRenderingContext, render_page
@@ -52,18 +53,15 @@
 
 
 class Server(object):
-    def __init__(self, root_dir, host='localhost', port='8080',
-                 debug=False, use_reloader=False, static_preview=True,
-                 synchronous_asset_pipeline=True):
+    def __init__(self, root_dir, host='localhost', port=8080,
+                 debug=False, use_reloader=False, static_preview=True):
         self.root_dir = root_dir
         self.host = host
-        self.port = port
+        self.port = int(port)
         self.debug = debug
         self.use_reloader = use_reloader
         self.static_preview = static_preview
-        self.synchronous_asset_pipeline = synchronous_asset_pipeline
         self._out_dir = None
-        self._asset_record = None
         self._page_record = None
         self._mimetype_map = load_mimetype_map()
 
@@ -72,9 +70,11 @@
         # them to the client. We need a temp app for this.
         app = PieCrust(root_dir=self.root_dir, debug=self.debug)
         self._out_dir = os.path.join(app.cache_dir, 'server')
+        self._page_record = ServeRecord()
+
         pipeline = ProcessorPipeline(app, self._out_dir)
-        self._asset_record = pipeline.run()
-        self._page_record = ServeRecord()
+        loop = ProcessingLoop(pipeline)
+        loop.start()
 
         # Run the WSGI app.
         wsgi_wrapper = WsgiServer(self)
@@ -113,23 +113,17 @@
         if response is not None:
             return response(environ, start_response)
 
-        # It's not an asset we know of... let's see if it can be a page asset.
+        # Let's see if it can be a page asset.
         response = self._try_serve_page_asset(app, environ, request)
         if response is not None:
             return response(environ, start_response)
 
         # Nope. Let's see if it's an actual page.
-        # We trap any exception that says "there's no such page" so we can
-        # try another thing before bailing out. But we let any exception
-        # that says "something's wrong" through.
-        exc = None
         try:
             response = self._try_serve_page(app, environ, request)
             return response(environ, start_response)
         except (RouteNotFoundError, SourceNotFoundError) as ex:
-            exc = NotFound(str(ex))
-        except NotFound as ex:
-            exc = ex
+            raise NotFound(str(ex))
         except HTTPException:
             raise
         except Exception as ex:
@@ -140,71 +134,16 @@
             logger.error(msg)
             raise InternalServerError(msg)
 
-        # Nothing worked so far... let's see if there's a new asset.
-        response = self._try_serve_new_asset(app, environ, request)
-        if response is not None:
-            return response(environ, start_response)
-
-        # Nope. Raise the exception we had in store.
-        raise exc
-
     def _try_serve_asset(self, app, environ, request):
-        logger.debug("Searching %d entries for asset with path: %s" %
-                     (len(self._asset_record.entries), request.path))
         rel_req_path = request.path.lstrip('/').replace('/', os.sep)
-        entry = self._asset_record.findEntry(rel_req_path)
-        if entry is None:
-            # We don't know any asset that could have created this path.
-            # It could be a new asset that the user just created, but we'll
-            # check for that later.
-            # What we can do however is see if there's anything that already
-            # exists there, because it could have been created by a processor
-            # that bypasses structured processing (like e.g. the compass
-            # processor). In that case, just return that file, hoping it will
-            # be up-to-date.
-            full_path = os.path.join(self._out_dir, rel_req_path)
-            try:
-                response = self._make_wrapped_file_response(
-                        environ, full_path)
-                logger.debug("Didn't find record entry, but found existing "
-                             "output file at: %s" % rel_req_path)
-                return response
-            except OSError:
-                pass
-            return None
-
-        # Yep, we know about this URL because we processed an asset that
-        # maps to it... make sure it's up to date by re-processing it
-        # before serving.
-        asset_in_path = entry.path
-        asset_out_path = os.path.join(self._out_dir, rel_req_path)
-
-        if self.synchronous_asset_pipeline:
-            logger.debug("Making sure '%s' is up-to-date." % asset_in_path)
-            pipeline = ProcessorPipeline(app, self._out_dir)
-            r = pipeline.run(asset_in_path, delete=False, save_record=False,
-                             previous_record=self._asset_record)
-            assert len(r.entries) == 1
-            self._asset_record.replaceEntry(r.entries[0])
-
-        return self._make_wrapped_file_response(environ, asset_out_path)
-
-    def _try_serve_new_asset(self, app, environ, request):
-        logger.debug("Searching for a new asset with path: %s" % request.path)
-        pipeline = ProcessorPipeline(app, self._out_dir)
-        r = pipeline.run(new_only=True, delete=False, save_record=False,
-                         previous_record=self._asset_record)
-        for e in r.entries:
-            self._asset_record.addEntry(e)
-
-        rel_req_path = request.path.lstrip('/').replace('/', os.sep)
-        entry = self._asset_record.findEntry(rel_req_path)
-        if entry is None:
-            return None
-
-        asset_out_path = os.path.join(self._out_dir, rel_req_path)
-        logger.debug("Found new asset: %s" % entry.path)
-        return self._make_wrapped_file_response(environ, asset_out_path)
+        full_path = os.path.join(self._out_dir, rel_req_path)
+        try:
+            response = self._make_wrapped_file_response(
+                    environ, full_path)
+            return response
+        except OSError:
+            pass
+        return None
 
     def _try_serve_page_asset(self, app, environ, request):
         if not request.path.startswith('/_asset/'):
@@ -437,3 +376,63 @@
                 mimetype_map[t] = tokens[0]
 
     return mimetype_map
+
+class ProcessingLoop(threading.Thread):
+    def __init__(self, pipeline):
+        super(ProcessingLoop, self).__init__(
+                name='pipeline-reloader', daemon=True)
+        self.pipeline = pipeline
+        self.interval = 1
+        self._paths = set()
+        self._record = None
+        self._last_bake = 0
+
+    def run(self):
+        # Build the first list of known files and run the pipeline once.
+        app = self.pipeline.app
+        roots = [os.path.join(app.root_dir, r)
+                 for r in self.pipeline.mounts.keys()]
+        for root in roots:
+            for dirpath, dirnames, filenames in os.walk(root):
+                self._paths |= set([os.path.join(dirpath, f)
+                                    for f in filenames])
+        self._last_bake = time.time()
+        self._record = self.pipeline.run(save_record=False)
+
+        while True:
+            for root in roots:
+                # For each mount root we try to find the first new or
+                # modified file. If any, we just run the pipeline on
+                # that mount.
+                found_new_or_modified = False
+                for dirpath, dirnames, filenames in os.walk(root):
+                    for filename in filenames:
+                        path = os.path.join(dirpath, filename)
+                        if path not in self._paths:
+                            logger.debug("Found new asset: %s" % path)
+                            self._paths.add(path)
+                            found_new_or_modified = True
+                            break
+                        if os.path.getmtime(path) > self._last_bake:
+                            logger.debug("Found modified asset: %s" % path)
+                            found_new_or_modified = True
+                            break
+
+                    if found_new_or_modified:
+                        break
+
+                if found_new_or_modified:
+                    self._runPipeline(root)
+
+            time.sleep(self.interval)
+
+    def _runPipeline(self, root):
+        self._last_bake = time.time()
+        try:
+            self._record = self.pipeline.run(
+                    root,
+                    previous_record=self._record,
+                    save_record=False)
+        except:
+            pass
+
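
For reference, the start-up ordering implied by the `Server.run()` hunk above is: build the `ProcessorPipeline`, wrap it in a `ProcessingLoop`, start that daemon thread, and only then block on Werkzeug's `run_simple()`. The sketch below shows just that ordering with a placeholder WSGI app and an idle daemon thread standing in for the real loop; it is not PieCrust's actual server code.

```python
import threading

from werkzeug.serving import run_simple
from werkzeug.wrappers import Response


def serve_with_background_pipeline(wsgi_app, processing_loop,
                                   host='localhost', port=8080):
    # Start the asset-processing thread first, then block on the HTTP
    # server. The thread must be a daemon (as ProcessingLoop is) so that
    # it exits along with the server process.
    processing_loop.start()
    run_simple(host, port, wsgi_app, use_reloader=False)


if __name__ == '__main__':
    # Placeholders just to show the wiring; in PieCrust these would be
    # WsgiServer(self) and ProcessingLoop(pipeline) respectively.
    def wsgi_app(environ, start_response):
        return Response('hello from the dev server')(environ, start_response)

    idle_loop = threading.Thread(name='pipeline-reloader', daemon=True,
                                 target=lambda: None)
    serve_with_background_pipeline(wsgi_app, idle_loop)
```

Note that `_runPipeline` swallows errors with a bare `except:`, so a failing processor simply leaves the previously baked output in place for `_try_serve_asset` to serve from the output directory.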