Mercurial > piecrust2
view piecrust/tasks/base.py @ 1188:a7c43131d871
bake: Fix file write flushing problem with Python 3.8+
Writing the cache files fails in Python 3.8 because it looks like flushing
behaviour has changed. We need to explicitly flush. And even then, in very
rare occurrences, it looks like it can still run into racing conditions,
so we do a very hacky and ugly "retry" loop when fetching cached data :(
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Tue, 15 Jun 2021 22:36:23 -0700 |
parents | 8af2ea1f5c34 |
children |
line wrap: on
line source
import os import os.path import json import time import logging from piecrust.chefutil import format_timed TASKS_DIR = '_tasks' logger = logging.getLogger(__name__) class TaskContext: def __init__(self): pass class TaskRunner: TASK_TYPE = 'undefined' def __init__(self, app): self.app = app def runTask(self, task_data, ctx): raise NotImplementedError() class TaskManager: def __init__(self, app, *, time_threshold=1): self.app = app self.time_threshold = time_threshold self._runners = None @property def tasks_dir(self): return os.path.join(self.app.root_dir, TASKS_DIR) def createTask(self, task_type, task_data): from piecrust.pathutil import ensure_dir tasks_dir = self.tasks_dir ensure_dir(tasks_dir) new_task = { 'type': task_type, 'data': task_data} task_id = str(int(time.time())) task_path = os.path.join(tasks_dir, '%s.json' % task_id) with open(task_path, 'w', encoding='utf8') as fp: json.dump(new_task, fp) return task_id def getTasks(self, *, only_task=None): max_time = time.time() - self.time_threshold tasks_dir = self.tasks_dir try: task_files = os.listdir(tasks_dir) except (IOError, OSError): task_files = [] for tf in task_files: tfname, _ = os.path.splitext(tf) if only_task and tfname != only_task: continue tf_path = os.path.join(tasks_dir, tf) task_time = os.path.getmtime(tf_path) if task_time >= max_time: logger.debug("Skipping task '%s' because it's too new." % tf) continue with open(tf_path, 'r', encoding='utf8') as fp: task_data = json.load(fp) task_type = task_data.get('task') task_payload = task_data.get('data') yield (tf_path, task_type, task_payload) def runQueue(self, *, only_task=None, clear_queue=True): start_time = time.perf_counter() tasks = list(self.getTasks(only_task=only_task)) for path, task_type, task_data in tasks: if not task_type: logger.error("Got task with no type: %s" % path) continue runner = self._getRunner(task_type) if runner is None: logger.error("No task runner for type: %s" % task_type) continue ctx = TaskContext() runner.runTask(task_data, ctx) if clear_queue: os.remove(path) logger.info(format_timed( start_time, "Ran %d tasks." % len(tasks))) def _getRunner(self, task_type): if self._runners is None: self._runners = {} for r in self.app.plugin_loader.getTaskRunners(): self._runners[r.TASK_TYPE] = r(self.app) return self._runners.get(task_type)