Mercurial > piecrust2
view piecrust/data/provider.py @ 828:bca06fc064c0
Use assetor provided by page source when paginating
author | Ben Artin <ben@artins.org> |
---|---|
date | Sun, 01 Jan 2017 19:32:19 -0500 |
parents | 37bd88f88bab |
children | 69d16e1afb95 |
line wrap: on
line source
import time
import collections.abc
from piecrust.data.iterators import PageIterator
from piecrust.generation.taxonomy import Taxonomy
from piecrust.sources.array import ArraySource


class DataProvider(object):
    """Base class for objects that expose a page source's data to a page.

    It only validates and stores the source and the page it was created
    for; subclasses decide what data is actually exposed.
    """
    debug_render_dynamic = []
    debug_render_invoke_dynamic = []

    def __init__(self, source, page, override):
        """Bind the provider to `source` and `page`.

        `override` is accepted so that all providers share the same
        constructor signature; it is not used by this base class.

        Raises `Exception` if the source and the page don't reference the
        same `app` object.
        """
        if source.app is not page.app:
            raise Exception("The given source and page don't belong to "
                            "the same application.")
        self._source = source
        self._page = page


class IteratorDataProvider(DataProvider):
    """Data provider exposing the source's pages through a `PageIterator`.

    When `override` is itself an `IteratorDataProvider`, it is chained:
    iterating this provider yields its own pages first, then whatever the
    overridden provider yields.
    """
    PROVIDER_NAME = 'iterator'

    debug_render_doc_dynamic = ['_debugRenderDoc']
    debug_render_not_empty = True

    def __init__(self, source, page, override):
        super().__init__(source, page, override)

        self._innerIt = None
        if isinstance(override, IteratorDataProvider):
            # Iterator providers can be chained, like for instance with
            # `site.pages` listing both the theme pages and the user site's
            # pages.
            self._innerIt = override

        self._pages = PageIterator(source, current_page=page)
        self._pages._iter_event += self._onIteration
        self._ctx_set = False

    def __len__(self):
        # Only this provider's own iterator is counted; items coming from a
        # chained provider are not included.
        return len(self._pages)

    def __getitem__(self, key):
        # Indexing is delegated to this provider's own iterator only.
        return self._pages[key]

    def __iter__(self):
        yield from iter(self._pages)
        # Compare against `None` explicitly: this class defines `__len__`,
        # so a plain truthiness test would treat a chained provider with no
        # pages of its own as "absent" and silently skip whatever *it* is
        # chained to.
        if self._innerIt is not None:
            yield from self._innerIt

    def _onIteration(self):
        """Report the source as used to the current render context, once.

        Hooked on the page iterator's `_iter_event`.
        """
        if not self._ctx_set:
            eis = self._page.app.env.exec_info_stack
            # NOTE(review): this passes the source *name* and doesn't guard
            # against a missing `current_page_info`, whereas
            # `BlogDataProvider._onIteration` passes the source object and
            # does guard. Confirm which form `addUsedSource` expects.
            eis.current_page_info.render_ctx.addUsedSource(self._source.name)
            self._ctx_set = True

    def _debugRenderDoc(self):
        return 'Provides a list of %d items' % len(self)


class BlogDataProvider(DataProvider, collections.abc.Mapping):
    """Read-only mapping exposing blog posts, archives and taxonomies.

    The keys are `posts`, `years`, `months`, plus one key per taxonomy
    found under the `site/taxonomies` configuration setting. Archives and
    taxonomy listings are built on first access and cached on the
    instance.
    """
    PROVIDER_NAME = 'blog'

    debug_render_doc = """Provides a list of blog posts and yearly/monthly
                          archives."""
    debug_render_dynamic = (['_debugRenderTaxonomies'] +
                            DataProvider.debug_render_dynamic)

    def __init__(self, source, page, override):
        super().__init__(source, page, override)
        self._yearly = None
        self._monthly = None
        self._taxonomies = {}
        self._ctx_set = False

    @property
    def posts(self):
        return self._posts()

    @property
    def years(self):
        return self._buildYearlyArchive()

    @property
    def months(self):
        return self._buildMonthlyArchive()

    def __getitem__(self, name):
        """Return the data stored under `name`.

        Raises `KeyError` if `name` is neither one of the built-in keys nor
        a configured taxonomy.
        """
        if name == 'posts':
            return self._posts()
        elif name == 'years':
            return self._buildYearlyArchive()
        elif name == 'months':
            return self._buildMonthlyArchive()

        if self._source.app.config.get('site/taxonomies/' + name) is not None:
            return self._buildTaxonomy(name)

        raise KeyError("No such item: %s" % name)

    def __iter__(self):
        keys = ['posts', 'years', 'months']
        keys += self._getTaxonomyNames()
        return iter(keys)

    def __len__(self):
        return 3 + len(self._getTaxonomyNames())

    def _debugRenderTaxonomies(self):
        return self._getTaxonomyNames()

    def _getTaxonomyNames(self):
        """Return the configured taxonomy names as a list.

        Returns an empty list when `site/taxonomies` is not set, instead of
        failing on `None`, which matches how `__getitem__` already treats a
        missing taxonomy.
        """
        taxonomies = self._source.app.config.get('site/taxonomies')
        if taxonomies is None:
            return []
        return list(taxonomies.keys())

    def _posts(self):
        """Return a new `PageIterator` over the source's pages."""
        it = PageIterator(self._source, current_page=self._page)
        it._iter_event += self._onIteration
        return it

    def _buildYearlyArchive(self):
        """Return the per-year archive entries, most recent year first.

        The result is built once and cached on the instance.
        """
        if self._yearly is not None:
            return self._yearly

        # Build into locals and publish at the end, so that an exception
        # halfway through doesn't leave a partial archive cached.
        entries = []
        entries_by_year = {}
        for post in self._source.getPages():
            year = post.datetime.strftime('%Y')

            entry = entries_by_year.get(year)
            if entry is None:
                # Timestamp of January 1st of that year, local time.
                timestamp = time.mktime(
                    (post.datetime.year, 1, 1, 0, 0, 0, 0, 0, -1))
                entry = BlogArchiveEntry(self._page, year, timestamp)
                entries.append(entry)
                entries_by_year[year] = entry

            entry._data_source.append(post)

        self._yearly = sorted(entries,
                              key=lambda e: e.timestamp,
                              reverse=True)
        self._onIteration()
        return self._yearly

    def _buildMonthlyArchive(self):
        """Return the per-month archive entries, most recent month first.

        Entries are named with the `'%B %Y'` format of the post's date. The
        result is built once and cached on the instance.
        """
        if self._monthly is not None:
            return self._monthly

        # Same approach as the yearly archive: an index by name avoids
        # re-scanning the list of entries for every post.
        entries = []
        entries_by_month = {}
        for post in self._source.getPages():
            month = post.datetime.strftime('%B %Y')

            entry = entries_by_month.get(month)
            if entry is None:
                # Timestamp of the first day of that month, local time.
                timestamp = time.mktime(
                    (post.datetime.year, post.datetime.month, 1,
                     0, 0, 0, 0, 0, -1))
                entry = BlogArchiveEntry(self._page, month, timestamp)
                entries.append(entry)
                entries_by_month[month] = entry

            entry._data_source.append(post)

        self._monthly = sorted(entries,
                               key=lambda e: e.timestamp,
                               reverse=True)
        self._onIteration()
        return self._monthly

    def _buildTaxonomy(self, tax_name):
        """Return the entries of taxonomy `tax_name`, sorted by name.

        Each entry groups the posts whose configuration lists a given value
        for the taxonomy's setting. The result is cached per taxonomy.
        """
        if tax_name in self._taxonomies:
            return self._taxonomies[tax_name]

        tax_cfg = self._page.app.config.get('site/taxonomies/' + tax_name)
        tax = Taxonomy(tax_name, tax_cfg)

        posts_by_tax_value = {}
        for post in self._source.getPages():
            tax_values = post.config.get(tax.setting_name)
            if tax_values is None:
                continue
            # A single value is handled like a one-item list.
            if not isinstance(tax_values, list):
                tax_values = [tax_values]
            for val in tax_values:
                posts_by_tax_value.setdefault(val, []).append(post)

        entries = [
            BlogTaxonomyEntry(self._page,
                              ArraySource(self._page.app, ds),
                              value)
            for value, ds in posts_by_tax_value.items()]
        self._taxonomies[tax_name] = sorted(entries, key=lambda k: k.name)

        self._onIteration()
        return self._taxonomies[tax_name]

    def _onIteration(self):
        """Report the source as used to the current render context, once.

        Nothing is reported when there's no current page info, but the
        provider is still flagged as done in that case.
        """
        if not self._ctx_set:
            eis = self._page.app.env.exec_info_stack
            if eis.current_page_info:
                # NOTE(review): `IteratorDataProvider._onIteration` passes
                # `self._source.name` here instead of the source object.
                # Confirm which form `addUsedSource` expects.
                eis.current_page_info.render_ctx.addUsedSource(self._source)
            self._ctx_set = True


class BlogArchiveEntry(object):
    """A named, timestamped group of posts (a year or a month)."""
    debug_render = ['name', 'timestamp', 'posts']
    debug_render_invoke = ['name', 'timestamp', 'posts']

    def __init__(self, page, name, timestamp):
        self.name = name
        self.timestamp = timestamp
        self._page = page
        # Filled in by whoever builds the archive.
        self._data_source = []
        self._iterator = None

    def __str__(self):
        return self.name

    def __int__(self):
        # Raises `ValueError` when the name isn't a plain number, which is
        # the case for entries named with the '%B %Y' month format.
        return int(self.name)

    @property
    def posts(self):
        """The entry's posts, as a `PageIterator` that was just reset."""
        self._load()
        self._iterator.reset()
        return self._iterator

    def _load(self):
        """Create the page iterator on first use."""
        if self._iterator is not None:
            return
        source = ArraySource(self._page.app, self._data_source)
        self._iterator = PageIterator(source, current_page=self._page)


class BlogTaxonomyEntry(object):
    """The posts sharing one value of a taxonomy."""
    debug_render = ['name', 'post_count', 'posts']
    debug_render_invoke = ['name', 'post_count', 'posts']

    def __init__(self, page, source, property_value):
        self._page = page
        self._source = source
        self._property_value = property_value
        self._iterator = None

    def __str__(self):
        # `__str__` must return a string, but the value comes straight from
        # the posts' configuration and may be of another type (a number
        # for instance).
        return str(self._property_value)

    @property
    def name(self):
        return self._property_value

    @property
    def posts(self):
        """The entry's posts, as a `PageIterator` that was just reset."""
        self._load()
        self._iterator.reset()
        return self._iterator

    @property
    def post_count(self):
        return self._source.page_count

    def _load(self):
        """Create the page iterator on first use."""
        if self._iterator is not None:
            return
        self._iterator = PageIterator(self._source, current_page=self._page)