Mercurial > piecrust2
comparison piecrust/tasks/base.py @ 1114:8af2ea1f5c34
tasks: Add new `tasks` command and infrastructure, with `mention` task.
* The new command lets `chef` run tasks from a queue.
* The webmention endpoint now adds a mention task.
* Moved mention handling code to a task runner.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Thu, 22 Feb 2018 22:12:45 -0800 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
1113:29c51b981c17 | 1114:8af2ea1f5c34 |
---|---|
1 import os | |
2 import os.path | |
3 import json | |
4 import time | |
5 import logging | |
6 from piecrust.chefutil import format_timed | |
7 | |
8 | |
# Name of the directory, directly under the app's root directory, in which
# queued task files are stored (see `TaskManager.tasks_dir`).
TASKS_DIR = '_tasks'


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
13 | |
14 | |
class TaskContext:
    """Context object handed to a task runner for each task it runs.

    It currently carries no state; `TaskManager.runQueue` creates a fresh
    instance for every task and passes it to `TaskRunner.runTask`.
    """

    def __init__(self):
        """Create an empty context."""
18 | |
19 | |
class TaskRunner:
    """Base class for objects that know how to run one type of task.

    Subclasses set `TASK_TYPE` to the task type they handle (it is the key
    under which `TaskManager` registers the runner) and override `runTask`.
    """

    # Task type handled by this runner; subclasses override it.
    TASK_TYPE = 'undefined'

    def __init__(self, app):
        """Keep a reference to the application the runner works on."""
        self.app = app

    def runTask(self, task_data, ctx):
        """Run one task.

        `task_data` is the payload stored with the task and `ctx` is the
        `TaskContext` created for this run. The base implementation always
        raises `NotImplementedError`.
        """
        raise NotImplementedError()
28 | |
29 | |
class TaskManager:
    """Creates, lists and runs tasks stored as JSON files in a queue directory.

    Each task is one `<task_id>.json` file in `tasks_dir`, containing a
    dictionary with a `type` entry (the task type, matched against
    `TaskRunner.TASK_TYPE`) and a `data` entry (the payload given to the
    runner).
    """

    def __init__(self, app, *, time_threshold=1):
        """Create a manager for the given application.

        `time_threshold` is the minimum age, in seconds, a task file must
        have (based on its modification time) before `getTasks` returns it.
        """
        self.app = app
        self.time_threshold = time_threshold
        # Runners are instantiated lazily, on the first `_getRunner` call.
        self._runners = None

    @property
    def tasks_dir(self):
        """The directory holding the queued task files."""
        return os.path.join(self.app.root_dir, TASKS_DIR)

    def createTask(self, task_type, task_data):
        """Add a task to the queue and return its ID.

        The ID is the current time in whole seconds. If a task with that ID
        already exists (several tasks created within the same second), a
        numeric suffix is appended so that no queued task gets overwritten.
        """
        from piecrust.pathutil import ensure_dir

        tasks_dir = self.tasks_dir
        ensure_dir(tasks_dir)
        new_task = {
            'type': task_type,
            'data': task_data}

        base_id = str(int(time.time()))
        task_id = base_id
        attempt = 0
        while True:
            task_path = os.path.join(tasks_dir, '%s.json' % task_id)
            try:
                # Exclusive creation: fails instead of truncating a task
                # that was queued under the same ID.
                fp = open(task_path, 'x', encoding='utf8')
            except FileExistsError:
                attempt += 1
                task_id = '%s-%d' % (base_id, attempt)
                continue
            with fp:
                json.dump(new_task, fp)
            return task_id

    def getTasks(self, *, only_task=None):
        """Yield `(path, task_type, task_payload)` for each queued task.

        Files modified less than `time_threshold` seconds ago are skipped.
        If `only_task` is given, only the task with that ID (the file name
        without its extension) is returned. A missing or unreadable tasks
        directory yields nothing.
        """
        max_time = time.time() - self.time_threshold
        tasks_dir = self.tasks_dir
        try:
            task_files = os.listdir(tasks_dir)
        except (IOError, OSError):
            task_files = []

        for tf in task_files:
            tfname, _ = os.path.splitext(tf)
            if only_task and tfname != only_task:
                continue

            tf_path = os.path.join(tasks_dir, tf)
            task_time = os.path.getmtime(tf_path)
            if task_time >= max_time:
                logger.debug(
                    "Skipping task '%s' because it's too new.", tf)
                continue

            with open(tf_path, 'r', encoding='utf8') as fp:
                task_data = json.load(fp)

            # Must be the same key that `createTask` writes.
            task_type = task_data.get('type')
            task_payload = task_data.get('data')
            yield (tf_path, task_type, task_payload)

    def runQueue(self, *, only_task=None, clear_queue=True):
        """Run the queued tasks, optionally restricted to `only_task`.

        Tasks without a type, or without a registered runner, are logged as
        errors and left in the queue. When `clear_queue` is true, the file
        of each task that was run is deleted afterwards.
        """
        start_time = time.perf_counter()

        # Materialize the list first: files get removed while we iterate,
        # and the total is needed for the final log message.
        tasks = list(self.getTasks(only_task=only_task))
        for path, task_type, task_data in tasks:
            if not task_type:
                logger.error("Got task with no type: %s", path)
                continue

            runner = self._getRunner(task_type)
            if runner is None:
                logger.error("No task runner for type: %s", task_type)
                continue

            ctx = TaskContext()
            runner.runTask(task_data, ctx)

            if clear_queue:
                os.remove(path)

        logger.info(format_timed(
            start_time, "Ran %d tasks." % len(tasks)))

    def _getRunner(self, task_type):
        """Return the runner registered for `task_type`, or `None`.

        On first use, one instance of every runner class returned by the
        app's plugin loader is created and indexed by its `TASK_TYPE`.
        """
        if self._runners is None:
            self._runners = {
                r.TASK_TYPE: r(self.app)
                for r in self.app.plugin_loader.getTaskRunners()}

        return self._runners.get(task_type)