diff piecrust/pipelines/page.py @ 1136:5f97b5b59dfe
bake: Optimize cache handling for the baking process.
- Get rid of the 2-level pipeline runs... handle a single set of passes
  (see the sketch after this list).
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does.
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't need
to be re-baked.
- Simplify some of the code.
| author   | Ludovic Chabant <ludovic@chabant.com> |
|----------|---------------------------------------|
| date     | Mon, 23 Apr 2018 21:47:49 -0700       |
| parents  | 3bcb2d446397                          |
| children |                                       |
--- a/piecrust/pipelines/page.py	Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/page.py	Mon Apr 23 21:47:49 2018 -0700
@@ -1,12 +1,11 @@
 import copy
-import time
 import logging
 
 from piecrust.pipelines.base import (
     ContentPipeline, create_job, content_item_from_job)
 from piecrust.pipelines._pagebaker import PageBaker, get_output_path
 from piecrust.pipelines._pagerecords import (
-    PagePipelineRecordEntry,
-    add_page_job_result, merge_job_result_into_record_entry)
+    PagePipelineRecordEntry, SubPageFlags)
+from piecrust.rendering import RenderingContext, render_page_segments
 from piecrust.sources.base import AbortedSourceUseError
@@ -38,11 +37,11 @@
         pass_num = ctx.pass_num
         if pass_num == 0:
             ctx.current_record.user_data['dirty_source_names'] = set()
-            return self._createLoadJobs(ctx)
+            return self._createLoadJobs(ctx), "load"
         if pass_num == 1:
-            return self._createSecondPassJobs(ctx)
+            return self._createSegmentJobs(ctx), "render"
         if pass_num == 2:
-            return self._createThirdPassJobs(ctx)
+            return self._createLayoutJobs(ctx), "layout"
         raise Exception("Unexpected pipeline pass: %d" % pass_num)
 
     def _createLoadJobs(self, ctx):
@@ -55,20 +54,21 @@
             return jobs
         return None
 
-    def _createSecondPassJobs(self, ctx):
+    def _createSegmentJobs(self, ctx):
         jobs = []
 
         app = self.app
+        pass_num = ctx.pass_num
         out_dir = self.ctx.out_dir
         uri_getter = self.source.route.getUri
         pretty_urls = app.config.get('site/pretty_urls')
 
-        used_paths = _get_used_paths_from_records(
-            ctx.record_histories.current.records)
         history = ctx.record_histories.getHistory(ctx.record_name).copy()
         history.build()
-        pass_num = ctx.pass_num
+
+        cur_rec_used_paths = {}
+        history.current.user_data['used_paths'] = cur_rec_used_paths
+        all_records = ctx.record_histories.current.records
 
         for prev, cur in history.diffs:
             # Ignore pages that disappeared since last bake.
@@ -76,18 +76,24 @@
                 continue
 
             # Skip draft pages.
-            if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT:
+            if cur.hasFlag(PagePipelineRecordEntry.FLAG_IS_DRAFT):
+                continue
+
+            # Skip pages that haven't changed since last bake.
+            if (prev and not cur.hasFlag(
+                    PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED)):
                 continue
 
-            # For pages that are known to use other sources, we make a dummy
-            # job that will effectively get directly passed on to the next
-            # step.
+            # For pages that are known to use other sources in their own
+            # content segments (we don't care about the layout yet), we
+            # postpone them to the next pipeline pass immediately, because they
+            # might need populated render caches for those sources' pages.
             if prev:
-                usn1, usn2 = prev.getAllUsedSourceNames()
-                if usn1 or usn2:
-                    jobs.append(create_job(self, cur.item_spec,
-                                           pass_num=pass_num,
-                                           uses_sources=True))
+                usn1, _ = prev.getAllUsedSourceNames()
+                if usn1:
+                    logger.debug("Postponing: %s" % cur.item_spec)
+                    cur.flags |= \
+                        PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
                     continue
 
             # Check if this item has been overriden by a previous pipeline
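
The hunk above also swaps raw bit tests (`cur.flags & FLAG_IS_DRAFT`) for a `hasFlag()` helper, and postpones source-using pages by setting a flag on their record entry instead of queuing a dummy pass-through job. The flag mechanics look roughly like this; the class and flag values below are illustrative, the real ones live in piecrust/pipelines/_pagerecords.py:

    # Illustrative bit-flag record entry; actual flag values may differ.
    class Entry:
        FLAG_NONE = 0
        FLAG_IS_DRAFT = 2**0
        FLAG_SOURCE_MODIFIED = 2**1
        FLAG_ABORTED_FOR_SOURCE_USE = 2**2

        def __init__(self):
            self.flags = self.FLAG_NONE

        def hasFlag(self, flag):
            # True if every bit in `flag` is set on this entry.
            return (self.flags & flag) != 0


    e = Entry()
    e.flags |= Entry.FLAG_ABORTED_FOR_SOURCE_USE  # "postpone to next pass"
    assert e.hasFlag(Entry.FLAG_ABORTED_FOR_SOURCE_USE)
    assert not e.hasFlag(Entry.FLAG_IS_DRAFT)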
@@ -95,27 +101,30 @@
             # source, and some of our pages have been overriden by a user
             # page that writes out to the same URL.
             uri = uri_getter(cur.route_params)
-            path = get_output_path(app, out_dir, uri, pretty_urls)
-            override = used_paths.get(path)
+            out_path = get_output_path(app, out_dir, uri, pretty_urls)
+            override = _find_used_path_spec(all_records, out_path)
             if override is not None:
-                override_source_name, override_entry = override
+                override_source_name, override_entry_spec = override
                 override_source = app.getSource(override_source_name)
                 if override_source.config['realm'] == \
                         self.source.config['realm']:
                     logger.error(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (cur.item_spec, path, override_entry.item_spec))
+                        (cur.item_spec, out_path, override_entry_spec))
                 else:
                     logger.debug(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (cur.item_spec, path, override_entry.item_spec))
+                        (cur.item_spec, out_path, override_entry_spec))
 
                 cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
                 continue
 
             # Nope, all good, let's create a job for this item.
+            cur.flags |= PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED
+            cur_rec_used_paths[out_path] = cur.item_spec
+
             jobs.append(create_job(self, cur.item_spec,
                                    pass_num=pass_num))
@@ -123,7 +132,7 @@
             return jobs
         return None
 
-    def _createThirdPassJobs(self, ctx):
+    def _createLayoutJobs(self, ctx):
         # Get the list of all sources that had anything baked.
         dirty_source_names = set()
         all_records = ctx.record_histories.current.records
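
The next hunk rewrites the layout pass's decision logic. Stripped of the record bookkeeping, the core test is a membership check between the sources a page used last bake and the set of sources that had anything baked this run. A condensed, hypothetical version of that test (`force_flags` is not a real piecrust function):

    # Condensed, hypothetical version of the force-rebake test made in
    # _createLayoutJobs below.
    def force_flags(used_in_segments, used_in_layout, dirty_source_names):
        force_segments = any(u in dirty_source_names for u in used_in_segments)
        force_layout = any(u in dirty_source_names for u in used_in_layout)
        return force_segments, force_layout


    # A blog index page that lists posts in its content and shows tag counts
    # in its layout: editing any post must force both aspects to re-bake.
    print(force_flags({'posts'}, {'posts', 'tags'}, {'posts'}))  # (True, True)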
@@ -132,41 +141,59 @@
             if rec_dsn:
                 dirty_source_names |= rec_dsn
 
-        # Now look at the stuff we bake for our own source on the first pass.
-        # For anything that wasn't baked (i.e. it was considered 'up to date')
-        # we look at the records from last time, and if they say that some
-        # page was using a source that is "dirty", then we force bake it.
-        #
-        # The common example for this is a blog index page which hasn't been
-        # touched, but needs to be re-baked because someone added or edited
-        # a post.
         jobs = []
         pass_num = ctx.pass_num
         history = ctx.record_histories.getHistory(ctx.record_name).copy()
         history.build()
         for prev, cur in history.diffs:
-            if not cur:
+            if not cur or cur.hasFlag(PagePipelineRecordEntry.FLAG_OVERRIDEN):
                 continue
-            if cur.was_any_sub_baked:
-                continue
+
+            do_bake = False
+            force_segments = False
+            force_layout = False
+
+            # Make sure we bake the layout for pages that got their segments
+            # re-rendered.
+            if cur.hasFlag(PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
+                do_bake = True
+
+            # Now look at the stuff we baked for our own source on the second
+            # pass. For anything that wasn't baked (i.e. it was considered 'up
+            # to date') we look at the records from last time, and if they say
+            # that some page was using a source that is "dirty", then we force
+            # bake it.
+            #
+            # The common example for this is a blog index page which hasn't
+            # been touched, but needs to be re-baked because someone added or
+            # edited a post.
             if prev:
                 usn1, usn2 = prev.getAllUsedSourceNames()
                 force_segments = any(map(lambda u: u in dirty_source_names,
                                          usn1))
                 force_layout = any(map(lambda u: u in dirty_source_names,
                                        usn2))
+
                 if force_segments or force_layout:
-                    jobs.append(create_job(self, prev.item_spec,
-                                           pass_num=pass_num,
-                                           force_segments=force_segments,
-                                           force_layout=force_layout))
-                else:
+                    # Yep, we need to force-rebake some aspect of this page.
+                    do_bake = True
+
+                elif not do_bake:
                     # This page uses other sources, but no source was dirty
                     # this time around (it was a null build, maybe). We
                     # don't have any work to do, but we need to carry over
                     # any information we have, otherwise the post bake step
                     # will think we need to delete last bake's outputs.
                     cur.subs = copy.deepcopy(prev.subs)
+                    for cur_sub in cur.subs:
+                        cur_sub['flags'] = \
+                            SubPageFlags.FLAG_COLLAPSED_FROM_LAST_RUN
+
+            if do_bake:
+                jobs.append(create_job(self, cur.item_spec,
                                       pass_num=pass_num,
                                       force_segments=force_segments,
                                       force_layout=force_layout))
 
         if len(jobs) > 0:
             return jobs
@@ -174,7 +201,6 @@
 
     def handleJobResult(self, result, ctx):
         pass_num = ctx.pass_num
-        step_num = ctx.step_num
 
         if pass_num == 0:
             # Just went through a "load page" job. Let's create a record
@@ -188,35 +214,37 @@
 
             # If this page was modified, flag its entire source as "dirty",
             # so any pages using that source can be re-baked.
-            if (new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED):
+            if new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED:
                 ctx.record.user_data['dirty_source_names'].add(
                     self.source.name)
+
+            # If this page is new
+
+        elif pass_num == 1:
+            # Just went through the "render segments" job.
+            existing = ctx.record_entry
+            existing.flags |= result.get('flags',
+                                         PagePipelineRecordEntry.FLAG_NONE)
+
         else:
             # Update the entry with the new information.
             existing = ctx.record_entry
-            if not result.get('postponed', False):
-                merge_job_result_into_record_entry(existing, result)
+            existing.flags |= result.get('flags',
+                                         PagePipelineRecordEntry.FLAG_NONE)
+            existing.errors += result.get('errors', [])
+            existing.subs += result.get('subs', [])
 
     def run(self, job, ctx, result):
         pass_num = job.get('pass_num', 0)
-        step_num = job.get('step_num', 0)
 
         if pass_num == 0:
-            if step_num == 0:
-                return self._loadPage(job, ctx, result)
+            return self._loadPage(job, ctx, result)
 
         elif pass_num == 1:
-            if step_num == 0:
-                return self._renderOrPostpone(job, ctx, result)
-            elif step_num == 1:
-                return self._renderAlways(job, ctx, result)
+            return self._renderSegments(job, ctx, result)
 
-        elif pass_num == 2:
-            if step_num == 0:
-                return self._renderAlways(job, ctx, result)
-
-        raise Exception("Unexpected pipeline pass/step: %d/%d" %
-                        (pass_num, step_num))
+        elif pass_num >= 2:
+            return self._renderLayout(job, ctx, result)
 
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
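
The last hunk turns the old conditional/always render steps into plain segment and layout renders. The segment render runs under a guard: with `env.abort_source_use` set, touching another source raises `AbortedSourceUseError`, and the page is merely flagged so the layout pass retries it. The general shape of that guard, with hypothetical stand-ins for the real environment and rendering objects:

    # Hypothetical stand-ins for the abort-on-source-use guard used by
    # _renderSegments below.
    class AbortedSourceUseError(Exception):
        pass


    class Env:
        abort_source_use = False


    def render_segments_guarded(env, render_fn):
        # Render with the guard armed; any render that touches another
        # source raises AbortedSourceUseError and is left for the next pass.
        env.abort_source_use = True
        try:
            render_fn()
        except AbortedSourceUseError:
            return False  # caller flags the entry; the layout pass retries it
        finally:
            env.abort_source_use = False
        return True


    def uses_other_source():
        raise AbortedSourceUseError()


    env = Env()
    print(render_segments_guarded(env, lambda: None))       # True
    print(render_segments_guarded(env, uses_other_source))  # False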
@@ -250,62 +278,50 @@
             if page.config.get(self._draft_setting):
                 result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT
 
-    def _renderOrPostpone(self, job, ctx, result):
-        # See if we should immediately kick this job off to the next step.
-        if job.get('uses_sources', False):
-            result['postponed'] = True
-            result['next_step_job'] = create_job(self, job['job_spec'][1])
-            return
-
+    def _renderSegments(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page
         # is using some other sources, in which case we abort and we'll try
         # again on the second pass.
         content_item = content_item_from_job(self, job)
-        logger.debug("Conditional render for: %s" % content_item.spec)
+        logger.debug("Render segments for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         if page.config.get(self._draft_setting):
            raise Exception("Shouldn't have a draft page in a render job!")
 
-        prev_entry = ctx.previous_entry
-
         env = self.app.env
         env.abort_source_use = True
-        add_page_job_result(result)
         try:
-            rdr_subs = self._pagebaker.bake(page, prev_entry)
-            result['subs'] = rdr_subs
+            rdr_ctx = RenderingContext(page)
+            render_page_segments(rdr_ctx)
         except AbortedSourceUseError:
             logger.debug("Page was aborted for using source: %s" %
                          content_item.spec)
-            result['flags'] |= \
+            result['flags'] = \
                 PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
             env.stats.stepCounter("SourceUseAbortions")
             env.stats.addManifestEntry("SourceUseAbortions",
                                        content_item.spec)
-            result['next_step_job'] = create_job(self, content_item.spec)
         finally:
             env.abort_source_use = False
 
-    def _renderAlways(self, job, ctx, result):
+    def _renderLayout(self, job, ctx, result):
         content_item = content_item_from_job(self, job)
-        logger.debug("Full render for: %s" % content_item.spec)
+        logger.debug("Render layout for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         prev_entry = ctx.previous_entry
         rdr_subs = self._pagebaker.bake(
             page, prev_entry,
             force_segments=job.get('force_segments'),
             force_layout=job.get('force_layout'))
-
-        add_page_job_result(result)
         result['subs'] = rdr_subs
 
 
-def _get_used_paths_from_records(records):
-    used_paths = {}
+
+def _find_used_path_spec(records, path):
     for rec in records:
-        src_name = rec.name.split('@')[0]
-        for e in rec.getEntries():
-            paths = e.getAllOutputPaths()
-            if paths is not None:
-                for p in paths:
-                    used_paths[p] = (src_name, e)
-    return used_paths
+        up = rec.user_data.get('used_paths')
+        if up is not None:
+            entry_spec = up.get(path)
+            if entry_spec is not None:
+                src_name = rec.name.split('@')[0]
+                return (src_name, entry_spec)
+    return None
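
Finally, note the shape change in the module-level helper: the old `_get_used_paths_from_records()` eagerly built a full output-path map by walking every entry of every record, while the new `_find_used_path_spec()` does a lazy per-path lookup against the `used_paths` dict that each record's segment pass now stores in its `user_data`, stopping at the first match. A toy illustration of the new lookup; the `Record` class here is a stand-in for the real pipeline record:

    # Toy illustration of the new lazy used-paths lookup; `Record` is a
    # stand-in for the real pipeline record class.
    class Record:
        def __init__(self, name, used_paths):
            self.name = name
            self.user_data = {'used_paths': used_paths}


    def find_used_path_spec(records, path):
        for rec in records:
            up = rec.user_data.get('used_paths')
            if up is not None:
                entry_spec = up.get(path)
                if entry_spec is not None:
                    # Record names look like "source@taxonomy"; keep the
                    # source part, as the real helper does.
                    return (rec.name.split('@')[0], entry_spec)
        return None


    records = [Record('pages@default', {'/out/about.html': 'pages/about.md'})]
    print(find_used_path_spec(records, '/out/about.html'))
    # -> ('pages', 'pages/about.md')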