comparison piecrust/serving/procloop.py @ 374:fa3ee8a8ee2d

serve: Split the server code in a couple modules inside a `serving` package. This makes the `serve` command's code a bit more removed from implementation details, and paves the way for the CMS mode.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 07 May 2015 21:37:38 -0700
parents
children 9612cfc6455a
comparison
equal deleted inserted replaced
373:9fb7c4921d75 374:fa3ee8a8ee2d
1 import os
2 import os.path
3 import time
4 import json
5 import queue
6 import logging
7 import threading
8
9
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)


# Event polled by the pipeline status SSE producer on every loop iteration;
# once it is set, the producer stops streaming.
# NOTE(review): nothing in this module sets it -- presumably the server
# shutdown code does; confirm against the callers.
_sse_abort = threading.Event()
14
15
class PipelineStatusServerSideEventProducer(object):
    """Streams pipeline status items as server-sent events (SSE).

    Items are read from `status_queue`; each one must be a JSON-serializable
    mapping with a `type` key, which becomes the SSE event name.
    """

    def __init__(self, status_queue):
        """Remember the queue to poll and set the default timings.

        `interval` is the blocking time (in seconds) of each queue poll,
        `timeout` the maximum lifetime (in seconds) of one stream.
        """
        self.status_queue = status_queue
        self.interval = 2
        self.timeout = 60 * 10
        self._start_time = 0

    @staticmethod
    def _encodeEvent(name, payload):
        # Build one SSE frame and encode it as UTF-8 bytes.
        frame = 'event: %s\ndata: %s\n\n' % (name, payload)
        return frame.encode('utf8')

    def run(self):
        """Generate the UTF-8 encoded SSE frames of one stream.

        The stream opens with a `ping` event, then relays queue items until
        either the timeout elapses (a final `pipeline_timeout` event is sent
        first) or the module-level abort event is set (the stream just ends).
        """
        logger.debug("Starting pipeline status SSE.")
        self._start_time = time.time()

        yield self._encodeEvent('ping', 'started')

        # Number of polls since the last keep-alive ping. It is incremented
        # on every poll, including the ones that returned an item, and is
        # only reset when a ping is emitted.
        polls = 0
        while True:
            if time.time() > self.timeout + self._start_time:
                logger.debug("Closing pipeline status SSE, timeout reached.")
                yield self._encodeEvent('pipeline_timeout', 'bye')
                return

            if _sse_abort.is_set():
                return

            logger.debug("Polling pipeline status queue...")
            polls += 1
            try:
                payload = self.status_queue.get(True, self.interval)
            except queue.Empty:
                if polls < 3:
                    continue
                # Nothing to report for a while: send a keep-alive ping.
                payload = {'type': 'ping', 'message': 'ping'}
                polls = 0

            frame = self._encodeEvent(payload['type'], json.dumps(payload))
            logger.debug("Sending pipeline status SSE.")
            yield frame

    def close(self):
        """Log that the stream is being closed; nothing else is released."""
        logger.debug("Closing pipeline status SSE.")
59
60
class ProcessingLoop(threading.Thread):
    """Daemon thread that re-runs the pipeline when mounted files change.

    After an initial full pipeline run, every mount root is scanned each
    `interval` seconds; the pipeline is re-run on any root that contains a
    new file or a file modified since the last run. The outcome of each
    re-run is published on `status_queue` as a dictionary whose `type` is
    either `pipeline_success` or `pipeline_error` (the latter also carries
    an `assets` list of `{'path': ..., 'errors': [...]}` entries).
    """

    def __init__(self, pipeline):
        """Create the (not yet started) thread for the given pipeline."""
        super(ProcessingLoop, self).__init__(
            name='pipeline-reloader', daemon=True)
        self.pipeline = pipeline
        self.status_queue = queue.Queue()
        # Seconds to sleep between two scans of the mount roots.
        self.interval = 1
        # Every file path seen so far under the mount roots.
        self._paths = set()
        # Record returned by the last successful pipeline run.
        self._record = None
        # Time at which the last pipeline run was started.
        self._last_bake = 0

    def run(self):
        """Thread body: run the pipeline once, then watch for changes."""
        # Build the first list of known files and run the pipeline once.
        app = self.pipeline.app
        roots = [os.path.join(app.root_dir, r)
                 for r in self.pipeline.mounts.keys()]
        for root in roots:
            for dirpath, _dirnames, filenames in os.walk(root):
                self._paths.update(
                    os.path.join(dirpath, f) for f in filenames)
        self._last_bake = time.time()
        self._record = self.pipeline.run()

        while True:
            for root in roots:
                # For each mount root we try to find the first new or
                # modified file. If any, we just run the pipeline on
                # that mount.
                if self._hasNewOrModifiedFile(root):
                    self._runPipeline(root)

            time.sleep(self.interval)

    def _hasNewOrModifiedFile(self, root):
        # Return True as soon as one new or modified file is found under
        # `root`; a new file is also added to the set of known paths.
        for dirpath, _dirnames, filenames in os.walk(root):
            for filename in filenames:
                path = os.path.join(dirpath, filename)
                if path not in self._paths:
                    logger.debug("Found new asset: %s", path)
                    self._paths.add(path)
                    return True

                try:
                    mtime = os.path.getmtime(path)
                except OSError:
                    # The file disappeared (or became unreadable) between
                    # the directory listing and the stat call; skipping it
                    # keeps the watcher thread alive.
                    continue

                if mtime > self._last_bake:
                    logger.debug("Found modified asset: %s", path)
                    return True

        return False

    def _runPipeline(self, root):
        """Run the pipeline on `root` and publish the result.

        Errors are logged rather than raised, so that a failing run does
        not terminate the watcher thread.
        """
        # The timestamp is taken before the run starts, so files modified
        # while the pipeline is running are detected by the next scan.
        self._last_bake = time.time()
        try:
            self._record = self.pipeline.run(
                root,
                previous_record=self._record,
                save_record=False)

            # Update the status queue.
            # (we need to clear it because there may not be a consumer
            #  on the other side, if the user isn't running with the
            #  debug window active)
            while True:
                try:
                    self.status_queue.get_nowait()
                except queue.Empty:
                    break

            if self._record.success:
                item = {'type': 'pipeline_success'}
            else:
                item = {
                    'type': 'pipeline_error',
                    'assets': [
                        {'path': entry.rel_input,
                         'errors': list(entry.errors)}
                        for entry in self._record.entries
                        if entry.errors]}
            self.status_queue.put_nowait(item)
        except Exception:
            # Keep the loop running, but leave a trace of what went wrong
            # instead of silently discarding the error.
            logger.exception("Error while running the pipeline on: %s", root)
146