You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
482 lines
20 KiB
482 lines
20 KiB
import streamlit as st |
|
from time import sleep |
|
from article2db import PDFProcessor |
|
|
|
from info import country_emojis |
|
from utils import fix_key |
|
from _base_class import StreamlitBaseClass |
|
from colorprinter.print_color import * |
|
|
|
|
|
class ArticleCollectionsPage(StreamlitBaseClass):
    """Streamlit page for browsing and curating a user's article collections.

    A collection is a document in the user's ``article_collections``
    ArangoDB collection of the form ``{"name": ..., "articles": [...]}``.
    The page lets the user pick, create and remove collections, list the
    articles of the selected one, attach notes, edit metadata and add new
    articles from uploaded PDFs or from DOIs found in pasted text.
    """

    # Article fields that are rendered separately or are internal/bulky, and
    # are therefore skipped by the generic "key: value" listing.
    _HIDDEN_ARTICLE_FIELDS = frozenset(
        {
            "_key",
            "text",
            "file",
            "_rev",
            "chunks",
            "user_access",
            "_id",
            "metadata",
            "doi",
            "title",
            "user_notes",
        }
    )

    def __init__(self, username: str):
        """Set up the page and restore attributes saved in session state.

        Args:
            username: Name of the logged-in user, forwarded to the base class.
        """
        super().__init__(username=username)
        self.collection = self.get_settings()["current_collection"]
        self.page_name = "Article Collections"

        # Restore attributes persisted on earlier runs. Using ``get`` keeps a
        # first visit (no stored state for this page yet) from raising.
        stored_state = st.session_state.get(self.page_name) or {}
        for k, v in stored_state.items():
            setattr(self, k, v)

    def run(self):
        """Render the whole page; called once per Streamlit script run."""
        no_collections = (
            self.user_arango.db.collection("article_collections").count() == 0
        )
        if no_collections:
            # Nothing to select yet, so lead with the creation form.
            self.create_new_collection()

        self.update_current_page(self.page_name)

        self.choose_collection_method()
        self.choose_project_method()

        if self.collection:
            self.display_collection()
            self.sidebar_actions()

        # The form uses a fixed key, so it must not be rendered a second time
        # in the same run when it was already shown above.
        if st.session_state.get("new_collection") and not no_collections:
            self.create_new_collection()

        # Persist state to session_state
        self.update_session_state(page_name=self.page_name)

    def choose_collection_method(self):
        """Let the user pick a collection and persist the choice."""
        self.collection = self.choose_collection()
        # Persist state after choosing collection
        self.update_session_state(self.page_name)

    def choose_project_method(self):
        """Placeholder for project selection; currently does nothing."""
        # If you have a project selection similar to collection, implement here

    def choose_collection(self):
        """Show the collection selector in the sidebar.

        The current collection is preselected when it is still among the
        available ones. A non-empty selection is stored on the instance and in
        the user's settings.

        Returns:
            The value of ``self.collection`` after handling the selection.
        """
        collections = self.get_article_collections()
        current_collection = self.collection
        print_yellow(f"Current collection: {current_collection}")
        preselected = (
            collections.index(current_collection)
            if current_collection in collections
            else None
        )
        with st.sidebar:
            collection = st.selectbox(
                "Select a collection of favorite articles",
                collections,
                index=preselected,
            )
        if collection:
            self.collection = collection
            self.update_settings("current_collection", collection)
        return self.collection

    def create_new_collection(self):
        """Render the "create collection" form and handle its submission.

        On a valid submission a new, empty collection document is inserted,
        selected as the current collection, and the script is rerun.
        """
        with st.form("create_collection_form", clear_on_submit=True):
            new_collection_name = st.text_input("Enter the name of the new collection")
            submitted = st.form_submit_button("Create Collection")
            # Ignore names that are empty or consist of whitespace only.
            new_collection_name = (new_collection_name or "").strip()
            if submitted and new_collection_name:
                self.user_arango.db.collection("article_collections").insert(
                    {"name": new_collection_name, "articles": []}
                )
                st.success(f'New collection "{new_collection_name}" created')
                self.collection = new_collection_name
                self.update_settings("current_collection", new_collection_name)
                # The request to show this form has been fulfilled; without
                # the reset the form would stay on the page for the rest of
                # the session.
                st.session_state["new_collection"] = False
                # Persist state after creating a new collection
                self.update_session_state(page_name=self.page_name)
                sleep(1)  # Leave the success message visible briefly.
                st.rerun()

    def display_collection(self):
        """Render sidebar create/remove buttons and the article list."""
        with st.sidebar:
            col1, col2 = st.columns(2)
            with col1:
                if st.button("Create new collection"):
                    # Picked up at the end of ``run`` to show the form.
                    st.session_state["new_collection"] = True
            with col2:
                if st.button(f':red[Remove collection "{self.collection}"]'):
                    self.user_arango.db.collection("article_collections").delete_match(
                        {"name": self.collection}
                    )
                    st.success(f'Collection "{self.collection}" removed')
                    self.collection = None
                    self.update_settings("current_collection", None)
                    # Persist state after removing a collection
                    self.update_session_state(page_name=self.page_name)
                    st.rerun()

        self.show_articles_in_collection()

    def show_articles_in_collection(self):
        """List every article of the current collection, one expander each."""
        collection_articles = self._fetch_collection_articles()
        if not collection_articles:
            st.write("No articles in this collection.")
            return

        st.markdown(f"#### Articles in *{self.collection}*:")
        for article in collection_articles:
            self._render_article(article)

    @staticmethod
    def _article_title(article: dict) -> str:
        """Return the metadata title of ``article`` or ``"No Title"``."""
        metadata = article.get("metadata") or {}
        return str(metadata.get("title") or "No Title")

    def _fetch_collection_articles(self) -> list:
        """Load the full documents of the current collection, sorted by title.

        Ids starting with ``sci_articles`` are looked up through
        ``self.base_arango``; all others are looked up in ``other_documents``
        through ``self.user_arango``.
        """
        id_cursor = self.user_arango.db.aql.execute(
            """
            FOR doc IN article_collections
                FILTER doc["name"] == @collection
                FOR article IN doc["articles"]
                    RETURN article["_id"]
            """,
            bind_vars={"collection": self.collection},
        )
        # Entries stored without an ``_id`` come back as null; skip them so
        # the prefix checks below cannot fail.
        article_ids = [_id for _id in id_cursor if _id]
        sci_articles = [_id for _id in article_ids if _id.startswith("sci_articles")]
        other_articles = [
            _id for _id in article_ids if not _id.startswith("sci_articles")
        ]

        collection_articles = []
        if sci_articles:
            cursor = self.base_arango.db.aql.execute(
                """
                FOR doc IN sci_articles
                    FILTER doc["_id"] IN @article_ids
                    RETURN doc
                """,
                bind_vars={"article_ids": sci_articles},
            )
            collection_articles += list(cursor)
        if other_articles:
            cursor = self.user_arango.db.aql.execute(
                """
                FOR doc IN other_documents
                    FILTER doc["_id"] IN @article_ids
                    RETURN doc
                """,
                bind_vars={"article_ids": other_articles},
            )
            collection_articles += list(cursor)

        # Filter out None values and sort articles by title
        return sorted(
            (article for article in collection_articles if article is not None),
            key=self._article_title,
        )

    def _render_article(self, article: dict) -> None:
        """Render one article: summary, fields, notes and edit controls."""
        # ``or`` fallbacks also cover keys that are present but set to None.
        metadata = article.get("metadata") or {}
        title = str(metadata.get("title") or "No Title").strip()
        journal = str(metadata.get("journal") or "No Journal").strip()
        published_year = metadata.get("published_year", "No Year")
        published_date = metadata.get("published_date")
        language = metadata.get("language", "No Language")
        icon = country_emojis.get(str(language).upper(), "") if language else ""

        expander_title = f"**{title}** *{journal}* ({published_year}) {icon}"

        with st.expander(expander_title):
            summary = article.get("summary")
            if isinstance(summary, dict) and "short_summary" in summary:
                st.markdown(f"{summary['short_summary']}")
            if journal != "No Journal":
                st.markdown(f"**Journal:** \n{journal}")
            if published_date:
                st.markdown(f"**Published Date:** \n{published_date}")

            self._render_article_fields(article)

            doi = article.get("doi")
            if doi:
                st.markdown(f"**DOI:** \n[{doi}](https://doi.org/{doi}) ")

            self._render_article_notes(article)

            st.button(
                key=f'delete_{article["_id"]}',
                label=":red[Delete article from collection]",
                on_click=self.delete_article,
                args=(self.collection, article["_id"]),
            )

            self._render_metadata_form(article)

    def _render_article_fields(self, article: dict) -> None:
        """Print the remaining article fields as ``Key: value`` lines."""
        for key, value in article.items():
            if key in self._HIDDEN_ARTICLE_FIELDS:
                continue
            if key == "summary":
                # Show only the summary text; falling through to the generic
                # line would additionally dump the raw summary dict.
                text_sum = value.get("text_sum") if isinstance(value, dict) else value
                if text_sum:
                    st.markdown(f"**Summary:** \n{text_sum}")
                continue
            if isinstance(value, list):
                # ``str`` keeps non-string list items from breaking the join.
                value = ", ".join(map(str, value))
            st.markdown(f"**{key.capitalize()}**: \n{value} ")

    def _render_article_notes(self, article: dict) -> None:
        """Show existing user notes with delete buttons and the add-note form."""
        # NOTE(review): notes are offered for every article, and
        # ``update_article`` writes through ``self.user_arango`` even for
        # ``sci_articles`` documents that are read via ``self.base_arango``.
        # Confirm whether notes on those articles are meant to be supported.
        user_notes = article.get("user_notes")
        if user_notes:
            st.markdown(":blue[**Your notes:**]")
            for note_number, note in enumerate(user_notes, start=1):
                c1, c2 = st.columns([4, 1])
                with c1:
                    st.markdown(f":blue[{note}]")
                with c2:
                    st.button(
                        key=f'{article["_key"]}_{note_number}',
                        label=":red[Delete note]",
                        on_click=self.delete_article_note,
                        args=(article, note),
                    )

        with st.form(f"add_info_form_{article['_id']}", clear_on_submit=True):
            new_info = st.text_area(
                ":blue[Add a note about the article]",
                key=f'new_info_{article["_id"]}',
                help="Add information such as what kind of article it is, what it's about, who's the author, etc.",
            )
            submitted = st.form_submit_button(":blue[Add note]")
            if submitted:
                self.update_article(article, "user_notes", new_info)

    def _render_metadata_form(self, article: dict) -> None:
        """Render the form for overriding title/author/journal/year."""
        st.markdown(":grey[Change metadata]")
        with st.form(f"update_metadata_form_{article['_id']}", clear_on_submit=True):
            new_title = st.text_input(
                ":blue[Update title]",
                key=f'new_metadata_{article["_id"]}_title',
                help="Update the title of the article.",
            )
            new_author = st.text_input(
                ":blue[Update author]",
                key=f'new_metadata_{article["_id"]}_author',
                help="Update the author of the article.",
            )
            new_journal = st.text_input(
                ":blue[Update journal]",
                key=f'new_metadata_{article["_id"]}_journal',
                help="Update the journal of the article.",
            )
            new_published_year = st.text_input(
                ":blue[Update published year]",
                key=f'new_metadata_{article["_id"]}_published_year',
                help="Update the published year of the article.",
            )
            submitted_metadata = st.form_submit_button(":blue[Add info]")
            if submitted_metadata:
                # NOTE(review): "title", "journal" and "published_year" are the
                # metadata keys read when rendering; "author" is assumed from
                # the form field - confirm against the metadata schema.
                self._update_article_metadata(
                    article,
                    {
                        "title": new_title,
                        "author": new_author,
                        "journal": new_journal,
                        "published_year": new_published_year,
                    },
                )

    def _update_article_metadata(self, article: dict, updates: dict) -> None:
        """Merge the non-empty ``updates`` into the article's metadata and save.

        Blank inputs are ignored so that an untouched form field never
        overwrites an existing value. Nothing is written and no rerun is
        triggered when every input is blank.
        """
        changes = {}
        for field, raw_value in updates.items():
            cleaned = str(raw_value).strip() if raw_value is not None else ""
            if cleaned:
                changes[field] = cleaned
        if not changes:
            return

        metadata = article.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}
        article["metadata"] = {**metadata, **changes}
        self.user_arango.db.update_document(article, check_rev=False, silent=True)
        sleep(0.2)
        st.rerun()

    def sidebar_actions(self):
        """Render the sidebar controls for adding articles (PDF upload, DOIs)."""
        with st.sidebar:
            st.markdown(f"### Add new articles to {self.collection}")
            with st.form("add_articles_form", clear_on_submit=True):
                pdf_files = st.file_uploader(
                    "Upload PDF file(s)", type=["pdf"], accept_multiple_files=True
                )
                is_sci = st.checkbox("All articles are from scientific journals")
                submitted = st.form_submit_button("Upload")
                if submitted and pdf_files:
                    self.add_articles(pdf_files, is_sci)
                    # Persist state after adding articles
                    self.update_session_state(page_name=self.page_name)
                    st.rerun()

            help_text = 'Paste a text containing DOIs, e.g., the reference section of a paper, and click "Add Articles" to add them to the collection.'
            new_articles = st.text_area(
                "Add articles to this collection", help=help_text
            )
            if st.button("Add Articles"):
                with st.spinner("Processing..."):
                    self.process_dois(
                        article_collection_name=self.collection, text=new_articles
                    )
                    # Persist state after processing DOIs
                    self.update_session_state(page_name=self.page_name)
                    st.rerun()

            self.write_not_downloaded()

    def add_articles(self, pdf_files: list, is_sci: bool) -> None:
        """Process uploaded PDFs and add them to the current collection.

        Args:
            pdf_files: Uploaded file objects; each must expose ``.name``.
            is_sci: Whether the user marked all files as scientific articles.
        """
        # An unchecked box is passed on as None rather than False.
        is_sci = is_sci if is_sci else None

        for pdf_file in pdf_files:
            status_container = st.empty()
            with status_container:
                with st.status(f"Processing {pdf_file.name}..."):
                    processor = PDFProcessor(
                        pdf_file=pdf_file,
                        filename=pdf_file.name,
                        process=False,
                        username=st.session_state["username"],
                        document_type="other_documents",
                        is_sci=is_sci,
                    )
                    _id, db, doi = processor.process_document()
                    print_rainbow(_id, db, doi)
                    # The article is available now, so it no longer belongs
                    # on the "not downloaded" list.
                    st.session_state.get("not_downloaded", {}).pop(doi, None)
                    self.articles2collection(collection=self.collection, db=db, _id=_id)
                    st.success("Done!")
                    sleep(1.5)

    def articles2collection(self, collection: str, db: str, _id: str = None) -> None:
        """Append the article ``_id`` from database ``db`` to ``collection``.

        Only ``_id``, ``doi``, ``title`` and ``metadata`` are stored in the
        collection entry. Nothing is written when the source document cannot
        be found, when the collection does not exist, or when the article is
        already part of the collection.
        """
        info = self.get_article_info(db, _id=_id)
        if not info or not info.get("_id"):
            # A missing source document yields an all-null record; storing it
            # would leave an entry without ``_id`` in the collection.
            st.warning(f"Could not add article {_id}: document not found")
            return
        info = {
            k: v for k, v in info.items() if k in ["_id", "doi", "title", "metadata"]
        }

        # The collection name is user input, so it is passed as a bind
        # variable instead of being interpolated into the query.
        doc_cursor = self.user_arango.db.aql.execute(
            'FOR doc IN article_collections FILTER doc["name"] == @collection RETURN doc',
            bind_vars={"collection": collection},
        )
        doc = next(doc_cursor, None)
        if not doc:
            return

        articles = doc.get("articles") or []
        known_ids = {entry.get("_id") for entry in articles}
        if info["_id"] in known_ids:
            return

        articles.append(info)
        self.user_arango.db.collection("article_collections").update_match(
            filters={"name": collection},
            body={"articles": articles},
            merge=True,
        )
        # Persist state after updating articles
        self.update_session_state(page_name=self.page_name)

    def get_article_info(self, db: str, _id: str = None, doi: str = None) -> dict:
        """Fetch ``_id``, ``doi``, ``title``, ``metadata`` and ``summary``.

        Args:
            db: Database name handed to ``self.get_arango``.
            _id: Document id; takes precedence over ``doi`` when both are given.
            doi: DOI looked up in the ``sci_articles`` collection.

        Returns:
            A dict with the five fields, or ``None`` when the DOI lookup finds
            nothing. For an ``_id`` that does not exist, all values are null.

        Raises:
            AssertionError: If neither ``_id`` nor ``doi`` is provided.
        """
        assert _id or doi, "Either _id or doi must be provided."
        arango = self.get_arango(db_name=db)
        if _id:
            info_cursor = arango.db.aql.execute(
                """
                LET doc = DOCUMENT(@doc_id)
                RETURN {
                    "_id": doc._id,
                    "doi": doc.doi,
                    "title": doc.title,
                    "metadata": doc.metadata,
                    "summary": doc.summary
                }
                """,
                bind_vars={"doc_id": _id},
            )
        else:
            # DOIs may originate from pasted text; bind them instead of
            # formatting them into the query string.
            info_cursor = arango.db.aql.execute(
                """
                FOR doc IN sci_articles
                    FILTER doc["doi"] == @doi
                    LIMIT 1
                    RETURN {
                        "_id": doc["_id"],
                        "doi": doc["doi"],
                        "title": doc["title"],
                        "metadata": doc["metadata"],
                        "summary": doc["summary"]
                    }
                """,
                bind_vars={"doi": doi},
            )
        return next(info_cursor, None)

    def process_dois(
        self, article_collection_name: str, text: str = None, dois: list = None
    ) -> None:
        """Download articles by DOI and add them to a collection.

        Args:
            article_collection_name: Collection that receives the articles.
            text: Free text to extract DOIs from; used only if ``dois`` is empty.
            dois: Explicit list of DOIs.

        DOIs that could not be downloaded and are not already in the database
        are recorded in ``st.session_state["not_downloaded"]`` with their URL.
        """
        processor = PDFProcessor(process=False)
        if not dois and text:
            dois = processor.extract_doi(text, multi=True)
        # Neither explicit DOIs nor any text (or none found): nothing to loop
        # over, but state is still persisted below.
        dois = dois or []

        if "not_downloaded" not in st.session_state:
            st.session_state["not_downloaded"] = {}

        for doi in dois:
            downloaded, url, path, in_db = processor.doi2pdf(doi)
            if downloaded and not in_db:
                processor.process_pdf(path)
                in_db = True
            elif not downloaded and not in_db:
                st.session_state["not_downloaded"][doi] = url

            if in_db:
                st.success(f"Article with DOI {doi} added")
                self.articles2collection(
                    collection=article_collection_name,
                    db="base",
                    _id=f"sci_articles/{fix_key(doi)}",
                )

        # Persist state after processing DOIs
        self.update_session_state(page_name=self.page_name)

    def write_not_downloaded(self):
        """List the DOIs that could not be downloaded, linked where possible."""
        not_downloaded = st.session_state.get("not_downloaded", {})
        if not not_downloaded:
            return

        st.markdown(
            "*The articles below were not downloaded. Download them yourself and add them to the collection by dropping them in the area above. Some of them can be downloaded using the link.*"
        )
        for doi, url in not_downloaded.items():
            if url:
                st.markdown(f"- [{doi}]({url})")
            else:
                st.markdown(f"- {doi}")

    def delete_article(self, collection, _id):
        """Remove the article ``_id`` from the collection named ``collection``."""
        # Bind the collection name rather than interpolating user input.
        doc_cursor = self.user_arango.db.aql.execute(
            'FOR doc IN article_collections FILTER doc["name"] == @collection RETURN doc',
            bind_vars={"collection": collection},
        )
        doc = next(doc_cursor, None)
        if doc:
            articles = [
                article
                for article in doc.get("articles") or []
                if article.get("_id") != _id
            ]
            self.user_arango.db.collection("article_collections").update_match(
                filters={"_id": doc["_id"]},
                body={"articles": articles},
            )
            # Persist state after deleting an article
            self.update_session_state(page_name=self.page_name)

    def update_article(self, article, field, value):
        """
        Update a specified field in an article with a new value.

        If the field already exists and is a list, the new value is appended to the list.
        If the field exists but is not a list, the field is converted to a list containing
        the old and new values. If the field does not exist, it is created as a list with
        the new value. The document is then saved and the script rerun.

        Values that are ``None`` or blank after stripping are ignored: nothing
        is stored and no rerun is triggered.

        Args:
            article (dict): The article to be updated.
            field (str): The field in the article to be updated.
            value (str): The new value to be added to the field.

        Returns:
            None
        """
        # Convert before stripping so non-string values are accepted too.
        value = "" if value is None else str(value).strip()
        if not value:
            return

        if field in article:
            if isinstance(article[field], list):
                article[field].append(value)
            else:
                article[field] = [article[field], value]
        else:
            article[field] = [value]
        self.user_arango.db.update_document(article, check_rev=False, silent=True)
        sleep(0.2)
        st.rerun()

    def delete_article_note(self, article: dict, note: str):
        """Delete a note from a list of notes in an article document."""
        if "user_notes" in article and note in article["user_notes"]:
            article["user_notes"].remove(note)
            self.user_arango.db.update_document(article, check_rev=False, silent=True)
            sleep(0.1)
|
|
|