# piecrust/serving/procloop.py @ 1131:32f71dbf5cb1

import os
import os.path
import time
import json
import queue
import logging
import itertools
import threading
from piecrust import CONFIG_PATH, THEME_CONFIG_PATH
from piecrust.chefutil import format_timed_scope
from piecrust.pipelines.records import MultiRecord


logger = logging.getLogger(__name__)

# This flag is used to cancel long-running requests (such as SSE streams)
# when the server shuts down.
server_shutdown = False


class PipelineStatusServerSentEventProducer(object):
    """ The producer for Server-Sent Events (SSE) notifying the front-end
        about useful things like assets having been re-processed in the
        background.
        Each producer has its own queue because the user could have multiple
        pages open, each needing to display notifications coming from the
        server.
    """
    def __init__(self, proc_loop):
        self._proc_loop = proc_loop
        self._queue = queue.Queue()
        self._start_time = 0
        self._poll_interval = 0.5  # in seconds
        self._ping_interval = 30  # in seconds
        self._time_between_pings = 0
        self._running = 0  # 0: not started, 1: running, 2: closed

    def addBuildEvent(self, item):
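        # `item` is a JSON-serializable dict built in
        # ProcessingLoopBase._onPipelinesRun, with `id`, `type` and
        # `assets` keys.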
        self._queue.put_nowait(item)

    def run(self):
        logger.debug("Starting pipeline status SSE.")
        self._proc_loop.addObserver(self)
        self._start_time = time.time()
        self._running = 1

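        # Server-sent events are plain-text messages made of `event:`,
        # `id:` and `data:` fields, each message terminated by a blank
        # line. Send an initial ping right away so the client knows the
        # stream is alive.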
        outstr = 'event: ping\ndata: started\n\n'
        yield bytes(outstr, 'utf8')

        while self._running == 1 and not server_shutdown:
            try:
                # We use a short poll interval (less than a second) because
                # we need to catch `server_shutdown` going `True` as soon as
                # possible to exit this thread when the user hits `CTRL+C`.
                data = self._queue.get(True, self._poll_interval)
            except queue.Empty:
                # Not exact timing but close enough.
                self._time_between_pings += self._poll_interval
                if self._time_between_pings >= self._ping_interval:
                    self._time_between_pings = 0
                    logger.debug("Sending ping/heartbeat event.")
                    outstr = 'event: ping\ndata: 1\n\n'
                    yield bytes(outstr, 'utf8')
                continue

            logger.debug("Sending pipeline status SSE.")
            outstr = (('event: %s\n' % data['type']) +
                      ('id: %s\n' % data['id']) +
                      ('data: %s\n\n' % json.dumps(data)))
            self._queue.task_done()
            yield bytes(outstr, 'utf8')

    def close(self):
        logger.debug("Closing pipeline status SSE.")
        self._proc_loop.removeObserver(self)
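        # Any value other than 1 makes the `run` generator exit on its
        # next poll iteration.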
        self._running = 2


class _AssetProcessingInfo:
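    """ Tracks the known asset file paths and the last bake time for one
        source. Only used by the polling fallback loop below.
    """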
    def __init__(self, source):
        self.source = source
        self.paths = set()
        self.last_bake_time = time.time()


class ProcessingLoopBase:
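    """ Base class for the background loop that re-runs the asset pipeline
        when asset files or the site configuration change.
        Two implementations are defined below: one based on `watchdog`
        file-system notifications, and a polling fallback for when
        `watchdog` isn't installed.
    """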
    def __init__(self, appfactory, out_dir):
        self.appfactory = appfactory
        self.out_dir = out_dir
        self.last_status_id = 0
        self._app = None
        self._obs = []
        self._obs_lock = threading.Lock()
        config_name = (
            THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
        self.config_path = os.path.join(appfactory.root_dir, config_name)

    def addObserver(self, obs):
        with self._obs_lock:
            self._obs.append(obs)

    def removeObserver(self, obs):
        with self._obs_lock:
            self._obs.remove(obs)

    def getApp(self):
        return self._app

    def initialize(self):
        self._app = self.appfactory.create()
        self.onInitialize()

    def onInitialize(self):
        pass

    def start(self):
        logger.info("Starting processing loop with output: %s" %
                    self.out_dir)
        try:
            self.initialize()
        except Exception as ex:
            logger.error("Error initializing processing loop:")
            logger.exception(ex)
            return

        logger.debug("Doing initial processing loop bake...")
        self.runPipelines()

        self.onStart()

    def onStart(self):
        raise NotImplementedError()

    def getSources(self):
        for src in self._app.sources:
            if src.config.get('pipeline') != 'asset':
                continue
            yield src

    def runPipelines(self, only_for_source=None):
        try:
            self._doRunPipelines(only_for_source)
        except Exception as ex:
            logger.error("Error while running asset pipeline:")
            logger.exception(ex)

    def _doRunPipelines(self, only_for_source):
        from piecrust.baking.baker import Baker

        allowed_sources = None
        if only_for_source:
            allowed_sources = [only_for_source.name]
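        # When baking only a subset of sources, keep the (otherwise unused)
        # records of the other sources so their previous bake state isn't
        # discarded.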
        baker = Baker(
            self.appfactory, self._app, self.out_dir,
            allowed_pipelines=['asset'],
            allowed_sources=allowed_sources,
            rotate_bake_records=False,
            keep_unused_records=(allowed_sources is not None))
        records = baker.bake()

        self._onPipelinesRun(records)

    def _onPipelinesRun(self, records):
        self.last_status_id += 1

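        # On success, observers get the list of output paths that were
        # actually re-processed (entries collapsed from the previous run
        # are skipped). On failure, they get an error event listing each
        # failed asset along with its errors.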
        if records.success:
            for rec in records.records:
                changed = filter(
                    lambda i: not i.was_collapsed_from_last_run,
                    rec.getEntries())
                changed = itertools.chain.from_iterable(
                    map(lambda i: i.out_paths, changed))
                changed = list(changed)
                item = {
                    'id': self.last_status_id,
                    'type': 'pipeline_success',
                    'assets': changed}

                self._notifyObservers(item)
        else:
            item = {
                'id': self.last_status_id,
                'type': 'pipeline_error',
                'assets': []}
            for rec in records.records:
                for entry in rec.getEntries():
                    if entry.errors:
                        asset_item = {
                            'path': entry.item_spec,
                            'errors': list(entry.errors)}
                        item['assets'].append(asset_item)

            self._notifyObservers(item)

    def _notifyObservers(self, item):
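        # Snapshot the observer list under the lock, but call the observers
        # outside of it so a slow observer can't block addObserver or
        # removeObserver.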
        with self._obs_lock:
            observers = list(self._obs)
        for obs in observers:
            obs.addBuildEvent(item)


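# The `watchdog` package is optional: when it's available we get real
# file-system notifications, otherwise we fall back to a polling loop.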
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    _has_watchdog = True
except ImportError:
    _has_watchdog = False


if _has_watchdog:
    class _AssetFileEventHandler(FileSystemEventHandler):
        def __init__(self, proc_loop, source):
            self._proc_loop = proc_loop
            self._source = source

        def on_any_event(self, event):
            if event.is_directory:
                return

            pl = self._proc_loop
            with pl._lock:
                pl._ops.append({
                    'op': 'bake',
                    'source': self._source,
                    'path': event.src_path,
                    'change': event.event_type,
                    'time': time.time()})
                pl._event.set()


    class _SiteConfigEventHandler(FileSystemEventHandler):
        def __init__(self, proc_loop, path):
            self._proc_loop = proc_loop
            self._path = path

        def on_modified(self, event):
            if event.src_path != self._path:
                return

            pl = self._proc_loop
            with pl._lock:
                pl._ops.append({'op': 'reinit', 'time': time.time()})
                pl._event.set()


    class WatchdogProcessingLoop(ProcessingLoopBase):
        def __init__(self, appfactory, out_dir):
            ProcessingLoopBase.__init__(self, appfactory, out_dir)
            self._op_thread = threading.Thread(
                name='watchdog-operations',
                target=self._runOpThread,
                daemon=True)
            self._lock = threading.Lock()
            self._event = threading.Event()
            self._ops = []
            self._last_op_time = 0

        def onStart(self):
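            # Watch the directory containing the site configuration
            # (non-recursively) and each asset source's directory
            # (recursively), then start the thread that handles the queued
            # file-system events.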
            logger.debug("Running watchdog monitor on:")
            observer = Observer()

            event_handler = _SiteConfigEventHandler(self, self.config_path)
            observer.schedule(event_handler, os.path.dirname(self.config_path))
            logger.debug(" - %s" % self.config_path)

            for src in self.getSources():
                path = getattr(src, 'fs_endpoint_path', None)
                if not path:
                    logger.warning("Skipping source '%s' -- it doesn't have "
                                   "a file-system endpoint." % src.name)
                    continue
                if not os.path.isdir(path):
                    continue

                logger.debug(" - %s" % path)
                event_handler = _AssetFileEventHandler(self, src)
                observer.schedule(event_handler, path, recursive=True)

            observer.start()
            self._op_thread.start()

        def _runOpThread(self):
            while not server_shutdown:
                try:
                    self._event.wait()
                    with self._lock:
                        ops = self._ops
                        self._ops = []
                        self._event.clear()

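                    # Only act on operations that happened after our last
                    # pipeline run; older ones should already be covered
                    # by it.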
                    orig_len = len(ops)
                    lot = self._last_op_time
                    ops = list(filter(lambda o: o['time'] > lot, ops))
                    logger.debug("Got %d ops, with %d that happened after "
                                 "our last operation." % (orig_len, len(ops)))
                    if len(ops) == 0:
                        continue

                    if any(filter(lambda o: o['op'] == 'reinit', ops)):
                        logger.info("Site configuration changed, "
                                    "reloading pipeline.")
                        self.initialize()
                        self.runPipelines()
                        continue

                    sources = set()
                    ops = list(filter(lambda o: o['op'] == 'bake', ops))
                    for op in ops:
                        logger.info("Detected file-system change: "
                                    "%s [%s]" %
                                    (op['path'], op['change']))
                        sources.add(op['source'])

                    logger.debug("Processing: %s" % [s.name for s in sources])
                    for s in sources:
                        self.runPipelines(s)

                    self._last_op_time = time.time()

                except (KeyboardInterrupt, SystemExit):
                    break

    ProcessingLoop = WatchdogProcessingLoop

else:
    class LegacyProcessingLoop(ProcessingLoopBase, threading.Thread):
        def __init__(self, appfactory, out_dir):
            ProcessingLoopBase.__init__(self, appfactory, out_dir)
            threading.Thread.__init__(self, name='pipeline-reloader',
                                      daemon=True)
            self.interval = 1
            self._proc_infos = None
            self._last_config_mtime = 0

        def onInitialize(self):
            self._proc_infos = {}
            for src in self.getSources():
                procinfo = _AssetProcessingInfo(src)
                self._proc_infos[src.name] = procinfo

                # Build the list of initial asset files.
                for item in src.getAllContents():
                    procinfo.paths.add(item.spec)

        def onStart(self):
            self._last_config_mtime = os.path.getmtime(self.config_path)
            threading.Thread.start(self)

        def run(self):
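            # Polling loop: check the site configuration's mtime and each
            # asset source's files every `self.interval` seconds.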
            while True:
                cur_config_time = os.path.getmtime(self.config_path)
                if self._last_config_mtime < cur_config_time:
                    logger.info("Site configuration changed, "
                                "reloading pipeline.")
                    self._last_config_mtime = cur_config_time
                    self.initialize()
                    self.runPipelines()
                    continue

                for procinfo in self._proc_infos.values():
                    # For each asset source, look for the first new or
                    # modified file; if one is found, re-run the pipeline
                    # on that source.
                    found_new_or_modified = False
                    for item in procinfo.source.getAllContents():
                        path = item.spec
                        if path not in procinfo.paths:
                            logger.debug("Found new asset: %s" % path)
                            procinfo.paths.add(path)
                            found_new_or_modified = True
                            break
                        if os.path.getmtime(path) > procinfo.last_bake_time:
                            logger.debug("Found modified asset: %s" % path)
                            found_new_or_modified = True
                            break
                    if found_new_or_modified:
                        logger.info("Change detected, re-processing '%s'." %
                                    procinfo.source.name)
                        self.runPipelines(procinfo.source)
                        procinfo.last_bake_time = time.time()

                time.sleep(self.interval)

    ProcessingLoop = LegacyProcessingLoop