comparison piecrust/baking/baker.py @ 877:d6d35b2efd04

bake: Rename "pass" to "step" and make the page pipeline use different steps. The pipeline now first loads all pages, and then renders full pages unless they trigger a sub-render.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 15 Jun 2017 22:16:23 -0700
parents d1095774bfcf
children e52e2dd08c96
--- a/piecrust/baking/baker.py (876:d1095774bfcf)
+++ b/piecrust/baking/baker.py (877:d6d35b2efd04)
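For orientation, the loop below distills the step-based scheduling this change introduces: queue the step-0 jobs for every pipeline, then keep draining follow-up jobs until no pipeline produces new ones. This is a minimal sketch, not the real implementation; the pool, pipeline, and job objects are stand-ins. Note that the rename is job-level only: the pipeline-level "pass" (pp_pass_num, _get_pipeline_infos_by_pass_and_realm) keeps its old name in this changeset.

    # Simplified sketch of the step-based bake loop introduced here.
    # `pool`, `pipelines`, and the job objects are illustrative stand-ins.
    def bake_realm_sketch(pool, pipelines):
        # Step 0: every pipeline turns its source's content items into jobs.
        pool.userdata.cur_step = 0
        pool.userdata.next_step_jobs = {p.source_name: [] for p in pipelines}
        for p in pipelines:
            jobs = p.createJobs()
            if jobs is not None:
                pool.queueJobs(jobs)
        pool.wait()

        # Steps 1..n: drain follow-up jobs until none are produced.
        pool.userdata.cur_step = 1
        while True:
            next_jobs = pool.userdata.next_step_jobs
            pool.userdata.next_step_jobs = {sn: [] for sn in next_jobs}
            queued = 0
            for jobs in next_jobs.values():
                queued += len(jobs)
                pool.queueJobs(jobs)
            if queued == 0:
                break
            pool.wait()
            pool.userdata.cur_step += 1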
@@ -212,15 +212,15 @@
             logger.debug(format_timed(
                 start_time, "cache is assumed valid", colored=False))
             return True
 
     def _bakeRealm(self, pool, record_histories, pp_pass_num, realm, pplist):
-        # Start with the first pass, where we iterate on the content sources'
+        # Start with the first step, where we iterate on the content sources'
         # items and run jobs on those.
-        pool.userdata.cur_pass = 0
-        next_pass_jobs = {}
-        pool.userdata.next_pass_jobs = next_pass_jobs
+        pool.userdata.cur_step = 0
+        next_step_jobs = {}
+        pool.userdata.next_step_jobs = next_step_jobs
 
         start_time = time.perf_counter()
         job_count = 0
         realm_name = REALM_NAMES[realm].lower()
         stats = self.app.env.stats
@@ -229,14 +229,14 @@
             src = ppinfo.source
             pp = ppinfo.pipeline
 
             logger.debug(
                 "Queuing jobs for source '%s' using pipeline '%s' "
-                "(%s, pass 0)." %
+                "(%s, step 0)." %
                 (src.name, pp.PIPELINE_NAME, realm_name))
 
-            next_pass_jobs[src.name] = []
+            next_step_jobs[src.name] = []
             jcctx = PipelineJobCreateContext(pp_pass_num, record_histories)
             jobs = pp.createJobs(jcctx)
             if jobs is not None:
                 job_count += len(jobs)
                 pool.queueJobs(jobs)
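The step-0 job creation above relies on each pipeline's createJobs() returning either a list of jobs or None. A hypothetical pipeline illustrating that contract; only createJobs and PIPELINE_NAME appear in this diff, everything else below is an assumption:

    # Hypothetical pipeline showing the createJobs() contract; the job
    # shape and constructor below are assumptions for illustration.
    class CopyFilePipelineSketch:
        PIPELINE_NAME = 'copy'

        def __init__(self, source_name, items):
            self.source_name = source_name
            self.items = items

        def createJobs(self, ctx):
            # One step-0 job per content item; returning None tells the
            # baker there is nothing to queue for this source.
            jobs = [{'source_name': self.source_name, 'item_spec': spec}
                    for spec in self.items]
            return jobs or None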
@@ -248,33 +248,33 @@
             return
 
         pool.wait()
 
         logger.info(format_timed(
-            start_time, "%d pipeline jobs completed (%s, pass 0)." %
+            start_time, "%d pipeline jobs completed (%s, step 0)." %
             (job_count, realm_name)))
 
         # Now let's see if any job created a follow-up job. Let's keep
         # processing those jobs as long as they create new ones.
-        pool.userdata.cur_pass = 1
+        pool.userdata.cur_step = 1
         while True:
-            # Make a copy of our next pass jobs and reset the list, so
+            # Make a copy of our next step jobs and reset the list, so
             # the first jobs to be processed don't mess it up as we're
             # still iterating on it.
-            next_pass_jobs = pool.userdata.next_pass_jobs
-            pool.userdata.next_pass_jobs = {}
+            next_step_jobs = pool.userdata.next_step_jobs
+            pool.userdata.next_step_jobs = {}
 
             start_time = time.perf_counter()
             job_count = 0
 
-            for sn, jobs in next_pass_jobs.items():
+            for sn, jobs in next_step_jobs.items():
                 if jobs:
                     logger.debug(
-                        "Queuing jobs for source '%s' (%s, pass %d)." %
-                        (sn, realm_name, pool.userdata.cur_pass))
+                        "Queuing jobs for source '%s' (%s, step %d)." %
+                        (sn, realm_name, pool.userdata.cur_step))
                     job_count += len(jobs)
-                    pool.userdata.next_pass_jobs[sn] = []
+                    pool.userdata.next_step_jobs[sn] = []
                     pool.queueJobs(jobs)
 
             stats.stepTimer('WorkerTastPut', time.perf_counter() - start_time)
 
             if job_count == 0:
@@ -282,14 +282,14 @@
 
             pool.wait()
 
             logger.info(format_timed(
                 start_time,
-                "%d pipeline jobs completed (%s, pass %d)." %
-                (job_count, realm_name, pool.userdata.cur_pass)))
+                "%d pipeline jobs completed (%s, step %d)." %
+                (job_count, realm_name, pool.userdata.cur_step)))
 
-            pool.userdata.cur_pass += 1
+            pool.userdata.cur_step += 1
 
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
         for e in errors:
             logger.error("  " + e)
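Per the commit message, the page pipeline now uses these steps to first load all pages and only then render them fully, emitting a follow-up job when a render would trigger a sub-render. A very rough sketch of that shape; every helper name below is a hypothetical stand-in, not piecrust API:

    # Rough sketch of the two-step page pipeline behavior described in the
    # commit message; load_page/render_page are hypothetical callables,
    # where render_page returns True if the render triggered a sub-render.
    def run_page_job_sketch(job, result, load_page, render_page):
        if job['step_num'] == 0:
            # Step 0: load only, so every page exists before any full
            # render runs.
            page = load_page(job['item_spec'])
            result['record_entry'] = {'item_spec': job['item_spec']}
            result['next_step_job'] = dict(job, step_num=1, page=page)
        else:
            # Step 1+: render the full page; if that triggers a sub-render,
            # defer with another follow-up job instead of failing.
            if render_page(job['page']):
                result['next_step_job'] = dict(job, step_num=job['step_num'] + 1)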
@@ -317,44 +317,46 @@
             error_callback=self._handleWorkerError,
             userdata=pool_userdata)
         return pool
 
     def _handleWorkerResult(self, job, res, userdata):
-        cur_pass = userdata.cur_pass
+        cur_step = userdata.cur_step
         record = userdata.records.getRecord(job.record_name)
 
-        if cur_pass == 0:
+        if cur_step == 0:
             record.addEntry(res.record_entry)
         else:
             ppinfo = userdata.ppmngr.getPipeline(job.source_name)
-            ppmrctx = PipelineMergeRecordContext(
-                record, job, cur_pass)
+            ppmrctx = PipelineMergeRecordContext(record, job, cur_step)
             ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx)
 
-        npj = res.next_pass_job
+        npj = res.next_step_job
         if npj is not None:
-            npj.data['pass'] = cur_pass + 1
-            userdata.next_pass_jobs[job.source_name].append(npj)
+            npj.step_num = cur_step + 1
+            userdata.next_step_jobs[job.source_name].append(npj)
 
         if not res.record_entry.success:
             record.success = False
             userdata.records.success = False
             self._logErrors(job.content_item.spec, res.record_entry.errors)
 
     def _handleWorkerError(self, job, exc_data, userdata):
-        cur_pass = userdata.cur_pass
+        cur_step = userdata.cur_step
         record = userdata.records.getRecord(job.record_name)
 
-        if cur_pass == 0:
+        record_entry_spec = job.content_item.metadata.get(
+            'record_entry_spec', job.content_item.spec)
+
+        if cur_step == 0:
             ppinfo = userdata.ppmngr.getPipeline(job.source_name)
             entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry
             e = entry_class()
-            e.item_spec = job.content_item.spec
+            e.item_spec = record_entry_spec
             e.errors.append(str(exc_data))
             record.addEntry(e)
         else:
-            e = record.getEntry(job.record_entry_spec)
+            e = record.getEntry(record_entry_spec)
             e.errors.append(str(exc_data))
 
         record.success = False
         userdata.records.success = False
 
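For reference, the fields the two callbacks above rely on can be summarized as follows. Only the attribute names used in this diff are grounded; the class shapes themselves are illustrative:

    # Illustrative shapes for the objects flowing through the callbacks
    # above, inferred from how their attributes are used in this diff.
    class PipelineJobSketch:
        def __init__(self, source_name, content_item, record_name):
            self.source_name = source_name
            self.content_item = content_item
            self.record_name = record_name
            self.step_num = 0          # replaces the old job.data['pass']

    class PipelineJobResultSketch:
        def __init__(self):
            self.record_entry = None   # added or merged into the bake record
            self.next_step_job = None  # optional follow-up job for step N+1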
@@ -366,12 +368,12 @@
 class _PoolUserData:
     def __init__(self, baker, ppmngr):
         self.baker = baker
         self.ppmngr = ppmngr
         self.records = ppmngr.record_histories.current
-        self.cur_pass = 0
-        self.next_pass_jobs = {}
+        self.cur_step = 0
+        self.next_step_jobs = {}
 
 
 def _get_pipeline_infos_by_pass_and_realm(pp_infos):
     pp_by_pass_and_realm = {}
     for pp_info in pp_infos:
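The helper at the end of this excerpt still groups pipelines by pipeline-level pass (and realm), untouched by the job-level rename. Its body is truncated here; a sketch of the likely grouping pattern, in which the pass and realm lookups are assumptions rather than confirmed piecrust attributes:

    # Sketch of grouping pipeline infos into {pass: {realm: [infos]}};
    # the PASS_NUM attribute and the realm lookup are assumptions.
    from collections import defaultdict

    def group_by_pass_and_realm_sketch(pp_infos):
        grouped = defaultdict(lambda: defaultdict(list))
        for pp_info in pp_infos:
            pass_num = getattr(pp_info.pipeline, 'PASS_NUM', 0)
            realm = getattr(pp_info.source, 'realm', 0)
            grouped[pass_num][realm].append(pp_info)
        return grouped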