comparison piecrust/serving/procloop.py @ 860:c71472e6537f

refactor: Get the processing loop in the server functional again.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 08 Jun 2017 08:51:27 -0700
parents 448710d84121
children d1095774bfcf
comparison
equal deleted inserted replaced
859:86994e076be4 860:c71472e6537f
108 def removeObserver(self, obs): 108 def removeObserver(self, obs):
109 with self._obs_lock: 109 with self._obs_lock:
110 self._obs.remove(obs) 110 self._obs.remove(obs)
111 111
112 def run(self): 112 def run(self):
113 self._init() 113 logger.debug("Initializing processing loop with output: %s" %
114 114 self.out_dir)
115 try:
116 self._init()
117 except Exception as ex:
118 logger.error("Error initializing processing loop:")
119 logger.exception(ex)
120 return
121
122 logger.debug("Doing initial processing loop bake...")
123 self._runPipelines()
124
125 logger.debug("Running processing loop...")
115 self._last_config_mtime = os.path.getmtime(self._config_path) 126 self._last_config_mtime = os.path.getmtime(self._config_path)
116 127
117 while True: 128 while True:
118 cur_config_time = os.path.getmtime(self._config_path) 129 cur_config_time = os.path.getmtime(self._config_path)
119 if self._last_config_mtime < cur_config_time: 130 if self._last_config_mtime < cur_config_time:
121 self._last_config_mtime = cur_config_time 132 self._last_config_mtime = cur_config_time
122 self._init() 133 self._init()
123 self._runPipelines() 134 self._runPipelines()
124 continue 135 continue
125 136
126 for procinfo in self._proc_infos: 137 for procinfo in self._proc_infos.values():
127 # For each assets folder we try to find the first new or 138 # For each assets folder we try to find the first new or
128 # modified file. If any, we just run the pipeline on 139 # modified file. If any, we just run the pipeline on
129 # that source. 140 # that source.
130 found_new_or_modified = False 141 found_new_or_modified = False
131 for item in procinfo.source.getAllContents(): 142 for item in procinfo.source.getAllContents():
138 if os.path.getmtime(path) > procinfo.last_bake_time: 149 if os.path.getmtime(path) > procinfo.last_bake_time:
139 logger.debug("Found modified asset: %s" % path) 150 logger.debug("Found modified asset: %s" % path)
140 found_new_or_modified = True 151 found_new_or_modified = True
141 break 152 break
142 if found_new_or_modified: 153 if found_new_or_modified:
143 self._runPipeline(procinfo) 154 self._runPipelines(procinfo.source)
144 155
145 time.sleep(self.interval) 156 time.sleep(self.interval)
146 157
147 def _init(self): 158 def _init(self):
148 self._app = self.appfactory.create() 159 self._app = self.appfactory.create()
149 self._last_records = MultiRecord() 160 self._last_records = MultiRecord()
150 161
151 self._proc_infos = [] 162 self._proc_infos = {}
152 for src in self._app.sources: 163 for src in self._app.sources:
153 if src.config['pipeline'] != 'asset': 164 if src.config['pipeline'] != 'asset':
154 continue 165 continue
155 166
156 procinfo = _AssetProcessingInfo(src) 167 procinfo = _AssetProcessingInfo(src)
157 self._proc_infos.append(procinfo) 168 self._proc_infos[src.name] = procinfo
158 169
159 # Build the list of initial asset files. 170 # Build the list of initial asset files.
160 for item in src.getAllContents(): 171 for item in src.getAllContents():
161 procinfo.paths.add(item.spec) 172 procinfo.paths.add(item.spec)
162 173
163 def _runPipelines(self): 174 def _runPipelines(self, only_for_source=None):
164 record_histories = MultiRecordHistory(MultiRecord(), self._records)
165 self._ppmngr = PipelineManager(
166 self._app, self.out_dir, record_histories)
167
168 # Create the pipelines, but also remember some stuff for what
169 # we want to do.
170 for src in self._app.sources:
171 if src.config['pipeline'] != 'asset':
172 continue
173
174 ppinfo = self._ppmngr.createPipeline(src)
175 api = _AssetProcessingInfo()
176 ppinfo.userdata = api
177
178 current_records = MultiRecord()
179 record_histories = MultiRecordHistory(
180 self._records, current_records)
181
182 for ppinfo, procinfo in self._pipelines:
183 self._runPipeline(ppinfo, procinfo, record_histories)
184
185 status_id = self.last_status_id + 1
186 self.last_status_id += 1
187
188 if self._records.success:
189 changed = filter(
190 lambda i: not i.was_collapsed_from_last_run,
191 self._record.entries)
192 changed = itertools.chain.from_iterable(
193 map(lambda i: i.rel_outputs, changed))
194 changed = list(changed)
195 item = {
196 'id': status_id,
197 'type': 'pipeline_success',
198 'assets': changed}
199
200 self._notifyObservers(item)
201 else:
202 item = {
203 'id': status_id,
204 'type': 'pipeline_error',
205 'assets': []}
206 for entry in self._record.entries:
207 if entry.errors:
208 asset_item = {
209 'path': entry.path,
210 'errors': list(entry.errors)}
211 item['assets'].append(asset_item)
212
213 self._notifyObservers(item)
214
215 def _runPipeline(self, procinfo):
216 procinfo.last_bake_time = time.time()
217
218 src = procinfo.source
219
220 current_records = MultiRecord() 175 current_records = MultiRecord()
221 record_histories = MultiRecordHistory( 176 record_histories = MultiRecordHistory(
222 self._last_records, current_records) 177 self._last_records, current_records)
223 ppmngr = PipelineManager( 178 ppmngr = PipelineManager(
224 self._app, self.out_dir, record_histories) 179 self._app, self.out_dir, record_histories)
225 ppinfo = ppmngr.createPipeline(src) 180
226 181 # Create the pipelines, but also remember some stuff for what
182 # we want to do.
183 for src in self._app.sources:
184 if src.config['pipeline'] != 'asset':
185 continue
186 if only_for_source is not None and src != only_for_source:
187 continue
188
189 ppmngr.createPipeline(src)
190
191 for ppinfo in ppmngr.getPipelines():
192 self._runPipeline(ppmngr, ppinfo)
193
194 self.last_status_id += 1
195
196 if self._last_records.success:
197 for rec in self._last_records.records:
198 changed = filter(
199 lambda i: not i.was_collapsed_from_last_run,
200 rec.getEntries())
201 changed = itertools.chain.from_iterable(
202 map(lambda i: i.out_paths, changed))
203 changed = list(changed)
204 item = {
205 'id': self.last_status_id,
206 'type': 'pipeline_success',
207 'assets': changed}
208
209 self._notifyObservers(item)
210 else:
211 item = {
212 'id': self.last_status_id,
213 'type': 'pipeline_error',
214 'assets': []}
215 for rec in self._last_records.records:
216 for entry in rec.getEntries():
217 if entry.errors:
218 asset_item = {
219 'path': entry.item_spec,
220 'errors': list(entry.errors)}
221 item['assets'].append(asset_item)
222
223 self._notifyObservers(item)
224
225 def _runPipeline(self, ppmngr, ppinfo):
226 src = ppinfo.source
227 logger.debug("Running pipeline '%s' on: %s" % 227 logger.debug("Running pipeline '%s' on: %s" %
228 (ppinfo.pipeline_name, src.name)) 228 (ppinfo.pipeline_name, src.name))
229
230 # Set the time.
231 procinfo = self._proc_infos[src.name]
232 procinfo.last_bake_time = time.time()
229 233
230 # Process all items in the source. 234 # Process all items in the source.
231 pp = ppinfo.pipeline 235 pp = ppinfo.pipeline
232 cr = ppinfo.record_history.current 236 cr = ppinfo.record_history.current
233 jobctx = PipelineJobCreateContext(src) 237 record_histories = ppmngr.record_histories
234 for item in src.getAllContents(): 238 current_records = record_histories.current
235 job = pp.createJob(item, jobctx) 239 jobctx = PipelineJobCreateContext(0, record_histories)
236 240 jobs = pp.createJobs(jobctx)
241 for job in jobs:
237 ppres = PipelineJobResult() 242 ppres = PipelineJobResult()
238 ppres.record_entry = pp.createRecordEntry(job) 243 ppres.record_entry = pp.createRecordEntry(job)
239 244
240 runctx = PipelineJobRunContext( 245 runctx = PipelineJobRunContext(
241 ppinfo.pipeline_ctx, job, record_histories) 246 job, pp, record_histories)
242 try: 247 try:
243 pp.run(item, runctx, ppres) 248 pp.run(job, runctx, ppres)
244 except Exception as e: 249 except Exception as e:
245 ppres.record_entry.errors.append(str(e)) 250 ppres.record_entry.errors.append(str(e))
246 251
247 if ppres.next_pass_job is not None: 252 if ppres.next_pass_job is not None:
248 logger.error("The processing loop for the server " 253 logger.error("The processing loop for the server "
250 255
251 cr.addEntry(ppres.record_entry) 256 cr.addEntry(ppres.record_entry)
252 if not ppres.record_entry.success: 257 if not ppres.record_entry.success:
253 cr.success = False 258 cr.success = False
254 current_records.success = False 259 current_records.success = False
255 logger.error("Errors found in %s:" % item.spec) 260 logger.error("Errors found in %s:" % job.content_item.spec)
256 for e in ppres.record_entry.errors: 261 for e in ppres.record_entry.errors:
257 logger.error(" " + e) 262 logger.error(" " + e)
258 263
259 # Do all the final stuff. 264 # Do all the final stuff.
260 ppmngr.postJobRun() 265 ppmngr.postJobRun()
262 ppmngr.collapseRecords() 267 ppmngr.collapseRecords()
263 ppmngr.shutdownPipelines() 268 ppmngr.shutdownPipelines()
264 269
265 # Swap the old record with the next record. 270 # Swap the old record with the next record.
266 pr = ppinfo.record_history.previous 271 pr = ppinfo.record_history.previous
272 logger.debug("Swapping record '%s' with '%s'." % (pr.name, cr.name))
267 self._last_records.records.remove(pr) 273 self._last_records.records.remove(pr)
268 self._last_records.records.append(cr) 274 self._last_records.records.append(cr)
269 275
270 def _notifyObservers(self, item): 276 def _notifyObservers(self, item):
271 with self._obs_lock: 277 with self._obs_lock: