piecrust2: comparison of piecrust/serving/procloop.py @ 552:9612cfc6455a
serve: Rewrite of the Server-Sent Event code for build notifications.
At the moment the server monitors the asset directories and notifies the
browser when an asset has changed and has been re-processed (the SSE wire
format used for these notifications is sketched after the change list below).
* Fix issues around long-running requests/threads that messed up the ability
to shut down the server correctly with `CTRL-C` (see comments in the code,
and the shutdown sketch after the changeset metadata below).
* Move the notification queue into each SSE producer, so that multiple pages
can be open in the browser at the same time.
* Add JS/CSS for showing quick notifications about re-processed assets.
* Add support for hot-reloading CSS and pictures that have been re-processed.
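For reference, the notifications travel over the standard Server-Sent Events wire format: `event`, `id` and `data` fields, one per line, with a blank line terminating each message. Below is a minimal sketch of that framing as the new producer builds it; the `format_event` helper is illustrative and not part of the changeset:

```python
import json


def format_event(data):
    # Standard SSE framing, matching what
    # PipelineStatusServerSentEventProducer.run() builds in the diff
    # below: one field per line, a blank line ends the message.
    return bytes(
        'event: %s\nid: %s\ndata: %s\n\n' % (
            data['type'], data['id'], json.dumps(data)),
        'utf8')


# A 'pipeline_success' event as the processing loop would emit it
# (asset list shortened for the example); prints the raw bytes that
# would be yielded to the browser.
print(format_event({
    'id': 1,
    'type': 'pipeline_success',
    'assets': ['css/theme.css'],
}))
```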
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Sat, 08 Aug 2015 16:12:04 -0700 |
| parents | fa3ee8a8ee2d |
| children | cc6f3dbe3048 |
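The `CTRL-C` fix mentioned above hinges on the new module-level `server_shutdown` flag (visible at the top of the diff below): each SSE generator polls its queue with a short timeout so it can notice the flag and return instead of blocking forever. The code that actually sets the flag lives in the serve command, outside this file; the following is a minimal sketch of what that wiring could look like, assuming a Werkzeug-based development server (`run_simple` is real Werkzeug API, but the surrounding function is hypothetical):

```python
from werkzeug.serving import run_simple

import piecrust.serving.procloop as procloop


def serve_until_interrupted(app, host='localhost', port=8080):
    # Hypothetical wiring, not part of this changeset.
    try:
        # run_simple blocks until the process is interrupted.
        run_simple(host, port, app, threaded=True)
    except KeyboardInterrupt:
        # Raise the module-level flag so that every long-running SSE
        # generator notices it within one poll interval (0.5 seconds)
        # and exits, allowing worker threads to shut down cleanly.
        procloop.server_shutdown = True
```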
--- a/piecrust/serving/procloop.py (551:f2b875ecc940)
+++ b/piecrust/serving/procloop.py (552:9612cfc6455a)
@@ -2,74 +2,97 @@
 import os.path
 import time
 import json
 import queue
 import logging
+import itertools
 import threading
 
 
 logger = logging.getLogger(__name__)
 
-
-_sse_abort = threading.Event()
+# This flag is for cancelling all long running requests like SSEs.
+server_shutdown = False
 
 
-class PipelineStatusServerSideEventProducer(object):
-    def __init__(self, status_queue):
-        self.status_queue = status_queue
-        self.interval = 2
-        self.timeout = 60*10
+class PipelineStatusServerSentEventProducer(object):
+    """ The producer for Server-Sent Events (SSE) notifying the front-end
+        about useful things like assets having been re-processed in the
+        background.
+        Each has its own queue because the user could have multiple pages
+        open, each having to display notifications coming from the server.
+    """
+    def __init__(self, proc_loop):
+        self._proc_loop = proc_loop
+        self._queue = queue.Queue()
         self._start_time = 0
+        self._poll_interval = 0.5
+        self._ping_interval = 30
+        self._time_between_pings = 0
+        self._running = 0
+
+    def addBuildEvent(self, item):
+        self._queue.put_nowait(item)
 
     def run(self):
         logger.debug("Starting pipeline status SSE.")
+        self._proc_loop.addObserver(self)
         self._start_time = time.time()
+        self._running = 1
 
         outstr = 'event: ping\ndata: started\n\n'
         yield bytes(outstr, 'utf8')
 
-        count = 0
-        while True:
-            if time.time() > self.timeout + self._start_time:
-                logger.debug("Closing pipeline status SSE, timeout reached.")
-                outstr = 'event: pipeline_timeout\ndata: bye\n\n'
-                yield bytes(outstr, 'utf8')
-                break
-
-            if _sse_abort.is_set():
-                break
-
+        while self._running == 1 and not server_shutdown:
             try:
-                logger.debug("Polling pipeline status queue...")
-                count += 1
-                data = self.status_queue.get(True, self.interval)
+                # We use a short poll interval (less than a second) because
+                # we need to catch `server_shutdown` going `True` as soon as
+                # possible to exit this thread when the user hits `CTRL+C`.
+                data = self._queue.get(True, self._poll_interval)
             except queue.Empty:
-                if count < 3:
-                    continue
-                data = {'type': 'ping', 'message': 'ping'}
-                count = 0
-
-            event_type = data['type']
-            outstr = 'event: %s\ndata: %s\n\n' % (
-                event_type, json.dumps(data))
+                # Not exact timing but close enough.
+                self._time_between_pings += self._poll_interval
+                if self._time_between_pings >= self._ping_interval:
+                    self._time_between_pings = 0
+                    logger.debug("Sending ping/heartbeat event.")
+                    outstr = 'event: ping\ndata: 1\n\n'
+                    yield bytes(outstr, 'utf8')
+                continue
+
             logger.debug("Sending pipeline status SSE.")
+            outstr = (('event: %s\n' % data['type']) +
+                      ('id: %s\n' % data['id']) +
+                      ('data: %s\n\n' % json.dumps(data)))
+            self._queue.task_done()
             yield bytes(outstr, 'utf8')
 
     def close(self):
         logger.debug("Closing pipeline status SSE.")
+        self._proc_loop.removeObserver(self)
+        self._running = 2
 
 
 class ProcessingLoop(threading.Thread):
     def __init__(self, pipeline):
         super(ProcessingLoop, self).__init__(
             name='pipeline-reloader', daemon=True)
         self.pipeline = pipeline
-        self.status_queue = queue.Queue()
+        self.last_status_id = 0
         self.interval = 1
         self._paths = set()
         self._record = None
         self._last_bake = 0
+        self._obs = []
+        self._obs_lock = threading.Lock()
+
+    def addObserver(self, obs):
+        with self._obs_lock:
+            self._obs.append(obs)
+
+    def removeObserver(self, obs):
+        with self._obs_lock:
+            self._obs.remove(obs)
 
     def run(self):
         # Build the first list of known files and run the pipeline once.
         app = self.pipeline.app
         roots = [os.path.join(app.root_dir, r)
@@ -114,33 +137,43 @@
             self._record = self.pipeline.run(
                 root,
                 previous_record=self._record,
                 save_record=False)
 
-            # Update the status queue.
-            # (we need to clear it because there may not be a consumer
-            #  on the other side, if the user isn't running with the
-            #  debug window active)
-            while True:
-                try:
-                    self.status_queue.get_nowait()
-                except queue.Empty:
-                    break
+            status_id = self.last_status_id + 1
+            self.last_status_id += 1
 
             if self._record.success:
+                changed = filter(
+                    lambda i: not i.was_collapsed_from_last_run,
+                    self._record.entries)
+                changed = itertools.chain.from_iterable(
+                    map(lambda i: i.rel_outputs, changed))
+                changed = list(changed)
                 item = {
-                    'type': 'pipeline_success'}
-                self.status_queue.put_nowait(item)
+                    'id': status_id,
+                    'type': 'pipeline_success',
+                    'assets': changed}
+
+                self._notifyObservers(item)
             else:
                 item = {
+                    'id': status_id,
                     'type': 'pipeline_error',
                     'assets': []}
                 for entry in self._record.entries:
                     if entry.errors:
                         asset_item = {
                             'path': entry.rel_input,
                             'errors': list(entry.errors)}
                         item['assets'].append(asset_item)
-                self.status_queue.put_nowait(item)
-        except:
-            pass
+
+                self._notifyObservers(item)
+        except Exception as ex:
+            logger.exception(ex)
 
+    def _notifyObservers(self, item):
+        with self._obs_lock:
+            observers = list(self._obs)
+        for obs in observers:
+            obs.addBuildEvent(item)
+
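Not part of this changeset, but for completeness: one plausible way to hook the producer into the development server's request handling, assuming a Werkzeug-based WSGI setup (which PieCrust's `serve` command uses). The function name and wiring are illustrative:

```python
from werkzeug.wrappers import Response

from piecrust.serving.procloop import PipelineStatusServerSentEventProducer


def make_pipeline_status_response(proc_loop):
    # One producer per request means one queue per open page, which is
    # exactly why this changeset moves the notification queue into the
    # producer.
    producer = PipelineStatusServerSentEventProducer(proc_loop)
    response = Response(producer.run(), mimetype='text/event-stream')
    # Unregister the observer when the client goes away, so the
    # processing loop stops pushing events to a dead stream.
    response.call_on_close(producer.close)
    response.cache_control.no_cache = True
    return response
```

Because `run()` is a generator, Werkzeug streams each yielded `bytes` chunk to the browser as it is produced, which is what keeps the front-end's `EventSource` connection open.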