comparison piecrust/tasks/base.py @ 1114:8af2ea1f5c34

tasks: Add new `tasks` command and infrastructure, with `mention` task. * The new command lets `chef` run tasks from a queue. * The webmention endpoint now adds a mention task. * Moved mention handling code to a task runner.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 22 Feb 2018 22:12:45 -0800
parents
children
comparison
equal deleted inserted replaced
1113:29c51b981c17 1114:8af2ea1f5c34
1 import os
2 import os.path
3 import json
4 import time
5 import logging
6 from piecrust.chefutil import format_timed
7
8
9 TASKS_DIR = '_tasks'
10
11
12 logger = logging.getLogger(__name__)
13
14
15 class TaskContext:
16 def __init__(self):
17 pass
18
19
20 class TaskRunner:
21 TASK_TYPE = 'undefined'
22
23 def __init__(self, app):
24 self.app = app
25
26 def runTask(self, task_data, ctx):
27 raise NotImplementedError()
28
29
30 class TaskManager:
31 def __init__(self, app, *, time_threshold=1):
32 self.app = app
33 self.time_threshold = time_threshold
34 self._runners = None
35
36 @property
37 def tasks_dir(self):
38 return os.path.join(self.app.root_dir, TASKS_DIR)
39
40 def createTask(self, task_type, task_data):
41 from piecrust.pathutil import ensure_dir
42
43 tasks_dir = self.tasks_dir
44 ensure_dir(tasks_dir)
45 new_task = {
46 'type': task_type,
47 'data': task_data}
48 task_id = str(int(time.time()))
49 task_path = os.path.join(tasks_dir, '%s.json' % task_id)
50 with open(task_path, 'w', encoding='utf8') as fp:
51 json.dump(new_task, fp)
52 return task_id
53
54 def getTasks(self, *, only_task=None):
55 max_time = time.time() - self.time_threshold
56 tasks_dir = self.tasks_dir
57 try:
58 task_files = os.listdir(tasks_dir)
59 except (IOError, OSError):
60 task_files = []
61
62 for tf in task_files:
63 tfname, _ = os.path.splitext(tf)
64 if only_task and tfname != only_task:
65 continue
66
67 tf_path = os.path.join(tasks_dir, tf)
68 task_time = os.path.getmtime(tf_path)
69 if task_time >= max_time:
70 logger.debug("Skipping task '%s' because it's too new." % tf)
71 continue
72
73 with open(tf_path, 'r', encoding='utf8') as fp:
74 task_data = json.load(fp)
75
76 task_type = task_data.get('task')
77 task_payload = task_data.get('data')
78 yield (tf_path, task_type, task_payload)
79
80 def runQueue(self, *, only_task=None, clear_queue=True):
81 start_time = time.perf_counter()
82
83 tasks = list(self.getTasks(only_task=only_task))
84 for path, task_type, task_data in tasks:
85 if not task_type:
86 logger.error("Got task with no type: %s" % path)
87 continue
88
89 runner = self._getRunner(task_type)
90 if runner is None:
91 logger.error("No task runner for type: %s" % task_type)
92 continue
93
94 ctx = TaskContext()
95 runner.runTask(task_data, ctx)
96
97 if clear_queue:
98 os.remove(path)
99
100 logger.info(format_timed(
101 start_time, "Ran %d tasks." % len(tasks)))
102
103 def _getRunner(self, task_type):
104 if self._runners is None:
105 self._runners = {}
106 for r in self.app.plugin_loader.getTaskRunners():
107 self._runners[r.TASK_TYPE] = r(self.app)
108
109 return self._runners.get(task_type)