view piecrust/dataproviders/blog.py @ 1188:a7c43131d871

bake: Fix file write flushing problem with Python 3.8+ Writing the cache files fails in Python 3.8 because it looks like flushing behaviour has changed. We need to explicitly flush. And even then, in very rare occurrences, it looks like it can still run into racing conditions, so we do a very hacky and ugly "retry" loop when fetching cached data :(
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 15 Jun 2021 22:36:23 -0700
parents c5f1936e9e89
children
line wrap: on
line source

import time
import collections.abc
from piecrust.dataproviders.base import DataProvider
from piecrust.dataproviders.pageiterator import PageIterator
from piecrust.sources.list import ListSource
from piecrust.sources.taxonomy import Taxonomy


class BlogDataProvider(DataProvider, collections.abc.Mapping):
    PROVIDER_NAME = 'blog'

    debug_render_doc = """Provides a list of blog posts and yearly/monthly
                          archives."""
    debug_render_dynamic = (['_debugRenderTaxonomies'] +
                            DataProvider.debug_render_dynamic)

    def __init__(self, source, page):
        super().__init__(source, page)
        self._posts = None
        self._yearly = None
        self._monthly = None
        self._taxonomies = {}
        self._archives_built = False
        self._ctx_set = False

    def _addSource(self, source):
        raise Exception("The blog data provider doesn't support "
                        "combining multiple sources.")

    @property
    def posts(self):
        self._buildPosts()
        # Reset each time on access in case a page is showing 2 different
        # lists of the same blog.
        self._posts.reset()
        return self._posts

    @property
    def years(self):
        self._buildArchives()
        return self._yearly

    @property
    def months(self):
        self._buildArchives()
        return self._monthly

    @property
    def taxonomies(self):
        return list(self._app.config.get('site/taxonomies').keys())

    def __getitem__(self, name):
        if name in ['posts', 'years', 'months', 'taxonomies']:
            return getattr(self, name)

        self._buildArchives()
        return self._taxonomies[name]

    def __getattr__(self, name):
        self._buildArchives()
        try:
            return self._taxonomies[name]
        except KeyError:
            raise AttributeError("No such taxonomy: %s" % name)

    def __len__(self):
        return 4 + len(self.taxonomies)

    def __iter__(self):
        yield 'posts'
        yield 'years'
        yield 'months'
        yield 'taxonomies'
        yield from self.taxonomies

    def _debugRenderTaxonomies(self):
        return self.taxonomies

    def _buildPosts(self):
        if self._posts is None:
            it = PageIterator(self._sources[0], current_page=self._page)
            self._onIteration()
            self._posts = it

    def _buildArchives(self):
        if self._archives_built:
            return

        yearly_index = {}
        monthly_index = {}
        tax_index = {}

        taxonomies = []
        tax_names = list(self._app.config.get('site/taxonomies').keys())
        for tn in tax_names:
            tax_cfg = self._app.config.get('site/taxonomies/' + tn)
            taxonomies.append(Taxonomy(tn, tax_cfg))
            tax_index[tn] = {}

        page = self._page
        source = self._sources[0]

        for post in source.getAllPages():
            post_dt = post.datetime

            year = post_dt.year
            month = (post_dt.month, post_dt.year)

            posts_this_year = yearly_index.get(year)
            if posts_this_year is None:
                timestamp = time.mktime(
                    (post_dt.year, 1, 1, 0, 0, 0, 0, 0, -1))
                posts_this_year = BlogArchiveEntry(
                    source, page, year, timestamp)
                yearly_index[year] = posts_this_year
            posts_this_year._items.append(post.content_item)

            posts_this_month = monthly_index.get(month)
            if posts_this_month is None:
                timestamp = time.mktime(
                    (post_dt.year, post_dt.month, 1,
                     0, 0, 0, 0, 0, -1))
                posts_this_month = BlogArchiveEntry(
                    source, page, month[0], timestamp)
                monthly_index[month] = posts_this_month
            posts_this_month._items.append(post.content_item)

            for tax in taxonomies:
                post_term = post.config.get(tax.setting_name)
                if post_term is None:
                    continue

                posts_this_tax = tax_index[tax.name]
                if tax.is_multiple:
                    for val in post_term:
                        entry = posts_this_tax.get(val)
                        if entry is None:
                            entry = BlogTaxonomyEntry(source, page, val)
                            posts_this_tax[val] = entry
                        entry._items.append(post.content_item)
                else:
                    entry = posts_this_tax.get(val)
                    if entry is None:
                        entry = BlogTaxonomyEntry(source, page, post_term)
                        posts_this_tax[val] = entry
                    entry._items.append(post.content_item)

        self._yearly = list(sorted(
            yearly_index.values(),
            key=lambda e: e.timestamp, reverse=True))
        self._monthly = list(sorted(
            monthly_index.values(),
            key=lambda e: e.timestamp, reverse=True))

        self._taxonomies = {}
        for tax_name, entries in tax_index.items():
            self._taxonomies[tax_name] = list(
                sorted(entries.values(), key=lambda i: i.term))

        self._onIteration()

        self._archives_built = True

    def _onIteration(self):
        if not self._ctx_set:
            rcs = self._app.env.render_ctx_stack
            if rcs.current_ctx:
                rcs.current_ctx.addUsedSource(self._sources[0])
            self._ctx_set = True


class BlogArchiveEntry:
    debug_render = ['name', 'timestamp', 'posts']
    debug_render_invoke = ['name', 'timestamp', 'posts']

    def __init__(self, source, page, name, timestamp):
        self.name = name
        self.timestamp = timestamp
        self._source = source
        self._page = page
        self._items = []
        self._iterator = None

    def __str__(self):
        return str(self.name)

    def __int__(self):
        return int(self.name)

    @property
    def posts(self):
        self._load()
        self._iterator.reset()
        return self._iterator

    def _load(self):
        if self._iterator is not None:
            return

        src = ListSource(self._source, self._items)
        self._iterator = PageIterator(src, current_page=self._page)


class BlogTaxonomyEntry:
    debug_render = ['name', 'post_count', 'posts']
    debug_render_invoke = ['name', 'post_count', 'posts']

    def __init__(self, source, page, term):
        self.term = term
        self._source = source
        self._page = page
        self._items = []
        self._iterator = None

    def __str__(self):
        return self.term

    @property
    def name(self):
        return self.term

    @property
    def posts(self):
        self._load()
        self._iterator.reset()
        return self._iterator

    @property
    def post_count(self):
        return len(self._items)

    def _load(self):
        if self._iterator is not None:
            return

        src = ListSource(self._source, self._items)
        self._iterator = PageIterator(src, current_page=self._page)