Mercurial > piecrust2
comparison piecrust/baking/scheduler.py @ 150:91dcbb5fe1e8
Split baking code in smaller files.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 30 Nov 2014 21:46:42 -0800 |
parents | |
children | 1187739e5a19 |
comparison
equal
deleted
inserted
replaced
149:ea4a17831242 | 150:91dcbb5fe1e8 |
---|---|
1 import logging | |
2 import threading | |
3 | |
4 | |
5 logger = logging.getLogger(__name__) | |
6 | |
7 | |
8 class BakeScheduler(object): | |
9 _EMPTY = object() | |
10 _WAIT = object() | |
11 | |
12 def __init__(self, record, jobs=None): | |
13 self.record = record | |
14 self.jobs = list(jobs) if jobs is not None else [] | |
15 self._active_jobs = [] | |
16 self._lock = threading.Lock() | |
17 self._added_event = threading.Event() | |
18 self._done_event = threading.Event() | |
19 | |
20 def addJob(self, job): | |
21 logger.debug("Queuing job '%s:%s'." % ( | |
22 job.factory.source.name, job.factory.rel_path)) | |
23 with self._lock: | |
24 self.jobs.append(job) | |
25 self._added_event.set() | |
26 | |
27 def onJobFinished(self, job): | |
28 logger.debug("Removing job '%s:%s'." % ( | |
29 job.factory.source.name, job.factory.rel_path)) | |
30 with self._lock: | |
31 self._active_jobs.remove(job) | |
32 self._done_event.set() | |
33 | |
34 def getNextJob(self, wait_timeout=None, empty_timeout=None): | |
35 self._added_event.clear() | |
36 self._done_event.clear() | |
37 job = self._doGetNextJob() | |
38 while job in (self._EMPTY, self._WAIT): | |
39 if job == self._EMPTY: | |
40 if empty_timeout is None: | |
41 return None | |
42 logger.debug("Waiting for a new job to be added...") | |
43 res = self._added_event.wait(empty_timeout) | |
44 elif job == self._WAIT: | |
45 if wait_timeout is None: | |
46 return None | |
47 logger.debug("Waiting for a job to be finished...") | |
48 res = self._done_event.wait(wait_timeout) | |
49 if not res: | |
50 logger.debug("Timed-out. No job found.") | |
51 return None | |
52 job = self._doGetNextJob() | |
53 return job | |
54 | |
55 def _doGetNextJob(self): | |
56 with self._lock: | |
57 if len(self.jobs) == 0: | |
58 return self._EMPTY | |
59 | |
60 job = self.jobs.pop(0) | |
61 first_job = job | |
62 while True: | |
63 ready, wait_on_src = self._isJobReady(job) | |
64 if ready: | |
65 break | |
66 | |
67 logger.debug("Job '%s:%s' isn't ready yet: waiting on pages " | |
68 "from source '%s' to finish baking." % | |
69 (job.factory.source.name, | |
70 job.factory.rel_path, wait_on_src)) | |
71 self.jobs.append(job) | |
72 job = self.jobs.pop(0) | |
73 if job == first_job: | |
74 # None of the jobs are ready... we need to wait. | |
75 self.jobs.append(job) | |
76 return self._WAIT | |
77 | |
78 logger.debug("Job '%s:%s' is ready to go, moving to active " | |
79 "queue." % (job.factory.source.name, job.factory.rel_path)) | |
80 self._active_jobs.append(job) | |
81 return job | |
82 | |
83 def _isJobReady(self, job): | |
84 e = self.record.getPreviousEntry(job.factory.source.name, | |
85 job.factory.rel_path) | |
86 if not e: | |
87 return (True, None) | |
88 for sn, rp in e.used_source_names: | |
89 if sn == job.factory.source.name: | |
90 continue | |
91 if any(filter(lambda j: j.factory.source.name == sn, self.jobs)): | |
92 return (False, sn) | |
93 if any(filter(lambda j: j.factory.source.name == sn, | |
94 self._active_jobs)): | |
95 return (False, sn) | |
96 return (True, None) | |
97 |