You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

494 lines
20 KiB

import streamlit as st
from datetime import datetime
from colorprinter.print_color import *
from _base_class import StreamlitBaseClass
from projects_page import Project
from agent_research import (
ResearchReport,
MasterAgent,
StructureAgent,
ToolAgent,
ArchiveAgent,
)
import os
import json
class ResearchPage(StreamlitBaseClass):
    """
    ResearchPage - A Streamlit interface for deep research using AI agents.

    This class provides a user interface for conducting in-depth research using
    multiple specialized AI agents working together. It allows users to input
    research questions, track progress, and view detailed research reports.

    The page keeps its state in ``st.session_state[self.page_name]`` so that it
    can be restored when the page object is constructed again.

    Attributes:
        REPORTS_DIR (str): Directory where report files (JSON + Markdown) are stored.
        username (str): The username of the current user.
        project_name (str): Name of the selected project.
        project (Project): Project instance the research is associated with.
        page_name (str): Name of the current page ("Research").
        research_state (dict): Dictionary tracking the current state of research.
        report (ResearchReport): Instance for tracking research progress and results.

    Methods:
        run(): Main method to render the research interface and handle interactions.
        sidebar_actions(): Renders sidebar elements for selecting projects and research options.
        start_new_research(): Initiates a new research session.
        view_saved_reports(): Displays a list of saved research reports.
        display_report(): Renders a research report in the Streamlit interface.
        show_research_progress(): Displays the current research progress.
    """

    # Single source of truth for where reports are written and listed from.
    REPORTS_DIR = "/home/lasse/sci/reports"

    def __init__(self, username):
        """Set up default state, restore saved state and ensure the reports dir exists.

        Args:
            username: Name of the current user; forwarded to the base class.
        """
        super().__init__(username=username)
        self.project_name = None
        self.project = None
        self.page_name = "Research"

        # Research state tracking
        self.research_state = {
            "in_progress": False,
            "completed": False,
            "question": None,
            "started_at": None,
            "report": None,
            "current_step": None,
            "steps_completed": 0,
            "total_steps": 0,
        }
        self.report = None

        # Initialize attributes from session state if available
        if self.page_name in st.session_state:
            for k, v in st.session_state[self.page_name].items():
                setattr(self, k, v)

        # Create reports directory if it doesn't exist
        os.makedirs(self.REPORTS_DIR, exist_ok=True)

    def _persist_research_state(self):
        """Store the page's current state in ``st.session_state`` under the page name."""
        st.session_state[self.page_name] = {
            "project_name": self.project_name,
            "project": self.project,
            "research_state": self.research_state,
            "report": self.report,
        }

    def run(self):
        """Render the page: sidebar, then progress, a finished report, or the input form."""
        self.update_current_page("Research")
        self.sidebar_actions()
        st.title("Deep Research")

        if not self.project:
            st.warning("Please select a project to start researching.")
            return

        # Main interface
        if self.research_state["in_progress"]:
            self.show_research_progress()
        elif self.research_state["completed"]:
            self.display_report(self.research_state["report"])
        else:
            # Input for new research
            st.subheader(f"New Research for Project: {self.project_name}")
            with st.form("research_form"):
                question = st.text_area(
                    "Enter your research question:",
                    help="Be specific about what you want to research. Complex questions will be broken down into sub-questions.",
                )
                start_button = st.form_submit_button("Start Research")

            # Ignore empty / whitespace-only questions.
            if start_button and question and question.strip():
                # Only rerun on success: a rerun right after a failure would wipe
                # the error message and traceback that were just rendered.
                if self.start_new_research(question):
                    st.rerun()

            # Option to view saved reports
            with st.expander("View Saved Reports"):
                self.view_saved_reports()

    def sidebar_actions(self):
        """Render the sidebar: project selection plus cancel / start-new controls."""
        with st.sidebar:
            with st.form("select_project"):
                self.project = self.choose_project("Project for research:")
                submitted = st.form_submit_button("Select Project")

            if self.project:
                # Keep the name in sync with the project object on every run,
                # not only on the run in which the form was submitted.
                self.project_name = self.project.name
                if submitted:
                    st.success(f"Selected project: {self.project_name}")

            if self.research_state["in_progress"]:
                st.info(f"Research in progress: {self.research_state['question']}")
                if st.button("Cancel Research"):
                    self.research_state["in_progress"] = False
                    self._persist_research_state()
                    st.rerun()
            elif self.research_state["completed"]:
                if st.button("Start New Research"):
                    self.research_state["completed"] = False
                    self.research_state["report"] = None
                    self._persist_research_state()
                    st.rerun()

    def start_new_research(self, question):
        """Initiates a new research session with the given question.

        Runs the whole workflow synchronously inside a spinner: plan, structure
        the plan, process every step, evaluate, write the final report and save
        it to ``REPORTS_DIR``. Errors are shown on the page instead of raised.

        Args:
            question: The research question entered by the user.

        Returns:
            bool: True when the research finished and the report was saved,
            False when an error occurred.
        """
        # Reset the per-run counters as well; otherwise a second run would
        # start with the step count of the previous one.
        self.research_state.update(
            {
                "question": question,
                "in_progress": True,
                "completed": False,
                "started_at": datetime.now().isoformat(),
                "report": None,
                "current_step": None,
                "steps_completed": 0,
                "total_steps": 0,
            }
        )

        # Initialize the research report
        self.report = ResearchReport(
            question=question, username=self.username, project_name=self.project_name
        )

        # Save current state
        self._persist_research_state()

        succeeded = False

        # In a production environment, you might want to use a background job.
        # For now, the research runs in the main thread with a streamlit spinner.
        with st.spinner("Research in progress... This may take several minutes."):
            try:
                # Initialize agents
                master_agent = MasterAgent(
                    username=self.username,
                    project=self.project,
                    report=self.report,
                    chat=True,
                )
                structure_agent = StructureAgent(
                    username=self.username, model="small", report=self.report
                )
                # NOTE(review): the tool and archive agents are constructed but
                # never called in this method. They are kept in case construction
                # has side effects elsewhere -- confirm and remove if not.
                tool_agent = ToolAgent(  # noqa: F841
                    username=self.username,
                    model="tools",
                    system_message="You are an assistant with tools. Always choose a tool to help with the task.",
                    report=self.report,
                    project=self.project,
                    chat=True,
                )
                archive_agent = ArchiveAgent(  # noqa: F841
                    username=self.username,
                    report=self.report,
                    project=self.project,
                    system_message="You are an assistant specialized in reading and summarizing research information.",
                    chat=True,
                )

                # Track the research state in the master agent
                master_agent.research_state["original_question"] = question

                # 1. Create research plan
                st.text("Creating research plan...")
                research_plan = master_agent.make_plan(question)
                self.report.log_plan(research_plan)

                # 2. Structure the plan
                st.text("Structuring research plan...")
                structured_plan = structure_agent.make_structured(
                    research_plan, question
                )
                self.report.log_plan(research_plan, structured_plan.model_dump())

                # Update total steps count
                self.research_state["total_steps"] = len(structured_plan.steps)

                # 3. Execute the plan step by step
                execution_results = {}
                for step_name, tasks in structured_plan.steps.items():
                    st.text(f"Processing step: {step_name}")
                    self.research_state["current_step"] = step_name

                    # Collect all task descriptions in this step
                    step_tasks = [
                        {"task_name": task_name, "task_description": task_description}
                        for task_name, task_description in tasks
                    ]

                    # Process the entire step
                    execution_results[step_name] = master_agent.process_step(
                        step_name, step_tasks
                    )

                    # Count the step only once it has actually been processed.
                    self.research_state["steps_completed"] += 1

                # 4. Evaluate if more steps are needed
                st.text("Evaluating research plan...")
                plan_evaluation = master_agent.evaluate_plan(execution_results)
                self.report.log_plan_evaluation(plan_evaluation)

                # 5. Write the final report
                st.text("Writing final report...")
                final_report = master_agent.write_report(execution_results)
                self.report.log_final_report(final_report)

                # 6. Save the reports
                saved_report = self._write_report_files()

                # Update research state
                self.research_state["in_progress"] = False
                self.research_state["completed"] = True
                self.research_state["report"] = saved_report
                succeeded = True
            except Exception as e:
                # Top-level boundary of the workflow: show the failure to the
                # user rather than crashing the page.
                st.error(f"An error occurred during research: {str(e)}")
                import traceback

                st.code(traceback.format_exc())
                self.research_state["in_progress"] = False

        # Update session state
        self._persist_research_state()
        return succeeded

    def _write_report_files(self):
        """Write the current report as JSON and Markdown and describe the result.

        Returns:
            dict: ``json_path``, ``markdown_path``, ``report_data`` and
            ``markdown_content`` -- the shape ``display_report`` reads.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        report_path = (
            f"{self.REPORTS_DIR}/research_report_{self.username}_{timestamp}"
        )

        # Save JSON report
        full_report = self.report.get_full_report()
        json_path = f"{report_path}.json"
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(full_report, f, indent=2)

        # Save markdown report
        markdown_report = self.report.get_markdown_report()
        markdown_path = f"{report_path}.md"
        with open(markdown_path, "w", encoding="utf-8") as f:
            f.write(markdown_report)

        return {
            "json_path": json_path,
            "markdown_path": markdown_path,
            "report_data": full_report,
            "markdown_content": markdown_report,
        }

    def view_saved_reports(self):
        """Displays a list of saved research reports.

        Lists every ``research_report*.json`` file in ``REPORTS_DIR`` (newest
        file name first) with a button that loads the report for display.
        A file that cannot be loaded is reported and skipped.
        """
        reports_dir = self.REPORTS_DIR
        if not os.path.exists(reports_dir):
            st.info("No saved reports found.")
            return

        # Get all report files
        json_files = [
            f
            for f in os.listdir(reports_dir)
            if f.endswith(".json") and f.startswith("research_report")
        ]
        if not json_files:
            st.info("No saved reports found.")
            return

        for file in sorted(json_files, reverse=True):
            file_path = os.path.join(reports_dir, file)
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    report_data = json.load(f)

                # Extract basic info
                metadata = report_data.get("metadata", {})
                question = metadata.get("question", "Unknown question")
                project = metadata.get("project_name", "No project")
                started_at = metadata.get("started_at", "Unknown time")

                # Format the date; fall back to the raw value when it is not
                # an ISO timestamp (e.g. the "Unknown time" placeholder).
                try:
                    date_str = datetime.fromisoformat(started_at).strftime(
                        "%Y-%m-%d %H:%M"
                    )
                except (TypeError, ValueError):
                    date_str = started_at

                # Summary lines for each report
                st.markdown(f"_{question} ({project} - {date_str})_")
                st.markdown(f"**Project:** {project}")
                st.markdown(f"**Date:** {date_str}")

                # Button to view full report
                if st.button("View Full Report", key=f"view_{file}"):
                    # Load corresponding markdown file if it exists. Only the
                    # extension is swapped, never a ".json" inside the name.
                    md_path = os.path.join(
                        reports_dir, os.path.splitext(file)[0] + ".md"
                    )
                    md_exists = os.path.exists(md_path)
                    markdown_content = None
                    if md_exists:
                        with open(md_path, "r", encoding="utf-8") as f:
                            markdown_content = f.read()

                    self.research_state["completed"] = True
                    self.research_state["report"] = {
                        "json_path": file_path,
                        "markdown_path": md_path if md_exists else None,
                        "report_data": report_data,
                        "markdown_content": markdown_content,
                    }
                    # Persist the selection so it is still there after the rerun.
                    self._persist_research_state()
                    st.rerun()
            except Exception as e:
                # Best effort: one unreadable report must not hide the others.
                st.error(f"Error loading report {file}: {str(e)}")

    def display_report(self, report_data):
        """Renders a research report in the Streamlit interface.

        Args:
            report_data: Dict with ``markdown_content``, ``report_data``,
                ``markdown_path`` and ``json_path`` (any may be missing/None).
                The Markdown text is preferred; the JSON data is the fallback.
        """
        if not report_data:
            st.warning("No report data available.")
            return

        st.title("Research Report")

        # Get report data
        markdown_content = report_data.get("markdown_content")
        json_data = report_data.get("report_data")

        if markdown_content:
            # Display the markdown report
            st.markdown(markdown_content)
        elif json_data:
            # Fallback to displaying JSON data in a more readable format
            metadata = json_data.get("metadata", {})
            question = metadata.get("question", "Unknown question")
            st.header(f"Research on: {question}")

            # Display metadata
            st.subheader("Metadata")
            st.markdown(f"**Project:** {metadata.get('project_name', 'None')}")
            st.markdown(f"**Started:** {metadata.get('started_at', 'Unknown')}")
            st.markdown(f"**Finished:** {metadata.get('finished_at', 'Unknown')}")

            # Display final report
            st.subheader("Research Findings")
            st.markdown(json_data.get("final_report", "No final report available."))

            # Display steps
            st.subheader("Research Steps")
            for step_name, step_data in json_data.get("steps", {}).items():
                with st.expander(step_name):
                    st.markdown(
                        f"**Summary:** {step_data.get('summary', 'No summary available.')}"
                    )
                    # Display tools used
                    st.markdown("**Tools used:**")
                    for tool in step_data.get("tools_used", []):
                        st.markdown(
                            f"- {tool.get('tool', 'Unknown tool')} with query: _{tool.get('args', {}).get('query', 'No query')}_"
                        )
        else:
            st.error("No report content available to display.")

        # Download buttons (only offered for files that still exist on disk)
        col1, col2 = st.columns(2)
        with col1:
            markdown_path = report_data.get("markdown_path")
            if markdown_path and os.path.exists(markdown_path):
                with open(markdown_path, "r", encoding="utf-8") as f:
                    markdown_file_content = f.read()
                st.download_button(
                    label="Download as Markdown",
                    data=markdown_file_content,
                    file_name=os.path.basename(markdown_path),
                    mime="text/markdown",
                )
        with col2:
            json_path = report_data.get("json_path")
            if json_path and os.path.exists(json_path):
                with open(json_path, "r", encoding="utf-8") as f:
                    json_content = f.read()
                st.download_button(
                    label="Download as JSON",
                    data=json_content,
                    file_name=os.path.basename(json_path),
                    mime="application/json",
                )

    def show_research_progress(self):
        """Displays the current research progress.

        Shows the question, a progress bar, the current step and -- when a
        report object is present -- the plan, finished steps and what has been
        gathered so far in the current step.
        """
        st.subheader("Research in Progress")
        st.markdown(f"**Question:** {self.research_state['question']}")

        # Show progress bar. The ratio is clamped because st.progress rejects
        # float values outside 0.0-1.0.
        progress = 0
        total_steps = self.research_state["total_steps"]
        if total_steps > 0:
            progress = min(self.research_state["steps_completed"] / total_steps, 1.0)
        st.progress(progress)

        # Show current step. The key always exists (initially None), so a
        # dict.get default would never apply; use an explicit fallback.
        current_step = self.research_state.get("current_step")
        st.markdown(f"**Current step:** {current_step or 'Planning'}")

        # Display research plan and progress in expandable sections
        if self.report:
            plan = self.report.report["plan"]
            with st.expander("Research Plan", expanded=True):
                if plan["original_text"]:
                    st.markdown("### Original Research Plan")
                    st.markdown(plan["original_text"])
                if plan["structured"]:
                    st.markdown("### Structured Plan")
                    for step_name, tasks in plan["structured"].get("steps", {}).items():
                        st.markdown(f"**{step_name}**")
                        for task_name, task_description in tasks:
                            st.markdown(f"- {task_name}: {task_description}")

            # Show completed steps
            steps = self.report.report["steps"]
            if steps:
                with st.expander("Completed Steps", expanded=True):
                    for step_name, step_data in steps.items():
                        # Check if step is finished
                        if not step_data.get("finished_at"):
                            continue
                        st.markdown(f"### {step_name}")
                        if step_data.get("summary"):
                            st.markdown(f"**Summary:** {step_data['summary']}")
                        # Show tools used
                        if step_data.get("tools_used"):
                            st.markdown("**Tools used:**")
                            for tool in step_data["tools_used"]:
                                st.markdown(
                                    f"- {tool.get('tool')} with query: _{tool.get('args', {}).get('query', 'No query')}_"
                                )

            # Show information gathering in the current step
            current_step_data = steps.get(current_step, {})
            if current_step_data and not current_step_data.get("finished_at"):
                with st.expander("Current Step Progress", expanded=True):
                    st.markdown(f"### {current_step}")
                    # Show tools used in current step
                    if current_step_data.get("tools_used"):
                        st.markdown("**Tools used so far:**")
                        for tool in current_step_data["tools_used"]:
                            st.markdown(
                                f"- {tool.get('tool')} with query: _{tool.get('args', {}).get('query', 'No query')}_"
                            )
                    # Show information gathered so far (each source listed once)
                    if current_step_data.get("information_gathered"):
                        st.markdown("**Information gathered:**")
                        sources_seen = set()
                        for info in current_step_data["information_gathered"]:
                            for source in info.get("sources", []):
                                if source not in sources_seen:
                                    st.markdown(f"- {source}")
                                    sources_seen.add(source)

        st.info(
            "Research is ongoing. This may take several minutes depending on the complexity of the question."
        )
        st.warning(
            "Please do not navigate away from this page while research is in progress."
        )