diff piecrust/baking/baker.py @ 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item from an item spec (path).
- Make Hoedown the default Markdown formatter.
author: Ludovic Chabant <ludovic@chabant.com>
date: Sun, 19 Nov 2017 14:29:17 -0800
parents: 45ad976712ec
children: 09dc0240f08a
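
The first two points of the commit message are visible in the diff below: the jobs and results exchanged between the master and the worker processes become plain dicts (`job['job_spec']`, `res.get('next_step_job')`, ...) instead of custom objects (`job.content_item`, `res.record_entry`), which effectively tied the worker pool to pickle. As a rough sketch of the payload shapes implied by `_handleWorkerResult` — the keys come from the diff, the concrete values and anything else here are invented for illustration:

```python
import json
import pickle

# A bake job handed to a worker: a plain dict of built-in types only.
# 'job_spec' pairs the source name with the item spec (path), matching
# `source_name, item_spec = job['job_spec']` in the diff; values are made up.
job = {
    'job_spec': ['pages', 'pages/2017/my-post.md'],
    'item_spec': 'pages/2017/my-post.md',
}

# A worker result: also a plain dict. An optional 'next_step_job' asks the
# master to queue a follow-up job; the master stamps 'step_num' on it before
# re-queuing, as `npj['step_num'] = cur_step + 1` does in the diff.
res = {
    'next_step_job': {'job_spec': job['job_spec']},
}

# Plain dicts round-trip through pickle and through simpler serializers
# alike, which is what makes "alternatives to pickle" easy to try out.
assert pickle.loads(pickle.dumps(job)) == job
assert json.loads(json.dumps(res)) == res
```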
--- a/piecrust/baking/baker.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/baking/baker.py  Sun Nov 19 14:29:17 2017 -0800
@@ -6,7 +6,8 @@
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
 from piecrust.pipelines.base import (
-    PipelineJobCreateContext, PipelineMergeRecordContext, PipelineManager,
+    PipelineJobCreateContext, PipelineJobResultHandleContext,
+    PipelineJobValidateContext, PipelineManager,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
     MultiRecordHistory, MultiRecord, RecordEntry,
@@ -42,6 +43,8 @@
 
     def bake(self):
         start_time = time.perf_counter()
+
+        # Setup baker.
         logger.debug(" Bake Output: %s" % self.out_dir)
         logger.debug(" Root URL: %s" % self.app.config.get('site/root'))
 
@@ -50,7 +53,9 @@
         self.app.config.set('site/asset_url_format', '%page_uri%/%filename%')
 
         stats = self.app.env.stats
-        stats.registerTimer('WorkerTaskPut')
+        stats.registerTimer('LoadSourceContents', raise_if_registered=False)
+        stats.registerTimer('MasterTaskPut_1', raise_if_registered=False)
+        stats.registerTimer('MasterTaskPut_2+', raise_if_registered=False)
 
         # Make sure the output directory exists.
         if not os.path.isdir(self.out_dir):
@@ -90,6 +95,12 @@
         pool_userdata = _PoolUserData(self, ppmngr)
         pool = self._createWorkerPool(records_path, pool_userdata)
 
+        # Done with all the setup, let's start the actual work.
+        logger.info(format_timed(start_time, "setup baker"))
+
+        # Load all sources.
+        self._loadSources(ppmngr)
+
         # Bake the realms.
         self._bakeRealms(pool, ppmngr, record_histories)
 
@@ -149,8 +160,9 @@
                 self.force = True
                 current_records.incremental_count = 0
                 previous_records = MultiRecord()
-                logger.info(format_timed(
-                    start_time, "cleaned cache (reason: %s)" % reason))
+                logger.debug(format_timed(
+                    start_time, "cleaned cache (reason: %s)" % reason,
+                    colored=False))
                 return False
             else:
                 current_records.incremental_count += 1
@@ -167,7 +179,8 @@
         # Also, create and initialize each pipeline for each source.
         has_any_pp = False
         ppmngr = PipelineManager(
-            self.app, self.out_dir, record_histories)
+            self.app, self.out_dir,
+            record_histories=record_histories)
         ok_pp = self.allowed_pipelines
         nok_pp = self.forbidden_pipelines
         ok_src = self.allowed_sources
@@ -192,13 +205,28 @@
                 "out. There's nothing to do.")
         return ppmngr
 
+    def _loadSources(self, ppmngr):
+        start_time = time.perf_counter()
+
+        for ppinfo in ppmngr.getPipelineInfos():
+            rec = ppinfo.record_history.current
+            rec_entries = ppinfo.pipeline.loadAllContents()
+            if rec_entries is not None:
+                for e in rec_entries:
+                    rec.addEntry(e)
+
+        stats = self.app.env.stats
+        stats.stepTimer('LoadSourceContents',
+                        time.perf_counter() - start_time)
+        logger.info(format_timed(start_time, "loaded site content"))
+
     def _bakeRealms(self, pool, ppmngr, record_histories):
         # Bake the realms -- user first, theme second, so that a user item
         # can override a theme item.
         # Do this for as many times as we have pipeline passes left to do.
         realm_list = [REALM_USER, REALM_THEME]
         pp_by_pass_and_realm = _get_pipeline_infos_by_pass_and_realm(
-            ppmngr.getPipelines())
+            ppmngr.getPipelineInfos())
 
         for pp_pass_num in sorted(pp_by_pass_and_realm.keys()):
             logger.debug("Pipelines pass %d" % pp_pass_num)
@@ -206,10 +234,11 @@
             for realm in realm_list:
                 pplist = pp_by_realm.get(realm)
                 if pplist is not None:
-                    self._bakeRealm(
-                        pool, record_histories, pp_pass_num, realm, pplist)
+                    self._bakeRealm(pool, ppmngr, record_histories,
+                                    pp_pass_num, realm, pplist)
 
-    def _bakeRealm(self, pool, record_histories, pp_pass_num, realm, pplist):
+    def _bakeRealm(self, pool, ppmngr, record_histories,
+                   pp_pass_num, realm, pplist):
         # Start with the first step, where we iterate on the content sources'
         # items and run jobs on those.
         pool.userdata.cur_step = 0
@@ -218,15 +247,16 @@
 
         start_time = time.perf_counter()
         job_count = 0
+        stats = self.app.env.stats
         realm_name = REALM_NAMES[realm].lower()
-        stats = self.app.env.stats
 
         for ppinfo in pplist:
             src = ppinfo.source
             pp = ppinfo.pipeline
 
+            jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
+                                             record_histories)
             next_step_jobs[src.name] = []
-            jcctx = PipelineJobCreateContext(pp_pass_num, record_histories)
             jobs = pp.createJobs(jcctx)
             if jobs is not None:
                 new_job_count = len(jobs)
@@ -240,7 +270,7 @@
                     "(%s, step 0)." %
                     (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))
 
-        stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time)
+        stats.stepTimer('MasterTaskPut_1', time.perf_counter() - start_time)
 
         if job_count == 0:
             logger.debug("No jobs queued! Bailing out of this bake pass.")
@@ -270,11 +300,18 @@
                 logger.debug(
                     "Queuing jobs for source '%s' (%s, step %d)." %
                     (sn, realm_name, pool.userdata.cur_step))
+
+                pp = ppmngr.getPipeline(sn)
+                valctx = PipelineJobValidateContext(
+                    pp_pass_num, pool.userdata.cur_step,
+                    pp.record_name, record_histories)
+                pp.validateNextStepJobs(jobs, valctx)
+
                 job_count += len(jobs)
                 pool.userdata.next_step_jobs[sn] = []
                 pool.queueJobs(jobs)
 
-            stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time)
+            stats.stepTimer('MasterTaskPut_2+', time.perf_counter() - start_time)
 
             if job_count == 0:
                 break
@@ -293,6 +330,12 @@
             for e in errors:
                 logger.error(" " + e)
 
+    def _logWorkerException(self, item_spec, exc_data):
+        logger.error("Errors found in %s:" % item_spec)
+        logger.error(exc_data['value'])
+        if self.app.debug:
+            logger.error(exc_data['traceback'])
+
     def _createWorkerPool(self, previous_records_path, pool_userdata):
         from piecrust.workerpool import WorkerPool
         from piecrust.baking.worker import BakeWorkerContext, BakeWorker
@@ -319,47 +362,45 @@
 
     def _handleWorkerResult(self, job, res, userdata):
         cur_step = userdata.cur_step
-        record = userdata.records.getRecord(job.record_name)
+        source_name, item_spec = job['job_spec']
+
+        # See if there's a next step to take.
+        npj = res.get('next_step_job')
+        if npj is not None:
+            npj['step_num'] = cur_step + 1
+            userdata.next_step_jobs[source_name].append(npj)
 
-        if cur_step == 0:
-            record.addEntry(res.record_entry)
-        else:
-            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
-            ppmrctx = PipelineMergeRecordContext(record, job, cur_step)
-            ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx)
+        # Make the pipeline do custom handling to update the record entry.
+        ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
+        pipeline = ppinfo.pipeline
+        record = ppinfo.current_record
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_step)
+        pipeline.handleJobResult(res, ppmrctx)
 
-        npj = res.next_step_job
-        if npj is not None:
-            npj.step_num = cur_step + 1
-            userdata.next_step_jobs[job.source_name].append(npj)
-
-        if not res.record_entry.success:
+        # Set the overall success flags if there was an error.
+        record_entry = ppmrctx.record_entry
+        if not record_entry.success:
            record.success = False
            userdata.records.success = False
-            self._logErrors(job.content_item.spec, res.record_entry.errors)
+            self._logErrors(job['item_spec'], record_entry.errors)
 
     def _handleWorkerError(self, job, exc_data, userdata):
-        cur_step = userdata.cur_step
-        record = userdata.records.getRecord(job.record_name)
-
-        record_entry_spec = job.content_item.metadata.get(
-            'record_entry_spec', job.content_item.spec)
-
-        if cur_step == 0:
-            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
-            entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry
-            e = entry_class()
-            e.item_spec = record_entry_spec
-            e.errors.append(str(exc_data))
-            record.addEntry(e)
-        else:
-            e = record.getEntry(record_entry_spec)
-            e.errors.append(str(exc_data))
-
+        # Set the overall success flag.
+        source_name, item_spec = job['job_spec']
+        ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
+        pipeline = ppinfo.pipeline
+        record = ppinfo.current_record
         record.success = False
         userdata.records.success = False
 
-        self._logErrors(job.content_item.spec, e.errors)
+        # Add those errors to the record, if possible.
+        record_entry_spec = job.get('record_entry_spec', item_spec)
+        e = record.getEntry(record_entry_spec)
+        if e:
+            e.errors.append(exc_data['value'])
+        self._logWorkerException(item_spec, exc_data)
+
         # Log debug stuff.
         if self.app.debug:
             logger.error(exc_data.traceback)
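
The same plain-dict convention shows up in the error path: `_handleWorkerError` and the new `_logWorkerException` read `exc_data['value']` and `exc_data['traceback']` from a dict rather than from an exception object. A minimal sketch of how such a payload could be built on the worker side — the helper name below is hypothetical, not PieCrust's actual worker code, and only the two keys are taken from the diff:

```python
import json
import traceback

def make_exc_data(exc):
    # Hypothetical helper: package an exception as a plain dict with the two
    # keys the baker reads ('value' and 'traceback').
    return {
        'value': str(exc),
        'traceback': traceback.format_exc(),
    }

try:
    raise RuntimeError("template not found")
except RuntimeError as exc:
    exc_data = make_exc_data(exc)

# A dict of strings survives serializers other than pickle (JSON here),
# unlike a live exception object -- the "alternatives to pickle" point
# from the commit message.
payload = json.dumps(exc_data)
print(json.loads(payload)['value'])   # -> template not found
```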