Mercurial > piecrust2
view piecrust/serving/procloop.py @ 374:fa3ee8a8ee2d
serve: Split the server code in a couple modules inside a `serving` package.
This makes the `serve` command's code a bit more removed from implementation
details, and paves the way for the CMS mode.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Thu, 07 May 2015 21:37:38 -0700 |
parents | |
children | 9612cfc6455a |
line wrap: on
line source
import os import os.path import time import json import queue import logging import threading logger = logging.getLogger(__name__) _sse_abort = threading.Event() class PipelineStatusServerSideEventProducer(object): def __init__(self, status_queue): self.status_queue = status_queue self.interval = 2 self.timeout = 60*10 self._start_time = 0 def run(self): logger.debug("Starting pipeline status SSE.") self._start_time = time.time() outstr = 'event: ping\ndata: started\n\n' yield bytes(outstr, 'utf8') count = 0 while True: if time.time() > self.timeout + self._start_time: logger.debug("Closing pipeline status SSE, timeout reached.") outstr = 'event: pipeline_timeout\ndata: bye\n\n' yield bytes(outstr, 'utf8') break if _sse_abort.is_set(): break try: logger.debug("Polling pipeline status queue...") count += 1 data = self.status_queue.get(True, self.interval) except queue.Empty: if count < 3: continue data = {'type': 'ping', 'message': 'ping'} count = 0 event_type = data['type'] outstr = 'event: %s\ndata: %s\n\n' % ( event_type, json.dumps(data)) logger.debug("Sending pipeline status SSE.") yield bytes(outstr, 'utf8') def close(self): logger.debug("Closing pipeline status SSE.") class ProcessingLoop(threading.Thread): def __init__(self, pipeline): super(ProcessingLoop, self).__init__( name='pipeline-reloader', daemon=True) self.pipeline = pipeline self.status_queue = queue.Queue() self.interval = 1 self._paths = set() self._record = None self._last_bake = 0 def run(self): # Build the first list of known files and run the pipeline once. app = self.pipeline.app roots = [os.path.join(app.root_dir, r) for r in self.pipeline.mounts.keys()] for root in roots: for dirpath, dirnames, filenames in os.walk(root): self._paths |= set([os.path.join(dirpath, f) for f in filenames]) self._last_bake = time.time() self._record = self.pipeline.run() while True: for root in roots: # For each mount root we try to find the first new or # modified file. If any, we just run the pipeline on # that mount. found_new_or_modified = False for dirpath, dirnames, filenames in os.walk(root): for filename in filenames: path = os.path.join(dirpath, filename) if path not in self._paths: logger.debug("Found new asset: %s" % path) self._paths.add(path) found_new_or_modified = True break if os.path.getmtime(path) > self._last_bake: logger.debug("Found modified asset: %s" % path) found_new_or_modified = True break if found_new_or_modified: break if found_new_or_modified: self._runPipeline(root) time.sleep(self.interval) def _runPipeline(self, root): self._last_bake = time.time() try: self._record = self.pipeline.run( root, previous_record=self._record, save_record=False) # Update the status queue. # (we need to clear it because there may not be a consumer # on the other side, if the user isn't running with the # debug window active) while True: try: self.status_queue.get_nowait() except queue.Empty: break if self._record.success: item = { 'type': 'pipeline_success'} self.status_queue.put_nowait(item) else: item = { 'type': 'pipeline_error', 'assets': []} for entry in self._record.entries: if entry.errors: asset_item = { 'path': entry.rel_input, 'errors': list(entry.errors)} item['assets'].append(asset_item) self.status_queue.put_nowait(item) except: pass