piecrust2 (Mercurial): comparison of piecrust/serving/procloop.py @ 374:fa3ee8a8ee2d
serve: Split the server code in a couple modules inside a `serving` package.

This makes the `serve` command's code a bit more removed from implementation
details, and paves the way for the CMS mode.

author:   Ludovic Chabant <ludovic@chabant.com>
date:     Thu, 07 May 2015 21:37:38 -0700
children: 9612cfc6455a
compared revisions: 373:9fb7c4921d75 and 374:fa3ee8a8ee2d
import os
import os.path
import time
import json
import queue
import logging
import threading


logger = logging.getLogger(__name__)


# Module-level event that, when set, tells active SSE producers to stop
# streaming and end their response.
_sse_abort = threading.Event()


# Streams the asset pipeline's status to the debug window as Server-Sent
# Events; `run` is a generator that yields UTF-8 encoded SSE messages.
class PipelineStatusServerSideEventProducer(object):
    def __init__(self, status_queue):
        self.status_queue = status_queue
        self.interval = 2          # seconds to wait on each queue poll
        self.timeout = 60*10       # close idle streams after ten minutes
        self._start_time = 0

    def run(self):
        logger.debug("Starting pipeline status SSE.")
        self._start_time = time.time()

        outstr = 'event: ping\ndata: started\n\n'
        yield bytes(outstr, 'utf8')

        count = 0
        while True:
            # Stop the stream once the timeout is reached so idle
            # connections don't linger forever.
            if time.time() > self.timeout + self._start_time:
                logger.debug("Closing pipeline status SSE, timeout reached.")
                outstr = 'event: pipeline_timeout\ndata: bye\n\n'
                yield bytes(outstr, 'utf8')
                break

            if _sse_abort.is_set():
                break

            try:
                logger.debug("Polling pipeline status queue...")
                count += 1
                data = self.status_queue.get(True, self.interval)
            except queue.Empty:
                # No status update available: only send a keep-alive ping
                # every few empty polls.
                if count < 3:
                    continue
                data = {'type': 'ping', 'message': 'ping'}
                count = 0

            event_type = data['type']
            outstr = 'event: %s\ndata: %s\n\n' % (
                    event_type, json.dumps(data))
            logger.debug("Sending pipeline status SSE.")
            yield bytes(outstr, 'utf8')

    def close(self):
        logger.debug("Closing pipeline status SSE.")


# Background thread that watches the pipeline's mount roots for new or
# modified files, re-runs the asset pipeline on them, and posts the results
# to `status_queue`.
class ProcessingLoop(threading.Thread):
    def __init__(self, pipeline):
        super(ProcessingLoop, self).__init__(
                name='pipeline-reloader', daemon=True)
        self.pipeline = pipeline
        self.status_queue = queue.Queue()
        self.interval = 1
        self._paths = set()
        self._record = None
        self._last_bake = 0

    def run(self):
        # Build the first list of known files and run the pipeline once.
        app = self.pipeline.app
        roots = [os.path.join(app.root_dir, r)
                 for r in self.pipeline.mounts.keys()]
        for root in roots:
            for dirpath, dirnames, filenames in os.walk(root):
                self._paths |= set([os.path.join(dirpath, f)
                                    for f in filenames])
        self._last_bake = time.time()
        self._record = self.pipeline.run()

        while True:
            for root in roots:
                # For each mount root we try to find the first new or
                # modified file. If any, we just run the pipeline on
                # that mount.
                found_new_or_modified = False
                for dirpath, dirnames, filenames in os.walk(root):
                    for filename in filenames:
                        path = os.path.join(dirpath, filename)
                        if path not in self._paths:
                            logger.debug("Found new asset: %s" % path)
                            self._paths.add(path)
                            found_new_or_modified = True
                            break
                        if os.path.getmtime(path) > self._last_bake:
                            logger.debug("Found modified asset: %s" % path)
                            found_new_or_modified = True
                            break

                    if found_new_or_modified:
                        break

                if found_new_or_modified:
                    self._runPipeline(root)

            time.sleep(self.interval)

    def _runPipeline(self, root):
        self._last_bake = time.time()
        try:
            self._record = self.pipeline.run(
                    root,
                    previous_record=self._record,
                    save_record=False)

            # Update the status queue.
            # (we need to clear it because there may not be a consumer
            #  on the other side, if the user isn't running with the
            #  debug window active)
            while True:
                try:
                    self.status_queue.get_nowait()
                except queue.Empty:
                    break

            if self._record.success:
                item = {
                        'type': 'pipeline_success'}
                self.status_queue.put_nowait(item)
            else:
                item = {
                        'type': 'pipeline_error',
                        'assets': []}
                for entry in self._record.entries:
                    if entry.errors:
                        asset_item = {
                                'path': entry.rel_input,
                                'errors': list(entry.errors)}
                        item['assets'].append(asset_item)
                self.status_queue.put_nowait(item)
        except:
            # Swallow any error so the reloader thread keeps running.
            pass
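The producer above only yields encoded SSE messages; the HTTP wiring lives in
the other modules of the `serving` package that this changeset introduces. As a
rough sketch only, assuming a Werkzeug-based server (PieCrust's development
server builds on Werkzeug) and a hypothetical helper name, such a generator
could be turned into a streaming response like this:

    # Illustrative sketch, not part of procloop.py: wrap the SSE generator in
    # a streaming Werkzeug response.
    from werkzeug.wrappers import Response

    def make_pipeline_status_response(status_queue):
        producer = PipelineStatusServerSideEventProducer(status_queue)
        response = Response(producer.run(), mimetype='text/event-stream')
        # Ask clients and proxies not to cache or buffer the event stream.
        response.headers['Cache-Control'] = 'no-cache'
        # Run the producer's close() hook when the response is closed.
        response.call_on_close(producer.close)
        return response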
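The `ProcessingLoop`, for its part, is meant to be started once by the
development server, with its `status_queue` shared with the SSE producer so
that bake results reach the debug window. A minimal sketch of that wiring,
where the `pipeline` argument stands in for PieCrust's real asset pipeline
object (an assumption; the code above only relies on its `app`, `mounts` and
`run(...)` members):

    # Illustrative sketch, not part of procloop.py.
    def start_pipeline_reloader(pipeline):
        proc_loop = ProcessingLoop(pipeline)
        proc_loop.start()  # daemon thread, so it dies with the server process
        return proc_loop

    # Later, when the debug window connects, reuse the loop's queue, e.g.:
    #   response = make_pipeline_status_response(proc_loop.status_queue)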