comparison piecrust/serving/procloop.py @ 552:9612cfc6455a

serve: Rewrite of the Server-Sent Event code for build notifications. At the moment the server monitors the asset directories, and notifies the browser when an asset has changed and has been re-processed. * Fix issues around long-running requests/threads which mess up the ability to shutdown the server correctly with `CTRL-C` (see comments in code). * Move the notification queue to each SSE producer, to support having multiple pages open in a browser. * Add JS/CSS for showing quick notifications about re-processed assets. * Add support for hot-reloading CSS and pictures that have been re-processed.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 08 Aug 2015 16:12:04 -0700
parents fa3ee8a8ee2d
children cc6f3dbe3048
comparison
equal deleted inserted replaced
551:f2b875ecc940 552:9612cfc6455a
2 import os.path 2 import os.path
3 import time 3 import time
4 import json 4 import json
5 import queue 5 import queue
6 import logging 6 import logging
7 import itertools
7 import threading 8 import threading
8 9
9 10
10 logger = logging.getLogger(__name__) 11 logger = logging.getLogger(__name__)
11 12
12 13 # This flag is for cancelling all long running requests like SSEs.
13 _sse_abort = threading.Event() 14 server_shutdown = False
14 15
15 16
16 class PipelineStatusServerSideEventProducer(object): 17 class PipelineStatusServerSentEventProducer(object):
17 def __init__(self, status_queue): 18 """ The producer for Server-Sent Events (SSE) notifying the front-end
18 self.status_queue = status_queue 19 about useful things like assets having been re-processed in the
19 self.interval = 2 20 background.
20 self.timeout = 60*10 21 Each has its own queue because the user could have multiple pages
22 open, each having to display notifications coming from the server.
23 """
24 def __init__(self, proc_loop):
25 self._proc_loop = proc_loop
26 self._queue = queue.Queue()
21 self._start_time = 0 27 self._start_time = 0
28 self._poll_interval = 0.5
29 self._ping_interval = 30
30 self._time_between_pings = 0
31 self._running = 0
32
33 def addBuildEvent(self, item):
34 self._queue.put_nowait(item)
22 35
23 def run(self): 36 def run(self):
24 logger.debug("Starting pipeline status SSE.") 37 logger.debug("Starting pipeline status SSE.")
38 self._proc_loop.addObserver(self)
25 self._start_time = time.time() 39 self._start_time = time.time()
40 self._running = 1
26 41
27 outstr = 'event: ping\ndata: started\n\n' 42 outstr = 'event: ping\ndata: started\n\n'
28 yield bytes(outstr, 'utf8') 43 yield bytes(outstr, 'utf8')
29 44
30 count = 0 45 while self._running == 1 and not server_shutdown:
31 while True: 46 try:
32 if time.time() > self.timeout + self._start_time: 47 # We use a short poll interval (less than a second) because
33 logger.debug("Closing pipeline status SSE, timeout reached.") 48 # we need to catch `server_shutdown` going `True` as soon as
34 outstr = 'event: pipeline_timeout\ndata: bye\n\n' 49 # possible to exit this thread when the user hits `CTRL+C`.
35 yield bytes(outstr, 'utf8') 50 data = self._queue.get(True, self._poll_interval)
36 break 51 except queue.Empty:
52 # Not exact timing but close enough.
53 self._time_between_pings += self._poll_interval
54 if self._time_between_pings >= self._ping_interval:
55 self._time_between_pings = 0
56 logger.debug("Sending ping/heartbeat event.")
57 outstr = 'event: ping\ndata: 1\n\n'
58 yield bytes(outstr, 'utf8')
59 continue
37 60
38 if _sse_abort.is_set():
39 break
40
41 try:
42 logger.debug("Polling pipeline status queue...")
43 count += 1
44 data = self.status_queue.get(True, self.interval)
45 except queue.Empty:
46 if count < 3:
47 continue
48 data = {'type': 'ping', 'message': 'ping'}
49 count = 0
50
51 event_type = data['type']
52 outstr = 'event: %s\ndata: %s\n\n' % (
53 event_type, json.dumps(data))
54 logger.debug("Sending pipeline status SSE.") 61 logger.debug("Sending pipeline status SSE.")
62 outstr = (('event: %s\n' % data['type']) +
63 ('id: %s\n' % data['id']) +
64 ('data: %s\n\n' % json.dumps(data)))
65 self._queue.task_done()
55 yield bytes(outstr, 'utf8') 66 yield bytes(outstr, 'utf8')
56 67
57 def close(self): 68 def close(self):
58 logger.debug("Closing pipeline status SSE.") 69 logger.debug("Closing pipeline status SSE.")
70 self._proc_loop.removeObserver(self)
71 self._running = 2
59 72
60 73
61 class ProcessingLoop(threading.Thread): 74 class ProcessingLoop(threading.Thread):
62 def __init__(self, pipeline): 75 def __init__(self, pipeline):
63 super(ProcessingLoop, self).__init__( 76 super(ProcessingLoop, self).__init__(
64 name='pipeline-reloader', daemon=True) 77 name='pipeline-reloader', daemon=True)
65 self.pipeline = pipeline 78 self.pipeline = pipeline
66 self.status_queue = queue.Queue() 79 self.last_status_id = 0
67 self.interval = 1 80 self.interval = 1
68 self._paths = set() 81 self._paths = set()
69 self._record = None 82 self._record = None
70 self._last_bake = 0 83 self._last_bake = 0
84 self._obs = []
85 self._obs_lock = threading.Lock()
86
87 def addObserver(self, obs):
88 with self._obs_lock:
89 self._obs.append(obs)
90
91 def removeObserver(self, obs):
92 with self._obs_lock:
93 self._obs.remove(obs)
71 94
72 def run(self): 95 def run(self):
73 # Build the first list of known files and run the pipeline once. 96 # Build the first list of known files and run the pipeline once.
74 app = self.pipeline.app 97 app = self.pipeline.app
75 roots = [os.path.join(app.root_dir, r) 98 roots = [os.path.join(app.root_dir, r)
114 self._record = self.pipeline.run( 137 self._record = self.pipeline.run(
115 root, 138 root,
116 previous_record=self._record, 139 previous_record=self._record,
117 save_record=False) 140 save_record=False)
118 141
119 # Update the status queue. 142 status_id = self.last_status_id + 1
120 # (we need to clear it because there may not be a consumer 143 self.last_status_id += 1
121 # on the other side, if the user isn't running with the
122 # debug window active)
123 while True:
124 try:
125 self.status_queue.get_nowait()
126 except queue.Empty:
127 break
128 144
129 if self._record.success: 145 if self._record.success:
146 changed = filter(
147 lambda i: not i.was_collapsed_from_last_run,
148 self._record.entries)
149 changed = itertools.chain.from_iterable(
150 map(lambda i: i.rel_outputs, changed))
151 changed = list(changed)
130 item = { 152 item = {
131 'type': 'pipeline_success'} 153 'id': status_id,
132 self.status_queue.put_nowait(item) 154 'type': 'pipeline_success',
155 'assets': changed}
156
157 self._notifyObservers(item)
133 else: 158 else:
134 item = { 159 item = {
160 'id': status_id,
135 'type': 'pipeline_error', 161 'type': 'pipeline_error',
136 'assets': []} 162 'assets': []}
137 for entry in self._record.entries: 163 for entry in self._record.entries:
138 if entry.errors: 164 if entry.errors:
139 asset_item = { 165 asset_item = {
140 'path': entry.rel_input, 166 'path': entry.rel_input,
141 'errors': list(entry.errors)} 167 'errors': list(entry.errors)}
142 item['assets'].append(asset_item) 168 item['assets'].append(asset_item)
143 self.status_queue.put_nowait(item)
144 except:
145 pass
146 169
170 self._notifyObservers(item)
171 except Exception as ex:
172 logger.exception(ex)
173
174 def _notifyObservers(self, item):
175 with self._obs_lock:
176 observers = list(self._obs)
177 for obs in observers:
178 obs.addBuildEvent(item)
179