view silorider/silos/bluesky.py @ 50:53d6c58a98e0

Fix problem with missing timezones, Bluesky doesn't like that.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 08 Oct 2023 13:48:48 -0700
parents 486affad656e
children b87217b90048
line wrap: on
line source

import re
import json
import time
import urllib.parse
import getpass
import logging
import datetime
from .base import Silo
from ..format import CardProps, UrlFlattener, URLMODE_ERASE

import atproto
import atproto.xrpc_client.models as atprotomodels


logger = logging.getLogger(__name__)


class _BlueskyClient(atproto.Client):
    def __init__(self, *args, **kwargs):
        atproto.Client.__init__(self, *args, **kwargs)

    def send_post(self, text, *, post_datetime=None, embed=None, facets=None):
        # Override the atproto.Client send_post function because it
        # doesn't support facets yet. The code is otherwise more or
        # less identical.
        repo = self.me.did
        langs = [atprotomodels.languages.DEFAULT_LANGUAGE_CODE1]

        # Make sure we have a proper time zone.
        post_datetime = post_datetime or datetime.datetime.now()
        if not post_datetime.tzinfo:
            tz_dt = datetime.datetime.now().astimezone()
            post_datetime = post_datetime.replace(tzinfo=tz_dt.tzinfo)
        created_at = post_datetime.isoformat()

        # Do it!
        data = atprotomodels.ComAtprotoRepoCreateRecord.Data(
                repo=repo,
                collection=atprotomodels.ids.AppBskyFeedPost,
                record=atprotomodels.AppBskyFeedPost.Main(
                    createdAt=created_at,
                    text=text,
                    facets=facets,
                    embed=embed,
                    langs=langs)
                )
        self.com.atproto.repo.create_record(data)


class BlueskySilo(Silo):
    SILO_TYPE = 'bluesky'
    _DEFAULT_SERVER = 'bsky.app'
    _CLIENT_CLASS = _BlueskyClient

    def __init__(self, ctx):
        super().__init__(ctx)

        base_url = self.getConfigItem('url')
        self.client = self._CLIENT_CLASS(base_url)

    def authenticate(self, ctx):
        force = ctx.exec_ctx.args.force

        password = self.getCacheItem('password')
        if not password or force:
            logger.info("Authenticating client app with Bluesky for %s" %
                        self.ctx.silo_name)
            email = input("Email: ")
            password = getpass.getpass(prompt="Application password: ")
            profile = self.client.login(email, password)

            logger.info("Authenticated as %s" % profile.displayName)
            self.setCacheItem('email', email)
            self.setCacheItem('password', password)

    def onPostStart(self, ctx):
        if not ctx.args.dry_run:
            email = self.getCacheItem('email')
            password = self.getCacheItem('password')
            if not email or not password:
                raise Exception("Please authenticate Bluesky silo %s" %
                                self.ctx.silo_name)
            self.client.login(email, password)

    def getEntryCard(self, entry, ctx):
        # We use URLMODE_ERASE to remove all hyperlinks from the
        # formatted text, and we later add them as facets to the atproto
        # record.
        url_flattener = BlueskyUrlFlattener()
        card = self.formatEntry(
            entry,
            limit=300,
            # Use Twitter's meta properties
            card_props=CardProps('name', 'twitter'),
            url_flattener=url_flattener,
            url_mode=URLMODE_ERASE)
        card.__bsky_url_flattener = url_flattener
        return card

    def mediaCallback(self, tmpfile, mt, url, desc):
        with open(tmpfile, 'rb') as tmpfp:
            data = tmpfp.read()

        logger.debug("Uploading image to Bluesky (%d bytes) with description: %s" %
                     (len(data), desc))
        upload = self.client.com.atproto.repo.upload_blob(data)

        if desc is None:
            desc = ""
        return atprotomodels.AppBskyEmbedImages.Image(alt=desc, image=upload.blob)

    def postEntry(self, entry_card, media_ids, ctx):
        # Add images as an embed on the atproto record.
        embed = None
        if media_ids:
            embed = atprotomodels.AppBskyEmbedImages.Main(images=media_ids)

        # Grab any URLs detected by our URL flattener and add them as
        # facets on the atproto record.
        facets = None
        url_flattener = entry_card.__bsky_url_flattener
        if url_flattener.urls:
            facets = []
            for url_info in url_flattener.urls:
                # atproto requires an http or https scheme.
                start, end, url = url_info
                if not url.startswith('http'):
                    url = 'https://' + url

                facet = atprotomodels.AppBskyRichtextFacet.Main(
                    features=[atprotomodels.AppBskyRichtextFacet.Link(uri=url)],
                    index=atprotomodels.AppBskyRichtextFacet.ByteSlice(
                        byteStart=start, byteEnd=end),
                    )
                facets.append(facet)

        # Create the record!
        entry_dt = entry_card.entry.get('published')
        self.client.send_post(
                text=entry_card.text, post_datetime=entry_dt, embed=embed,
                facets=facets)


BLUESKY_NETLOC = 'bsky.app'

# Match both links to a profile by name, and by ID
profile_path_re = re.compile(r'/profile/([\w\d\.]+|(did\:plc\:[\w\d]+))')


class BlueskyUrlFlattener(UrlFlattener):
    def __init__(self):
        self.urls = []

    def replaceHref(self, text, raw_url, ctx):
        url = urllib.parse.urlparse(raw_url)

        # If this is a Bluesky profile URL, replace it with a mention.
        if url.netloc == BLUESKY_NETLOC:
            m = profile_path_re.match(url.path)
            if m:
                return '@' + m.group(1)

        # Otherwise, keep track of where the URL is so we can add a facet
        # for it.
        start = ctx.byte_length
        end = start + len(text.encode())
        self.urls.append((start, end, raw_url))
        print("Gathered link: ", start, end, raw_url)

        # Always keep the text as-is.
        return text

    def measureUrl(self, url):
        return len(url)