# NOTE: The six lines that previously appeared here ("You can not select more
# than 25 topics", topic-naming rules, and the "439 lines / 18 KiB" file stats,
# each shown twice) were residue from the Git hosting web page this file was
# copied from — they are not part of this Python module.
from fastapi import FastAPI, BackgroundTasks, Request |
|
from fastapi.responses import JSONResponse, HTMLResponse |
|
import logging |
|
from datetime import datetime |
|
import json |
|
import os |
|
from typing import Dict, Any |
|
|
|
from prompts import get_summary_prompt |
|
from _llm import LLM |
|
from _arango import ArangoDB |
|
from models import ArticleChunk |
|
from _chromadb import ChromaDB |
|
|
|
|
|
# FastAPI application exposing the summarization endpoints below.
app = FastAPI()

# Module-wide logger; basicConfig is a no-op if the root logger is already configured.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Storage for the latest processed document (in-memory cache, mirrored to disk
# by save_latest_result so it survives a process restart).
latest_result: Dict[str, Any] = {}
# Anchored next to this module so the path is independent of the working directory.
latest_result_file = os.path.join(os.path.dirname(__file__), "latest_summary_result.json")

# Load any previously saved result on startup.
# Best-effort: a missing or unreadable file only logs a warning and leaves
# latest_result empty — it must never prevent the app from starting.
try:
    if os.path.exists(latest_result_file):
        with open(latest_result_file, 'r') as f:
            latest_result = json.load(f)
        logger.info(f"Loaded previous result from {latest_result_file}")
except Exception as e:
    logger.warning(f"Could not load previous result: {e}")
|
|
|
# Function to save the latest result to disk |
|
def save_latest_result(result: Dict[str, Any]):
    """Record *result* as the latest processed document and persist it to disk.

    The module-level ``latest_result`` cache is always updated. A sanitized
    copy — internal underscore-prefixed keys removed, except ``_id`` — is
    then written to ``latest_result_file``. Disk failures are logged and
    swallowed so persistence problems never break request handling.

    Parameters
    ----------
    result : Dict[str, Any]
        The processed document (summary, chunks, ``_id``) to cache.
    """
    global latest_result
    latest_result = result

    # Save sanitized version: keep '_id', drop any other internal field.
    sanitized = {
        key: value
        for key, value in result.items()
        if key == '_id' or not key.startswith('_')
    }
    try:
        with open(latest_result_file, 'w') as out_file:
            json.dump(sanitized, out_file, indent=2)
        logger.info(f"Saved latest result to {latest_result_file}")
    except Exception as e:
        logger.error(f"Error saving latest result: {e}")
|
|
|
# New endpoint to get the latest summarized document |
|
@app.get("/latest_result")
async def get_latest_result():
    """
    Get the latest summarized document result.

    Returns the most recently processed document summary and chunk
    information. If no document has been processed yet, a placeholder
    message object is returned instead.

    Returns
    -------
    dict
        The latest processed document with summaries
    """
    # Guard clause: nothing processed yet (empty dict is falsy).
    if latest_result:
        return latest_result
    return {"message": "No documents have been processed yet"}
|
|
|
@app.get("/view_results")
async def view_results():
    """
    View the latest summarization results in a more readable format.

    Returns a formatted response with the document summary and, when
    present, per-chunk summaries, tags and (for scientific articles)
    references.

    Returns
    -------
    dict
        A formatted representation of the latest summarized document
    """
    if not latest_result:
        return {"message": "No documents have been processed yet"}

    # Pull the summary sub-document once instead of re-fetching per field.
    summary_block = latest_result.get("summary", {})
    formatted_result = {
        "document_id": latest_result.get("_id", "Unknown"),
        "timestamp": datetime.now().isoformat(),
        "summary": summary_block.get("text_sum", "No summary available"),
        "model": summary_block.get("meta", {}).get("model", "Unknown model"),
    }

    # Format chunks information if available.
    source_chunks = latest_result.get("chunks", [])
    if source_chunks:
        formatted_chunks = []
        for number, chunk in enumerate(source_chunks, start=1):
            entry = {
                "chunk_number": number,
                "summary": chunk.get("summary", "No summary available"),
                "tags": chunk.get("tags", []),
            }
            # Scientific-article chunks may carry extracted references.
            if "references" in chunk:
                entry["references"] = chunk.get("references", [])
            formatted_chunks.append(entry)

        formatted_result["chunks"] = formatted_chunks
        formatted_result["chunk_count"] = len(source_chunks)

    return formatted_result
|
|
|
@app.get("/html_results", response_class=HTMLResponse)
async def html_results():
    """
    View the latest summarization results in a human-readable HTML format.

    All document-derived values (document id, model name, summaries, tags,
    references) are HTML-escaped before interpolation so that document text
    or LLM output cannot inject markup or scripts into the page (XSS).
    """
    # Local import: the escaping helper is only needed by this endpoint.
    import html as html_mod

    if not latest_result:
        return """
        <html>
        <head>
            <title>No Results Available</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; }
            </style>
        </head>
        <body>
            <h1>No Documents Have Been Processed Yet</h1>
            <p>Submit a document for summarization first.</p>
        </body>
        </html>
        """

    # Get the document ID and summary — escape everything document-derived.
    doc_id = html_mod.escape(str(latest_result.get("_id", "Unknown")))
    summary = html_mod.escape(str(latest_result.get("summary", {}).get("text_sum", "No summary available")))
    model = html_mod.escape(str(latest_result.get("summary", {}).get("meta", {}).get("model", "Unknown model")))

    # Format chunks
    chunks_html = ""
    chunks = latest_result.get("chunks", [])
    for i, chunk in enumerate(chunks):
        chunk_summary = html_mod.escape(str(chunk.get("summary", "No summary available")))
        tags = chunk.get("tags", [])
        tags_html = html_mod.escape(", ".join(tags)) if tags else "None"

        references_html = ""
        if "references" in chunk and chunk["references"]:
            references_html = "<h4>References:</h4><ul>"
            for ref in chunk["references"]:
                references_html += f"<li>{html_mod.escape(str(ref))}</li>"
            references_html += "</ul>"

        chunks_html += f"""
        <div class="chunk">
            <h3>Chunk {i+1}</h3>
            <div class="chunk-summary">{chunk_summary}</div>
            <div class="chunk-tags"><strong>Tags:</strong> {tags_html}</div>
            {references_html}
        </div>
        <hr>
        """

    html_content = f"""
    <html>
    <head>
        <title>Document Summary: {doc_id}</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; max-width: 1000px; margin: 0 auto; padding: 20px; }}
            h1, h2, h3 {{ color: #333; }}
            .summary {{ background-color: #f9f9f9; padding: 15px; border-left: 4px solid #4CAF50; margin-bottom: 20px; }}
            .chunk {{ background-color: #f5f5f5; padding: 15px; margin-bottom: 10px; border-radius: 4px; }}
            .chunk-tags {{ margin-top: 10px; font-style: italic; }}
            .metadata {{ color: #666; font-size: 0.9em; margin-bottom: 20px; }}
            hr {{ border: 0; height: 1px; background: #ddd; margin: 20px 0; }}
            .refresh-button {{ padding: 10px 15px; background-color: #4CAF50; color: white; border: none; cursor: pointer; border-radius: 4px; }}
            .refresh-button:hover {{ background-color: #45a049; }}
        </style>
    </head>
    <body>
        <h1>Document Summary</h1>
        <div class="metadata">
            <strong>Document ID:</strong> {doc_id}<br>
            <strong>Model:</strong> {model}<br>
            <strong>Generated:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        </div>

        <h2>Summary</h2>
        <div class="summary">{summary}</div>

        <h2>Chunks ({len(chunks)})</h2>
        {chunks_html}

        <button class="refresh-button" onclick="window.location.reload()">Refresh Results</button>
    </body>
    </html>
    """

    return html_content
|
|
|
@app.post("/summarise_document")
async def summarize_document(request: Request, background_tasks: BackgroundTasks):
    """
    Accept a document payload and queue it for background summarization.

    Expects a JSON body with 'arango_db_name', optionally a top-level
    'arango_id', and an 'arango_doc' carrying '_id', '_key', 'text' and
    'chunks'. Validation failures return 400; unexpected errors return 500.

    Returns
    -------
    dict | JSONResponse
        An acknowledgement message on success, or an error response.
    """
    try:
        data = await request.json()
        logger.info(f"Received data: {data}")

        # Extract arango_id, checking both top-level field and inside arango_doc
        # (the original code only looked inside arango_doc, contradicting this comment).
        arango_doc = data.get('arango_doc', {}) or {}
        arango_id = data.get('arango_id') or arango_doc.get('_id', '')

        # `or ''` guards against an explicit null value, which would make .strip() crash.
        arango_db_name = (data.get('arango_db_name') or '').strip()
        if not arango_db_name:
            return JSONResponse(
                status_code=400,
                content={"detail": "Missing required field: arango_db_name"},
            )

        # The background task requires an id of the form "collection/key";
        # reject early instead of queueing a task that can only fail.
        if not arango_id or '/' not in arango_id:
            return JSONResponse(
                status_code=400,
                content={"detail": "Missing or invalid document id (expected 'collection/key')"},
            )

        # Replaced a stray debug print() with a proper log statement.
        logger.debug(f"arango_doc: {arango_doc}")

        # Prepare data for processing
        data['text'] = arango_doc.get('text', '').strip()
        data['chunks'] = arango_doc.get('chunks', [])
        data['arango_db_name'] = arango_db_name
        data['arango_id'] = arango_id
        # Fall back to the key portion of the validated id when '_key' is absent
        # (the original raised KeyError -> 500 here).
        data["arango_key"] = arango_doc.get('_key') or arango_id.split('/', 1)[1]
        data['is_sci'] = data.get('is_sci', False)

        background_tasks.add_task(summarise_document_task, data)
        return {"message": "Document summarization has started."}
    except Exception as e:
        logger.error(f"Error in summarize_document: {e}")
        return JSONResponse(
            status_code=500,
            content={"detail": f"An unexpected error occurred: {str(e)}"},
        )
|
|
|
def summarise_document_task(doc_data: dict):
    """
    Background task: summarize a document and persist the results.

    Generates (or reuses) a full summary, asks the LLM for a short
    (<= 50 word) summary, summarizes any chunks, updates the document in
    ArangoDB, mirrors scientific-article summaries into ChromaDB, and
    caches the result via save_latest_result. All errors are logged and
    swallowed — as a BackgroundTasks target there is no caller to
    propagate to.

    Parameters
    ----------
    doc_data : dict
        Payload prepared by /summarise_document. Must contain 'arango_id'
        (format "collection/key") and 'arango_db_name'; may contain
        'text', 'chunks', 'is_sci', 'arango_key' and a prior 'summary'.
    """
    try:
        # Get document ID and validate it — it should be "collection/key".
        _id = doc_data.get("arango_id", "")
        if not _id or '/' not in _id:
            logger.error(f"Invalid document ID format: {_id}")
            return

        text = doc_data.get("text", "")
        is_sci = doc_data.get("is_sci", False)

        # Collection name is the part of the id before the slash.
        collection = _id.split('/')[0]

        # Set appropriate system message based on document type.
        if collection == 'interviews':
            system_message = "You are summarising interview transcripts. It is very important that you keep to what is written and do not add any of your own opinions or interpretations. Always answer in English."
        elif is_sci or collection == 'sci_articles':
            system_message = "You are summarising scientific articles. It is very important that you keep to what is written and do not add any of your own opinions or interpretations. Always answer in English."
        else:
            system_message = "You are summarising a document. It is very important that you keep to what is written and do not add any of your own opinions or interpretations. Always answer in English."

        # Initialize LLM in chat mode so the short-summary follow-up sees context.
        llm = LLM(system_message=system_message, chat=True)

        if 'summary' in doc_data and 'text_sum' in doc_data['summary']:
            # A summary already exists: reuse it and seed the chat history so
            # the follow-up prompt has context.
            # BUG FIX: the original called llm.messages.append(a, b) with two
            # arguments — list.append takes exactly one, so this path always
            # raised TypeError. Use extend with both messages instead.
            summary = doc_data['summary']['text_sum']
            llm.messages.extend([
                {"role": "user", "content": 'Make a summary of this text:\n[text removed in chat history]'},
                {"role": "assistant", "content": summary},
            ])
        else:
            prompt = get_summary_prompt(text, is_sci)
            response = llm.generate(query=prompt)
            summary = response.content

        prompt = """Thanks! Now make a very short summary of the text, that is no longer than 50 words.
The summary should give an idea of what sort of text it is, and what the main points are.
Below are some examples of how to write the short summary:
Example 1: "This is a *scientific article* about the effects of climate change on polar bears. It discusses the impact of melting ice caps on their habitat and food sources."
Example 2: "This is an *interview* with a climate scientist discussing the challenges of communicating climate change to the public. It highlights the importance of clear messaging and public engagement."
Example 3: "This is a *news article* about the latest developments in renewable energy technology. It covers advancements in solar and wind power, and their potential impact on reducing carbon emissions."
Example 4: "This is a *blog post* about the benefits of meditation for mental health. It explores how mindfulness practices can reduce stress and improve overall well-being."
Example 5: "This is a *report* on the economic impact of the COVID-19 pandemic. It analyzes job losses, government responses, and the path to recovery."
Example 6: "This is a *research paper* on the effects of social media on youth mental health. It examines the correlation between social media use and anxiety, depression, and self-esteem issues."
Example 7: "This is an *opinion piece* on the importance of biodiversity conservation. It argues for stronger environmental policies to protect endangered species and ecosystems."
"""

        short_summary_response = llm.generate(query=prompt)
        short_summary = short_summary_response.content.strip()

        # Create the summary sub-document stored on the Arango document.
        summary_doc = {
            "text_sum": summary,
            "meta": {
                "model": llm.model,
                "date": datetime.now().strftime("%Y-%m-%d"),
            },
            "short_summary": short_summary,
        }

        # Process chunks if they exist (summarise_chunks skips already-summarized ones).
        chunks = doc_data.get("chunks", [])
        if chunks:
            doc_data["chunks"] = summarise_chunks(chunks, is_sci=is_sci)

        # Get database name and validate it.
        db_name = doc_data.get("arango_db_name")
        if not db_name:
            logger.error("Missing database name")
            return

        # Update document in ArangoDB. Use .get for 'chunks' — the key may be
        # absent when the task is invoked with a minimal payload.
        arango = ArangoDB(db_name=db_name)
        arango.db.update_document(
            {"summary": summary_doc, "_id": _id, "chunks": doc_data.get("chunks", [])},
            silent=True,
            check_rev=False,
        )

        # Update ChromaDB with the new summary.
        chroma = ChromaDB()
        if db_name == "sci_articles":
            # BUG FIX: the endpoint stores the key under 'arango_key', but the
            # original read doc_data["_key"], raising KeyError for every
            # sci_articles document. Fall back to the key part of the
            # already-validated "collection/key" id.
            arango_key = doc_data.get("arango_key") or doc_data.get("_key") or _id.split('/', 1)[1]
            chroma.add_document(
                collection="sci_articles_article_summaries",
                document_id=arango_key,
                text=summary_doc["text_sum"],
                metadata={
                    "model": summary_doc["meta"]["model"],
                    "date": datetime.now().strftime("%Y-%m-%d"),
                    "arango_id": _id,
                    "arango_db_name": db_name,
                },
            )

        # Save the latest result for the viewing endpoints.
        save_latest_result({"summary": summary_doc, "_id": _id, "chunks": doc_data.get("chunks", [])})
        logger.info(f"Successfully processed document {_id}")

    except Exception as e:
        # Log error with document ID if available; logger.exception keeps the traceback.
        doc_id = doc_data.get("arango_id", "unknown")
        logger.error(f'Error processing document ID: {doc_id}')
        logger.exception(f"Error in summarise_document_task: {e}")
|
|
|
|
|
|
|
def summarise_chunks(chunks: list, is_sci: bool = False) -> list:
    """
    Summarize chunks of text in a document using a language model.

    For each chunk in the document that doesn't already have a summary, this function:
    1. Generates a summary of the chunk text
    2. Creates tags for the chunk
    3. If is_sci=True, extracts scientific references from the chunk

    Parameters
    ----------
    chunks: list
        A list of dictionaries representing chunks of text from a document.
        Each chunk should have a "text" field containing the text to summarize.
    is_sci : bool, default=False
        If True, uses a scientific article summarization prompt and extracts references.
        If False, uses a general article summarization prompt.

    Returns
    -------
    list
        A list of updated chunks containing summaries, tags, and metadata.

    Raises
    ------
    Exception
        If there's an error processing a chunk.

    Notes
    -----
    - Chunks that already have a "summary" field are skipped.
    - The function uses an LLM instance with a system prompt tailored to the document type.
    - The structured response is validated against the ArticleChunk model.
    - NOTE(review): the input chunk dicts are mutated in place; the returned
      list holds the same dict objects, not copies.
    """

    # System prompt: scientific documents get a third task (reference extraction).
    if is_sci:
        system_message = """You are a science assistant summarizing scientific articles.
You will get an article chunk by chunk, and you have three tasks for each chunk:
1. Summarize the content of the chunk.
2. Tag the chunk with relevant tags.
3. Extract the scientific references from the chunk.
"""
    else:
        system_message = """You are a general assistant summarizing articles.
You will get an article chunk by chunk, and you have two tasks for each chunk:
1. Summarize the content of the chunk.
2. Tag the chunk with relevant tags.
"""

    system_message += """\nPlease make use of the previous chunks you have already seen to understand the current chunk in context and make the summary stand for itself. But remember, *it is the current chunk you are summarizing*
ONLY use the information in the chunks to make the summary, and do not add any information that is not in the chunks."""

    # One LLM instance for the whole document so earlier chunks provide context.
    llm = LLM(system_message=system_message)
    new_chunks = []
    for chunk in chunks:
        # Skip chunks that were already summarized in a previous run.
        if "summary" in chunk:
            new_chunks.append(chunk)
            continue
        prompt = f"""Summarize the following text to make it stand on its own:\n
'''
{chunk['text']}
'''\n
Your tasks are:
1. Summarize the content of the chunk. Make sure to include all relevant details!
2. Tag the chunk with relevant tags.
"""
        if is_sci:
            prompt += "\n3. Extract the scientific references mentioned in this specific chunk. If there is a DOI reference, include that in the reference. Sometimes the reference is only a number in brackets, like [1], so make sure to include that as well (in brackets)."
        prompt += "\nONLY use the information in the chunks to make the summary, and do not add any information that is not in the chunks."

        try:
            # Constrain the LLM output to the ArticleChunk JSON schema, then validate it.
            response = llm.generate(prompt, format=ArticleChunk.model_json_schema())
            structured_response = ArticleChunk.model_validate_json(response.content)
            chunk["summary"] = structured_response.summary
            # Tags are normalized to lowercase for consistent filtering/search.
            chunk["tags"] = [i.lower() for i in structured_response.tags]

            # Add references for scientific articles if they exist in the response.
            if is_sci and hasattr(structured_response, 'references') and structured_response.references:
                chunk["references"] = structured_response.references

            # Record which model produced the summary and when.
            chunk["summary_meta"] = {
                "model": llm.model,
                "date": datetime.now().strftime("%Y-%m-%d"),
            }
        except Exception as e:
            logger.error(f"Error processing chunk: {e}")
            # Continue processing other chunks even if one fails.
            chunk["summary"] = "Error processing chunk"
            chunk["tags"] = []
        new_chunks.append(chunk)

    return new_chunks
|
|
|
|
|
if __name__ == "__main__":
    # Standalone dev entry point: serve the app on all interfaces, port 8100.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8100)