imonmastodon/stream_answers.py
2022-12-22 06:46:52 +00:00

57 lines
1.8 KiB
Python

import re
import json
import requests
import sql_api
import twitter_api
import config
def extract_mastodon_handle(text):
""" Get a mastodon alias form a text (assuming a word with
two @ and a . in the later part of the wordis a mastodon alias). """
handle = re.search(r'@\w+@\w+.\w+', text)
if handle:
handle = handle.group()
else:
handle = False
return handle
def stream():
""" Monitor answers to a tweet. """
# Make connection to SQL databse.
db = sql_api.DB(config.get_config())
api = twitter_api.API()
rules = api.get_rules()
api.delete_all_rules(rules)
api.set_rules()
response = requests.get(
"https://api.twitter.com/2/tweets/search/stream?expansions=referenced_tweets.id,author_id", auth=api.bearer_oauth, stream=True,
)
for response_line in response.iter_lines():
if response_line:
json_response = json.loads(response_line)
try:
twitter_username = json_response['includes']['users'][0]['username']
mastodon_username = extract_mastodon_handle(json_response['includes']['tweets'][0]['text'])
if mastodon_username:
print(mastodon_username)
try:
source_tweet = str(json_response['data']['id'])
except Exception as e:
print(1, json_response)
print(e)
source_tweet = 'user'
# Add Mastodon username to db.
db.insert_user(twitter_username, mastodon_username, source_tweet)
except Exception as e:
print(2, json_response)
print(e)
pass
if __name__ == "__main__":
stream()