You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

278 lines
8.8 KiB

import streamlit as st
import pandas as pd
from sqlalchemy import create_engine
import streamlit_authenticator as stauth
import yaml
from yaml.loader import SafeLoader
class Params():
def __init__(self, d):
for key in d:
setattr(self, key, d[key])
for attr in ['q', 'category']:
if attr not in d:
setattr(self, attr, '')
self.set_params()
def set_params(self):
st.experimental_set_query_params(q=self.q, category=self.category)
def update(self, param, value):
setattr(self, param, value)
self.set_params()
def reset_q():
st.experimental_set_query_params(q='')
def download(df):
st.download_button(
"Ladda ner alla meddelanden som CSV",
df.to_csv(index=False, sep=';').encode('utf-8'),
"file.csv",
"text/csv",
key='download-csv')
def define_search_terms(user_input):
""" Takes user input and make them into search terms for SQL.
Args:
user_input (str): The string resulting from user input (input()).
Returns:
list: List of search terms.
"""
# Search for quated phrases.
search_terms = []
while '"' in user_input:
q1 = user_input.find('"')
q2 = user_input.find('"', q1 + 1)
quoted_term = user_input[q1 + 1 : q2]
search_terms.append(quoted_term.lower())
user_input = user_input.replace(f'"{quoted_term}"', "")
while " " in user_input:
user_input = user_input.replace(
" ", " "
).strip() # Remove double and trailing blanks.
# Add non-quoted terms.
if len(user_input) > 0:
search_terms += [i.lower() for i in user_input.strip().split(" ")]
return search_terms
def create_sql_query(search_terms, table):
"""Returns a valid sql query."""
word_list = []
select_columns = 'body, "to" as m2, "from" as m1, senddate_str'
return_limit = 1000
for word in search_terms:
if "*" not in word: #Searching for the exact word.
word_list.append(f" {word} ")
else:
if word[0] == "*" and word[-1] == "*":
word_list.append(word.replace("*", ""))
elif word[0] == "*":
word_list.append(f"{word.replace('*', '')} ")
elif word[-1] == "*":
word_list.append(f" {word.replace('*', '')}")
# Format for SQL.
search_list = [f"'%%{i}%%'" for i in word_list]
n = 0
for i in search_list:
if " or " in i:
search_list[n] = "OR"
n += 1
# Handle searches with OR.
or_terms = []
while "OR" in search_list:
n_or = search_list.count("OR")
or_terms.append(search_list.pop(search_list.index("OR") - 1))
if n_or == 1:
or_terms.append(search_list.pop(search_list.index("OR") + 1))
search_list.remove("OR")
or_sql = f"( body_lower LIKE {' OR body_lower LIKE '.join(or_terms)})"
# Handle searches with -.
not_terms = []
for term in search_list:
if "-" in term: # TODO Make this not include words with hyphen.
not_terms.append(search_list.pop(search_list.index(term)).replace("-", ""))
# Create SQL query.
search_sql = ''
if search_list != []:
search_sql = f'(body_lower LIKE {" AND body_lower LIKE ".join(search_list)}) '
if or_terms != []:
if search_sql == '':
search_sql = or_sql
else:
search_sql = search_sql + " AND " + or_sql
if len(not_terms) > 0:
search_sql += (
f' AND (body_lower NOT LIKE {" AND body_lower NOT LIKE ".join(not_terms)})'
)
sql = f"SELECT {select_columns} FROM {table} WHERE {search_sql} LIMIT {return_limit}"
return sql
def search_messages(search_for, engine, user=False):
if user: # Search for all messages from/to a single user.
select_columns = 'body, "to" as m2, "from" as m1, senddate_str'
sql = f'select {select_columns} from messages where "to" == "{search_for}" or "from" == "{search_for}"'
else: # Search for keywords.
sql = create_sql_query(define_search_terms(search_for), 'messages')
# Get data from db.
df = pd.read_sql(sql, engine)
download(df)
st.write(f'Träffar: {len(df)}')
talkers = list(set(df['m1'].tolist() + df['m2'].tolist()))
if 'admin' in talkers: # Remove admin from conversations.
talkers.remove('admin')
conversations = {}
for talker in talkers:
df_ = df.query(f'm1=="{talker}" | m2=="{talker}" ').copy()
others = list(set(df_['m1'].tolist() + df_['m2'].tolist()))
others.remove(talker)
for other in others:
parts_list = [str(talker), str(other)]
parts_list.sort()
parts = '_'.join(parts_list)
if parts not in conversations:
df_ = df_.query(f'm1=="{other}" | m2=="{other}" ').copy()
df_.sort_values('senddate_str', inplace=True)
conversations[parts] = {'df': df_[['m1', 'm2', 'body', 'senddate_str']], 'parts': parts_list}
for _, data in conversations.items():
data['parts'][0]
base_url = 'https://lasseedfast.se/rosa/'
url_from = f"{base_url}?q={data['parts'][0]}&category=Användare"
url_to = f"{base_url}?q={data['parts'][1]}&category=Användare"
st.markdown(f"{[str(data['parts'][0])]}"f"({url_from}) - {[str(data['parts'][1])]}"f"({url_to})")
df = data['df']
df.rename({'m1': 'från', 'm2':'till', 'body':'meddelande', 'senddate_str':'datum'}, inplace=True, axis=1)
st.dataframe(df, hide_index=True)
def search_user(search_for, engine):
search_for = search_for.lower()
select = 'member_id, email, username'
if '@' in search_for:
search_columns = 'email_lower'
else:
search_columns = 'username_lower'
df = pd.read_sql(f'select {select} from members_expanded where {search_columns} == "{search_for}"', engine)
st.dataframe(df, use_container_width=True)
params = st.experimental_get_query_params()
# if 'user' in params:
if df.shape[0] == 1:
search_messages(df.username[0], engine, user=True)
def main():
# Say hi.
st.markdown(f'<h4 style="color:#ff748c";>Hej {name}!</h4>', unsafe_allow_html=True)
# Define connection to sqlite database.
engine = create_engine('sqlite:///db.sqlite')
# Create query_params class and set empty parameters.
params = Params(st.experimental_get_query_params())
# Let user chose what categorie to search in.
categories = ['Meddelanden', 'Användare']
if params.category != '':
index_category = categories.index(params.category[0])
else:
index_category = 0
search_category = st.selectbox('Vill du söka efter meddelanden eller användare?', categories, index_category, on_change=reset_q)
params.update('category', search_category)
# Let user chose what words to search for.
if params.q != '':
placeholder=params.q[0]
else:
placeholder = 'Skriv här'
# Set text to be shown above search field.
if search_category == 'Meddelanden':
search_text = 'Vilka ord vill du söka efter?'
elif search_category == 'Användare':
search_text = 'Skriv email-adress eller användarnamn.'
helpt_text = 'Du kan använda asterix (*), minus(-), citattecken ("") och OR.'
search_for = st.text_input(search_text, placeholder=placeholder, help=helpt_text).lower()
# Set search input from params if nothing else is input.
# This is the case when the user is coming from an url with q param.
if search_for == '' and params.q != '':
search_for = params.q[0]
# Start the search.
if search_for != '':
params.update('q', search_for)
# Replace å, ä, ö with ?.
search_for = search_for.replace('å', '?').replace('ä', '?').replace('ö', '?')
#* Search message.
if search_category == 'Meddelanden':
search_messages(search_for, engine)
#* Seach user.
elif search_category == 'Användare':
search_user(search_for, engine)
with open('credentials.yaml') as file:
config = yaml.load(file, Loader=SafeLoader)
authenticator = stauth.Authenticate(
config['credentials'],
config['cookie']['name'],
config['cookie']['key'],
config['cookie']['expiry_days'],
config['preauthorized']
)
name, authentication_status, username = authenticator.login('Login', 'main')
if authentication_status:
try:
main()
except Exception as e:
print(e)
st.write('Något blev fel :(')
elif authentication_status is False:
st.error('Username/password is incorrect')
elif authentication_status is None:
st.warning('Please enter your username and password')