comparison piecrust/baking/baker.py @ 877:d6d35b2efd04
bake: Rename "pass" to "step" and make the page pipeline use different steps.
The pipeline now loads all pages first, and then renders the full pages
unless they trigger a sub-render.
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Thu, 15 Jun 2017 22:16:23 -0700 |
| parents | d1095774bfcf |
| children | e52e2dd08c96 |
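
For orientation, the control flow this changeset gives `_bakeRealm` boils down to the loop below: step 0 queues the jobs each pipeline creates from its source, and follow-up steps keep draining `next_step_jobs` until no job chains another. This is a minimal sketch assuming a toy inline pool; piecrust's real worker pool dispatches jobs to worker processes and collects results through `_handleWorkerResult`.

```python
# Minimal sketch of the step loop, assuming a toy pool that runs jobs
# inline. Each job is modeled as a callable that may return a follow-up
# callable for the next step; this is an illustration, not piecrust's API.
class ToyPool:
    def __init__(self):
        self.cur_step = 0
        self.next_step_jobs = {}   # source name -> follow-up jobs

    def queueJobs(self, jobs):
        # Run each job; if it returns a follow-up job, park it for the
        # next step instead of running it immediately.
        for job in jobs:
            npj = job()
            if npj is not None:
                self.next_step_jobs.setdefault('pages', []).append(npj)


def bake_realm(pool, initial_jobs):
    # Step 0: jobs created directly from the content sources.
    pool.cur_step = 0
    pool.queueJobs(initial_jobs)

    # Follow-up steps: loop as long as finished jobs chain new ones.
    pool.cur_step = 1
    while True:
        next_step_jobs, pool.next_step_jobs = pool.next_step_jobs, {}
        job_count = sum(len(jobs) for jobs in next_step_jobs.values())
        if job_count == 0:
            break
        for jobs in next_step_jobs.values():
            pool.queueJobs(jobs)
        pool.cur_step += 1


# Example: one page "loads" on step 0, then "renders" on step 1.
bake_realm(ToyPool(), [lambda: (lambda: None)])
```
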
| 876:d1095774bfcf | 877:d6d35b2efd04 |
|---|---|
212 logger.debug(format_timed( | 212 logger.debug(format_timed( |
213 start_time, "cache is assumed valid", colored=False)) | 213 start_time, "cache is assumed valid", colored=False)) |
214 return True | 214 return True |
215 | 215 |
216 def _bakeRealm(self, pool, record_histories, pp_pass_num, realm, pplist): | 216 def _bakeRealm(self, pool, record_histories, pp_pass_num, realm, pplist): |
217 # Start with the first pass, where we iterate on the content sources' | 217 # Start with the first step, where we iterate on the content sources' |
218 # items and run jobs on those. | 218 # items and run jobs on those. |
219 pool.userdata.cur_pass = 0 | 219 pool.userdata.cur_step = 0 |
220 next_pass_jobs = {} | 220 next_step_jobs = {} |
221 pool.userdata.next_pass_jobs = next_pass_jobs | 221 pool.userdata.next_step_jobs = next_step_jobs |
222 | 222 |
223 start_time = time.perf_counter() | 223 start_time = time.perf_counter() |
224 job_count = 0 | 224 job_count = 0 |
225 realm_name = REALM_NAMES[realm].lower() | 225 realm_name = REALM_NAMES[realm].lower() |
226 stats = self.app.env.stats | 226 stats = self.app.env.stats |
229 src = ppinfo.source | 229 src = ppinfo.source |
230 pp = ppinfo.pipeline | 230 pp = ppinfo.pipeline |
231 | 231 |
232 logger.debug( | 232 logger.debug( |
233 "Queuing jobs for source '%s' using pipeline '%s' " | 233 "Queuing jobs for source '%s' using pipeline '%s' " |
234 "(%s, pass 0)." % | 234 "(%s, step 0)." % |
235 (src.name, pp.PIPELINE_NAME, realm_name)) | 235 (src.name, pp.PIPELINE_NAME, realm_name)) |
236 | 236 |
237 next_pass_jobs[src.name] = [] | 237 next_step_jobs[src.name] = [] |
238 jcctx = PipelineJobCreateContext(pp_pass_num, record_histories) | 238 jcctx = PipelineJobCreateContext(pp_pass_num, record_histories) |
239 jobs = pp.createJobs(jcctx) | 239 jobs = pp.createJobs(jcctx) |
240 if jobs is not None: | 240 if jobs is not None: |
241 job_count += len(jobs) | 241 job_count += len(jobs) |
242 pool.queueJobs(jobs) | 242 pool.queueJobs(jobs) |
248 return | 248 return |
249 | 249 |
250 pool.wait() | 250 pool.wait() |
251 | 251 |
252 logger.info(format_timed( | 252 logger.info(format_timed( |
253 start_time, "%d pipeline jobs completed (%s, pass 0)." % | 253 start_time, "%d pipeline jobs completed (%s, step 0)." % |
254 (job_count, realm_name))) | 254 (job_count, realm_name))) |
255 | 255 |
256 # Now let's see if any job created a follow-up job. Let's keep | 256 # Now let's see if any job created a follow-up job. Let's keep |
257 # processing those jobs as long as they create new ones. | 257 # processing those jobs as long as they create new ones. |
258 pool.userdata.cur_pass = 1 | 258 pool.userdata.cur_step = 1 |
259 while True: | 259 while True: |
260 # Make a copy of out next pass jobs and reset the list, so | 260 # Make a copy of out next step jobs and reset the list, so |
261 # the first jobs to be processed don't mess it up as we're | 261 # the first jobs to be processed don't mess it up as we're |
262 # still iterating on it. | 262 # still iterating on it. |
263 next_pass_jobs = pool.userdata.next_pass_jobs | 263 next_step_jobs = pool.userdata.next_step_jobs |
264 pool.userdata.next_pass_jobs = {} | 264 pool.userdata.next_step_jobs = {} |
265 | 265 |
266 start_time = time.perf_counter() | 266 start_time = time.perf_counter() |
267 job_count = 0 | 267 job_count = 0 |
268 | 268 |
269 for sn, jobs in next_pass_jobs.items(): | 269 for sn, jobs in next_step_jobs.items(): |
270 if jobs: | 270 if jobs: |
271 logger.debug( | 271 logger.debug( |
272 "Queuing jobs for source '%s' (%s, pass %d)." % | 272 "Queuing jobs for source '%s' (%s, step %d)." % |
273 (sn, realm_name, pool.userdata.cur_pass)) | 273 (sn, realm_name, pool.userdata.cur_step)) |
274 job_count += len(jobs) | 274 job_count += len(jobs) |
275 pool.userdata.next_pass_jobs[sn] = [] | 275 pool.userdata.next_step_jobs[sn] = [] |
276 pool.queueJobs(jobs) | 276 pool.queueJobs(jobs) |
277 | 277 |
278 stats.stepTimer('WorkerTastPut', time.perf_counter() - start_time) | 278 stats.stepTimer('WorkerTastPut', time.perf_counter() - start_time) |
279 | 279 |
280 if job_count == 0: | 280 if job_count == 0: |
282 | 282 |
283 pool.wait() | 283 pool.wait() |
284 | 284 |
285 logger.info(format_timed( | 285 logger.info(format_timed( |
286 start_time, | 286 start_time, |
287 "%d pipeline jobs completed (%s, pass %d)." % | 287 "%d pipeline jobs completed (%s, step %d)." % |
288 (job_count, realm_name, pool.userdata.cur_pass))) | 288 (job_count, realm_name, pool.userdata.cur_step))) |
289 | 289 |
290 pool.userdata.cur_pass += 1 | 290 pool.userdata.cur_step += 1 |
291 | 291 |
292 def _logErrors(self, item_spec, errors): | 292 def _logErrors(self, item_spec, errors): |
293 logger.error("Errors found in %s:" % item_spec) | 293 logger.error("Errors found in %s:" % item_spec) |
294 for e in errors: | 294 for e in errors: |
295 logger.error(" " + e) | 295 logger.error(" " + e) |
317 error_callback=self._handleWorkerError, | 317 error_callback=self._handleWorkerError, |
318 userdata=pool_userdata) | 318 userdata=pool_userdata) |
319 return pool | 319 return pool |
320 | 320 |
321 def _handleWorkerResult(self, job, res, userdata): | 321 def _handleWorkerResult(self, job, res, userdata): |
322 cur_pass = userdata.cur_pass | 322 cur_step = userdata.cur_step |
323 record = userdata.records.getRecord(job.record_name) | 323 record = userdata.records.getRecord(job.record_name) |
324 | 324 |
325 if cur_pass == 0: | 325 if cur_step == 0: |
326 record.addEntry(res.record_entry) | 326 record.addEntry(res.record_entry) |
327 else: | 327 else: |
328 ppinfo = userdata.ppmngr.getPipeline(job.source_name) | 328 ppinfo = userdata.ppmngr.getPipeline(job.source_name) |
329 ppmrctx = PipelineMergeRecordContext( | 329 ppmrctx = PipelineMergeRecordContext(record, job, cur_step) |
330 record, job, cur_pass) | |
331 ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx) | 330 ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx) |
332 | 331 |
333 npj = res.next_pass_job | 332 npj = res.next_step_job |
334 if npj is not None: | 333 if npj is not None: |
335 npj.data['pass'] = cur_pass + 1 | 334 npj.step_num = cur_step + 1 |
336 userdata.next_pass_jobs[job.source_name].append(npj) | 335 userdata.next_step_jobs[job.source_name].append(npj) |
337 | 336 |
338 if not res.record_entry.success: | 337 if not res.record_entry.success: |
339 record.success = False | 338 record.success = False |
340 userdata.records.success = False | 339 userdata.records.success = False |
341 self._logErrors(job.content_item.spec, res.record_entry.errors) | 340 self._logErrors(job.content_item.spec, res.record_entry.errors) |
342 | 341 |
343 def _handleWorkerError(self, job, exc_data, userdata): | 342 def _handleWorkerError(self, job, exc_data, userdata): |
344 cur_pass = userdata.cur_pass | 343 cur_step = userdata.cur_step |
345 record = userdata.records.getRecord(job.record_name) | 344 record = userdata.records.getRecord(job.record_name) |
346 | 345 |
347 if cur_pass == 0: | 346 record_entry_spec = job.content_item.metadata.get( |
347 'record_entry_spec', job.content_item.spec) | |
348 | |
349 if cur_step == 0: | |
348 ppinfo = userdata.ppmngr.getPipeline(job.source_name) | 350 ppinfo = userdata.ppmngr.getPipeline(job.source_name) |
349 entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry | 351 entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry |
350 e = entry_class() | 352 e = entry_class() |
351 e.item_spec = job.content_item.spec | 353 e.item_spec = record_entry_spec |
352 e.errors.append(str(exc_data)) | 354 e.errors.append(str(exc_data)) |
353 record.addEntry(e) | 355 record.addEntry(e) |
354 else: | 356 else: |
355 e = record.getEntry(job.record_entry_spec) | 357 e = record.getEntry(record_entry_spec) |
356 e.errors.append(str(exc_data)) | 358 e.errors.append(str(exc_data)) |
357 | 359 |
358 record.success = False | 360 record.success = False |
359 userdata.records.success = False | 361 userdata.records.success = False |
360 | 362 |
366 class _PoolUserData: | 368 class _PoolUserData: |
367 def __init__(self, baker, ppmngr): | 369 def __init__(self, baker, ppmngr): |
368 self.baker = baker | 370 self.baker = baker |
369 self.ppmngr = ppmngr | 371 self.ppmngr = ppmngr |
370 self.records = ppmngr.record_histories.current | 372 self.records = ppmngr.record_histories.current |
371 self.cur_pass = 0 | 373 self.cur_step = 0 |
372 self.next_pass_jobs = {} | 374 self.next_step_jobs = {} |
373 | 375 |
374 | 376 |
375 def _get_pipeline_infos_by_pass_and_realm(pp_infos): | 377 def _get_pipeline_infos_by_pass_and_realm(pp_infos): |
376 pp_by_pass_and_realm = {} | 378 pp_by_pass_and_realm = {} |
377 for pp_info in pp_infos: | 379 for pp_info in pp_infos: |
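
The other half of the change sits in `_handleWorkerResult`: a step-0 result adds its record entry outright, a later-step result is merged into the existing entry, and either kind may chain a job for the next step with its `step_num` bumped. Below is a condensed model of that handler, using `SimpleNamespace` stand-ins for piecrust's job/result classes (an assumption for illustration, not the real objects) and a plain dict merge where the real pipeline calls `mergeRecordEntry`.

```python
# Condensed model of _handleWorkerResult: add the record entry on step 0,
# merge it on later steps, and chain any follow-up job into next_step_jobs.
from types import SimpleNamespace


def handle_worker_result(job, res, userdata):
    if userdata.cur_step == 0:
        userdata.record.append(res.record_entry)
    else:
        # Later steps update the entry created on step 0 (modeled here as
        # a dict merge; the real pipeline calls mergeRecordEntry).
        for entry in userdata.record:
            if entry['item_spec'] == res.record_entry['item_spec']:
                entry.update(res.record_entry)

    npj = res.next_step_job
    if npj is not None:
        npj.step_num = userdata.cur_step + 1
        userdata.next_step_jobs.setdefault(job.source_name, []).append(npj)


# Example: a step-0 result that chains one follow-up job.
userdata = SimpleNamespace(cur_step=0, record=[], next_step_jobs={})
job = SimpleNamespace(source_name='pages')
res = SimpleNamespace(
    record_entry={'item_spec': 'pages/foo.md', 'success': True},
    next_step_job=SimpleNamespace(step_num=0))
handle_worker_result(job, res, userdata)
assert userdata.next_step_jobs['pages'][0].step_num == 1
```
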