piecrust2: comparison of piecrust/serving/procloop.py @ 918:7f1da7e7b154
internal: The processing loop for the server is now using the baker.
Instead of reimplementing a custom way to run the pipelines, the loop is
just calling the baker, but only for asset pipelines.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Fri, 29 Sep 2017 08:43:34 -0700 |
parents | 342e3ea24b5d |
children | 89d94955b818 |
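The gist of the change: rather than driving the asset pipelines with its own job loop, the processing loop now instantiates the regular `Baker` and restricts it to asset pipelines (and, when reacting to a change, to the one source that changed). For readability, here is the new `_runPipelines` from the right-hand column of the comparison, reassembled as plain code:

```python
from piecrust.baking.baker import Baker

def _runPipelines(self, only_for_source):
    # Restrict the bake to a single source when reacting to a change
    # in that source; otherwise bake all asset sources.
    allowed_sources = None
    if only_for_source:
        allowed_sources = [only_for_source.name]
    baker = Baker(
        self.appfactory, self._app, self.out_dir,
        allowed_pipelines=['asset'],
        allowed_sources=allowed_sources,
        rotate_bake_records=False)
    records = baker.bake()

    self._onPipelinesRun(records)
```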
comparison
917:33a89139c284 | 918:7f1da7e7b154 |
---|---|
119 logger.error("Error initializing processing loop:") | 119 logger.error("Error initializing processing loop:") |
120 logger.exception(ex) | 120 logger.exception(ex) |
121 return | 121 return |
122 | 122 |
123 logger.debug("Doing initial processing loop bake...") | 123 logger.debug("Doing initial processing loop bake...") |
124 self._runPipelines() | 124 self._runPipelinesSafe() |
125 | 125 |
126 logger.debug("Running processing loop...") | 126 logger.debug("Running processing loop...") |
127 self._last_config_mtime = os.path.getmtime(self._config_path) | 127 self._last_config_mtime = os.path.getmtime(self._config_path) |
128 | 128 |
129 while True: | 129 while True: |
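The initial bake now goes through `_runPipelinesSafe`, a small wrapper introduced further down in this changeset (the watch loop below uses it too). Reassembled from the right-hand column, it simply keeps a failing asset bake from taking the processing loop down:

```python
def _runPipelinesSafe(self, only_for_source=None):
    # Log bake errors instead of letting them propagate out of the
    # processing loop's thread.
    try:
        self._runPipelines(only_for_source)
    except Exception as ex:
        logger.error("Error while running asset pipeline:")
        logger.exception(ex)
```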
154 if found_new_or_modified: | 154 if found_new_or_modified: |
155 with format_timed_scope( | 155 with format_timed_scope( |
156 logger, | 156 logger, |
157 "change detected, reprocessed '%s'." % | 157 "change detected, reprocessed '%s'." % |
158 procinfo.source.name): | 158 procinfo.source.name): |
159 self._runPipelines(procinfo.source) | 159 self._runPipelinesSafe(procinfo.source) |
160 | 160 |
161 time.sleep(self.interval) | 161 time.sleep(self.interval) |
162 | 162 |
163 def _init(self): | 163 def _init(self): |
164 self._app = self.appfactory.create() | 164 self._app = self.appfactory.create() |
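When a change is detected, the re-bake is wrapped in `format_timed_scope` so the log line reports how long reprocessing took. A minimal sketch of how that helper is used here, assuming the import path `piecrust.chefutil` used elsewhere in the code base:

```python
import logging
from piecrust.chefutil import format_timed_scope  # import path assumed

logger = logging.getLogger(__name__)

# Logs the message along with the elapsed time when the block exits.
with format_timed_scope(logger, "change detected, reprocessed 'assets'."):
    ...  # e.g. self._runPipelinesSafe(procinfo.source)
```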
174 | 174 |
175 # Build the list of initial asset files. | 175 # Build the list of initial asset files. |
176 for item in src.getAllContents(): | 176 for item in src.getAllContents(): |
177 procinfo.paths.add(item.spec) | 177 procinfo.paths.add(item.spec) |
178 | 178 |
179 def _runPipelines(self, only_for_source=None): | 179 def _runPipelinesSafe(self, only_for_source=None): |
180 current_records = MultiRecord() | 180 try: |
181 record_histories = MultiRecordHistory( | 181 self._runPipelines(only_for_source) |
182 self._last_records, current_records) | 182 except Exception as ex: |
183 ppmngr = PipelineManager( | 183 logger.error("Error while running asset pipeline:") |
184 self._app, self.out_dir, record_histories) | 184 logger.exception(ex) |
185 | 185 |
186 # Create the pipelines, but also remember some stuff for what | 186 def _runPipelines(self, only_for_source): |
187 # we want to do. | 187 from piecrust.baking.baker import Baker |
188 for src in self._app.sources: | 188 |
189 if src.config['pipeline'] != 'asset': | 189 allowed_sources = None |
190 continue | 190 if only_for_source: |
191 if only_for_source is not None and src != only_for_source: | 191 allowed_sources = [only_for_source.name] |
192 continue | 192 baker = Baker( |
193 | 193 self.appfactory, self._app, self.out_dir, |
194 ppmngr.createPipeline(src) | 194 allowed_pipelines=['asset'], |
195 | 195 allowed_sources=allowed_sources, |
196 for ppinfo in ppmngr.getPipelines(): | 196 rotate_bake_records=False) |
197 self._runPipeline(ppmngr, ppinfo) | 197 records = baker.bake() |
198 | 198 |
| 199 self._onPipelinesRun(records) |
| 200 |
| 201 def _onPipelinesRun(self, records): |
199 self.last_status_id += 1 | 202 self.last_status_id += 1 |
200 | 203 |
201 if self._last_records.success: | 204 if records.success: |
202 for rec in self._last_records.records: | 205 for rec in records.records: |
203 changed = filter( | 206 changed = filter( |
204 lambda i: not i.was_collapsed_from_last_run, | 207 lambda i: not i.was_collapsed_from_last_run, |
205 rec.getEntries()) | 208 rec.getEntries()) |
206 changed = itertools.chain.from_iterable( | 209 changed = itertools.chain.from_iterable( |
207 map(lambda i: i.out_paths, changed)) | 210 map(lambda i: i.out_paths, changed)) |
215 else: | 218 else: |
216 item = { | 219 item = { |
217 'id': self.last_status_id, | 220 'id': self.last_status_id, |
218 'type': 'pipeline_error', | 221 'type': 'pipeline_error', |
219 'assets': []} | 222 'assets': []} |
220 for rec in self._last_records.records: | 223 for rec in records.records: |
221 for entry in rec.getEntries(): | 224 for entry in rec.getEntries(): |
222 if entry.errors: | 225 if entry.errors: |
223 asset_item = { | 226 asset_item = { |
224 'path': entry.item_spec, | 227 'path': entry.item_spec, |
225 'errors': list(entry.errors)} | 228 'errors': list(entry.errors)} |
226 item['assets'].append(asset_item) | 229 item['assets'].append(asset_item) |
227 | 230 |
228 self._notifyObservers(item) | 231 self._notifyObservers(item) |
229 | 232 |
230 def _runPipeline(self, ppmngr, ppinfo): | |
231 src = ppinfo.source | |
232 logger.debug("Running pipeline '%s' on: %s" % | |
233 (ppinfo.pipeline_name, src.name)) | |
234 | |
235 # Set the time. | |
236 procinfo = self._proc_infos[src.name] | |
237 procinfo.last_bake_time = time.time() | |
238 | |
239 # Process all items in the source. | |
240 pp = ppinfo.pipeline | |
241 cr = ppinfo.record_history.current | |
242 record_histories = ppmngr.record_histories | |
243 current_records = record_histories.current | |
244 jobctx = PipelineJobCreateContext(0, record_histories) | |
245 jobs = pp.createJobs(jobctx) | |
246 for job in jobs: | |
247 runctx = PipelineJobRunContext( | |
248 job, pp.record_name, record_histories) | |
249 | |
250 ppres = PipelineJobResult() | |
251 ppres.record_entry = pp.createRecordEntry(job, runctx) | |
252 | |
253 try: | |
254 pp.run(job, runctx, ppres) | |
255 except Exception as e: | |
256 ppres.record_entry.errors.append(str(e)) | |
257 | |
258 if ppres.next_step_job is not None: | |
259 logger.error("The processing loop for the server " | |
260 "doesn't support multi-step pipelines.") | |
261 | |
262 cr.addEntry(ppres.record_entry) | |
263 if not ppres.record_entry.success: | |
264 cr.success = False | |
265 current_records.success = False | |
266 logger.error("Errors found in %s:" % job.content_item.spec) | |
267 for e in ppres.record_entry.errors: | |
268 logger.error(" " + e) | |
269 | |
270 # Do all the final stuff. | |
271 ppmngr.postJobRun() | |
272 ppmngr.deleteStaleOutputs() | |
273 ppmngr.collapseRecords() | |
274 ppmngr.shutdownPipelines() | |
275 | |
276 # Swap the old record with the next record. | |
277 pr = ppinfo.record_history.previous | |
278 logger.debug("Swapping record '%s' with '%s'." % (pr.name, cr.name)) | |
279 self._last_records.records.remove(pr) | |
280 self._last_records.records.append(cr) | |
281 | |
282 def _notifyObservers(self, item): | 233 def _notifyObservers(self, item): |
283 with self._obs_lock: | 234 with self._obs_lock: |
284 observers = list(self._obs) | 235 observers = list(self._obs) |
285 for obs in observers: | 236 for obs in observers: |
286 obs.addBuildEvent(item) | 237 obs.addBuildEvent(item) |
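For context, `_notifyObservers` copies the observer list under a lock and then calls `addBuildEvent` on each observer. A hypothetical observer, shown only to illustrate the contract the loop relies on (`PrintingObserver` is not part of PieCrust; observers just need an `addBuildEvent(item)` method):

```python
class PrintingObserver:
    def addBuildEvent(self, item):
        # 'pipeline_error' items carry the per-asset error lists built
        # by _onPipelinesRun above.
        if item['type'] == 'pipeline_error':
            for asset in item['assets']:
                print('%s: %s' % (asset['path'], '; '.join(asset['errors'])))
```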