comparison piecrust/baking/baker.py @ 1136:5f97b5b59dfe

bake: Optimize cache handling for the baking process.

- Get rid of the 2-level pipeline runs... handle a single set of passes.
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does.
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't need to be re-baked.
- Simplify some of the code.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 23 Apr 2018 21:47:49 -0700
parents 971b4d67e82a
children
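The interface change to keep in mind while reading the diff: a pipeline's createJobs() now returns a (jobs, description) pair rather than a bare job list, and the multi-step machinery (cur_step, next_step_jobs, validateNextStepJobs) is removed. Below is a minimal sketch of the new contract; the pipeline class and job contents are hypothetical, not PieCrust's actual pipeline code. A worked example of the new summary log line appears after the diff.

# Illustrative only -- not actual PieCrust code. It sketches the calling
# convention this changeset moves to: createJobs() returns a
# (jobs, description) pair instead of a bare list of jobs.
class ExamplePipeline:
    PIPELINE_NAME = 'example'
    record_name = 'example'

    def createJobs(self, ctx):
        # Each job is a plain dict keyed by 'job_spec', as unpacked in
        # _handleWorkerResult() below; the item specs here are made up.
        jobs = [{'job_spec': ('example', spec)}
                for spec in ('foo.md', 'bar.md')]
        # The description labels this batch of jobs; the baker groups
        # source names under identical descriptions for its summary line.
        return jobs, 'loaded'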
--- a/piecrust/baking/baker.py (1135:6350ee084273)
+++ b/piecrust/baking/baker.py (1136:5f97b5b59dfe)
@@ -4,15 +4,14 @@
 import logging
 from piecrust.chefutil import (
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
 from piecrust.pipelines.base import (
-    PipelineJobCreateContext, PipelineJobResultHandleContext,
-    PipelineJobValidateContext, PipelineManager,
+    PipelineJobCreateContext, PipelineJobResultHandleContext, PipelineManager,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
-    MultiRecordHistory, MultiRecord, RecordEntry,
+    MultiRecordHistory, MultiRecord,
     load_records)
 from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES
 
 
 logger = logging.getLogger(__name__)
@@ -232,96 +231,49 @@
                     self._bakeRealm(pool, ppmngr, record_histories,
                                     pp_pass_num, realm, pplist)
 
     def _bakeRealm(self, pool, ppmngr, record_histories,
                    pp_pass_num, realm, pplist):
-        # Start with the first step, where we iterate on the content sources'
-        # items and run jobs on those.
-        pool.userdata.cur_pass = pp_pass_num
-        pool.userdata.cur_step = 0
-        next_step_jobs = {}
-        pool.userdata.next_step_jobs = next_step_jobs
-
-        start_time = time.perf_counter()
-        job_count = 0
-        stats = self.app.env.stats
-        realm_name = REALM_NAMES[realm].lower()
-        participating_source_names = []
+        start_time = time.perf_counter()
+
+        job_count = 0
+        job_descs = {}
+        realm_name = REALM_NAMES[realm].lower()
+        pool.userdata.cur_pass = pp_pass_num
 
         for ppinfo in pplist:
             src = ppinfo.source
             pp = ppinfo.pipeline
             jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
                                              record_histories)
 
-            next_step_jobs[src.name] = []
-            jobs = pp.createJobs(jcctx)
+            jobs, job_desc = pp.createJobs(jcctx)
             if jobs is not None:
                 new_job_count = len(jobs)
                 job_count += new_job_count
                 pool.queueJobs(jobs)
-                participating_source_names.append(src.name)
+                if job_desc:
+                    job_descs.setdefault(job_desc, []).append(src.name)
             else:
                 new_job_count = 0
 
             logger.debug(
                 "Queued %d jobs for source '%s' using pipeline '%s' "
-                "(%s, step 0)." %
+                "(%s)." %
                 (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))
 
         if job_count == 0:
            logger.debug("No jobs queued! Bailing out of this bake pass.")
            return
 
         pool.wait()
 
         logger.info(format_timed(
             start_time, "%d jobs completed (%s)." %
-            (job_count, ', '.join(participating_source_names))))
-
-        # Now let's see if any job created a follow-up job. Let's keep
-        # processing those jobs as long as they create new ones.
-        pool.userdata.cur_step = 1
-        while True:
-            # Make a copy of out next step jobs and reset the list, so
-            # the first jobs to be processed don't mess it up as we're
-            # still iterating on it.
-            next_step_jobs = pool.userdata.next_step_jobs
-            pool.userdata.next_step_jobs = {}
-
-            start_time = time.perf_counter()
-            job_count = 0
-            participating_source_names = []
-
-            for sn, jobs in next_step_jobs.items():
-                if jobs:
-                    logger.debug(
-                        "Queuing jobs for source '%s' (%s, step %d)." %
-                        (sn, realm_name, pool.userdata.cur_step))
-
-                    pp = ppmngr.getPipeline(sn)
-                    valctx = PipelineJobValidateContext(
-                        pp_pass_num, pool.userdata.cur_step,
-                        pp.record_name, record_histories)
-                    pp.validateNextStepJobs(jobs, valctx)
-
-                    job_count += len(jobs)
-                    pool.userdata.next_step_jobs[sn] = []
-                    pool.queueJobs(jobs)
-                    participating_source_names.append(sn)
-
-            if job_count == 0:
-                break
-
-            pool.wait()
-
-            logger.info(format_timed(
-                start_time,
-                "%d jobs completed (%s)." %
-                (job_count, ', '.join(participating_source_names))))
-
-            pool.userdata.cur_step += 1
+            (job_count, ', '.join(
+                ['%s %s' % (d, ', '.join(sn))
+                 for d, sn in job_descs.items()]))))
 
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
         for e in errors:
             logger.error(" " + e)
@@ -356,26 +308,17 @@
             userdata=pool_userdata)
         return pool
 
     def _handleWorkerResult(self, job, res, userdata):
         cur_pass = userdata.cur_pass
-        cur_step = userdata.cur_step
         source_name, item_spec = job['job_spec']
-
-        # See if there's a next step to take.
-        npj = res.get('next_step_job')
-        if npj is not None:
-            npj['pass_num'] = cur_pass
-            npj['step_num'] = cur_step + 1
-            userdata.next_step_jobs[source_name].append(npj)
 
         # Make the pipeline do custom handling to update the record entry.
         ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
         pipeline = ppinfo.pipeline
         record = ppinfo.current_record
-        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass,
-                                                 cur_step)
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass)
         pipeline.handleJobResult(res, ppmrctx)
 
         # Set the overall success flags if there was an error.
         record_entry = ppmrctx.record_entry
         if not record_entry.success:
@@ -410,12 +353,10 @@
     def __init__(self, baker, ppmngr):
         self.baker = baker
         self.ppmngr = ppmngr
         self.records = ppmngr.record_histories.current
         self.cur_pass = 0
-        self.cur_step = 0
-        self.next_step_jobs = {}
 
 
 def _get_pipeline_infos_by_pass_and_realm(pp_infos):
     pp_by_pass_and_realm = {}
     for pp_info in pp_infos:
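As promised above, here is a small, self-contained rundown (plain Python 3) of the summary string built by the new logging code in _bakeRealm(). The descriptions and source names are made up for illustration; the real values come from each pipeline's createJobs().

job_descs = {}
job_descs.setdefault('loaded', []).append('pages')
job_descs.setdefault('loaded', []).append('posts')
job_descs.setdefault('rendered', []).append('assets')

# Same join expression as the new code: one "<description> <sources>"
# chunk per distinct description, all comma-separated.
summary = ', '.join(
    ['%s %s' % (d, ', '.join(sn))
     for d, sn in job_descs.items()])

print(summary)  # -> "loaded pages, posts, rendered assets"

Grouping sources by description is what lets the completion line read as a short account of what each job batch did, instead of the old flat list of participating source names.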