import os
from datetime import datetime
from pathlib import Path
from time import sleep

import requests
import streamlit as st

from colorprinter.print_color import *

from _base_class import StreamlitBaseClass
from semantic_schoolar import search_semantic_scholar
from collections_page import ArticleCollectionsPage
from article2db import PDFProcessor
from utils import fix_key


class SearchPaperPage(StreamlitBaseClass):
    """
    SearchPaperPage - A Streamlit interface for searching papers and adding them to collections.

    This class provides a user interface for searching research papers using the Semantic Scholar
    API and adding them to article collections. It allows filtering by various parameters and
    displays detailed information about each paper.

    Attributes:
        username (str): The username of the current user.
        page_name (str): Name of the current page ("Search Papers").
        collection (str): The currently selected collection to add papers to.
        search_results (list): List of papers from the most recent search.
        search_state (dict): Dictionary tracking the state of the current search.
        download_queue (dict): Tracks papers that need manual download.
    """

    def __init__(self, username: str):
        super().__init__(username=username)
        self.page_name = "Search Papers"
        self.collection = self.get_settings().get("current_collection")

        # Initialize search results and state
        self.search_results = []
        self.search_state = {
            "last_query": None,
            "filters": {},
            "page": 0,
            "results_per_page": 10,
        }

        # Track papers that need to be downloaded manually
        self.download_queue = {}

        # Download directory
        self.download_dir = Path(f"/home/lasse/sci/downloads/{username}")
        self.download_dir.mkdir(parents=True, exist_ok=True)

        # Initialize attributes from session state if available
        if self.page_name in st.session_state:
            for k, v in st.session_state[self.page_name].items():
                setattr(self, k, v)

    def run(self):
        """Main method to render the search interface and handle user interactions."""
        self.update_current_page(self.page_name)

        st.title("Search Research Papers")

        # Instructions for first-time users
        with st.expander("ℹ️ How to use this page", expanded=not self.search_results):
            st.markdown("""
### Finding and Adding Research Papers
1. **Search for papers** using the search box below
2. **Select a collection** from the sidebar to add papers to
3. **Open access papers** can be downloaded automatically
4. **Non-open access papers** require manual download (you'll get instructions)
5. **View paper details** by clicking on a paper in the search results

### Understanding paper availability:
- 📄 = Open access paper (can be downloaded automatically)
- 🔒 = Requires manual download (instructions provided)
""")

        # Show download queue if papers need manual download
        if self.download_queue:
            self.show_download_instructions()

        # Sidebar for collections and filters
        self.sidebar_actions()

        # Main search interface
        self.search_interface()

        # Display search results if available
        if self.search_results:
            self.display_search_results()

        # Show upload interface for manually downloaded papers
        if self.download_queue:
            self.show_manual_upload_interface()

        # Persist state to session_state
        self.update_session_state(page_name=self.page_name)

    def sidebar_actions(self):
        """Renders sidebar elements for selecting collections and search filters."""
        with st.sidebar:
            # Collection selection
            collections = self.get_article_collections()
            if collections:
                st.subheader("Select Collection")
                self.collection = st.selectbox(
                    "Add papers to collection:",
                    collections,
                    index=collections.index(self.collection) if self.collection in collections else 0,
                )
                self.update_settings("current_collection", self.collection)
            else:
                st.warning("No collections available. Create a collection first.")
                if st.button("Create Collection"):
                    st.session_state["new_collection"] = True
                self.collection = None

            # Search filters
            st.subheader("Search Filters")

            # Publication types
            pub_types = [
                "JournalArticle", "Review", "Conference", "MetaAnalysis",
                "Study", "Book", "BookSection", "Dataset", "ClinicalTrial",
            ]
            selected_pub_types = st.multiselect(
                "Publication Types",
                pub_types,
                default=self.search_state.get("filters", {}).get("publication_types", ["JournalArticle"]),
            )
            if selected_pub_types:
                self.search_state["filters"]["publication_types"] = selected_pub_types

            # Open access only
            open_access = st.checkbox(
                "Open Access Only",
                value=self.search_state.get("filters", {}).get("open_access", False),
            )
            self.search_state["filters"]["open_access"] = open_access

            # Year range
            st.subheader("Year Range")
            col1, col2 = st.columns(2)
            with col1:
                start_year = st.number_input(
                    "From",
                    min_value=1900,
                    max_value=datetime.now().year,
                    value=self.search_state.get("filters", {}).get("start_year", 2010),
                )
            with col2:
                end_year = st.number_input(
                    "To",
                    min_value=1900,
                    max_value=datetime.now().year,
                    value=self.search_state.get("filters", {}).get("end_year", datetime.now().year),
                )
            if start_year and end_year:
                self.search_state["filters"]["start_year"] = start_year
                self.search_state["filters"]["end_year"] = end_year
                self.search_state["filters"]["year_range"] = f"{start_year}-{end_year}"

            # Minimum citations
            min_citations = st.number_input(
                "Minimum Citations",
                min_value=0,
                value=self.search_state.get("filters", {}).get("min_citation_count", 0),
            )
            self.search_state["filters"]["min_citation_count"] = min_citations

            # Fields of study
            fields_of_study = [
                "Computer Science", "Medicine", "Chemistry", "Biology",
                "Mathematics", "Engineering", "Psychology", "Physics",
                "Economics", "Business", "Political Science", "Environmental Science",
            ]
            selected_fields = st.multiselect(
                "Fields of Study",
                fields_of_study,
                default=self.search_state.get("filters", {}).get("fields_of_study", []),
            )
            if selected_fields:
                self.search_state["filters"]["fields_of_study"] = selected_fields

            # Results per page
            results_per_page = st.select_slider(
                "Results Per Page",
                options=[5, 10, 15, 20, 25, 30],
                value=self.search_state.get("results_per_page", 10),
            )
            self.search_state["results_per_page"] = results_per_page

            # Reset filters button
            if st.button("Reset Filters"):
                self.search_state["filters"] = {}
                self.search_state["results_per_page"] = 10
                st.rerun()

    def search_interface(self):
        """Renders the main search interface with query input and search button."""
        with st.form("search_form", clear_on_submit=False):
            col1, col2 = st.columns([3, 1])
            with col1:
                query = st.text_input(
                    "Search for papers:",
                    value=self.search_state.get("last_query", ""),
                    placeholder="Enter keywords, title, author, etc.",
                )

            submitted = st.form_submit_button("Search")

            if submitted and query:
                self.search_state["last_query"] = query
                self.search_state["page"] = 0  # Reset to first page
                self.perform_search(query)

    def perform_search(self, query: str):
        """
        Executes a search using the Semantic Scholar API with the provided query and filters.

        Args:
            query (str): The search query string.
        """
        with st.spinner("Searching for papers..."):
            try:
                # Extract filters from search state
                filters = self.search_state.get("filters", {})

                # Perform the search
                results = search_semantic_scholar(
                    query=query,
                    limit=self.search_state.get("results_per_page", 10),
                    publication_types=filters.get("publication_types"),
                    open_access=filters.get("open_access", False),
                    min_citation_count=filters.get("min_citation_count"),
                    year_range=filters.get("year_range"),
                    fields_of_study=filters.get("fields_of_study"),
                )

                if results:
                    self.search_results = results
                    st.success(f"Found {len(results)} papers matching your query")
                else:
                    st.warning("No results found. Try modifying your search query or filters.")
                    self.search_results = []

            except Exception as e:
                st.error(f"An error occurred during search: {str(e)}")
                print_red(f"Search error: {str(e)}")
                self.search_results = []

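    # NOTE: the display methods below assume each entry of `self.search_results`
    # is a dict shaped roughly like a Semantic Scholar Graph API paper record.
    # A sketch of the fields this page actually reads (values are illustrative):
    #
    #   {
    #       "paperId": "abc123",
    #       "title": "...",
    #       "year": 2021,
    #       "authors": [{"name": "..."}],
    #       "journal": {"name": "..."},        # may be missing or null
    #       "abstract": "...",
    #       "tldr": {"text": "..."},
    #       "citationCount": 0,
    #       "influentialCitationCount": 0,
    #       "fieldsOfStudy": ["..."],
    #       "externalIds": {"DOI": "..."},
    #       "openAccessPdf": {"url": "..."},   # may be missing or null
    #       "url": "https://www.semanticscholar.org/paper/...",
    #       "publicationDate": "2021-01-01",
    #   }
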
    def display_search_results(self):
        """Displays the search results with paper details and options to add to collections."""
        st.subheader("Search Results")

        # Summary of results (openAccessPdf may be missing or null, so guard with `or {}`)
        open_access_count = sum(
            1 for paper in self.search_results if (paper.get('openAccessPdf') or {}).get('url')
        )
        st.info(f"Found {len(self.search_results)} papers ({open_access_count} open access)")

        # Display the papers. Pagination is a placeholder for now: the API call
        # returns a single page of results.
        for i, paper in enumerate(self.search_results):
            self.display_paper_card(paper, i)

    def display_paper_card(self, paper, index):
        """
        Displays a single paper as a card with details and action buttons.

        Args:
            paper (dict): The paper data from the Semantic Scholar API
            index (int): Index of the paper in the search results
        """
        # Extract paper details (nested fields may be missing or null, so guard with `or {}`)
        title = paper.get('title', 'No Title')
        authors = paper.get('authors', [])
        author_names = [author.get('name', '') for author in authors]
        author_str = ", ".join(author_names) if author_names else "Unknown Authors"
        year = paper.get('year', 'Unknown Year')
        journal = (paper.get('journal') or {}).get('name', 'Unknown Journal')
        citation_count = paper.get('citationCount', 0)
        influential_citation_count = paper.get('influentialCitationCount', 0)
        paper_id = paper.get('paperId', '')
        paper_url = paper.get('url', '')
        doi = (paper.get('externalIds') or {}).get('DOI', '')

        # Check if open access
        open_access = (paper.get('openAccessPdf') or {}).get('url')
        open_access_icon = "📄" if open_access else "🔒"

        # Create card
        with st.expander(f"{title} ({year}) {open_access_icon}"):
            # Basic information
            st.markdown(f"**Title:** {title}")
            st.markdown(f"**Authors:** {author_str}")
            st.markdown(f"**Year:** {year}")
            st.markdown(f"**Journal:** {journal}")

            # Citations
            st.markdown(f"**Citations:** {citation_count} (Influential: {influential_citation_count})")

            # Abstract - rendered inline because Streamlit doesn't allow nested expanders
            abstract = paper.get('abstract', 'No abstract available')
            if abstract:
                st.markdown("**Abstract:**")
                st.markdown(
                    f"<div style='border-left: 2px solid #ccc; padding-left: 10px; margin-bottom: 10px;'>{abstract}</div>",
                    unsafe_allow_html=True,
                )

            # TL;DR summary if available
            tldr = None
            if isinstance(paper.get('tldr'), dict):
                tldr = paper['tldr'].get('text')
            if tldr:
                st.markdown(f"**TL;DR:** {tldr}")

            # Fields of study
            fields = paper.get('fieldsOfStudy', [])
            if fields:
                st.markdown(f"**Fields:** {', '.join(fields)}")

            # External IDs
            external_ids = paper.get('externalIds', {})
            if external_ids:
                id_text = [f"{id_type}: {id_value}" for id_type, id_value in external_ids.items()]
                st.markdown(f"**IDs:** {', '.join(id_text)}")

            # Paper actions section
            st.markdown("---")
            st.markdown("### Access Paper")

            col1, col2 = st.columns(2)

            # Open access papers - direct download and add
            if open_access:
                with col1:
                    st.success("📄 This paper is open access")
                    st.markdown(f"[View on Semantic Scholar]({paper_url})")
                    if doi:
                        st.markdown(f"[DOI: {doi}](https://doi.org/{doi})")

                with col2:
                    # Direct download button
                    if st.button("Download PDF", key=f"dl_{paper_id}"):
                        with st.spinner("Downloading PDF..."):
                            success, filepath = self.download_pdf(open_access, f"{paper_id}.pdf")
                            if success:
                                st.success(f"Downloaded to {filepath}")
                                # Automatically add to collection if one is selected
                                if self.collection:
                                    self.add_paper_to_collection(paper, self.collection, filepath)
                            else:
                                st.error("Failed to download PDF")

                    # Add to collection directly (metadata only), without downloading
                    if self.collection:
                        if st.button(f"Add to {self.collection} without download", key=f"add_nodl_{paper_id}"):
                            self.add_paper_to_collection(paper, self.collection)

            # Non-open access papers - manual download workflow
            else:
                with col1:
                    st.warning("🔒 This paper requires manual download")
                    st.markdown(f"[View on Semantic Scholar]({paper_url})")
                    if doi:
                        st.markdown(f"[DOI: {doi}](https://doi.org/{doi})")

                with col2:
                    # Add to manual download queue
                    if st.button("I want this paper", key=f"want_{paper_id}"):
                        self.download_queue[paper_id] = {
                            "paper": paper,
                            "collection": self.collection,
                        }
                        st.info("Added to your download queue. See instructions at the top of the page.")

    def show_download_instructions(self):
        """Shows instructions for papers that need manual download."""
        with st.expander("⬇️ Papers that need manual download", expanded=True):
            st.markdown("### Papers Requiring Manual Download")
            st.markdown("""
Some papers aren't freely available and need to be downloaded manually. Follow these steps:

1. Click the links below to access the publisher's website
2. Download the PDF file (you may need institutional access)
3. Save the file to your computer
4. Upload the PDF in the 'Upload Downloaded Papers' section below
""")

            # Iterate over a copy so an entry can be deleted while looping
            for paper_id, info in list(self.download_queue.items()):
                paper = info["paper"]
                title = paper.get('title', 'No Title')
                doi = (paper.get('externalIds') or {}).get('DOI', '')
                paper_url = paper.get('url', '')

                st.markdown(f"**{title}**")
                col1, col2 = st.columns([3, 1])
                with col1:
                    if paper_url:
                        st.markdown(f"[Semantic Scholar]({paper_url})")
                    if doi:
                        st.markdown(f"[DOI: {doi}](https://doi.org/{doi})")
                with col2:
                    if st.button("Remove", key=f"remove_{paper_id}"):
                        del self.download_queue[paper_id]
                        st.rerun()

    def show_manual_upload_interface(self):
        """Interface for uploading manually downloaded papers."""
        st.markdown("---")
        st.header("Upload Downloaded Papers")
        st.markdown("Upload the PDFs you downloaded manually here:")

        collections = self.get_article_collections()
        with st.form("manual_upload_form", clear_on_submit=True):
            uploaded_files = st.file_uploader("Upload PDF files", type=["pdf"], accept_multiple_files=True)
            collection_for_upload = st.selectbox(
                "Add to collection:",
                collections,
                index=collections.index(self.collection) if self.collection in collections else 0,
            )
            submitted = st.form_submit_button("Upload and Process")

            if submitted and uploaded_files:
                with st.spinner("Processing uploaded files..."):
                    success_count = 0
                    for pdf_file in uploaded_files:
                        # Process the uploaded PDF
                        success = self.process_uploaded_pdf(pdf_file, collection_for_upload)
                        if success:
                            success_count += 1

                            # Remove from the download queue if the queued title appears
                            # in the uploaded filename (a simple heuristic match)
                            for paper_id in list(self.download_queue.keys()):
                                if self.download_queue[paper_id]["paper"].get("title", "").lower() in pdf_file.name.lower():
                                    del self.download_queue[paper_id]

                    st.success(f"Successfully processed {success_count} of {len(uploaded_files)} files")
                    if success_count > 0:
                        st.rerun()

    def download_pdf(self, url, filename):
        """
        Downloads a PDF from a URL and saves it to the download directory.

        Args:
            url (str): The URL of the PDF to download
            filename (str): The filename to save the PDF as

        Returns:
            tuple: (success, filepath)
        """
        try:
            # Make sure the filename is safe
            safe_filename = "".join(c for c in filename if c.isalnum() or c in "._-") or "paper.pdf"
            if not safe_filename.endswith(".pdf"):
                safe_filename += ".pdf"

            filepath = self.download_dir / safe_filename
            print_blue(f"Downloading {url} to {filepath}")

            # Download the file
            response = requests.get(url, stream=True, timeout=30)
            response.raise_for_status()

            # Check if the content is actually a PDF
            content_type = response.headers.get('Content-Type', '')
            if 'application/pdf' not in content_type.lower() and not url.lower().endswith('.pdf'):
                print_red(f"Warning: Content may not be a PDF. Content-Type: {content_type}")

            # Save the file in chunks
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            # Verify the file exists and has content
            if not filepath.exists() or filepath.stat().st_size == 0:
                print_red("Downloaded file is empty or doesn't exist")
                return False, None

            print_blue(f"Successfully downloaded to {filepath}")
            return True, filepath
        except Exception as e:
            print_red(f"Error downloading PDF: {str(e)}")
            return False, None

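    # Example call (hypothetical values), e.g. with an open-access URL taken from
    # a search result's openAccessPdf field:
    #
    #   ok, path = self.download_pdf("https://example.org/paper.pdf", "paper.pdf")
    #   if ok:
    #       self.add_paper_to_collection(paper, self.collection, path)
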
    def process_uploaded_pdf(self, pdf_file, collection_name):
        """
        Process an uploaded PDF file and add it to a collection.

        Args:
            pdf_file (UploadedFile): The uploaded PDF file
            collection_name (str): The collection to add the paper to

        Returns:
            bool: Success or failure
        """
        try:
            # Build a processor for the uploaded file (processing is deferred)
            processor = PDFProcessor(
                pdf_file=pdf_file,
                filename=pdf_file.name,
                process=False,
                username=self.username,
                document_type="other_documents",
            )

            _id, db, doi = processor.process_document()
            print_blue(f"Processed document: ID={_id}, DB={db}, DOI={doi}")

            if _id:
                # Add to collection
                self.articles2collection(collection=collection_name, db=db, _id=_id)
                return True
            return False
        except Exception as e:
            print_red(f"Error processing PDF: {str(e)}")
            st.error(f"Failed to process {pdf_file.name}: {str(e)}")
            return False

    def add_paper_to_collection(self, paper, collection_name, filepath=None):
        """
        Adds a paper from search results to the selected collection.

        Args:
            paper (dict): The paper data from the Semantic Scholar API
            collection_name (str): Name of the collection to add the paper to
            filepath (str, optional): Path to a downloaded PDF file
        """
        # If we have a filepath, process the PDF
        if filepath:
            if not os.path.exists(filepath):
                st.error(f"File does not exist: {filepath}")
                print_red(f"File does not exist: {filepath}")
                # Fall through to the metadata-only path below
            else:
                file_size = os.path.getsize(filepath)
                st.info(f"Processing PDF file: {filepath} ({file_size} bytes)")
                print_blue(f"Processing PDF file: {filepath} ({file_size} bytes)")

                with open(filepath, 'rb') as f:
                    # Sanity-check the magic bytes before processing
                    header = f.read(10)
                    is_pdf = header.startswith(b'%PDF-')
                    print_blue(f"File header check - Is PDF: {is_pdf}, Header: {header}")

                    # Read the entire file content
                    f.seek(0)
                    pdf_content = f.read()

                # Create a PDFProcessor with content bytes instead of a file handle
                processor = PDFProcessor(
                    pdf_file=pdf_content,
                    filename=os.path.basename(filepath),
                    process=False,
                    username=self.username,
                    document_type="other_documents",
                    is_sci=True,
                )

                _id, db, doi = processor.process_document()
                print_blue(f"Document processed: ID={_id}, DB={db}, DOI={doi}")

                if _id:
                    self.articles2collection(collection=collection_name, db=db, _id=_id)
                    st.success(f"Added '{paper.get('title')}' to {collection_name}")
                    return
                st.warning("Processing the document returned no ID. Falling back to metadata.")

        # Add directly using metadata if file processing failed or no filepath was given
        st.info("Adding paper using metadata only")
        paper_info = {
            "_id": f"sci_articles/{paper.get('paperId', '')}",
            "doi": (paper.get('externalIds') or {}).get('DOI', ''),
            "metadata": {
                "title": paper.get('title', 'No Title'),
                "journal": (paper.get('journal') or {}).get('name', 'Unknown Journal'),
                "published_year": paper.get('year', ''),
                "published_date": paper.get('publicationDate', ''),
                "authors": [author.get('name', '') for author in paper.get('authors', [])],
                "abstract": paper.get('abstract', ''),
                "url": paper.get('url', ''),
                "open_access_url": (paper.get('openAccessPdf') or {}).get('url', ''),
                "citation_count": paper.get('citationCount', 0),
                "fields_of_study": paper.get('fieldsOfStudy', []),
            },
        }

        # Check if the collection exists (bind variables avoid AQL injection)
        doc_cursor = self.user_arango.db.aql.execute(
            'FOR doc IN article_collections FILTER doc["name"] == @name RETURN doc',
            bind_vars={"name": collection_name},
        )
        doc = next(doc_cursor, None)

        if doc:
            # Check if the paper already exists in the collection; only compare
            # DOIs when one is present, so papers without a DOI don't collide
            articles = doc.get("articles", [])
            for article in articles:
                if article.get("_id") == paper_info["_id"] or (
                    paper_info["doi"] and article.get("doi") == paper_info["doi"]
                ):
                    st.warning(f"This paper is already in the '{collection_name}' collection.")
                    return

            # Add paper to collection
            articles.append(paper_info)
            self.user_arango.db.collection("article_collections").update_match(
                filters={"name": collection_name},
                body={"articles": articles},
                merge=True,
            )
            st.success(f"Added '{paper.get('title')}' to {collection_name}")

            # Persist state after adding the paper
            self.update_session_state(page_name=self.page_name)
        else:
            st.error(f"Collection '{collection_name}' not found.")

    def articles2collection(self, collection: str, db: str, _id: str = None) -> None:
        """
        Add an article to a collection by retrieving its info from the database.

        Args:
            collection (str): The collection name
            db (str): The database name
            _id (str, optional): The article ID
        """
        info = self.get_article_info(db, _id=_id)
        if not info:
            print_red(f"No article info found for {_id} in {db}")
            return
        info = {k: v for k, v in info.items() if k in ["_id", "doi", "title", "metadata"]}

        # Look up the collection document (bind variables avoid AQL injection)
        doc_cursor = self.user_arango.db.aql.execute(
            'FOR doc IN article_collections FILTER doc["name"] == @name RETURN doc',
            bind_vars={"name": collection},
        )
        doc = next(doc_cursor, None)
        if doc:
            articles = doc.get("articles", [])
            keys = [i["_id"] for i in articles]
            if info["_id"] not in keys:
                articles.append(info)
                self.user_arango.db.collection("article_collections").update_match(
                    filters={"name": collection},
                    body={"articles": articles},
                    merge=True,
                )
                # Persist state after updating articles
                self.update_session_state(page_name=self.page_name)

    def get_article_info(self, db: str, _id: str = None, doi: str = None) -> dict:
        """
        Get article info from the database.

        Args:
            db (str): The database name
            _id (str, optional): The article ID
            doi (str, optional): The article DOI

        Returns:
            dict: The article info, or None if not found
        """
        assert _id or doi, "Either _id or doi must be provided."
        arango = self.get_arango(db_name=db)
        if _id:
            # Fetch the document once and project only the fields we need
            query = """
            LET doc = DOCUMENT(@doc_id)
            RETURN {
                "_id": doc._id,
                "doi": doc.doi,
                "title": doc.title,
                "metadata": doc.metadata,
                "summary": doc.summary
            }
            """
            info_cursor = arango.db.aql.execute(query, bind_vars={"doc_id": _id})
        else:
            # Look up by DOI (bind variables avoid AQL injection)
            query = """
            FOR doc IN sci_articles
                FILTER doc.doi == @doi
                LIMIT 1
                RETURN {
                    "_id": doc._id,
                    "doi": doc.doi,
                    "title": doc.title,
                    "metadata": doc.metadata,
                    "summary": doc.summary
                }
            """
            info_cursor = arango.db.aql.execute(query, bind_vars={"doi": doi})
        return next(info_cursor, None)
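

# A minimal usage sketch (illustrative, not executed). Assumptions: this page is
# rendered from a Streamlit multipage entrypoint, and the username comes from the
# app's own auth/session handling - both live outside this file, and the module
# name below is hypothetical:
#
#   from search_paper_page import SearchPaperPage
#
#   page = SearchPaperPage(username=st.session_state["username"])
#   page.run()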