from _llm import LLM
from streamlit_chatbot import Bot
from typing import Dict, List, Tuple, Optional, Any
from colorprinter.print_color import *
from projects_page import Project
from _base_class import BaseClass
from prompts import get_tools_prompt
import sys
import time
import traceback
import json
from datetime import datetime
import llm_queries
from models import EvaluateFormat, Plan, ChunkSearchResults, UnifiedSearchResults, UnifiedDataChunk, UnifiedToolResponse
class ResearchReport:
"""Class for tracking and logging decisions and data access during research"""
def __init__(self, question, username, project_name=None):
self.report = {
"metadata": {
"question": question,
"username": username,
"project_name": project_name,
"started_at": datetime.now().isoformat(),
"finished_at": None,
},
"plan": {"original_text": None, "structured": None, "subquestions": []},
"steps": {},
"evaluation": None,
"final_report": None,
"statistics": {
"tools_used": {},
"sources_accessed": [],
"total_time": None,
},
}
# Track current context for easier logging
self.current_step = None
self.current_task = None
def log_plan(self, original_plan, structured_plan=None):
"""Log the research plan"""
self.report["plan"]["original_text"] = original_plan
if structured_plan:
self.report["plan"]["structured"] = structured_plan
def start_step(self, step_name):
"""Mark the beginning of a new step"""
self.current_step = step_name
if step_name not in self.report["steps"]:
self.report["steps"][step_name] = {
"started_at": datetime.now().isoformat(),
"finished_at": None,
"tasks": {},
"tools_used": [],
"information_gathered": [],
"summary": None,
"evaluation": None,
}
return self.current_step
def start_task(self, task_name, task_description):
"""Mark the beginning of a new task within the current step"""
if not self.current_step:
raise ValueError("Cannot start task without active step")
self.current_task = task_name
self.report["steps"][self.current_step]["tasks"][task_name] = {
"description": task_description,
"started_at": datetime.now().isoformat(),
"finished_at": None,
"tools_used": [],
"information_gathered": [],
}
return self.current_task
def log_tool_use(self, tool_name, tool_args):
"""Log when a tool is used"""
if not self.current_step:
raise ValueError("Cannot log tool use without active step")
# Add to step level
self.report["steps"][self.current_step]["tools_used"].append(
{
"tool": tool_name,
"args": tool_args,
"timestamp": datetime.now().isoformat(),
}
)
# Add to task level if we have an active task
if self.current_task:
self.report["steps"][self.current_step]["tasks"][self.current_task][
"tools_used"
].append(
{
"tool": tool_name,
"args": tool_args,
"timestamp": datetime.now().isoformat(),
}
)
# Update global statistics
if tool_name in self.report["statistics"]["tools_used"]:
self.report["statistics"]["tools_used"][tool_name] += 1
else:
self.report["statistics"]["tools_used"][tool_name] = 1
def log_information(self, information):
"""Log information gathered from tools"""
if not self.current_step:
raise ValueError("Cannot log information without active step")
# Process information to extract sources
sources = self._extract_sources(information)
# Add unique sources to global statistics
for source in sources:
if source not in self.report["statistics"]["sources_accessed"]:
self.report["statistics"]["sources_accessed"].append(source)
# Add to step level
self.report["steps"][self.current_step]["information_gathered"].append(
{
"data": information,
"sources": sources,
"timestamp": datetime.now().isoformat(),
}
)
# Add to task level if we have an active task
if self.current_task:
self.report["steps"][self.current_step]["tasks"][self.current_task][
"information_gathered"
].append(
{
"data": information,
"sources": sources,
"timestamp": datetime.now().isoformat(),
}
)
def _extract_sources(self, information):
"""Extract source information from gathered data"""
sources = []
# Handle different result formats
for item in information:
try:
if "result" in item and "content" in item["result"]:
if isinstance(item["result"]["content"], dict):
# Handle structured content like chunks
for title, group in item["result"]["content"].items():
if "chunks" in group:
for chunk in group["chunks"]:
metadata = chunk.get("metadata", {})
source = f"{metadata.get('title', 'Unknown')}"
if metadata.get("journal"):
source += f" ({metadata.get('journal')})"
if source not in sources:
sources.append(source)
except Exception as e:
print_yellow(f"Error extracting sources: {e}")
sources.append("No source")
return sources
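    # _extract_sources expects items shaped roughly like the following
    # (illustrative sketch; the exact structure depends on the tool response):
    #   {"result": {"content": {"Some group title": {"chunks": [
    #       {"metadata": {"title": "Paper A", "journal": "Nature Energy"}}]}}}}
    # which yields the source string "Paper A (Nature Energy)".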
def update_step_summary(self, summary):
"""Log summary of gathered information"""
if not self.current_step:
raise ValueError("Cannot log summary without active step")
self.report["steps"][self.current_step]["summary"] = remove_thinking(summary)
def log_evaluation(self, evaluation):
"""Log evaluation of gathered information"""
if not self.current_step:
raise ValueError("Cannot log evaluation without active step")
self.report["steps"][self.current_step]["evaluation"] = evaluation
def finish_task(self):
"""Mark the end of the current task"""
if not self.current_step or not self.current_task:
raise ValueError("No active task to finish")
self.report["steps"][self.current_step]["tasks"][self.current_task][
"finished_at"
] = datetime.now().isoformat()
self.current_task = None
def finish_step(self):
"""Mark the end of the current step"""
if not self.current_step:
raise ValueError("No active step to finish")
self.report["steps"][self.current_step][
"finished_at"
] = datetime.now().isoformat()
self.current_step = None
def log_plan_evaluation(self, evaluation):
"""Log the overall plan evaluation"""
self.report["evaluation"] = evaluation
def log_final_report(self, report):
"""Log the final generated report"""
self.report["final_report"] = report
self.report["metadata"]["finished_at"] = datetime.now().isoformat()
# Calculate total time
start = datetime.fromisoformat(self.report["metadata"]["started_at"])
end = datetime.fromisoformat(self.report["metadata"]["finished_at"])
self.report["statistics"]["total_time"] = (end - start).total_seconds()
def get_full_report(self):
"""Get the complete report data"""
return self.report
def get_markdown_report(self):
"""Get the report formatted as markdown for easy viewing"""
md = f"# Research Report: {self.report['metadata']['question']}\n\n"
# Metadata
md += "## Metadata \n"
md += f"- **Project**: {self.report['metadata']['project_name'] or 'None'} \n"
md += f"- **User**: {self.report['metadata']['username']} \n"
md += f"- **Started**: {self.report['metadata']['started_at']} \n"
if self.report["metadata"]["finished_at"]:
md += f"- **Finished**: {self.report['metadata']['finished_at']} \n"
md += f"- **Total time**: {self.report['statistics']['total_time']:.2f} seconds \n"
# Statistics
md += "\n## Statistics \n"
md += "### Tools Used \n"
for tool, count in self.report["statistics"]["tools_used"].items():
md += f"- {tool}: {count} times \n"
md += "\n### Sources Accessed \n"
for source in self.report["statistics"]["sources_accessed"]:
md += f"- {source} \n"
# Research plan
md += "\n## Research Plan \n"
if self.report["plan"]["original_text"]:
md += f"\n{self.report['plan']['original_text']} \n\n"
# Steps
md += "\n## Research Steps \n"
for step_name, step_data in self.report["steps"].items():
md += f"### {step_name}\n"
if step_data.get("summary"):
md += f"**Summary**: {step_data['summary']} \n\n"
md += "**Tools used**: \n"
for tool in step_data["tools_used"]:
md += f"- {tool['tool']} with query: _{tool['args'].get('query', 'No query').replace('_', ' ')}_\n"
md += "\n**Tasks**:\n"
for task_name, task_data in step_data.get("tasks", {}).items():
md += f"- {task_name}: {task_data['description']}\n"
# Final report
if self.report["final_report"]:
md += "\n## Final Report \n"
md += self.report["final_report"]
return md
def save_to_file(self, filepath=None):
"""Save the report to a file"""
if not filepath:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filepath = f"research_report_{timestamp}.json"
# Create a deep copy of the report that's JSON serializable
def make_json_serializable(obj):
"""Convert any non-JSON serializable objects to dictionaries"""
if hasattr(obj, "model_dump"): # Check if it's a Pydantic model
return obj.model_dump() # Convert Pydantic models to dict
elif isinstance(obj, dict):
return {k: make_json_serializable(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [make_json_serializable(item) for item in obj]
else:
return obj
# Create a JSON-serializable version of the report
json_report = make_json_serializable(self.report)
with open(filepath, "w") as f:
json.dump(json_report, f, indent=2)
print_green(f"Report saved to {filepath}")
return filepath
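# Illustrative sketch of the ResearchReport lifecycle (not called anywhere in
# this module; the question, step, and tool values below are hypothetical, and
# the exact report contents depend on the run):
def _research_report_demo():
    report = ResearchReport("How safe are solid-state batteries?", username="demo")
    report.log_plan("1. Search articles\n2. Summarize findings")
    report.start_step("Search articles")
    report.log_tool_use("fetch_science_articles_tool", {"query": "solid-state battery safety"})
    report.update_step_summary("Found three relevant reviews.")
    report.finish_step()
    report.log_final_report("Solid-state batteries appear safer because ...")
    return report.get_markdown_report()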
class ResearchBase(Bot):
"""Base class for all research agents with improved integration with Bot functionality"""
def __init__(
self,
username: str,
model: str = "standard",
chat: bool = True,
report=None,
**kwargs,
):
super().__init__(username=username, **kwargs)
self.model: str = model
self.llm = LLM(
system_message="You are a research assistant.",
model=model,
chat=chat,
messages=[],
)
# Tracking for research flow
self.research_state = {
"current_step": None,
"current_task": None,
"start_time": time.time(),
"steps_completed": 0,
"tasks_completed": 0,
}
self.report: ResearchReport = report
# Define available tool functions
self.available_functions = {
"fetch_science_articles_tool": self.fetch_science_articles_tool,
"fetch_notes_tool": self.fetch_notes_tool,
"fetch_other_documents_tool": self.fetch_other_documents_tool,
"fetch_science_articles_and_other_documents_tool": self.fetch_science_articles_and_other_documents_tool,
"analyze_tool": self.analyze_tool,
}
        self.tools = list(self.available_functions.values())
def update_research_state(self, **kwargs):
"""Update the research state with new information"""
self.research_state.update(kwargs)
current_time = time.time()
elapsed = current_time - self.research_state["start_time"]
# Log progress info
if "current_step" in kwargs or "current_task" in kwargs:
print_yellow(
f"Progress: Step {self.research_state.get('steps_completed', 0)}, "
f"Time elapsed: {elapsed:.1f}s"
)
# Update report if available
if self.report:
if "current_step" in kwargs and kwargs["current_step"]:
self.report.start_step(kwargs["current_step"])
if (
"current_task" in kwargs
and kwargs["current_task"]
and self.report.current_step
):
# For simplicity, we're using the task description as the name too
self.report.start_task(
kwargs["current_task"], kwargs["current_task"]
)
def use_tools(self, tool_calls, task_description, task_description_as_query=True) -> UnifiedToolResponse:
"""Execute the selected tools to gather information"""
self.update_research_state(current_task=f"Gathering information with tools")
gathered_information = UnifiedToolResponse()
for tool_call in tool_calls:
tool_name = tool_call.function.name
tool_args = tool_call.function.arguments
print_green(f"Using tool: {tool_name} with args: {tool_args}")
            # Use the task description as the query when requested, or when the
            # tool call did not supply a query of its own
            if task_description_as_query:
                tool_args["query"] = task_description
            elif "query" not in tool_args and task_description:
                tool_args["query"] = task_description
# Log tool use in report
if self.report:
self.report.log_tool_use(tool_name, tool_args)
try:
# Call the tool function
function_to_call = self.available_functions.get(tool_name)
if function_to_call:
result = function_to_call(**tool_args)
else:
result = f"Unknown tool: {tool_name}"
# Process the result
if isinstance(result, UnifiedSearchResults):
# Convert to a unified format
gathered_information.extend_search_results(result)
gathered_information.extend_tool_name(tool_name)
elif isinstance(result, str):
# Already in the correct format
gathered_information.extend_text_results(result)
gathered_information.extend_tool_name(tool_name)
# Log gathered information in report
# if self.report:
# self.report.log_information([gathered_info])
except Exception as e:
print_red(f"Error executing tool {tool_name}: {e}")
traceback.print_exc()
                sys.exit(1)
return gathered_information
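    # use_tools assumes OpenAI-style tool calls, i.e. objects shaped roughly like
    #   tool_call.function.name      -> "fetch_notes_tool"
    #   tool_call.function.arguments -> {"query": "..."}
    # (the exact class comes from the LLM wrapper; the values here are
    # illustrative only).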
# Tool function definitions
def fetch_science_articles_tool(
self, query: str, n_documents: int = 6
) -> ChunkSearchResults:
"""
Fetches information from scientific articles.
Parameters:
query (str): The search query to find relevant scientific articles in a vector database.
n_documents (int): How many documents to fetch. A complex query may require more documents. Min: 3, Max: 10.
Returns:
ChunkSearchResults: A structured result containing articles with their chunks.
"""
        # Restrict the search to previously retrieved chunks when available.
        # NOTE: where_filter is built here but is not currently passed to
        # get_chunks below.
        where_filter = {}
        if hasattr(self, "chroma_ids_retrieved") and len(self.chroma_ids_retrieved) > 0:
            where_filter = {"_id": {"$in": self.chroma_ids_retrieved}}
found_chunks = self.get_chunks(
user_input=query,
collections=["sci_articles"],
n_results=n_documents,
n_sources=max(n_documents, 4)
)
        # Standardize the chunks using UnifiedDataChunk
        unified_chunks = [
            UnifiedDataChunk(
                content=chunk.content,
                metadata=chunk.metadata.model_dump(),
                source_type="sci_articles",
            )
            for chunk in found_chunks.chunks
        ]
# Return the unified search results
return UnifiedSearchResults(chunks=unified_chunks, source_ids=[])
    def fetch_notes_tool(self, **kwargs) -> UnifiedSearchResults:
"""
Fetches project notes as a list for the researcher to understand what's important in the project.
This tool is useful for getting a quick overview of the project's key points.
Takes no arguments!
Returns:
UnifiedSearchResults: A structured result containing notes as data chunks.
"""
chunks = []
for i, note in enumerate(self.get_notes()):
# Create a unified data chunk for each note
unified_chunk = UnifiedDataChunk(
content=note,
metadata={
"title": f"Note {i+1}", # Use a default string instead of None
"source": "Project Notes" # Add more metadata for better identification
},
source_type="note",
)
chunks.append(unified_chunk)
return UnifiedSearchResults(chunks=chunks, source_ids=[])
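# Example (hypothetical, assuming a populated project): calling a tool directly
# returns a UnifiedSearchResults whose chunks carry content plus metadata, e.g.
#   base = ResearchBase(username="demo")
#   notes = base.fetch_notes_tool()
#   notes.chunks[0].metadata  # -> {"title": "Note 1", "source": "Project Notes"}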
class MasterAgent(ResearchBase):
"""A large and reasoning (if not specified not to be) LLM that handles the complex thinking tasks and coordinates other agents"""
    def __init__(
        self, username: str, project: Project = None, model: str = "reasoning", tools: Optional[list] = None, **kwargs
    ):
        # Avoid a shared mutable default argument for tools
        tools = tools or []
# Configure for reasoning model
kwargs["model_config"] = {
"system_message": "You are an assistant helping a journalist writing a report based on extensive research.",
"temperature": 0.3,
"model": model,
"chat": True,
}
super().__init__(username=username, project=project, tools=tools, **kwargs)
self.model = model
self.available_sources = {}
# Initialize sub-agents
self.structure_agent = StructureAgent(
username=username, model="small", report=self.report
)
self.tool_agent = ToolAgent(
username=username,
model="tools",
system_message=f"You are an assistant with some tools. The tools you can choose from are {tools} Always choose a tool to help with the task. Your task is to choose one or multiple tools to answer a user's query. DON'T come up with your own tools, only use the ones provided.",
report=self.report,
project=project,
chat=True,
)
self.archive_agent = ArchiveAgent(
username=username,
report=self.report,
project=project,
system_message="""
You are an assistant specialized in reading and summarizing research information.
            You are helping a researcher with a research project divided into many steps, where you will get information for each step.
            Your goal is to provide clear, accurate summaries that capture the essential points while maintaining context.
            If your summary is deemed insufficient to complete the step, you will be given more information and asked for an updated summary. Please then include the information you received previously in your new summary so that all information is taken into account.
""",
chat=True,
)
self.assistant_agent = AssistantAgent(
username=username,
report=self.report,
project=project,
system_message="""
You are an assistant specialized in summarizing steps and keeping track of the research process.
Your job is to maintain a structured record of what has been done and help the researcher navigate
through the research process by providing clear summaries of steps completed.
""",
chat=True,
)
# Track execution results
self.execution_results = {}
def check_available_sources(self):
"""
Check the available sources in the database and update the report.
This method iterates through the arango_ids stored in the instance and
counts the number of documents by type based on their ID prefixes.
It then updates the self.available_sources dictionary with counts for:
- other_documents: Documents with IDs starting with "other_documents"
- sci_articles: Scientific articles with IDs starting with "sci_articles"
- notes: Notes with IDs starting with "note"
- interviews: Interviews with IDs starting with "interview"
Returns:
None
"""
#! Update when more sources are added!
other_documents = 0
science_articles = 0
notes = 0
interviews = 0
for id in self.arango_ids:
if id.startswith("other_documents"):
other_documents += 1
elif id.startswith("sci_articles"):
science_articles += 1
elif id.startswith("note"):
notes += 1
elif id.startswith("interview"):
interviews += 1
        self.available_sources.update(
            {
                "other_documents": other_documents,
                "sci_articles": science_articles,
                "notes": notes,
                "interviews": interviews,
            }
        )
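    # After check_available_sources() runs, self.available_sources maps source
    # names to document counts, e.g. (counts illustrative):
    #   {"other_documents": 3, "sci_articles": 12, "notes": 5, "interviews": 0}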
def make_plan(self, question):
"""Generate a research plan for answering the question/exploring the topic"""
self.update_research_state(
current_step="Plan Creation", current_task="Splitting into questions."
)
query = llm_queries.create_plan_questions(self, question)
response = self.llm.generate(query=query, model=self.model, think=True)
print_purple(response.content)
subquestions = [i for i in remove_thinking(response).split("\n") if "?" in i]
self.report.report["plan"]["subquestions"] = subquestions
self.update_research_state(
current_step="Plan Creation", current_task="Creating initial research plan"
)
# TODO Update the available resources in the query when more resources are added!
make_plan_query = llm_queries.create_plan(self, question)
# Generate the plan and handle potential formatting issues
try:
response = self.llm.generate(query=make_plan_query, model=self.model, think=True)
plan = self.structure_agent.make_structured(response.content, question)
print("THIS IS THE PLAN\n")
print_rainbow(plan.__dict__)
self.update_research_state(steps_completed=1)
return plan
except Exception as e:
print_red(f"Error creating research plan: {e}")
traceback.print_exc()
return f"Error creating research plan: {str(e)}"
def process_step(self, step_name, step_tasks, max_attempts=3):
"""
Process a research step with multiple tasks using various agents to gather and organize information.
This function handles the complete workflow for a research step:
1. Determines required tools for all tasks
2. Gathers information using the selected tools
3. Summarizes the gathered information
4. Evaluates if the information is sufficient
        5. Iteratively gathers more information if needed (up to max_attempts attempts)
6. Finalizes the step results
"""
print_purple(f"\nProcessing Step: {step_name}")
self.update_research_state(
current_step=step_name, current_task="Processing entire step"
)
# 1. Determine tools needed for all tasks in the step
print_blue("Determining tools for all tasks in step...")
all_tasks_description = f"## Step: {step_name}\n"
for task in step_tasks:
all_tasks_description += (
f"- {task['task_name']}: {task['task_description']}\n"
)
all_tasks_description += "\nWhat tools should I use to gather all necessary information for these tasks efficiently?"
tool_calls = self.tool_agent.task_tools(all_tasks_description)
print_purple("Tools selected for the entire step:")
for i, tool_call in enumerate(tool_calls, 1):
args_dict = tool_call.function.arguments
# Format the arguments as a comma-separated list of key=value pairs
args_formatted = ", ".join([f"{k}={v}" for k, v in args_dict.items()])
print_purple(f"{i}. {tool_call.function.name} ({args_formatted})")
# 2. Gather data according to the selected tools
print_blue("Gathering data for all tasks...")
gathered_info = self.archive_agent.use_tools(tool_calls, all_tasks_description)
self.archive_agent.chroma_ids_retrieved += gathered_info.get_chroma_ids
# 3. Summarize the gathered data
print_blue("Summarizing gathered information...")
self.archive_agent.reset_chat_history()
print_yellow("Step description:")
print_yellow(all_tasks_description)
print()
print_yellow("Summarizing all gathered information...")
print("Gathered information:")
for info in gathered_info:
print(info)
print_yellow("Summarizing all gathered information...")
summary = self.archive_agent.read_and_summarize(
gathered_info, all_tasks_description
)
summary = remove_thinking(summary)
print_green("Step Information Summary:")
print(summary)
# Have the assistant agent track this step
self.assistant_agent.summarize_step(step_name, summary)
# 4. Evaluate if the data is sufficient for all tasks
print_blue("Evaluating if information is sufficient for all tasks...")
evaluation = self.evaluate_step_completeness(step_tasks, summary)
self.report.log_evaluation(evaluation)
# 5. If not enough information, gather more
attempt = 1
while not evaluation["status"] and attempt < max_attempts:
print_yellow(
f"Information not sufficient. Attempt {attempt}/{max_attempts} to gather more..."
)
# Create a query focusing on missing information
additional_query = f"For step '{step_name}', I need additional information on:\n{evaluation['missing_info']}\n\nWhat tools should I use to fill these gaps?"
# Get additional tools
additional_tool_calls = self.tool_agent.task_tools(additional_query)
# Use additional tools to gather more information
additional_info = self.tool_agent.use_tools(
additional_tool_calls, additional_query
)
            # Replace the gathered information with the newly fetched batch
            # TODO Is it better to append, or to make the LLM use the chat history?
            # gathered_info.extend(additional_info)
            gathered_info = additional_info
# Update summary with all information
updated_summary = self.archive_agent.read_and_summarize(
gathered_info, all_tasks_description
)
summary = remove_thinking(updated_summary)
print_green("Updated Summary:")
print(summary)
# Update assistant agent with new summary
self.assistant_agent.summarize_step(step_name, summary)
# Re-evaluate
evaluation = self.evaluate_step_completeness(step_tasks, summary)
attempt += 1
self.report.update_step_summary(summary)
# 6. Let the MasterAgent use the gathered data to finalize the step
print_blue("Finalizing step results...")
        final_analysis = self.finalize_step_result(step_name, step_tasks, summary)
        # Pack and store results
        step_result = {
            "step_name": step_name,
            "tasks": [
                {
                    "task_name": task["task_name"],
                    "task_description": task["task_description"],
                }
                for task in step_tasks
            ],
            "information_gathered": gathered_info,
            "summary": summary,
            "evaluation": evaluation,
            "result": final_analysis,
        }
self.execution_results[step_name] = step_result
def execute_research_plan(self, structured_plan):
"""Execute the structured research plan step by step"""
# Execute the plan step by step
print_blue("\n--- EXECUTING RESEARCH PLAN ---")
for step_name, tasks in structured_plan.steps.items():
print_blue(f"\n### Processing Step: {step_name}")
self.report.start_step(step_name)
# Collect all task descriptions in this step
step_tasks = [
{"task_name": task_name, "task_description": task_description}
for task_name, task_description in tasks
]
# Process the entire step
self.archive_agent.reset_chroma_ids()
self.process_step(step_name, step_tasks)
# Finish the step in report
self.report.finish_step()
# Evaluate if more steps are needed
print_blue("\n--- EVALUATING RESEARCH PLAN ---")
plan_evaluation = self.evaluate_plan(self.execution_results)
self.report.log_plan_evaluation(plan_evaluation)
print_yellow("Plan Evaluation:")
print(plan_evaluation["explanation"])
return self.execution_results
def evaluate_step_completeness(self, step_tasks, summary):
"""Evaluate if the information is sufficient for all tasks in the step"""
        self.update_research_state(current_task="Evaluating step completeness")
# Create a query to evaluate all tasks
step_description = "\n".join(
[
f"- {task['task_name']}: {task['task_description']}"
for task in step_tasks
]
)
query = f"""
You are evaluating if the gathered information is sufficient for completing ALL the following tasks:
{step_description}
Information gathered:
"""
{summary}
"""
Is this information sufficient to complete ALL tasks in this step?
First, analyze each task individually and determine if the information is sufficient.
Then, provide an overall assessment where "status" is True if all tasks are complete and False if not.
Explain why the information is sufficient or not in the "explanation" field.
If ANY task has insufficient information, specify exactly what additional information is needed.
"""
response = self.llm.generate(
query=query,
format=EvaluateFormat.model_json_schema(),
model="standard",
think=True,
)
structured_response = EvaluateFormat.model_validate_json(response.content)
# Add None for additional_info if it's not present
if not hasattr(structured_response, "additional_info"):
structured_response.additional_info = None
        if structured_response.status:
            print_green(
                f"\nEVALUATION PASSED\nStep: {step_description}\n{structured_response.explanation}"
            )
        else:
            print_red(f"EVALUATION FAILED\n{structured_response.explanation}")
return {
"status": structured_response.status,
"explanation": structured_response.explanation,
"missing_info": structured_response.additional_info,
}
def evaluate_step(self, information, task_description):
"""Evaluate if the information is sufficient for the current step/task"""
self.update_research_state(current_task=f"Evaluating '{task_description}'")
query = f'''
You are evaluating if the gathered information is sufficient for completing this research task.
Task: {task_description}
Information gathered:
"""
{information}
"""
Is this information sufficient to complete the task? Respond in the format requested.
If insufficient, explain exactly what additional information would be needed.
'''
response = self.llm.generate(
query=query, format=EvaluateFormat.model_json_schema()
)
structured_response = EvaluateFormat.model_validate_json(response.content)
        if structured_response.status:
            print_green(
                f"\nEVALUATION PASSED\nTask: {task_description}\n{structured_response.explanation}"
            )
        else:
            print_red("EVALUATION FAILED")
print_yellow(f"Task: {task_description}")
print_rainbow(structured_response.__dict__)
return {
"status": structured_response.status,
"explanation": structured_response.explanation,
"additional_info": structured_response.additional_info,
}
def evaluate_plan(self, execution_results):
"""Evaluate if more research steps are needed"""
self.update_research_state(
current_step="Plan Evaluation",
current_task="Evaluating overall research progress",
)
# Create a summary of completed research
steps_summary = ""
for step_name, step_data in execution_results.items():
steps_summary += f"\n## {step_name} \n"
# Add the step's summary
steps_summary += f"{step_data.get('summary', 'No summary available')} \n"
# If you want to include individual tasks
for task in step_data.get('tasks', []):
task_name = task.get('task_name', 'Unnamed task')
steps_summary += f"- {task_name} \n"
query = f'''
Based on the research that has been conducted so far, determine if additional steps are needed
to create a comprehensive report.
Research completed:
"""
{steps_summary}
""""""
Original question to answer: {self.research_state.get('original_question', 'No question provided')}
Are additional research steps needed? Respond with COMPLETE or INCOMPLETE,
followed by a brief explanation. If INCOMPLETE, suggest what additional steps would be valuable.
'''
response = self.llm.generate(query=query, think=True)
evaluation = response.content if hasattr(response, "content") else str(response)
if "COMPLETE" in evaluation.upper().split(" "):
print_green(f'\nEVALUATION PASSED\n"Evaluation: {evaluation}')
return {
"status": "no more information is needed",
"explanation": evaluation,
}
else:
print_red(f'\nEVALUATION FAILED\n"Evaluation: {evaluation}')
return {"status": "more information is needed", "explanation": evaluation}
def finalize_step_result(self, step_name, step_tasks, summary):
"""Generate a comprehensive result for the entire step using all gathered information"""
self.update_research_state(
current_task=f"Finalizing results for step: {step_name}"
)
tasks_description = "\n".join(
[
f"- {task['task_name']}: {task['task_description']}"
for task in step_tasks
]
)
query = f"""
Based on the following information gathered for step "{step_name}",
create a comprehensive analysis that addresses all the tasks in this step.
Step tasks:
{tasks_description}
Information gathered:
{summary}
Your response should:
1. Be structured with clear sections for each aspect of the analysis
2. Draw connections between different pieces of information
3. Highlight key insights relevant to the original research question
4. Provide a comprehensive understanding of this step's contribution to the overall research
Sometimes the information is limited, if so do not make up information, but rather say that the information is limited and write a shorter response.
"""
response = self.llm.generate(query=query)
step_result = remove_thinking(response)
print_green("Step Result:")
print(step_result)
self.update_research_state(
steps_completed=self.research_state.get("steps_completed", 0) + 1
)
return step_result
def write_report(self, execution_results):
"""Generate the final report based on the collected information"""
self.update_research_state(
current_step="Report Writing", current_task="Generating final report"
)
# Prepare all the gathered information in a structured way
gathered_info = ""
for step_name, step_data in execution_results.items():
gathered_info += f"\n## {step_name}\n"
# Add the step's summary
gathered_info += f"Step Summary: {step_data.get('summary', 'No summary available')}\n\n"
# Add information about tasks
for task in step_data.get('tasks', []):
task_name = task.get('task_name', 'Unnamed task')
task_description = task.get('task_description', 'No description')
gathered_info += f"### {task_name}\n"
gathered_info += f"Description: {task_description}\n\n"
# Include sources when available
sources = []
for info in step_data.get('information_gathered', []):
if isinstance(info, dict) and "result" in info and "content" in info["result"]:
if (isinstance(info["result"]["content"], dict) and
"chunks" in info["result"]["content"]):
for chunk in info["result"]["content"].get("chunks", []):
metadata = chunk.get("metadata", {})
source = f"{metadata.get('title', 'Unknown')}"
if metadata.get("journal"):
source += f" ({metadata.get('journal')})"
if source not in sources:
sources.append(source)
if sources:
gathered_info += "\n### Sources:\n"
for i, source in enumerate(sources):
gathered_info += f"- [{i+1}] {source}\n"
# Rest of the method continues...
print_blue("\n\nGathered information:\n".upper())
print(gathered_info, "\n")
query = f'''
        Based on the following research information, write an extensive report that answers the question in detail:
"{self.research_state.get('original_question', 'No question provided').replace('"', "'")}"
Research Information:
"""
{gathered_info}
"""
The report should be well-structured with appropriate headings, present the information
accurately, and highlight key insights. Cite sources using [number] notation when referencing specific information.
        As the report is for journalistic research, please be generous with details and cases that can be used when reporting on the subject!
'''
response = self.llm.generate(query=query)
report = response.content if hasattr(response, "content") else str(response)
report = remove_thinking(report)
self.update_research_state(
steps_completed=self.research_state.get("steps_completed", 0) + 1
)
return report
class StructureAgent(ResearchBase):
"""A small LLM for structuring text as JSON"""
def __init__(self, username, model: str = "standard", **kwargs):
super().__init__(username=username, **kwargs)
self.model = model
self.system_message = """You are helping a researcher to structure a text. You will get a text and make it into structured data.
Make sure not to change the meaning of the text and keeps all the details in the subtasks.
The content and/or of each step and task should be understandable by itself. Therefore, if a task seems to refer to something that is not mentioned in the step, you should include the necessary information in the task itself. Example: if a task consists of "Collect relevant information for the subject", you should include what the subject is in the task itself.
"""
        self.llm = LLM(
            system_message=self.system_message,
            model=self.model,
            chat=False,
            messages=[],
        )
def make_structured(self, text, question=None):
"""Convert the research plan into a structured format"""
self.update_research_state(
current_step="Plan Structuring",
current_task="Converting plan to structured format",
)
# Prepare query based on whether a question is provided
if question:
query = f'''This is a proposed plan for how to write a report on "{question}":\n"""{text}"""\nPlease make the plan into structured data with subtasks. Make sure to keep all the details in the subtasks.'''
else:
query = f'''This is a proposed plan for how to write a report:\n"""{text}"""\nPlease make the plan into structured data with subtasks. Make sure to keep all the details in the subtasks.'''
# Generate the structured plan
try:
response = self.llm.generate(
query=query, format=Plan.model_json_schema(), model=self.model
)
            response_content = (
                response.content if hasattr(response, "content") else str(response)
            )
            structured_response = Plan.model_validate_json(response_content)
            self.update_research_state(
                steps_completed=self.research_state.get("steps_completed", 0) + 1
            )
            return structured_response
except Exception as e:
print_red(f"Error structuring plan: {e}")
traceback.print_exc()
            # There is no reliable fallback structure, so abort the run
            sys.exit(1)
class ToolAgent(ResearchBase):
"""An LLM specialized in choosing tools based on information needs"""
def __init__(self, username, **kwargs):
# Initialize the LLM configuration
kwargs["model_config"] = {
"system_message": kwargs.get(
"system_message",
f"""
You are a helpful assistant with tools.
Your task is to choose one or multiple tools to answer a user's query.
DON'T come up with your own tools, only use the ones provided.
""",
),
"temperature": 0.1,
"model": "tools",
"chat": kwargs.get("chat", True),
}
super().__init__(username=username, **kwargs)
def task_tools(self, task_description):
"""Determine which tools to use for a task"""
self.update_research_state(current_task=f"Selecting tools for task")
query = f'''Research task description:
"""
{task_description}
"""
        You have to choose one or many tools in order to fetch the information necessary to complete the task.
        It's important that you think about what information is needed and choose the right tool for the job, considering the tools' descriptions.
        Make sure to read the descriptions of the tools carefully before choosing!
        You can ONLY choose a tool you are provided with, don't make up a tool!
        You HAVE TO CHOOSE A TOOL, even if you think you can answer without it. Don't answer the question without choosing a tool.
'''
response = self.llm.generate(query=query, tools=self.tools, model="tools")
# Extract tool calls from the response
tool_calls = response.tool_calls if hasattr(response, "tool_calls") else []
return tool_calls
class AssistantAgent(ResearchBase):
"""A small LLM agent for summarizing steps and keeping track of the research process by managing "research notes"/common memory.
This agent is designed to work with smaller language models and maintain a structured
record of the research process through its Notes system.
"""
class Notes:
"""
A class for storing and retrieving notes related to different steps in a process.
This class allows adding notes with step name, information, and summary,
and retrieving notes for specific steps.
Attributes:
step_notes (list): A list of dictionaries containing step notes.
Each dictionary has keys 'step_name', 'step_information', and 'summary'.
"""
def __init__(self):
self.step_notes = []
def add_step_note(self, step_name, step_information, step_summary):
"""Add a note for a specific step"""
self.step_notes.append(
{
"step_name": step_name,
"step_information": step_information,
"summary": step_summary,
}
)
def get_step_notes(self, step_name):
"""Get notes for a specific step.
Arguments:
step_name (str): The name of the step to retrieve notes for.
Returns:
list: A list of notes for the specified step.
"""
return [note for note in self.step_notes if note["step_name"] == step_name]
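    # Notes usage sketch (values illustrative):
    #   notes = AssistantAgent.Notes()
    #   notes.add_step_note("Step 1", "task list ...", "summary ...")
    #   notes.get_step_notes("Step 1")  # -> [{"step_name": "Step 1", ...}]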
def __init__(self, username: str, system_message: str, **kwargs):
# Configure for small model
kwargs["model_config"] = {
"temperature": 0.1,
"system_message": system_message,
"model": "small",
"chat": kwargs.get("chat", True),
}
super().__init__(username=username, **kwargs)
self.system_message = system_message
self.notes = self.Notes()
def summarize_step(self, step_name, step_tasks):
"""Summarize the results of a step"""
self.update_research_state(current_task=f"Summarizing step '{step_name}'")
# Create a query to summarize the step
query = f"""
You are summarizing the results of this research step:
Step name: {step_name}
Tasks:
{step_tasks}
Summarize the results of the tasks in a clear and concise manner. Focus on the facts, and mention sources for reference.
"""
response = self.llm.generate(query=query)
summary = response.content if hasattr(response, "content") else str(response)
self.notes.add_step_note(
step_name=step_name,
step_information=step_tasks,
step_summary=summary,
)
class ArchiveAgent(ResearchBase):
"""A small LLM for summarizing large amounts of text"""
def __init__(self, username: str, system_message: str, **kwargs):
# Configure for small model
kwargs["model_config"] = {
"temperature": 0.1,
"system_message": system_message,
"model": "small",
"chat": kwargs.get("chat", True),
}
super().__init__(username=username, **kwargs)
self.system_message = system_message
self.chroma_ids_retrieved = []
def reset_chroma_ids(self):
self.chroma_ids_retrieved = []
def read_and_summarize(self, information: UnifiedToolResponse, step_information):
"""Summarize the information gathered by the tools"""
self.update_research_state(current_task=f"Summarizing gathered information")
# Check if there are full articles to process
full_articles_to_process = []
if information.search_results and information.search_results.chunks:
for chunk in information.search_results.chunks:
if chunk.source_type == "full_article" and chunk.metadata.get("requires_full_summary", False):
full_articles_to_process.append(chunk.metadata)
# If we have full articles to process, summarize them
if full_articles_to_process:
article_summaries = []
question = None
if hasattr(self, "research_state"):
question = self.research_state.get("original_question", "")
for article_meta in full_articles_to_process:
article_id = article_meta.get("article_id")
if article_id:
summary = self.fetch_and_summarize_full_article_tool(article_id, question)
article_summaries.append(summary)
# If we've processed full articles, create a unified summary
if article_summaries:
info_text = "\n\n---\n\n".join(article_summaries)
else:
# If no full articles were successfully processed, fall back to regular text
info_text = information.to_text
else:
# Process chunks as usual
info_text = information.to_text
print_purple(f"INFO TEXT for summarization:\n{info_text}\n")
query = f'''
Below is the description of the current research step. *It is only for your information, nothing you should reference in the summary*.
"""
{step_information}
"""
Please read the following information to make a summary for the researcher.
"""
{info_text}
"""
Focus on *information and facts* that are important and relevant for the research step.
Ensure no important details are lost.
Reference the sources in the summary so it's clear where each piece of information comes from.
        Some pieces of information might not be of use in this research step; if so, just ignore them without mentioning them.
You are only allowed to use the sources provided in the information, don't make up any sources.
You should only make a summary of the information, not the step itself or any kind of evaluation!
'''
response = self.llm.generate(query=query)
summary = response.content if hasattr(response, "content") else str(response)
return summary
def reset_chat_history(self):
self.llm.messages = [{"role": "system", "content": self.system_message}]
def main(question, username="lasse", project_name="Electric Cars"):
"""Main function to execute the research workflow"""
# Initialize base and project
base = BaseClass(username=username)
project: Project = Project(
username=username, project_name=project_name, user_arango=base.get_arango()
)
    # Map what kinds of sources there are in the bot's arango_ids
    number_of_documents = {'other_documents': 0, 'science_articles': 0, 'notes': 0, 'interviews': 0}
bot = Bot(username=username, project=project, user_arango=base.get_arango(), tools="all")
for id in bot.arango_ids:
if id.startswith("other_documents"):
number_of_documents['other_documents'] += 1
elif id.startswith("sci_articles"):
number_of_documents['science_articles'] += 1
elif id.startswith("note"):
number_of_documents['notes'] += 1
elif id.startswith("interview"):
number_of_documents['interviews'] += 1
tool_sources = {
"fetch_other_documents_tool": ["other_documents"],
"fetch_science_articles_tool": ["science_articles"],
"fetch_science_articles_and_other_documents_tool": ["other_documents", "science_articles"],
"fetch_notes_tool": ["notes"]
}
# Create a list of tools that is to be given to the bots
bot_tools: list = [tool.__name__ for tool in bot.tools if callable(tool)]
for tool, sources in tool_sources.items():
print(tool, sources)
documents = 0
for source in sources:
documents += number_of_documents[source]
        if documents == 0 and tool in bot_tools:
            print_yellow(f"Removing {tool} from bot, as there are no documents in the sources: {sources}")
            bot_tools.remove(tool)
# Initialize report tracking
report: ResearchReport = ResearchReport(
question=question, username=username, project_name=project_name
)
# Initialize agents with report tracking
master_agent = MasterAgent(
username=username, project=project, report=report, chat=True, tools=bot_tools
)
# Track the research state in the master agent
master_agent.research_state["original_question"] = question
# Execute workflow with proper error handling and progress tracking
print_blue(f"Starting research on: {question}")
# Research plan creation
print_blue("\n--- CREATING RESEARCH PLAN ---")
research_plan = master_agent.make_plan(question)
# research_plan = '''
# plan = """## Step 1: Review the journalist's notes
# - Task1: Identify and extract information from the journalist's notes that directly relates to lithium mining's social, technical, and economic aspects.
# - Task2: Summarize the extracted information into a structured format, highlighting key themes (e.g., environmental impact, cost benefits, political greenwashing).
# ## Step 2: Search for social impact information
# - Task1: Use the database/LLM to search for information on the social impacts of lithium mining, such as displacement, labor conditions, health risks, and indigenous land rights.
# - Task2: Summarize findings into a structured format, focusing on how lithium mining affects local communities and indigenous populations.
# ## Step 3: Search for technical challenges
# - Task1: Use the database/LLM to search for technical challenges of lithium mining, including environmental degradation, water usage, energy consumption, and ecosystem impacts.
# - Task2: Summarize findings into a structured format, emphasizing technical risks and environmental consequences.
# ## Step 4: Search for economic aspects
# - Task1: Use the database/LLM to search for economic challenges of lithium mining, such as production costs, market volatility, profitability, and local economic impacts.
# - Task2: Summarize findings into a structured format, highlighting economic trade-offs and long-term sustainability.
# ## Step 5: Cross-reference and compile findings
# - Task1: Compare information from Steps 2–4 to identify overlaps, contradictions, or gaps in the data.
# - Task2: Compile all findings into a cohesive summary, ensuring each aspect (social, technical, economic) is addressed with evidence from the sources.
# ## Step 6: Analyze long-term risks and sustainability
# - Task1: Use the database/LLM to search for information on long-term risks of lithium mining, such as resource depletion, pollution, and water scarcity.
# - Task2: Summarize findings into a structured format, linking long-term risks to social, technical, and economic aspects.
# '''
    # Log the plan in the report (as both original text and structured form)
    report.log_plan(research_plan, research_plan)
# Execute the plan step by step
execution_results = master_agent.execute_research_plan(research_plan)
# Write the final report
print_blue("\n--- WRITING FINAL REPORT ---")
final_report = master_agent.write_report(execution_results)
report.log_final_report(final_report)
print_green("Final Report:")
print(final_report)
# Save the full research report
report_path = report.save_to_file(
f"/home/lasse/sci/reports/research_report_{username}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.json"
)
# Create a more readable markdown version
markdown_report = report.get_markdown_report()
markdown_path = report_path.replace(".json", ".md")
with open(markdown_path, "w") as f:
f.write(markdown_report)
print_green(f"Markdown report saved to {markdown_path}")
return {
"question": question,
"research_plan": research_plan,
"structured_plan": research_plan,
"execution_results": execution_results,
"final_report": final_report,
"full_report": report.get_full_report(),
"report_path": report_path,
"markdown_path": markdown_path,
}
def remove_thinking(response):
"""Remove the thinking section from the response"""
response_text = response.content if hasattr(response, "content") else str(response)
if "</think>" in response_text:
return response_text.split("</think>")[1].strip()
return response_text
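# Example: remove_thinking("<think>internal reasoning</think>Final answer")
# returns "Final answer"; plain strings pass through unchanged.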
if __name__ == "__main__":
question = "What are the problems around lithium mining? I'm interested in social, technical and economical aspects."
result = main(
question, username="lasse", project_name="Electric Cars"
) # Use these parameters to test the code, don't change!