Mercurial > silorider
changeset 54:e3d2e13e8853
Add Facebook Page silo.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 08 Oct 2023 13:52:59 -0700 |
parents | 805c7d768caa |
children | d71871654790 |
files | requirements.txt setup.py silorider/silos/base.py silorider/silos/facebook.py |
diffstat | 4 files changed, 176 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/requirements.txt Sun Oct 08 13:52:26 2023 -0700 +++ b/requirements.txt Sun Oct 08 13:52:59 2023 -0700 @@ -7,6 +7,7 @@ coloredlogs==15.0.1 dateparser==1.1.8 decorator==5.1.1 +facebook-sdk==3.1.0 html5lib==1.1 humanfriendly==10.0 idna==3.4
--- a/setup.py Sun Oct 08 13:52:26 2023 -0700 +++ b/setup.py Sun Oct 08 13:52:59 2023 -0700 @@ -17,6 +17,7 @@ 'mf2py>=1.1.0', 'mf2util>=0.5.0', 'python-dateutil>=2.7.0', + 'python-facebook-api>=0.17.1', 'python-twitter>=3.4.0', 'ronkyuu>=0.6', 'tweepy>=4.14.0'
--- a/silorider/silos/base.py Sun Oct 08 13:52:26 2023 -0700 +++ b/silorider/silos/base.py Sun Oct 08 13:52:59 2023 -0700 @@ -108,10 +108,17 @@ def load_silos(config, cache): from .print import PrintSilo from .bluesky import BlueskySilo + from .facebook import FacebookSilo from .mastodon import MastodonSilo from .twitter import TwitterSilo from .webmention import WebmentionSilo - silo_types = [PrintSilo, BlueskySilo, MastodonSilo, TwitterSilo, WebmentionSilo] + silo_types = [ + PrintSilo, + BlueskySilo, + FacebookSilo, + MastodonSilo, + TwitterSilo, + WebmentionSilo] silo_dict = dict([(s.SILO_TYPE, s) for s in silo_types]) silos = []
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/silorider/silos/facebook.py Sun Oct 08 13:52:59 2023 -0700 @@ -0,0 +1,166 @@ +import logging +import requests +import pyfacebook +from datetime import datetime +from requests_oauthlib.oauth2_session import OAuth2Session +from requests_oauthlib.compliance_fixes.facebook import facebook_compliance_fix +from .base import Silo +from ..format import CardProps + + +logger = logging.getLogger(__name__) + + +class FacebookSilo(Silo): + SILO_TYPE = 'facebook' + _CLIENT_CLASS = pyfacebook.GraphAPI + + def __init__(self, ctx): + super().__init__(ctx) + + self.client = None + + def authenticate(self, ctx): + force = ctx.exec_ctx.args.force + + # Get the app info tokens. + app_id = self.getCacheItem('appid') + if not app_id or force: + logger.info("Plase enter Facebook app ID for %s" % + self.ctx.silo_name) + app_id = input("App ID:") + self.setCacheItem('appid', app_id) + + app_secret = self.getCacheItem('appsecret') + if not app_secret or force: + logger.info("Please enter Facebook app secret for %s" % + self.ctx.silo_name) + app_secret = input("App Secret:") + self.setCacheItem('appsecret', app_secret) + + # Start the OAuth authorization flow. + return_url = 'https://bolt80.com/silorider/auth_success.php' + perms = ['pages_show_list', 'pages_manage_posts'] + + auth_client = self._CLIENT_CLASS( + app_id=app_id, + app_secret=app_secret, + oauth_flow=True) + + login_url, state = auth_client.get_authorization_url(return_url, perms) + logger.info("Please authenticate at the following URL:") + logger.info(login_url) + resp_url = input("Paste the redirected URL here:") + if not resp_url: + logger.info("Authentication aborted!") + return + + # Get the long-lived user access token. + user_access_token = auth_client.exchange_user_access_token( + response=resp_url, + redirect_uri=return_url, + scope=perms) + logger.info("Got user access token, exchanging it for a long-lived one.") + print(user_access_token) + ll_user_access_token = auth_client.exchange_long_lived_user_access_token( + user_access_token['access_token']) + logger.info("Got long-lived user access token.") + print(ll_user_access_token) + + # Get the user account information where we can find which page + # we need to publish to. + auth_client.access_token = ll_user_access_token['access_token'] + + user = auth_client.get('/me', None) + print(user) + + accounts = auth_client.get('/me/accounts', None) + print(accounts) + pages = accounts['data'] + if len(pages) > 1: + logger.info("Choose which page to publish to:") + for i, page in enumerate(pages): + logger.info("%d: %s" % (i + 1, page['name'])) + page_idx = input("Enter page index:") + page = pages[page_idx - 1] + else: + page = pages[0] + + # Get a long-lived page access token for the chosen page. + logger.info("Requesting long-lived page access token for: %s" % page['name']) + ll_page_access_token = auth_client.exchange_long_lived_page_access_token( + user['id'], page['access_token']) + logger.info("Got long-lived page access token") + print(ll_page_access_token) + + id_to_find = page['id'] + page = next( + filter( + lambda p: p['id'] == id_to_find, + ll_page_access_token['data']), + None) + if page is None: + logger.error("Can't find selected page in authorization response!") + return + + self.setCacheItem("accesstoken", page['access_token']) + self.setCacheItem("objectid", page['id']) + logger.info("Page access token saved.") + + def onPostStart(self, ctx): + if not ctx.args.dry_run: + self._ensureClient() + + def _ensureClient(self): + if self.client is not None: + return + + logger.debug("Creating Facebook GraphAPI client.") + + app_id = self.getCacheItem('appid') + app_secret = self.getCacheItem('appsecret') + access_token = self.getCacheItem('accesstoken') + if not app_id or not access_token or not app_secret: + raise Exception("Facebook silo '%s' isn't authenticated." % + self.name) + + self.page_id = self.getCacheItem("objectid") + if not self.page_id: + raise Exception("Facebook silo '%s' doesn't have a page ID." % + self.name) + + self.client = self._CLIENT_CLASS( + app_id=app_id, + app_secret=app_secret, + access_token=access_token) + + def getEntryCard(self, entry, ctx): + return self.formatEntry(entry, card_props=CardProps('property', 'og')) + + def mediaCallback(self, tmpfile, mt, url, desc): + resp = self.client.post_object( + object_id=self.page_id, + connection='photos', + data={ + 'url': url, 'caption': desc, + 'published': False, 'temporary': True}) + logger.debug("Uploaded photo '%s' as object: %s" % (url, resp)) + return resp['id'] + + def postEntry(self, entry_card, media_ids, ctx): + data={'message': entry_card.text} + if media_ids: + attached_media = [] + for media_id in media_ids: + attached_media.append({"media_fbid": media_id}) + data['attached_media'] = attached_media + + logger.debug("Posting Facebook update: %s" % entry_card.text) + logger.debug("Using data: %s" % data) + + resp = self.client.post_object( + object_id=self.page_id, + connection='feed', + data={'message': entry_card.text}) + logger.debug("Posted as object: %s" % resp) +