Mostly working version, before Arango API

main
lasseedfast 7 months ago
parent 638e2a00d3
commit 5ee1a062f1
  1. _base_class.py (1 line changed)
  2. _llm copy.py (440 lines changed)
  3. _llm.py (43 lines changed)
  4. agent_research.py (1339 lines changed)
  5. article2db.py (470 lines changed)
  6. chat_page.py (3 lines changed)
  7. llm_queries.py (85 lines changed)
  8. research_page.py (53 lines changed)
  9. semantic_schoolar.py (369 lines changed)
  10. streamlit_app.py (6 lines changed)
  11. streamlit_chatbot.py (29 lines changed)
  12. streamlit_pages.py (14 lines changed)
  13. streamlit_rss_old.py (345 lines changed)
  14. streamlit_search_paper_page.py (692 lines changed)

@ -22,7 +22,6 @@ class BaseClass:
elif admin:
return ArangoDB()
else:
from colorprinter.print_color import print_yellow
return ArangoDB(user=self.username, db_name=self.username)
def get_article_collections(self) -> list:

@ -1,440 +0,0 @@
import os
import base64
import re
import json
from typing import Any, Callable, Iterator, Literal, Mapping, Optional, Sequence, Union
import tiktoken
from ollama import Client, AsyncClient, ResponseError, ChatResponse, Message, Tool, Options
from ollama._types import JsonSchemaValue, ChatRequest
import env_manager
from colorprinter.print_color import *
env_manager.set_env()
tokenizer = tiktoken.get_encoding("cl100k_base")
# Define a base class for common functionality
class BaseClient:
def chat(
self,
model: str = '',
messages: Optional[Sequence[Union[Mapping[str, Any], Message]]] = None,
*,
tools: Optional[Sequence[Union[Mapping[str, Any], Tool, Callable]]] = None,
stream: bool = False,
format: Optional[Union[Literal['', 'json'], JsonSchemaValue]] = None,
options: Optional[Union[Mapping[str, Any], Options]] = None,
keep_alive: Optional[Union[float, str]] = None,
) -> Union[ChatResponse, Iterator[ChatResponse]]:
return self._request(
ChatResponse,
'POST',
'/api/chat',
json=ChatRequest(
model=model,
messages=[message for message in messages or []],
tools=[tool for tool in tools or []],
stream=stream,
format=format,
options=options,
keep_alive=keep_alive,
).model_dump(exclude_none=True),
stream=stream,
)
# Define your custom MyAsyncClient class
class MyAsyncClient(AsyncClient, BaseClient):
async def _request(self, response_type, method, path, headers=None, **kwargs):
# Merge default headers with per-call headers
all_headers = {**self._client.headers, **(headers or {})}
# Handle streaming separately
if kwargs.get('stream'):
kwargs.pop('stream')
async with self._client.stream(method, path, headers=all_headers, **kwargs) as response:
self.last_response = response # Store the response object
if response.status_code >= 400:
await response.aread()
raise ResponseError(response.text, response.status_code)
return self._stream(response_type, response)
else:
# Make the HTTP request with the combined headers
kwargs.pop('stream')
response = await self._request_raw(method, path, headers=all_headers, **kwargs)
self.last_response = response # Store the response object
if response.status_code >= 400:
raise ResponseError(response.text, response.status_code)
return response_type.model_validate_json(response.content)
async def chat(
self,
model: str = '',
messages: Optional[Sequence[Union[Mapping[str, Any], Message]]] = None,
*,
tools: Optional[Sequence[Union[Mapping[str, Any], Tool, Callable]]] = None,
stream: bool = False,
format: Optional[Union[Literal['', 'json'], JsonSchemaValue]] = None,
options: Optional[Union[Mapping[str, Any], Options]] = None,
keep_alive: Optional[Union[float, str]] = None,
) -> Union[ChatResponse, Iterator[ChatResponse]]:
return await self._request(
ChatResponse,
'POST',
'/api/chat',
json=ChatRequest(
model=model,
messages=[message for message in messages or []],
tools=[tool for tool in tools or []],
stream=stream,
format=format,
options=options,
keep_alive=keep_alive,
).model_dump(exclude_none=True),
stream=stream,
)
# Define your custom MyClient class
class MyClient(Client, BaseClient):
def _request(self, response_type, method, path, headers=None, **kwargs):
# Merge default headers with per-call headers
all_headers = {**self._client.headers, **(headers or {})}
# Handle streaming separately
if kwargs.get('stream'):
kwargs.pop('stream')
with self._client.stream(method, path, headers=all_headers, **kwargs) as response:
self.last_response = response # Store the response object
if response.status_code >= 400:
raise ResponseError(response.text, response.status_code)
return self._stream(response_type, response)
else:
# Make the HTTP request with the combined headers
kwargs.pop('stream')
response = self._request_raw(method, path, headers=all_headers, **kwargs)
self.last_response = response # Store the response object
if response.status_code >= 400:
raise ResponseError(response.text, response.status_code)
return response_type.model_validate_json(response.content)
class LLM:
"""
LLM class for interacting with a language model.
"""
def __init__(
self,
system_message="You are an assistant.",
temperature=0.01,
model: Optional[Literal["small", "standard", "vision"]] = "standard",
max_length_answer=4096,
messages=None,
chat=True,
chosen_backend=None,
) -> None:
self.model = self.get_model(model)
self.system_message = system_message
self.options = {"temperature": temperature}
self.messages = messages or [{"role": "system", "content": self.system_message}]
self.max_length_answer = max_length_answer
self.chat = chat
self.chosen_backend = chosen_backend
# Initialize the client with the host and default headers
credentials = f"{os.getenv('LLM_API_USER')}:{os.getenv('LLM_API_PWD_LASSE')}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
default_headers = {
"Authorization": f"Basic {encoded_credentials}",
}
host_url = os.getenv("LLM_API_URL").rstrip("/api/chat/")
self.client = MyClient(host=host_url, headers=default_headers)
self.async_client = MyAsyncClient(host=host_url, headers=default_headers)
def get_model(self, model_alias):
models = {
"standard": "LLM_MODEL",
"small": "LLM_MODEL_SMALL",
"vision": "LLM_MODEL_VISION",
"standard_64k": "LLM_MODEL_64K",
}
return os.getenv(models.get(model_alias, "LLM_MODEL"))
def count_tokens(self):
num_tokens = 0
for i in self.messages:
for k, v in i.items():
if k == "content":
if not isinstance(v, str):
v = str(v)
tokens = tokenizer.encode(v)
num_tokens += len(tokens)
return int(num_tokens)
def generate(
self,
query: str = None,
user_input: str = None,
context: str = None,
stream: bool = False,
tools: list = None,
function_call: dict = None,
images: list = None,
model: Optional[Literal["small", "standard", "vision"]] = None,
temperature: float = None,
):
"""
Generates a response from the language model based on the provided inputs.
"""
# Prepare the model and temperature
model = self.get_model(model) if model else self.model
temperature = temperature if temperature else self.options["temperature"]
# Normalize whitespace and add the query to the messages
query = re.sub(r"\s*\n\s*", "\n", query)
message = {"role": "user", "content": query}
# Handle images if any
if images:
import base64
base64_images = []
base64_pattern = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")
for image in images:
if isinstance(image, str):
if base64_pattern.match(image):
base64_images.append(image)
else:
with open(image, "rb") as image_file:
base64_images.append(
base64.b64encode(image_file.read()).decode("utf-8")
)
elif isinstance(image, bytes):
base64_images.append(base64.b64encode(image).decode("utf-8"))
else:
print_red("Invalid image type")
message["images"] = base64_images
# Use the vision model
model = self.get_model("vision")
self.messages.append(message)
# Prepare headers
headers = {}
if self.chosen_backend:
headers["X-Chosen-Backend"] = self.chosen_backend
if model == self.get_model("small"):
headers["X-Model-Type"] = "small"
# Prepare options
options = Options(**self.options)
options.temperature = temperature
# Prepare tools if any
if tools:
tools = [
Tool(**tool) if isinstance(tool, dict) else tool
for tool in tools
]
# Adjust the options for long messages
if self.chat or len(self.messages) > 15000:
num_tokens = self.count_tokens() + self.max_length_answer // 2
if num_tokens > 8000:
model = self.get_model("standard_64k")
headers["X-Model-Type"] = "large"
# Call the client.chat method
try:
response = self.client.chat(
model=model,
messages=self.messages,
headers=headers,
tools=tools,
stream=stream,
options=options,
keep_alive=3600 * 24 * 7,
)
except ResponseError as e:
print_red("Error!")
print(e)
return "An error occurred."
# If user_input is provided, update the last message
if user_input:
if context:
if len(context) > 2000:
context = self.make_summary(context)
user_input = (
f"{user_input}\n\nUse the information below to answer the question.\n"
f'"""{context}"""\n[This is a summary of the context provided in the original message.]'
)
system_message_info = "\nSometimes some of the messages in the chat history are summarised; when that is the case, it is clearly indicated in the message."
if system_message_info not in self.messages[0]["content"]:
self.messages[0]["content"] += system_message_info
self.messages[-1] = {"role": "user", "content": user_input}
self.chosen_backend = self.client.last_response.headers.get("X-Chosen-Backend")
# Handle streaming response
if stream:
return self.read_stream(response)
else:
# Process the response
if isinstance(response, ChatResponse):
result = response.message.content.strip('"')
self.messages.append({"role": "assistant", "content": result.strip('"')})
if tools and not response.message.get("tool_calls"):
print_yellow("No tool calls in response".upper())
if not self.chat:
self.messages = [self.messages[0]]
return result
else:
print_red("Unexpected response type")
return "An error occurred."
def make_summary(self, text):
# Implement your summary logic using self.client.chat()
summary_message = {
"role": "user",
"content": f'Summarize the text below:\n"""{text}"""\nRemember to be concise and detailed. Answer in English.',
}
messages = [
{"role": "system", "content": "You are summarizing a text. Make it detailed and concise. Answer ONLY with the summary. Don't add any new information."},
summary_message,
]
try:
response = self.client.chat(
model=self.get_model("small"),
messages=messages,
options=Options(temperature=0.01),
keep_alive=3600 * 24 * 7,
)
summary = response.message.content.strip()
print_blue("Summary:", summary)
return summary
except ResponseError as e:
print_red("Error generating summary:", e)
return "Summary generation failed."
def read_stream(self, response):
# Implement streaming response handling if needed
buffer = ""
message = ""
first_chunk = True
prev_content = None
for chunk in response:
if chunk:
content = chunk.message.content
if first_chunk and content.startswith('"'):
content = content[1:]
first_chunk = False
if chunk.done:
if prev_content and prev_content.endswith('"'):
prev_content = prev_content[:-1]
if prev_content:
yield prev_content
break
else:
if prev_content:
yield prev_content
prev_content = content
self.messages.append({"role": "assistant", "content": message.strip('"')})
async def async_generate(
self,
query: str = None,
user_input: str = None,
context: str = None,
stream: bool = False,
tools: list = None,
function_call: dict = None,
images: list = None,
model: Optional[Literal["small", "standard", "vision"]] = None,
temperature: float = None,
):
"""
Asynchronous method to generate a response from the language model.
"""
# Prepare the model and temperature
model = self.get_model(model) if model else self.model
temperature = temperature if temperature else self.options["temperature"]
# Normalize whitespace and add the query to the messages
query = re.sub(r"\s*\n\s*", "\n", query)
message = {"role": "user", "content": query}
# Handle images if any
if images:
# (Image handling code as in the generate method)
...
self.messages.append(message)
# Prepare headers
headers = {}
if self.chosen_backend:
headers["X-Chosen-Backend"] = self.chosen_backend
if model == self.get_model("small"):
headers["X-Model-Type"] = "small"
# Prepare options
options = Options(**self.options)
options.temperature = temperature
# Prepare tools if any
if tools:
tools = [
Tool(**tool) if isinstance(tool, dict) else tool
for tool in tools
]
# Adjust options for long messages
# (Adjustments as needed)
...
# Call the async client's chat method
try:
response = await self.async_client.chat(
model=model,
messages=self.messages,
tools=tools,
stream=stream,
options=options,
keep_alive=3600 * 24 * 7,
)
except ResponseError as e:
print_red("Error!")
print(e)
return "An error occurred."
# Process the response
if isinstance(response, ChatResponse):
result = response.message.content.strip('"')
self.messages.append({"role": "assistant", "content": result.strip('"')})
return result
else:
print_red("Unexpected response type")
return "An error occurred."
# Usage example
if __name__ == "__main__":
import asyncio
llm = LLM()
async def main():
result = await llm.async_generate(query="Hello, how are you?")
print(result)
asyncio.run(main())
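For orientation, a minimal token-counting sketch using the same cl100k_base tokenizer this module loads; the sample messages are illustrative only.

import tiktoken

tokenizer = tiktoken.get_encoding("cl100k_base")
messages = [
    {"role": "system", "content": "You are an assistant."},
    {"role": "user", "content": "Hello, how are you?"},
]
# Mirrors LLM.count_tokens(): encode every message content and sum the token counts.
num_tokens = sum(len(tokenizer.encode(str(m["content"]))) for m in messages)
print(num_tokens)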

@ -172,7 +172,8 @@ class LLM:
] = None,
temperature: float = None,
messages: list[dict] = None,
format = None
format = None,
think = False
):
"""
Generate a response based on the provided query and context.
@ -187,15 +188,18 @@ class LLM:
temperature (float): The temperature setting for the model.
messages (list[dict]): List of previous messages in the conversation.
format (Optional[BaseModel]): The format of the response.
think (bool): Whether to use the reasoning model.
Returns:
str: The generated response or an error message if an exception occurs.
"""
print_yellow(stream)
print_yellow("GENERATE")
# Prepare the model and temperature
model = self.get_model(model) if model else self.model
if model == self.get_model('tools'):
stream = False
# if model == self.get_model('tools'):
# stream = False
temperature = temperature if temperature else self.options["temperature"]
if messages:
@ -227,21 +231,19 @@ class LLM:
headers["X-Model-Type"] = "small"
if model == self.get_model("tools"):
headers["X-Model-Type"] = "tools"
elif model == self.get_model("reasoning"):
headers["X-Model-Type"] = "reasoning"
reasoning_models = ['qwen3', 'deepseek'] #TODO Add more reasoning models here when added to ollama
if any([model_name in model for model_name in reasoning_models]):
if think:
query = f"/think\n{query}"
else:
query = f"/no_think\n{query}"
# Prepare options
options = Options(**self.options)
options.temperature = temperature
#TODO This is a bit of a hack to get the reasoning model to work. It should be handled better.
# # Adjust the options for long messages
# if self.chat or len(self.messages) > 15000 and model != self.get_model("tools"):
# num_tokens = self.count_tokens()
# if num_tokens > 8000:
# model = self.get_model("standard_64k")
# print_purple("Switching to large model")
# headers["X-Model-Type"] = "large"
print_yellow("Stream the answer?", stream)
# Call the client.chat method
try:
@ -261,7 +263,7 @@ class LLM:
keep_alive=3600 * 24 * 7,
format=format
)
except ResponseError as e:
print_red("Error!")
print(e)
@ -286,8 +288,10 @@ class LLM:
# Handle streaming response
if stream:
print_purple("STREAMING")
return self.read_stream(response)
else:
print_purple("NOT STREAMING")
# Process the response
if isinstance(response, ChatResponse):
result = response.message.content.strip('"')
@ -300,6 +304,9 @@ class LLM:
print_yellow("No tool calls in response".upper())
if not self.chat:
self.messages = [self.messages[0]]
if not think:
response.message.content = remove_thinking(response.message.content)
return response.message
else:
print_red("Unexpected response type")
@ -550,7 +557,13 @@ class LLM:
return message
def remove_thinking(response):
"""Remove the thinking section from the response"""
response_text = response.content if hasattr(response, "content") else str(response)
if "</think>" in response_text:
return response_text.split("</think>")[1].strip()
return response_text
if __name__ == "__main__":
llm = LLM()
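A minimal sketch of the new think handling, assuming the remove_thinking helper above and an illustrative response string.

# Hypothetical model output containing a reasoning block.
sample = "<think>Comparing the sources first.</think>Yields drop mainly under heat stress."
print(remove_thinking(sample))  # -> "Yields drop mainly under heat stress."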

File diff suppressed because it is too large.

@ -23,6 +23,7 @@ from _chromadb import ChromaDB
from _llm import LLM
from colorprinter.print_color import *
from utils import fix_key
import semantic_schoolar
class Document:
@ -596,17 +597,320 @@ class Processor:
)
return
def get_semantic_scholar_by_doi(self, doi):
"""Use Semantic Scholar API to get metadata by DOI and verify it matches the document.
Performs verification to ensure the paper matches the document before accepting metadata.
Returns:
--------
dict or None
Metadata if paper is found and verified, None otherwise
"""
try:
paper = semantic_schoolar.get_paper_details(doi)
if not paper:
print_yellow(f"No paper found in Semantic Scholar for DOI: {doi}")
return None
print_green(f"Found potential paper match by DOI: '{paper.get('title')}'")
# Verification step - just because a DOI appears in the document doesn't mean it's the document's DOI
# Extract key information for verification
authors = []
if "authors" in paper:
authors = [author.get("name") for author in paper.get("authors", [])]
title = paper.get('title')
# Perform verification against document content
is_verified = False
confidence_reasons = []
if self.document.pdf:
# Extract text from first few pages
verification_text = ""
for page in self.document.pdf.pages(0, min(5, len(self.document.pdf))):
verification_text += page.get_text()
# Check if any authors appear in text (especially on first pages)
author_matches = []
for author in authors:
if author in verification_text:
author_matches.append(author)
if author_matches:
is_verified = True
confidence_reasons.append(f"Author(s) found in document: {', '.join(author_matches)}")
# Check title similarity
if title and self.document.title:
from difflib import SequenceMatcher
similarity = SequenceMatcher(None, title.lower(), self.document.title.lower()).ratio()
if similarity > 0.7: # Good similarity threshold
is_verified = True
confidence_reasons.append(f"Title similarity: {similarity:.2f}")
# If title from metadata matches PDF metadata exactly, that's a strong signal
if title and self.document.get_title(only_meta=True) and title == self.document.get_title(only_meta=True):
is_verified = True
confidence_reasons.append("Title in PDF metadata matches exactly")
# If no verification succeeded but we have the first page text, check if title is near the top
if not is_verified and title:
# Get just the first page text for a more focused check
first_page_text = self.document.pdf.pages(0, 1)[0].get_text()
# Check if title appears near the beginning of the document
if title.lower() in first_page_text.lower()[:500]:
is_verified = True
confidence_reasons.append("Title appears at beginning of document")
if is_verified or not self.document.pdf:
if confidence_reasons:
print_green(f"Paper verified: {', '.join(confidence_reasons)}")
elif not self.document.pdf:
print_yellow("No PDF available for verification, proceeding with metadata")
# Transform the response to match our metadata structure
journal_name = None
if "journal" in paper and paper["journal"]:
journal_name = paper["journal"].get("name")
metadata = {
"doi": doi,
"title": title,
"authors": authors,
"abstract": paper.get("abstract"),
"journal": journal_name,
"volume": None, # Not directly provided in response
"issue": None, # Not directly provided in response
"pages": None, # Not directly provided in response
"published_date": paper.get("publicationDate"),
"published_year": paper.get("year"),
"url_doi": f"https://doi.org/{doi}",
"link": paper.get("url"),
"semantic_scholar_url": paper.get("url"),
"open_access": paper.get("isOpenAccess", False),
"semantic_scholar_id": paper.get("paperId"),
"language": None, # Not directly provided in response
"verification": {
"verified": is_verified,
"reasons": confidence_reasons
}
}
print_green(f"Metadata retrieved from Semantic Scholar for DOI {doi}")
self.document.metadata = metadata
self.document.is_sci = True
return metadata
else:
print_yellow("Paper match could not be verified in document text. This DOI might be a reference, not the document's DOI.")
return None
except Exception as e:
print_yellow(f"Error retrieving metadata from Semantic Scholar: {e}")
return None
def get_semantic_scholar_by_title(self, title):
"""
Use Semantic Scholar API to get metadata by title and verify it matches the document
Returns metadata if the paper is found and verified, None otherwise
"""
try:
paper = semantic_schoolar.search_paper_by_title(title)
if not paper:
print_yellow(f"No paper found in Semantic Scholar for title: {title}")
return None
print_green(f"Found potential paper match: '{paper.get('title')}'")
# Extract DOI and authors for verification
doi = None
if "externalIds" in paper and paper["externalIds"] and "DOI" in paper["externalIds"]:
doi = paper["externalIds"]["DOI"]
authors = []
if "authors" in paper:
authors = [author.get("name") for author in paper.get("authors", [])]
# Verification step - extract text from first few pages of PDF
is_verified = False
confidence_reasons = []
verification_score = 0
if self.document.pdf:
# Extract text from first few pages
verification_text = ""
first_page_text = ""
try:
first_page = self.document.pdf.pages(0, 1)[0].get_text()
first_page_text = first_page
verification_text = first_page
# Include a few more pages for better verification coverage
for page in self.document.pdf.pages(1, min(5, len(self.document.pdf))):
verification_text += page.get_text()
except Exception as e:
print_yellow(f"Error extracting text from PDF: {e}")
# Check if DOI appears in text - BUT DOI appearing doesn't necessarily mean it's this paper's DOI
# It could be a citation, so we need multiple verification points
if doi and doi in verification_text:
# DOI found, but let's see if it appears to be the document's DOI or a citation
# If it appears in first 500 chars, more likely to be the paper's DOI
if doi in first_page_text[:500]:
verification_score += 3
confidence_reasons.append(f"DOI {doi} found at beginning of document")
else:
verification_score += 1
confidence_reasons.append(f"DOI {doi} found in document but may be a citation")
# Check if any authors appear in text
author_matches = []
for author in authors:
if author in verification_text:
author_matches.append(author)
# Author in first page gets higher score
if author in first_page_text:
verification_score += 2
else:
verification_score += 1
if author_matches:
confidence_reasons.append(f"Author(s) found in document: {', '.join(author_matches)}")
# Check title similarity - strong signal
found_title = paper.get('title')
if found_title and self.document.title:
from difflib import SequenceMatcher
similarity = SequenceMatcher(None, found_title.lower(), self.document.title.lower()).ratio()
confidence_reasons.append(f"Title similarity: {similarity:.2f}")
if similarity > 0.9: # Very high similarity
verification_score += 4
elif similarity > 0.8: # High similarity
verification_score += 3
elif similarity > 0.7: # Good similarity
verification_score += 2
elif similarity > 0.5: # Moderate similarity
verification_score += 1
# Check PDF metadata title
if found_title and self.document.get_title(only_meta=True):
pdf_meta_title = self.document.get_title(only_meta=True)
similarity = SequenceMatcher(None, found_title.lower(), pdf_meta_title.lower()).ratio()
if similarity > 0.8:
verification_score += 3
confidence_reasons.append(f"Title in PDF metadata matches (similarity: {similarity:.2f})")
# Look for title text in the document, especially near the beginning
if found_title:
# Perform partial fuzzy matching for title in first page
title_words = [word.lower() for word in found_title.split() if len(word) > 3]
title_word_matches = 0
for word in title_words:
if word.lower() in first_page_text.lower():
title_word_matches += 1
title_word_ratio = title_word_matches / len(title_words) if title_words else 0
if title_word_ratio > 0.7:
verification_score += 3
confidence_reasons.append(f"Most title keywords found in first page ({title_word_ratio:.2f})")
elif title_word_ratio > 0.5:
verification_score += 2
confidence_reasons.append(f"Some title keywords found in first page ({title_word_ratio:.2f})")
# Year verification if available
if "year" in paper and paper["year"]:
paper_year = str(paper["year"])
if paper_year in first_page_text:
verification_score += 1
confidence_reasons.append(f"Publication year {paper_year} found in document")
# Journal verification if available
journal_name = None
if "journal" in paper and paper["journal"] and paper["journal"].get("name"):
journal_name = paper["journal"].get("name")
if journal_name and journal_name in verification_text:
verification_score += 2
confidence_reasons.append(f"Journal name '{journal_name}' found in document")
# Final verification decision based on cumulative score
if verification_score >= 5:
is_verified = True
print_green(f"Paper verified with score {verification_score}/10")
else:
print_yellow(f"Paper verification score too low: {verification_score}/10")
# If not verified but we have a DOI, we can still try getting paper by DOI
# But we'll pass the verification context to avoid accepting incorrect metadata
if not is_verified and doi:
print_yellow(f"Paper match not verified by title, trying to get and verify metadata by DOI {doi}")
return self.get_semantic_scholar_by_doi(doi)
# If verified or no PDF for verification, proceed with the metadata
if is_verified or not self.document.pdf:
if confidence_reasons:
print_green(f"Paper verified: {', '.join(confidence_reasons)}")
elif not self.document.pdf:
print_yellow("No PDF available for verification, proceeding with metadata")
# If DOI found, get complete metadata through DOI endpoint (with verification)
if doi:
return self.get_semantic_scholar_by_doi(doi)
# Otherwise build metadata from the search result
journal_name = None
if "journal" in paper and paper["journal"]:
journal_name = paper["journal"].get("name")
metadata = {
"doi": doi,
"title": paper.get("title"),
"authors": authors,
"abstract": paper.get("abstract"),
"journal": journal_name,
"volume": None,
"issue": None,
"pages": None,
"published_date": paper.get("publicationDate"),
"published_year": paper.get("year"),
"url_doi": f"https://doi.org/{doi}" if doi else None,
"link": paper.get("url"),
"semantic_scholar_url": paper.get("url"),
"semantic_scholar_id": paper.get("paperId"),
"language": None,
"verification": {
"verified": is_verified,
"reasons": confidence_reasons,
"score": verification_score
}
}
print_green(f"Metadata retrieved from Semantic Scholar by title match")
self.document.metadata = metadata
self.document.is_sci = True
return metadata
else:
print_yellow(f"Paper match could not be verified in document text (score: {verification_score}/10)")
return None
except Exception as e:
print_yellow(f"Error retrieving metadata from Semantic Scholar by title: {e}")
return None
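A self-contained sketch of the title-similarity check both Semantic Scholar lookups rely on; the two titles are illustrative.

from difflib import SequenceMatcher

candidate_title = "Effects of Climate Change on Agriculture: A Review"
pdf_title = "effects of climate change on agriculture - a review"

similarity = SequenceMatcher(None, candidate_title.lower(), pdf_title.lower()).ratio()
print(f"Title similarity: {similarity:.2f}")  # well above the 0.7 acceptance threshold used above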
def process_document(self):
assert self.document.pdf_file or self.document.pdf, "PDF file must be provided."
if not self.document.pdf:
self.document.open_pdf(self.document.pdf_file)
if self.document.is_image:
return pymupdf4llm.to_markdown(
self.document.pdf, page_chunks=False, show_progress=False
)
self.document.title = self.document.get_title()
# Try to get DOI from filename or text
if not self.document.doi and self.document.filename:
self.document.doi = self.extract_doi(self.document.filename)
if not self.document.doi:
@ -614,17 +918,33 @@ class Processor:
for page in self.document.pdf.pages(0, 6):
text += page.get_text()
self.document.doi = self.extract_doi(text)
# If we have a DOI, try to get metadata
if self.document.doi:
self.document._key = fix_key(self.document.doi)
if self.check_doaj(self.document.doi):
self.document.open_access = True
self.document.is_sci = True
self.document.metadata = self.get_crossref(self.document.doi)
# Try Semantic Scholar first
self.document.metadata = self.get_semantic_scholar_by_doi(self.document.doi)
# If no metadata from Semantic Scholar, try CrossRef
if not self.document.metadata:
self.document.metadata = self.get_crossref(self.document.doi)
if not self.document.is_sci:
self.document.is_sci = bool(self.document.metadata)
# If still no metadata but we have a title, try title search
if not self.document.metadata and self.document.title:
self.document.metadata = self.get_semantic_scholar_by_title(self.document.title)
# Continue with the rest of the method...
arango_collection = self.get_arango()
# ... rest of the method remains the same ...
doc = arango_collection.get(self.document._key) if self.document.doi else None
@ -737,20 +1057,30 @@ class Processor:
await browser.close()
def doi2pdf(self, doi):
url = None
downloaded = False
path = None
in_db = False
sci_articles = self.get_arango(db_name="base", document_type="sci_articles")
if sci_articles.has(fix_key(doi)):
in_db = True
downloaded = True
doc = sci_articles.get(fix_key(doi))
url = doc["metadata"]["link"]
path = doc["file"]
print_green(f"Article {doi} already in database.")
return downloaded, url, doc["file"], in_db
"""
Try to get a PDF for a DOI by:
1. First checking if it's already in the database
2. Then trying to download from Semantic Scholar's open access PDFs (preferred source)
3. Falling back to DOAJ and other sources if needed
Returns:
--------
tuple: (downloaded, url, path, in_db)
- downloaded: Boolean indicating if download was successful
- url: The URL that was used (or attempted to use)
- path: Path to the downloaded file if successful
- in_db: Boolean indicating if the paper is already in the database
"""
# First check if we can get it from Semantic Scholar
downloaded, url, path, in_db = self.download_from_semantic_scholar(doi)
if downloaded:
print_green(f"Successfully downloaded PDF for {doi} from Semantic Scholar")
return downloaded, url, path, in_db
# If not available in Semantic Scholar, try the original methods
print_blue(f"Could not download from Semantic Scholar, trying other sources...")
# Check DOAJ for open access articles
doaj_data = self.check_doaj(doi)
sleep(0.5)
if doaj_data:
@ -759,27 +1089,29 @@ class Processor:
r = requests.get(link["url"])
soup = BeautifulSoup(r.content, "html.parser")
pdf_link_html = soup.find("a", {"class": "UD_ArticlePDF"})
pdf_url = "https://www.mdpi.com" + pdf_link_html["href"]
pdf = requests.get(pdf_url)
path = f"sci_articles/{doi}.pdf".replace("/", "_")
with open(path, "wb") as f:
f.write(pdf.content)
self.process_document()
print(f"Downloaded PDF for {doi}")
downloaded = True
url = link["url"]
if pdf_link_html and pdf_link_html.get("href"):
pdf_url = "https://www.mdpi.com" + pdf_link_html["href"]
pdf = requests.get(pdf_url)
path = f"sci_articles/{doi}.pdf".replace("/", "_")
with open(path, "wb") as f:
f.write(pdf.content)
print_green(f"Downloaded PDF for {doi} from MDPI")
downloaded = True
url = link["url"]
break
else:
downloaded = False
else:
# If still not downloaded, try to get metadata with a link
if not downloaded and not url:
metadata = self.get_crossref(doi)
if metadata:
url = metadata["link"]
print_blue(f"Could not download PDF, but found URL: {url}")
else:
print(f"Error fetching metadata for DOI: {doi}")
print_yellow(f"Error fetching metadata for DOI: {doi}")
return downloaded, url, path, in_db
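A short usage sketch of the refactored doi2pdf flow; processor stands in for a configured Processor instance (an assumption), and the DOI is the one used in the module's __main__ block.

downloaded, url, path, in_db = processor.doi2pdf("10.1007/s10584-019-02646-9")
if in_db:
    print(f"Already in the database, file at {path}")
elif downloaded:
    print(f"Downloaded PDF from {url} to {path}")
elif url:
    print(f"No PDF available, but a landing page was found: {url}")
else:
    print("Nothing could be retrieved for this DOI")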
@ -817,6 +1149,80 @@ class PDFProcessor(Processor):
document_type=document_type,
)
def download_from_semantic_scholar(self, doi):
"""
Try to download a paper from Semantic Scholar using its open access URL.
Parameters:
-----------
doi : str
The DOI of the paper to download
Returns:
--------
tuple: (downloaded, url, path, in_db)
- downloaded: Boolean indicating if download was successful
- url: The URL that was used (or attempted to use)
- path: Path to the downloaded file if successful
- in_db: Boolean indicating if the paper is already in the database
"""
try:
# Check if paper is in database
sci_articles = self.get_arango(db_name="base", document_type="sci_articles")
# Check if the DOI is already in the database
if sci_articles.has(fix_key(doi)):
in_db = True
doc = sci_articles.get(fix_key(doi))
url = doc["metadata"].get("link") or doc.get("semantic_scholar_url")
print_green(f"Article {doi} already in database.")
return True, url, doc["file"], in_db
else:
in_db = False
print_blue(f"Checking Semantic Scholar for open access PDF for DOI {doi}")
paper = semantic_schoolar.get_paper_details(doi, fields=["openAccessPdf"])
# Check if open access PDF is available
if paper and 'openAccessPdf' in paper and paper['openAccessPdf'] and 'url' in paper['openAccessPdf']:
pdf_url = paper['openAccessPdf']['url']
print_green(f"Found open access PDF for {doi} at {pdf_url}")
# Download the PDF
try:
response = requests.get(pdf_url, timeout=30)
if response.status_code == 200 and 'application/pdf' in response.headers.get('Content-Type', ''):
# Save to file
path = f"sci_articles/{doi}.pdf".replace("/", "_")
with open(path, "wb") as f:
f.write(response.content)
# Verify it's a PDF
if path.endswith(".pdf") and os.path.exists(path) and os.path.getsize(path) > 1000:
print_green(f"Successfully downloaded PDF for {doi} from Semantic Scholar")
# Process the document
self.document.pdf_file = path
self.document.open_pdf(self.document.pdf_file)
return True, pdf_url, path, in_db
else:
print_yellow(f"Downloaded file doesn't appear to be a valid PDF")
if os.path.exists(path):
os.remove(path)
else:
print_yellow(f"Failed to download PDF: Status {response.status_code}")
except Exception as e:
print_yellow(f"Error downloading PDF from Semantic Scholar: {str(e)}")
# If we couldn't download directly but have a URL from Semantic Scholar
if paper and 'url' in paper:
return False, paper['url'], None, in_db
return False, None, None, in_db
except Exception as e:
print_yellow(f"Error accessing Semantic Scholar API: {str(e)}")
return False, None, None, False
if __name__ == "__main__":
doi = "10.1007/s10584-019-02646-9"

@ -222,11 +222,10 @@ class BotChatPage(StreamlitBaseClass):
def remove_old_unsaved_chats(self):
two_weeks_ago = datetime.now() - timedelta(weeks=2)
q = f'FOR doc IN chats FILTER doc.saved == false AND doc.last_updated < "{two_weeks_ago.isoformat()}" RETURN doc'
print_blue(q)
old_chats = self.user_arango.db.aql.execute(
f'FOR doc IN chats RETURN doc'
)
print('test', old_chats)
old_chats = self.user_arango.db.aql.execute(
f'FOR doc IN chats FILTER doc.saved == false AND doc.last_updated < "{two_weeks_ago.isoformat()}" RETURN doc'
)
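For reference, the same cleanup filter written with bind variables, mirroring the bind_vars pattern used for the notes query later in this commit; the cursor name is illustrative.

old_chats_cursor = self.user_arango.db.aql.execute(
    'FOR doc IN chats FILTER doc.saved == false AND doc.last_updated < @cutoff RETURN doc',
    bind_vars={"cutoff": two_weeks_ago.isoformat()},
)
old_chats = list(old_chats_cursor)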

@ -0,0 +1,85 @@
def create_plan_questions(agent, question):
query = f"""
A journalist wants to get a report that answers this question: "{question}"
THIS IS *NOT* A QUESTION YOU CAN ANSWER! Instead, you need to split it into multiple questions that can be answered through research.
The questions should be specific and focused on a single aspect of the topic.
For example, if the question is "What are the effects of climate change on agriculture?", you could split it into:
- How does temperature change affect crop yields?
- What are the impacts of changing rainfall patterns on agriculture?
- How do increased CO2 levels affect plant growth?
"""
# Add project notes summary if available
if agent.project and hasattr(agent.project, "notes_summary"):
query += f'''\nTo help you understand the subject, here is a summary of the notes the journalist has taken: \n"""{agent.project.notes_summary}\n"""\n'''
query += """
Answer ONLY with the questions you have divided the original question into, not the answers to them (this will be done using research in a later step).
If the original question asked by the journalist is already specific, you can keep it as is.
Answer in a structured format with each of your questions on a new line.
"""
return query
def create_plan(agent, question):
"""
This function creates a research plan for answering a given question. It should be used after create_plan_questions and be in the same chat.
"""
available_sources_str = ''
for source, count in agent.available_sources.items():
if source == 'scientific articles':
available_sources_str += f'- Scientific articles the journalist has gathered. Number of articles: {count}\n'
elif source == 'other articles':
available_sources_str += f'- Other articles the journalist has gathered, such as blog posts, news articles, etc. Number of articles: {count}\n'
elif source == 'notes':
available_sources_str += f'- The journalist\'s own notes. Number of notes: {count}\n'
elif source == 'transcribed interviews':
available_sources_str += f'- Transcribed interviews (already done, you can\'t produce new ones). Number of interviews: {count}\n'
available_sources_str += '- An analyzing tool that can analyze the information you gather.\n'
query = f"""
Thanks! Now, create a research plan for answering the original question: "{question.replace('"', "'")}".
Include the questions you just created and any additional steps needed to answer the original question.
Include what type of information you need from what available sources.
*Available sources are:*
{available_sources_str}
All of the above sources are available in a database/LLM model, but you need to specify what you need. Be as precise as possible.
You are working in a limited context and can't access the internet or external databases, and some "best practices" might not apply, like cross-referencing sources. Therefore, make the plan basic, easy to follow and with the available sources in mind.
*IMPORTANT! Each step should try to answer one or more of the questions you created, and result in a summary of the information you found.*
*Please structure the plan like:*
## Step 1:
- Task1: Description of task and outcome
- Task2: Description of task and outcome
## Step 2:
- Task1: Description of task and outcome
Etc., with as many steps and tasks as needed.
Do NOT include the writing of the report as a step, ONLY the tasks needed to gather information. The report will be written in a later step.
*Example of a plan:*
'''
Question: "What are the effects of climate change on agriculture?"
## Step 1: Read the notes
- Task1: Read the notes and pick out the most relevant information for the question.
- Task2: Summarize the information in a structured format. Try to formulate a hypothesis based on the notes and the question.
## Step 2: Read scientific articles
- Task1: Search for scientific articles to find information about the effects of climate change on agriculture. Use the information from the first step along with the question to formulate search queries.
- Task2: Read the articles and summarize the information in a structured format. Keep the focus on the information that is relevant for the question.
## Step 3: Analyze the information
- Task1: Use the analyzing tool to analyze the information you gathered in the previous steps. Try to find patterns and connections between the different sources.
- Task2: From the information you gathered, and in regard to the question, is there any information that contradicts each other? If so, try to find out why. Is it because of the sources, or is it because of the information itself?
## Step 4: Read other articles
- Task1: Search for other articles to find information about the effects of climate change on agriculture.
- Task2: Read the articles and summarize the information in a structured format. Pick out some interesting facts that are related to what you found in the scientific articles (if there are any).
'''
The example above is only an illustration; you can use other steps and tasks that are more relevant for the question.
"""
return query
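A hedged usage sketch of the two prompt builders above; the agent object and its agent.llm.generate call are assumptions standing in for however the research agent invokes its LLM.

question = "What are the effects of climate change on agriculture?"

# First split the broad question into researchable sub-questions.
sub_questions = agent.llm.generate(create_plan_questions(agent, question))

# Then, in the same chat, ask for the research plan built from those questions.
plan_text = agent.llm.generate(create_plan(agent, question))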

@ -402,5 +402,58 @@ class ResearchPage(StreamlitBaseClass):
current_step = self.research_state.get("current_step", "Planning")
st.markdown(f"**Current step:** {current_step}")
# Display research plan and progress in expandable sections
if self.report:
with st.expander("Research Plan", expanded=True):
if self.report.report["plan"]["original_text"]:
st.markdown("### Original Research Plan")
st.markdown(self.report.report["plan"]["original_text"])
if self.report.report["plan"]["structured"]:
st.markdown("### Structured Plan")
structured_plan = self.report.report["plan"]["structured"]
for step_name, tasks in structured_plan.get("steps", {}).items():
st.markdown(f"**{step_name}**")
for task_name, task_description in tasks:
st.markdown(f"- {task_name}: {task_description}")
# Show completed steps
if self.report.report["steps"]:
with st.expander("Completed Steps", expanded=True):
for step_name, step_data in self.report.report["steps"].items():
# Check if step is finished
if step_data.get("finished_at"):
st.markdown(f"### {step_name}")
if step_data.get("summary"):
st.markdown(f"**Summary:** {step_data['summary']}")
# Show tools used
if step_data.get("tools_used"):
st.markdown("**Tools used:**")
for tool in step_data["tools_used"]:
st.markdown(f"- {tool.get('tool')} with query: _{tool.get('args', {}).get('query', 'No query')}_")
# Show information gathering in the current step
current_step_data = self.report.report["steps"].get(current_step, {})
if current_step_data and not current_step_data.get("finished_at"):
with st.expander("Current Step Progress", expanded=True):
st.markdown(f"### {current_step}")
# Show tools used in current step
if current_step_data.get("tools_used"):
st.markdown("**Tools used so far:**")
for tool in current_step_data["tools_used"]:
st.markdown(f"- {tool.get('tool')} with query: _{tool.get('args', {}).get('query', 'No query')}_")
# Show information gathered so far
if current_step_data.get("information_gathered"):
st.markdown("**Information gathered:**")
sources_seen = set()
for info in current_step_data["information_gathered"]:
for source in info.get("sources", []):
if source not in sources_seen:
st.markdown(f"- {source}")
sources_seen.add(source)
st.info("Research is ongoing. This may take several minutes depending on the complexity of the question.")
st.warning("Please do not navigate away from this page while research is in progress.")
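The rendering loop above implies a structured plan shaped roughly like the following; the step names and task descriptions are illustrative, not taken from the commit.

structured_plan = {
    "steps": {
        "Step 1: Read the notes": [
            ("Task1", "Pick out the information most relevant to the question"),
            ("Task2", "Summarize it and formulate a hypothesis"),
        ],
        "Step 2: Read scientific articles": [
            ("Task1", "Search for articles that address the question"),
            ("Task2", "Summarize the findings, keeping the focus on the question"),
        ],
    }
}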

@ -0,0 +1,369 @@
import requests
import json
import argparse
from typing import Optional, List, Literal, Union
from colorprinter.print_color import *
def search_semantic_scholar(
query: str,
limit: int = 10,
fields: Optional[List[str]] = None,
publication_types: Optional[
List[
Literal[
"Review",
"JournalArticle",
"CaseReport",
"ClinicalTrial",
"Conference",
"Dataset",
"Editorial",
"LettersAndComments",
"MetaAnalysis",
"News",
"Study",
"Book",
"BookSection",
]
]
] = ["JournalArticle"],
open_access: bool = False,
min_citation_count: Optional[int] = None,
date_range: Optional[str] = None,
year_range: Optional[str] = None,
fields_of_study: Optional[
List[
Literal[
"Computer Science",
"Medicine",
"Chemistry",
"Biology",
"Materials Science",
"Physics",
"Geology",
"Psychology",
"Art",
"History",
"Geography",
"Sociology",
"Business",
"Political Science",
"Economics",
"Philosophy",
"Mathematics",
"Engineering",
"Environmental Science",
"Agricultural and Food Sciences",
"Education",
"Law",
"Linguistics",
]
]
] = None,
):
"""
Search for papers on Semantic Scholar with various filters.
Parameters:
-----------
query : str
The search query term
limit : int
Number of results to return (max 100)
fields : List[str], optional
List of fields to include in the response
publication_types : List[str], optional
Filter by publication types
open_access : bool
Only include papers with open access PDFs
min_citation_count : int, optional
Minimum number of citations
date_range : str, optional
Date range in format "YYYY-MM-DD:YYYY-MM-DD"
year_range : str, optional
Year range in format "YYYY-YYYY" or "YYYY-" or "-YYYY"
fields_of_study : List[str], optional
List of fields of study to filter by
Returns:
--------
dict
JSON response containing search results
"""
# Define the API endpoint URL
url = "https://api.semanticscholar.org/graph/v1/paper/search"
# Set up default fields if not provided
if fields is None:
fields = [
"title",
"url",
"abstract",
"year",
"publicationDate",
"authors.name",
"citationCount",
"openAccessPdf",
"tldr",
]
# Build query parameters
params = {"query": query, "limit": limit, "fields": ",".join(fields)}
# Add optional filters if provided
if publication_types:
params["publicationTypes"] = ",".join(publication_types)
if open_access:
params["openAccessPdf"] = ""
if min_citation_count:
params["minCitationCount"] = str(min_citation_count)
if date_range:
params["publicationDateOrYear"] = date_range
if year_range:
params["year"] = year_range
if fields_of_study:
params["fieldsOfStudy"] = ",".join(fields_of_study)
# Send the API request
try:
response = requests.get(url, params=params)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json().get("data", [])
except requests.exceptions.HTTPError as e:
print(f"HTTP Error: {e}")
print(f"Response text: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
return None
def main(
query: Optional[str] = None,
limit: int = 10,
fields: Optional[List[str]] = None,
publication_types: Optional[
List[
Literal[
"Review",
"JournalArticle",
"CaseReport",
"ClinicalTrial",
"Conference",
"Dataset",
"Editorial",
"LettersAndComments",
"MetaAnalysis",
"News",
"Study",
"Book",
"BookSection",
]
]
] = None,
open_access: bool = False,
min_citation_count: Optional[int] = None,
date_range: Optional[str] = None,
year_range: Optional[str] = None,
fields_of_study: Optional[
List[
Literal[
"Computer Science",
"Medicine",
"Chemistry",
"Biology",
"Materials Science",
"Physics",
"Geology",
"Psychology",
"Art",
"History",
"Geography",
"Sociology",
"Business",
"Political Science",
"Economics",
"Philosophy",
"Mathematics",
"Engineering",
"Environmental Science",
"Agricultural and Food Sciences",
"Education",
"Law",
"Linguistics",
]
]
] = None,
):
# Search for papers
papers = search_semantic_scholar(
query=query,
limit=limit,
fields=fields,
publication_types=publication_types,
open_access=open_access,
min_citation_count=min_citation_count,
date_range=date_range,
year_range=year_range,
fields_of_study=fields_of_study,
)
if not papers:
print("No results found or an error occurred.")
return
# Print results
print_green(f"\nFound {len(papers)} papers matching your query: '{query}'")
for paper in papers:
print(paper)
exit()
def search_paper_by_title(
title: str,
fields: Optional[List[str]] = None
):
"""
Search for a single paper that best matches the given title.
Parameters:
-----------
title : str
The title to search for
fields : List[str], optional
List of fields to include in the response
Returns:
--------
dict or None
JSON data for the best matching paper, or None if no match or error
"""
# Define the API endpoint URL
url = "https://api.semanticscholar.org/graph/v1/paper/search/match"
# Set up default fields if not provided
if fields is None:
fields = [
"title",
"abstract",
"year",
"authors.name",
"externalIds",
"url",
"publicationDate",
"journal",
"citationCount",
"openAccessPdf"
]
# Build query parameters
params = {"query": title, "fields": ",".join(fields)}
# Send the API request
try:
response = requests.get(url, params=params)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
print(f"No paper found matching title: {title}")
return None
else:
print(f"HTTP Error: {e}")
print(f"Response text: {e.response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
return None
def get_paper_details(
paper_id: str,
fields: Optional[List[str]] = None
):
"""
Get detailed information about a paper by its identifier.
Parameters:
-----------
paper_id : str
The paper identifier. Can be:
- Semantic Scholar ID (e.g., 649def34f8be52c8b66281af98ae884c09aef38b)
- DOI (e.g., DOI:10.18653/v1/N18-3011)
- arXiv ID (e.g., ARXIV:2106.15928)
- etc.
fields : List[str], optional
List of fields to include in the response
Returns:
--------
dict or None
JSON data for the paper, or None if not found or error
"""
# Define the API endpoint URL
url = f"https://api.semanticscholar.org/graph/v1/paper/{paper_id}"
# Set up default fields if not provided
if fields is None:
fields = [
"title",
"abstract",
"year",
"authors.name",
"externalIds",
"url",
"publicationDate",
"journal",
"citationCount",
"openAccessPdf"
]
# Add DOI: prefix if it's a DOI without the prefix
if paper_id.startswith("10.") and "DOI:" not in paper_id:
paper_id = f"DOI:{paper_id}"
# Build query parameters
params = {"fields": ",".join(fields)}
# Send the API request
try:
response = requests.get(url, params=params)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
print(f"No paper found with ID: {paper_id}")
return None
else:
print(f"HTTP Error: {e}")
print(f"Response text: {e.response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
return None
if __name__ == "__main__":
main(
query="machine learning",
limit=1,
fields=[
"title",
"url",
"abstract",
"tldr",
"externalIds",
"year",
"influentialCitationCount",
"fieldsOfStudy",
"publicationDate",
"journal",
],
open_access=True,
)
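A short usage sketch of the other two helpers in this module, mirroring how the new article2db.py code calls them; the DOI comes from the docstring above and the title is illustrative.

# Look up a paper by DOI; a bare DOI gets the "DOI:" prefix added automatically.
paper = get_paper_details("10.18653/v1/N18-3011", fields=["title", "openAccessPdf"])
if paper and paper.get("openAccessPdf"):
    print("Open access PDF:", paper["openAccessPdf"].get("url"))

# Find the closest match for a title.
match = search_paper_by_title("Attention Is All You Need")
if match:
    print("Best match:", match.get("title"))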

@ -60,7 +60,8 @@ if st.session_state["authentication_status"]:
Projects,
Settings,
RSS_Feeds,
Research
Research,
Search_Papers
)
break
@ -87,10 +88,11 @@ if st.session_state["authentication_status"]:
settings = st.Page(Settings)
rss_feeds = st.Page(RSS_Feeds)
research = st.Page(Research)
search_papers = st.Page(Search_Papers)
sleep(0.1)
pg = st.navigation([bot_chat, projects, article_collections, research, rss_feeds, settings])
pg = st.navigation([bot_chat, projects, article_collections, research, search_papers, rss_feeds, settings])
sleep(0.1)
pg.run()
# try: #TODO Use this when in production

@ -229,6 +229,7 @@ class StreamlitChat(Chat):
avatar = self.get_avatar(message)
with st.chat_message(message["role"], avatar=avatar):
if message["content"]:
print_blue('CONTENT', message["content"])
st.markdown(message["content"].strip('"'))
def get_avatar(self, message: dict = None, role=None) -> str:
@ -721,10 +722,11 @@ class Bot(BaseClass):
def get_notes(self):
# Minimal note retrieval
notes = self.user_arango.db.aql.execute(
f'FOR doc IN notes FILTER doc.project == "{self.project.name if self.project else ""}" RETURN doc'
notes_cursor = self.user_arango.db.aql.execute(
"FOR doc IN notes FILTER doc._id IN @note_ids RETURN doc.text",
bind_vars={"note_ids": self.project.notes},
)
return list(notes)
return list(notes_cursor)
def fetch_science_articles_tool(self, query: str, n_documents: int = 6):
"""
@ -829,7 +831,7 @@ class Bot(BaseClass):
Don't answer with anything you're not sure of!
"""
return self.chatbot.generate(query, stream=False)
return self.chatbot.generate(query, stream=True)
class StreamlitBot(Bot):
@ -991,7 +993,7 @@ class StreamlitBot(Bot):
# Separate thinking chunk and normal chunk
print_red("Model:", self.chatbot.model)
if self.chatbot.model == "reasoning":
if self.chatbot.model == self.chatbot.get_model("reasoning"):
bot_response = self.write_reasoning(response_text)
else:
@ -1036,8 +1038,13 @@ class StreamlitBot(Bot):
return "\n\n".join(bot_responses)
def write_reasoning(self, response_text):
chunks_iter = iter(response_text) # convert generator to iterator
def write_reasoning(self, response):
if isinstance(response, str):
# If the response is a string, just return it
print_yellow('Response is string:', response)
return st.write(response)
chunks_iter = iter(response) # convert generator to iterator
try:
first_mode, first_text = next(chunks_iter) # get first chunk
@ -1068,8 +1075,12 @@ class StreamlitBot(Bot):
bot_response = st.write_stream(full_gen())
def write_normal(self, response_text):
chunks_iter = iter(response_text) # convert generator to iterator
def write_normal(self, response):
if isinstance(response, str):
# If the response is a string, just return it
print_yellow('Response is string:', response)
return st.write(response)
chunks_iter = iter(response) # convert generator to iterator
def full_gen():
for chunk in chunks_iter:

@ -59,6 +59,20 @@ def Article_Collections():
article_collection.run()
def Search_Papers():
"""
Function to handle the Search Papers page.
"""
from streamlit_search_paper_page import SearchPaperPage
sleep(0.1)
if "Search Papers" not in st.session_state:
st.session_state["Search Papers"] = {}
search_papers_page = SearchPaperPage(username=st.session_state["username"])
search_papers_page.run()
def Settings():
"""
Function to handle the Settings page.

@ -1,345 +0,0 @@
import os
import urllib
import streamlit as st
from _base_class import StreamlitBaseClass
import feedparser
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from utils import fix_key
from colorprinter.print_color import *
from datetime import datetime, timedelta
class RSSFeedsPage(StreamlitBaseClass):
def __init__(self, username: str):
super().__init__(username=username)
self.page_name = "RSS Feeds"
# Initialize attributes from session state if available
for k, v in st.session_state.get(self.page_name, {}).items():
setattr(self, k, v)
def run(self):
if "selected_feed" not in st.session_state:
st.session_state["selected_feed"] = None
self.update_current_page(self.page_name)
self.display_feed()
self.sidebar_actions()
# Persist state to session_state
self.update_session_state(page_name=self.page_name)
def select_rss_feeds(self):
# Fetch RSS feeds from the user's ArangoDB collection
rss_feeds = self.get_rss_feeds()
if rss_feeds:
feed_options = [feed["title"] for feed in rss_feeds]
with st.sidebar:
st.subheader("Show your feeds")
selected_feed_title = st.selectbox(
"Select a feed", options=feed_options, index=None
)
if selected_feed_title:
st.session_state["selected_feed"] = [
feed["_key"]
for feed in rss_feeds
if feed["title"] == selected_feed_title
][0]
st.rerun()
else:
st.write("You have no RSS feeds added.")
def get_rss_feeds(self):
return list(self.user_arango.db.collection("rss_feeds").all())
def sidebar_actions(self):
with st.sidebar:
# Select a feed to show
self.select_rss_feeds()
st.subheader("Add a New RSS Feed")
rss_url = st.text_input("Website URL or RSS Feed URL")
if st.button("Discover Feeds"):
if rss_url:
with st.spinner("Discovering feeds..."):
feeds = self.discover_feeds(rss_url)
if feeds:
st.session_state["discovered_feeds"] = feeds
st.rerun()
else:
st.error("No RSS feeds found at the provided URL.")
if "discovered_feeds" in st.session_state:
st.subheader("Select a Feed to Add")
feeds = st.session_state["discovered_feeds"]
feed_options = [f"{feed['title']} ({feed['href']})" for feed in feeds]
selected_feed = st.selectbox("Available Feeds", options=feed_options)
selected_feed_url = feeds[feed_options.index(selected_feed)]["href"]
if st.button("Preview Feed"):
feed_data = feedparser.parse(selected_feed_url)
st.write(f"{feed_data.feed.get('title', 'No title')}")
description = html_to_markdown(
feed_data.feed.get("description", "No description")
)
st.write(f"_{description}_")
for entry in feed_data.entries[:5]:
print("ENTRY:")
with st.expander(entry.title):
summary = (
entry.summary
if "summary" in entry
else "No summary available"
)
markdown_summary = html_to_markdown(summary)
st.markdown(markdown_summary)
if st.button(
"Add RSS Feed",
on_click=self.add_rss_feed,
args=(selected_feed_url, feed_data, description),
):
del st.session_state["discovered_feeds"]
st.success("RSS Feed added.")
st.rerun()
def discover_feeds(self, url):
try:
if not url.startswith("http"):
url = "https://" + url
# Check if the input URL is already an RSS feed
f = feedparser.parse(url)
if len(f.entries) > 0:
return [
{
"href": url,
"title": f.feed.get("title", "No title"),
"icon": self.get_site_icon(url),
}
]
# If not, proceed to discover feeds from the webpage
raw = requests.get(url).text
result = []
possible_feeds = []
html = BeautifulSoup(raw, "html.parser")
# Find the site icon
icon_url = self.get_site_icon(url, html)
# Find all <link> tags with rel="alternate" and type containing "rss" or "xml"
feed_urls = html.findAll("link", rel="alternate")
for f in feed_urls:
t = f.get("type", None)
if t and ("rss" in t or "xml" in t):
href = f.get("href", None)
if href:
possible_feeds.append(urljoin(url, href))
# Find all <a> tags with href containing "rss", "xml", or "feed"
parsed_url = urllib.parse.urlparse(url)
base = parsed_url.scheme + "://" + parsed_url.hostname
atags = html.findAll("a")
for a in atags:
href = a.get("href", None)
if href and ("rss" in href or "xml" in href or "feed" in href):
possible_feeds.append(urljoin(base, href))
# Validate the possible feeds using feedparser
for feed_url in list(set(possible_feeds)):
f = feedparser.parse(feed_url)
if len(f.entries) > 0:
result.append(
{
"href": feed_url,
"title": f.feed.get("title", "No title"),
"icon": icon_url,
}
)
return result
except Exception as e:
print(f"Error discovering feeds: {e}")
return []
def add_rss_feed(self, url, feed_data, description):
try:
icon_url = feed_data["feed"]["image"]["href"]
except Exception as e:
icon_url = self.get_site_icon(url)
title = feed_data["feed"].get("title", "No title")
print_blue(title)
icon_path = download_icon(icon_url) if icon_url else None
_key = fix_key(url)
now_timestamp = datetime.now().isoformat() # Convert datetime to ISO format string
self.user_arango.db.collection("rss_feeds").insert(
{
"_key": _key,
"url": url,
"title": title,
"icon_path": icon_path,
"description": description,
'fetched_timestamp': now_timestamp, # Add the timestamp field
'feed_data': feed_data,
},
overwrite=True,
)
feed = self.get_feed_from_arango(_key)
now_timestamp = datetime.now().isoformat() # Convert datetime to ISO format string
if feed:
self.update_feed(_key, feed)
else:
self.base_arango.db.collection("rss_feeds").insert(
{
"_key": _key,
"url": url,
"title": title,
"icon_path": icon_path,
"description": description,
'fetched_timestamp': now_timestamp, # Add the timestamp field
"feed_data": feed_data,
},
overwrite=True,
overwrite_mode="update",
)
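# Design note: the feed is stored twice on purpose. The per-user database keeps the user's
# own reference, while the shared base database caches the parsed feed_data once so that
# update_feed()/get_feed() below can refresh it for every subscriber.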
def update_feed(self, feed_key, feed=None):
"""
Update an RSS feed that already exists in the ArangoDB base database.
Args:
feed_key (str): The key identifying the feed in the database.
feed (dict, optional): The feed document; fetched from the base database if not provided.
Returns:
dict: The freshly parsed feed data.
Raises:
Exception: If there is an error updating the feed in the database.
"""
if not feed:
feed = self.get_feed_from_arango(feed_key)
feed_data = feedparser.parse(feed["url"])
print_rainbow(feed_data['feed'])
feed["feed_data"] = feed_data
if self.username not in feed.get("users", []):
feed["users"] = feed.get("users", []) + [self.username]
fetched_timestamp = datetime.now().isoformat() # Convert datetime to ISO format string
# Persist the refreshed feed data, timestamp and subscriber list in the database
self.base_arango.db.collection("rss_feeds").update(
{
"_key": feed["_key"],
"fetched_timestamp": fetched_timestamp,
"feed_data": feed_data,
"users": feed.get("users", []),
}
)
return feed_data
def update_session_state(self, page_name=None):
# Update session state
if page_name:
st.session_state[page_name] = self.__dict__
def get_site_icon(self, url, html=None):
try:
if not html:
raw = requests.get(url).text
html = BeautifulSoup(raw, "html.parser")
icon_link = html.find("link", rel="icon")
if icon_link:
icon_url = icon_link.get("href", None)
if icon_url:
return urljoin(url, icon_url)
# Fallback to finding other common icon links
icon_link = html.find("link", rel="shortcut icon")
if icon_link:
icon_url = icon_link.get("href", None)
if icon_url:
return urljoin(url, icon_url)
return None
except Exception as e:
print(f"Error getting site icon: {e}")
return None
def get_feed_from_arango(self, feed_key):
"""
Retrieve an RSS feed from the ArangoDB base database.
Args:
feed_key (str): The key of the RSS feed to retrieve from the ArangoDB base database.
Returns:
dict: The RSS feed document retrieved from the ArangoDB base database.
"""
return self.base_arango.db.collection("rss_feeds").get(feed_key)
def get_feed(self, feed_key):
feed = self.get_feed_from_arango(feed_key)
feed_data = feed["feed_data"]
fetched_time = datetime.fromisoformat(feed['fetched_timestamp']) # Parse the timestamp string
if datetime.now() - fetched_time < timedelta(hours=1):
return feed_data
else:
return self.update_feed(feed_key)
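# Caching sketch (hypothetical key): a feed fetched less than an hour ago is served from
# the stored copy, otherwise it is re-parsed via update_feed().
# feed_data = self.get_feed("example_org_rss")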
def display_feed(self):
if st.session_state["selected_feed"]:
feed_data = self.get_feed(st.session_state["selected_feed"])
st.title(feed_data['feed'].get("title", "No title"))
st.write(feed_data['feed'].get("description", "No description"))
st.write("**Recent Entries:**")
for entry in feed_data['entries'][:5]:
with st.expander(entry['title']):
summary = (
entry['summary'] if "summary" in entry else "No summary available"
)
markdown_summary = html_to_markdown(summary)
st.markdown(markdown_summary)
st.markdown(f"[Read more]({entry['link']})")
def html_to_markdown(html):
soup = BeautifulSoup(html, "html.parser")
for br in soup.find_all("br"):
br.replace_with("\n")
for strong in soup.find_all("strong"):
strong.replace_with(f"**{strong.text}**")
for em in soup.find_all("em"):
em.replace_with(f"*{em.text}*")
for p in soup.find_all("p"):
p.replace_with(f"{p.text}\n\n")
return soup.get_text()
def download_icon(icon_url, save_folder="external_icons"):
try:
if not os.path.exists(save_folder):
os.makedirs(save_folder)
response = requests.get(icon_url, stream=True)
if response.status_code == 200:
icon_name = os.path.basename(icon_url)
icon_path = os.path.join(save_folder, icon_name)
with open(icon_path, "wb") as f:
for chunk in response.iter_content(1024):
f.write(chunk)
return icon_path
else:
print(f"Failed to download icon: {response.status_code}")
return None
except Exception as e:
print(f"Error downloading icon: {e}")
return None
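if __name__ == "__main__":
# Minimal sketch, not part of the Streamlit page: exercise the module-level helpers.
# The HTML snippet is an illustrative placeholder.
sample_html = "<p>Hello <strong>world</strong>, see <em>this</em>.<br>Next line.</p>"
print(html_to_markdown(sample_html))
# download_icon("https://example.org/favicon.ico")  # would fetch and store the icon locally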

@ -0,0 +1,692 @@
import streamlit as st
from time import sleep
from datetime import datetime
import os
import requests
from pathlib import Path
from colorprinter.print_color import *
from _base_class import StreamlitBaseClass
from semantic_schoolar import search_semantic_scholar
from collections_page import ArticleCollectionsPage
from article2db import PDFProcessor
from utils import fix_key
class SearchPaperPage(StreamlitBaseClass):
"""
SearchPaperPage - A Streamlit interface for searching papers and adding to collections.
This class provides a user interface for searching research papers using the Semantic Scholar API
and adding them to article collections. It allows filtering by various parameters and displaying
detailed information about each paper.
Attributes:
username (str): The username of the current user.
page_name (str): Name of the current page ("Search Papers").
collection (str): The currently selected collection to add papers to.
search_results (list): List of papers from the most recent search.
search_state (dict): Dictionary tracking the state of the current search.
download_queue (dict): Tracks papers that need manual download.
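Example (illustrative; assumes ArangoDB credentials are configured for the user):
>>> page = SearchPaperPage("alice")  # "alice" is a placeholder username
>>> page.run()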
"""
def __init__(self, username: str):
super().__init__(username=username)
self.page_name = "Search Papers"
self.collection = self.get_settings().get("current_collection")
# Initialize search results and state
self.search_results = []
self.search_state = {
"last_query": None,
"filters": {},
"page": 0,
"results_per_page": 10
}
# Track papers that need to be downloaded manually
self.download_queue = {}
# Download directory
self.download_dir = Path(f"/home/lasse/sci/downloads/{username}")
self.download_dir.mkdir(parents=True, exist_ok=True)
# Initialize attributes from session state if available
if self.page_name in st.session_state:
for k, v in st.session_state[self.page_name].items():
setattr(self, k, v)
def run(self):
"""Main method to render the search interface and handle user interactions."""
self.update_current_page(self.page_name)
st.title("Search Research Papers")
# Instructions for first-time users
with st.expander(" How to use this page", expanded=not self.search_results):
st.markdown("""
### Finding and Adding Research Papers
1. **Search for papers** using the search box below
2. **Select a collection** from the sidebar to add papers to
3. **Open access papers** can be downloaded automatically
4. **Non-open access papers** require manual download (you'll get instructions)
5. **View paper details** by clicking on a paper in the search results
### Understanding paper availability:
- 📄 = Open access paper (can be downloaded automatically)
- 🔒 = Requires manual download (instructions provided)
""")
# Show download queue if papers need manual download
if self.download_queue:
self.show_download_instructions()
# Sidebar for collections and filters
self.sidebar_actions()
# Main search interface
self.search_interface()
# Display search results if available
if self.search_results:
self.display_search_results()
# Show upload interface for manually downloaded papers
if self.download_queue:
self.show_manual_upload_interface()
# Persist state to session_state
self.update_session_state(page_name=self.page_name)
def sidebar_actions(self):
"""Renders sidebar elements for selecting collections and search filters."""
with st.sidebar:
# Collection selection
collections = self.get_article_collections()
if collections:
st.subheader("Select Collection")
self.collection = st.selectbox(
"Add papers to collection:",
collections,
index=collections.index(self.collection) if self.collection in collections else 0
)
self.update_settings("current_collection", self.collection)
else:
st.warning("No collections available. Create a collection first.")
if st.button("Create Collection"):
st.session_state["new_collection"] = True
self.collection = None
# Search Filters
st.subheader("Search Filters")
# Publication types
pub_types = [
"JournalArticle", "Review", "Conference", "MetaAnalysis",
"Study", "Book", "BookSection", "Dataset", "ClinicalTrial"
]
selected_pub_types = st.multiselect(
"Publication Types",
pub_types,
default=self.search_state.get("filters", {}).get("publication_types", ["JournalArticle"])
)
if selected_pub_types:
self.search_state["filters"]["publication_types"] = selected_pub_types
# Open access only
open_access = st.checkbox(
"Open Access Only",
value=self.search_state.get("filters", {}).get("open_access", False)
)
self.search_state["filters"]["open_access"] = open_access
# Year range
st.subheader("Year Range")
col1, col2 = st.columns(2)
with col1:
start_year = st.number_input(
"From",
min_value=1900,
max_value=datetime.now().year,
value=self.search_state.get("filters", {}).get("start_year", 2010)
)
with col2:
end_year = st.number_input(
"To",
min_value=1900,
max_value=datetime.now().year,
value=self.search_state.get("filters", {}).get("end_year", datetime.now().year)
)
if start_year and end_year:
self.search_state["filters"]["start_year"] = start_year
self.search_state["filters"]["end_year"] = end_year
self.search_state["filters"]["year_range"] = f"{start_year}-{end_year}"
# Minimum citations
min_citations = st.number_input(
"Minimum Citations",
min_value=0,
value=self.search_state.get("filters", {}).get("min_citation_count", 0)
)
self.search_state["filters"]["min_citation_count"] = min_citations
# Fields of study
fields_of_study = [
"Computer Science", "Medicine", "Chemistry", "Biology",
"Mathematics", "Engineering", "Psychology", "Physics",
"Economics", "Business", "Political Science", "Environmental Science"
]
selected_fields = st.multiselect(
"Fields of Study",
fields_of_study,
default=self.search_state.get("filters", {}).get("fields_of_study", [])
)
if selected_fields:
self.search_state["filters"]["fields_of_study"] = selected_fields
# Results per page
results_per_page = st.select_slider(
"Results Per Page",
options=[5, 10, 15, 20, 25, 30],
value=self.search_state.get("results_per_page", 10)
)
self.search_state["results_per_page"] = results_per_page
# Reset filters button
if st.button("Reset Filters"):
self.search_state["filters"] = {}
self.search_state["results_per_page"] = 10
st.rerun()
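# After the sidebar has run, self.search_state["filters"] has roughly this shape
# (values are illustrative only):
# {"publication_types": ["JournalArticle"], "open_access": False,
#  "start_year": 2010, "end_year": 2025, "year_range": "2010-2025",
#  "min_citation_count": 0, "fields_of_study": ["Medicine"]}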
def search_interface(self):
"""Renders the main search interface with query input and search button."""
with st.form("search_form", clear_on_submit=False):
col1, col2 = st.columns([3, 1])
with col1:
query = st.text_input(
"Search for papers:",
value=self.search_state.get("last_query", ""),
placeholder="Enter keywords, title, author, etc."
)
submitted = st.form_submit_button("Search")
if submitted and query:
self.search_state["last_query"] = query
self.search_state["page"] = 0 # Reset to first page
self.perform_search(query)
def perform_search(self, query: str):
"""
Executes a search using the Semantic Scholar API with the provided query and filters.
Args:
query (str): The search query string.
"""
with st.spinner("Searching for papers..."):
try:
# Extract filters from search state
filters = self.search_state.get("filters", {})
# Perform the search
results = search_semantic_scholar(
query=query,
limit=self.search_state.get("results_per_page", 10),
publication_types=filters.get("publication_types"),
open_access=filters.get("open_access", False),
min_citation_count=filters.get("min_citation_count"),
year_range=filters.get("year_range"),
fields_of_study=filters.get("fields_of_study")
)
if results:
self.search_results = results
st.success(f"Found {len(results)} papers matching your query")
else:
st.warning("No results found. Try modifying your search query or filters.")
self.search_results = []
except Exception as e:
st.error(f"An error occurred during search: {str(e)}")
print_red(f"Search error: {str(e)}")
self.search_results = []
def display_search_results(self):
"""Displays the search results with paper details and options to add to collections."""
st.subheader("Search Results")
# Summary of results
open_access_count = sum(1 for paper in self.search_results if (paper.get('openAccessPdf') or {}).get('url'))
st.info(f"Found {len(self.search_results)} papers ({open_access_count} open access)")
# Pagination controls
if len(self.search_results) > 0:
total_pages = 1 # Currently only showing one page of results from the API
current_page = self.search_state.get("page", 0)
# Display papers on the current page
for i, paper in enumerate(self.search_results):
self.display_paper_card(paper, i)
def display_paper_card(self, paper, index):
"""
Displays a single paper as a card with details and action buttons.
Args:
paper (dict): The paper data from Semantic Scholar API
index (int): Index of the paper in the search results
"""
# Extract paper details
title = paper.get('title', 'No Title')
authors = paper.get('authors', [])
author_names = [author.get('name', '') for author in authors]
author_str = ", ".join(author_names) if author_names else "Unknown Authors"
year = paper.get('year', 'Unknown Year')
# journal, externalIds and openAccessPdf can be null in the Semantic Scholar response
journal = (paper.get('journal') or {}).get('name', 'Unknown Journal')
citation_count = paper.get('citationCount', 0)
influential_citation_count = paper.get('influentialCitationCount', 0)
paper_id = paper.get('paperId', '')
paper_url = paper.get('url', '')
doi = (paper.get('externalIds') or {}).get('DOI', '')
# Check if open access
open_access = (paper.get('openAccessPdf') or {}).get('url', None)
open_access_icon = "📄" if open_access else "🔒"
# Create card
with st.expander(f"{title} ({year}) {open_access_icon}"):
# Basic information
st.markdown(f"**Title:** {title}")
st.markdown(f"**Authors:** {author_str}")
st.markdown(f"**Year:** {year}")
st.markdown(f"**Journal:** {journal}")
# Citations
st.markdown(f"**Citations:** {citation_count} (Influential: {influential_citation_count})")
# Abstract - Don't use nested expander
abstract = paper.get('abstract', 'No abstract available')
if abstract:
st.markdown("**Abstract:**")
st.markdown(f"<div style='border-left: 2px solid #ccc; padding-left: 10px; margin-bottom: 10px;'>{abstract}</div>", unsafe_allow_html=True)
# TL;DR summary if available
tldr = None
if 'tldr' in paper and isinstance(paper['tldr'], dict):
tldr = paper['tldr'].get('text')
if tldr:
st.markdown(f"**TL;DR:** {tldr}")
# Fields of study
fields = paper.get('fieldsOfStudy', [])
if fields:
st.markdown(f"**Fields:** {', '.join(fields)}")
# External IDs
external_ids = paper.get('externalIds', {})
if external_ids:
id_text = []
for id_type, id_value in external_ids.items():
id_text.append(f"{id_type}: {id_value}")
st.markdown(f"**IDs:** {', '.join(id_text)}")
# Paper actions section
st.markdown("---")
st.markdown("### Access Paper")
col1, col2 = st.columns(2)
# Open access papers - direct download and add
if open_access:
with col1:
st.success("📄 This paper is open access")
st.markdown(f"[View on Semantic Scholar]({paper_url})")
if doi:
st.markdown(f"[DOI: {doi}](https://doi.org/{doi})")
with col2:
# Direct download button
if st.button(f"Download PDF", key=f"dl_{paper_id}"):
with st.spinner("Downloading PDF..."):
success, filepath = self.download_pdf(open_access, f"{paper_id}.pdf")
if success:
st.success(f"Downloaded to {filepath}")
# Automatically add to collection if selected
if self.collection:
self.add_paper_to_collection(paper, self.collection, filepath)
else:
st.error("Failed to download PDF")
# Direct add to collection without downloading
if self.collection:
if st.button(f"Add to {self.collection} without download", key=f"add_nodl_{paper_id}"):
self.add_paper_to_collection(paper, self.collection)
# Non-open access papers - manual workflow
else:
with col1:
st.warning("🔒 This paper requires manual download")
st.markdown(f"[View on Semantic Scholar]({paper_url})")
if doi:
st.markdown(f"[DOI: {doi}](https://doi.org/{doi})")
with col2:
# Add to manual download queue
if st.button(f"I want this paper", key=f"want_{paper_id}"):
self.download_queue[paper_id] = {
"paper": paper,
"collection": self.collection
}
st.info("Added to your download queue. See instructions at the top of the page.")
def show_download_instructions(self):
"""Shows instructions for papers that need manual download."""
with st.expander(" Papers that need manual download", expanded=True):
st.markdown("### Papers Requiring Manual Download")
st.markdown("""
Some papers aren't freely available and need to be downloaded manually. Follow these steps:
1. Click the links below to access the publisher's website
2. Download the PDF file (you may need institutional access)
3. Save the file to your computer
4. Upload the PDF in the 'Upload Downloaded Papers' section below
""")
for paper_id, info in list(self.download_queue.items()):  # copy so entries can be removed while iterating
paper = info["paper"]
title = paper.get('title', 'No Title')
doi = (paper.get('externalIds') or {}).get('DOI', '')
paper_url = paper.get('url', '')
st.markdown(f"**{title}**")
col1, col2 = st.columns([3, 1])
with col1:
if paper_url:
st.markdown(f"[Semantic Scholar]({paper_url})")
if doi:
st.markdown(f"[DOI: {doi}](https://doi.org/{doi})")
with col2:
if st.button("Remove", key=f"remove_{paper_id}"):
del self.download_queue[paper_id]
st.rerun()
def show_manual_upload_interface(self):
"""Interface for uploading manually downloaded papers."""
st.markdown("---")
st.header("Upload Downloaded Papers")
st.markdown("Upload the PDFs you downloaded manually here:")
with st.form("manual_upload_form", clear_on_submit=True):
uploaded_files = st.file_uploader("Upload PDF files", type=["pdf"], accept_multiple_files=True)
collections = self.get_article_collections()
collection_for_upload = st.selectbox(
"Add to collection:",
collections,
index=collections.index(self.collection) if self.collection in collections else 0
)
submitted = st.form_submit_button("Upload and Process")
if submitted and uploaded_files:
with st.spinner("Processing uploaded files..."):
success_count = 0
for pdf_file in uploaded_files:
# Process the uploaded PDF
success = self.process_uploaded_pdf(pdf_file, collection_for_upload)
if success:
success_count += 1
# Remove from download queue if it matches a title
for paper_id in list(self.download_queue.keys()):
if self.download_queue[paper_id]["paper"].get("title", "").lower() in pdf_file.name.lower():
del self.download_queue[paper_id]
st.success(f"Successfully processed {success_count} of {len(uploaded_files)} files")
if success_count > 0:
st.rerun()
def download_pdf(self, url, filename):
"""
Downloads a PDF from a URL and saves it to the download directory.
Args:
url (str): The URL of the PDF to download
filename (str): The filename to save the PDF as
Returns:
tuple: (success, filepath)
"""
try:
# Make sure the filename is safe
safe_filename = "".join(c for c in filename if c.isalnum() or c in "._-") or "paper.pdf"
if not safe_filename.endswith(".pdf"):
safe_filename += ".pdf"
filepath = self.download_dir / safe_filename
print_blue(f"Downloading {url} to {filepath}")
# Download the file
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
# Check if the content is actually a PDF
content_type = response.headers.get('Content-Type', '')
if 'application/pdf' not in content_type.lower() and not url.lower().endswith('.pdf'):
print_red(f"Warning: Content may not be a PDF. Content-Type: {content_type}")
# Save the file
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Verify the file exists and has content
if not filepath.exists() or filepath.stat().st_size == 0:
print_red(f"Downloaded file is empty or doesn't exist")
return False, None
print_blue(f"Successfully downloaded to {filepath}")
return True, filepath
except Exception as e:
print_red(f"Error downloading PDF: {str(e)}")
return False, None
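# Illustrative usage (URL and filename are placeholders); returns (True, Path) on success,
# (False, None) otherwise:
# ok, path = self.download_pdf("https://example.org/paper.pdf", "somePaperId.pdf")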
def process_uploaded_pdf(self, pdf_file, collection_name):
"""
Process an uploaded PDF file and add it to a collection.
Args:
pdf_file (UploadedFile): The uploaded PDF file
collection_name (str): The collection to add the paper to
Returns:
bool: Success or failure
"""
try:
# Create a temporary file to process
processor = PDFProcessor(
pdf_file=pdf_file,
filename=pdf_file.name,
process=False,
username=self.username,
document_type="other_documents",
)
_id, db, doi = processor.process_document()
print_blue(f"Processed document: ID={_id}, DB={db}, DOI={doi}")
if _id:
# Add to collection
self.articles2collection(collection=collection_name, db=db, _id=_id)
return True
return False
except Exception as e:
print_red(f"Error processing PDF: {str(e)}")
st.error(f"Failed to process {pdf_file.name}: {str(e)}")
return False
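# Note: process_document() is assumed to return an (_id, db, doi) tuple as unpacked above;
# a falsy _id is treated as a failed import.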
def add_paper_to_collection(self, paper, collection_name, filepath=None):
"""
Adds a paper from search results to the selected collection.
Args:
paper (dict): The paper data from Semantic Scholar API
collection_name (str): Name of the collection to add the paper to
filepath (str, optional): Path to downloaded PDF file
"""
# If we have a filepath, process the PDF
if filepath:
# Debug information about the file
if not os.path.exists(filepath):
st.error(f"File does not exist: {filepath}")
print_red(f"File does not exist: {filepath}")
# Fall back to metadata-only
else:
file_size = os.path.getsize(filepath)
st.info(f"Processing PDF file: {filepath} ({file_size} bytes)")
print_blue(f"Processing PDF file: {filepath} ({file_size} bytes)")
with open(filepath, 'rb') as f:
header = f.read(10)
is_pdf = header.startswith(b'%PDF-')
print_blue(f"File header check - Is PDF: {is_pdf}, Header: {header}")
# Read the entire file content
f.seek(0)
pdf_content = f.read()
# Create PDFProcessor with content bytes instead of file handle
st.info("Creating PDFProcessor...")
processor = PDFProcessor(
pdf_file=pdf_content, # Pass content bytes instead of file handle
filename=os.path.basename(filepath),
process=False,
username=self.username,
document_type="other_documents",
is_sci=True
)
st.info("Checking if processor was created successfully...")
if processor:
print_blue(f"PDFProcessor created: {processor}")
st.info("Processing document...")
_id, db, doi = processor.process_document()
print_blue(f"Document processed: ID={_id}, DB={db}, DOI={doi}")
if _id:
self.articles2collection(collection=collection_name, db=db, _id=_id)
st.success(f"Added '{paper.get('title')}' to {collection_name}")
return
else:
st.warning("Process document returned no ID. Falling back to metadata.")
else:
st.warning("PDFProcessor creation failed. Falling back to metadata.")
# Add directly using metadata if file processing failed or no filepath
st.info("Adding paper using metadata only")
paper_info = {
"_id": f"sci_articles/{paper.get('paperId', '')}",
"doi": paper.get('externalIds', {}).get('DOI', ''),
"metadata": {
"title": paper.get('title', 'No Title'),
"journal": paper.get('journal', {}).get('name', 'Unknown Journal'),
"published_year": paper.get('year', ''),
"published_date": paper.get('publicationDate', ''),
"authors": [author.get('name', '') for author in paper.get('authors', [])],
"abstract": paper.get('abstract', ''),
"url": paper.get('url', ''),
"open_access_url": paper.get('openAccessPdf', {}).get('url', ''),
"citation_count": paper.get('citationCount', 0),
"fields_of_study": paper.get('fieldsOfStudy', []),
}
}
# Check if collection exists
doc_cursor = self.user_arango.db.aql.execute(
'FOR doc IN article_collections FILTER doc["name"] == @name RETURN doc',
bind_vars={"name": collection_name},
)
doc = next(doc_cursor, None)
if doc:
# Check if paper already exists in the collection
articles = doc.get("articles", [])
for article in articles:
if article.get("_id") == paper_info["_id"] or article.get("doi") == paper_info["doi"]:
st.warning(f"This paper is already in the '{collection_name}' collection.")
return
# Add paper to collection
articles.append(paper_info)
self.user_arango.db.collection("article_collections").update_match(
filters={"name": collection_name},
body={"articles": articles},
merge=True,
)
st.success(f"Added '{paper.get('title')}' to {collection_name}")
# Persist state after adding paper
self.update_session_state(page_name=self.page_name)
else:
st.error(f"Collection '{collection_name}' not found.")
def articles2collection(self, collection: str, db: str, _id: str = None) -> None:
"""
Add an article to a collection by retrieving its info from the database.
Args:
collection (str): The collection name
db (str): The database name
_id (str): The article ID
"""
info = self.get_article_info(db, _id=_id)
info = {
k: v for k, v in info.items() if k in ["_id", "doi", "title", "metadata"]
}
doc_cursor = self.user_arango.db.aql.execute(
'FOR doc IN article_collections FILTER doc["name"] == @name RETURN doc',
bind_vars={"name": collection},
)
doc = next(doc_cursor, None)
if doc:
articles = doc.get("articles", [])
keys = [i["_id"] for i in articles]
if info["_id"] not in keys:
articles.append(info)
self.user_arango.db.collection("article_collections").update_match(
filters={"name": collection},
body={"articles": articles},
merge=True,
)
# Persist state after updating articles
self.update_session_state(page_name=self.page_name)
def get_article_info(self, db: str, _id: str = None, doi: str = None) -> dict:
"""
Get article info from the database.
Args:
db (str): The database name
_id (str, optional): The article ID
doi (str, optional): The article DOI
Returns:
dict: The article info
"""
assert _id or doi, "Either _id or doi must be provided."
arango = self.get_arango(db_name=db)
if _id:
query = """
RETURN {
"_id": DOCUMENT(@doc_id)._id,
"doi": DOCUMENT(@doc_id).doi,
"title": DOCUMENT(@doc_id).title,
"metadata": DOCUMENT(@doc_id).metadata,
"summary": DOCUMENT(@doc_id).summary
}
"""
info_cursor = arango.db.aql.execute(query, bind_vars={"doc_id": _id})
elif doi:
info_cursor = arango.db.aql.execute(
'FOR doc IN sci_articles FILTER doc["doi"] == @doi LIMIT 1 RETURN {"_id": doc["_id"], "doi": doc["doi"], "title": doc["title"], "metadata": doc["metadata"], "summary": doc["summary"]}',
bind_vars={"doi": doi},
)
return next(info_cursor, None)
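# Illustrative return value (placeholder values); None is returned when nothing matches:
# {"_id": "sci_articles/12345", "doi": "10.1000/example", "title": "...",
#  "metadata": {...}, "summary": "..."}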