import streamlit as st
from time import sleep
from article2db import PDFProcessor
from info import country_emojis
from utils import fix_key
from _base_class import StreamlitBaseClass
from colorprinter.print_color import *


class ArticleCollectionsPage(StreamlitBaseClass):
    """Streamlit page for browsing and editing a user's article collections.

    A collection is a document in the user's ``article_collections``
    collection with a ``name`` and a list of ``articles``. Scientific
    articles are read from ``sci_articles`` through ``self.base_arango``;
    every other document is read from ``other_documents`` through
    ``self.user_arango``.
    """

    # Article fields that are rendered separately (or never) and must be
    # skipped by the generic "key: value" listing inside an article expander.
    _HIDDEN_ARTICLE_KEYS = frozenset(
        {
            "_key",
            "text",
            "file",
            "_rev",
            "chunks",
            "user_access",
            "_id",
            "metadata",
            "doi",
            "title",
            "user_notes",
        }
    )

    def __init__(self, username: str):
        """Load the current collection from settings and restore page state.

        Args:
            username: Name of the logged-in user, forwarded to the base class.
        """
        super().__init__(username=username)
        self.collection = self.get_settings()["current_collection"]
        self.page_name = "Article Collections"

        # Initialize attributes from session state if available. ``.get``
        # keeps the page usable when no state was stored for it yet.
        for k, v in st.session_state.get(self.page_name, {}).items():
            setattr(self, k, v)

    def run(self):
        """Render the page: collection picker, articles and sidebar actions."""
        # With no collections at all, the creation form is shown first.
        first_collection_needed = (
            self.user_arango.db.collection("article_collections").count() == 0
        )
        if first_collection_needed:
            self.create_new_collection()

        self.update_current_page(self.page_name)
        self.choose_collection_method()
        self.choose_project_method()

        if self.collection:
            self.display_collection()
            self.sidebar_actions()

        # The form uses a fixed key, so rendering it twice in one run would
        # make Streamlit raise a duplicate-form error.
        if st.session_state.get("new_collection") and not first_collection_needed:
            self.create_new_collection()

        # Persist state to session_state
        self.update_session_state(page_name=self.page_name)

    def choose_collection_method(self):
        """Let the user pick a collection and persist the choice."""
        self.collection = self.choose_collection()
        # Persist state after choosing collection
        self.update_session_state(self.page_name)

    def choose_project_method(self):
        """Placeholder for project selection; currently does nothing."""
        # If you have a project selection similar to collection, implement here
        pass

    def choose_collection(self):
        """Show the collection selectbox in the sidebar.

        Returns:
            The selected collection, or the previously active one when
            nothing is selected.
        """
        collections = self.get_article_collections()
        current_collection = self.collection
        print_yellow(f"Current collection: {current_collection}")

        # Preselect the active collection; ``None`` leaves the box empty.
        preselected = (
            collections.index(current_collection)
            if current_collection in collections
            else None
        )
        with st.sidebar:
            collection = st.selectbox(
                "Select a collection of favorite articles",
                collections,
                index=preselected,
            )
        if collection:
            self.collection = collection
            self.update_settings("current_collection", collection)
        return self.collection

    def create_new_collection(self):
        """Render the "create collection" form and handle its submission.

        On a successful submit the new collection becomes the current one,
        the ``new_collection`` session flag is cleared and the app reruns.
        """
        with st.form("create_collection_form", clear_on_submit=True):
            new_collection_name = st.text_input(
                "Enter the name of the new collection"
            )
            submitted = st.form_submit_button("Create Collection")

        # Ignore names that are empty or consist only of whitespace.
        new_collection_name = (new_collection_name or "").strip()
        if not (submitted and new_collection_name):
            return

        self.user_arango.db.collection("article_collections").insert(
            {"name": new_collection_name, "articles": []}
        )
        st.success(f'New collection "{new_collection_name}" created')
        self.collection = new_collection_name
        self.update_settings("current_collection", new_collection_name)
        # Clear the request flag, otherwise the form would be rendered again
        # on every following run.
        st.session_state["new_collection"] = False
        # Persist state after creating a new collection
        self.update_session_state(page_name=self.page_name)
        sleep(1)  # Give the user a moment to read the success message.
        st.rerun()

    def display_collection(self):
        """Render the create/remove buttons and the collection's articles."""
        with st.sidebar:
            col1, col2 = st.columns(2)
            with col1:
                if st.button("Create new collection"):
                    # The form itself is rendered by run() when this flag is set.
                    st.session_state["new_collection"] = True
            with col2:
                if st.button(f':red[Remove collection "{self.collection}"]'):
                    self.user_arango.db.collection(
                        "article_collections"
                    ).delete_match({"name": self.collection})
                    st.success(f'Collection "{self.collection}" removed')
                    self.collection = None
                    self.update_settings("current_collection", None)
                    # Persist state after removing a collection
                    self.update_session_state(page_name=self.page_name)
                    st.rerun()

        self.show_articles_in_collection()

    def show_articles_in_collection(self):
        """List every article of the current collection in an expander."""
        collection_articles = self._fetch_collection_articles()
        if not collection_articles:
            st.write("No articles in this collection.")
            return

        st.markdown(f"#### Articles in *{self.collection}*:")
        for article in collection_articles:
            self._render_article(article)

    def _fetch_collection_articles(self) -> list:
        """Return the current collection's article documents sorted by title."""
        id_cursor = self.user_arango.db.aql.execute(
            """
            FOR doc IN article_collections
                FILTER doc["name"] == @collection
                FOR article IN doc["articles"]
                    RETURN article["_id"]
            """,
            bind_vars={"collection": self.collection},
        )
        # Entries without an _id cannot be looked up, so they are dropped.
        article_ids = [_id for _id in id_cursor if _id]
        sci_articles = [_id for _id in article_ids if _id.startswith("sci_articles")]
        other_articles = [
            _id for _id in article_ids if not _id.startswith("sci_articles")
        ]

        documents = []
        if sci_articles:
            cursor = self.base_arango.db.aql.execute(
                """
                FOR doc IN sci_articles
                    FILTER doc["_id"] IN @article_ids
                    RETURN doc
                """,
                bind_vars={"article_ids": sci_articles},
            )
            documents += list(cursor)
        if other_articles:
            cursor = self.user_arango.db.aql.execute(
                """
                FOR doc IN other_documents
                    FILTER doc["_id"] IN @article_ids
                    RETURN doc
                """,
                bind_vars={"article_ids": other_articles},
            )
            documents += list(cursor)

        def title_of(article: dict) -> str:
            # Missing metadata or a missing/None title sorts as "No Title";
            # str() keeps the sort keys comparable with each other.
            return str((article.get("metadata") or {}).get("title") or "No Title")

        return sorted(
            (article for article in documents if article is not None),
            key=title_of,
        )

    def _render_article(self, article: dict) -> None:
        """Render one article: details, notes, delete button, metadata form."""
        metadata = article.get("metadata") or {}
        title = str(metadata.get("title") or "No Title").strip()
        journal = str(metadata.get("journal") or "No Journal").strip()
        published_year = metadata.get("published_year", "No Year")
        published_date = metadata.get("published_date")
        language = metadata.get("language", "No Language")

        icon = country_emojis.get(str(language).upper(), "") if language else ""
        expander_title = f"**{title}** *{journal}* ({published_year}) {icon}"

        with st.expander(expander_title):
            if title != "No Title":
                st.markdown(f"**Title:** \n{title}")
            if journal != "No Journal":
                st.markdown(f"**Journal:** \n{journal}")
            if published_date:
                st.markdown(f"**Published Date:** \n{published_date}")

            for key, value in article.items():
                if key in self._HIDDEN_ARTICLE_KEYS:
                    continue
                if isinstance(value, list):
                    # Items are not guaranteed to be strings.
                    value = ", ".join(map(str, value))
                st.markdown(f"**{key.capitalize()}**: \n{value} ")

            if article.get("doi"):
                st.markdown(
                    f"**DOI:** \n[{article['doi']}](https://doi.org/{article['doi']}) "
                )

            self._render_notes(article)

            st.button(
                key=f'delete_{article["_id"]}',
                label=":red[Delete article from collection]",
                on_click=self.delete_article,
                args=(self.collection, article["_id"]),
            )

            self._render_metadata_form(article)

    def _render_notes(self, article: dict) -> None:
        """Render the article's notes and the form for adding a new one."""
        # NOTE(review): notes are written through self.user_arango even for
        # sci_articles documents, which this page reads from self.base_arango
        # -- confirm this is intended.
        if article.get("user_notes"):
            st.markdown(":blue[**Your notes:**]")
            for note_number, note in enumerate(article["user_notes"], start=1):
                c1, c2 = st.columns([4, 1])
                with c1:
                    st.markdown(f":blue[{note}]")
                with c2:
                    st.button(
                        key=f'{article["_key"]}_{note_number}',
                        label=":red[Delete note]",
                        on_click=self.delete_article_note,
                        args=(article, note),
                    )

        with st.form(f"add_info_form_{article['_id']}", clear_on_submit=True):
            new_info = st.text_area(
                ":blue[Add a note about the article]",
                key=f'new_info_{article["_id"]}',
                help="Add information such as what kind of article it is, what it's about, who's the author, etc.",
            )
            submitted = st.form_submit_button(":blue[Add note]")
        # Submitting an empty text area must not store an empty note.
        if submitted and new_info and new_info.strip():
            self.update_article(article, "user_notes", new_info)

    def _render_metadata_form(self, article: dict) -> None:
        """Render the "change metadata" form and apply submitted values."""
        # Add info button and form
        st.markdown(":grey[Change metadata]")
        with st.form(f"update_metadata_form_{article['_id']}", clear_on_submit=True):
            new_title = st.text_input(
                ":blue[Update title]",
                key=f'new_metadata_{article["_id"]}_title',
                help="Update the title of the article.",
            )
            new_author = st.text_input(
                ":blue[Update author]",
                key=f'new_metadata_{article["_id"]}_author',
                help="Update the author of the article.",
            )
            new_journal = st.text_input(
                ":blue[Update journal]",
                key=f'new_metadata_{article["_id"]}_journal',
                help="Update the journal of the article.",
            )
            new_published_year = st.text_input(
                ":blue[Update published year]",
                key=f'new_metadata_{article["_id"]}_published_year',
                help="Update the published year of the article.",
            )
            submitted_metadata = st.form_submit_button(":blue[Add info]")

        if not submitted_metadata:
            return

        # Use the values the user typed; fields left empty stay unchanged.
        # NOTE(review): "author" is assumed to be the metadata key for the
        # author field -- confirm against the stored metadata schema.
        entered = {
            "title": new_title,
            "author": new_author,
            "journal": new_journal,
            "published_year": new_published_year,
        }
        updates = {
            field: value.strip()
            for field, value in entered.items()
            if value and value.strip()
        }
        if updates:
            self._update_article_metadata(article, updates)

    def _update_article_metadata(self, article: dict, updates: dict) -> None:
        """Merge ``updates`` into the article's metadata dict and save it."""
        metadata = article.get("metadata") or {}
        metadata.update(updates)
        article["metadata"] = metadata
        self.user_arango.db.update_document(article, check_rev=False, silent=True)
        sleep(0.2)
        st.rerun()

    def sidebar_actions(self):
        """Render the sidebar tools for adding articles by PDF upload or DOI."""
        with st.sidebar:
            st.markdown(f"### Add new articles to {self.collection}")
            with st.form("add_articles_form", clear_on_submit=True):
                pdf_files = st.file_uploader(
                    "Upload PDF file(s)", type=["pdf"], accept_multiple_files=True
                )
                is_sci = st.checkbox("All articles are from scientific journals")
                submitted = st.form_submit_button("Upload")
            if submitted and pdf_files:
                self.add_articles(pdf_files, is_sci)
                # Persist state after adding articles
                self.update_session_state(page_name=self.page_name)
                st.rerun()

            help_text = 'Paste a text containing DOIs, e.g., the reference section of a paper, and click "Add Articles" to add them to the collection.'
            new_articles = st.text_area(
                "Add articles to this collection", help=help_text
            )
            if st.button("Add Articles"):
                with st.spinner("Processing..."):
                    self.process_dois(
                        article_collection_name=self.collection, text=new_articles
                    )
                # Persist state after processing DOIs
                self.update_session_state(page_name=self.page_name)
                st.rerun()

            self.write_not_downloaded()

    def add_articles(self, pdf_files: list, is_sci: bool) -> None:
        """Process uploaded PDFs and add each to the current collection.

        Args:
            pdf_files: Uploaded file objects from ``st.file_uploader``.
            is_sci: Whether the user marked all files as scientific articles.
        """
        # An unchecked box is passed to the processor as None, not False.
        is_sci = is_sci if is_sci else None
        for pdf_file in pdf_files:
            status_container = st.empty()
            with status_container:
                with st.status(f"Processing {pdf_file.name}..."):
                    processor = PDFProcessor(
                        pdf_file=pdf_file,
                        filename=pdf_file.name,
                        process=False,
                        username=st.session_state["username"],
                        document_type="other_documents",
                        is_sci=is_sci,
                    )
                    _id, db, doi = processor.process_document()
                    print_rainbow(_id, db, doi)
                    # The article is available now, so drop it from the list
                    # of DOIs that could not be downloaded.
                    st.session_state.get("not_downloaded", {}).pop(doi, None)
                    self.articles2collection(
                        collection=self.collection, db=db, _id=_id
                    )
                st.success("Done!")
                sleep(1.5)

    def articles2collection(self, collection: str, db: str, _id: str = None) -> None:
        """Add the article ``_id`` from database ``db`` to ``collection``.

        Nothing is written when the article cannot be found, when the
        collection does not exist, or when the article is already in it.

        Args:
            collection: Name of the article collection to add to.
            db: Name of the database holding the article.
            _id: Document id of the article.
        """
        info = self.get_article_info(db, _id=_id)
        # A lookup for a missing document returns no row or a row of nulls;
        # storing such an entry would break listing the collection later.
        if not info or not info.get("_id"):
            return
        info = {
            k: v for k, v in info.items() if k in ["_id", "doi", "title", "metadata"]
        }

        # Bind the name instead of formatting it into the query string.
        doc_cursor = self.user_arango.db.aql.execute(
            'FOR doc IN article_collections FILTER doc["name"] == @collection RETURN doc',
            bind_vars={"collection": collection},
        )
        doc = next(doc_cursor, None)
        if not doc:
            return

        articles = doc.get("articles") or []
        if any(existing.get("_id") == info["_id"] for existing in articles):
            return

        articles.append(info)
        self.user_arango.db.collection("article_collections").update_match(
            filters={"name": collection},
            body={"articles": articles},
            merge=True,
        )
        # Persist state after updating articles
        self.update_session_state(page_name=self.page_name)

    def get_article_info(self, db: str, _id: str = None, doi: str = None) -> dict:
        """Fetch id, DOI, title, metadata and summary for one article.

        Args:
            db: Name of the database to query.
            _id: Document id to look up; takes precedence over ``doi``.
            doi: DOI to look up in ``sci_articles`` when no ``_id`` is given.

        Returns:
            A dict with the keys ``_id``, ``doi``, ``title``, ``metadata``
            and ``summary``, or ``None`` when the query yields no row.

        Raises:
            AssertionError: If neither ``_id`` nor ``doi`` is provided.
        """
        assert _id or doi, "Either _id or doi must be provided."
        arango = self.get_arango(db_name=db)
        if _id:
            query = """
                RETURN {
                    "_id": DOCUMENT(@doc_id)._id,
                    "doi": DOCUMENT(@doc_id).doi,
                    "title": DOCUMENT(@doc_id).title,
                    "metadata": DOCUMENT(@doc_id).metadata,
                    "summary": DOCUMENT(@doc_id).summary
                }
            """
            info_cursor = arango.db.aql.execute(query, bind_vars={"doc_id": _id})
        else:
            # Bind the DOI instead of formatting it into the query string.
            query = """
                FOR doc IN sci_articles
                    FILTER doc["doi"] == @doi
                    LIMIT 1
                    RETURN {
                        "_id": doc["_id"],
                        "doi": doc["doi"],
                        "title": doc["title"],
                        "metadata": doc["metadata"],
                        "summary": doc["summary"]
                    }
            """
            info_cursor = arango.db.aql.execute(query, bind_vars={"doi": doi})
        return next(info_cursor, None)

    def process_dois(
        self, article_collection_name: str, text: str = None, dois: list = None
    ) -> None:
        """Download and process articles by DOI and add them to a collection.

        DOIs that could not be downloaded and are not in the database are
        stored in ``st.session_state["not_downloaded"]`` with their URL.

        Args:
            article_collection_name: Collection the articles are added to.
            text: Free text to extract DOIs from when ``dois`` is empty.
            dois: DOIs to process.
        """
        processor = PDFProcessor(process=False)
        if not dois and text:
            dois = processor.extract_doi(text, multi=True)
        if "not_downloaded" not in st.session_state:
            st.session_state["not_downloaded"] = {}

        # ``dois`` is still None when neither argument was supplied.
        for doi in dois or []:
            downloaded, url, path, in_db = processor.doi2pdf(doi)
            if downloaded and not in_db:
                processor.process_pdf(path)
                in_db = True
            elif not downloaded and not in_db:
                st.session_state["not_downloaded"][doi] = url

            if in_db:
                st.success(f"Article with DOI {doi} added")
                self.articles2collection(
                    collection=article_collection_name,
                    db="base",
                    _id=f"sci_articles/{fix_key(doi)}",
                )

        # Persist state after processing DOIs
        self.update_session_state(page_name=self.page_name)

    def write_not_downloaded(self):
        """List the DOIs that could not be downloaded, linked where possible."""
        not_downloaded = st.session_state.get("not_downloaded", {})
        if not not_downloaded:
            return

        st.markdown(
            "*The articles below were not downloaded. Download them yourself and add them to the collection by dropping them in the area above. Some of them can be downloaded using the link.*"
        )
        for doi, url in not_downloaded.items():
            if url:
                st.markdown(f"- [{doi}]({url})")
            else:
                st.markdown(f"- {doi}")

    def delete_article(self, collection, _id):
        """Remove the article ``_id`` from the named collection.

        Args:
            collection: Name of the article collection.
            _id: Document id of the article to remove.
        """
        # Bind the name instead of formatting it into the query string.
        doc_cursor = self.user_arango.db.aql.execute(
            'FOR doc IN article_collections FILTER doc["name"] == @collection RETURN doc',
            bind_vars={"collection": collection},
        )
        doc = next(doc_cursor, None)
        if not doc:
            return

        articles = [
            article
            for article in doc.get("articles") or []
            if article.get("_id") != _id
        ]
        self.user_arango.db.collection("article_collections").update_match(
            filters={"_id": doc["_id"]},
            body={"articles": articles},
        )
        # Persist state after deleting an article
        self.update_session_state(page_name=self.page_name)

    def update_article(self, article, field, value):
        """
        Update a specified field in an article with a new value.

        If the field already exists and is a list, the new value is appended
        to the list. If the field exists but is not a list, the field is
        converted to a list containing the old and new values. If the field
        does not exist, it is created as a list with the new value.

        The document is then saved and the app is rerun.

        Args:
            article (dict): The article to be updated.
            field (str): The field in the article to be updated.
            value (str): The new value to be added to the field.

        Returns:
            None
        """
        # Convert before stripping so non-string values do not raise.
        value = str(value).strip()
        if field in article:
            if isinstance(article[field], list):
                article[field].append(value)
            else:
                article[field] = [article[field], value]
        else:
            article[field] = [value]
        self.user_arango.db.update_document(article, check_rev=False, silent=True)
        sleep(0.2)
        st.rerun()

    def delete_article_note(self, article: dict, note: str):
        """Delete a note from a list of notes in an article document."""
        if "user_notes" in article and note in article["user_notes"]:
            article["user_notes"].remove(note)
            self.user_arango.db.update_document(
                article, check_rev=False, silent=True
            )
            sleep(0.1)