piecrust2: view piecrust/sources/taxonomy.py @ 1188:a7c43131d871
bake: Fix file write flushing problem with Python 3.8+
Writing the cache files fails in Python 3.8+ because it looks like the
flushing behaviour has changed, so we need to flush explicitly. And even
then, on very rare occasions, it looks like it can still run into race
conditions, so we do a very hacky and ugly "retry" loop when fetching
cached data :(
author    Ludovic Chabant <ludovic@chabant.com>
date      Tue, 15 Jun 2021 22:36:23 -0700
parents   9f3e702a8a69
children
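
Note: the flush-and-retry change described in the message above lives in
PieCrust's cache-handling code, not in the file viewed here. A minimal
sketch of the idea, using illustrative names rather than PieCrust's actual
API, might look like this:

import os
import pickle
import time


def write_cache_data(path, data):
    # Python 3.8+ appears to buffer writes differently, so flush explicitly
    # and ask the OS to commit the bytes before anyone reads them back.
    # (Hypothetical helper, not PieCrust's real function.)
    with open(path, 'wb') as fp:
        pickle.dump(data, fp)
        fp.flush()
        os.fsync(fp.fileno())


def fetch_cache_data(path, retries=3, delay=0.1):
    # Very hacky "retry" loop: on rare occasions another process may not
    # be done writing yet, so try a few times before giving up.
    # (Hypothetical helper, not PieCrust's real function.)
    for attempt in range(retries):
        try:
            with open(path, 'rb') as fp:
                return pickle.load(fp)
        except (EOFError, pickle.UnpicklingError):
            if attempt == retries - 1:
                raise
            time.sleep(delay)

The fsync is the belt-and-suspenders half; the retry loop papers over
readers that race a writer which hasn't finished flushing yet.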
line source
import re
import copy
import logging
import unidecode
from piecrust.configuration import ConfigurationError
from piecrust.data.filters import (
    PaginationFilter, SettingFilterClause)
from piecrust.page import Page
from piecrust.pipelines._pagebaker import PageBaker
from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
from piecrust.pipelines.base import (
    ContentPipeline, get_record_name_for_source, create_job)
from piecrust.routing import RouteParameter
from piecrust.sources.base import ContentItem
from piecrust.sources.generator import GeneratorSourceBase


logger = logging.getLogger(__name__)


SLUGIFY_ENCODE = 1
SLUGIFY_TRANSLITERATE = 2
SLUGIFY_LOWERCASE = 4
SLUGIFY_DOT_TO_DASH = 8
SLUGIFY_SPACE_TO_DASH = 16


re_first_dot_to_dash = re.compile(r'^\.+')
re_dot_to_dash = re.compile(r'\.+')
re_space_to_dash = re.compile(r'\s+')


class Taxonomy(object):
    """ Describes a taxonomy.
    """
    def __init__(self, name, config):
        self.name = name
        self.config = config
        self.term_name = config.get('term', name)
        self.is_multiple = bool(config.get('multiple', False))
        self.separator = config.get('separator', '/')
        self.page_ref = config.get('page')

    @property
    def setting_name(self):
        if self.is_multiple:
            return self.name
        return self.term_name


_taxonomy_index = """---
layout: %(template)s
---
"""


class TaxonomySource(GeneratorSourceBase):
    """ A content source that generates taxonomy listing pages.
    """
    SOURCE_NAME = 'taxonomy'
    DEFAULT_PIPELINE_NAME = 'taxonomy'

    def __init__(self, app, name, config):
        super().__init__(app, name, config)

        tax_name = config.get('taxonomy')
        if tax_name is None:
            raise ConfigurationError(
                "Taxonomy source '%s' requires a taxonomy name." % name)
        self.taxonomy = _get_taxonomy(app, tax_name)

        sm = config.get('slugify_mode')
        self.slugifier = _get_slugifier(app, self.taxonomy, sm)

        tpl_name = config.get('template', '_%s.html' % tax_name)
        self._raw_item = _taxonomy_index % {'template': tpl_name}

    def getSupportedRouteParameters(self):
        name = self.taxonomy.term_name
        param_type = (RouteParameter.TYPE_PATH if self.taxonomy.is_multiple
                      else RouteParameter.TYPE_STRING)
        return [RouteParameter(name, param_type,
                               variadic=self.taxonomy.is_multiple)]

    def findContentFromRoute(self, route_params):
        slugified_term = route_params[self.taxonomy.term_name]
        spec = '_index[%s]' % slugified_term
        metadata = {'term': slugified_term,
                    'route_params': {
                        self.taxonomy.term_name: slugified_term}
                    }
        return ContentItem(spec, metadata)

    def slugify(self, term):
        return self.slugifier.slugify(term)

    def slugifyMultiple(self, terms):
        return self.slugifier.slugifyMultiple(terms)

    def prepareRenderContext(self, ctx):
        # Set the pagination source as the source we're generating for.
        ctx.pagination_source = self.inner_source

        # Get the taxonomy terms from the route metadata... this can come
        # from the browser's URL (while serving) or from the baking (see
        # `bake` method below). In both cases, we expect to have the
        # *slugified* version of the term, because we're going to set a
        # filter that also slugifies the terms found on each page.
        #
        # This is because:
        # * while serving, we get everything from the request URL, so we
        #   only have the slugified version.
        # * if 2 slightly different terms "collide" into the same slugified
        #   term, we'll get a merge of the 2 on the listing page, which is
        #   what the user expects.
        #
        route_params = ctx.page.source_metadata['route_params']
        tax_terms, is_combination = self._getTaxonomyTerms(route_params)
        self._setTaxonomyFilter(ctx, tax_terms, is_combination)

        # Add some custom data for rendering.
        ctx.custom_data.update({
            self.taxonomy.term_name: tax_terms,
            'is_multiple_%s' % self.taxonomy.term_name: is_combination})
        # Add some "plural" version of the term... so for instance, if this
        # is the "tags" taxonomy, "tag" will have one term most of the time,
        # except when it's a combination. Here, we add "tags" as something
        # that is always a tuple, even when it's not a combination.
        if (self.taxonomy.is_multiple and
                self.taxonomy.name != self.taxonomy.term_name):
            mult_val = tax_terms
            if not is_combination:
                mult_val = (mult_val,)
            ctx.custom_data[self.taxonomy.name] = mult_val

    def _getTaxonomyTerms(self, route_params):
        # Get the individual slugified terms from the route metadata.
        all_values = route_params.get(self.taxonomy.term_name)
        if all_values is None:
            raise Exception("'%s' values couldn't be found in route metadata"
                            % self.taxonomy.term_name)

        # If it's a "multiple" taxonomy, we need to potentially split the
        # route value into the individual terms (_e.g._ when listing all
        # pages that have 2 given tags, we need to get each of those 2 tags).
        if self.taxonomy.is_multiple:
            sep = self.taxonomy.separator
            if sep in all_values:
                return tuple(all_values.split(sep)), True
        # Not a "multiple" taxonomy, so there's only the one value.
        return all_values, False

    def _setTaxonomyFilter(self, ctx, term_value, is_combination):
        # Set up the filter that will check the pages' terms.
        flt = PaginationFilter()
        flt.addClause(HasTaxonomyTermsFilterClause(
            self.taxonomy, self.slugifier.mode, term_value, is_combination))
        ctx.pagination_filter = flt

    def onRouteFunctionUsed(self, route_params):
        # Get the values, and slugify them appropriately.
        # If this is a "multiple" taxonomy, `values` will be a tuple of
        # terms. If not, `values` will just be a term.
        values = route_params[self.taxonomy.term_name]
        tax_is_multiple = self.taxonomy.is_multiple
        if tax_is_multiple:
            slugified_values = self.slugifyMultiple((str(v) for v in values))
            route_val = self.taxonomy.separator.join(slugified_values)
        else:
            slugified_values = self.slugify(str(values))
            route_val = slugified_values

        # We need to register this use of a taxonomy term.
        # Because the render info gets serialized across bake worker
        # processes, we can only use basic JSON-able structures, which
        # excludes `set`... hence the awkward use of `list`.
        # Also, note that the tuples we're putting in there will be
        # transformed into lists so we'll have to convert back.
        rcs = self.app.env.render_ctx_stack
        ri = rcs.current_ctx.render_info
        utt = ri.get('used_taxonomy_terms')
        if utt is None:
            ri['used_taxonomy_terms'] = [slugified_values]
        else:
            if slugified_values not in utt:
                utt.append(slugified_values)

        # Put the slugified values in the route metadata so they're used to
        # generate the URL.
        route_params[self.taxonomy.term_name] = route_val


class HasTaxonomyTermsFilterClause(SettingFilterClause):
    def __init__(self, taxonomy, slugify_mode, value, is_combination):
        super().__init__(taxonomy.setting_name, value)
        self._taxonomy = taxonomy
        self._is_combination = is_combination
        self._slugifier = _Slugifier(taxonomy, slugify_mode)
        if taxonomy.is_multiple:
            self.pageMatches = self._pageMatchesAny
        else:
            self.pageMatches = self._pageMatchesSingle

    def _pageMatchesAny(self, fil, page):
        # Multiple taxonomy, i.e. it supports multiple terms, like tags.
        page_values = page.config.get(self.name)
        if page_values is None or not isinstance(page_values, list):
            return False

        page_set = set(map(self._slugifier.slugify, page_values))
        if self._is_combination:
            # Multiple taxonomy, and multiple terms to match. Check that
            # the ones to match are all in the page's terms.
            value_set = set(self.value)
            return value_set.issubset(page_set)
        else:
            # Multiple taxonomy, one term to match.
            return self.value in page_set

    def _pageMatchesSingle(self, fil, page):
        # Single taxonomy. Just compare the values.
        page_value = page.config.get(self.name)
        if page_value is None:
            return False
        page_value = self._slugifier.slugify(page_value)
        return page_value == self.value


def _get_taxonomy(app, tax_name):
    tax_config = app.config.get('site/taxonomies/' + tax_name)
    if tax_config is None:
        raise ConfigurationError("No such taxonomy: %s" % tax_name)
    return Taxonomy(tax_name, tax_config)


def _get_slugifier(app, taxonomy, slugify_mode=None):
    if slugify_mode is None:
        slugify_mode = app.config.get('site/slugify_mode', 'encode')
    sm = _parse_slugify_mode(slugify_mode)
    return _Slugifier(taxonomy, sm)


class TaxonomyPipelineRecordEntry(PagePipelineRecordEntry):
    def __init__(self):
        super().__init__()
        self.term = None


class TaxonomyPipeline(ContentPipeline):
    PIPELINE_NAME = 'taxonomy'
    PASS_NUM = 10
    RECORD_ENTRY_CLASS = TaxonomyPipelineRecordEntry

    def __init__(self, source, ctx):
        if not isinstance(source, TaxonomySource):
            raise Exception("The taxonomy pipeline only supports taxonomy "
                            "content sources.")

        super().__init__(source, ctx)
        self.inner_source = source.inner_source
        self.taxonomy = source.taxonomy
        self.slugifier = source.slugifier
        self._tpl_name = source.config['template']
        self._analyzer = None
        self._pagebaker = None

    def initialize(self):
        self._pagebaker = PageBaker(self.app,
                                    self.ctx.out_dir,
                                    force=self.ctx.force)
        self._pagebaker.startWriterQueue()

    def shutdown(self):
        self._pagebaker.stopWriterQueue()

    def createJobs(self, ctx):
        logger.debug("Building '%s' taxonomy pages for source: %s" %
                     (self.taxonomy.name, self.inner_source.name))
        self._analyzer = _TaxonomyTermsAnalyzer(self, ctx.record_histories)
        self._analyzer.analyze()

        logger.debug("Queuing %d '%s' jobs." %
                     (len(self._analyzer.dirty_slugified_terms),
                      self.taxonomy.name))
        jobs = []
        rec_fac = self.createRecordEntry
        current_record = ctx.current_record

        for slugified_term in self._analyzer.dirty_slugified_terms:
            item_spec = '_index[%s]' % slugified_term
            jobs.append(create_job(self, item_spec,
                                   term=slugified_term))

            entry = rec_fac(item_spec)
            current_record.addEntry(entry)

        if len(jobs) > 0:
            return jobs, "taxonomize"
        return None, None

    def run(self, job, ctx, result):
        term = job['term']
        content_item = ContentItem('_index[%s]' % term,
                                   {'term': term,
                                    'route_params': {
                                        self.taxonomy.term_name: term}
                                    })
        page = Page(self.source, content_item)

        logger.debug("Rendering '%s' page: %s" %
                     (self.taxonomy.name, page.source_metadata['term']))
        prev_entry = ctx.previous_entry
        rdr_subs = self._pagebaker.bake(page, prev_entry)

        result['subs'] = rdr_subs
        result['term'] = page.source_metadata['term']

    def handleJobResult(self, result, ctx):
        existing = ctx.record_entry
        existing.subs = result['subs']
        existing.term = result['term']

    def postJobRun(self, ctx):
        # We create bake entries for all the terms that were *not* dirty.
        # This is because otherwise, on the next incremental bake, we
        # wouldn't find any entry for those things, and figure that we need
        # to delete their outputs.
        analyzer = self._analyzer
        record = ctx.record_history.current
        for prev, cur in ctx.record_history.diffs:
            # Only consider entries that don't have any current version
            # (i.e. they weren't baked just now).
            if prev and not cur:
                t = prev.term
                if analyzer.isKnownSlugifiedTerm(t):
                    logger.debug("Creating unbaked entry for '%s' term: %s" %
                                 (self.taxonomy.name, t))
                    cur = copy.deepcopy(prev)
                    cur.flags = \
                        PagePipelineRecordEntry.FLAG_COLLAPSED_FROM_LAST_RUN
                    record.addEntry(cur)
                else:
                    logger.debug("Term '%s' in '%s' isn't used anymore." %
                                 (t, self.taxonomy.name))


class _TaxonomyTermsAnalyzer(object):
    def __init__(self, pipeline, record_histories):
        self.pipeline = pipeline
        self.record_histories = record_histories
        self._all_terms = {}
        self._all_dirty_slugified_terms = None

    @property
    def dirty_slugified_terms(self):
        """ Returns the slugified terms that have been 'dirtied' during
            this bake.
        """
        return self._all_dirty_slugified_terms

    def isKnownSlugifiedTerm(self, term):
        """ Returns whether the given slugified term has been seen during
            this bake.
        """
        return term in self._all_terms

    def analyze(self):
        # Build the list of terms for our taxonomy, and figure out which
        # ones are 'dirty' for the current bake.
        source = self.pipeline.inner_source
        taxonomy = self.pipeline.taxonomy
        slugifier = self.pipeline.slugifier
        tax_is_mult = taxonomy.is_multiple
        tax_setting_name = taxonomy.setting_name

        # First, go over all of our source's pages seen during this bake.
        # Gather all the taxonomy terms they have, and also keep track of
        # the ones used by the pages that were actually rendered (instead
        # of those that were up-to-date and skipped).
        single_dirty_slugified_terms = set()
        current_records = self.record_histories.current
        record_name = get_record_name_for_source(source)
        cur_rec = current_records.getRecord(record_name)
        for cur_entry in cur_rec.getEntries():
            if cur_entry.hasFlag(PagePipelineRecordEntry.FLAG_OVERRIDEN):
                continue

            cur_terms = cur_entry.config.get(tax_setting_name)
            if not cur_terms:
                continue

            if not tax_is_mult:
                self._addTerm(
                    slugifier, cur_entry.item_spec, cur_terms)
            else:
                self._addTerms(
                    slugifier, cur_entry.item_spec, cur_terms)

            if cur_entry.hasFlag(
                    PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
                if not tax_is_mult:
                    single_dirty_slugified_terms.add(
                        slugifier.slugify(cur_terms))
                else:
                    single_dirty_slugified_terms.update(
                        (slugifier.slugify(t) for t in cur_terms))

        self._all_dirty_slugified_terms = list(
            single_dirty_slugified_terms)
        logger.debug("Gathered %d dirty taxonomy terms",
                     len(self._all_dirty_slugified_terms))

        # Re-bake the combination pages for terms that are 'dirty'.
        # We make all terms into tuple, even those that are not actual
        # combinations, so that we have less things to test further down
        # the line.
        #
        # Add the combinations to that list. We get those combinations from
        # wherever combinations were used, so they're coming from the
        # `onRouteFunctionUsed` method. And because combinations can be
        # used by any page in the website (anywhere someone can ask for an
        # URL to the combination page), it means we check all the records,
        # not just the record for our source.
        if tax_is_mult:
            known_combinations = set()
            for rec in current_records.records:
                # Cheap way to test if a record contains entries that
                # are sub-types of a page entry: test the first one.
                first_entry = next(iter(rec.getEntries()), None)
                if (first_entry is None or
                        not isinstance(first_entry,
                                       PagePipelineRecordEntry)):
                    continue

                for cur_entry in rec.getEntries():
                    used_terms = _get_all_entry_taxonomy_terms(cur_entry)
                    for terms in used_terms:
                        if len(terms) > 1:
                            known_combinations.add(terms)

            dcc = 0
            for terms in known_combinations:
                if not single_dirty_slugified_terms.isdisjoint(
                        set(terms)):
                    self._all_dirty_slugified_terms.append(
                        taxonomy.separator.join(terms))
                    dcc += 1
            logger.debug("Gathered %d term combinations, with %d dirty." %
                         (len(known_combinations), dcc))

    def _addTerms(self, slugifier, item_spec, terms):
        for t in terms:
            self._addTerm(slugifier, item_spec, t)

    def _addTerm(self, slugifier, item_spec, term):
        st = slugifier.slugify(term)
        orig_terms = self._all_terms.setdefault(st, [])
        if orig_terms and orig_terms[0] != term:
            logger.warning(
                "Term '%s' in '%s' is slugified to '%s' which conflicts with "
                "previously existing '%s'. The two will be merged." %
                (term, item_spec, st, orig_terms[0]))
        orig_terms.append(term)


def _get_all_entry_taxonomy_terms(entry):
    res = set()
    for o in entry.subs:
        pinfo = o['render_info']
        terms = pinfo.get('used_taxonomy_terms')
        if terms:
            res |= set([tuple(t) for t in terms])
    return res


class _Slugifier(object):
    def __init__(self, taxonomy, mode):
        self.taxonomy = taxonomy
        self.mode = mode

    def slugifyMultiple(self, terms):
        return tuple(map(self.slugify, terms))

    def slugify(self, term):
        if self.mode & SLUGIFY_TRANSLITERATE:
            term = unidecode.unidecode(term)
        if self.mode & SLUGIFY_LOWERCASE:
            term = term.lower()
        if self.mode & SLUGIFY_DOT_TO_DASH:
            term = re_first_dot_to_dash.sub('', term)
            term = re_dot_to_dash.sub('-', term)
        if self.mode & SLUGIFY_SPACE_TO_DASH:
            term = re_space_to_dash.sub('-', term)
        return term


def _parse_slugify_mode(value):
    mapping = {
        'encode': SLUGIFY_ENCODE,
        'transliterate': SLUGIFY_TRANSLITERATE,
        'lowercase': SLUGIFY_LOWERCASE,
        'dot_to_dash': SLUGIFY_DOT_TO_DASH,
        'space_to_dash': SLUGIFY_SPACE_TO_DASH}
    mode = 0
    for v in value.split(','):
        f = mapping.get(v.strip())
        if f is None:
            if v == 'iconv':
                raise Exception("'iconv' is not supported as a slugify mode "
                                "in PieCrust2. Use 'transliterate'.")
            raise Exception("Unknown slugify flag: %s" % v)
        mode |= f
    return mode
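
As a quick illustration of the slugify flags defined at the top of the
file (a hypothetical snippet, not part of the module): _parse_slugify_mode
ORs the named flags together, and _Slugifier.slugify applies them in order.
The taxonomy argument is never touched by slugify(), so None works for a
standalone test:

mode = _parse_slugify_mode('transliterate,lowercase,space_to_dash')
slugifier = _Slugifier(None, mode)  # taxonomy is not used by slugify()
assert slugifier.slugify('Déjà Vu') == 'deja-vu'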