view piecrust/data/paginator.py @ 1188:a7c43131d871

bake: Fix file write flushing problem with Python 3.8+ Writing the cache files fails in Python 3.8 because it looks like flushing behaviour has changed. We need to explicitly flush. And even then, in very rare occurrences, it looks like it can still run into racing conditions, so we do a very hacky and ugly "retry" loop when fetching cached data :(
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 15 Jun 2021 22:36:23 -0700
parents 1c324407bd1f
children
line wrap: on
line source

import math
import logging
from werkzeug.utils import cached_property
from piecrust.sources.base import ContentSource


logger = logging.getLogger(__name__)


class Paginator(object):
    debug_render = [
        'has_more', 'items', 'has_items', 'items_per_page',
        'items_this_page', 'prev_page_number', 'this_page_number',
        'next_page_number', 'prev_page', 'next_page',
        'total_item_count', 'total_page_count',
        'next_item', 'prev_item']
    debug_render_invoke = [
        'has_more', 'items', 'has_items', 'items_per_page',
        'items_this_page', 'prev_page_number', 'this_page_number',
        'next_page_number', 'prev_page', 'next_page',
        'total_item_count', 'total_page_count',
        'next_item', 'prev_item']

    def __init__(self, source, current_page, sub_num, *,
                 pgn_filter=None, items_per_page=-1):
        self._source = source
        self._page = current_page
        self._sub_num = sub_num
        self._iterator = None
        self._pgn_filter = pgn_filter
        self._items_per_page = items_per_page
        self._pgn_set_on_ctx = False
        self._is_content_source = isinstance(source, ContentSource)

    @property
    def is_loaded(self):
        return self._iterator is not None

    @property
    def has_more(self):
        return self.next_page_number is not None

    @property
    def unload(self):
        self._iterator = None

    # Backward compatibility with PieCrust 1.0 {{{
    @property
    def posts(self):
        return self.items

    @property
    def has_posts(self):
        return self.has_items

    @property
    def posts_per_page(self):
        return self.items_per_page

    @property
    def posts_this_page(self):
        return self.items_this_page

    @property
    def total_post_count(self):
        return self.total_item_count

    @property
    def next_post(self):
        return self.next_item

    @property
    def prev_post(self):
        return self.prev_item
    # }}}

    @property
    def items(self):
        self._load()
        return self._iterator

    @property
    def has_items(self):
        return self.items_this_page > 0

    @cached_property
    def items_per_page(self):
        if self._items_per_page > 0:
            return self._items_per_page

        if self._page is not None:
            ipp = self._page.config.get('items_per_page')
            if ipp is not None:
                return ipp

        if self._is_content_source:
            ipp = self._source.config.get('items_per_page')
            if ipp is not None:
                return ipp

        raise Exception("No way to figure out how many items to display "
                        "per page.")

    @property
    def items_this_page(self):
        self._load()
        return len(self._iterator)

    @property
    def prev_page_number(self):
        if self._sub_num > 1:
            return self._sub_num - 1
        return None

    @property
    def this_page_number(self):
        return self._sub_num

    @property
    def next_page_number(self):
        self._load()
        if self._iterator._has_more:
            return self._sub_num + 1
        return None

    @property
    def prev_page(self):
        num = self.prev_page_number
        if num is not None:
            return self._getPageUri(num)
        return None

    @property
    def this_page(self):
        return self._getPageUri(self._sub_num)

    @property
    def next_page(self):
        num = self.next_page_number
        if num is not None:
            return self._getPageUri(num)
        return None

    @property
    def total_item_count(self):
        self._load()
        return self._iterator.total_count

    @property
    def total_page_count(self):
        total_count = self.total_item_count
        per_page = self.items_per_page
        return int(math.ceil(total_count / per_page))

    @property
    def next_item(self):
        self._load()
        return self._iterator.prev_page

    @property
    def prev_item(self):
        self._load()
        return self._iterator.next_page

    def all_page_numbers(self, radius=-1):
        total_page_count = self.total_page_count
        if total_page_count == 0:
            return []

        if radius <= 0 or total_page_count < (2 * radius + 1):
            return list(range(1, total_page_count + 1))

        first_num = self._sub_num - radius
        last_num = self._sub_num + radius
        if first_num <= 0:
            last_num += 1 - first_num
            first_num = 1
        elif last_num > total_page_count:
            first_num -= (last_num - total_page_count)
            last_num = total_page_count
        first_num = max(1, first_num)
        last_num = min(total_page_count, last_num)
        return list(range(first_num, last_num + 1))

    def page(self, index):
        return self._getPageUri(index)

    def _load(self):
        if self._iterator is not None:
            return

        from piecrust.data.filters import PaginationFilter
        from piecrust.dataproviders.pageiterator import (
            PageIterator, HardCodedFilterIterator)

        self._iterator = PageIterator(
            self._source,
            current_page=self._page)

        if self._pgn_filter is not None:
            pag_fil = PaginationFilter()
            pag_fil.addClause(self._pgn_filter.root_clause)
            self._iterator._simpleNonSortedWrap(
                HardCodedFilterIterator, pag_fil)

        offset = (self._sub_num - 1) * self.items_per_page
        limit = self.items_per_page
        self._iterator.slice(offset, limit)

        if self._is_content_source:
            self._iterator._iter_event += self._onIteration

        self._iterator._lockIterator()

    def _getPageUri(self, index):
        return self._page.getUri(index)

    def _onIteration(self, it):
        if not self._pgn_set_on_ctx:
            rcs = self._source.app.env.render_ctx_stack
            if rcs.current_ctx is not None:
                rcs.current_ctx.setPagination(self)
                self._pgn_set_on_ctx = True