first commit

This commit is contained in:
Lasse Studion 2022-12-21 16:48:07 +01:00
commit 8d14a31212
6 changed files with 243 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@ -0,0 +1,10 @@
*
!.gitignore
!config.py
!license.txt
!README.md
!requirements.txt
!sql_api.py
!twitter_api.py
!stream_answers.py
!twitterbot.py

7
README.md Normal file
View File

@ -0,0 +1,7 @@
Simple script to find out if Twitter accounts one is following are on Mastodon. Identifying mastodon handles from Twitter accounts' bio, adding them to a CSV file, uploading that file to Dropbox and then answering with a link to a shared Dropbox file.
_To run this you need:_
- [Twitter API credentials](https://developer.twitter.com/en/docs/twitter-api)
- [Dropbox API credentials](https://www.dropbox.com/developers/)
The project is in development and I will fill out this readme later.

80
config.py Normal file
View File

@ -0,0 +1,80 @@
import os
import json
def create_config():
# Create a config file. #* Add more keys if necessary.
config = {}
with open("config.json", "w") as f:
# Info for Twitter:
twitter_token = input("Twitter token bearer: ")
config["twitter_token"] = twitter_token
config['bot_username'] = input('Bot username: ')
# Info for Postgresql database:
config['postgres_host'] = input('Postgres database host: ')
config['postgres_user'] = input('Postgres database username: ')
config['postgres_password'] = input('Postgres database password: ')
config['postgres_database'] = input('Postgres database name: ')
# Write to file.
json.dump(config, f)
return config
def update_config(key, value):
# Check if the config files with the token exist, create it if not.
if os.path.exists("config.json"):
with open("config.json") as f:
config = json.load(f)
config[key] = value
else:
print("No config file, creating one...")
config = create_config()
config[key] = value
# Write updated config to file.
with open("config.json", "w") as f:
json.dump(config, f)
return config
def get_config(check_for=False):
# Returns a config file, creating one if not existing.
if os.path.exists("config.json"):
with open("config.json") as f:
try:
config = json.load(f)
except json.decoder.JSONDecodeError:
config = create_config()
if check_for:
for i in check_for:
if i not in config:
config = create_config()
else:
config = create_config()
return config
def get_dropbox_tokens(app_key):
from dropbox import DropboxOAuth2FlowNoRedirect
'''
Populate your app key in order to run this locally
'''
auth_flow = DropboxOAuth2FlowNoRedirect(app_key, use_pkce=True, token_access_type='offline')
authorize_url = auth_flow.start()
print("1. Go to: " + authorize_url)
print("2. Click \"Allow\" (you might have to log in first).")
print("3. Copy the authorization code.")
auth_code = input("Enter the authorization code here: ").strip()
oauth_result = auth_flow.finish(auth_code)
return oauth_result
if __name__ == "__main__":
os.chdir(os.path.dirname(os.path.realpath(__file__)))
create_config()
print(f'Configuration file created at {os.path.realpath("config.json")}.')

35
sql_api.py Normal file
View File

@ -0,0 +1,35 @@
import psycopg2
class DB:
def __init__(self, config):
# Establish a connection to the database
self.database = psycopg2.connect(
host=config['postgres_host'],
database=config['postgres_database'],
user=config['postgres_user'],
password=config['postgres_password']
)
self.cursor = self.database.cursor()
def select(self, sql):
"""Returns a list of dicts from DB."""
things = self.database.execute(sql).fetchall()
unpacked = [{k: item[k] for k in item.keys()} for item in things]
return unpacked
def commit(self, sql):
""" Inserts from a query. """
self.cursor.execute(sql)
self.database.commit()
def insert_user(self, twitter_username, mastodon_username, source_tweet):
# Create the INSERT statement
sql = f"""INSERT INTO usernames (id, mastodon, source)
VALUES ('{twitter_username}', '{mastodon_username}', '{source_tweet}')
ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id, mastodon = EXCLUDED.mastodon, source = EXCLUDED.source"""
# Execute the INSERT statement
self.cursor.execute(sql)
# Commit the transaction
self.database.commit()

48
stream_answers.py Normal file
View File

@ -0,0 +1,48 @@
import re
import json
import requests
import sql_api
import twitter_api
import config
def extract_mastodon_handle(text):
""" Get a mastodon alias form a text (assuming a word with
two @ and a . in the later part of the wordis a mastodon alias). """
handle = re.search(r'@\w+@\w+.\w+', text)
if handle:
handle = handle.group()
else:
handle = False
return handle
def stream():
""" Monitor answers to a tweet. """
# Make connection to SQL databse.
db = sql_api.DB(config.get_config())
api = twitter_api.API()
rules = api.get_rules()
api.delete_all_rules(rules)
api.set_rules()
response = requests.get(
"https://api.twitter.com/2/tweets/search/stream?expansions=referenced_tweets.id,author_id", auth=api.bearer_oauth, stream=True,
)
for response_line in response.iter_lines():
if response_line:
json_response = json.loads(response_line)
try:
twitter_username = json_response['includes']['users'][0]['username']
mastodon_username = extract_mastodon_handle(json_response['includes']['tweets'][0]['text'])
print(mastodon_username)
source_tweet = str(json_response['data']['id'])
# Add Mastodon username to db.
db.insert_user(twitter_username, mastodon_username, source_tweet)
except KeyError:
pass
if __name__ == "__main__":
stream()

63
twitter_api.py Normal file
View File

@ -0,0 +1,63 @@
import requests
import config
from requests_oauthlib import OAuth2Session, OAuth1Session
class API:
def __init__(self):
conf = config.get_config()
if 'twitter_token' not in conf:
config.update('twitter_token', input('Twitter token:'))
self.bearer_token = conf['twitter_token']
self.user_agent = "v2UserLookupPython"
self.bot_username = conf['bot_username']
def bearer_oauth(self, r):
""" Method required by bearer token authentication. """
r.headers["Authorization"] = f"Bearer {self.bearer_token}"
r.headers["User-Agent"] = self.user_agent
return r
def bearer_oauth(self, r):
"""
Method required by bearer token authentication.
"""
r.headers["Authorization"] = f"Bearer {self.bearer_token}"
r.headers["User-Agent"] = "v2FilteredStreamPython"
return r
def get_rules(self):
response = requests.get(
"https://api.twitter.com/2/tweets/search/stream/rules", auth=self.bearer_oauth
)
if response.status_code != 200:
raise Exception(
"Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
)
return response.json()
def delete_all_rules(self, rules):
if rules is None or "data" not in rules:
return None
ids = list(map(lambda rule: rule["id"], rules["data"]))
payload = {"delete": {"ids": ids}}
requests.post(
"https://api.twitter.com/2/tweets/search/stream/rules",
auth=self.bearer_oauth,
json=payload
)
def set_rules(self):
# You can adjust the rules if needed
rules = [
{"value": f"@{self.bot_username}"}
]
payload = {"add": rules}
requests.post(
"https://api.twitter.com/2/tweets/search/stream/rules",
auth=self.bearer_oauth,
json=payload,
)