You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

692 lines
30 KiB

import streamlit as st
from time import sleep
from datetime import datetime
import os
import requests
from pathlib import Path
from colorprinter.print_color import *
from _base_class import StreamlitBaseClass
from semantic_schoolar import search_semantic_scholar
from collections_page import ArticleCollectionsPage
from article2db import PDFProcessor
from utils import fix_key
class SearchPaperPage(StreamlitBaseClass):
    """
    SearchPaperPage - A Streamlit interface for searching papers and adding to collections.

    This class provides a user interface for searching research papers using the Semantic Scholar API
    and adding them to article collections. It allows filtering by various parameters and displaying
    detailed information about each paper.

    Attributes:
        username (str): The username of the current user.
        page_name (str): Name of the current page ("Search Papers").
        collection (str): The currently selected collection to add papers to.
        search_results (list): List of papers from the most recent search.
        search_state (dict): Dictionary tracking the state of the current search.
        download_queue (dict): Tracks papers that need manual download.
    """

    def __init__(self, username: str):
        """Initialize the page for *username* and restore any persisted per-page state."""
        super().__init__(username=username)
        self.page_name = "Search Papers"
        # Target collection for "add paper" actions; may be None if the user has none yet.
        self.collection = self.get_settings().get("current_collection")
        # Initialize search results and state
        self.search_results = []
        self.search_state = {
            "last_query": None,
            "filters": {},
            "page": 0,
            "results_per_page": 10
        }
        # Track papers that need to be downloaded manually
        self.download_queue = {}
        # Download directory
        # NOTE(review): hard-coded absolute home path — assumes a single-host
        # deployment; confirm before running elsewhere.
        self.download_dir = Path(f"/home/lasse/sci/downloads/{username}")
        self.download_dir.mkdir(parents=True, exist_ok=True)
        # Initialize attributes from session state if available
        # (overwrites the defaults above with whatever was persisted last rerun).
        if self.page_name in st.session_state:
            for k, v in st.session_state[self.page_name].items():
                setattr(self, k, v)
    def run(self):
        """Main method to render the search interface and handle user interactions.

        Renders, in order: usage instructions, the manual-download queue, the
        sidebar filters, the search form, the results, and the upload form.
        The order of the st.* calls defines the page layout, so it matters.
        """
        self.update_current_page(self.page_name)
        st.title("Search Research Papers")
        # Instructions for first-time users (auto-expanded until a search has results)
        with st.expander(" How to use this page", expanded=not self.search_results):
            st.markdown("""
### Finding and Adding Research Papers
1. **Search for papers** using the search box below
2. **Select a collection** from the sidebar to add papers to
3. **Open access papers** can be downloaded automatically
4. **Non-open access papers** require manual download (you'll get instructions)
5. **View paper details** by clicking on a paper in the search results
### Understanding paper availability:
- 📄 = Open access paper (can be downloaded automatically)
- 🔒 = Requires manual download (instructions provided)
""")
        # Show download queue if papers need manual download
        if self.download_queue:
            self.show_download_instructions()
        # Sidebar for collections and filters
        self.sidebar_actions()
        # Main search interface
        self.search_interface()
        # Display search results if available
        if self.search_results:
            self.display_search_results()
        # Show upload interface for manually downloaded papers
        if self.download_queue:
            self.show_manual_upload_interface()
        # Persist state to session_state so a rerun restores it in __init__
        self.update_session_state(page_name=self.page_name)
    def sidebar_actions(self):
        """Render sidebar elements for selecting collections and search filters.

        Writes the chosen filter values into ``self.search_state["filters"]``
        so that perform_search() can read them later on the same rerun.
        """
        with st.sidebar:
            # Collection selection
            collections = self.get_article_collections()
            if collections:
                st.subheader("Select Collection")
                self.collection = st.selectbox(
                    "Add papers to collection:",
                    collections,
                    index=collections.index(self.collection) if self.collection in collections else 0
                )
                # Persist the choice so other pages / future sessions see it.
                self.update_settings("current_collection", self.collection)
            else:
                st.warning("No collections available. Create a collection first.")
                if st.button("Create Collection"):
                    # NOTE(review): only sets a session flag — presumably another
                    # page reacts to "new_collection"; confirm against the app router.
                    st.session_state["new_collection"] = True
                self.collection = None
            # Search Filters
            st.subheader("Search Filters")
            # Publication types
            pub_types = [
                "JournalArticle", "Review", "Conference", "MetaAnalysis",
                "Study", "Book", "BookSection", "Dataset", "ClinicalTrial"
            ]
            selected_pub_types = st.multiselect(
                "Publication Types",
                pub_types,
                default=self.search_state.get("filters", {}).get("publication_types", ["JournalArticle"])
            )
            # NOTE: if the user clears the selection entirely, the previously
            # stored publication_types value is intentionally left in place.
            if selected_pub_types:
                self.search_state["filters"]["publication_types"] = selected_pub_types
            # Open access only
            open_access = st.checkbox(
                "Open Access Only",
                value=self.search_state.get("filters", {}).get("open_access", False)
            )
            self.search_state["filters"]["open_access"] = open_access
            # Year range
            st.subheader("Year Range")
            col1, col2 = st.columns(2)
            with col1:
                start_year = st.number_input(
                    "From",
                    min_value=1900,
                    max_value=datetime.now().year,
                    value=self.search_state.get("filters", {}).get("start_year", 2010)
                )
            with col2:
                end_year = st.number_input(
                    "To",
                    min_value=1900,
                    max_value=datetime.now().year,
                    value=self.search_state.get("filters", {}).get("end_year", datetime.now().year)
                )
            if start_year and end_year:
                self.search_state["filters"]["start_year"] = start_year
                self.search_state["filters"]["end_year"] = end_year
                # Combined "YYYY-YYYY" form passed through to the search wrapper.
                self.search_state["filters"]["year_range"] = f"{start_year}-{end_year}"
            # Minimum citations
            min_citations = st.number_input(
                "Minimum Citations",
                min_value=0,
                value=self.search_state.get("filters", {}).get("min_citation_count", 0)
            )
            self.search_state["filters"]["min_citation_count"] = min_citations
            # Fields of study
            fields_of_study = [
                "Computer Science", "Medicine", "Chemistry", "Biology",
                "Mathematics", "Engineering", "Psychology", "Physics",
                "Economics", "Business", "Political Science", "Environmental Science"
            ]
            selected_fields = st.multiselect(
                "Fields of Study",
                fields_of_study,
                default=self.search_state.get("filters", {}).get("fields_of_study", [])
            )
            # Same clear-keeps-old-value behavior as publication types above.
            if selected_fields:
                self.search_state["filters"]["fields_of_study"] = selected_fields
            # Results per page
            results_per_page = st.select_slider(
                "Results Per Page",
                options=[5, 10, 15, 20, 25, 30],
                value=self.search_state.get("results_per_page", 10)
            )
            self.search_state["results_per_page"] = results_per_page
            # Reset filters button
            if st.button("Reset Filters"):
                self.search_state["filters"] = {}
                self.search_state["results_per_page"] = 10
                st.rerun()
def search_interface(self):
"""Renders the main search interface with query input and search button."""
with st.form("search_form", clear_on_submit=False):
col1, col2 = st.columns([3, 1])
with col1:
query = st.text_input(
"Search for papers:",
value=self.search_state.get("last_query", ""),
placeholder="Enter keywords, title, author, etc."
)
submitted = st.form_submit_button("Search")
if submitted and query:
self.search_state["last_query"] = query
self.search_state["page"] = 0 # Reset to first page
self.perform_search(query)
def perform_search(self, query: str):
"""
Executes a search using the Semantic Scholar API with the provided query and filters.
Args:
query (str): The search query string.
"""
with st.spinner("Searching for papers..."):
try:
# Extract filters from search state
filters = self.search_state.get("filters", {})
# Perform the search
results = search_semantic_scholar(
query=query,
limit=self.search_state.get("results_per_page", 10),
publication_types=filters.get("publication_types"),
open_access=filters.get("open_access", False),
min_citation_count=filters.get("min_citation_count"),
year_range=filters.get("year_range"),
fields_of_study=filters.get("fields_of_study")
)
if results:
self.search_results = results
st.success(f"Found {len(results)} papers matching your query")
else:
st.warning("No results found. Try modifying your search query or filters.")
self.search_results = []
except Exception as e:
st.error(f"An error occurred during search: {str(e)}")
print_red(f"Search error: {str(e)}")
self.search_results = []
def display_search_results(self):
"""Displays the search results with paper details and options to add to collections."""
st.subheader("Search Results")
# Summary of results
open_access_count = sum(1 for paper in self.search_results if paper.get('openAccessPdf', {}).get('url'))
st.info(f"Found {len(self.search_results)} papers ({open_access_count} open access)")
# Pagination controls
if len(self.search_results) > 0:
total_pages = 1 # Currently only showing one page of results from the API
current_page = self.search_state.get("page", 0)
# Display papers on the current page
for i, paper in enumerate(self.search_results):
self.display_paper_card(paper, i)
def display_paper_card(self, paper, index):
"""
Displays a single paper as a card with details and action buttons.
Args:
paper (dict): The paper data from Semantic Scholar API
index (int): Index of the paper in the search results
"""
# Extract paper details
title = paper.get('title', 'No Title')
authors = paper.get('authors', [])
author_names = [author.get('name', '') for author in authors]
author_str = ", ".join(author_names) if author_names else "Unknown Authors"
year = paper.get('year', 'Unknown Year')
journal = paper.get('journal', {}).get('name', 'Unknown Journal')
citation_count = paper.get('citationCount', 0)
influential_citation_count = paper.get('influentialCitationCount', 0)
paper_id = paper.get('paperId', '')
paper_url = paper.get('url', '')
doi = paper.get('externalIds', {}).get('DOI', '')
# Check if open access
open_access = paper.get('openAccessPdf', {}).get('url', None)
open_access_icon = "📄" if open_access else "🔒"
# Create card
with st.expander(f"{title} ({year}) {open_access_icon}"):
# Basic information
st.markdown(f"**Title:** {title}")
st.markdown(f"**Authors:** {author_str}")
st.markdown(f"**Year:** {year}")
st.markdown(f"**Journal:** {journal}")
# Citations
st.markdown(f"**Citations:** {citation_count} (Influential: {influential_citation_count})")
# Abstract - Don't use nested expander
abstract = paper.get('abstract', 'No abstract available')
if abstract:
st.markdown("**Abstract:**")
st.markdown(f"<div style='border-left: 2px solid #ccc; padding-left: 10px; margin-bottom: 10px;'>{abstract}</div>", unsafe_allow_html=True)
# TL;DR summary if available
tldr = None
if 'tldr' in paper and isinstance(paper['tldr'], dict):
tldr = paper['tldr'].get('text')
if tldr:
st.markdown(f"**TL;DR:** {tldr}")
# Fields of study
fields = paper.get('fieldsOfStudy', [])
if fields:
st.markdown(f"**Fields:** {', '.join(fields)}")
# External IDs
external_ids = paper.get('externalIds', {})
if external_ids:
id_text = []
for id_type, id_value in external_ids.items():
id_text.append(f"{id_type}: {id_value}")
st.markdown(f"**IDs:** {', '.join(id_text)}")
# Paper actions section
st.markdown("---")
st.markdown("### Access Paper")
col1, col2 = st.columns(2)
# Open access papers - direct download and add
if open_access:
with col1:
st.success("📄 This paper is open access")
st.markdown(f"[View on Semantic Scholar]({paper_url})")
if doi:
st.markdown(f"[DOI: {doi}](https://doi.org/{doi})")
with col2:
# Direct download button
if st.button(f"Download PDF", key=f"dl_{paper_id}"):
with st.spinner("Downloading PDF..."):
success, filepath = self.download_pdf(open_access, f"{paper_id}.pdf")
if success:
st.success(f"Downloaded to {filepath}")
# Automatically add to collection if selected
if self.collection:
self.add_paper_to_collection(paper, self.collection, filepath)
else:
st.error("Failed to download PDF")
# Direct add to collection without downloading
if self.collection:
if st.button(f"Add to {self.collection} without download", key=f"add_nodl_{paper_id}"):
self.add_paper_to_collection(paper, self.collection)
# Non-open access papers - manual workflow
else:
with col1:
st.warning("🔒 This paper requires manual download")
st.markdown(f"[View on Semantic Scholar]({paper_url})")
if doi:
st.markdown(f"[DOI: {doi}](https://doi.org/{doi})")
with col2:
# Add to manual download queue
if st.button(f"I want this paper", key=f"want_{paper_id}"):
self.download_queue[paper_id] = {
"paper": paper,
"collection": self.collection
}
st.info("Added to your download queue. See instructions at the top of the page.")
def show_download_instructions(self):
"""Shows instructions for papers that need manual download."""
with st.expander(" Papers that need manual download", expanded=True):
st.markdown("### Papers Requiring Manual Download")
st.markdown("""
Some papers aren't freely available and need to be downloaded manually. Follow these steps:
1. Click the links below to access the publisher's website
2. Download the PDF file (you may need institutional access)
3. Save the file to your computer
4. Upload the PDF in the 'Upload Downloaded Papers' section below
""")
for paper_id, info in self.download_queue.items():
paper = info["paper"]
title = paper.get('title', 'No Title')
doi = paper.get('externalIds', {}).get('DOI', '')
paper_url = paper.get('url', '')
st.markdown(f"**{title}**")
col1, col2 = st.columns([3, 1])
with col1:
if paper_url:
st.markdown(f"[Semantic Scholar]({paper_url})")
if doi:
st.markdown(f"[DOI: {doi}](https://doi.org/{doi})")
with col2:
if st.button("Remove", key=f"remove_{paper_id}"):
del self.download_queue[paper_id]
st.rerun()
def show_manual_upload_interface(self):
"""Interface for uploading manually downloaded papers."""
st.markdown("---")
st.header("Upload Downloaded Papers")
st.markdown("Upload the PDFs you downloaded manually here:")
with st.form("manual_upload_form", clear_on_submit=True):
uploaded_files = st.file_uploader("Upload PDF files", type=["pdf"], accept_multiple_files=True)
collection_for_upload = st.selectbox(
"Add to collection:",
self.get_article_collections(),
index=self.get_article_collections().index(self.collection) if self.collection in self.get_article_collections() else 0
)
submitted = st.form_submit_button("Upload and Process")
if submitted and uploaded_files:
with st.spinner("Processing uploaded files..."):
success_count = 0
for pdf_file in uploaded_files:
# Process the uploaded PDF
success = self.process_uploaded_pdf(pdf_file, collection_for_upload)
if success:
success_count += 1
# Remove from download queue if it matches a title
for paper_id in list(self.download_queue.keys()):
if self.download_queue[paper_id]["paper"].get("title", "").lower() in pdf_file.name.lower():
del self.download_queue[paper_id]
st.success(f"Successfully processed {success_count} of {len(uploaded_files)} files")
if success_count > 0:
st.rerun()
def download_pdf(self, url, filename):
"""
Downloads a PDF from a URL and saves it to the download directory.
Args:
url (str): The URL of the PDF to download
filename (str): The filename to save the PDF as
Returns:
tuple: (success, filepath)
"""
try:
# Make sure the filename is safe
safe_filename = "".join(c for c in filename if c.isalnum() or c in "._-") or "paper.pdf"
if not safe_filename.endswith(".pdf"):
safe_filename += ".pdf"
filepath = self.download_dir / safe_filename
print_blue(f"Downloading {url} to {filepath}")
# Download the file
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
# Check if the content is actually a PDF
content_type = response.headers.get('Content-Type', '')
if 'application/pdf' not in content_type.lower() and not url.lower().endswith('.pdf'):
print_red(f"Warning: Content may not be a PDF. Content-Type: {content_type}")
# Save the file
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Verify the file exists and has content
if not filepath.exists() or filepath.stat().st_size == 0:
print_red(f"Downloaded file is empty or doesn't exist")
return False, None
print_blue(f"Successfully downloaded to {filepath}")
return True, filepath
except Exception as e:
print_red(f"Error downloading PDF: {str(e)}")
return False, None
def process_uploaded_pdf(self, pdf_file, collection_name):
"""
Process an uploaded PDF file and add it to a collection.
Args:
pdf_file (UploadedFile): The uploaded PDF file
collection_name (str): The collection to add the paper to
Returns:
bool: Success or failure
"""
try:
# Create a temporary file to process
processor = PDFProcessor(
pdf_file=pdf_file,
filename=pdf_file.name,
process=False,
username=self.username,
document_type="other_documents",
)
_id, db, doi = processor.process_document()
print_blue(f"Processed document: ID={_id}, DB={db}, DOI={doi}")
if _id:
# Add to collection
self.articles2collection(collection=collection_name, db=db, _id=_id)
return True
return False
except Exception as e:
print_red(f"Error processing PDF: {str(e)}")
st.error(f"Failed to process {pdf_file.name}: {str(e)}")
return False
def add_paper_to_collection(self, paper, collection_name, filepath=None):
"""
Adds a paper from search results to the selected collection.
Args:
paper (dict): The paper data from Semantic Scholar API
collection_name (str): Name of the collection to add the paper to
filepath (str, optional): Path to downloaded PDF file
"""
# If we have a filepath, process the PDF
if filepath:
# Debug information about the file
if not os.path.exists(filepath):
st.error(f"File does not exist: {filepath}")
print_red(f"File does not exist: {filepath}")
# Fall back to metadata-only
else:
file_size = os.path.getsize(filepath)
st.info(f"Processing PDF file: {filepath} ({file_size} bytes)")
print_blue(f"Processing PDF file: {filepath} ({file_size} bytes)")
with open(filepath, 'rb') as f:
header = f.read(10)
is_pdf = header.startswith(b'%PDF-')
print_blue(f"File header check - Is PDF: {is_pdf}, Header: {header}")
# Read the entire file content
f.seek(0)
pdf_content = f.read()
# Create PDFProcessor with content bytes instead of file handle
st.info("Creating PDFProcessor...")
processor = PDFProcessor(
pdf_file=pdf_content, # Pass content bytes instead of file handle
filename=os.path.basename(filepath),
process=False,
username=self.username,
document_type="other_documents",
is_sci=True
)
st.info("Checking if processor was created successfully...")
if processor:
print_blue(f"PDFProcessor created: {processor}")
st.info("Processing document...")
_id, db, doi = processor.process_document() #!
print_blue(f"Document processed: ID={_id}, DB={db}, DOI={doi}")
if _id:
self.articles2collection(collection=collection_name, db=db, _id=_id)
st.success(f"Added '{paper.get('title')}' to {collection_name}")
return
else:
st.warning("Process document returned no ID. Falling back to metadata.")
else:
st.warning("PDFProcessor creation failed. Falling back to metadata.")
# Add directly using metadata if file processing failed or no filepath
st.info("Adding paper using metadata only")
paper_info = {
"_id": f"sci_articles/{paper.get('paperId', '')}",
"doi": paper.get('externalIds', {}).get('DOI', ''),
"metadata": {
"title": paper.get('title', 'No Title'),
"journal": paper.get('journal', {}).get('name', 'Unknown Journal'),
"published_year": paper.get('year', ''),
"published_date": paper.get('publicationDate', ''),
"authors": [author.get('name', '') for author in paper.get('authors', [])],
"abstract": paper.get('abstract', ''),
"url": paper.get('url', ''),
"open_access_url": paper.get('openAccessPdf', {}).get('url', ''),
"citation_count": paper.get('citationCount', 0),
"fields_of_study": paper.get('fieldsOfStudy', []),
}
}
# Check if collection exists
doc_cursor = self.user_arango.db.aql.execute(
f'FOR doc IN article_collections FILTER doc["name"] == "{collection_name}" RETURN doc'
)
doc = next(doc_cursor, None)
if doc:
# Check if paper already exists in the collection
articles = doc.get("articles", [])
for article in articles:
if article.get("_id") == paper_info["_id"] or article.get("doi") == paper_info["doi"]:
st.warning(f"This paper is already in the '{collection_name}' collection.")
return
# Add paper to collection
articles.append(paper_info)
self.user_arango.db.collection("article_collections").update_match(
filters={"name": collection_name},
body={"articles": articles},
merge=True,
)
st.success(f"Added '{paper.get('title')}' to {collection_name}")
# Persist state after adding paper
self.update_session_state(page_name=self.page_name)
else:
st.error(f"Collection '{collection_name}' not found.")
def articles2collection(self, collection: str, db: str, _id: str = None) -> None:
"""
Add an article to a collection by retrieving its info from the database.
Args:
collection (str): The collection name
db (str): The database name
_id (str): The article ID
"""
info = self.get_article_info(db, _id=_id)
info = {
k: v for k, v in info.items() if k in ["_id", "doi", "title", "metadata"]
}
doc_cursor = self.user_arango.db.aql.execute(
f'FOR doc IN article_collections FILTER doc["name"] == "{collection}" RETURN doc'
)
doc = next(doc_cursor, None)
if doc:
articles = doc.get("articles", [])
keys = [i["_id"] for i in articles]
if info["_id"] not in keys:
articles.append(info)
self.user_arango.db.collection("article_collections").update_match(
filters={"name": collection},
body={"articles": articles},
merge=True,
)
# Persist state after updating articles
self.update_session_state(page_name=self.page_name)
def get_article_info(self, db: str, _id: str = None, doi: str = None) -> dict:
"""
Get article info from the database.
Args:
db (str): The database name
_id (str, optional): The article ID
doi (str, optional): The article DOI
Returns:
dict: The article info
"""
assert _id or doi, "Either _id or doi must be provided."
arango = self.get_arango(db_name=db)
if _id:
query = """
RETURN {
"_id": DOCUMENT(@doc_id)._id,
"doi": DOCUMENT(@doc_id).doi,
"title": DOCUMENT(@doc_id).title,
"metadata": DOCUMENT(@doc_id).metadata,
"summary": DOCUMENT(@doc_id).summary
}
"""
info_cursor = arango.db.aql.execute(query, bind_vars={"doc_id": _id})
elif doi:
info_cursor = arango.db.aql.execute(
f'FOR doc IN sci_articles FILTER doc["doi"] == "{doi}" LIMIT 1 RETURN {{"_id": doc["_id"], "doi": doc["doi"], "title": doc["title"], "metadata": doc["metadata"], "summary": doc["summary"]}}'
)
return next(info_cursor, None)