diff piecrust/admin/pubutil.py @ 778:5e91bc0e3b4d

internal: Move admin panel code into the piecrust package.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 16 Jul 2016 15:02:24 +0200
parents foodtruck/pubutil.py@3799621cd25b
children 82509bce94ca
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/admin/pubutil.py	Sat Jul 16 15:02:24 2016 +0200
@@ -0,0 +1,169 @@
+import os
+import os.path
+import time
+import errno
+import signal
+import logging
+from .blueprint import foodtruck_bp
+
+
+logger = logging.getLogger(__name__)
+
+server_shutdown = False
+
+
+def _shutdown_server_and_raise_sigint(is_app_debug):
+    if (not is_app_debug or
+            os.environ.get('WERKZEUG_RUN_MAIN') == 'true'):
+        # This is needed when hitting CTRL+C to shutdown the Werkzeug server,
+        # otherwise SSE generators will keep it alive.
+        logger.debug("Shutting down SSE generators...")
+        for h in logger.handlers:
+            h.flush()
+        global server_shutdown
+        server_shutdown = True
+    raise KeyboardInterrupt()
+
+
+def record_pipeline(state):
+    if state.app.config.get('FOODTRUCK_CMDLINE_MODE', False):
+        # Make sure CTRL+C works correctly.
+        logger.debug("Adding SIGINT callback for pipeline thread.")
+        signal.signal(
+                signal.SIGINT,
+                lambda *args: _shutdown_server_and_raise_sigint(
+                    state.app.debug))
+
+
+foodtruck_bp.record(record_pipeline)
+
+
+def _read_pid_file(pid_file):
+    logger.debug("Reading PID file: %s" % pid_file)
+    try:
+        with open(pid_file, 'r') as fp:
+            pid_str = fp.read()
+
+        return int(pid_str.strip())
+    except Exception:
+        logger.error("Error reading PID file.")
+        raise
+
+
+def _pid_exists(pid):
+    logger.debug("Checking if process ID %d is running" % pid)
+    try:
+        os.kill(pid, 0)
+    except OSError as ex:
+        if ex.errno == errno.ESRCH:
+            # No such process.
+            return False
+        elif ex.errno == errno.EPERM:
+            # No permission, so process exists.
+            return True
+        else:
+            raise
+    else:
+        return True
+
+
+class PublishLogReader(object):
+    _poll_interval = 1        # Check the process every 1 seconds.
+    _ping_interval = 30       # Send a ping message every 30 seconds.
+
+    def __init__(self, pid_path, log_path):
+        self.pid_path = pid_path
+        self.log_path = log_path
+
+    def run(self):
+        logger.debug("Opening publish log...")
+        pid = None
+        pid_mtime = 0
+        is_running = False
+        last_seek = -1
+        last_ping_time = 0
+        try:
+            while not server_shutdown:
+                # PING!
+                interval = time.time() - last_ping_time
+                if interval > self._ping_interval:
+                    logger.debug("Sending ping...")
+                    last_ping_time = time.time()
+                    yield bytes("event: ping\ndata: 1\n\n", 'utf8')
+
+                # Check the PID file timestamp.
+                try:
+                    new_mtime = os.path.getmtime(self.pid_path)
+                except OSError:
+                    new_mtime = 0
+
+                # If there's a valid PID file and we either just started
+                # streaming (pid_mtime == 0) or we remember an older version
+                # of that PID file (pid_mtime != new_mtime), let's read the
+                # PID from the file.
+                is_pid_file_prehistoric = False
+                if new_mtime > 0 and new_mtime != pid_mtime:
+                    is_pid_file_prehistoric = (pid_mtime == 0)
+                    pid_mtime = new_mtime
+                    pid = _read_pid_file(self.pid_path)
+
+                if is_pid_file_prehistoric:
+                    logger.debug("PID file is pre-historic, we will skip the "
+                                 "first parts of the log.")
+
+                # If we have a valid PID, let's check if the process is
+                # currently running.
+                was_running = is_running
+                if pid:
+                    is_running = _pid_exists(pid)
+                    logger.debug(
+                            "Process %d is %s" %
+                            (pid, 'running' if is_running else 'not running'))
+                    if not is_running:
+                        # Let's forget this PID file until it changes.
+                        pid = None
+                else:
+                    is_running = False
+
+                # Read new data from the log file.
+                new_data = None
+                if is_running or was_running:
+                    if last_seek < 0:
+                        # Only send the "publish started" message if we
+                        # actually caught the process as it was starting, not
+                        # if we started streaming after it started.
+                        # This means we saw the PID file get changed.
+                        if not is_pid_file_prehistoric:
+                            outstr = (
+                                    'event: message\n'
+                                    'data: Publish started.\n\n')
+                            yield bytes(outstr, 'utf8')
+                        last_seek = 0
+
+                    try:
+                        with open(self.log_path, 'r', encoding='utf8') as fp:
+                            fp.seek(last_seek)
+                            new_data = fp.read()
+                            last_seek = fp.tell()
+                    except OSError:
+                        pass
+                if not is_running:
+                    # Process is not running anymore, let's reset our seek
+                    # marker back to the beginning.
+                    last_seek = -1
+
+                # Stream the new data to the client, but don't send old stuff
+                # that happened before we started this stream.
+                if new_data and not is_pid_file_prehistoric:
+                    logger.debug("SSE: %s" % new_data)
+                    for line in new_data.split('\n'):
+                        outstr = 'event: message\ndata: %s\n\n' % line
+                        yield bytes(outstr, 'utf8')
+
+                time.sleep(self._poll_interval)
+
+        except GeneratorExit:
+            pass
+
+        logger.debug("Closing publish log...")
+