piecrust2: diff piecrust/serving/procloop.py @ 552:9612cfc6455a
serve: Rewrite of the Server-Sent Event code for build notifications.
At the moment the server monitors the asset directories, and notifies the
browser when an asset has changed and has been re-processed.
* Fix issues around long-running requests/threads that prevent the server
from shutting down correctly with `CTRL-C` (see comments in the code, and
the first sketch after the diff).
* Move the notification queue into each SSE producer, so that multiple
pages can be open in the browser at the same time (see the wiring sketch
after the diff).
* Add JS/CSS for showing quick notifications about re-processed assets.
* Add support for hot-reloading CSS and pictures that have been re-processed.
author:   Ludovic Chabant <ludovic@chabant.com>
date:     Sat, 08 Aug 2015 16:12:04 -0700
parents:  fa3ee8a8ee2d
children: cc6f3dbe3048
```diff
--- a/piecrust/serving/procloop.py	Sat Aug 08 15:55:24 2015 -0700
+++ b/piecrust/serving/procloop.py	Sat Aug 08 16:12:04 2015 -0700
@@ -4,58 +4,71 @@
 import json
 import queue
 import logging
+import itertools
 import threading
 
 
 logger = logging.getLogger(__name__)
 
-
-_sse_abort = threading.Event()
+# This flag is for cancelling all long running requests like SSEs.
+server_shutdown = False
 
 
-class PipelineStatusServerSideEventProducer(object):
-    def __init__(self, status_queue):
-        self.status_queue = status_queue
-        self.interval = 2
-        self.timeout = 60*10
+class PipelineStatusServerSentEventProducer(object):
+    """ The producer for Server-Sent Events (SSE) notifying the front-end
+        about useful things like assets having been re-processed in the
+        background.
+        Each has its own queue because the user could have multiple pages
+        open, each having to display notifications coming from the server.
+    """
+    def __init__(self, proc_loop):
+        self._proc_loop = proc_loop
+        self._queue = queue.Queue()
         self._start_time = 0
+        self._poll_interval = 0.5
+        self._ping_interval = 30
+        self._time_between_pings = 0
+        self._running = 0
+
+    def addBuildEvent(self, item):
+        self._queue.put_nowait(item)
 
     def run(self):
         logger.debug("Starting pipeline status SSE.")
+        self._proc_loop.addObserver(self)
         self._start_time = time.time()
+        self._running = 1
 
         outstr = 'event: ping\ndata: started\n\n'
         yield bytes(outstr, 'utf8')
 
-        count = 0
-        while True:
-            if time.time() > self.timeout + self._start_time:
-                logger.debug("Closing pipeline status SSE, timeout reached.")
-                outstr = 'event: pipeline_timeout\ndata: bye\n\n'
-                yield bytes(outstr, 'utf8')
-                break
-
-            if _sse_abort.is_set():
-                break
-
+        while self._running == 1 and not server_shutdown:
             try:
-                logger.debug("Polling pipeline status queue...")
-                count += 1
-                data = self.status_queue.get(True, self.interval)
+                # We use a short poll interval (less than a second) because
+                # we need to catch `server_shutdown` going `True` as soon as
+                # possible to exit this thread when the user hits `CTRL+C`.
+                data = self._queue.get(True, self._poll_interval)
             except queue.Empty:
-                if count < 3:
-                    continue
-                data = {'type': 'ping', 'message': 'ping'}
-                count = 0
+                # Not exact timing but close enough.
+                self._time_between_pings += self._poll_interval
+                if self._time_between_pings >= self._ping_interval:
+                    self._time_between_pings = 0
+                    logger.debug("Sending ping/heartbeat event.")
+                    outstr = 'event: ping\ndata: 1\n\n'
+                    yield bytes(outstr, 'utf8')
+                continue
 
-            event_type = data['type']
-            outstr = 'event: %s\ndata: %s\n\n' % (
-                event_type, json.dumps(data))
             logger.debug("Sending pipeline status SSE.")
+            outstr = (('event: %s\n' % data['type']) +
+                      ('id: %s\n' % data['id']) +
+                      ('data: %s\n\n' % json.dumps(data)))
+            self._queue.task_done()
             yield bytes(outstr, 'utf8')
 
     def close(self):
         logger.debug("Closing pipeline status SSE.")
+        self._proc_loop.removeObserver(self)
+        self._running = 2
 
 
 class ProcessingLoop(threading.Thread):
@@ -63,11 +76,21 @@
         super(ProcessingLoop, self).__init__(
                 name='pipeline-reloader', daemon=True)
         self.pipeline = pipeline
-        self.status_queue = queue.Queue()
+        self.last_status_id = 0
        self.interval = 1
         self._paths = set()
         self._record = None
         self._last_bake = 0
+        self._obs = []
+        self._obs_lock = threading.Lock()
+
+    def addObserver(self, obs):
+        with self._obs_lock:
+            self._obs.append(obs)
+
+    def removeObserver(self, obs):
+        with self._obs_lock:
+            self._obs.remove(obs)
 
     def run(self):
         # Build the first list of known files and run the pipeline once.
@@ -116,22 +139,25 @@
                     previous_record=self._record,
                     save_record=False)
 
-            # Update the status queue.
-            # (we need to clear it because there may not be a consumer
-            #  on the other side, if the user isn't running with the
-            #  debug window active)
-            while True:
-                try:
-                    self.status_queue.get_nowait()
-                except queue.Empty:
-                    break
+            status_id = self.last_status_id + 1
+            self.last_status_id += 1
 
             if self._record.success:
+                changed = filter(
+                        lambda i: not i.was_collapsed_from_last_run,
+                        self._record.entries)
+                changed = itertools.chain.from_iterable(
+                        map(lambda i: i.rel_outputs, changed))
+                changed = list(changed)
                 item = {
-                        'type': 'pipeline_success'}
-                self.status_queue.put_nowait(item)
+                        'id': status_id,
+                        'type': 'pipeline_success',
+                        'assets': changed}
+
+                self._notifyObservers(item)
             else:
                 item = {
+                        'id': status_id,
                         'type': 'pipeline_error',
                         'assets': []}
                 for entry in self._record.entries:
@@ -140,7 +166,14 @@
                                 'path': entry.rel_input,
                                 'errors': list(entry.errors)}
                         item['assets'].append(asset_item)
-                self.status_queue.put_nowait(item)
-        except:
-            pass
+
+                self._notifyObservers(item)
+        except Exception as ex:
+            logger.exception(ex)
 
+    def _notifyObservers(self, item):
+        with self._obs_lock:
+            observers = list(self._obs)
+            for obs in observers:
+                obs.addBuildEvent(item)
+
```
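The old `_sse_abort` event becomes a module-level `server_shutdown` flag that every running SSE generator checks on each short poll. A minimal sketch of how the serving code might flip that flag when the user hits `CTRL-C`; the `run_server` wrapper and the `httpd` object are hypothetical, only the flag itself comes from this changeset:

```python
# Hypothetical wrapper around the HTTP server's main loop; only
# `procloop.server_shutdown` is real, the rest illustrates the intent.
from piecrust.serving import procloop

def run_server(httpd):
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Tell every long-running SSE generator to wind down. Each one
        # polls its queue with a 0.5 second timeout, so it sees the flag
        # within half a second instead of blocking on its next event.
        procloop.server_shutdown = True
        raise
```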
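Because each producer now owns its queue and registers itself as an observer of the `ProcessingLoop`, the server can hand a fresh producer to every page that opens the SSE endpoint. A sketch of that wiring, assuming a Werkzeug-based server; the `serve_pipeline_status` function is illustrative, not part of this changeset:

```python
from werkzeug.wrappers import Response

from piecrust.serving.procloop import PipelineStatusServerSentEventProducer

def serve_pipeline_status(proc_loop):
    # One producer per connected page: `run()` registers it as an
    # observer of the processing loop and streams from its own queue.
    producer = PipelineStatusServerSentEventProducer(proc_loop)
    response = Response(producer.run(), mimetype='text/event-stream')
    response.headers['Cache-Control'] = 'no-cache'
    # Unregister the observer when the client goes away, so the loop
    # stops pushing events into a dead queue.
    response.call_on_close(producer.close)
    return response
```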
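Given the `event:`/`id:`/`data:` formatting in `run()`, a successful re-processing pass reaches the browser as a standard SSE message along these lines (the `id` value and asset paths are made up for illustration):

```
event: pipeline_success
id: 3
data: {"id": 3, "type": "pipeline_success", "assets": ["css/theme.css", "images/logo.png"]}
```

The `id:` field is what lets a reconnecting `EventSource` send a `Last-Event-ID` header, so the front-end can tell which notifications it has already displayed.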