Mercurial > silorider
changeset 2:27543b2e73b9
Add Twitter silo.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 29 Jul 2018 10:40:05 -0700 |
parents | 169aa24a8442 |
children | 98687befb7bf |
files | silorider/silos/base.py silorider/silos/mastodon.py silorider/silos/twitter.py tests/mockutil.py tests/test_silos_mastodon.py tests/test_silos_twitter.py |
diffstat | 6 files changed, 287 insertions(+), 44 deletions(-) [+] |
line wrap: on
line diff
--- a/silorider/silos/base.py Sun Jul 22 14:07:55 2018 -0700 +++ b/silorider/silos/base.py Sun Jul 29 10:40:05 2018 -0700 @@ -1,4 +1,6 @@ +import urllib.request import logging +import mimetypes from ..format import format_entry @@ -81,7 +83,8 @@ def load_silos(config, cache): from .print import PrintSilo from .mastodon import MastodonSilo - silo_types = [PrintSilo, MastodonSilo] + from .twitter import TwitterSilo + silo_types = [PrintSilo, MastodonSilo, TwitterSilo] silo_dict = dict([(s.SILO_TYPE, s) for s in silo_types]) silos = [] @@ -102,3 +105,33 @@ silo = silo_class(cctx) silos.append(silo) return silos + + +def upload_silo_media(entry, propname, callback): + media_ids = None + urls = entry.get(propname, [], force_list=True) + if urls: + media_ids = [] + for url in urls: + mid = _do_upload_silo_media(url, callback) + if mid is not None: + media_ids.append(mid) + return media_ids + + +def _do_upload_silo_media(url, callback): + logger.debug("Downloading %s for upload to silo..." % url) + mt, enc = mimetypes.guess_type(url) + if not mt: + mt = mimetypes.common_types['.jpg'] + + ext = mimetypes.guess_extension(mt) or '.jpg' + logger.debug("Got MIME type and extension: %s %s" % (mt, ext)) + + try: + tmpfile, headers = urllib.request.urlretrieve(url) + logger.debug("Using temporary file: %s" % tmpfile) + return callback(tmpfile, mt) + finally: + logger.debug("Cleaning up.") + urllib.request.urlcleanup()
--- a/silorider/silos/mastodon.py Sun Jul 22 14:07:55 2018 -0700 +++ b/silorider/silos/mastodon.py Sun Jul 29 10:40:05 2018 -0700 @@ -1,9 +1,7 @@ -import urllib.request import getpass import logging -import mimetypes import mastodon -from .base import Silo +from .base import Silo, upload_silo_media logger = logging.getLogger(__name__) @@ -111,38 +109,18 @@ api_base_url=self.base_url) def postEntry(self, entry, ctx): - toottxt = self.formatEntry(entry, limit=400) + toottxt = self.formatEntry(entry, limit=500) if not toottxt: raise Exception("Can't find any content to use for the toot!") visibility = self.getConfigItem('toot_visibility', fallback='public') - media_ids = None - photo_urls = entry.get('photo', [], force_list=True) - if photo_urls: - media_ids = [] - for pu in photo_urls: - media_ids.append(self._mediaPostFromUrl(pu)) + media_ids = upload_silo_media(entry, 'photo', self._media_callback) logger.debug("Posting toot: %s" % toottxt) self.client.status_post(toottxt, media_ids=media_ids, visibility=visibility) - def _mediaPostFromUrl(self, url): - logger.debug("Downloading %s for upload to Mastodon..." % url) - mt, enc = mimetypes.guess_type(url) - if not mt: - mt = mimetypes.common_types['.jpg'] - - ext = mimetypes.guess_extension(mt) or '.jpg' - logger.debug("Got MIME type and extension: %s %s" % (mt, ext)) - - try: - tmpfile, headers = urllib.request.urlretrieve(url) - logger.debug("Using temporary file: %s" % tmpfile) - - with open(tmpfile, 'rb') as tmpfp: - return self.client.media_post(tmpfp, mt) - finally: - logger.debug("Cleaning up.") - urllib.request.urlcleanup() + def _media_callback(self, tmpfile, mt): + with open(tmpfile, 'rb') as tmpfp: + return self.client.media_post(tmpfp, mt)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/silorider/silos/twitter.py Sun Jul 29 10:40:05 2018 -0700 @@ -0,0 +1,80 @@ +import logging +import twitter +from .base import Silo, upload_silo_media + + +logger = logging.getLogger(__name__) + + +class TwitterSilo(Silo): + SILO_TYPE = 'twitter' + _CLIENT_CLASS = twitter.Api + + def __init__(self, ctx): + super().__init__(ctx) + self.client = None + + def authenticate(self, ctx): + force = ctx.exec_ctx.args.force + + client_token = self.getCacheItem('clienttoken') + if not client_token or force: + logger.info("Please enter Twitter consumer tokens for %s:" % + self.ctx.silo_name) + consumer_key = input("Consumer Key: ") + consumer_secret = input("Consumer Secret: ") + client_token = '%s,%s' % (consumer_key, consumer_secret) + self.setCacheItem('clienttoken', client_token) + + access_token = self.getCacheItem('accesstoken') + if not access_token or force: + logger.info("Please enter Twitter access tokens for %s:" % + self.ctx.silo_name) + + access_key = input("Access Token: ") + access_secret = input("Access Token Secret: ") + + access_token = '%s,%s' % (access_key, access_secret) + self.setCacheItem('accesstoken', access_token) + + def onPostStart(self): + self._ensureClient() + + def _ensureClient(self): + if self.client is not None: + return + + logger.debug("Creating Twitter API client.") + client_token = self.getCacheItem('clienttoken') + if not client_token: + raise Exception("Twitter silo '%s' isn't authenticated." % + self.name) + + client_key, client_secret = client_token.split(',') + + access_token = self.getCacheItem('accesstoken') + if not access_token: + raise Exception("Twitter silo '%s' isn't authenticated." % + self.name) + + access_key, access_secret = access_token.split(',') + + self.client = self._CLIENT_CLASS( + consumer_key=client_key, + consumer_secret=client_secret, + access_token_key=access_key, + access_token_secret=access_secret) + + def postEntry(self, entry, ctx): + tweettxt = self.formatEntry(entry, limit=280) + if not tweettxt: + raise Exception("Can't find any content to use for the tweet!") + + media_ids = upload_silo_media(entry, 'photo', self._media_callback) + + logger.debug("Posting tweet: %s" % tweettxt) + self.client.PostUpdate(tweettxt, media=media_ids) + + def _media_callback(self, tmpfile, mt): + with open(tmpfile, 'rb') as tmpfp: + return self.client.UploadMediaChunked(tmpfp)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/mockutil.py Sun Jul 29 10:40:05 2018 -0700 @@ -0,0 +1,14 @@ + +def mock_urllib(m): + import urllib.request + m.setattr(urllib.request, 'urlretrieve', _patched_urlretrieve) + m.setattr(urllib.request, 'urlcleanup', _patched_urlcleanup) + return m + + +def _patched_urlretrieve(url): + return ('/retrieved/' + url.lstrip('/'), None) + + +def _patched_urlcleanup(): + pass
--- a/tests/test_silos_mastodon.py Sun Jul 22 14:07:55 2018 -0700 +++ b/tests/test_silos_mastodon.py Sun Jul 29 10:40:05 2018 -0700 @@ -1,4 +1,5 @@ import pytest +from .mockutil import mock_urllib def test_one_article(cli, feedutil, mastmock): @@ -49,13 +50,15 @@ mastmock.installTokens(cli, 'test') with monkeypatch.context() as m: - import urllib.request - m.setattr(urllib.request, 'urlretrieve', _patched_urlretrieve) - m.setattr(urllib.request, 'urlcleanup', _patched_urlcleanup) + import silorider.silos.mastodon + mock_urllib(m) + m.setattr(silorider.silos.mastodon.MastodonSilo, '_media_callback', + _patched_media_callback) ctx, _ = cli.run('process', feed) + assert ctx.cache.wasPosted('test', '/01234.html') media = ctx.silos[0].client.media[0] - assert media == ('/retrived/fullimg.jpg', 'image/jpeg', 1) + assert media == ('/retrieved/fullimg.jpg', 'image/jpeg', 1) toot = ctx.silos[0].client.toots[0] assert toot == ("This is a quick photo update.", [1], 'public') @@ -74,25 +77,23 @@ mastmock.installTokens(cli, 'test') with monkeypatch.context() as m: - import urllib.request - m.setattr(urllib.request, 'urlretrieve', _patched_urlretrieve) - m.setattr(urllib.request, 'urlcleanup', _patched_urlcleanup) + import silorider.silos.mastodon + mock_urllib(m) + m.setattr(silorider.silos.mastodon.MastodonSilo, '_media_callback', + _patched_media_callback) ctx, _ = cli.run('process', feed) + assert ctx.cache.wasPosted('test', '/01234.html') media = ctx.silos[0].client.media[0] - assert media == ('/retrived/fullimg1.jpg', 'image/jpeg', 1) + assert media == ('/retrieved/fullimg1.jpg', 'image/jpeg', 1) media = ctx.silos[0].client.media[1] - assert media == ('/retrived/fullimg2.jpg', 'image/jpeg', 2) + assert media == ('/retrieved/fullimg2.jpg', 'image/jpeg', 2) toot = ctx.silos[0].client.toots[0] assert toot == ("This is a photo update with 2 photos.", [1, 2], 'public') -def _patched_urlretrieve(url): - return ('/retrived/' + url.lstrip('/'), None) - - -def _patched_urlcleanup(): - pass +def _patched_media_callback(self, tmpfile, mt): + return self.client.media_post(tmpfile, mt) @pytest.fixture(scope='session')
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_silos_twitter.py Sun Jul 29 10:40:05 2018 -0700 @@ -0,0 +1,137 @@ +import pytest +from .mockutil import mock_urllib + + +def test_one_article(cli, feedutil, tweetmock): + feed = cli.createTempFeed(feedutil.makeFeed( + """<h1 class="p-name">A new article</h1> +<div class="e-content"> +<p>This is the text of the article.</p> +<p>It has 2 paragraphs.</p> +</div> +<a class="u-url" href="https://example.org/a-new-article">permalink</a>""" + )) + + cli.appendSiloConfig('test', 'twitter', url='/blah') + tweetmock.installTokens(cli, 'test') + + ctx, _ = cli.run('process', feed) + assert ctx.cache.wasPosted('test', 'https://example.org/a-new-article') + toot = ctx.silos[0].client.tweets[0] + assert toot == ('A new article https://example.org/a-new-article', None) + + +def test_one_micropost(cli, feedutil, tweetmock): + feed = cli.createTempFeed(feedutil.makeFeed( + """<p class="p-name">This is a quick update.</p> +<a class="u-url" href="/01234.html">permalink</a>""" + )) + + cli.appendSiloConfig('test', 'twitter', url='/blah') + tweetmock.installTokens(cli, 'test') + + ctx, _ = cli.run('process', feed) + assert ctx.cache.wasPosted('test', '/01234.html') + toot = ctx.silos[0].client.tweets[0] + assert toot == ("This is a quick update.", None) + + +def test_one_micropost_with_one_photo(cli, feedutil, tweetmock, monkeypatch): + feed = cli.createTempFeed(feedutil.makeFeed( + """<p class="p-name">This is a quick photo update.</p> +<div> + <a class="u-photo" href="/fullimg.jpg"><img src="/thumbimg.jpg"/></a> +</div> +<a class="u-url" href="/01234.html">permalink</a>""" + )) + + cli.appendSiloConfig('test', 'twitter', url='/blah') + tweetmock.installTokens(cli, 'test') + + with monkeypatch.context() as m: + import silorider.silos.twitter + mock_urllib(m) + m.setattr(silorider.silos.twitter.TwitterSilo, '_media_callback', + _patched_media_callback) + ctx, _ = cli.run('process', feed) + + assert ctx.cache.wasPosted('test', '/01234.html') + media = ctx.silos[0].client.media[0] + assert media == ('/retrieved/fullimg.jpg', 1) + toot = ctx.silos[0].client.tweets[0] + assert toot == ("This is a quick photo update.", [1]) + + +def test_one_micropost_with_two_photos(cli, feedutil, tweetmock, monkeypatch): + feed = cli.createTempFeed(feedutil.makeFeed( + """<p class="p-name">This is a photo update with 2 photos.</p> +<div> + <a class="u-photo" href="/fullimg1.jpg"><img src="/thumbimg1.jpg"/></a> + <a class="u-photo" href="/fullimg2.jpg"><img src="/thumbimg2.jpg"/></a> +</div> +<a class="u-url" href="/01234.html">permalink</a>""" + )) + + cli.appendSiloConfig('test', 'twitter', url='/blah') + tweetmock.installTokens(cli, 'test') + + with monkeypatch.context() as m: + import silorider.silos.twitter + mock_urllib(m) + m.setattr(silorider.silos.twitter.TwitterSilo, '_media_callback', + _patched_media_callback) + ctx, _ = cli.run('process', feed) + + assert ctx.cache.wasPosted('test', '/01234.html') + media = ctx.silos[0].client.media[0] + assert media == ('/retrieved/fullimg1.jpg', 1) + media = ctx.silos[0].client.media[1] + assert media == ('/retrieved/fullimg2.jpg', 2) + toot = ctx.silos[0].client.tweets[0] + assert toot == ("This is a photo update with 2 photos.", [1, 2]) + + +def _patched_media_callback(self, tmpfile, mt): + return self.client.UploadMediaChunked(tmpfile) + + +@pytest.fixture(scope='session') +def tweetmock(): + from silorider.silos.twitter import TwitterSilo + TwitterSilo._CLIENT_CLASS = TwitterMock + return TwitterMockUtil() + + +class TwitterMock: + def __init__(self, consumer_key, consumer_secret, + access_token_key, access_token_secret): + assert consumer_key == 'TEST_CLIENT_KEY' + assert consumer_secret == 'TEST_CLIENT_SECRET' + assert access_token_key == 'TEST_ACCESS_KEY' + assert access_token_secret == 'TEST_ACCESS_SECRET' + + self.tweets = [] + self.media = [] + self.next_mid = 1 + + def PostUpdate(self, tweet, media=None): + self.tweets.append((tweet, media)) + + def UploadMediaChunked(self, filename): + mid = self.next_mid + self.next_mid += 1 + self.media.append((filename, mid)) + return mid + + +class TwitterMockUtil: + def installTokens(self, cli, silo_name): + def do_install_tokens(ctx): + ctx.cache.setCustomValue( + '%s_clienttoken' % silo_name, + 'TEST_CLIENT_KEY,TEST_CLIENT_SECRET') + ctx.cache.setCustomValue( + '%s_accesstoken' % silo_name, + 'TEST_ACCESS_KEY,TEST_ACCESS_SECRET') + + cli.preExecHook(do_install_tokens)