changeset 209:7a5a7a7e8cee

serve: Run the asset pipeline asynchronously. The server now runs a background thread that monitors asset files for changes; when a change is detected, the processing pipeline is re-run on the mount containing the changed file.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 22 Jan 2015 22:26:26 -0800
parents 989d0abd7c17
children 681da9009290
files piecrust/serving.py
diffstat 1 files changed, 84 insertions(+), 85 deletions(-) [+]
line wrap: on
line diff
--- a/piecrust/serving.py	Thu Jan 22 22:23:43 2015 -0800
+++ b/piecrust/serving.py	Thu Jan 22 22:26:26 2015 -0800
@@ -1,20 +1,21 @@
+import io
+import os
 import re
 import gzip
 import time
-import os
 import os.path
 import hashlib
 import logging
-import io
-from werkzeug.exceptions import (NotFound, MethodNotAllowed,
-        InternalServerError, HTTPException)
+import threading
+from werkzeug.exceptions import (
+        NotFound, MethodNotAllowed, InternalServerError, HTTPException)
 from werkzeug.serving import run_simple
 from werkzeug.wrappers import Request, Response
 from werkzeug.wsgi import wrap_file
 from jinja2 import FileSystemLoader, Environment
 from piecrust.app import PieCrust
-from piecrust.data.filters import (PaginationFilter, HasFilterClause,
-        IsFilterClause)
+from piecrust.data.filters import (
+        PaginationFilter, HasFilterClause, IsFilterClause)
 from piecrust.environment import StandardEnvironment
 from piecrust.processing.base import ProcessorPipeline
 from piecrust.rendering import PageRenderingContext, render_page
@@ -52,18 +53,15 @@
 
 
 class Server(object):
-    def __init__(self, root_dir, host='localhost', port='8080',
-                 debug=False, use_reloader=False, static_preview=True,
-                 synchronous_asset_pipeline=True):
+    def __init__(self, root_dir, host='localhost', port=8080,
+                 debug=False, use_reloader=False, static_preview=True):
         self.root_dir = root_dir
         self.host = host
-        self.port = port
+        self.port = int(port)
         self.debug = debug
         self.use_reloader = use_reloader
         self.static_preview = static_preview
-        self.synchronous_asset_pipeline = synchronous_asset_pipeline
         self._out_dir = None
-        self._asset_record = None
         self._page_record = None
         self._mimetype_map = load_mimetype_map()
 
@@ -72,9 +70,11 @@
         # them to the client. We need a temp app for this.
         app = PieCrust(root_dir=self.root_dir, debug=self.debug)
         self._out_dir = os.path.join(app.cache_dir, 'server')
+        self._page_record = ServeRecord()
+
         pipeline = ProcessorPipeline(app, self._out_dir)
-        self._asset_record = pipeline.run()
-        self._page_record = ServeRecord()
+        loop = ProcessingLoop(pipeline)
+        loop.start()
 
         # Run the WSGI app.
         wsgi_wrapper = WsgiServer(self)
@@ -113,23 +113,17 @@
         if response is not None:
             return response(environ, start_response)
 
-        # It's not an asset we know of... let's see if it can be a page asset.
+        # Let's see if it can be a page asset.
         response = self._try_serve_page_asset(app, environ, request)
         if response is not None:
             return response(environ, start_response)
 
         # Nope. Let's see if it's an actual page.
-        # We trap any exception that says "there's no such page" so we can
-        # try another thing before bailing out. But we let any exception
-        # that says "something's wrong" through.
-        exc = None
         try:
             response = self._try_serve_page(app, environ, request)
             return response(environ, start_response)
         except (RouteNotFoundError, SourceNotFoundError) as ex:
-            exc = NotFound(str(ex))
-        except NotFound as ex:
-            exc = ex
+            raise NotFound(str(ex))
         except HTTPException:
             raise
         except Exception as ex:
@@ -140,71 +134,16 @@
             logger.error(msg)
             raise InternalServerError(msg)
 
-        # Nothing worked so far... let's see if there's a new asset.
-        response = self._try_serve_new_asset(app, environ, request)
-        if response is not None:
-            return response(environ, start_response)
-
-        # Nope. Raise the exception we had in store.
-        raise exc
-
     def _try_serve_asset(self, app, environ, request):
-        logger.debug("Searching %d entries for asset with path: %s" %
-                (len(self._asset_record.entries), request.path))
         rel_req_path = request.path.lstrip('/').replace('/', os.sep)
-        entry = self._asset_record.findEntry(rel_req_path)
-        if entry is None:
-            # We don't know any asset that could have created this path.
-            # It could be a new asset that the user just created, but we'll
-            # check for that later.
-            # What we can do however is see if there's anything that already
-            # exists there, because it could have been created by a processor
-            # that bypasses structured processing (like e.g. the compass
-            # processor). In that case, just return that file, hoping it will
-            # be up-to-date.
-            full_path = os.path.join(self._out_dir, rel_req_path)
-            try:
-                response = self._make_wrapped_file_response(
-                        environ, full_path)
-                logger.debug("Didn't find record entry, but found existing "
-                             "output file at: %s" % rel_req_path)
-                return response
-            except OSError:
-                pass
-            return None
-
-        # Yep, we know about this URL because we processed an asset that
-        # maps to it... make sure it's up to date by re-processing it
-        # before serving.
-        asset_in_path = entry.path
-        asset_out_path = os.path.join(self._out_dir, rel_req_path)
-
-        if self.synchronous_asset_pipeline:
-            logger.debug("Making sure '%s' is up-to-date." % asset_in_path)
-            pipeline = ProcessorPipeline(app, self._out_dir)
-            r = pipeline.run(asset_in_path, delete=False, save_record=False,
-                             previous_record=self._asset_record)
-            assert len(r.entries) == 1
-            self._asset_record.replaceEntry(r.entries[0])
-
-        return self._make_wrapped_file_response(environ, asset_out_path)
-
-    def _try_serve_new_asset(self, app, environ, request):
-        logger.debug("Searching for a new asset with path: %s" % request.path)
-        pipeline = ProcessorPipeline(app, self._out_dir)
-        r = pipeline.run(new_only=True, delete=False, save_record=False,
-                         previous_record=self._asset_record)
-        for e in r.entries:
-            self._asset_record.addEntry(e)
-
-        rel_req_path = request.path.lstrip('/').replace('/', os.sep)
-        entry = self._asset_record.findEntry(rel_req_path)
-        if entry is None:
-            return None
-
-        asset_out_path = os.path.join(self._out_dir, rel_req_path)
-        logger.debug("Found new asset: %s" % entry.path)
-        return self._make_wrapped_file_response(environ, asset_out_path)
+        full_path = os.path.join(self._out_dir, rel_req_path)
+        try:
+            response = self._make_wrapped_file_response(
+                    environ, full_path)
+            return response
+        except OSError:
+            pass
+        return None
 
     def _try_serve_page_asset(self, app, environ, request):
         if not request.path.startswith('/_asset/'):
@@ -437,3 +376,63 @@
                     mimetype_map[t] = tokens[0]
     return mimetype_map
 
+
+class ProcessingLoop(threading.Thread):
+    def __init__(self, pipeline):
+        super(ProcessingLoop, self).__init__(
+                name='pipeline-reloader', daemon=True)
+        self.pipeline = pipeline
+        self.interval = 1
+        self._paths = set()
+        self._record = None
+        self._last_bake = 0
+
+    def run(self):
+        # Build the first list of known files and run the pipeline once.
+        app = self.pipeline.app
+        roots = [os.path.join(app.root_dir, r)
+                 for r in self.pipeline.mounts.keys()]
+        for root in roots:
+            for dirpath, dirnames, filenames in os.walk(root):
+                self._paths |= set([os.path.join(dirpath, f)
+                                    for f in filenames])
+        self._last_bake = time.time()
+        self._record = self.pipeline.run(save_record=False)
+
+        while True:
+            for root in roots:
+                # For each mount root we try to find the first new or
+                # modified file. If any, we just run the pipeline on
+                # that mount.
+                found_new_or_modified = False
+                for dirpath, dirnames, filenames in os.walk(root):
+                    for filename in filenames:
+                        path = os.path.join(dirpath, filename)
+                        if path not in self._paths:
+                            logger.debug("Found new asset: %s" % path)
+                            self._paths.add(path)
+                            found_new_or_modified = True
+                            break
+                        if os.path.getmtime(path) > self._last_bake:
+                            logger.debug("Found modified asset: %s" % path)
+                            found_new_or_modified = True
+                            break
+
+                    if found_new_or_modified:
+                        break
+
+                if found_new_or_modified:
+                    self._runPipeline(root)
+
+            time.sleep(self.interval)
+
+    def _runPipeline(self, root):
+        self._last_bake = time.time()
+        try:
+            self._record = self.pipeline.run(
+                    root,
+                    previous_record=self._record,
+                    save_record=False)
+        except:
+            pass
+