Mercurial > silorider
view silorider/silos/twitter.py @ 80:2a7fa4259fc8 draft default tip master
Improve upload of preview images.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sat, 30 Nov 2024 14:14:54 -0800 |
parents | c5bf03406a33 |
children |
line wrap: on
line source
import os.path import logging import tweepy import urllib.parse from .base import Silo, SiloProfileUrlHandler from ..format import CardProps, UrlFlattener from ..parse import strip_img_alt logger = logging.getLogger(__name__) class _CompositeClient: def __init__(self, consumer_key, consumer_secret, access_token_key, access_token_secret): self.v2 = tweepy.Client( None, # using OAuth v1 consumer_key=consumer_key, consumer_secret=consumer_secret, access_token=access_token_key, access_token_secret=access_token_secret) auth_v1 = tweepy.OAuth1UserHandler( consumer_key, consumer_secret, access_token_key, access_token_secret) self.v1 = tweepy.API(auth_v1) def create_tweet(self, *args, **kwargs): return self.v2.create_tweet(*args, **kwargs) def simple_upload(self, *args, **kwargs): return self.v1.simple_upload(*args, **kwargs) class TwitterSilo(Silo): SILO_TYPE = 'twitter' PHOTO_LIMIT = 5000000 _CLIENT_CLASS = _CompositeClient def __init__(self, ctx): super().__init__(ctx) self.client = None def authenticate(self, ctx): force = ctx.exec_ctx.args.force # Get the app info tokens. client_token = self.getCacheItem('clienttoken') if not client_token or force: logger.info("Please enter consumer tokens (aka API tokens) for %s:" % self.ctx.silo_name) consumer_key = input("Consumer Key: ") consumer_secret = input("Consumer Secret: ") client_token = '%s,%s' % (consumer_key, consumer_secret) self.setCacheItem('clienttoken', client_token) # Generate an access token by making the user login and authorize # our app. access_token = self.getCacheItem('accesstoken') if not access_token or force: self._ensureClient() auth_url = self.client.v1.auth.get_authorization_url() logger.info("Please authorize SiloRider with Twitter for %s at: %s" % (self.ctx.silo_name, auth_url)) logger.info("And then enter the OAuth verifier token here:") verifier_token = input("Verifier Token:") access_key, access_secret = self.client.v1.auth.get_access_token( verifier_token) access_token = '%s,%s' % (access_key, access_secret) self.setCacheItem('accesstoken', access_token) def onPostStart(self, ctx): if not ctx.args.dry_run: self._ensureClient() def _ensureClient(self): if self.client is not None: return logger.debug("Creating Twitter API client.") client_token = self.getCacheItem('clienttoken') if not client_token: raise Exception("Twitter silo '%s' isn't authenticated." % self.name) client_key, client_secret = client_token.split(',') access_token = self.getCacheItem('accesstoken') if not access_token: raise Exception("Twitter silo '%s' isn't authenticated." % self.name) access_key, access_secret = access_token.split(',') self.client = self._CLIENT_CLASS( consumer_key=client_key, consumer_secret=client_secret, access_token_key=access_key, access_token_secret=access_secret) def getProfileUrlHandler(self): return TwitterProfileUrlHandler() def getEntryCard(self, entry, ctx): return self.formatEntry( entry, limit=280, card_props=CardProps('name', 'twitter'), profile_url_handlers=ctx.profile_url_handlers, url_flattener=TwitterUrlFlattener()) def mediaCallback(self, tmpfile, mt, url, desc): url_parsed = urllib.parse.urlparse(url) fname = os.path.basename(url_parsed.path) with open(tmpfile, 'rb') as tmpfp: logger.debug("Uploading %s to twitter" % fname) media = self.client.simple_upload(fname, file=tmpfp) return media.media_id def postEntry(self, entry_card, media_ids, ctx): self.client.create_tweet(text=entry_card.text, media_ids=media_ids) TWITTER_NETLOCS = ['twitter.com', 'www.twitter.com'] class TwitterProfileUrlHandler(SiloProfileUrlHandler): def handleUrl(self, text, raw_url): url = urllib.parse.urlparse(raw_url) # Is it a Twitter URL? if url.netloc not in TWITTER_NETLOCS: return None path = url.path.lstrip('/') # Is it a profile URL? if '/' not in path: return '@' + path return None class TwitterUrlFlattener(UrlFlattener): def replaceHref(self, text, raw_url, ctx): url = urllib.parse.urlparse(raw_url) # Is it a Twitter URL? if url.netloc not in TWITTER_NETLOCS: return None path = url.path.lstrip('/') # Is it a profile URL? if '/' not in path: return '@' + path return None def measureUrl(self, raw_url): return 23