diff piecrust/serving/procloop.py @ 374:fa3ee8a8ee2d

serve: Split the server code in a couple modules inside a `serving` package. This makes the `serve` command's code a bit more removed from implementation details, and paves the way for the CMS mode.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 07 May 2015 21:37:38 -0700
parents
children 9612cfc6455a
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/serving/procloop.py	Thu May 07 21:37:38 2015 -0700
@@ -0,0 +1,146 @@
+import os
+import os.path
+import time
+import json
+import queue
+import logging
+import threading
+
+
+logger = logging.getLogger(__name__)
+
+
+_sse_abort = threading.Event()
+
+
+class PipelineStatusServerSideEventProducer(object):
+    def __init__(self, status_queue):
+        self.status_queue = status_queue
+        self.interval = 2
+        self.timeout = 60*10
+        self._start_time = 0
+
+    def run(self):
+        logger.debug("Starting pipeline status SSE.")
+        self._start_time = time.time()
+
+        outstr = 'event: ping\ndata: started\n\n'
+        yield bytes(outstr, 'utf8')
+
+        count = 0
+        while True:
+            if time.time() > self.timeout + self._start_time:
+                logger.debug("Closing pipeline status SSE, timeout reached.")
+                outstr = 'event: pipeline_timeout\ndata: bye\n\n'
+                yield bytes(outstr, 'utf8')
+                break
+
+            if _sse_abort.is_set():
+                break
+
+            try:
+                logger.debug("Polling pipeline status queue...")
+                count += 1
+                data = self.status_queue.get(True, self.interval)
+            except queue.Empty:
+                if count < 3:
+                    continue
+                data = {'type': 'ping', 'message': 'ping'}
+                count = 0
+
+            event_type = data['type']
+            outstr = 'event: %s\ndata: %s\n\n' % (
+                    event_type, json.dumps(data))
+            logger.debug("Sending pipeline status SSE.")
+            yield bytes(outstr, 'utf8')
+
+    def close(self):
+        logger.debug("Closing pipeline status SSE.")
+
+
+class ProcessingLoop(threading.Thread):
+    def __init__(self, pipeline):
+        super(ProcessingLoop, self).__init__(
+                name='pipeline-reloader', daemon=True)
+        self.pipeline = pipeline
+        self.status_queue = queue.Queue()
+        self.interval = 1
+        self._paths = set()
+        self._record = None
+        self._last_bake = 0
+
+    def run(self):
+        # Build the first list of known files and run the pipeline once.
+        app = self.pipeline.app
+        roots = [os.path.join(app.root_dir, r)
+                 for r in self.pipeline.mounts.keys()]
+        for root in roots:
+            for dirpath, dirnames, filenames in os.walk(root):
+                self._paths |= set([os.path.join(dirpath, f)
+                                    for f in filenames])
+        self._last_bake = time.time()
+        self._record = self.pipeline.run()
+
+        while True:
+            for root in roots:
+                # For each mount root we try to find the first new or
+                # modified file. If any, we just run the pipeline on
+                # that mount.
+                found_new_or_modified = False
+                for dirpath, dirnames, filenames in os.walk(root):
+                    for filename in filenames:
+                        path = os.path.join(dirpath, filename)
+                        if path not in self._paths:
+                            logger.debug("Found new asset: %s" % path)
+                            self._paths.add(path)
+                            found_new_or_modified = True
+                            break
+                        if os.path.getmtime(path) > self._last_bake:
+                            logger.debug("Found modified asset: %s" % path)
+                            found_new_or_modified = True
+                            break
+
+                    if found_new_or_modified:
+                        break
+
+                if found_new_or_modified:
+                    self._runPipeline(root)
+
+            time.sleep(self.interval)
+
+    def _runPipeline(self, root):
+        self._last_bake = time.time()
+        try:
+            self._record = self.pipeline.run(
+                    root,
+                    previous_record=self._record,
+                    save_record=False)
+
+            # Update the status queue.
+            # (we need to clear it because there may not be a consumer
+            #  on the other side, if the user isn't running with the
+            #  debug window active)
+            while True:
+                try:
+                    self.status_queue.get_nowait()
+                except queue.Empty:
+                    break
+
+            if self._record.success:
+                item = {
+                        'type': 'pipeline_success'}
+                self.status_queue.put_nowait(item)
+            else:
+                item = {
+                        'type': 'pipeline_error',
+                        'assets': []}
+                for entry in self._record.entries:
+                    if entry.errors:
+                        asset_item = {
+                                'path': entry.rel_input,
+                                'errors': list(entry.errors)}
+                        item['assets'].append(asset_item)
+                self.status_queue.put_nowait(item)
+        except:
+            pass
+