# HG changeset patch
# User Ludovic Chabant
# Date 1512016677 28800
# Node ID fa489c5e829ebd4618ef605748404397abea3c59
# Parent  071f30aa04bbe78d28a728c830242b89b13df278
bake: Load pages in parallel again.

diff -r 071f30aa04bb -r fa489c5e829e piecrust/baking/baker.py
--- a/piecrust/baking/baker.py	Tue Nov 28 21:28:15 2017 -0800
+++ b/piecrust/baking/baker.py	Wed Nov 29 20:37:57 2017 -0800
@@ -90,19 +90,17 @@
         # Create the pipelines.
         ppmngr = self._createPipelineManager(record_histories)
 
-        # Create the worker processes.
-        pool_userdata = _PoolUserData(self, ppmngr)
-        pool = self._createWorkerPool(records_path, pool_userdata)
-
         # Done with all the setup, let's start the actual work.
         logger.info(format_timed(start_time, "setup baker"))
 
         # Load all sources, pre-cache templates.
         load_start_time = time.perf_counter()
-        self._startPopulateTemplateCaches(pool)
-        self._loadSources(ppmngr)
-        self._endPopulateTemplateCache(pool)
-        logger.info(format_timed(load_start_time, "loaded site content"))
+        self._populateTemplateCaches()
+        logger.info(format_timed(load_start_time, "cache templates"))
+
+        # Create the worker processes.
+        pool_userdata = _PoolUserData(self, ppmngr)
+        pool = self._createWorkerPool(records_path, pool_userdata)
 
         # Bake the realms.
         self._bakeRealms(pool, ppmngr, record_histories)
@@ -208,37 +206,9 @@
                 "out. There's nothing to do.")
         return ppmngr
 
-    def _loadSources(self, ppmngr):
-        for ppinfo in ppmngr.getPipelineInfos():
-            rec = ppinfo.record_history.current
-            rec_entries = ppinfo.pipeline.loadAllContents()
-            if rec_entries is not None:
-                for e in rec_entries:
-                    rec.addEntry(e)
-
-    def _startPopulateTemplateCaches(self, pool):
-        # If we can, cache templates in a worker process, so we can load
-        # the sources' pages in the main process in the meantime.
-        # But if we don't have any workers, well, we'll have to make do
-        # in the `_endPopulateTemplateCache` method.
-        if pool.pool_size == 0:
-            return
-
-        pool._callback = None
-        pool._error_callback = None
-        job = {'job_spec': ('__special__', 'populate_template_cache')}
-        pool.queueJobs([job])
-
-    def _endPopulateTemplateCache(self, pool):
-        if pool.pool_size == 0:
-            # No workers... load the templates synchronously.
-            for eng in self.app.plugin_loader.getTemplateEngines():
-                eng.populateCache()
-        else:
-            # Wait for the job to finish.
-            pool.wait()
-            pool._callback = self._handleWorkerResult
-            pool._error_callback = self._handleWorkerError
+    def _populateTemplateCaches(self):
+        for eng in self.app.plugin_loader.getTemplateEngines():
+            eng.populateCache()
 
     def _bakeRealms(self, pool, ppmngr, record_histories):
         # Bake the realms -- user first, theme second, so that a user item
@@ -261,6 +231,7 @@
                            pp_pass_num, realm, pplist):
         # Start with the first step, where we iterate on the content sources'
         # items and run jobs on those.
+        pool.userdata.cur_pass = pp_pass_num
         pool.userdata.cur_step = 0
         next_step_jobs = {}
         pool.userdata.next_step_jobs = next_step_jobs
@@ -381,6 +352,7 @@
         return pool
 
     def _handleWorkerResult(self, job, res, userdata):
+        cur_pass = userdata.cur_pass
         cur_step = userdata.cur_step
         source_name, item_spec = job['job_spec']
 
@@ -394,7 +366,8 @@
         ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
         pipeline = ppinfo.pipeline
         record = ppinfo.current_record
-        ppmrctx = PipelineJobResultHandleContext(record, job, cur_step)
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass,
+                                                 cur_step)
         pipeline.handleJobResult(res, ppmrctx)
 
         # Set the overall success flags if there was an error.
@@ -430,6 +403,7 @@
         self.baker = baker
         self.ppmngr = ppmngr
         self.records = ppmngr.record_histories.current
+        self.cur_pass = 0
         self.cur_step = 0
         self.next_step_jobs = {}
 
diff -r 071f30aa04bb -r fa489c5e829e piecrust/baking/worker.py
--- a/piecrust/baking/worker.py	Tue Nov 28 21:28:15 2017 -0800
+++ b/piecrust/baking/worker.py	Wed Nov 29 20:37:57 2017 -0800
@@ -80,15 +80,6 @@
         source_name, item_spec = job['job_spec']
         logger.debug("Received job: %s@%s" % (source_name, item_spec))
 
-        # Check for special jobs.
-        if source_name == '__special__':
-            if item_spec == 'populate_template_cache':
-                for eng in self.app.plugin_loader.getTemplateEngines():
-                    eng.populateCache()
-            else:
-                raise Exception("Unknown special job: %s" % item_spec)
-            return {}
-
         # Run the job!
         job_start = time.perf_counter()
         pp = self.ppmngr.getPipeline(source_name)
diff -r 071f30aa04bb -r fa489c5e829e piecrust/pipelines/base.py
--- a/piecrust/pipelines/base.py	Tue Nov 28 21:28:15 2017 -0800
+++ b/piecrust/pipelines/base.py	Wed Nov 29 20:37:57 2017 -0800
@@ -103,9 +103,10 @@
    This is run on the master process, so it can access the current
    record.
    """
-    def __init__(self, record, job, step_num):
+    def __init__(self, record, job, pass_num, step_num):
         self.record = record
         self.job = job
+        self.pass_num = pass_num
         self.step_num = step_num
 
     @cached_property
@@ -156,9 +157,6 @@
     def initialize(self):
         pass
 
-    def loadAllContents(self):
-        return None
-
     def createJobs(self, ctx):
         return [
             create_job(self, item.spec)
diff -r 071f30aa04bb -r fa489c5e829e piecrust/pipelines/page.py
--- a/piecrust/pipelines/page.py	Tue Nov 28 21:28:15 2017 -0800
+++ b/piecrust/pipelines/page.py	Wed Nov 29 20:37:57 2017 -0800
@@ -16,7 +16,7 @@
 class PagePipeline(ContentPipeline):
     PIPELINE_NAME = 'page'
     RECORD_ENTRY_CLASS = PagePipelineRecordEntry
-    PASS_NUM = [0, 1]
+    PASS_NUM = [0, 1, 2]
 
     def __init__(self, source, ppctx):
         super().__init__(source, ppctx)
@@ -34,34 +34,24 @@
             force=self.ctx.force)
         self._pagebaker.startWriterQueue()
 
-    def loadAllContents(self):
+    def createJobs(self, ctx):
+        if ctx.pass_num == 0:
+            return self._createLoadJobs(ctx)
+        if ctx.pass_num == 1:
+            return self._createSecondPassJobs(ctx)
+        return self._createThirdPassJobs(ctx)
+
+    def _createLoadJobs(self, ctx):
         # Here we load all the pages in the source, making sure they all
         # have a valid cache for their configuration and contents.
-        # We also create the record entries while we're at it.
-        source = self.source
-        page_fac = self.app.getPage
-        record_fac = self.createRecordEntry
-        for item in source.getAllContents():
-            page = page_fac(source, item)
-
-            cur_entry = record_fac(item.spec)
-            cur_entry.config = page.config.getAll()
-            cur_entry.route_params = item.metadata['route_params']
-            cur_entry.timestamp = page.datetime.timestamp()
+        jobs = []
+        for item in self.source.getAllContents():
+            jobs.append(create_job(self, item.spec))
+        if len(jobs) > 0:
+            return jobs
+        return None
 
-            if page.was_modified:
-                cur_entry.flags |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
-            if page.config.get(self._draft_setting):
-                cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
-
-            yield cur_entry
-
-    def createJobs(self, ctx):
-        if ctx.pass_num == 0:
-            return self._createFirstPassJobs(ctx)
-        return self._createSecondPassJobs(ctx)
-
-    def _createFirstPassJobs(self, ctx):
+    def _createSecondPassJobs(self, ctx):
         jobs = []
         app = self.app
 
@@ -125,7 +115,7 @@
             return jobs
         return None
 
-    def _createSecondPassJobs(self, ctx):
+    def _createThirdPassJobs(self, ctx):
        # Get the list of all sources that had anything baked.
        dirty_source_names = set()
        all_records = ctx.record_histories.current.records
@@ -171,21 +161,37 @@
         return None
 
     def handleJobResult(self, result, ctx):
-        existing = ctx.record_entry
-        merge_job_result_into_record_entry(existing, result)
-        if existing.was_any_sub_baked:
-            ctx.record.user_data['dirty_source_names'].add(self.source.name)
+        step_num = ctx.step_num
+
+        if step_num == 0:
+            new_entry = self.createRecordEntry(result['item_spec'])
+            new_entry.config = result['config']
+            new_entry.route_params = result['route_params']
+            new_entry.timestamp = result['timestamp']
+            ctx.record.addEntry(new_entry)
+        else:
+            existing = ctx.record_entry
+            merge_job_result_into_record_entry(existing, result)
+
+            if existing.was_any_sub_baked:
+                ctx.record.user_data['dirty_source_names'].add(self.source.name)
 
     def run(self, job, ctx, result):
         pass_num = job.get('pass_num', 0)
         step_num = job.get('step_num', 0)
         if pass_num == 0:
             if step_num == 0:
-                self._renderOrPostpone(job, ctx, result)
+                self._loadPage(job, ctx, result)
             elif step_num == 1:
+                self._renderOrPostpone(job, ctx, result)
+            elif step_num == 2:
                 self._renderAlways(job, ctx, result)
+            else:
+                raise Exception("Unexpected pipeline step: %d" % step_num)
         elif pass_num == 1:
             self._renderAlways(job, ctx, result)
+        else:
+            raise Exception("Unexpected pipeline pass: %d" % pass_num)
 
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
@@ -205,6 +211,25 @@
     def shutdown(self):
         self._pagebaker.stopWriterQueue()
 
+    def _loadPage(self, job, ctx, result):
+        content_item = content_item_from_job(self, job)
+        page = self.app.getPage(self.source, content_item)
+
+        trigger_next_job = True
+        result['flags'] = PagePipelineRecordEntry.FLAG_NONE
+        result['config'] = page.config.getAll()
+        result['route_params'] = content_item.metadata['route_params']
+        result['timestamp'] = page.datetime.timestamp()
+
+        if page.was_modified:
+            result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
+        if page.config.get(self._draft_setting):
+            result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT
+            trigger_next_job = False
+
+        if trigger_next_job:
+            result['next_step_job'] = create_job(self, content_item.spec)
+
     def _renderOrPostpone(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page
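
Review note: the mechanism this patch leans on is job chaining, which replaces
the old synchronous loadAllContents(). Pass 0 now starts with one "load" job
per page (step 0), run in the workers; each worker result may carry a
'next_step_job' entry, which the master queues as the following step
(render-or-postpone, then render-always). Below is a minimal standalone
sketch of that chaining idea, not PieCrust code -- run_job, the 'job_spec'
and 'step_num' keys, and the use of multiprocessing.Pool are illustrative
assumptions, with only 'next_step_job' taken from the patch:

    from multiprocessing import Pool

    def run_job(job):
        # Hypothetical worker: do the work for this step and, optionally,
        # ask the master to queue a follow-up job for the next step.
        source_name, item_spec = job['job_spec']
        step_num = job.get('step_num', 0)
        result = {'item_spec': item_spec, 'step_num': step_num}
        if step_num == 0:
            # The "load" step succeeded; chain into the render step.
            result['next_step_job'] = {
                'job_spec': (source_name, item_spec),
                'step_num': step_num + 1,
            }
        return result

    if __name__ == '__main__':
        jobs = [{'job_spec': ('pages', '/foo.md'), 'step_num': 0}]
        with Pool(2) as pool:
            while jobs:
                # Collect the chained jobs from this round's results and
                # run them as the next step, until nothing chains further.
                results = pool.map(run_job, jobs)
                jobs = [r['next_step_job'] for r in results
                        if 'next_step_job' in r]

Keeping the chaining decision in the worker result (rather than in the
master) is what lets a page skip its render step, as _loadPage does for
drafts by simply not emitting a 'next_step_job'.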