Mercurial > piecrust2
diff piecrust/serving/procloop.py @ 374:fa3ee8a8ee2d
serve: Split the server code in a couple modules inside a `serving` package.
This makes the `serve` command's code a bit more removed from implementation
details, and paves the way for the CMS mode.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Thu, 07 May 2015 21:37:38 -0700 |
parents | |
children | 9612cfc6455a |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/piecrust/serving/procloop.py Thu May 07 21:37:38 2015 -0700 @@ -0,0 +1,146 @@ +import os +import os.path +import time +import json +import queue +import logging +import threading + + +logger = logging.getLogger(__name__) + + +_sse_abort = threading.Event() + + +class PipelineStatusServerSideEventProducer(object): + def __init__(self, status_queue): + self.status_queue = status_queue + self.interval = 2 + self.timeout = 60*10 + self._start_time = 0 + + def run(self): + logger.debug("Starting pipeline status SSE.") + self._start_time = time.time() + + outstr = 'event: ping\ndata: started\n\n' + yield bytes(outstr, 'utf8') + + count = 0 + while True: + if time.time() > self.timeout + self._start_time: + logger.debug("Closing pipeline status SSE, timeout reached.") + outstr = 'event: pipeline_timeout\ndata: bye\n\n' + yield bytes(outstr, 'utf8') + break + + if _sse_abort.is_set(): + break + + try: + logger.debug("Polling pipeline status queue...") + count += 1 + data = self.status_queue.get(True, self.interval) + except queue.Empty: + if count < 3: + continue + data = {'type': 'ping', 'message': 'ping'} + count = 0 + + event_type = data['type'] + outstr = 'event: %s\ndata: %s\n\n' % ( + event_type, json.dumps(data)) + logger.debug("Sending pipeline status SSE.") + yield bytes(outstr, 'utf8') + + def close(self): + logger.debug("Closing pipeline status SSE.") + + +class ProcessingLoop(threading.Thread): + def __init__(self, pipeline): + super(ProcessingLoop, self).__init__( + name='pipeline-reloader', daemon=True) + self.pipeline = pipeline + self.status_queue = queue.Queue() + self.interval = 1 + self._paths = set() + self._record = None + self._last_bake = 0 + + def run(self): + # Build the first list of known files and run the pipeline once. + app = self.pipeline.app + roots = [os.path.join(app.root_dir, r) + for r in self.pipeline.mounts.keys()] + for root in roots: + for dirpath, dirnames, filenames in os.walk(root): + self._paths |= set([os.path.join(dirpath, f) + for f in filenames]) + self._last_bake = time.time() + self._record = self.pipeline.run() + + while True: + for root in roots: + # For each mount root we try to find the first new or + # modified file. If any, we just run the pipeline on + # that mount. + found_new_or_modified = False + for dirpath, dirnames, filenames in os.walk(root): + for filename in filenames: + path = os.path.join(dirpath, filename) + if path not in self._paths: + logger.debug("Found new asset: %s" % path) + self._paths.add(path) + found_new_or_modified = True + break + if os.path.getmtime(path) > self._last_bake: + logger.debug("Found modified asset: %s" % path) + found_new_or_modified = True + break + + if found_new_or_modified: + break + + if found_new_or_modified: + self._runPipeline(root) + + time.sleep(self.interval) + + def _runPipeline(self, root): + self._last_bake = time.time() + try: + self._record = self.pipeline.run( + root, + previous_record=self._record, + save_record=False) + + # Update the status queue. + # (we need to clear it because there may not be a consumer + # on the other side, if the user isn't running with the + # debug window active) + while True: + try: + self.status_queue.get_nowait() + except queue.Empty: + break + + if self._record.success: + item = { + 'type': 'pipeline_success'} + self.status_queue.put_nowait(item) + else: + item = { + 'type': 'pipeline_error', + 'assets': []} + for entry in self._record.entries: + if entry.errors: + asset_item = { + 'path': entry.rel_input, + 'errors': list(entry.errors)} + item['assets'].append(asset_item) + self.status_queue.put_nowait(item) + except: + pass +