piecrust2: comparison of piecrust/serving/procloop.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.

- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
  keep aborting in `getContents`, but rely on their pipeline to generate
  pages for baking (see the sketch after the metadata below).
author    Ludovic Chabant <ludovic@chabant.com>
date      Sun, 04 Jun 2017 23:34:28 -0700
parents   c2ea75e37540
children  448710d84121
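
The second bullet is the heart of the change: a pipeline now turns each of
its source's content items into a job itself, instead of the caller
enumerating files on disk. Below is a minimal sketch of that per-item flow;
only the method names `createJob`, `createRecordEntry` and `run` mirror the
diff that follows, while every class here is an illustrative stub, not
PieCrust's real machinery.

    import time


    class RecordEntry:
        def __init__(self, item_spec):
            self.item_spec = item_spec
            self.errors = []

        @property
        def success(self):
            return not self.errors


    class JobResult:
        def __init__(self):
            self.record_entry = None


    class AssetPipeline:
        def createJob(self, item):
            # One job per content item; the pipeline, not the caller,
            # decides what a job looks like.
            return {'item': item, 'queued_at': time.time()}

        def createRecordEntry(self, job):
            return RecordEntry(job['item'])

        def run(self, item, result):
            # Process the asset; a real pipeline appends to
            # result.record_entry.errors on failure.
            pass


    def run_source(pipeline, items):
        # Same loop shape as the new _runPipeline below: create a job and
        # its record entry, run it, and collect errors instead of raising.
        entries = []
        for item in items:
            job = pipeline.createJob(item)
            result = JobResult()
            result.record_entry = pipeline.createRecordEntry(job)
            try:
                pipeline.run(item, result)
            except Exception as e:
                result.record_entry.errors.append(str(e))
            entries.append(result.record_entry)
        return entries


    print(all(e.success for e in run_source(AssetPipeline(), ['a.css', 'b.js'])))
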
--- 853:f070a4fc033c
+++ 854:08e02c2a2a1a
@@ -5,12 +5,15 @@
 import queue
 import logging
 import itertools
 import threading
 from piecrust import CONFIG_PATH, THEME_CONFIG_PATH
-from piecrust.app import PieCrust
-from piecrust.processing.pipeline import ProcessorPipeline
+from piecrust.pipelines.base import (
+    PipelineJobCreateContext, PipelineJobRunContext, PipelineJobResult,
+    PipelineManager)
+from piecrust.pipelines.records import (
+    MultiRecord, MultiRecordHistory)
 
 
 logger = logging.getLogger(__name__)
 
 # This flag is for cancelling all long running requests like SSEs.
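
The import hunk above swaps the monolithic ProcessorPipeline for the
PipelineManager plus the records API. A MultiRecordHistory simply pairs the
records of the previous run with those of the current one; roughly, as a
sketch (the real classes in piecrust.pipelines.records carry more state
than this):

    class MultiRecord:
        def __init__(self):
            self.records = []      # one record per pipeline
            self.success = True


    class MultiRecordHistory:
        def __init__(self, previous, current):
            self.previous = previous
            self.current = current


    last_records = MultiRecord()     # what the previous run produced
    current_records = MultiRecord()  # filled in by the run about to happen
    history = MultiRecordHistory(last_records, current_records)
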
@@ -72,29 +75,32 @@
         logger.debug("Closing pipeline status SSE.")
         self._proc_loop.removeObserver(self)
         self._running = 2
 
 
+class _AssetProcessingInfo:
+    def __init__(self, source):
+        self.source = source
+        self.paths = set()
+        self.last_bake_time = time.time()
+
+
 class ProcessingLoop(threading.Thread):
     def __init__(self, appfactory, out_dir):
-        super(ProcessingLoop, self).__init__(
-            name='pipeline-reloader', daemon=True)
+        super().__init__(name='pipeline-reloader', daemon=True)
         self.appfactory = appfactory
         self.out_dir = out_dir
         self.last_status_id = 0
         self.interval = 1
-        self.app = None
-        self._roots = []
-        self._monitor_assets_root = False
-        self._paths = set()
-        self._record = None
-        self._last_bake = 0
+        self._app = None
+        self._proc_infos = None
+        self._last_records = None
         self._last_config_mtime = 0
         self._obs = []
         self._obs_lock = threading.Lock()
         config_name = (
             THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
         self._config_path = os.path.join(appfactory.root_dir, config_name)
 
     def addObserver(self, obs):
         with self._obs_lock:
             self._obs.append(obs)
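
The new _AssetProcessingInfo class above is per-source bookkeeping replacing
the old _roots/_paths/_last_bake attributes: a set of known item specs to
spot new files, plus a timestamp to spot modified ones. The check that run()
performs on every poll (next hunk) boils down to this standalone sketch;
`find_first_change` is a hypothetical helper, not part of the diff:

    import os
    import time


    class _AssetProcessingInfo:
        def __init__(self, source):
            self.source = source
            self.paths = set()     # item specs seen so far
            self.last_bake_time = time.time()


    def find_first_change(procinfo, specs):
        # `specs` stands in for the item specs that getAllContents()
        # yields; for asset sources these are file paths.
        for path in specs:
            if path not in procinfo.paths:
                procinfo.paths.add(path)
                return path        # never seen before: a new asset
            if os.path.getmtime(path) > procinfo.last_bake_time:
                return path        # seen before, but touched since last bake
        return None
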
@@ -102,120 +108,166 @@
     def removeObserver(self, obs):
         with self._obs_lock:
             self._obs.remove(obs)
 
     def run(self):
-        self._initPipeline()
+        self._init()
 
-        self._last_bake = time.time()
         self._last_config_mtime = os.path.getmtime(self._config_path)
-        self._record = self.pipeline.run()
 
         while True:
             cur_config_time = os.path.getmtime(self._config_path)
             if self._last_config_mtime < cur_config_time:
                 logger.info("Site configuration changed, reloading pipeline.")
                 self._last_config_mtime = cur_config_time
-                self._initPipeline()
-                for root in self._roots:
-                    self._runPipeline(root)
+                self._init()
+                self._runPipelines()
                 continue
 
-            if self._monitor_assets_root:
-                assets_dir = os.path.join(self.app.root_dir, 'assets')
-                if os.path.isdir(assets_dir):
-                    logger.info("Assets directory was created, reloading "
-                                "pipeline.")
-                    self._initPipeline()
-                    self._runPipeline(assets_dir)
-                    continue
-
-            for root in self._roots:
-                # For each mount root we try to find the first new or
+            for procinfo in self._proc_infos:
+                # For each assets folder we try to find the first new or
                 # modified file. If any, we just run the pipeline on
-                # that mount.
+                # that source.
                 found_new_or_modified = False
-                for dirpath, dirnames, filenames in os.walk(root):
-                    for filename in filenames:
-                        path = os.path.join(dirpath, filename)
-                        if path not in self._paths:
-                            logger.debug("Found new asset: %s" % path)
-                            self._paths.add(path)
-                            found_new_or_modified = True
-                            break
-                        if os.path.getmtime(path) > self._last_bake:
-                            logger.debug("Found modified asset: %s" % path)
-                            found_new_or_modified = True
-                            break
-
-                    if found_new_or_modified:
-                        break
-
+                for item in procinfo.source.getAllContents():
+                    path = item.spec
+                    if path not in procinfo.paths:
+                        logger.debug("Found new asset: %s" % path)
+                        procinfo.paths.add(path)
+                        found_new_or_modified = True
+                        break
+                    if os.path.getmtime(path) > procinfo.last_bake_time:
+                        logger.debug("Found modified asset: %s" % path)
+                        found_new_or_modified = True
+                        break
                 if found_new_or_modified:
-                    self._runPipeline(root)
+                    self._runPipeline(procinfo)
 
             time.sleep(self.interval)
 
-    def _initPipeline(self):
-        # Create the app and pipeline.
-        self.app = self.appfactory.create()
-        self.pipeline = ProcessorPipeline(self.app, self.out_dir)
-
-        # Get the list of assets directories.
-        self._roots = list(self.pipeline.mounts.keys())
-
-        # The 'assets' folder may not be in the mounts list if it doesn't
-        # exist yet, but we want to monitor for when the user creates it.
-        default_root = os.path.join(self.app.root_dir, 'assets')
-        self._monitor_assets_root = (default_root not in self._roots)
-
-        # Build the list of initial asset files.
-        self._paths = set()
-        for root in self._roots:
-            for dirpath, dirnames, filenames in os.walk(root):
-                self._paths |= set([os.path.join(dirpath, f)
-                                    for f in filenames])
-
-    def _runPipeline(self, root):
-        self._last_bake = time.time()
-        try:
-            self._record = self.pipeline.run(
-                root,
-                previous_record=self._record,
-                save_record=False)
-
-            status_id = self.last_status_id + 1
-            self.last_status_id += 1
-
-            if self._record.success:
-                changed = filter(
-                    lambda i: not i.was_collapsed_from_last_run,
-                    self._record.entries)
-                changed = itertools.chain.from_iterable(
-                    map(lambda i: i.rel_outputs, changed))
-                changed = list(changed)
-                item = {
-                    'id': status_id,
-                    'type': 'pipeline_success',
-                    'assets': changed}
-
-                self._notifyObservers(item)
-            else:
-                item = {
-                    'id': status_id,
-                    'type': 'pipeline_error',
-                    'assets': []}
-                for entry in self._record.entries:
-                    if entry.errors:
-                        asset_item = {
-                            'path': entry.path,
-                            'errors': list(entry.errors)}
-                        item['assets'].append(asset_item)
-
-                self._notifyObservers(item)
-        except Exception as ex:
-            logger.exception(ex)
+    def _init(self):
+        self._app = self.appfactory.create()
+        self._last_records = MultiRecord()
+
+        self._proc_infos = []
+        for src in self._app.sources:
+            if src.config['pipeline'] != 'asset':
+                continue
+
+            procinfo = _AssetProcessingInfo(src)
+            self._proc_infos.append(procinfo)
+
+            # Build the list of initial asset files.
+            for item in src.getAllContents():
+                procinfo.paths.add(item.spec)
+
+    def _runPipelines(self):
+        record_histories = MultiRecordHistory(MultiRecord(), self._records)
+        self._ppmngr = PipelineManager(
+            self._app, self.out_dir, record_histories)
+
+        # Create the pipelines, but also remember some stuff for what
+        # we want to do.
+        for src in self._app.sources:
+            if src.config['pipeline'] != 'asset':
+                continue
+
+            ppinfo = self._ppmngr.createPipeline(src)
+            api = _AssetProcessingInfo()
+            ppinfo.userdata = api
+
+        current_records = MultiRecord()
+        record_histories = MultiRecordHistory(
+            self._records, current_records)
+
+        for ppinfo, procinfo in self._pipelines:
+            self._runPipeline(ppinfo, procinfo, record_histories)
+
+        status_id = self.last_status_id + 1
+        self.last_status_id += 1
+
+        if self._records.success:
+            changed = filter(
+                lambda i: not i.was_collapsed_from_last_run,
+                self._record.entries)
+            changed = itertools.chain.from_iterable(
+                map(lambda i: i.rel_outputs, changed))
+            changed = list(changed)
+            item = {
+                'id': status_id,
+                'type': 'pipeline_success',
+                'assets': changed}
+
+            self._notifyObservers(item)
+        else:
+            item = {
+                'id': status_id,
+                'type': 'pipeline_error',
+                'assets': []}
+            for entry in self._record.entries:
+                if entry.errors:
+                    asset_item = {
+                        'path': entry.path,
+                        'errors': list(entry.errors)}
+                    item['assets'].append(asset_item)
+
+            self._notifyObservers(item)
+
+    def _runPipeline(self, procinfo):
+        procinfo.last_bake_time = time.time()
+
+        src = procinfo.source
+
+        current_records = MultiRecord()
+        record_histories = MultiRecordHistory(
+            self._last_records, current_records)
+        ppmngr = PipelineManager(
+            self._app, self.out_dir, record_histories)
+        ppinfo = ppmngr.createPipeline(src)
+
+        logger.debug("Running pipeline '%s' on: %s" %
+                     (ppinfo.pipeline_name, src.name))
+
+        # Process all items in the source.
+        pp = ppinfo.pipeline
+        cr = ppinfo.record_history.current
+        jobctx = PipelineJobCreateContext(src)
+        for item in src.getAllContents():
+            job = pp.createJob(item, jobctx)
+
+            ppres = PipelineJobResult()
+            ppres.record_entry = pp.createRecordEntry(job)
+
+            runctx = PipelineJobRunContext(
+                ppinfo.pipeline_ctx, job, record_histories)
+            try:
+                pp.run(item, runctx, ppres)
+            except Exception as e:
+                ppres.record_entry.errors.append(str(e))
+
+            if ppres.next_pass_job is not None:
+                logger.error("The processing loop for the server "
+                             "doesn't support multi-pass pipelines.")
+
+            cr.addEntry(ppres.record_entry)
+            if not ppres.record_entry.success:
+                cr.success = False
+                current_records.success = False
+                logger.error("Errors found in %s:" % item.spec)
+                for e in ppres.record_entry.errors:
+                    logger.error(" " + e)
+
+        # Do all the final stuff.
+        ppmngr.buildHistoryDiffs()
+        ppmngr.deleteStaleOutputs()
+        ppmngr.collapseRecords()
+        ppmngr.shutdownPipelines()
+
+        # Swap the old record with the next record.
+        pr = ppinfo.record_history.previous
+        self._last_records.records.remove(pr)
+        self._last_records.records.append(cr)
 
     def _notifyObservers(self, item):
         with self._obs_lock:
             observers = list(self._obs)
         for obs in observers:
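
_notifyObservers, truncated at the end of this view, snapshots the observer
list under the lock and then iterates over the copy, so an observer can
unregister itself mid-notification, as the SSE producer above does when it
closes. The same pattern in isolation, as a sketch; the callback name is an
assumption, since the loop body is cut off here:

    import threading


    class Notifier:
        def __init__(self):
            self._obs = []
            self._obs_lock = threading.Lock()

        def addObserver(self, obs):
            with self._obs_lock:
                self._obs.append(obs)

        def removeObserver(self, obs):
            with self._obs_lock:
                self._obs.remove(obs)

        def notify(self, item):
            # Copy under the lock, call outside it: removeObserver() from
            # inside a callback then cannot corrupt this iteration.
            with self._obs_lock:
                observers = list(self._obs)
            for obs in observers:
                obs.onPipelineEvent(item)  # assumed callback name
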