comparison piecrust/admin/pubutil.py @ 778:5e91bc0e3b4d

internal: Move admin panel code into the piecrust package.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 16 Jul 2016 15:02:24 +0200
parents foodtruck/pubutil.py@3799621cd25b
children 82509bce94ca
comparison
equal deleted inserted replaced
777:8d633ca59bc5 778:5e91bc0e3b4d
1 import os
2 import os.path
3 import time
4 import errno
5 import signal
6 import logging
7 from .blueprint import foodtruck_bp
8
9
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Set to True by the SIGINT handling code below; the SSE generator in
# PublishLogReader.run() checks it on every iteration so it can stop looping.
server_shutdown = False
13
14
def _shutdown_server_and_raise_sigint(is_app_debug):
    """Flag the SSE generators for shutdown, then raise KeyboardInterrupt.

    The module-level ``server_shutdown`` flag is only set when the app is not
    in debug mode, or when the ``WERKZEUG_RUN_MAIN`` environment variable is
    ``'true'``. ``KeyboardInterrupt`` is raised in every case.
    """
    global server_shutdown

    werkzeug_run_main = os.environ.get('WERKZEUG_RUN_MAIN')
    if werkzeug_run_main == 'true' or not is_app_debug:
        # Required when CTRL+C is used to stop the Werkzeug server: without
        # it, the SSE generators would keep the server alive.
        logger.debug("Shutting down SSE generators...")
        for handler in logger.handlers:
            handler.flush()
        server_shutdown = True

    raise KeyboardInterrupt()
26
27
def record_pipeline(state):
    """Install a SIGINT handler when the app runs in command-line mode.

    Nothing happens unless the ``FOODTRUCK_CMDLINE_MODE`` value in
    ``state.app.config`` is truthy. When it is, SIGINT is routed to
    ``_shutdown_server_and_raise_sigint`` with the app's debug flag.
    """
    if not state.app.config.get('FOODTRUCK_CMDLINE_MODE', False):
        return

    def _on_sigint(*args):
        # The debug flag is looked up when the signal fires, not when the
        # handler is installed.
        return _shutdown_server_and_raise_sigint(state.app.debug)

    # Make sure CTRL+C works correctly.
    logger.debug("Adding SIGINT callback for pipeline thread.")
    signal.signal(signal.SIGINT, _on_sigint)
36
37
# Hand record_pipeline to the blueprint's record() hook.
# NOTE(review): foodtruck_bp is presumably a Flask blueprint, in which case
# the callback runs when the blueprint is registered on an app -- confirm.
foodtruck_bp.record(record_pipeline)
39
40
def _read_pid_file(pid_file):
    """Return the process ID stored in ``pid_file`` as an integer.

    Any failure to open, read or parse the file is logged as an error and the
    original exception is re-raised to the caller.
    """
    logger.debug("Reading PID file: %s" % pid_file)
    try:
        with open(pid_file, 'r') as pid_fp:
            contents = pid_fp.read()
        # Surrounding whitespace (e.g. a trailing newline) is ignored.
        pid = int(contents.strip())
    except Exception:
        logger.error("Error reading PID file.")
        raise
    return pid
51
52
def _pid_exists(pid):
    """Return whether a process with the given ID currently exists.

    The check calls ``os.kill`` with signal 0 and interprets the outcome.

    Args:
        pid: The process ID to look for.

    Returns:
        ``True`` if the call succeeds or fails with ``EPERM`` (the process
        exists but may not be signalled), ``False`` if it fails with
        ``ESRCH``. Non-positive IDs always yield ``False``.

    Raises:
        OSError: If ``os.kill`` fails with any other error code.
    """
    logger.debug("Checking if process ID %d is running" % pid)

    if pid <= 0:
        # For kill(), zero and negative values address process groups (or
        # every process) instead of one process, so a successful call would
        # say nothing about a single PID.
        return False

    # NOTE(review): the Python documentation states that on Windows any
    # signal value other than the CTRL_* events terminates the target
    # process -- confirm this helper is never reached on that platform.
    try:
        os.kill(pid, 0)
    except OSError as ex:
        if ex.errno == errno.ESRCH:
            # No such process.
            return False
        if ex.errno == errno.EPERM:
            # No permission, so process exists.
            return True
        raise
    return True
68
69
class PublishLogReader(object):
    """Relay the log file of a publish process as server-sent events.

    ``run()`` is a generator of UTF-8 encoded SSE messages. It reads a PID
    file to learn which process to follow, and forwards what gets appended to
    the log file while that process is running.
    """

    _poll_interval = 1   # Seconds to sleep between two polling iterations.
    _ping_interval = 30  # Seconds between two "ping" messages.

    def __init__(self, pid_path, log_path):
        """Remember the paths of the PID file and of the log file."""
        self.pid_path, self.log_path = pid_path, log_path

    def run(self):
        """Yield SSE messages until the module-level shutdown flag is set.

        Three kinds of messages are produced: a ``ping`` event whenever more
        than ``_ping_interval`` seconds have passed since the last one, a
        "Publish started." message when a process is seen starting, and one
        ``message`` event per line of new log text.
        """
        logger.debug("Opening publish log...")
        tracked_pid = None
        known_mtime = 0
        running = False
        log_offset = -1   # Negative means "not following the log right now".
        pinged_at = 0
        try:
            while not server_shutdown:
                # PING!
                if time.time() - pinged_at > self._ping_interval:
                    logger.debug("Sending ping...")
                    pinged_at = time.time()
                    yield "event: ping\ndata: 1\n\n".encode('utf8')

                # Look at the PID file's timestamp; a missing or unreadable
                # file counts as "no PID file" (timestamp 0).
                try:
                    current_mtime = os.path.getmtime(self.pid_path)
                except OSError:
                    current_mtime = 0

                # The PID is (re)read when a PID file exists and its
                # timestamp differs from the one we remember. If we did not
                # remember any timestamp yet, the file predates this stream
                # and is called "pre-historic".
                pid_file_changed = (
                    current_mtime > 0 and current_mtime != known_mtime)
                prehistoric = pid_file_changed and known_mtime == 0
                if pid_file_changed:
                    known_mtime = current_mtime
                    tracked_pid = _read_pid_file(self.pid_path)

                if prehistoric:
                    logger.debug("PID file is pre-historic, we will skip the "
                                 "first parts of the log.")

                # With a usable PID, find out whether its process is alive.
                previously_running = running
                running = False
                if tracked_pid:
                    running = _pid_exists(tracked_pid)
                    state_label = 'running' if running else 'not running'
                    logger.debug(
                        "Process %d is %s" % (tracked_pid, state_label))
                    if not running:
                        # Forget this PID until the PID file changes again.
                        tracked_pid = None

                # Pull whatever was appended to the log file. This also runs
                # once right after the process stopped, to get its last
                # output.
                fresh_text = None
                if running or previously_running:
                    if log_offset < 0:
                        # "Publish started" is only announced when we saw the
                        # PID file change, i.e. we caught the process as it
                        # started instead of joining it half-way.
                        if not prehistoric:
                            started_msg = (
                                'event: message\n'
                                'data: Publish started.\n\n')
                            yield started_msg.encode('utf8')
                        log_offset = 0

                    try:
                        with open(self.log_path, 'r',
                                  encoding='utf8') as log_fp:
                            log_fp.seek(log_offset)
                            fresh_text = log_fp.read()
                            log_offset = log_fp.tell()
                    except OSError:
                        # Log file not readable right now: try again on the
                        # next iteration.
                        pass

                if not running:
                    # The process is gone: the next one starts from the
                    # beginning of the log.
                    log_offset = -1

                # Send the new text to the client, except for the old output
                # that was written before this stream started.
                if fresh_text and not prehistoric:
                    logger.debug("SSE: %s" % fresh_text)
                    for line in fresh_text.split('\n'):
                        message = 'event: message\ndata: %s\n\n' % line
                        yield message.encode('utf8')

                time.sleep(self._poll_interval)

        except GeneratorExit:
            # The consumer closed the generator; fall through to the final
            # log statement.
            pass

        logger.debug("Closing publish log...")
169