comparison piecrust/baking/baker.py @ 1136:5f97b5b59dfe
bake: Optimize cache handling for the baking process.
- Get rid of the 2-level pipeline runs... handle a single set of passes.
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does.
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't need
to be re-baked.
- Simplify some of the code.
author    Ludovic Chabant <ludovic@chabant.com>
date      Mon, 23 Apr 2018 21:47:49 -0700
parents   971b4d67e82a
children  (none)
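
The headline API change in this diff is that a pipeline's createJobs() now returns a (jobs, job_desc) pair instead of a bare job list, and the baker groups participating source names under each batch description when it logs the end-of-batch summary. Below is a minimal, runnable sketch of that aggregation; the function name, the job payloads, and the 'baked' description string are illustrative stand-ins, not piecrust code.

def summarize_job_batches(runs):
    """runs: iterable of (source_name, jobs, job_desc) tuples."""
    job_count = 0
    job_descs = {}
    for source_name, jobs, job_desc in runs:
        if jobs is None:
            continue
        job_count += len(jobs)
        if job_desc:
            # Same aggregation as the new _bakeRealm(), e.g.
            # {'baked': ['pages', 'posts']}
            job_descs.setdefault(job_desc, []).append(source_name)
    summary = ', '.join(
        ['%s %s' % (d, ', '.join(sn)) for d, sn in job_descs.items()])
    return job_count, summary


if __name__ == '__main__':
    runs = [
        ('pages', ['job'] * 3, 'baked'),    # hypothetical description string
        ('posts', ['job'] * 2, 'baked'),
        ('assets', None, None),             # a source with nothing to do
    ]
    count, summary = summarize_job_batches(runs)
    print("%d jobs completed (%s)." % (count, summary))
    # -> 5 jobs completed (baked pages, posts).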
--- a/piecrust/baking/baker.py (1135:6350ee084273)
+++ b/piecrust/baking/baker.py (1136:5f97b5b59dfe)
@@ -4,15 +4,14 @@
 import logging
 from piecrust.chefutil import (
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
 from piecrust.pipelines.base import (
-    PipelineJobCreateContext, PipelineJobResultHandleContext,
-    PipelineJobValidateContext, PipelineManager,
+    PipelineJobCreateContext, PipelineJobResultHandleContext, PipelineManager,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
-    MultiRecordHistory, MultiRecord, RecordEntry,
+    MultiRecordHistory, MultiRecord,
     load_records)
 from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES


 logger = logging.getLogger(__name__)
@@ -232,96 +231,49 @@
             self._bakeRealm(pool, ppmngr, record_histories,
                             pp_pass_num, realm, pplist)

     def _bakeRealm(self, pool, ppmngr, record_histories,
                    pp_pass_num, realm, pplist):
-        # Start with the first step, where we iterate on the content sources'
-        # items and run jobs on those.
+        start_time = time.perf_counter()
+
+        job_count = 0
+        job_descs = {}
+        realm_name = REALM_NAMES[realm].lower()
         pool.userdata.cur_pass = pp_pass_num
-        pool.userdata.cur_step = 0
-        next_step_jobs = {}
-        pool.userdata.next_step_jobs = next_step_jobs
-
-        start_time = time.perf_counter()
-        job_count = 0
-        stats = self.app.env.stats
-        realm_name = REALM_NAMES[realm].lower()
-        participating_source_names = []

         for ppinfo in pplist:
             src = ppinfo.source
             pp = ppinfo.pipeline
             jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
                                              record_histories)

-            next_step_jobs[src.name] = []
-            jobs = pp.createJobs(jcctx)
+            jobs, job_desc = pp.createJobs(jcctx)
             if jobs is not None:
                 new_job_count = len(jobs)
                 job_count += new_job_count
                 pool.queueJobs(jobs)
-                participating_source_names.append(src.name)
+                if job_desc:
+                    job_descs.setdefault(job_desc, []).append(src.name)
             else:
                 new_job_count = 0

             logger.debug(
                 "Queued %d jobs for source '%s' using pipeline '%s' "
-                "(%s, step 0)." %
+                "(%s)." %
                 (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))

         if job_count == 0:
             logger.debug("No jobs queued! Bailing out of this bake pass.")
             return

         pool.wait()

         logger.info(format_timed(
             start_time, "%d jobs completed (%s)." %
-            (job_count, ', '.join(participating_source_names))))
-
-        # Now let's see if any job created a follow-up job. Let's keep
-        # processing those jobs as long as they create new ones.
-        pool.userdata.cur_step = 1
-        while True:
-            # Make a copy of out next step jobs and reset the list, so
-            # the first jobs to be processed don't mess it up as we're
-            # still iterating on it.
-            next_step_jobs = pool.userdata.next_step_jobs
-            pool.userdata.next_step_jobs = {}
-
-            start_time = time.perf_counter()
-            job_count = 0
-            participating_source_names = []
-
-            for sn, jobs in next_step_jobs.items():
-                if jobs:
-                    logger.debug(
-                        "Queuing jobs for source '%s' (%s, step %d)." %
-                        (sn, realm_name, pool.userdata.cur_step))
-
-                    pp = ppmngr.getPipeline(sn)
-                    valctx = PipelineJobValidateContext(
-                        pp_pass_num, pool.userdata.cur_step,
-                        pp.record_name, record_histories)
-                    pp.validateNextStepJobs(jobs, valctx)
-
-                    job_count += len(jobs)
-                    pool.userdata.next_step_jobs[sn] = []
-                    pool.queueJobs(jobs)
-                    participating_source_names.append(sn)
-
-            if job_count == 0:
-                break
-
-            pool.wait()
-
-            logger.info(format_timed(
-                start_time,
-                "%d jobs completed (%s)." %
-                (job_count, ', '.join(participating_source_names))))
-
-            pool.userdata.cur_step += 1
+            (job_count, ', '.join(
+                ['%s %s' % (d, ', '.join(sn))
+                 for d, sn in job_descs.items()]))))

     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
         for e in errors:
             logger.error(" " + e)
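
The bulk of the hunk above deletes the follow-up-step machinery: the while-True loop that drained pool.userdata.next_step_jobs, validated each batch with PipelineJobValidateContext, and re-queued jobs step after step. What remains is a single queue/wait/report cycle. Here is a condensed sketch of that remaining flow; the pool, pipeline, and logger objects (and the snake_case method names) are stand-ins for piecrust's own, not its real API.

import time

def bake_realm(pool, pipelines, logger):
    # Condensed single-pass flow of the new _bakeRealm(): create one job
    # batch per pipeline, queue everything, wait once, report.
    start_time = time.perf_counter()
    job_count = 0

    for pp in pipelines:
        jobs = pp.create_jobs()       # stand-in for pp.createJobs(jcctx)
        if jobs:
            job_count += len(jobs)
            pool.queue_jobs(jobs)     # stand-in for pool.queueJobs(jobs)

    if job_count == 0:
        logger.debug("No jobs queued! Bailing out of this bake pass.")
        return

    pool.wait()                       # one wait; no follow-up step loop
    logger.info("%d jobs completed in %.2fs."
                % (job_count, time.perf_counter() - start_time))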
@@ -356,26 +308,17 @@
             userdata=pool_userdata)
         return pool

     def _handleWorkerResult(self, job, res, userdata):
         cur_pass = userdata.cur_pass
-        cur_step = userdata.cur_step
         source_name, item_spec = job['job_spec']
-
-        # See if there's a next step to take.
-        npj = res.get('next_step_job')
-        if npj is not None:
-            npj['pass_num'] = cur_pass
-            npj['step_num'] = cur_step + 1
-            userdata.next_step_jobs[source_name].append(npj)

         # Make the pipeline do custom handling to update the record entry.
         ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
         pipeline = ppinfo.pipeline
         record = ppinfo.current_record
-        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass,
-                                                 cur_step)
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass)
         pipeline.handleJobResult(res, ppmrctx)

         # Set the overall success flags if there was an error.
         record_entry = ppmrctx.record_entry
         if not record_entry.success:
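
The result path is trimmed to match: _handleWorkerResult no longer pulls a 'next_step_job' out of the result or tracks a step number, and PipelineJobResultHandleContext drops its cur_step argument. A sketch of the reduced handler, with ResultContext as a hypothetical stand-in for the real context class:

class ResultContext:
    # Stand-in for PipelineJobResultHandleContext, now built from only
    # (record, job, pass_num); the step-number argument is gone.
    def __init__(self, record, job, pass_num):
        self.record = record
        self.job = job
        self.pass_num = pass_num
        self.record_entry = None      # filled in by the pipeline's handler


def handle_worker_result(job, res, userdata):
    # Route the result to the pipeline that owns the job's source and let
    # it update its record entry; failure flags are read off the context.
    source_name, item_spec = job['job_spec']
    ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
    ctx = ResultContext(ppinfo.current_record, job, userdata.cur_pass)
    ppinfo.pipeline.handleJobResult(res, ctx)
    return ctx.record_entry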
@@ -410,12 +353,10 @@
     def __init__(self, baker, ppmngr):
         self.baker = baker
         self.ppmngr = ppmngr
         self.records = ppmngr.record_histories.current
         self.cur_pass = 0
-        self.cur_step = 0
-        self.next_step_jobs = {}


 def _get_pipeline_infos_by_pass_and_realm(pp_infos):
     pp_by_pass_and_realm = {}
     for pp_info in pp_infos: