Mercurial > piecrust2
diff piecrust/pipelines/page.py @ 1015:fa489c5e829e
bake: Load pages in parallel again.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 29 Nov 2017 20:37:57 -0800 |
parents | 1857dbd4580f |
children | 3c6e6e7b9639 |
line wrap: on
line diff
--- a/piecrust/pipelines/page.py Tue Nov 28 21:28:15 2017 -0800 +++ b/piecrust/pipelines/page.py Wed Nov 29 20:37:57 2017 -0800 @@ -16,7 +16,7 @@ class PagePipeline(ContentPipeline): PIPELINE_NAME = 'page' RECORD_ENTRY_CLASS = PagePipelineRecordEntry - PASS_NUM = [0, 1] + PASS_NUM = [0, 1, 2] def __init__(self, source, ppctx): super().__init__(source, ppctx) @@ -34,34 +34,24 @@ force=self.ctx.force) self._pagebaker.startWriterQueue() - def loadAllContents(self): + def createJobs(self, ctx): + if ctx.pass_num == 0: + return self._createLoadJobs(ctx) + if ctx.pass_num == 1: + return self._createSecondPassJobs(ctx) + return self._createThirdPassJobs(ctx) + + def _createLoadJobs(self, ctx): # Here we load all the pages in the source, making sure they all # have a valid cache for their configuration and contents. - # We also create the record entries while we're at it. - source = self.source - page_fac = self.app.getPage - record_fac = self.createRecordEntry - for item in source.getAllContents(): - page = page_fac(source, item) - - cur_entry = record_fac(item.spec) - cur_entry.config = page.config.getAll() - cur_entry.route_params = item.metadata['route_params'] - cur_entry.timestamp = page.datetime.timestamp() + jobs = [] + for item in self.source.getAllContents(): + jobs.append(create_job(self, item.spec)) + if len(jobs) > 0: + return jobs + return None - if page.was_modified: - cur_entry.flags |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED - if page.config.get(self._draft_setting): - cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT - - yield cur_entry - - def createJobs(self, ctx): - if ctx.pass_num == 0: - return self._createFirstPassJobs(ctx) - return self._createSecondPassJobs(ctx) - - def _createFirstPassJobs(self, ctx): + def _createSecondPassJobs(self, ctx): jobs = [] app = self.app @@ -125,7 +115,7 @@ return jobs return None - def _createSecondPassJobs(self, ctx): + def _createThirdPassJobs(self, ctx): # Get the list of all sources that had anything baked. dirty_source_names = set() all_records = ctx.record_histories.current.records @@ -171,21 +161,38 @@ return None def handleJobResult(self, result, ctx): - existing = ctx.record_entry - merge_job_result_into_record_entry(existing, result) - if existing.was_any_sub_baked: - ctx.record.user_data['dirty_source_names'].add(self.source.name) + step_num = ctx.step_num + + if step_num == 0: + print(result) + new_entry = self.createRecordEntry(result['item_spec']) + new_entry.config = result['config'] + new_entry.route_params = result['route_params'] + new_entry.timestamp = result['timestamp'] + ctx.record.addEntry(new_entry) + else: + existing = ctx.record_entry + merge_job_result_into_record_entry(existing, result) + + if existing.was_any_sub_baked: + ctx.record.user_data['dirty_source_names'].add(self.source.name) def run(self, job, ctx, result): pass_num = job.get('pass_num', 0) step_num = job.get('step_num', 0) if pass_num == 0: if step_num == 0: - self._renderOrPostpone(job, ctx, result) + self._loadPage(job, ctx, result) elif step_num == 1: + self._renderOrPostpone(job, ctx, result) + elif step_num == 2: self._renderAlways(job, ctx, result) + else: + raise Exception("Unexpected pipeline step: %d" % step_num) elif pass_num == 1: self._renderAlways(job, ctx, result) + else: + raise Exception("Unexpected pipeline pass: %d" % pass_num) def getDeletions(self, ctx): for prev, cur in ctx.record_history.diffs: @@ -205,6 +212,25 @@ def shutdown(self): self._pagebaker.stopWriterQueue() + def _loadPage(self, job, ctx, result): + content_item = content_item_from_job(self, job) + page = self.app.getPage(self.source, content_item) + + trigger_next_job = True + result['flags'] = PagePipelineRecordEntry.FLAG_NONE + result['config'] = page.config.getAll() + result['route_params'] = item.metadata['route_params'] + result['timestamp'] = page.datetime.timestamp() + + if page.was_modified: + result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED + if page.config.get(self._draft_setting): + result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT + trigger_next_job = False + + if trigger_next_job: + result['next_step_job'] = create_job(self, content_item.spec) + def _renderOrPostpone(self, job, ctx, result): # Here our job is to render the page's segments so that they're # cached in memory and on disk... unless we detect that the page