diff piecrust/baking/baker.py @ 1136:5f97b5b59dfe
bake: Optimize cache handling for the baking process.
- Get rid of the 2-level pipeline runs... handle a single set of passes.
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does (see the sketch after this list).
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't need
to be re-baked.
- Simplify some of the code.
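
The per-batch summary log line mentioned above is built from a mapping of job description to participating source names. Below is a minimal, self-contained sketch of that grouping and formatting; the description strings and source names are made-up sample values, and only the `job_descs.setdefault(...)` pattern and the final format string come from the diff below.

```python
# Sketch of the new summary log line assembled in _bakeRealm().
# Sample descriptions ('loaded', 'rendered') and source names are hypothetical.
job_descs = {}
for job_desc, src_name in [('loaded', 'pages'),
                           ('loaded', 'posts'),
                           ('rendered', 'pages')]:
    # Group participating sources by what this batch of jobs does to them.
    job_descs.setdefault(job_desc, []).append(src_name)

job_count = 3
summary = ', '.join(
    ['%s %s' % (d, ', '.join(sn)) for d, sn in job_descs.items()])
print("%d jobs completed (%s)." % (job_count, summary))
# Prints: 3 jobs completed (loaded pages, posts, rendered pages).
```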
author   | Ludovic Chabant <ludovic@chabant.com>
---------|--------------------------------------
date     | Mon, 23 Apr 2018 21:47:49 -0700
parents  | 971b4d67e82a
children |
```diff
--- a/piecrust/baking/baker.py Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/baking/baker.py Mon Apr 23 21:47:49 2018 -0700
@@ -6,11 +6,10 @@
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
 from piecrust.pipelines.base import (
-    PipelineJobCreateContext, PipelineJobResultHandleContext,
-    PipelineJobValidateContext, PipelineManager,
+    PipelineJobCreateContext, PipelineJobResultHandleContext, PipelineManager,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
-    MultiRecordHistory, MultiRecord, RecordEntry,
+    MultiRecordHistory, MultiRecord,
     load_records)
 from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES
@@ -234,18 +233,12 @@
 
     def _bakeRealm(self, pool, ppmngr, record_histories,
                    pp_pass_num, realm, pplist):
-        # Start with the first step, where we iterate on the content sources'
-        # items and run jobs on those.
+        start_time = time.perf_counter()
+
+        job_count = 0
+        job_descs = {}
+        realm_name = REALM_NAMES[realm].lower()
         pool.userdata.cur_pass = pp_pass_num
-        pool.userdata.cur_step = 0
-        next_step_jobs = {}
-        pool.userdata.next_step_jobs = next_step_jobs
-
-        start_time = time.perf_counter()
-        job_count = 0
-        stats = self.app.env.stats
-        realm_name = REALM_NAMES[realm].lower()
-        participating_source_names = []
 
         for ppinfo in pplist:
             src = ppinfo.source
@@ -253,19 +246,19 @@
             jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
                                              record_histories)
 
-            next_step_jobs[src.name] = []
-            jobs = pp.createJobs(jcctx)
+            jobs, job_desc = pp.createJobs(jcctx)
             if jobs is not None:
                 new_job_count = len(jobs)
                 job_count += new_job_count
                 pool.queueJobs(jobs)
-                participating_source_names.append(src.name)
+                if job_desc:
+                    job_descs.setdefault(job_desc, []).append(src.name)
             else:
                 new_job_count = 0
 
             logger.debug(
                 "Queued %d jobs for source '%s' using pipeline '%s' "
-                "(%s, step 0)." %
+                "(%s)." %
                 (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))
 
         if job_count == 0:
@@ -276,50 +269,9 @@
         logger.info(format_timed(
             start_time,
             "%d jobs completed (%s)." %
-            (job_count, ', '.join(participating_source_names))))
-
-        # Now let's see if any job created a follow-up job. Let's keep
-        # processing those jobs as long as they create new ones.
-        pool.userdata.cur_step = 1
-        while True:
-            # Make a copy of out next step jobs and reset the list, so
-            # the first jobs to be processed don't mess it up as we're
-            # still iterating on it.
-            next_step_jobs = pool.userdata.next_step_jobs
-            pool.userdata.next_step_jobs = {}
-
-            start_time = time.perf_counter()
-            job_count = 0
-            participating_source_names = []
-
-            for sn, jobs in next_step_jobs.items():
-                if jobs:
-                    logger.debug(
-                        "Queuing jobs for source '%s' (%s, step %d)." %
-                        (sn, realm_name, pool.userdata.cur_step))
-
-                    pp = ppmngr.getPipeline(sn)
-                    valctx = PipelineJobValidateContext(
-                        pp_pass_num, pool.userdata.cur_step,
-                        pp.record_name, record_histories)
-                    pp.validateNextStepJobs(jobs, valctx)
-
-                    job_count += len(jobs)
-                    pool.userdata.next_step_jobs[sn] = []
-                    pool.queueJobs(jobs)
-                    participating_source_names.append(sn)
-
-            if job_count == 0:
-                break
-
-            pool.wait()
-
-            logger.info(format_timed(
-                start_time,
-                "%d jobs completed (%s)." %
-                (job_count, ', '.join(participating_source_names))))
-
-            pool.userdata.cur_step += 1
+            (job_count, ', '.join(
+                ['%s %s' % (d, ', '.join(sn))
+                 for d, sn in job_descs.items()]))))
 
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
@@ -358,22 +310,13 @@
 
     def _handleWorkerResult(self, job, res, userdata):
         cur_pass = userdata.cur_pass
-        cur_step = userdata.cur_step
         source_name, item_spec = job['job_spec']
 
-        # See if there's a next step to take.
-        npj = res.get('next_step_job')
-        if npj is not None:
-            npj['pass_num'] = cur_pass
-            npj['step_num'] = cur_step + 1
-            userdata.next_step_jobs[source_name].append(npj)
-
         # Make the pipeline do custom handling to update the record entry.
         ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
         pipeline = ppinfo.pipeline
         record = ppinfo.current_record
-        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass,
-                                                 cur_step)
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass)
         pipeline.handleJobResult(res, ppmrctx)
 
         # Set the overall success flags if there was an error.
@@ -412,8 +355,6 @@
         self.ppmngr = ppmngr
         self.records = ppmngr.record_histories.current
         self.cur_pass = 0
-        self.cur_step = 0
-        self.next_step_jobs = {}
 
 
 def _get_pipeline_infos_by_pass_and_realm(pp_infos):
```
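
Based only on how `_bakeRealm()` unpacks the return value above, a pipeline's `createJobs()` now hands back a `(jobs, description)` pair instead of a bare job list. The following is a hedged sketch of that contract; the class name, the job dict contents, and the `'loaded'` description are illustrative assumptions, not code from the piecrust2 tree.

```python
# Hypothetical pipeline showing the (jobs, description) return shape that the
# reworked _bakeRealm() expects from createJobs().
class ExamplePagePipeline:
    PIPELINE_NAME = 'example_page'
    record_name = 'example_page'

    def createJobs(self, ctx):
        jobs = [
            {'job_spec': ('example_source', '/pages/foo.md')},
            {'job_spec': ('example_source', '/pages/bar.md')},
        ]
        if not jobs:
            # Assumed convention: a None job list tells _bakeRealm() there is
            # nothing to bake for this source (it checks `if jobs is not None`).
            return None, None
        # The description is what _bakeRealm() groups sources by when logging
        # "N jobs completed (loaded ..., rendered ...)".
        return jobs, 'loaded'
```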