diff piecrust/tasks/base.py @ 1114:8af2ea1f5c34

tasks: Add new `tasks` command and infrastructure, with `mention` task. * The new command lets `chef` run tasks from a queue. * The webmention endpoint now adds a mention task. * Moved mention handling code to a task runner.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 22 Feb 2018 22:12:45 -0800
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/tasks/base.py	Thu Feb 22 22:12:45 2018 -0800
@@ -0,0 +1,109 @@
+import os
+import os.path
+import json
+import time
+import logging
+from piecrust.chefutil import format_timed
+
+
+TASKS_DIR = '_tasks'
+
+
+logger = logging.getLogger(__name__)
+
+
+class TaskContext:
+    def __init__(self):
+        pass
+
+
+class TaskRunner:
+    TASK_TYPE = 'undefined'
+
+    def __init__(self, app):
+        self.app = app
+
+    def runTask(self, task_data, ctx):
+        raise NotImplementedError()
+
+
+class TaskManager:
+    def __init__(self, app, *, time_threshold=1):
+        self.app = app
+        self.time_threshold = time_threshold
+        self._runners = None
+
+    @property
+    def tasks_dir(self):
+        return os.path.join(self.app.root_dir, TASKS_DIR)
+
+    def createTask(self, task_type, task_data):
+        from piecrust.pathutil import ensure_dir
+
+        tasks_dir = self.tasks_dir
+        ensure_dir(tasks_dir)
+        new_task = {
+            'type': task_type,
+            'data': task_data}
+        task_id = str(int(time.time()))
+        task_path = os.path.join(tasks_dir, '%s.json' % task_id)
+        with open(task_path, 'w', encoding='utf8') as fp:
+            json.dump(new_task, fp)
+        return task_id
+
+    def getTasks(self, *, only_task=None):
+        max_time = time.time() - self.time_threshold
+        tasks_dir = self.tasks_dir
+        try:
+            task_files = os.listdir(tasks_dir)
+        except (IOError, OSError):
+            task_files = []
+
+        for tf in task_files:
+            tfname, _ = os.path.splitext(tf)
+            if only_task and tfname != only_task:
+                continue
+
+            tf_path = os.path.join(tasks_dir, tf)
+            task_time = os.path.getmtime(tf_path)
+            if task_time >= max_time:
+                logger.debug("Skipping task '%s' because it's too new." % tf)
+                continue
+
+            with open(tf_path, 'r', encoding='utf8') as fp:
+                task_data = json.load(fp)
+
+            task_type = task_data.get('task')
+            task_payload = task_data.get('data')
+            yield (tf_path, task_type, task_payload)
+
+    def runQueue(self, *, only_task=None, clear_queue=True):
+        start_time = time.perf_counter()
+
+        tasks = list(self.getTasks(only_task=only_task))
+        for path, task_type, task_data in tasks:
+            if not task_type:
+                logger.error("Got task with no type: %s" % path)
+                continue
+
+            runner = self._getRunner(task_type)
+            if runner is None:
+                logger.error("No task runner for type: %s" % task_type)
+                continue
+
+            ctx = TaskContext()
+            runner.runTask(task_data, ctx)
+
+            if clear_queue:
+                os.remove(path)
+
+        logger.info(format_timed(
+            start_time, "Ran %d tasks." % len(tasks)))
+
+    def _getRunner(self, task_type):
+        if self._runners is None:
+            self._runners = {}
+            for r in self.app.plugin_loader.getTaskRunners():
+                self._runners[r.TASK_TYPE] = r(self.app)
+
+        return self._runners.get(task_type)