You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
278 lines
8.8 KiB
278 lines
8.8 KiB
import streamlit as st |
|
import pandas as pd |
|
from sqlalchemy import create_engine |
|
import streamlit_authenticator as stauth |
|
import yaml |
|
from yaml.loader import SafeLoader |
|
|
|
|
|
class Params(): |
|
def __init__(self, d): |
|
|
|
for key in d: |
|
setattr(self, key, d[key]) |
|
|
|
for attr in ['q', 'category']: |
|
if attr not in d: |
|
setattr(self, attr, '') |
|
|
|
self.set_params() |
|
|
|
def set_params(self): |
|
st.experimental_set_query_params(q=self.q, category=self.category) |
|
|
|
def update(self, param, value): |
|
setattr(self, param, value) |
|
self.set_params() |
|
|
|
def reset_q(): |
|
st.experimental_set_query_params(q='') |
|
|
|
def download(df): |
|
|
|
st.download_button( |
|
"Ladda ner alla meddelanden som CSV", |
|
df.to_csv(index=False, sep=';').encode('utf-8'), |
|
"file.csv", |
|
"text/csv", |
|
key='download-csv') |
|
|
|
|
|
def define_search_terms(user_input): |
|
""" Takes user input and make them into search terms for SQL. |
|
|
|
Args: |
|
user_input (str): The string resulting from user input (input()). |
|
|
|
Returns: |
|
list: List of search terms. |
|
""" |
|
# Search for quated phrases. |
|
search_terms = [] |
|
while '"' in user_input: |
|
q1 = user_input.find('"') |
|
q2 = user_input.find('"', q1 + 1) |
|
quoted_term = user_input[q1 + 1 : q2] |
|
search_terms.append(quoted_term.lower()) |
|
user_input = user_input.replace(f'"{quoted_term}"', "") |
|
while " " in user_input: |
|
user_input = user_input.replace( |
|
" ", " " |
|
).strip() # Remove double and trailing blanks. |
|
|
|
# Add non-quoted terms. |
|
if len(user_input) > 0: |
|
search_terms += [i.lower() for i in user_input.strip().split(" ")] |
|
return search_terms |
|
|
|
def create_sql_query(search_terms, table): |
|
"""Returns a valid sql query.""" |
|
word_list = [] |
|
select_columns = 'body, "to" as m2, "from" as m1, senddate_str' |
|
return_limit = 1000 |
|
for word in search_terms: |
|
|
|
if "*" not in word: #Searching for the exact word. |
|
word_list.append(f" {word} ") |
|
|
|
else: |
|
if word[0] == "*" and word[-1] == "*": |
|
word_list.append(word.replace("*", "")) |
|
elif word[0] == "*": |
|
word_list.append(f"{word.replace('*', '')} ") |
|
elif word[-1] == "*": |
|
word_list.append(f" {word.replace('*', '')}") |
|
|
|
# Format for SQL. |
|
search_list = [f"'%%{i}%%'" for i in word_list] |
|
|
|
n = 0 |
|
for i in search_list: |
|
if " or " in i: |
|
search_list[n] = "OR" |
|
n += 1 |
|
|
|
# Handle searches with OR. |
|
or_terms = [] |
|
while "OR" in search_list: |
|
n_or = search_list.count("OR") |
|
or_terms.append(search_list.pop(search_list.index("OR") - 1)) |
|
if n_or == 1: |
|
or_terms.append(search_list.pop(search_list.index("OR") + 1)) |
|
search_list.remove("OR") |
|
or_sql = f"( body_lower LIKE {' OR body_lower LIKE '.join(or_terms)})" |
|
|
|
# Handle searches with -. |
|
not_terms = [] |
|
for term in search_list: |
|
if "-" in term: # TODO Make this not include words with hyphen. |
|
not_terms.append(search_list.pop(search_list.index(term)).replace("-", "")) |
|
|
|
# Create SQL query. |
|
search_sql = '' |
|
if search_list != []: |
|
search_sql = f'(body_lower LIKE {" AND body_lower LIKE ".join(search_list)}) ' |
|
|
|
if or_terms != []: |
|
if search_sql == '': |
|
search_sql = or_sql |
|
else: |
|
search_sql = search_sql + " AND " + or_sql |
|
|
|
if len(not_terms) > 0: |
|
search_sql += ( |
|
f' AND (body_lower NOT LIKE {" AND body_lower NOT LIKE ".join(not_terms)})' |
|
) |
|
|
|
sql = f"SELECT {select_columns} FROM {table} WHERE {search_sql} LIMIT {return_limit}" |
|
|
|
return sql |
|
|
|
def search_messages(search_for, engine, user=False): |
|
|
|
if user: # Search for all messages from/to a single user. |
|
select_columns = 'body, "to" as m2, "from" as m1, senddate_str' |
|
sql = f'select {select_columns} from messages where "to" == "{search_for}" or "from" == "{search_for}"' |
|
|
|
|
|
else: # Search for keywords. |
|
sql = create_sql_query(define_search_terms(search_for), 'messages') |
|
|
|
# Get data from db. |
|
df = pd.read_sql(sql, engine) |
|
download(df) |
|
|
|
st.write(f'Träffar: {len(df)}') |
|
|
|
talkers = list(set(df['m1'].tolist() + df['m2'].tolist())) |
|
if 'admin' in talkers: # Remove admin from conversations. |
|
talkers.remove('admin') |
|
conversations = {} |
|
|
|
for talker in talkers: |
|
df_ = df.query(f'm1=="{talker}" | m2=="{talker}" ').copy() |
|
others = list(set(df_['m1'].tolist() + df_['m2'].tolist())) |
|
others.remove(talker) |
|
|
|
for other in others: |
|
parts_list = [str(talker), str(other)] |
|
parts_list.sort() |
|
parts = '_'.join(parts_list) |
|
|
|
if parts not in conversations: |
|
df_ = df_.query(f'm1=="{other}" | m2=="{other}" ').copy() |
|
df_.sort_values('senddate_str', inplace=True) |
|
conversations[parts] = {'df': df_[['m1', 'm2', 'body', 'senddate_str']], 'parts': parts_list} |
|
|
|
for _, data in conversations.items(): |
|
data['parts'][0] |
|
base_url = 'https://lasseedfast.se/rosa/' |
|
url_from = f"{base_url}?q={data['parts'][0]}&category=Användare" |
|
url_to = f"{base_url}?q={data['parts'][1]}&category=Användare" |
|
st.markdown(f"{[str(data['parts'][0])]}"f"({url_from}) - {[str(data['parts'][1])]}"f"({url_to})") |
|
df = data['df'] |
|
df.rename({'m1': 'från', 'm2':'till', 'body':'meddelande', 'senddate_str':'datum'}, inplace=True, axis=1) |
|
st.dataframe(df, hide_index=True) |
|
|
|
def search_user(search_for, engine): |
|
|
|
search_for = search_for.lower() |
|
|
|
select = 'member_id, email, username' |
|
if '@' in search_for: |
|
search_columns = 'email_lower' |
|
else: |
|
search_columns = 'username_lower' |
|
|
|
df = pd.read_sql(f'select {select} from members_expanded where {search_columns} == "{search_for}"', engine) |
|
|
|
st.dataframe(df, use_container_width=True) |
|
|
|
params = st.experimental_get_query_params() |
|
# if 'user' in params: |
|
if df.shape[0] == 1: |
|
search_messages(df.username[0], engine, user=True) |
|
|
|
|
|
def main(): |
|
|
|
# Say hi. |
|
st.markdown(f'<h4 style="color:#ff748c";>Hej {name}!</h4>', unsafe_allow_html=True) |
|
|
|
# Define connection to sqlite database. |
|
engine = create_engine('sqlite:///db.sqlite') |
|
|
|
# Create query_params class and set empty parameters. |
|
params = Params(st.experimental_get_query_params()) |
|
|
|
# Let user chose what categorie to search in. |
|
categories = ['Meddelanden', 'Användare'] |
|
if params.category != '': |
|
index_category = categories.index(params.category[0]) |
|
else: |
|
index_category = 0 |
|
search_category = st.selectbox('Vill du söka efter meddelanden eller användare?', categories, index_category, on_change=reset_q) |
|
params.update('category', search_category) |
|
|
|
# Let user chose what words to search for. |
|
if params.q != '': |
|
placeholder=params.q[0] |
|
else: |
|
placeholder = 'Skriv här' |
|
|
|
# Set text to be shown above search field. |
|
if search_category == 'Meddelanden': |
|
search_text = 'Vilka ord vill du söka efter?' |
|
elif search_category == 'Användare': |
|
search_text = 'Skriv email-adress eller användarnamn.' |
|
|
|
helpt_text = 'Du kan använda asterix (*), minus(-), citattecken ("") och OR.' |
|
search_for = st.text_input(search_text, placeholder=placeholder, help=helpt_text).lower() |
|
|
|
# Set search input from params if nothing else is input. |
|
# This is the case when the user is coming from an url with q param. |
|
if search_for == '' and params.q != '': |
|
search_for = params.q[0] |
|
|
|
# Start the search. |
|
if search_for != '': |
|
params.update('q', search_for) |
|
|
|
# Replace å, ä, ö with ?. |
|
search_for = search_for.replace('å', '?').replace('ä', '?').replace('ö', '?') |
|
|
|
#* Search message. |
|
if search_category == 'Meddelanden': |
|
search_messages(search_for, engine) |
|
|
|
#* Seach user. |
|
elif search_category == 'Användare': |
|
search_user(search_for, engine) |
|
|
|
with open('credentials.yaml') as file: |
|
config = yaml.load(file, Loader=SafeLoader) |
|
|
|
authenticator = stauth.Authenticate( |
|
config['credentials'], |
|
config['cookie']['name'], |
|
config['cookie']['key'], |
|
config['cookie']['expiry_days'], |
|
config['preauthorized'] |
|
) |
|
|
|
name, authentication_status, username = authenticator.login('Login', 'main') |
|
|
|
if authentication_status: |
|
try: |
|
main() |
|
except Exception as e: |
|
print(e) |
|
st.write('Något blev fel :(') |
|
|
|
elif authentication_status is False: |
|
st.error('Username/password is incorrect') |
|
elif authentication_status is None: |
|
st.warning('Please enter your username and password') |
|
|
|
|
|
|
|
|