piecrust2: comparison of piecrust/serving/procloop.py @ 860:c71472e6537f
refactor: Get the processing loop in the server functional again.
author:   Ludovic Chabant <ludovic@chabant.com>
date:     Thu, 08 Jun 2017 08:51:27 -0700
parents:  448710d84121
children: d1095774bfcf
859:86994e076be4 | 860:c71472e6537f
---|---
108 def removeObserver(self, obs): | 108 def removeObserver(self, obs): |
109 with self._obs_lock: | 109 with self._obs_lock: |
110 self._obs.remove(obs) | 110 self._obs.remove(obs) |
111 | 111 |
112 def run(self): | 112 def run(self): |
113 self._init() | 113 logger.debug("Initializing processing loop with output: %s" % |
114 | 114 self.out_dir) |
| 115 try: |
| 116 self._init() |
| 117 except Exception as ex: |
| 118 logger.error("Error initializing processing loop:") |
| 119 logger.exception(ex) |
| 120 return |
| 121 |
| 122 logger.debug("Doing initial processing loop bake...") |
| 123 self._runPipelines() |
| 124 |
| 125 logger.debug("Running processing loop...") |
115 self._last_config_mtime = os.path.getmtime(self._config_path) | 126 self._last_config_mtime = os.path.getmtime(self._config_path) |
116 | 127 |
117 while True: | 128 while True: |
118 cur_config_time = os.path.getmtime(self._config_path) | 129 cur_config_time = os.path.getmtime(self._config_path) |
119 if self._last_config_mtime < cur_config_time: | 130 if self._last_config_mtime < cur_config_time: |
121 self._last_config_mtime = cur_config_time | 132 self._last_config_mtime = cur_config_time |
122 self._init() | 133 self._init() |
123 self._runPipelines() | 134 self._runPipelines() |
124 continue | 135 continue |
125 | 136 |
126 for procinfo in self._proc_infos: | 137 for procinfo in self._proc_infos.values(): |
127 # For each assets folder we try to find the first new or | 138 # For each assets folder we try to find the first new or |
128 # modified file. If any, we just run the pipeline on | 139 # modified file. If any, we just run the pipeline on |
129 # that source. | 140 # that source. |
130 found_new_or_modified = False | 141 found_new_or_modified = False |
131 for item in procinfo.source.getAllContents(): | 142 for item in procinfo.source.getAllContents(): |
138 if os.path.getmtime(path) > procinfo.last_bake_time: | 149 if os.path.getmtime(path) > procinfo.last_bake_time: |
139 logger.debug("Found modified asset: %s" % path) | 150 logger.debug("Found modified asset: %s" % path) |
140 found_new_or_modified = True | 151 found_new_or_modified = True |
141 break | 152 break |
142 if found_new_or_modified: | 153 if found_new_or_modified: |
143 self._runPipeline(procinfo) | 154 self._runPipelines(procinfo.source) |
144 | 155 |
145 time.sleep(self.interval) | 156 time.sleep(self.interval) |
146 | 157 |
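
The refactored run() method is, at its core, an mtime-polling watcher: it re-initializes and rebakes everything when the site configuration file changes, and rebakes a single source as soon as one of that source's assets is new or modified. A minimal standalone sketch of the polling pattern, with illustrative names that are not part of piecrust2:

    import os
    import time

    def poll_for_changes(config_path, asset_paths, on_config_change,
                         on_assets_change, interval=1.0):
        # Mirrors the loop in run(): compare mtimes against the last
        # known config mtime and the time of the last bake.
        last_config_mtime = os.path.getmtime(config_path)
        last_bake_time = time.time()
        while True:
            cur = os.path.getmtime(config_path)
            if last_config_mtime < cur:
                # Config changed: reload everything, then rebake.
                last_config_mtime = cur
                on_config_change()
            else:
                # A single new or modified asset is enough to trigger
                # a rebake of its whole source.
                for path in asset_paths:
                    if os.path.getmtime(path) > last_bake_time:
                        on_assets_change(path)
                        last_bake_time = time.time()
                        break
            time.sleep(interval)
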
147 def _init(self): | 158 def _init(self): |
148 self._app = self.appfactory.create() | 159 self._app = self.appfactory.create() |
149 self._last_records = MultiRecord() | 160 self._last_records = MultiRecord() |
150 | 161 |
151 self._proc_infos = [] | 162 self._proc_infos = {} |
152 for src in self._app.sources: | 163 for src in self._app.sources: |
153 if src.config['pipeline'] != 'asset': | 164 if src.config['pipeline'] != 'asset': |
154 continue | 165 continue |
155 | 166 |
156 procinfo = _AssetProcessingInfo(src) | 167 procinfo = _AssetProcessingInfo(src) |
157 self._proc_infos.append(procinfo) | 168 self._proc_infos[src.name] = procinfo |
158 | 169 |
159 # Build the list of initial asset files. | 170 # Build the list of initial asset files. |
160 for item in src.getAllContents(): | 171 for item in src.getAllContents(): |
161 procinfo.paths.add(item.spec) | 172 procinfo.paths.add(item.spec) |
162 | 173 |
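
Note the change of self._proc_infos from a list to a dict keyed by source name: the new _runPipeline looks up the _AssetProcessingInfo matching a pipeline's source via self._proc_infos[src.name], which a list would only allow through a linear scan. A sketch of the shape involved, with the _AssetProcessingInfo fields inferred from how this file uses them:

    import time

    class _AssetProcessingInfo:
        # Fields as used in this file: the watched source, the set of
        # known asset paths, and the time of the last bake.
        def __init__(self, source):
            self.source = source
            self.paths = set()
            self.last_bake_time = time.time()

    # Keyed by source name, so per-source state is a constant-time
    # lookup, e.g.: proc_infos[src.name].last_bake_time = time.time()
    proc_infos = {}
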
163 def _runPipelines(self): | 174 def _runPipelines(self, only_for_source=None): |
164 record_histories = MultiRecordHistory(MultiRecord(), self._records) | |
165 self._ppmngr = PipelineManager( | |
166 self._app, self.out_dir, record_histories) | |
167 | |
168 # Create the pipelines, but also remember some stuff for what | |
169 # we want to do. | |
170 for src in self._app.sources: | |
171 if src.config['pipeline'] != 'asset': | |
172 continue | |
173 | |
174 ppinfo = self._ppmngr.createPipeline(src) | |
175 api = _AssetProcessingInfo() | |
176 ppinfo.userdata = api | |
177 | |
178 current_records = MultiRecord() | |
179 record_histories = MultiRecordHistory( | |
180 self._records, current_records) | |
181 | |
182 for ppinfo, procinfo in self._pipelines: | |
183 self._runPipeline(ppinfo, procinfo, record_histories) | |
184 | |
185 status_id = self.last_status_id + 1 | |
186 self.last_status_id += 1 | |
187 | |
188 if self._records.success: | |
189 changed = filter( | |
190 lambda i: not i.was_collapsed_from_last_run, | |
191 self._record.entries) | |
192 changed = itertools.chain.from_iterable( | |
193 map(lambda i: i.rel_outputs, changed)) | |
194 changed = list(changed) | |
195 item = { | |
196 'id': status_id, | |
197 'type': 'pipeline_success', | |
198 'assets': changed} | |
199 | |
200 self._notifyObservers(item) | |
201 else: | |
202 item = { | |
203 'id': status_id, | |
204 'type': 'pipeline_error', | |
205 'assets': []} | |
206 for entry in self._record.entries: | |
207 if entry.errors: | |
208 asset_item = { | |
209 'path': entry.path, | |
210 'errors': list(entry.errors)} | |
211 item['assets'].append(asset_item) | |
212 | |
213 self._notifyObservers(item) | |
214 | |
215 def _runPipeline(self, procinfo): | |
216 procinfo.last_bake_time = time.time() | |
217 | |
218 src = procinfo.source | |
219 | |
220 current_records = MultiRecord() | 175 current_records = MultiRecord() |
221 record_histories = MultiRecordHistory( | 176 record_histories = MultiRecordHistory( |
222 self._last_records, current_records) | 177 self._last_records, current_records) |
223 ppmngr = PipelineManager( | 178 ppmngr = PipelineManager( |
224 self._app, self.out_dir, record_histories) | 179 self._app, self.out_dir, record_histories) |
225 ppinfo = ppmngr.createPipeline(src) | 180 |
226 | 181 # Create the pipelines, but also remember some stuff for what |
| 182 # we want to do. |
| 183 for src in self._app.sources: |
| 184 if src.config['pipeline'] != 'asset': |
| 185 continue |
| 186 if only_for_source is not None and src != only_for_source: |
| 187 continue |
| 188 |
| 189 ppmngr.createPipeline(src) |
| 190 |
| 191 for ppinfo in ppmngr.getPipelines(): |
| 192 self._runPipeline(ppmngr, ppinfo) |
| 193 |
| 194 self.last_status_id += 1 |
| 195 |
| 196 if self._last_records.success: |
| 197 for rec in self._last_records.records: |
| 198 changed = filter( |
| 199 lambda i: not i.was_collapsed_from_last_run, |
| 200 rec.getEntries()) |
| 201 changed = itertools.chain.from_iterable( |
| 202 map(lambda i: i.out_paths, changed)) |
| 203 changed = list(changed) |
| 204 item = { |
| 205 'id': self.last_status_id, |
| 206 'type': 'pipeline_success', |
| 207 'assets': changed} |
| 208 |
| 209 self._notifyObservers(item) |
| 210 else: |
| 211 item = { |
| 212 'id': self.last_status_id, |
| 213 'type': 'pipeline_error', |
| 214 'assets': []} |
| 215 for rec in self._last_records.records: |
| 216 for entry in rec.getEntries(): |
| 217 if entry.errors: |
| 218 asset_item = { |
| 219 'path': entry.item_spec, |
| 220 'errors': list(entry.errors)} |
| 221 item['assets'].append(asset_item) |
| 222 |
| 223 self._notifyObservers(item) |
| 224 |
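
The new _runPipelines also absorbs the status notifications that the old code handled separately: after a bake it emits either a pipeline_success item listing only the outputs that actually changed (record entries not collapsed from the previous run), or a pipeline_error item carrying per-asset error lists. A small sketch of the success-side filtering, collapsing the per-record loop into a single aggregate item for brevity; the record-entry attributes follow the diff above:

    import itertools

    def build_success_item(status_id, records):
        # Keep only entries that were actually re-processed this run...
        changed = (e for rec in records for e in rec.getEntries()
                   if not e.was_collapsed_from_last_run)
        # ...and flatten their output paths into the 'assets' payload
        # that observers receive.
        assets = list(itertools.chain.from_iterable(
            e.out_paths for e in changed))
        return {'id': status_id,
                'type': 'pipeline_success',
                'assets': assets}
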
| 225 def _runPipeline(self, ppmngr, ppinfo): |
| 226 src = ppinfo.source |
227 logger.debug("Running pipeline '%s' on: %s" % | 227 logger.debug("Running pipeline '%s' on: %s" % |
228 (ppinfo.pipeline_name, src.name)) | 228 (ppinfo.pipeline_name, src.name)) |
| 229 |
| 230 # Set the time. |
| 231 procinfo = self._proc_infos[src.name] |
| 232 procinfo.last_bake_time = time.time() |
229 | 233 |
230 # Process all items in the source. | 234 # Process all items in the source. |
231 pp = ppinfo.pipeline | 235 pp = ppinfo.pipeline |
232 cr = ppinfo.record_history.current | 236 cr = ppinfo.record_history.current |
233 jobctx = PipelineJobCreateContext(src) | 237 record_histories = ppmngr.record_histories |
234 for item in src.getAllContents(): | 238 current_records = record_histories.current |
235 job = pp.createJob(item, jobctx) | 239 jobctx = PipelineJobCreateContext(0, record_histories) |
236 | 240 jobs = pp.createJobs(jobctx) |
| 241 for job in jobs: |
237 ppres = PipelineJobResult() | 242 ppres = PipelineJobResult() |
238 ppres.record_entry = pp.createRecordEntry(job) | 243 ppres.record_entry = pp.createRecordEntry(job) |
239 | 244 |
240 runctx = PipelineJobRunContext( | 245 runctx = PipelineJobRunContext( |
241 ppinfo.pipeline_ctx, job, record_histories) | 246 job, pp, record_histories) |
242 try: | 247 try: |
243 pp.run(item, runctx, ppres) | 248 pp.run(job, runctx, ppres) |
244 except Exception as e: | 249 except Exception as e: |
245 ppres.record_entry.errors.append(str(e)) | 250 ppres.record_entry.errors.append(str(e)) |
246 | 251 |
247 if ppres.next_pass_job is not None: | 252 if ppres.next_pass_job is not None: |
248 logger.error("The processing loop for the server " | 253 logger.error("The processing loop for the server " |
250 | 255 |
251 cr.addEntry(ppres.record_entry) | 256 cr.addEntry(ppres.record_entry) |
252 if not ppres.record_entry.success: | 257 if not ppres.record_entry.success: |
253 cr.success = False | 258 cr.success = False |
254 current_records.success = False | 259 current_records.success = False |
255 logger.error("Errors found in %s:" % item.spec) | 260 logger.error("Errors found in %s:" % job.content_item.spec) |
256 for e in ppres.record_entry.errors: | 261 for e in ppres.record_entry.errors: |
257 logger.error(" " + e) | 262 logger.error(" " + e) |
258 | 263 |
259 # Do all the final stuff. | 264 # Do all the final stuff. |
260 ppmngr.postJobRun() | 265 ppmngr.postJobRun() |
262 ppmngr.collapseRecords() | 267 ppmngr.collapseRecords() |
263 ppmngr.shutdownPipelines() | 268 ppmngr.shutdownPipelines() |
264 | 269 |
265 # Swap the old record with the next record. | 270 # Swap the old record with the next record. |
266 pr = ppinfo.record_history.previous | 271 pr = ppinfo.record_history.previous |
| 272 logger.debug("Swapping record '%s' with '%s'." % (pr.name, cr.name)) |
267 self._last_records.records.remove(pr) | 273 self._last_records.records.remove(pr) |
268 self._last_records.records.append(cr) | 274 self._last_records.records.append(cr) |
269 | 275 |
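
Two details of _runPipeline worth calling out: a job's exception is captured into its record entry instead of aborting the bake, and after all jobs have run, the freshly built current record replaces the stale previous one in self._last_records, so the next bake diffs against up-to-date state. A hedged sketch of both, using stand-in pipeline objects rather than the real context-object signatures shown above:

    def run_jobs(pp, jobs, current_record):
        # Run every job; failures are recorded, not raised.
        for job in jobs:
            entry = pp.createRecordEntry(job)
            try:
                pp.run(job)
            except Exception as e:
                entry.errors.append(str(e))
            current_record.addEntry(entry)
            if entry.errors:
                current_record.success = False

    def swap_records(last_records, previous, current):
        # The record built by this bake becomes the baseline that the
        # next bake's record history diffs against.
        last_records.records.remove(previous)
        last_records.records.append(current)
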
270 def _notifyObservers(self, item): | 276 def _notifyObservers(self, item): |
271 with self._obs_lock: | 277 with self._obs_lock: |
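
Finally, the observer list is guarded by _obs_lock because the processing loop runs concurrently with the server code that adds and removes observers. A minimal sketch of that thread-safe observer pattern; the on_pipeline_event callback name is a stand-in, not piecrust2's actual observer interface:

    import threading

    class StatusNotifier:
        def __init__(self):
            self._obs = []
            self._obs_lock = threading.Lock()

        def addObserver(self, obs):
            with self._obs_lock:
                self._obs.append(obs)

        def removeObserver(self, obs):
            with self._obs_lock:
                self._obs.remove(obs)

        def _notifyObservers(self, item):
            # Snapshot under the lock, notify outside it, so a slow
            # observer cannot stall add/remove on other threads.
            with self._obs_lock:
                observers = list(self._obs)
            for obs in observers:
                obs.on_pipeline_event(item)
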