piecrust2: view piecrust/tasks/mentions.py @ 1188:a7c43131d871
bake: Fix file write flushing problem with Python 3.8+
Writing the cache files fails in Python 3.8 because it looks like the flushing
behaviour has changed. We need to explicitly flush. And even then, in very
rare cases, it looks like it can still run into race conditions,
so we do a very hacky and ugly "retry" loop when fetching cached data :(
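The fix described here lives in PieCrust's cache code rather than in the file shown below. As a rough sketch of the pattern the message describes — explicit flushing on write plus a retry loop on read — assuming hypothetical `save_cache_entry` and `load_cache_entry` helpers that are not part of PieCrust:

```python
import json
import os
import time


def save_cache_entry(path, data):
    with open(path, 'w', encoding='utf-8') as fp:
        json.dump(data, fp)
        # Explicitly flush before closing; the changeset notes that under
        # Python 3.8+ the write otherwise isn't reliably visible to readers.
        fp.flush()
        os.fsync(fp.fileno())


def load_cache_entry(path, retries=3, delay=0.1):
    # The hacky "retry" loop: on rare occasions the freshly written file
    # still can't be read back cleanly, so try a few times before giving up.
    for attempt in range(retries):
        try:
            with open(path, 'r', encoding='utf-8') as fp:
                return json.load(fp)
        except (IOError, json.JSONDecodeError):
            if attempt == retries - 1:
                raise
            time.sleep(delay)
```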
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Tue, 15 Jun 2021 22:36:23 -0700 |
| parents | 8af2ea1f5c34 |
| children | |
line source
import os
import os.path
import json
import logging
from piecrust.tasks.base import TaskRunner


logger = logging.getLogger(__name__)


class InvalidMentionTargetError(Exception):
    pass


class SourceDoesntLinkToTargetError(Exception):
    pass


class DuplicateMentionError(Exception):
    pass


class MentionTaskRunner(TaskRunner):
    TASK_TYPE = 'mention'

    def runTask(self, data, ctx):
        import json
        import requests
        from bs4 import BeautifulSoup
        from piecrust.app import PieCrustFactory
        from piecrust.serving.util import get_requested_page

        src_url = data['source']
        tgt_url = data['target']

        # Find if we have a page at the target URL. To do that we need to
        # spin up a PieCrust app that knows how the website works. Because
        # the website might have been baked with custom settings (usually
        # the site root URL) there's a good chance we need to apply some
        # variants, which the user can specify in the config.
        pcappfac = PieCrustFactory(self.app.root_dir,
                                   cache_key='webmention')
        wmcfg = self.app.config.get('webmention')
        if wmcfg.get('config_variant'):
            pcappfac.config_variants = [wmcfg.get('config_variant')]
        if wmcfg.get('config_variants'):
            pcappfac.config_variants = list(wmcfg.get('config_variants'))
        if wmcfg.get('config_values'):
            pcappfac.config_values = list(wmcfg.get('config_values').items())
        pcapp = pcappfac.create()

        logger.debug("Locating page: %s" % tgt_url)
        try:
            req_page = get_requested_page(pcapp, tgt_url)
            if req_page.page is None:
                raise InvalidMentionTargetError()
        except Exception as ex:
            logger.error("Can't check webmention target page: %s" % tgt_url)
            logger.exception(ex)
            raise InvalidMentionTargetError()

        # Grab the source URL's contents and see if anything references the
        # target (ours) URL.
        logger.debug("Fetching mention source: %s" % src_url)
        src_t = requests.get(src_url)
        src_html = BeautifulSoup(src_t.text, 'html.parser')
        for link in src_html.find_all('a'):
            href = link.get('href')
            if href == tgt_url:
                break
        else:
            logger.error("Source '%s' doesn't link to target: %s" %
                         (src_url, tgt_url))
            raise SourceDoesntLinkToTargetError()

        # Load the previous mentions and find any pre-existing mention from
        # the source URL.
        mention_path, mention_data = _load_page_mentions(req_page.page)
        for m in mention_data['mentions']:
            if m['source'] == src_url:
                logger.error("Duplicate mention found from: %s" % src_url)
                raise DuplicateMentionError()

        # Make the new mention.
        new_mention = {'source': src_url}

        # Parse the microformats on the page, see if there's anything
        # interesting we can use.
        mf2_info = _get_mention_info_from_mf2(src_url, src_html)
        if mf2_info:
            new_mention.update(mf2_info)

        # Add the new mention.
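        # At this point new_mention always has a 'source' key, and may also
        # carry 'name', 'url', 'published', 'content', 'author_name',
        # 'author_photo' and 'author_url' when the source page exposes the
        # corresponding microformats2 properties.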
        mention_data['mentions'].append(new_mention)
        with open(mention_path, 'w', encoding='utf-8') as fp:
            json.dump(mention_data, fp)

        logger.info("Received webmention from: %s" % src_url)


def _get_mention_info_from_mf2(base_url, bs_html):
    import mf2py
    from urllib.parse import urljoin

    mf2 = mf2py.parse(bs_html)
    mf2_items = mf2.get('items')
    if not mf2_items:
        return None

    hentry = next(filter(
        lambda i: 'h-entry' in i['type'],
        mf2_items), None)
    if not hentry:
        return None

    info = {}
    hentry_props = hentry['properties']

    pnames = hentry_props.get('name')
    if pnames:
        info['name'] = pnames[0]

    urls = hentry_props.get('url')
    if urls:
        info['url'] = urljoin(base_url, urls[0])

    pubdates = hentry_props.get('published')
    if pubdates:
        info['published'] = pubdates[0]

    contents = hentry_props.get('content')
    if contents:
        info['content'] = contents[0]['html']

    authors = hentry_props.get('author')
    if authors:
        hcard = next(filter(
            lambda i: 'h-card' in i['type'],
            authors), None)
        if hcard:
            hcard_props = hcard['properties']

            hcard_names = hcard_props.get('name')
            if hcard_names:
                info['author_name'] = hcard_names[0]

            hcard_photos = hcard_props.get('photo')
            if hcard_photos:
                info['author_photo'] = urljoin(base_url, hcard_photos[0])

            hcard_urls = hcard_props.get('url')
            if hcard_urls:
                info['author_url'] = urljoin(base_url, hcard_urls[0])

    return info


def _load_page_mentions(page):
    from piecrust.pathutil import ensure_dir

    logger.debug("Loading page mentions for: %s" % page.content_spec)
    dirname, _ = os.path.splitext(page.content_spec)
    dirname += '-assets'
    ensure_dir(dirname)
    mention_path = os.path.join(dirname, 'mentions.json')
    try:
        with open(mention_path, 'r', encoding='utf-8') as fp:
            return mention_path, json.load(fp)
    except IOError:
        return mention_path, {'mentions': []}
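For context, here is a rough sketch of how the mentions.json file written by runTask could be read back elsewhere, for example from a template helper. The `read_page_mentions` name and the 'pages/my-post.md' path are illustrative assumptions, not part of PieCrust; only the file layout and the mention keys follow from the code above.

```python
import json
import os.path


def read_page_mentions(content_spec):
    # Mirror _load_page_mentions: mentions live in the page's "-assets"
    # directory, next to the page's source file.
    dirname, _ = os.path.splitext(content_spec)
    mention_path = os.path.join(dirname + '-assets', 'mentions.json')
    try:
        with open(mention_path, 'r', encoding='utf-8') as fp:
            data = json.load(fp)
    except IOError:
        return []
    return data.get('mentions', [])


# Each mention is a dict with at least 'source', plus whatever microformats2
# fields the task could extract, e.g.:
# {'source': 'https://example.org/post',
#  'author_name': 'Jane Doe',
#  'content': '<p>Nice article!</p>'}
for m in read_page_mentions('pages/my-post.md'):
    print(m['source'], m.get('author_name', '(unknown author)'))
```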