piecrust2: view of piecrust/serving/procloop.py @ 1051:971b4d67e82a
serve: Fix problems with assets disappearing between servings.
When an asset file changes, its source's pipeline is re-run. But that produced
a bake record containing only that pipeline's output, so the outputs of the
other pipelines were incorrectly considered empty, and the files they had
previously produced were deleted as stray. Now we copy over the bake records
of the pipelines we don't run.
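Conceptually, the fix seeds a partial bake with the previous records of every pipeline that is skipped. Here is a minimal sketch of the idea, not PieCrust's actual record API (the helper name and the `name`/`records` attributes are illustrative assumptions; in the file below, the real mechanism is the Baker's `rotate_bake_records` and `keep_unused_records` flags):

```python
def merge_unused_records(previous_records, current_records, baked_names):
    """Carry over bake records for pipelines that were not re-run.

    Illustrative sketch: without this, a partial bake would leave the
    skipped pipelines' records empty, and the files they previously
    produced would be treated as stray output and deleted.
    """
    for prev in previous_records:
        if prev.name not in baked_names:
            # This pipeline didn't run in the partial bake: keep its
            # last known outputs on record.
            current_records.append(prev)
    return current_records
```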
author   | Ludovic Chabant <ludovic@chabant.com>
date     | Fri, 26 Jan 2018 18:05:02 -0800
parents  | 2f39ffa601a9
children | 292e3a1316d8
line source
```python
import os
import os.path
import time
import json
import queue
import logging
import itertools
import threading
from piecrust import CONFIG_PATH, THEME_CONFIG_PATH
from piecrust.chefutil import format_timed_scope
from piecrust.pipelines.records import MultiRecord


logger = logging.getLogger(__name__)


# This flag is for cancelling all long running requests like SSEs.
server_shutdown = False


class PipelineStatusServerSentEventProducer(object):
    """ The producer for Server-Sent Events (SSE) notifying the front-end
        about useful things like assets having been re-processed in the
        background.
        Each has its own queue because the user could have multiple pages
        open, each having to display notifications coming from the server.
    """
    def __init__(self, proc_loop):
        self._proc_loop = proc_loop
        self._queue = queue.Queue()
        self._start_time = 0
        self._poll_interval = 0.5
        self._ping_interval = 30
        self._time_between_pings = 0
        self._running = 0

    def addBuildEvent(self, item):
        self._queue.put_nowait(item)

    def run(self):
        logger.debug("Starting pipeline status SSE.")
        self._proc_loop.addObserver(self)
        self._start_time = time.time()
        self._running = 1

        outstr = 'event: ping\ndata: started\n\n'
        yield bytes(outstr, 'utf8')

        while self._running == 1 and not server_shutdown:
            try:
                # We use a short poll interval (less than a second) because
                # we need to catch `server_shutdown` going `True` as soon as
                # possible to exit this thread when the user hits `CTRL+C`.
                data = self._queue.get(True, self._poll_interval)
            except queue.Empty:
                # Not exact timing but close enough.
                self._time_between_pings += self._poll_interval
                if self._time_between_pings >= self._ping_interval:
                    self._time_between_pings = 0
                    logger.debug("Sending ping/heartbeat event.")
                    outstr = 'event: ping\ndata: 1\n\n'
                    yield bytes(outstr, 'utf8')
                continue

            logger.debug("Sending pipeline status SSE.")
            outstr = (('event: %s\n' % data['type']) +
                      ('id: %s\n' % data['id']) +
                      ('data: %s\n\n' % json.dumps(data)))
            self._queue.task_done()
            yield bytes(outstr, 'utf8')

    def close(self):
        logger.debug("Closing pipeline status SSE.")
        self._proc_loop.removeObserver(self)
        self._running = 2


class _AssetProcessingInfo:
    def __init__(self, source):
        self.source = source
        self.paths = set()
        self.last_bake_time = time.time()


class ProcessingLoopBase:
    def __init__(self, appfactory, out_dir):
        self.appfactory = appfactory
        self.out_dir = out_dir
        self.last_status_id = 0
        self._app = None
        self._obs = []
        self._obs_lock = threading.Lock()
        config_name = (
            THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
        self.config_path = os.path.join(appfactory.root_dir, config_name)

    def addObserver(self, obs):
        with self._obs_lock:
            self._obs.append(obs)

    def removeObserver(self, obs):
        with self._obs_lock:
            self._obs.remove(obs)

    def getApp(self):
        return self._app

    def initialize(self):
        self._app = self.appfactory.create()
        self.onInitialize()

    def onInitialize(self):
        pass

    def start(self):
        logger.info("Starting processing loop with output: %s" %
                    self.out_dir)
        try:
            self.initialize()
        except Exception as ex:
            logger.error("Error initializing processing loop:")
            logger.exception(ex)
            return

        logger.debug("Doing initial processing loop bake...")
        self.runPipelines()

        self.onStart()

    def onStart(self):
        raise NotImplementedError()

    def getSources(self):
        for src in self._app.sources:
            if src.config.get('pipeline') != 'asset':
                continue
            yield src

    def runPipelines(self, only_for_source=None):
        try:
            self._doRunPipelines(only_for_source)
        except Exception as ex:
            logger.error("Error while running asset pipeline:")
            logger.exception(ex)

    def _doRunPipelines(self, only_for_source):
        from piecrust.baking.baker import Baker

        allowed_sources = None
        if only_for_source:
            allowed_sources = [only_for_source.name]

        # When only re-baking a single source, keep the bake records of the
        # pipelines we don't run, so their outputs aren't considered empty
        # and their files aren't deleted as stray.
        baker = Baker(
            self.appfactory, self._app, self.out_dir,
            allowed_pipelines=['asset'],
            allowed_sources=allowed_sources,
            rotate_bake_records=False,
            keep_unused_records=(allowed_sources is not None))
        records = baker.bake()

        self._onPipelinesRun(records)

    def _onPipelinesRun(self, records):
        self.last_status_id += 1

        if records.success:
            for rec in records.records:
                changed = filter(
                    lambda i: not i.was_collapsed_from_last_run,
                    rec.getEntries())
                changed = itertools.chain.from_iterable(
                    map(lambda i: i.out_paths, changed))
                changed = list(changed)

                item = {
                    'id': self.last_status_id,
                    'type': 'pipeline_success',
                    'assets': changed}
                self._notifyObservers(item)
        else:
            item = {
                'id': self.last_status_id,
                'type': 'pipeline_error',
                'assets': []}
            for rec in records.records:
                for entry in rec.getEntries():
                    if entry.errors:
                        asset_item = {
                            'path': entry.item_spec,
                            'errors': list(entry.errors)}
                        item['assets'].append(asset_item)
            self._notifyObservers(item)

    def _notifyObservers(self, item):
        with self._obs_lock:
            observers = list(self._obs)
        for obs in observers:
            obs.addBuildEvent(item)


try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    _has_watchdog = True
except ImportError:
    _has_watchdog = False


if _has_watchdog:
    class _AssetFileEventHandler(FileSystemEventHandler):
        def __init__(self, proc_loop, source):
            self._proc_loop = proc_loop
            self._source = source

        def on_any_event(self, event):
            if event.is_directory:
                return

            pl = self._proc_loop
            with pl._lock:
                pl._ops.append({
                    'op': 'bake',
                    'source': self._source,
                    'path': event.src_path,
                    'change': event.event_type,
                    'time': time.time()})
                pl._event.set()

    class _SiteConfigEventHandler(FileSystemEventHandler):
        def __init__(self, proc_loop, path):
            self._proc_loop = proc_loop
            self._path = path

        def on_modified(self, event):
            if event.src_path != self._path:
                return

            pl = self._proc_loop
            with pl._lock:
                pl._ops.append({'op': 'reinit'})
                pl._event.set()

    class WatchdogProcessingLoop(ProcessingLoopBase):
        def __init__(self, appfactory, out_dir):
            ProcessingLoopBase.__init__(self, appfactory, out_dir)
            self._op_thread = threading.Thread(
                name='watchdog-operations',
                target=self._runOpThread,
                daemon=True)
            self._lock = threading.Lock()
            self._event = threading.Event()
            self._ops = []
            self._last_op_time = 0

        def onStart(self):
            logger.debug("Running watchdog monitor on:")
            observer = Observer()

            event_handler = _SiteConfigEventHandler(self, self.config_path)
            observer.schedule(event_handler,
                              os.path.dirname(self.config_path))
            logger.debug(" - %s" % self.config_path)

            for src in self.getSources():
                path = getattr(src, 'fs_endpoint_path', None)
                if not path:
                    logger.warning("Skipping source '%s' -- it doesn't have "
                                   "a file-system endpoint." % src.name)
                    continue
                if not os.path.isdir(path):
                    continue

                logger.debug(" - %s" % path)
                event_handler = _AssetFileEventHandler(self, src)
                observer.schedule(event_handler, path, recursive=True)

            observer.start()
            self._op_thread.start()

        def _runOpThread(self):
            while not server_shutdown:
                try:
                    self._event.wait()
                    with self._lock:
                        ops = self._ops
                        self._ops = []
                        self._event.clear()

                    orig_len = len(ops)
                    lot = self._last_op_time
                    ops = list(filter(lambda o: o['time'] > lot, ops))
                    logger.debug("Got %d ops, with %d that happened after "
                                 "our last operation." %
                                 (orig_len, len(ops)))
                    if len(ops) == 0:
                        continue

                    if any(filter(lambda o: o['op'] == 'reinit', ops)):
                        logger.info("Site configuration changed, "
                                    "reloading pipeline.")
                        self.initialize()
                        self.runPipelines()
                        continue

                    sources = set()
                    ops = list(filter(lambda o: o['op'] == 'bake', ops))
                    for op in ops:
                        logger.info("Detected file-system change: "
                                    "%s [%s]" %
                                    (op['path'], op['change']))
                        sources.add(op['source'])

                    logger.debug("Processing: %s" %
                                 [s.name for s in sources])
                    for s in sources:
                        self.runPipelines(s)

                    self._last_op_time = time.time()

                except (KeyboardInterrupt, SystemExit):
                    break

    ProcessingLoop = WatchdogProcessingLoop

else:
    class LegacyProcessingLoop(ProcessingLoopBase, threading.Thread):
        def __init__(self, appfactory, out_dir):
            ProcessingLoopBase.__init__(self, appfactory, out_dir)
            threading.Thread.__init__(self, name='pipeline-reloader',
                                      daemon=True)
            self.interval = 1
            self._proc_infos = None
            self._last_config_mtime = 0

        def onInitialize(self):
            self._proc_infos = {}
            for src in self.getSources():
                procinfo = _AssetProcessingInfo(src)
                self._proc_infos[src.name] = procinfo

                # Build the list of initial asset files.
                for item in src.getAllContents():
                    procinfo.paths.add(item.spec)

        def onStart(self):
            self._last_config_mtime = os.path.getmtime(self.config_path)
            threading.Thread.start(self)

        def run(self):
            while True:
                cur_config_time = os.path.getmtime(self.config_path)
                if self._last_config_mtime < cur_config_time:
                    logger.info("Site configuration changed, "
                                "reloading pipeline.")
                    self._last_config_mtime = cur_config_time
                    self.initialize()
                    self.runPipelines()
                    continue

                for procinfo in self._proc_infos.values():
                    # For each assets folder we try to find the first new or
                    # modified file. If any, we just run the pipeline on
                    # that source.
                    found_new_or_modified = False
                    for item in procinfo.source.getAllContents():
                        path = item.spec
                        if path not in procinfo.paths:
                            logger.debug("Found new asset: %s" % path)
                            procinfo.paths.add(path)
                            found_new_or_modified = True
                            break
                        if os.path.getmtime(path) > procinfo.last_bake_time:
                            logger.debug("Found modified asset: %s" % path)
                            found_new_or_modified = True
                            break

                    if found_new_or_modified:
                        logger.info("Change detected, reprocessing '%s'." %
                                    procinfo.source.name)
                        self.runPipelines(procinfo.source)
                        procinfo.last_bake_time = time.time()

                time.sleep(self.interval)

    ProcessingLoop = LegacyProcessingLoop
```
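For context, here is a sketch of how a dev server might wire these pieces together. The surrounding WSGI plumbing and the `appfactory` object are assumptions, since the real wiring lives elsewhere in `piecrust.serving`:

```python
from piecrust.serving.procloop import (
    ProcessingLoop, PipelineStatusServerSentEventProducer)


def start_asset_pipeline(appfactory, out_dir):
    # Bake assets once, then keep watching for changes in the background
    # (watchdog-based if available, polling otherwise).
    proc_loop = ProcessingLoop(appfactory, out_dir)
    proc_loop.start()
    return proc_loop


def sse_response_body(proc_loop):
    # One producer (and queue) per open browser tab; iterating `run()`
    # yields ready-to-send `text/event-stream` byte chunks.
    producer = PipelineStatusServerSentEventProducer(proc_loop)
    try:
        yield from producer.run()
    finally:
        producer.close()
```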