# NOTE(review): removed non-source residue from a repository web view
# ("topics" notice / "439 lines" / "18 KiB") that had been pasted above the code.
import html
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict

from fastapi import FastAPI, BackgroundTasks, Request
from fastapi.responses import JSONResponse, HTMLResponse

from _arango import ArangoDB
from _chromadb import ChromaDB
from _llm import LLM
from models import ArticleChunk
from prompts import get_summary_prompt
app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Storage for the latest processed document
latest_result: Dict[str, Any] = {}
latest_result_file = os.path.join(os.path.dirname(__file__), "latest_summary_result.json")
# Load any previously saved result on startup
try:
if os.path.exists(latest_result_file):
with open(latest_result_file, 'r') as f:
latest_result = json.load(f)
logger.info(f"Loaded previous result from {latest_result_file}")
except Exception as e:
logger.warning(f"Could not load previous result: {e}")
# Function to save the latest result to disk
def save_latest_result(result: Dict[str, Any]):
global latest_result
latest_result = result
try:
# Save sanitized version (remove internal fields if needed)
result_to_save = {k: v for k, v in result.items() if not k.startswith('_') or k == '_id'}
with open(latest_result_file, 'w') as f:
json.dump(result_to_save, f, indent=2)
logger.info(f"Saved latest result to {latest_result_file}")
except Exception as e:
logger.error(f"Error saving latest result: {e}")
# New endpoint to get the latest summarized document
@app.get("/latest_result")
async def get_latest_result():
"""
Get the latest summarized document result.
Returns the most recently processed document summary and chunk information.
If no document has been processed yet, returns an empty object.
Returns
-------
dict
The latest processed document with summaries
"""
if not latest_result:
return {"message": "No documents have been processed yet"}
return latest_result
@app.get("/view_results")
async def view_results():
"""
View the latest summarization results in a more readable format.
Returns a formatted response with document summary and chunks.
Returns
-------
dict
A formatted representation of the latest summarized document
"""
if not latest_result:
return {"message": "No documents have been processed yet"}
# Extract the key information
formatted_result = {
"document_id": latest_result.get("_id", "Unknown"),
"timestamp": datetime.now().isoformat(),
"summary": latest_result.get("summary", {}).get("text_sum", "No summary available"),
"model": latest_result.get("summary", {}).get("meta", {}).get("model", "Unknown model"),
}
# Format chunks information if available
chunks = latest_result.get("chunks", [])
if chunks:
formatted_chunks = []
for i, chunk in enumerate(chunks):
chunk_data = {
"chunk_number": i + 1,
"summary": chunk.get("summary", "No summary available"),
"tags": chunk.get("tags", [])
}
# Add references for scientific articles if available
if "references" in chunk:
chunk_data["references"] = chunk.get("references", [])
formatted_chunks.append(chunk_data)
formatted_result["chunks"] = formatted_chunks
formatted_result["chunk_count"] = len(chunks)
return formatted_result
@app.get("/html_results", response_class=HTMLResponse)
async def html_results():
"""
View the latest summarization results in a human-readable HTML format.
"""
if not latest_result:
return """
<html>
<head>
<title>No Results Available</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; }
</style>
</head>
<body>
<h1>No Documents Have Been Processed Yet</h1>
<p>Submit a document for summarization first.</p>
</body>
</html>
"""
# Get the document ID and summary
doc_id = latest_result.get("_id", "Unknown")
summary = latest_result.get("summary", {}).get("text_sum", "No summary available")
model = latest_result.get("summary", {}).get("meta", {}).get("model", "Unknown model")
# Format chunks
chunks_html = ""
chunks = latest_result.get("chunks", [])
for i, chunk in enumerate(chunks):
chunk_summary = chunk.get("summary", "No summary available")
tags = chunk.get("tags", [])
tags_html = ", ".join(tags) if tags else "None"
references_html = ""
if "references" in chunk and chunk["references"]:
references_html = "<h4>References:</h4><ul>"
for ref in chunk["references"]:
references_html += f"<li>{ref}</li>"
references_html += "</ul>"
chunks_html += f"""
<div class="chunk">
<h3>Chunk {i+1}</h3>
<div class="chunk-summary">{chunk_summary}</div>
<div class="chunk-tags"><strong>Tags:</strong> {tags_html}</div>
{references_html}
</div>
<hr>
"""
html_content = f"""
<html>
<head>
<title>Document Summary: {doc_id}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; max-width: 1000px; margin: 0 auto; padding: 20px; }}
h1, h2, h3 {{ color: #333; }}
.summary {{ background-color: #f9f9f9; padding: 15px; border-left: 4px solid #4CAF50; margin-bottom: 20px; }}
.chunk {{ background-color: #f5f5f5; padding: 15px; margin-bottom: 10px; border-radius: 4px; }}
.chunk-tags {{ margin-top: 10px; font-style: italic; }}
.metadata {{ color: #666; font-size: 0.9em; margin-bottom: 20px; }}
hr {{ border: 0; height: 1px; background: #ddd; margin: 20px 0; }}
.refresh-button {{ padding: 10px 15px; background-color: #4CAF50; color: white; border: none; cursor: pointer; border-radius: 4px; }}
.refresh-button:hover {{ background-color: #45a049; }}
</style>
</head>
<body>
<h1>Document Summary</h1>
<div class="metadata">
<strong>Document ID:</strong> {doc_id}<br>
<strong>Model:</strong> {model}<br>
<strong>Generated:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
</div>
<h2>Summary</h2>
<div class="summary">{summary}</div>
<h2>Chunks ({len(chunks)})</h2>
{chunks_html}
<button class="refresh-button" onclick="window.location.reload()">Refresh Results</button>
</body>
</html>
"""
return html_content
@app.post("/summarise_document")
async def summarize_document(request: Request, background_tasks: BackgroundTasks):
try:
data = await request.json()
logger.info(f"Received data: {data}")
# Extract arango_id, checking both top-level field and inside arango_doc
arango_doc = data.get('arango_doc', {}) or {}
arango_id = arango_doc.get('_id', '')
arango_db_name = data.get('arango_db_name', '').strip()
if not arango_db_name:
return JSONResponse(
status_code=400,
content={"detail": "Missing required field: arango_db_name"},
)
print(arango_doc)
# Prepare data for processing
data['text'] = arango_doc.get('text', '').strip()
data['chunks'] = arango_doc.get('chunks', [])
data['arango_db_name'] = arango_db_name
data['arango_id'] = arango_id
data["arango_key"] = arango_doc['_key']
data['is_sci'] = data.get('is_sci', False)
background_tasks.add_task(summarise_document_task, data)
return {"message": "Document summarization has started."}
except Exception as e:
logger.error(f"Error in summarize_document: {e}")
return JSONResponse(
status_code=500,
content={"detail": f"An unexpected error occurred: {str(e)}"},
)
def summarise_document_task(doc_data: dict):
try:
# Get document ID and validate it
_id = doc_data.get("arango_id", "")
# Validate document ID - it should be in format "collection/key"
if not _id or '/' not in _id:
logger.error(f"Invalid document ID format: {_id}")
return
text = doc_data.get("text", "")
is_sci = doc_data.get("is_sci", False)
# Get collection name from document ID
collection = _id.split('/')[0]
# Set appropriate system message based on document type
if collection == 'interviews':
system_message = "You are summarising interview transcripts. It is very important that you keep to what is written and do not add any of your own opinions or interpretations. Always answer in English."
elif is_sci or collection == 'sci_articles':
system_message = "You are summarising scientific articles. It is very important that you keep to what is written and do not add any of your own opinions or interpretations. Always answer in English."
else:
system_message = "You are summarising a document. It is very important that you keep to what is written and do not add any of your own opinions or interpretations. Always answer in English."
# Initialize LLM and generate summary
llm = LLM(system_message=system_message, chat=True)
#if 'abstract'
if 'summary' in doc_data and 'text_sum' in doc_data['summary']:
# If a summary already exists, use it instead of generating a new one
summary = doc_data['summary']['text_sum']
llm.messages.append({"role": "user", "content": 'Make a summary of this text:\n[text removed in chat history]'},
{"role": "assistant", "content": summary})
else:
prompt = get_summary_prompt(text, is_sci)
response = llm.generate(query=prompt)
summary = response.content
prompt = """Thanks! Now make a very short summary of the text, that is no longer than 50 words.
The summary should give an idea of what sort of text it is, and what the main points are.
Below are some examples of how to write the short summary:
Example 1: "This is a *scientific article* about the effects of climate change on polar bears. It discusses the impact of melting ice caps on their habitat and food sources."
Example 2: "This is an *interview* with a climate scientist discussing the challenges of communicating climate change to the public. It highlights the importance of clear messaging and public engagement."
Example 3: "This is a *news article* about the latest developments in renewable energy technology. It covers advancements in solar and wind power, and their potential impact on reducing carbon emissions."
Example 4: "This is a *blog post* about the benefits of meditation for mental health. It explores how mindfulness practices can reduce stress and improve overall well-being."
Example 5: "This is a *report* on the economic impact of the COVID-19 pandemic. It analyzes job losses, government responses, and the path to recovery."
Example 6: "This is a *research paper* on the effects of social media on youth mental health. It examines the correlation between social media use and anxiety, depression, and self-esteem issues."
Example 7: "This is an *opinion piece* on the importance of biodiversity conservation. It argues for stronger environmental policies to protect endangered species and ecosystems."
"""
short_summary_response = llm.generate(query=prompt)
short_summary = short_summary_response.content.strip()
# Create summary document
summary_doc = {
"text_sum": summary,
"meta": {
"model": llm.model,
"date": datetime.now().strftime("%Y-%m-%d"),
},
"short_summary": short_summary,
}
# Process chunks if they exist
chunks = doc_data.get("chunks", [])
if chunks:
doc_data["chunks"] = summarise_chunks(chunks, is_sci=is_sci)
# Get database name and validate it
db_name = doc_data.get("arango_db_name")
if not db_name:
logger.error("Missing database name")
return
# Update document in ArangoDB
arango = ArangoDB(db_name=db_name)
arango.db.update_document(
{"summary": summary_doc, "_id": _id, "chunks": doc_data["chunks"]},
silent=True,
check_rev=False,
)
# Update ChromaDB with the new summary
chroma = ChromaDB()
if db_name == "sci_articles":
chroma.add_document(
collection="sci_articles_article_summaries",
document_id= doc_data["_key"],
text=summary_doc["text_sum"],
metadata={
"model": summary_doc["meta"]["model"],
"date": datetime.now().strftime("%Y-%m-%d"),
"arango_id": _id,
"arango_db_name": db_name,
},
)
# Save the latest result
save_latest_result({"summary": summary_doc, "_id": _id, "chunks": doc_data["chunks"]})
logger.info(f"Successfully processed document {_id}")
except Exception as e:
# Log error with document ID if available
doc_id = doc_data.get("arango_id", "unknown")
logger.error(f'Error processing document ID: {doc_id}')
logger.error(f"Error in summarise_document_task: {e}")
def summarise_chunks(chunks: list, is_sci=False):
"""
Summarize chunks of text in a document using a language model.
For each chunk in the document that doesn't already have a summary, this function:
1. Generates a summary of the chunk text
2. Creates tags for the chunk
3. If is_sci=True, extracts scientific references from the chunk
Parameters
----------
chunks: list
A list of dictionaries representing chunks of text from a document.
Each chunk should have a "text" field containing the text to summarize.
is_sci : bool, default=False
If True, uses a scientific article summarization prompt and extracts references.
If False, uses a general article summarization prompt.
Returns
-------
list
A list of updated chunks containing summaries, tags, and metadata.
Raises
------
Exception
If there's an error processing a chunk.
Notes
-----
- Chunks that already have a "summary" field are skipped.
- The function uses an LLM instance with a system prompt tailored to the document type.
- The structured response is validated against the ArticleChunk model.
"""
if is_sci:
system_message = """You are a science assistant summarizing scientific articles.
You will get an article chunk by chunk, and you have three tasks for each chunk:
1. Summarize the content of the chunk.
2. Tag the chunk with relevant tags.
3. Extract the scientific references from the chunk.
"""
else:
system_message = """You are a general assistant summarizing articles.
You will get an article chunk by chunk, and you have two tasks for each chunk:
1. Summarize the content of the chunk.
2. Tag the chunk with relevant tags.
"""
system_message += """\nPlease make use of the previous chunks you have already seen to understand the current chunk in context and make the summary stand for itself. But remember, *it is the current chunk you are summarizing*
ONLY use the information in the chunks to make the summary, and do not add any information that is not in the chunks."""
llm = LLM(system_message=system_message)
new_chunks = []
for chunk in chunks:
if "summary" in chunk:
new_chunks.append(chunk)
continue
prompt = f"""Summarize the following text to make it stand on its own:\n
'''
{chunk['text']}
'''\n
Your tasks are:
1. Summarize the content of the chunk. Make sure to include all relevant details!
2. Tag the chunk with relevant tags.
"""
if is_sci:
prompt += "\n3. Extract the scientific references mentioned in this specific chunk. If there is a DOI reference, include that in the reference. Sometimes the reference is only a number in brackets, like [1], so make sure to include that as well (in brackets)."
prompt += "\nONLY use the information in the chunks to make the summary, and do not add any information that is not in the chunks."
try:
response = llm.generate(prompt, format=ArticleChunk.model_json_schema())
structured_response = ArticleChunk.model_validate_json(response.content)
chunk["summary"] = structured_response.summary
chunk["tags"] = [i.lower() for i in structured_response.tags]
# Add references for scientific articles if they exist in the response
if is_sci and hasattr(structured_response, 'references') and structured_response.references:
chunk["references"] = structured_response.references
chunk["summary_meta"] = {
"model": llm.model,
"date": datetime.now().strftime("%Y-%m-%d"),
}
except Exception as e:
logger.error(f"Error processing chunk: {e}")
# Continue processing other chunks even if one fails
chunk["summary"] = "Error processing chunk"
chunk["tags"] = []
new_chunks.append(chunk)
return new_chunks
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8100)