"""Streamlit app for searching messages and users in a SQLite database."""

import logging
import re
from urllib.parse import quote

import pandas as pd
import streamlit as st
import streamlit_authenticator as stauth
import yaml
from sqlalchemy import create_engine, text
from yaml.loader import SafeLoader

# A search term is either a double-quoted phrase or a run of characters that
# are neither whitespace nor a double quote.
_TERM_PATTERN = re.compile(r'"([^"]*)"|([^\s"]+)')


class Params:
    """Mirror of the URL query parameters, always exposing `q` and `category`.

    Every key of the mapping passed in becomes an attribute. `q` and
    `category` default to an empty string when they are missing, and the two
    are written back to the URL whenever the object is created or updated.
    """

    def __init__(self, d):
        """Copy the entries of `d` onto the instance and sync the URL.

        Args:
            d: Mapping of query parameter names to their values.
        """
        for key in d:
            # The keys come straight from the URL; never let one of them
            # replace a method such as `update` or `set_params`.
            if hasattr(type(self), key):
                continue
            setattr(self, key, d[key])
        for attr in ("q", "category"):
            if attr not in d:
                setattr(self, attr, "")
        self.set_params()

    def set_params(self):
        """Write the current `q` and `category` to the URL query string."""
        st.experimental_set_query_params(q=self.q, category=self.category)

    def update(self, param, value):
        """Set attribute `param` to `value` and sync the URL."""
        setattr(self, param, value)
        self.set_params()


def reset_q():
    """Replace the URL query parameters with an empty `q`."""
    st.experimental_set_query_params(q="")


def download(df):
    """Render a button that downloads `df` as a semicolon-separated CSV."""
    st.download_button(
        "Ladda ner alla meddelanden som CSV",
        df.to_csv(index=False, sep=";").encode("utf-8"),
        "file.csv",
        "text/csv",
        key="download-csv",
    )


def define_search_terms(user_input):
    """Split raw user input into lower-cased search terms.

    Double-quoted phrases are kept together as one term; everything else is
    split on whitespace. Terms are returned in the order they were typed so
    that an `or` between two terms keeps its operands on either side. A
    quote without a partner is ignored instead of being treated as the start
    of a phrase.

    Args:
        user_input (str): The text typed by the user.

    Returns:
        list: List of search terms.
    """
    search_terms = []
    for match in _TERM_PATTERN.finditer(user_input):
        phrase, word = match.groups()
        term = phrase if phrase is not None else word
        # Skip empty or blank phrases such as "" - they carry nothing to
        # search for.
        if term.strip():
            search_terms.append(term.lower())
    return search_terms


def _like_pattern(word):
    """Return `word` as a quoted SQL LIKE pattern.

    Without an asterisk the word is padded with a blank on each side so that
    only whole words match. A leading and/or trailing asterisk removes the
    padding on that side. Single quotes are doubled so the term cannot
    terminate the SQL string literal.
    """
    bare = word.replace("*", "")
    starts_open = word.startswith("*")
    ends_open = word.endswith("*")
    if "*" not in word:
        padded = f" {bare} "
    elif starts_open and ends_open:
        padded = bare
    elif starts_open:
        padded = f"{bare} "
    elif ends_open:
        padded = f" {bare}"
    else:
        # Asterisk only in the middle of the word: fall back to a whole-word
        # match rather than silently dropping the term.
        padded = f" {bare} "
    escaped = padded.replace("'", "''")
    # The percent signs are doubled exactly as the query has always been
    # emitted; two adjacent wildcards match the same rows as one.
    return f"'%%{escaped}%%'"


def create_sql_query(search_terms, table):
    """Build the SELECT statement for a keyword search in `table`.

    Supported syntax, per term:
      * `word`   - whole-word match
      * `*word`, `word*`, `*word*` - open-ended match on that side
      * `-word`  - rows must NOT contain the word (only a leading hyphen
                   negates; hyphenated words are ordinary terms)
      * `or`     - joins the term before it and the term after it into one
                   OR group; a dangling `or` is ignored

    All groups and exclusions are combined with AND. When no usable term is
    left the WHERE clause is `1 = 0`, so the query is valid and returns no
    rows.

    Args:
        search_terms (list): Lower-cased terms, e.g. from
            define_search_terms().
        table (str): Table name. It is inserted into the statement as-is and
            must therefore never come from user input.

    Returns:
        str: The SQL statement.
    """
    select_columns = 'body, "to" as m2, "from" as m1, senddate_str'
    return_limit = 1000

    groups = []  # Each entry is a list of patterns that are OR-ed together.
    excluded = []  # Patterns that must not match.
    join_next = False  # True right after an `or` that has a left operand.

    for word in search_terms:
        if not word.strip():
            continue
        if word == "or":
            join_next = bool(groups)
            continue
        if word.startswith("-"):
            negated = word[1:]
            if negated:
                excluded.append(_like_pattern(negated))
            join_next = False
            continue
        pattern = _like_pattern(word)
        if join_next:
            groups[-1].append(pattern)
        else:
            groups.append([pattern])
        join_next = False

    clauses = []
    for group in groups:
        alternatives = " OR ".join(f"body_lower LIKE {p}" for p in group)
        clauses.append(f"({alternatives})")
    clauses.extend(f"body_lower NOT LIKE {p}" for p in excluded)

    search_sql = " AND ".join(clauses) if clauses else "1 = 0"
    return (
        f"SELECT {select_columns} FROM {table} "
        f"WHERE {search_sql} LIMIT {return_limit}"
    )


def search_messages(search_for, engine, user=False):
    """Query messages and render them grouped by conversation.

    Args:
        search_for (str): Keywords to search for, or - when `user` is true -
            the name to match against the "to" and "from" columns.
        engine: SQLAlchemy engine for the database.
        user (bool): Search for all messages to/from a single user instead of
            searching for keywords.
    """
    if user:
        # Search for all messages from/to a single user. The name is passed
        # as a bound parameter so it can never be read as SQL.
        query = text(
            'SELECT body, "to" AS m2, "from" AS m1, senddate_str '
            'FROM messages WHERE "to" = :user OR "from" = :user'
        )
        df = pd.read_sql(query, engine, params={"user": search_for})
    else:
        # Search for keywords.
        sql = create_sql_query(define_search_terms(search_for), "messages")
        df = pd.read_sql(sql, engine)

    download(df)
    st.write(f"Träffar: {len(df)}")

    # dict.fromkeys removes duplicates while keeping first-seen order, which
    # keeps the order of the rendered conversations stable between runs.
    talkers = list(dict.fromkeys(df["m1"].tolist() + df["m2"].tolist()))
    if "admin" in talkers:  # Do not start a conversation lookup from admin.
        talkers.remove("admin")

    conversations = {}
    for talker in talkers:
        talker_df = df[(df["m1"] == talker) | (df["m2"] == talker)]
        partners = dict.fromkeys(
            talker_df["m1"].tolist() + talker_df["m2"].tolist()
        )
        for other in partners:
            if other == talker:
                continue
            parts_list = sorted([str(talker), str(other)])
            # A tuple key cannot collide the way an underscore-joined string
            # can when a name itself contains an underscore.
            pair_key = tuple(parts_list)
            if pair_key in conversations:
                continue
            # Filter from talker_df each time; narrowing a shared frame would
            # leave nothing for the talker's remaining partners.
            pair_df = talker_df[
                (talker_df["m1"] == other) | (talker_df["m2"] == other)
            ].sort_values("senddate_str")
            conversations[pair_key] = {
                "df": pair_df[["m1", "m2", "body", "senddate_str"]],
                "parts": parts_list,
            }

    base_url = "https://lasseedfast.se/rosa/"
    for data in conversations.values():
        first, second = data["parts"]
        # Percent-encode the names so blanks or brackets cannot break the
        # link.
        url_from = f"{base_url}?q={quote(first, safe='')}&category=Användare"
        url_to = f"{base_url}?q={quote(second, safe='')}&category=Användare"
        # Formatting a one-element list yields ['name'], which Markdown reads
        # as the link text in front of the (url) part.
        st.markdown(f"{[first]}({url_from}) - {[second]}({url_to})")
        display_df = data["df"].rename(
            columns={
                "m1": "från",
                "m2": "till",
                "body": "meddelande",
                "senddate_str": "datum",
            }
        )
        st.dataframe(display_df, hide_index=True)


def search_user(search_for, engine):
    """Look up a member and, on a unique hit, show that member's messages.

    Input containing "@" is matched against the `email_lower` column,
    anything else against `username_lower`.

    Args:
        search_for (str): Email address or username.
        engine: SQLAlchemy engine for the database.
    """
    search_for = search_for.lower()
    # The column name is one of two fixed strings, so it is safe to insert
    # into the statement; the user's value is bound as a parameter.
    search_column = "email_lower" if "@" in search_for else "username_lower"
    query = text(
        "SELECT member_id, email, username FROM members_expanded "
        f"WHERE {search_column} = :value"
    )
    df = pd.read_sql(query, engine, params={"value": search_for})
    st.dataframe(df, use_container_width=True)
    if len(df) == 1:
        search_messages(df["username"].iloc[0], engine, user=True)


def main():
    """Render the search page and run the requested search.

    Reads the module-level `name` that the login step at the bottom of the
    file assigns.
    """
    # Say hi.
    st.markdown(f"\n\nHej {name}!\n\n", unsafe_allow_html=True)

    # Define connection to sqlite database.
    engine = create_engine("sqlite:///db.sqlite")

    # Create query_params class and set empty parameters.
    params = Params(st.experimental_get_query_params())

    # Let the user choose what category to search in. An unknown category in
    # the URL falls back to the first one instead of raising.
    categories = ["Meddelanden", "Användare"]
    index_category = 0
    if params.category != "":
        requested_category = params.category[0]
        if requested_category in categories:
            index_category = categories.index(requested_category)
    search_category = st.selectbox(
        "Vill du söka efter meddelanden eller användare?",
        categories,
        index_category,
        on_change=reset_q,
    )
    params.update("category", search_category)

    # Let the user choose what words to search for.
    placeholder = params.q[0] if params.q != "" else "Skriv här"

    # Set text to be shown above search field.
    if search_category == "Meddelanden":
        search_text = "Vilka ord vill du söka efter?"
    else:
        search_text = "Skriv email-adress eller användarnamn."
    help_text = "Du kan använda asterix (*), minus(-), citattecken (\"\") och OR."
    search_for = st.text_input(
        search_text, placeholder=placeholder, help=help_text
    ).lower()

    # Set search input from params if nothing else is input.
    # This is the case when the user is coming from an url with q param.
    if search_for == "" and params.q != "":
        search_for = params.q[0]

    # Start the search.
    if search_for != "":
        params.update("q", search_for)
        # Replace å, ä, ö with ?.
        search_for = (
            search_for.replace("å", "?").replace("ä", "?").replace("ö", "?")
        )
        if search_category == "Meddelanden":
            # Search message.
            search_messages(search_for, engine)
        elif search_category == "Användare":
            # Search user.
            search_user(search_for, engine)


with open("credentials.yaml") as file:
    config = yaml.load(file, Loader=SafeLoader)

authenticator = stauth.Authenticate(
    config["credentials"],
    config["cookie"]["name"],
    config["cookie"]["key"],
    config["cookie"]["expiry_days"],
    config["preauthorized"],
)

name, authentication_status, username = authenticator.login("Login", "main")

if authentication_status:
    try:
        main()
    except Exception:
        # Top-level boundary: keep the page alive for the user, but log the
        # full traceback instead of only the exception message.
        logging.exception("Unhandled error while rendering the search page")
        st.write("Något blev fel :(")
elif authentication_status is False:
    st.error("Username/password is incorrect")
elif authentication_status is None:
    st.warning("Please enter your username and password")