# HG changeset patch
# User Ludovic Chabant
# Date 1524545269 25200
# Node ID 5f97b5b59dfe7a146f801db096f38c2a123d74cc
# Parent 6350ee0842738315274d0e06d198d71425f5b6ff
bake: Optimize cache handling for the baking process.

- Get rid of the 2-level pipeline runs... handle a single set of passes.
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does.
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't
  need to be re-baked.
- Simplify some of the code.
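The scheduling model described above is easier to see in isolation. Below is a minimal, runnable sketch of a single set of passes in which every pipeline returns its jobs together with a short batch description; DummyPipeline, bake_realm and PASS_NAMES are made-up stand-ins for illustration, not PieCrust's actual classes.

    PASS_NAMES = ['load', 'render', 'layout']


    class DummyPipeline:
        def __init__(self, source_name):
            self.source_name = source_name

        def createJobs(self, pass_num):
            # Each pipeline returns its jobs for this pass, plus a short label
            # describing what the batch does, so the baker can log summaries
            # like "4 jobs queued (render posts, pages)".
            jobs = [{'item_spec': '%s/page%d' % (self.source_name, i)}
                    for i in range(2)]
            return jobs, PASS_NAMES[pass_num]


    def bake_realm(pipelines, pass_num):
        job_count = 0
        job_descs = {}
        for pp in pipelines:
            jobs, desc = pp.createJobs(pass_num)
            if jobs:
                job_count += len(jobs)
                job_descs.setdefault(desc, []).append(pp.source_name)
        summary = ', '.join('%s %s' % (desc, ', '.join(names))
                            for desc, names in job_descs.items())
        print('%d jobs queued (%s)' % (job_count, summary))


    bake_realm([DummyPipeline('posts'), DummyPipeline('pages')], pass_num=1)
    # -> 4 jobs queued (render posts, pages)

Grouping source names by description is what produces one summary per pass instead of the old per-step summaries.
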
diff -r 6350ee084273 -r 5f97b5b59dfe piecrust/baking/baker.py
--- a/piecrust/baking/baker.py Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/baking/baker.py Mon Apr 23 21:47:49 2018 -0700
@@ -6,11 +6,10 @@
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
 from piecrust.pipelines.base import (
-    PipelineJobCreateContext, PipelineJobResultHandleContext,
-    PipelineJobValidateContext, PipelineManager,
+    PipelineJobCreateContext, PipelineJobResultHandleContext, PipelineManager,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
-    MultiRecordHistory, MultiRecord, RecordEntry,
+    MultiRecordHistory, MultiRecord,
     load_records)
 from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES
 
@@ -234,18 +233,12 @@
 
     def _bakeRealm(self, pool, ppmngr, record_histories,
                    pp_pass_num, realm, pplist):
-        # Start with the first step, where we iterate on the content sources'
-        # items and run jobs on those.
+        start_time = time.perf_counter()
+
+        job_count = 0
+        job_descs = {}
+        realm_name = REALM_NAMES[realm].lower()
         pool.userdata.cur_pass = pp_pass_num
-        pool.userdata.cur_step = 0
-        next_step_jobs = {}
-        pool.userdata.next_step_jobs = next_step_jobs
-
-        start_time = time.perf_counter()
-        job_count = 0
-        stats = self.app.env.stats
-        realm_name = REALM_NAMES[realm].lower()
-        participating_source_names = []
 
         for ppinfo in pplist:
             src = ppinfo.source
@@ -253,19 +246,19 @@
             jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
                                              record_histories)
 
-            next_step_jobs[src.name] = []
-            jobs = pp.createJobs(jcctx)
+            jobs, job_desc = pp.createJobs(jcctx)
             if jobs is not None:
                 new_job_count = len(jobs)
                 job_count += new_job_count
                 pool.queueJobs(jobs)
-                participating_source_names.append(src.name)
+                if job_desc:
+                    job_descs.setdefault(job_desc, []).append(src.name)
             else:
                 new_job_count = 0
 
             logger.debug(
                 "Queued %d jobs for source '%s' using pipeline '%s' "
-                "(%s, step 0)." %
+                "(%s)." %
                 (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))
 
         if job_count == 0:
@@ -276,50 +269,9 @@
 
         logger.info(format_timed(
             start_time, "%d jobs completed (%s)." %
-            (job_count, ', '.join(participating_source_names))))
-
-        # Now let's see if any job created a follow-up job. Let's keep
-        # processing those jobs as long as they create new ones.
-        pool.userdata.cur_step = 1
-        while True:
-            # Make a copy of out next step jobs and reset the list, so
-            # the first jobs to be processed don't mess it up as we're
-            # still iterating on it.
-            next_step_jobs = pool.userdata.next_step_jobs
-            pool.userdata.next_step_jobs = {}
-
-            start_time = time.perf_counter()
-            job_count = 0
-            participating_source_names = []
-
-            for sn, jobs in next_step_jobs.items():
-                if jobs:
-                    logger.debug(
-                        "Queuing jobs for source '%s' (%s, step %d)." %
-                        (sn, realm_name, pool.userdata.cur_step))
-
-                    pp = ppmngr.getPipeline(sn)
-                    valctx = PipelineJobValidateContext(
-                        pp_pass_num, pool.userdata.cur_step,
-                        pp.record_name, record_histories)
-                    pp.validateNextStepJobs(jobs, valctx)
-
-                    job_count += len(jobs)
-                    pool.userdata.next_step_jobs[sn] = []
-                    pool.queueJobs(jobs)
-                    participating_source_names.append(sn)
-
-            if job_count == 0:
-                break
-
-            pool.wait()
-
-            logger.info(format_timed(
-                start_time,
-                "%d jobs completed (%s)." %
-                (job_count, ', '.join(participating_source_names))))
-
-            pool.userdata.cur_step += 1
+            (job_count, ', '.join(
+                ['%s %s' % (d, ', '.join(sn))
+                 for d, sn in job_descs.items()]))))
 
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
@@ -358,22 +310,13 @@
 
     def _handleWorkerResult(self, job, res, userdata):
         cur_pass = userdata.cur_pass
-        cur_step = userdata.cur_step
         source_name, item_spec = job['job_spec']
 
-        # See if there's a next step to take.
-        npj = res.get('next_step_job')
-        if npj is not None:
-            npj['pass_num'] = cur_pass
-            npj['step_num'] = cur_step + 1
-            userdata.next_step_jobs[source_name].append(npj)
-
         # Make the pipeline do custom handling to update the record entry.
         ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
         pipeline = ppinfo.pipeline
         record = ppinfo.current_record
-        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass,
-                                                 cur_step)
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass)
        pipeline.handleJobResult(res, ppmrctx)
 
         # Set the overall success flags if there was an error.
@@ -412,8 +355,6 @@
         self.ppmngr = ppmngr
         self.records = ppmngr.record_histories.current
         self.cur_pass = 0
-        self.cur_step = 0
-        self.next_step_jobs = {}
 
 
 def _get_pipeline_infos_by_pass_and_realm(pp_infos):
diff -r 6350ee084273 -r 5f97b5b59dfe piecrust/pipelines/_pagebaker.py
--- a/piecrust/pipelines/_pagebaker.py Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/_pagebaker.py Mon Apr 23 21:47:49 2018 -0700
@@ -250,7 +250,7 @@
         if not prev_sub_entry:
             # No previous record, so most probably was never baked. Bake it.
             cur_sub_entry['flags'] |= \
-                SubPageFlags.FLAG_FORCED_BY_NO_PREVIOUS
+                SubPageFlags.FLAG_FORCED_BY_NO_RECORD
             return STATUS_BAKE
 
         return STATUS_CLEAN
diff -r 6350ee084273 -r 5f97b5b59dfe piecrust/pipelines/_pagerecords.py
--- a/piecrust/pipelines/_pagerecords.py Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/_pagerecords.py Mon Apr 23 21:47:49 2018 -0700
@@ -1,4 +1,3 @@
-import copy
 from piecrust.pipelines.records import RecordEntry, get_flag_descriptions
 
 
@@ -7,10 +6,11 @@
     FLAG_BAKED = 2**0
     FLAG_FORCED_BY_SOURCE = 2**1
     FLAG_FORCED_BY_NO_PREVIOUS = 2**2
-    FLAG_FORCED_BY_PREVIOUS_ERRORS = 2**3
-    FLAG_FORCED_BY_GENERAL_FORCE = 2**4
-    FLAG_RENDER_CACHE_INVALIDATED = 2**5
-    FLAG_COLLAPSED_FROM_LAST_RUN = 2**6
+    FLAG_FORCED_BY_NO_RECORD = 2**3
+    FLAG_FORCED_BY_PREVIOUS_ERRORS = 2**4
+    FLAG_FORCED_BY_GENERAL_FORCE = 2**5
+    FLAG_RENDER_CACHE_INVALIDATED = 2**6
+    FLAG_COLLAPSED_FROM_LAST_RUN = 2**7
 
 
 def create_subpage_job_result(out_uri, out_path):
@@ -23,23 +23,10 @@
     }
 
 
-def was_subpage_clean(sub):
-    return ((sub['flags'] & SubPageFlags.FLAG_BAKED) == 0 and
-            len(sub['errors']) == 0)
-
-
-def was_subpage_baked(sub):
-    return (sub['flags'] & SubPageFlags.FLAG_BAKED) != 0
-
-
-def was_subpage_baked_successfully(sub):
-    return was_subpage_baked(sub) and len(sub['errors']) == 0
-
-
 class PagePipelineRecordEntry(RecordEntry):
     FLAG_NONE = 0
-    FLAG_NEW = 2**0
-    FLAG_SOURCE_MODIFIED = 2**1
+    FLAG_SOURCE_MODIFIED = 2**0
+    FLAG_SEGMENTS_RENDERED = 2**1
     FLAG_OVERRIDEN = 2**2
     FLAG_COLLAPSED_FROM_LAST_RUN = 2**3
     FLAG_IS_DRAFT = 2**4
@@ -54,25 +41,10 @@
         self.subs = []
 
     @property
-    def was_touched(self):
-        return (self.flags & self.FLAG_SOURCE_MODIFIED) != 0
-
-    @property
-    def was_overriden(self):
-        return (self.flags & self.FLAG_OVERRIDEN) != 0
-
-    @property
     def num_subs(self):
         return len(self.subs)
 
     @property
-    def was_any_sub_baked(self):
-        for o in self.subs:
-            if was_subpage_baked(o):
-                return True
-        return False
-
-    @property
     def has_any_error(self):
         if len(self.errors) > 0:
             return True
@@ -81,6 +53,9 @@
                 return True
         return False
 
+    def hasFlag(self, flag):
+        return (self.flags & flag) != 0
+
     def getSub(self, page_num):
         return self.subs[page_num - 1]
 
@@ -118,33 +93,21 @@
     return d
 
 
-def add_page_job_result(result):
-    result.update({
-        'flags': PagePipelineRecordEntry.FLAG_NONE,
-        'errors': [],
-        'subs': []
-    })
-
-
-def merge_job_result_into_record_entry(record_entry, result):
-    record_entry.flags |= result['flags']
-    record_entry.errors += result['errors']
-    record_entry.subs += result['subs']
-
-
 flag_descriptions = {
-    PagePipelineRecordEntry.FLAG_NEW: 'new',
     PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED: 'touched',
+    PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED: 'rendered segments',
     PagePipelineRecordEntry.FLAG_OVERRIDEN: 'overriden',
     PagePipelineRecordEntry.FLAG_COLLAPSED_FROM_LAST_RUN: 'from last run',
     PagePipelineRecordEntry.FLAG_IS_DRAFT: 'draft',
-    PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE: 'aborted for source use'}
+    PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE: ('aborted for '
+                                                          'source use')}
 
 
 sub_flag_descriptions = {
     SubPageFlags.FLAG_BAKED: 'baked',
     SubPageFlags.FLAG_FORCED_BY_SOURCE: 'forced by source',
     SubPageFlags.FLAG_FORCED_BY_NO_PREVIOUS: 'forced b/c new',
+    SubPageFlags.FLAG_FORCED_BY_NO_RECORD: 'forced b/c no record',
     SubPageFlags.FLAG_FORCED_BY_PREVIOUS_ERRORS: 'forced by errors',
     SubPageFlags.FLAG_FORCED_BY_GENERAL_FORCE: 'manually forced',
     SubPageFlags.FLAG_RENDER_CACHE_INVALIDATED: 'cache invalidated',
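For reference, the hasFlag() helper added above is plain bit masking over the renumbered flag values. A small stand-alone sketch, with the record entry reduced to just its flags field:

    class RecordEntrySketch:
        # Same bit layout as the renumbered flags above, trimmed to three values.
        FLAG_NONE = 0
        FLAG_SOURCE_MODIFIED = 2**0
        FLAG_SEGMENTS_RENDERED = 2**1
        FLAG_OVERRIDEN = 2**2

        def __init__(self):
            self.flags = self.FLAG_NONE

        def hasFlag(self, flag):
            # A flag is "set" when its bit survives the bitwise AND.
            return (self.flags & flag) != 0


    entry = RecordEntrySketch()
    entry.flags |= RecordEntrySketch.FLAG_SOURCE_MODIFIED
    entry.flags |= RecordEntrySketch.FLAG_SEGMENTS_RENDERED
    assert entry.hasFlag(RecordEntrySketch.FLAG_SEGMENTS_RENDERED)
    assert not entry.hasFlag(RecordEntrySketch.FLAG_OVERRIDEN)
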
diff -r 6350ee084273 -r 5f97b5b59dfe piecrust/pipelines/base.py
--- a/piecrust/pipelines/base.py Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/base.py Mon Apr 23 21:47:49 2018 -0700
@@ -2,7 +2,6 @@
 import logging
 from werkzeug.utils import cached_property
 from piecrust.configuration import ConfigurationError
-from piecrust.sources.base import ContentItem
 
 
 logger = logging.getLogger(__name__)
@@ -58,19 +57,6 @@
         self.pass_num = pass_num
 
 
-class PipelineJobValidateContext(_PipelineMasterProcessJobContextBase):
-    """ Context for validating jobs on subsequent step runs (i.e. validating
-        the list of jobs to run starting with the second step).
-
-        This is run on the master process, so it can access both the
-        previous and current records.
-    """
-    def __init__(self, pass_num, step_num, record_name, record_histories):
-        super().__init__(record_name, record_histories)
-        self.pass_num = pass_num
-        self.step_num = step_num
-
-
 class PipelineJobRunContext:
     """ Context for running pipeline baking jobs.
     """
@@ -103,11 +89,10 @@
         This is run on the master process, so it can access the current
         record.
     """
-    def __init__(self, record, job, pass_num, step_num):
+    def __init__(self, record, job, pass_num):
         self.record = record
         self.job = job
         self.pass_num = pass_num
-        self.step_num = step_num
 
     @cached_property
     def record_entry(self):
@@ -160,7 +145,7 @@
     def createJobs(self, ctx):
         return [
             create_job(self, item.spec)
-            for item in self.source.getAllContents()]
+            for item in self.source.getAllContents()], None
 
     def createRecordEntry(self, item_spec):
         entry_class = self.RECORD_ENTRY_CLASS
@@ -171,9 +156,6 @@
     def handleJobResult(self, result, ctx):
         raise NotImplementedError()
 
-    def validateNextStepJobs(self, jobs, ctx):
-        pass
-
     def run(self, job, ctx, result):
         raise NotImplementedError()
 
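The base-class change gives createJobs() a new return contract: a (jobs, description) pair, with (None, None) when a pipeline has nothing to queue for the pass. A sketch of a pipeline following that contract; create_job and SketchPipeline below are simplified stand-ins, not the real base classes:

    def create_job(item_spec, **kwargs):
        # Minimal stand-in for a job dictionary factory.
        job = {'item_spec': item_spec}
        job.update(kwargs)
        return job


    class SketchPipeline:
        def __init__(self, items):
            self.items = items

        def createJobs(self, pass_num):
            # New contract: return (jobs, description), or (None, None) when
            # there is nothing to queue for this pass.
            jobs = [create_job(spec, pass_num=pass_num) for spec in self.items]
            if jobs:
                return jobs, 'load'
            return None, None


    print(SketchPipeline(['pages/foo.md']).createJobs(0))
    # -> ([{'item_spec': 'pages/foo.md', 'pass_num': 0}], 'load')
    print(SketchPipeline([]).createJobs(0))
    # -> (None, None)
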
diff -r 6350ee084273 -r 5f97b5b59dfe piecrust/pipelines/page.py
--- a/piecrust/pipelines/page.py Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/pipelines/page.py Mon Apr 23 21:47:49 2018 -0700
@@ -1,12 +1,11 @@
 import copy
-import time
 import logging
 from piecrust.pipelines.base import (
     ContentPipeline, create_job, content_item_from_job)
 from piecrust.pipelines._pagebaker import PageBaker, get_output_path
 from piecrust.pipelines._pagerecords import (
-    PagePipelineRecordEntry,
-    add_page_job_result, merge_job_result_into_record_entry)
+    PagePipelineRecordEntry, SubPageFlags)
+from piecrust.rendering import RenderingContext, render_page_segments
 from piecrust.sources.base import AbortedSourceUseError
 
 
@@ -38,11 +37,11 @@
         pass_num = ctx.pass_num
         if pass_num == 0:
             ctx.current_record.user_data['dirty_source_names'] = set()
-            return self._createLoadJobs(ctx)
+            return self._createLoadJobs(ctx), "load"
         if pass_num == 1:
-            return self._createSecondPassJobs(ctx)
+            return self._createSegmentJobs(ctx), "render"
         if pass_num == 2:
-            return self._createThirdPassJobs(ctx)
+            return self._createLayoutJobs(ctx), "layout"
         raise Exception("Unexpected pipeline pass: %d" % pass_num)
 
     def _createLoadJobs(self, ctx):
@@ -55,20 +54,21 @@
             return jobs
         return None
 
-    def _createSecondPassJobs(self, ctx):
+    def _createSegmentJobs(self, ctx):
         jobs = []
 
         app = self.app
+        pass_num = ctx.pass_num
         out_dir = self.ctx.out_dir
         uri_getter = self.source.route.getUri
         pretty_urls = app.config.get('site/pretty_urls')
 
-        used_paths = _get_used_paths_from_records(
-            ctx.record_histories.current.records)
         history = ctx.record_histories.getHistory(ctx.record_name).copy()
         history.build()
 
-        pass_num = ctx.pass_num
+        cur_rec_used_paths = {}
+        history.current.user_data['used_paths'] = cur_rec_used_paths
+        all_records = ctx.record_histories.current.records
 
         for prev, cur in history.diffs:
             # Ignore pages that disappeared since last bake.
@@ -76,18 +76,24 @@
                 continue
 
             # Skip draft pages.
-            if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT:
+            if cur.hasFlag(PagePipelineRecordEntry.FLAG_IS_DRAFT):
+                continue
+
+            # Skip pages that haven't changed since last bake.
+            if (prev and not cur.hasFlag(
+                    PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED)):
                 continue
 
-            # For pages that are known to use other sources, we make a dummy
-            # job that will effectively get directly passed on to the next
-            # step.
+            # For pages that are known to use other sources in their own
+            # content segments (we don't care about the layout yet), we
+            # postpone them to the next pipeline pass immediately, because they
+            # might need populated render caches for those sources' pages.
             if prev:
-                usn1, usn2 = prev.getAllUsedSourceNames()
-                if usn1 or usn2:
-                    jobs.append(create_job(self, cur.item_spec,
-                                           pass_num=pass_num,
-                                           uses_sources=True))
+                usn1, _ = prev.getAllUsedSourceNames()
+                if usn1:
+                    logger.debug("Postponing: %s" % cur.item_spec)
+                    cur.flags |= \
+                        PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
                     continue
 
             # Check if this item has been overriden by a previous pipeline
@@ -95,27 +101,30 @@
             # source, and some of our pages have been overriden by a user
             # page that writes out to the same URL.
             uri = uri_getter(cur.route_params)
-            path = get_output_path(app, out_dir, uri, pretty_urls)
-            override = used_paths.get(path)
+            out_path = get_output_path(app, out_dir, uri, pretty_urls)
+            override = _find_used_path_spec(all_records, out_path)
             if override is not None:
-                override_source_name, override_entry = override
+                override_source_name, override_entry_spec = override
                 override_source = app.getSource(override_source_name)
                 if override_source.config['realm'] == \
                         self.source.config['realm']:
                     logger.error(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (cur.item_spec, path, override_entry.item_spec))
+                        (cur.item_spec, out_path, override_entry_spec))
                 else:
                     logger.debug(
                         "Page '%s' would get baked to '%s' "
                         "but is overriden by '%s'." %
-                        (cur.item_spec, path, override_entry.item_spec))
+                        (cur.item_spec, out_path, override_entry_spec))
 
                 cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
                 continue
 
             # Nope, all good, let's create a job for this item.
+            cur.flags |= PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED
+            cur_rec_used_paths[out_path] = cur.item_spec
+
             jobs.append(create_job(self, cur.item_spec,
                                    pass_num=pass_num))
 
@@ -123,7 +132,7 @@
             return jobs
         return None
 
-    def _createThirdPassJobs(self, ctx):
+    def _createLayoutJobs(self, ctx):
         # Get the list of all sources that had anything baked.
         dirty_source_names = set()
         all_records = ctx.record_histories.current.records
@@ -132,41 +141,59 @@
             if rec_dsn:
                 dirty_source_names |= rec_dsn
 
-        # Now look at the stuff we bake for our own source on the first pass.
-        # For anything that wasn't baked (i.e. it was considered 'up to date')
-        # we look at the records from last time, and if they say that some
-        # page was using a source that is "dirty", then we force bake it.
-        #
-        # The common example for this is a blog index page which hasn't been
-        # touched, but needs to be re-baked because someone added or edited
-        # a post.
         jobs = []
         pass_num = ctx.pass_num
         history = ctx.record_histories.getHistory(ctx.record_name).copy()
         history.build()
         for prev, cur in history.diffs:
-            if not cur:
+            if not cur or cur.hasFlag(PagePipelineRecordEntry.FLAG_OVERRIDEN):
                 continue
-            if cur.was_any_sub_baked:
-                continue
+
+            do_bake = False
+            force_segments = False
+            force_layout = False
+
+            # Make sure we bake the layout for pages that got their segments
+            # re-rendered.
+            if cur.hasFlag(PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
+                do_bake = True
+
+            # Now look at the stuff we baked for our own source on the second
+            # pass. For anything that wasn't baked (i.e. it was considered 'up
+            # to date') we look at the records from last time, and if they say
+            # that some page was using a source that is "dirty", then we force
+            # bake it.
+            #
+            # The common example for this is a blog index page which hasn't
+            # been touched, but needs to be re-baked because someone added or
+            # edited a post.
             if prev:
                 usn1, usn2 = prev.getAllUsedSourceNames()
                 force_segments = any(map(lambda u: u in dirty_source_names,
                                          usn1))
                 force_layout = any(map(lambda u: u in dirty_source_names,
                                        usn2))
+
                 if force_segments or force_layout:
-                    jobs.append(create_job(self, prev.item_spec,
-                                           pass_num=pass_num,
-                                           force_segments=force_segments,
-                                           force_layout=force_layout))
-                else:
+                    # Yep, we need to force-rebake some aspect of this page.
+                    do_bake = True
+
+                elif not do_bake:
                     # This page uses other sources, but no source was dirty
                     # this time around (it was a null build, maybe). We
                     # don't have any work to do, but we need to carry over
                     # any information we have, otherwise the post bake step
                     # will think we need to delete last bake's outputs.
                     cur.subs = copy.deepcopy(prev.subs)
+                    for cur_sub in cur.subs:
+                        cur_sub['flags'] = \
+                            SubPageFlags.FLAG_COLLAPSED_FROM_LAST_RUN
+
+            if do_bake:
+                jobs.append(create_job(self, cur.item_spec,
+                                       pass_num=pass_num,
+                                       force_segments=force_segments,
+                                       force_layout=force_layout))
 
         if len(jobs) > 0:
             return jobs
@@ -174,7 +201,6 @@
 
     def handleJobResult(self, result, ctx):
         pass_num = ctx.pass_num
-        step_num = ctx.step_num
 
         if pass_num == 0:
             # Just went through a "load page" job. Let's create a record
@@ -188,35 +214,37 @@
 
             # If this page was modified, flag its entire source as "dirty",
             # so any pages using that source can be re-baked.
-            if (new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED):
+            if new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED:
                 ctx.record.user_data['dirty_source_names'].add(
                     self.source.name)
+
+            # If this page is new
+
+        elif pass_num == 1:
+            # Just went through the "render segments" job.
+            existing = ctx.record_entry
+            existing.flags |= result.get('flags',
+                                         PagePipelineRecordEntry.FLAG_NONE)
+
         else:
             # Update the entry with the new information.
             existing = ctx.record_entry
-            if not result.get('postponed', False):
-                merge_job_result_into_record_entry(existing, result)
+            existing.flags |= result.get('flags',
+                                         PagePipelineRecordEntry.FLAG_NONE)
+            existing.errors += result.get('errors', [])
+            existing.subs += result.get('subs', [])
 
     def run(self, job, ctx, result):
         pass_num = job.get('pass_num', 0)
-        step_num = job.get('step_num', 0)
 
         if pass_num == 0:
-            if step_num == 0:
-                return self._loadPage(job, ctx, result)
+            return self._loadPage(job, ctx, result)
 
         elif pass_num == 1:
-            if step_num == 0:
-                return self._renderOrPostpone(job, ctx, result)
-            elif step_num == 1:
-                return self._renderAlways(job, ctx, result)
+            return self._renderSegments(job, ctx, result)
 
-        elif pass_num == 2:
-            if step_num == 0:
-                return self._renderAlways(job, ctx, result)
-
-        raise Exception("Unexpected pipeline pass/step: %d/%d" %
-                        (pass_num, step_num))
+        elif pass_num >= 2:
+            return self._renderLayout(job, ctx, result)
 
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
@@ -250,62 +278,50 @@
         if page.config.get(self._draft_setting):
             result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT
 
-    def _renderOrPostpone(self, job, ctx, result):
-        # See if we should immediately kick this job off to the next step.
-        if job.get('uses_sources', False):
-            result['postponed'] = True
-            result['next_step_job'] = create_job(self, job['job_spec'][1])
-            return
-
+    def _renderSegments(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page
         # is using some other sources, in which case we abort and we'll try
         # again on the second pass.
         content_item = content_item_from_job(self, job)
-        logger.debug("Conditional render for: %s" % content_item.spec)
+        logger.debug("Render segments for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         if page.config.get(self._draft_setting):
             raise Exception("Shouldn't have a draft page in a render job!")
 
-        prev_entry = ctx.previous_entry
-
         env = self.app.env
         env.abort_source_use = True
-        add_page_job_result(result)
         try:
-            rdr_subs = self._pagebaker.bake(page, prev_entry)
-            result['subs'] = rdr_subs
+            rdr_ctx = RenderingContext(page)
+            render_page_segments(rdr_ctx)
         except AbortedSourceUseError:
             logger.debug("Page was aborted for using source: %s" %
                          content_item.spec)
-            result['flags'] |= \
+            result['flags'] = \
                 PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
             env.stats.stepCounter("SourceUseAbortions")
             env.stats.addManifestEntry("SourceUseAbortions",
                                        content_item.spec)
-            result['next_step_job'] = create_job(self, content_item.spec)
         finally:
             env.abort_source_use = False
 
-    def _renderAlways(self, job, ctx, result):
+    def _renderLayout(self, job, ctx, result):
         content_item = content_item_from_job(self, job)
-        logger.debug("Full render for: %s" % content_item.spec)
+        logger.debug("Render layout for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         prev_entry = ctx.previous_entry
         rdr_subs = self._pagebaker.bake(
             page, prev_entry,
             force_segments=job.get('force_segments'),
             force_layout=job.get('force_layout'))
-
-        add_page_job_result(result)
         result['subs'] = rdr_subs
 
 
-def _get_used_paths_from_records(records):
-    used_paths = {}
+
+def _find_used_path_spec(records, path):
     for rec in records:
-        src_name = rec.name.split('@')[0]
-        for e in rec.getEntries():
-            paths = e.getAllOutputPaths()
-            if paths is not None:
-                for p in paths:
-                    used_paths[p] = (src_name, e)
-    return used_paths
+        up = rec.user_data.get('used_paths')
+        if up is not None:
+            entry_spec = up.get(path)
+            if entry_spec is not None:
+                src_name = rec.name.split('@')[0]
+                return (src_name, entry_spec)
+    return None
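The override detection above no longer builds a global path map; instead, each record stores the output paths it claimed during the segment pass under user_data['used_paths'], and _find_used_path_spec() walks all records looking for a claimed path. A reduced sketch of that lookup, with FakeRecord standing in for a real bake record:

    class FakeRecord:
        # Illustrative only; real records carry many more fields.
        def __init__(self, name):
            self.name = name
            self.user_data = {}


    def find_used_path_spec(records, path):
        # Walk every record and see if one of them already claimed this
        # output path during its segment pass.
        for rec in records:
            used = rec.user_data.get('used_paths')
            if used and path in used:
                src_name = rec.name.split('@')[0]
                return src_name, used[path]
        return None


    pages_rec = FakeRecord('pages@page')
    pages_rec.user_data['used_paths'] = {'/out/blog/index.html': 'pages/blog.md'}
    print(find_used_path_spec([pages_rec], '/out/blog/index.html'))
    # -> ('pages', 'pages/blog.md')
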
diff -r 6350ee084273 -r 5f97b5b59dfe piecrust/sources/blogarchives.py
--- a/piecrust/sources/blogarchives.py Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/sources/blogarchives.py Mon Apr 23 21:47:49 2018 -0700
@@ -7,12 +7,10 @@
     PageIterator, HardCodedFilterIterator, DateSortIterator)
 from piecrust.page import Page
 from piecrust.pipelines._pagebaker import PageBaker
-from piecrust.pipelines._pagerecords import (
-    PagePipelineRecordEntry,
-    add_page_job_result, merge_job_result_into_record_entry)
+from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
 from piecrust.pipelines.base import (
     ContentPipeline,
-    create_job, get_record_name_for_source, content_item_from_job)
+    create_job, get_record_name_for_source)
 from piecrust.routing import RouteParameter
 from piecrust.sources.base import ContentItem
 from piecrust.sources.generator import GeneratorSourceBase
@@ -193,8 +191,8 @@
         current_record.addEntry(entry)
 
         if len(jobs) > 0:
-            return jobs
-        return None
+            return jobs, "archive"
+        return None, None
 
     def run(self, job, ctx, result):
         year = job['year']
@@ -206,13 +204,12 @@
 
         prev_entry = ctx.previous_entry
         rdr_subs = self._pagebaker.bake(page, prev_entry)
-        add_page_job_result(result)
         result['subs'] = rdr_subs
         result['year'] = page.source_metadata['year']
 
     def handleJobResult(self, result, ctx):
         existing = ctx.record_entry
-        merge_job_result_into_record_entry(existing, result)
+        existing.subs = result['subs']
         existing.year = result['year']
 
     def postJobRun(self, ctx):
@@ -243,7 +240,8 @@
         for cur_entry in cur_rec.getEntries():
             dt = datetime.datetime.fromtimestamp(cur_entry.timestamp)
             all_years.add(dt.year)
-            if cur_entry.was_any_sub_baked:
+            if cur_entry.hasFlag(
+                    PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
                 dirty_years.add(dt.year)
 
         self._all_years = all_years
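The archive analyzer above now marks a year as dirty only when at least one of its posts had its segments re-rendered during this bake. Roughly, with record entries reduced to (timestamp, flags) pairs for the sketch:

    import datetime

    FLAG_SEGMENTS_RENDERED = 2**1  # same bit as in PagePipelineRecordEntry


    def collect_years(entries):
        # entries are (timestamp, flags) pairs standing in for record entries.
        all_years = set()
        dirty_years = set()
        for timestamp, flags in entries:
            year = datetime.datetime.fromtimestamp(timestamp).year
            all_years.add(year)
            if flags & FLAG_SEGMENTS_RENDERED:
                dirty_years.add(year)
        return all_years, dirty_years


    entries = [(1529000000, 0),                       # untouched post
               (1524545269, FLAG_SEGMENTS_RENDERED)]  # post re-rendered this bake
    print(collect_years(entries))
    # -> ({2018}, {2018})
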
diff -r 6350ee084273 -r 5f97b5b59dfe piecrust/sources/taxonomy.py
--- a/piecrust/sources/taxonomy.py Mon Apr 23 21:37:43 2018 -0700
+++ b/piecrust/sources/taxonomy.py Mon Apr 23 21:47:49 2018 -0700
@@ -7,13 +7,9 @@
     PaginationFilter, SettingFilterClause)
 from piecrust.page import Page
 from piecrust.pipelines._pagebaker import PageBaker
-from piecrust.pipelines._pagerecords import (
-    PagePipelineRecordEntry,
-    add_page_job_result, merge_job_result_into_record_entry)
+from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
 from piecrust.pipelines.base import (
-    ContentPipeline, get_record_name_for_source,
-    create_job, content_item_from_job)
-from piecrust.pipelines.records import RecordHistory
+    ContentPipeline, get_record_name_for_source, create_job)
 from piecrust.routing import RouteParameter
 from piecrust.sources.base import ContentItem
 from piecrust.sources.generator import GeneratorSourceBase
@@ -307,8 +303,8 @@
         current_record.addEntry(entry)
 
         if len(jobs) > 0:
-            return jobs
-        return None
+            return jobs, "taxonomize"
+        return None, None
 
     def run(self, job, ctx, result):
         term = job['term']
@@ -324,13 +320,12 @@
 
         prev_entry = ctx.previous_entry
         rdr_subs = self._pagebaker.bake(page, prev_entry)
-        add_page_job_result(result)
         result['subs'] = rdr_subs
         result['term'] = page.source_metadata['term']
 
     def handleJobResult(self, result, ctx):
         existing = ctx.record_entry
-        merge_job_result_into_record_entry(existing, result)
+        existing.subs = result['subs']
         existing.term = result['term']
 
     def postJobRun(self, ctx):
@@ -362,7 +357,6 @@
         self.pipeline = pipeline
         self.record_histories = record_histories
         self._all_terms = {}
-        self._single_dirty_slugified_terms = set()
         self._all_dirty_slugified_terms = None
 
     @property
@@ -381,49 +375,48 @@
     def analyze(self):
         # Build the list of terms for our taxonomy, and figure out which ones
         # are 'dirty' for the current bake.
-        #
-        # Remember all terms used.
         source = self.pipeline.inner_source
         taxonomy = self.pipeline.taxonomy
         slugifier = self.pipeline.slugifier
+        tax_is_mult = taxonomy.is_multiple
+        tax_setting_name = taxonomy.setting_name
+
+        # First, go over all of our source's pages seen during this bake.
+        # Gather all the taxonomy terms they have, and also keep track of
+        # the ones used by the pages that were actually rendered (instead of
+        # those that were up-to-date and skipped).
+        single_dirty_slugified_terms = set()
+        current_records = self.record_histories.current
         record_name = get_record_name_for_source(source)
-        current_records = self.record_histories.current
         cur_rec = current_records.getRecord(record_name)
         for cur_entry in cur_rec.getEntries():
-            if not cur_entry.was_overriden:
-                cur_terms = cur_entry.config.get(taxonomy.setting_name)
-                if cur_terms:
-                    if not taxonomy.is_multiple:
-                        self._addTerm(
-                            slugifier, cur_entry.item_spec, cur_terms)
-                    else:
-                        self._addTerms(
-                            slugifier, cur_entry.item_spec, cur_terms)
+            if cur_entry.hasFlag(PagePipelineRecordEntry.FLAG_OVERRIDEN):
+                continue
+
+            cur_terms = cur_entry.config.get(tax_setting_name)
+            if not cur_terms:
+                continue
 
-        # Re-bake all taxonomy terms that include new or changed pages, by
-        # marking them as 'dirty'.
-        history = self.record_histories.getHistory(record_name).copy()
-        history.build()
-        for prev_entry, cur_entry in history.diffs:
-            entries = [cur_entry]
-            if prev_entry:
-                entries.append(prev_entry)
+            if not tax_is_mult:
+                self._addTerm(
+                    slugifier, cur_entry.item_spec, cur_terms)
+            else:
+                self._addTerms(
+                    slugifier, cur_entry.item_spec, cur_terms)
 
-            for e in entries:
-                if e and e.was_any_sub_baked:
-                    entry_terms = e.config.get(taxonomy.setting_name)
-                    if entry_terms:
-                        if not taxonomy.is_multiple:
-                            self._single_dirty_slugified_terms.add(
-                                slugifier.slugify(entry_terms))
-                        else:
-                            self._single_dirty_slugified_terms.update(
-                                (slugifier.slugify(t)
-                                 for t in entry_terms))
+            if cur_entry.hasFlag(
+                    PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
+                if not tax_is_mult:
+                    single_dirty_slugified_terms.add(
+                        slugifier.slugify(cur_terms))
+                else:
+                    single_dirty_slugified_terms.update(
+                        (slugifier.slugify(t)
+                         for t in cur_terms))
 
         self._all_dirty_slugified_terms = list(
-            self._single_dirty_slugified_terms)
+            single_dirty_slugified_terms)
         logger.debug("Gathered %d dirty taxonomy terms",
                      len(self._all_dirty_slugified_terms))
@@ -438,7 +431,7 @@
         # by any page in the website (anywhere someone can ask for an URL
         # to the combination page), it means we check all the records, not
         # just the record for our source.
-        if taxonomy.is_multiple:
+        if tax_is_mult:
             known_combinations = set()
             for rec in current_records.records:
                 # Cheap way to test if a record contains entries that
@@ -456,7 +449,7 @@
 
         dcc = 0
         for terms in known_combinations:
-            if not self._single_dirty_slugified_terms.isdisjoint(
+            if not single_dirty_slugified_terms.isdisjoint(
                     set(terms)):
                 self._all_dirty_slugified_terms.append(
                     taxonomy.separator.join(terms))
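The reworked taxonomy analysis boils down to a set computation: collect terms from entries flagged FLAG_SEGMENTS_RENDERED, and for multi-valued taxonomies also re-bake any known term combination that intersects the dirty set. A condensed sketch, with slugification reduced to lower-casing and '/' standing in for the taxonomy separator:

    FLAG_SEGMENTS_RENDERED = 2**1


    def slugify(term):
        # Stand-in for the configured slugifier.
        return term.lower()


    def find_dirty_terms(entries, known_combinations):
        # entries are (terms, flags) pairs; a term is dirty when its page had
        # its segments re-rendered during this bake.
        dirty = set()
        for terms, flags in entries:
            if flags & FLAG_SEGMENTS_RENDERED:
                dirty.update(slugify(t) for t in terms)
        all_dirty = sorted(dirty)
        # Multi-valued taxonomies: any known combination touching a dirty term
        # must be re-baked too.
        for combo in known_combinations:
            if not dirty.isdisjoint(set(combo)):
                all_dirty.append('/'.join(combo))
        return all_dirty


    entries = [(['Python', 'Baking'], FLAG_SEGMENTS_RENDERED),
               (['Recipes'], 0)]
    print(find_dirty_terms(entries, [('python', 'recipes')]))
    # -> ['baking', 'python', 'python/recipes']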