comparison piecrust/serving/procloop.py @ 918:7f1da7e7b154

internal: The processing loop for the server is now using the baker. Instead of reimplementing a custom way to run the pipelines, the loop is just calling the baker, but only for asset pipelines.
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 29 Sep 2017 08:43:34 -0700
parents 342e3ea24b5d
children 89d94955b818
comparison
equal deleted inserted replaced
917:33a89139c284 918:7f1da7e7b154
119 logger.error("Error initializing processing loop:") 119 logger.error("Error initializing processing loop:")
120 logger.exception(ex) 120 logger.exception(ex)
121 return 121 return
122 122
123 logger.debug("Doing initial processing loop bake...") 123 logger.debug("Doing initial processing loop bake...")
124 self._runPipelines() 124 self._runPipelinesSafe()
125 125
126 logger.debug("Running processing loop...") 126 logger.debug("Running processing loop...")
127 self._last_config_mtime = os.path.getmtime(self._config_path) 127 self._last_config_mtime = os.path.getmtime(self._config_path)
128 128
129 while True: 129 while True:
154 if found_new_or_modified: 154 if found_new_or_modified:
155 with format_timed_scope( 155 with format_timed_scope(
156 logger, 156 logger,
157 "change detected, reprocessed '%s'." % 157 "change detected, reprocessed '%s'." %
158 procinfo.source.name): 158 procinfo.source.name):
159 self._runPipelines(procinfo.source) 159 self._runPipelinesSafe(procinfo.source)
160 160
161 time.sleep(self.interval) 161 time.sleep(self.interval)
162 162
163 def _init(self): 163 def _init(self):
164 self._app = self.appfactory.create() 164 self._app = self.appfactory.create()
174 174
175 # Build the list of initial asset files. 175 # Build the list of initial asset files.
176 for item in src.getAllContents(): 176 for item in src.getAllContents():
177 procinfo.paths.add(item.spec) 177 procinfo.paths.add(item.spec)
178 178
179 def _runPipelines(self, only_for_source=None): 179 def _runPipelinesSafe(self, only_for_source=None):
180 current_records = MultiRecord() 180 try:
181 record_histories = MultiRecordHistory( 181 self._runPipelines(only_for_source)
182 self._last_records, current_records) 182 except Exception as ex:
183 ppmngr = PipelineManager( 183 logger.error("Error while running asset pipeline:")
184 self._app, self.out_dir, record_histories) 184 logger.exception(ex)
185 185
186 # Create the pipelines, but also remember some stuff for what 186 def _runPipelines(self, only_for_source):
187 # we want to do. 187 from piecrust.baking.baker import Baker
188 for src in self._app.sources: 188
189 if src.config['pipeline'] != 'asset': 189 allowed_sources = None
190 continue 190 if only_for_source:
191 if only_for_source is not None and src != only_for_source: 191 allowed_sources = [only_for_source.name]
192 continue 192 baker = Baker(
193 193 self.appfactory, self._app, self.out_dir,
194 ppmngr.createPipeline(src) 194 allowed_pipelines=['asset'],
195 195 allowed_sources=allowed_sources,
196 for ppinfo in ppmngr.getPipelines(): 196 rotate_bake_records=False)
197 self._runPipeline(ppmngr, ppinfo) 197 records = baker.bake()
198 198
199 self._onPipelinesRun(records)
200
201 def _onPipelinesRun(self, records):
199 self.last_status_id += 1 202 self.last_status_id += 1
200 203
201 if self._last_records.success: 204 if records.success:
202 for rec in self._last_records.records: 205 for rec in records.records:
203 changed = filter( 206 changed = filter(
204 lambda i: not i.was_collapsed_from_last_run, 207 lambda i: not i.was_collapsed_from_last_run,
205 rec.getEntries()) 208 rec.getEntries())
206 changed = itertools.chain.from_iterable( 209 changed = itertools.chain.from_iterable(
207 map(lambda i: i.out_paths, changed)) 210 map(lambda i: i.out_paths, changed))
215 else: 218 else:
216 item = { 219 item = {
217 'id': self.last_status_id, 220 'id': self.last_status_id,
218 'type': 'pipeline_error', 221 'type': 'pipeline_error',
219 'assets': []} 222 'assets': []}
220 for rec in self._last_records.records: 223 for rec in records.records:
221 for entry in rec.getEntries(): 224 for entry in rec.getEntries():
222 if entry.errors: 225 if entry.errors:
223 asset_item = { 226 asset_item = {
224 'path': entry.item_spec, 227 'path': entry.item_spec,
225 'errors': list(entry.errors)} 228 'errors': list(entry.errors)}
226 item['assets'].append(asset_item) 229 item['assets'].append(asset_item)
227 230
228 self._notifyObservers(item) 231 self._notifyObservers(item)
229 232
230 def _runPipeline(self, ppmngr, ppinfo):
231 src = ppinfo.source
232 logger.debug("Running pipeline '%s' on: %s" %
233 (ppinfo.pipeline_name, src.name))
234
235 # Set the time.
236 procinfo = self._proc_infos[src.name]
237 procinfo.last_bake_time = time.time()
238
239 # Process all items in the source.
240 pp = ppinfo.pipeline
241 cr = ppinfo.record_history.current
242 record_histories = ppmngr.record_histories
243 current_records = record_histories.current
244 jobctx = PipelineJobCreateContext(0, record_histories)
245 jobs = pp.createJobs(jobctx)
246 for job in jobs:
247 runctx = PipelineJobRunContext(
248 job, pp.record_name, record_histories)
249
250 ppres = PipelineJobResult()
251 ppres.record_entry = pp.createRecordEntry(job, runctx)
252
253 try:
254 pp.run(job, runctx, ppres)
255 except Exception as e:
256 ppres.record_entry.errors.append(str(e))
257
258 if ppres.next_step_job is not None:
259 logger.error("The processing loop for the server "
260 "doesn't support multi-step pipelines.")
261
262 cr.addEntry(ppres.record_entry)
263 if not ppres.record_entry.success:
264 cr.success = False
265 current_records.success = False
266 logger.error("Errors found in %s:" % job.content_item.spec)
267 for e in ppres.record_entry.errors:
268 logger.error(" " + e)
269
270 # Do all the final stuff.
271 ppmngr.postJobRun()
272 ppmngr.deleteStaleOutputs()
273 ppmngr.collapseRecords()
274 ppmngr.shutdownPipelines()
275
276 # Swap the old record with the next record.
277 pr = ppinfo.record_history.previous
278 logger.debug("Swapping record '%s' with '%s'." % (pr.name, cr.name))
279 self._last_records.records.remove(pr)
280 self._last_records.records.append(cr)
281
282 def _notifyObservers(self, item): 233 def _notifyObservers(self, item):
283 with self._obs_lock: 234 with self._obs_lock:
284 observers = list(self._obs) 235 observers = list(self._obs)
285 for obs in observers: 236 for obs in observers:
286 obs.addBuildEvent(item) 237 obs.addBuildEvent(item)