You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1448 lines
58 KiB
1448 lines
58 KiB
from _llm import LLM |
|
from streamlit_chatbot import Bot |
|
from typing import Dict, List, Tuple, Optional, Any |
|
from colorprinter.print_color import * |
|
from projects_page import Project |
|
from _base_class import BaseClass |
|
from prompts import get_tools_prompt |
|
import time |
|
import traceback |
|
import json |
|
from datetime import datetime |
|
|
|
import llm_queries |
|
from models import EvaluateFormat, Plan, ChunkSearchResults, UnifiedSearchResults, UnifiedDataChunk, UnifiedToolResponse |
|
|
|
|
|
class ResearchReport:
    """Track and log decisions, tool usage and gathered data during research.

    All state lives in ``self.report``, a nested dict with the top-level keys
    ``metadata``, ``plan``, ``steps``, ``evaluation``, ``final_report`` and
    ``statistics``. ``current_step`` and ``current_task`` name the step/task
    that subsequent ``log_*`` calls are attributed to.
    """

    def __init__(self, question, username, project_name=None):
        """Create an empty report and stamp its start time.

        Parameters:
            question: The research question; stored in the metadata.
            username: Stored in the metadata.
            project_name: Optional project name; stored in the metadata.
        """
        self.report = {
            "metadata": {
                "question": question,
                "username": username,
                "project_name": project_name,
                "started_at": datetime.now().isoformat(),
                "finished_at": None,
            },
            "plan": {"original_text": None, "structured": None, "subquestions": []},
            "steps": {},
            "evaluation": None,
            "final_report": None,
            "statistics": {
                "tools_used": {},
                "sources_accessed": [],
                "total_time": None,
            },
        }
        # Track current context for easier logging
        self.current_step = None
        self.current_task = None

    def _current_task_record(self):
        """Return the dict of the active task in the active step, or None."""
        if not self.current_step or not self.current_task:
            return None
        tasks = self.report["steps"][self.current_step]["tasks"]
        return tasks.get(self.current_task)

    def log_plan(self, original_plan, structured_plan=None):
        """Log the research plan (the structured form only when it is truthy)."""
        self.report["plan"]["original_text"] = original_plan
        if structured_plan:
            self.report["plan"]["structured"] = structured_plan

    def start_step(self, step_name):
        """Mark the beginning of a step and return its name.

        Re-entering a step name that already exists keeps its recorded data.
        """
        if step_name != self.current_step:
            # Tasks are stored per step; keeping the previous step's task name
            # would make task-level logging look it up in the wrong step.
            self.current_task = None
        self.current_step = step_name
        if step_name not in self.report["steps"]:
            self.report["steps"][step_name] = {
                "started_at": datetime.now().isoformat(),
                "finished_at": None,
                "tasks": {},
                "tools_used": [],
                "information_gathered": [],
                "summary": None,
                "evaluation": None,
            }
        return self.current_step

    def start_task(self, task_name, task_description):
        """Mark the beginning of a task within the current step.

        Returns:
            The task name.

        Raises:
            ValueError: If no step is active.
        """
        if not self.current_step:
            raise ValueError("Cannot start task without active step")

        self.current_task = task_name
        self.report["steps"][self.current_step]["tasks"][task_name] = {
            "description": task_description,
            "started_at": datetime.now().isoformat(),
            "finished_at": None,
            "tools_used": [],
            "information_gathered": [],
        }
        return self.current_task

    def log_tool_use(self, tool_name, tool_args):
        """Log a tool call on the step, the active task (if any) and the global counters.

        Raises:
            ValueError: If no step is active.
        """
        if not self.current_step:
            raise ValueError("Cannot log tool use without active step")

        entry = {
            "tool": tool_name,
            "args": tool_args,
            "timestamp": datetime.now().isoformat(),
        }

        # Add to step level
        self.report["steps"][self.current_step]["tools_used"].append(entry)

        # Add to task level if we have an active task (separate dict so the
        # two records can be edited independently)
        task_record = self._current_task_record()
        if task_record is not None:
            task_record["tools_used"].append(dict(entry))

        # Update global statistics
        usage = self.report["statistics"]["tools_used"]
        usage[tool_name] = usage.get(tool_name, 0) + 1

    def log_information(self, information):
        """Log information gathered from tools, together with its extracted sources.

        Raises:
            ValueError: If no step is active.
        """
        if not self.current_step:
            raise ValueError("Cannot log information without active step")

        # Process information to extract sources
        sources = self._extract_sources(information)

        # Add unique sources to global statistics
        accessed = self.report["statistics"]["sources_accessed"]
        for source in sources:
            if source not in accessed:
                accessed.append(source)

        entry = {
            "data": information,
            "sources": sources,
            "timestamp": datetime.now().isoformat(),
        }

        # Add to step level
        self.report["steps"][self.current_step]["information_gathered"].append(entry)

        # Add to task level if we have an active task
        task_record = self._current_task_record()
        if task_record is not None:
            task_record["information_gathered"].append(dict(entry))

    def _extract_sources(self, information):
        """Extract source labels ("<title>" or "<title> (<journal>)") from gathered data.

        Only items shaped like ``{"result": {"content": {<group>: {"chunks": [...]}}}}``
        contribute labels. An item that raises while being inspected contributes
        the placeholder "No source".
        """
        sources = []

        for item in information:
            try:
                if "result" not in item or "content" not in item["result"]:
                    continue
                content = item["result"]["content"]
                if not isinstance(content, dict):
                    continue
                # Handle structured content like chunks
                for group in content.values():
                    if "chunks" not in group:
                        continue
                    for chunk in group["chunks"]:
                        metadata = chunk.get("metadata", {})
                        source = f"{metadata.get('title', 'Unknown')}"
                        if metadata.get("journal"):
                            source += f" ({metadata.get('journal')})"
                        if source not in sources:
                            sources.append(source)
            except Exception as e:
                # Best effort: a malformed item must not abort logging.
                print_yellow(f"Error extracting sources: {e}")
                sources.append("No source")

        return sources

    def update_step_summary(self, summary):
        """Store the summary for the current step, passed through ``remove_thinking``.

        Raises:
            ValueError: If no step is active.
        """
        if not self.current_step:
            raise ValueError("Cannot log summary without active step")

        self.report["steps"][self.current_step]["summary"] = remove_thinking(summary)

    def log_evaluation(self, evaluation):
        """Store the evaluation of the current step.

        Raises:
            ValueError: If no step is active.
        """
        if not self.current_step:
            raise ValueError("Cannot log evaluation without active step")

        self.report["steps"][self.current_step]["evaluation"] = evaluation

    def finish_task(self):
        """Stamp the end time of the current task and clear it.

        Raises:
            ValueError: If no step or no task is active.
        """
        if not self.current_step or not self.current_task:
            raise ValueError("No active task to finish")

        task_record = self.report["steps"][self.current_step]["tasks"][self.current_task]
        task_record["finished_at"] = datetime.now().isoformat()
        self.current_task = None

    def finish_step(self):
        """Stamp the end time of the current step and clear the step/task context.

        Raises:
            ValueError: If no step is active.
        """
        if not self.current_step:
            raise ValueError("No active step to finish")

        self.report["steps"][self.current_step]["finished_at"] = datetime.now().isoformat()
        self.current_step = None
        # A task cannot outlive its step.
        self.current_task = None

    def log_plan_evaluation(self, evaluation):
        """Log the overall plan evaluation."""
        self.report["evaluation"] = evaluation

    def log_final_report(self, report):
        """Log the final generated report and compute the total run time in seconds."""
        self.report["final_report"] = report
        self.report["metadata"]["finished_at"] = datetime.now().isoformat()
        # Calculate total time
        start = datetime.fromisoformat(self.report["metadata"]["started_at"])
        end = datetime.fromisoformat(self.report["metadata"]["finished_at"])
        self.report["statistics"]["total_time"] = (end - start).total_seconds()

    def get_full_report(self):
        """Return the complete report dict (the live object, not a copy)."""
        return self.report

    def get_markdown_report(self):
        """Return the report formatted as markdown for easy viewing."""
        metadata = self.report["metadata"]
        statistics = self.report["statistics"]

        parts = [f"# Research Report: {metadata['question']}\n\n"]

        # Metadata
        parts.append("## Metadata \n")
        parts.append(f"- **Project**: {metadata['project_name'] or 'None'} \n")
        parts.append(f"- **User**: {metadata['username']} \n")
        parts.append(f"- **Started**: {metadata['started_at']} \n")
        if metadata["finished_at"]:
            parts.append(f"- **Finished**: {metadata['finished_at']} \n")
            if statistics["total_time"] is not None:
                parts.append(f"- **Total time**: {statistics['total_time']:.2f} seconds \n")

        # Statistics
        parts.append("\n## Statistics \n")
        parts.append("### Tools Used \n")
        for tool, count in statistics["tools_used"].items():
            parts.append(f"- {tool}: {count} times \n")

        parts.append("\n### Sources Accessed \n")
        for source in statistics["sources_accessed"]:
            parts.append(f"- {source} \n")

        # Research plan
        parts.append("\n## Research Plan \n")
        if self.report["plan"]["original_text"]:
            parts.append(f"\n{self.report['plan']['original_text']} \n\n")

        # Steps
        parts.append("\n## Research Steps \n")
        for step_name, step_data in self.report["steps"].items():
            parts.append(f"### {step_name}\n")

            if step_data.get("summary"):
                parts.append(f"**Summary**: {step_data['summary']} \n\n")

            parts.append("**Tools used**: \n")
            for tool in step_data["tools_used"]:
                # Arguments are logged as given; tolerate non-dict arguments
                # and a missing or non-string query instead of crashing.
                args = tool["args"]
                query = args.get("query") if isinstance(args, dict) else None
                if query is None:
                    query = "No query"
                query = str(query).replace("_", " ")
                parts.append(f"- {tool['tool']} with query: _{query}_\n")

            parts.append("\n**Tasks**:\n")
            for task_name, task_data in step_data.get("tasks", {}).items():
                parts.append(f"- {task_name}: {task_data['description']}\n")

        # Final report
        if self.report["final_report"]:
            parts.append("\n## Final Report \n")
            parts.append(self.report["final_report"])

        return "".join(parts)

    def save_to_file(self, filepath=None):
        """Save the report as JSON and return the path written.

        When ``filepath`` is falsy, a timestamped ``research_report_*.json``
        name relative to the working directory is used.
        """
        if not filepath:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = f"research_report_{timestamp}.json"

        def make_json_serializable(obj):
            """Recursively convert objects exposing ``model_dump`` (e.g. Pydantic models) to dicts."""
            if hasattr(obj, "model_dump"):
                return obj.model_dump()
            if isinstance(obj, dict):
                return {k: make_json_serializable(v) for k, v in obj.items()}
            if isinstance(obj, (list, tuple)):
                return [make_json_serializable(item) for item in obj]
            return obj

        # Create a JSON-serializable version of the report
        json_report = make_json_serializable(self.report)

        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(json_report, f, indent=2)

        print_green(f"Report saved to {filepath}")
        return filepath
|
|
|
|
|
class ResearchBase(Bot):
    """Base class for all research agents with improved integration with Bot functionality"""

    def __init__(
        self,
        username: str,
        model: str = "standard",
        chat: bool = True,
        report=None,
        **kwargs,
    ):
        """Set up the LLM, progress tracking, the optional report and the tool registry.

        Parameters:
            username: Forwarded to ``Bot.__init__``.
            model: Model name handed to ``LLM``.
            chat: Forwarded to ``LLM``.
            report: Optional ``ResearchReport`` to log into; may be None.
            **kwargs: Forwarded unchanged to ``Bot.__init__``.
        """
        super().__init__(username=username, **kwargs)
        self.model: str = model
        self.llm = LLM(
            system_message="You are a research assistant.",
            model=model,
            chat=chat,
            messages=[],
        )

        # Tracking for research flow
        self.research_state = {
            "current_step": None,
            "current_task": None,
            "start_time": time.time(),
            "steps_completed": 0,
            "tasks_completed": 0,
        }

        self.report: Optional[ResearchReport] = report

        # Define available tool functions
        # NOTE(review): three of these methods are not defined in this class as
        # shown; presumably they are inherited from Bot - confirm.
        self.available_functions = {
            "fetch_science_articles_tool": self.fetch_science_articles_tool,
            "fetch_notes_tool": self.fetch_notes_tool,
            "fetch_other_documents_tool": self.fetch_other_documents_tool,
            "fetch_science_articles_and_other_documents_tool": self.fetch_science_articles_and_other_documents_tool,
            "analyze_tool": self.analyze_tool,
        }

        # The callables themselves, in registry order.
        self.tools = list(self.available_functions.values())

    def update_research_state(self, **kwargs):
        """Merge ``kwargs`` into ``research_state`` and mirror step/task changes into the report."""
        self.research_state.update(kwargs)
        elapsed = time.time() - self.research_state["start_time"]

        # Log progress info
        if "current_step" in kwargs or "current_task" in kwargs:
            print_yellow(
                f"Progress: Step {self.research_state.get('steps_completed', 0)}, "
                f"Time elapsed: {elapsed:.1f}s"
            )

        # Update report if available
        if not self.report:
            return

        if kwargs.get("current_step"):
            self.report.start_step(kwargs["current_step"])
        if kwargs.get("current_task") and self.report.current_step:
            # For simplicity, we're using the task description as the name too
            self.report.start_task(kwargs["current_task"], kwargs["current_task"])

    def use_tools(self, tool_calls, task_description, task_description_as_query=True) -> UnifiedToolResponse:
        """Execute the selected tools and collect their results.

        Parameters:
            tool_calls: Iterable of tool calls; each must expose
                ``function.name`` and ``function.arguments`` (a mutable mapping).
            task_description: Text injected as the ``query`` argument (see below).
            task_description_as_query: When True, an existing ``query``
                argument is overwritten with ``task_description``.

        Returns:
            A ``UnifiedToolResponse`` holding the search and text results.

        Note:
            Any exception raised by a tool terminates the process via
            ``sys.exit(1)`` after printing the traceback.
        """
        self.update_research_state(current_task="Gathering information with tools")

        gathered_information = UnifiedToolResponse()

        for tool_call in tool_calls:
            tool_name = tool_call.function.name
            tool_args = tool_call.function.arguments

            print_green(f"Using tool: {tool_name} with args: {tool_args}")

            # Add the query to arguments if not already present; replace an
            # existing one only when the caller asked for that.
            if "query" in tool_args:
                if task_description_as_query:
                    tool_args["query"] = task_description
            elif task_description:
                tool_args["query"] = task_description

            # Log tool use in report
            if self.report:
                self.report.log_tool_use(tool_name, tool_args)

            try:
                # Call the tool function
                function_to_call = self.available_functions.get(tool_name)
                if function_to_call:
                    result = function_to_call(**tool_args)
                else:
                    result = f"Unknown tool: {tool_name}"

                # Process the result; other result types are ignored.
                if isinstance(result, UnifiedSearchResults):
                    gathered_information.extend_search_results(result)
                    gathered_information.extend_tool_name(tool_name)
                elif isinstance(result, str):
                    gathered_information.extend_text_results(result)
                    gathered_information.extend_tool_name(tool_name)

            except Exception as e:
                print_red(f"Error executing tool {tool_name}: {e}")
                traceback.print_exc()
                # NOTE(review): a single failing tool ends the whole process;
                # confirm this fail-fast behavior is wanted outside debugging.
                import sys

                sys.exit(1)

        return gathered_information

    # Tool function definitions
    def fetch_science_articles_tool(
        self, query: str, n_documents: int = 6
    ) -> UnifiedSearchResults:
        """
        Fetches information from scientific articles.

        Parameters:
            query (str): The search query to find relevant scientific articles in a vector database.
            n_documents (int): How many documents to fetch. A complex query may require more documents. Min: 3, Max: 10.

        Returns:
            ChunkSearchResults: A structured result containing articles with their chunks.
        """
        # The docstring above is left untouched because tool docstrings may be
        # parsed into tool descriptions; the value actually returned is a
        # UnifiedSearchResults (see the return statement).
        # NOTE(review): ids in self.chroma_ids_retrieved are not used to filter
        # this search; confirm whether get_chunks should receive a filter.
        found_chunks = self.get_chunks(
            user_input=query,
            collections=["sci_articles"],
            n_results=n_documents,
            n_sources=max(n_documents, 4),
        )

        # Standardize the chunks using UnifiedDataChunk
        # NOTE(review): source_type is "other_documents" although the search
        # runs on the "sci_articles" collection - confirm this is intended.
        unified_chunks = [
            UnifiedDataChunk(
                content=chunk.content,
                metadata=chunk.metadata.model_dump(),
                source_type="other_documents",
            )
            for chunk in found_chunks.chunks
        ]
        # Return the unified search results
        return UnifiedSearchResults(chunks=unified_chunks, source_ids=[])

    def fetch_notes_tool(self, **argv) -> UnifiedSearchResults:
        """
        Fetches project notes as a list for the researcher to understand what's important in the project.
        This tool is useful for getting a quick overview of the project's key points.
        Takes no arguments!

        Returns:
            UnifiedSearchResults: A structured result containing notes as data chunks.
        """
        # **argv swallows any arguments passed by the caller (use_tools may
        # inject a "query"); they are not used.
        chunks = [
            UnifiedDataChunk(
                content=note,
                metadata={
                    "title": f"Note {number}",  # Use a default string instead of None
                    "source": "Project Notes",  # Add more metadata for better identification
                },
                source_type="note",
            )
            for number, note in enumerate(self.get_notes(), start=1)
        ]
        return UnifiedSearchResults(chunks=chunks, source_ids=[])
|
|
|
|
|
class MasterAgent(ResearchBase): |
|
"""A large and reasoning (if not specified not to be) LLM that handles the complex thinking tasks and coordinates other agents""" |
|
|
|
def __init__(
    self, username: str, project: Project = None, model: str = "reasoning", tools: list = None, **kwargs
):
    """Configure the master agent and create its sub-agents.

    Parameters:
        username: Forwarded to the base class and to every sub-agent.
        project: Optional project, forwarded to the base class and sub-agents.
        model: Model name used by this agent.
        tools: Tools listed in the tool agent's system message and forwarded
            to the base class. Omitted or None means an empty list.
        **kwargs: Forwarded to the base class; ``model_config`` is overwritten here.
    """
    # A fresh list per call: a list literal as default would be shared
    # between all instances.
    tools = [] if tools is None else tools

    # Configure for reasoning model
    kwargs["model_config"] = {
        "system_message": "You are an assistant helping a journalist writing a report based on extensive research.",
        "temperature": 0.3,
        "model": model,
        "chat": True,
    }
    super().__init__(username=username, project=project, tools=tools, **kwargs)
    self.model = model
    self.available_sources = {}

    # Initialize sub-agents; all of them share this agent's report.
    self.structure_agent = StructureAgent(
        username=username, model="small", report=self.report
    )
    self.tool_agent = ToolAgent(
        username=username,
        model="tools",
        system_message=f"You are an assistant with some tools. The tools you can choose from are {tools} Always choose a tool to help with the task. Your task is to choose one or multiple tools to answer a user's query. DON'T come up with your own tools, only use the ones provided.",
        report=self.report,
        project=project,
        chat=True,
    )
    self.archive_agent = ArchiveAgent(
        username=username,
        report=self.report,
        project=project,
        system_message="""
        You are an assistant specialized in reading and summarizing research information.
        You are helping a researcher with a research divided in many steps, where you will get information for each step.
        Your goal is to provide clear, accurate summaries that capture the essential points while maintaining context.
        If your summary is deemed insufficient to complete the step, you will be given more information and asked for a updated summary. Please then include the previous information you got in your new summary so that all information is taken into account.
        """,
        chat=True,
    )
    self.assistant_agent = AssistantAgent(
        username=username,
        report=self.report,
        project=project,
        system_message="""
        You are an assistant specialized in summarizing steps and keeping track of the research process.
        Your job is to maintain a structured record of what has been done and help the researcher navigate
        through the research process by providing clear summaries of steps completed.
        """,
        chat=True,
    )

    # Track execution results (filled per step name by process_step)
    self.execution_results = {}
|
|
|
def check_available_sources(self):
    """
    Count the documents in ``self.arango_ids`` per source type.

    Each id is classified by its prefix (the first matching prefix wins) and
    the resulting counts are written to ``self.available_sources`` under:
    - other_documents: ids starting with "other_documents"
    - sci_articles: ids starting with "sci_articles"
    - notes: ids starting with "note"
    - interviews: ids starting with "interview"

    Ids matching no prefix are ignored. All four keys are always written,
    with 0 when nothing matched.

    Returns:
        None
    """
    #! Update when more sources are added!
    prefix_to_source = (
        ("other_documents", "other_documents"),
        ("sci_articles", "sci_articles"),
        ("note", "notes"),
        ("interview", "interviews"),
    )

    counts = {source: 0 for _, source in prefix_to_source}
    for document_id in self.arango_ids:
        for prefix, source in prefix_to_source:
            if document_id.startswith(prefix):
                counts[source] += 1
                break

    self.available_sources.update(counts)
|
|
|
def make_plan(self, question):
    """Generate a research plan for answering the question/exploring the topic.

    First asks the LLM to split the question into sub-questions (every
    response line containing "?" is kept), then asks it for a plan and lets
    the structure agent turn that plan into a structured object.

    Returns:
        The structured plan on success, or an error message string when
        creating or structuring the plan raises.
    """
    self.update_research_state(
        current_step="Plan Creation", current_task="Splitting into questions."
    )

    query = llm_queries.create_plan_questions(self, question)

    response = self.llm.generate(query=query, model=self.model, think=True)
    print_purple(response.content)
    subquestions = [
        line for line in remove_thinking(response).split("\n") if "?" in line
    ]

    # The report is optional (it defaults to None in the base class).
    if self.report:
        self.report.report["plan"]["subquestions"] = subquestions

    self.update_research_state(
        current_step="Plan Creation", current_task="Creating initial research plan"
    )

    # TODO Update the available resources in the query when more resources are added!
    make_plan_query = llm_queries.create_plan(self, question)

    # Generate the plan and handle potential formatting issues
    try:
        response = self.llm.generate(query=make_plan_query, model=self.model, think=True)
        plan = self.structure_agent.make_structured(response.content, question)

        print("THIS IS THE PLAN\n")
        print_rainbow(plan.__dict__)
        self.update_research_state(steps_completed=1)
        return plan

    except Exception as e:
        print_red(f"Error creating research plan: {e}")
        traceback.print_exc()
        # NOTE: callers receive a string instead of a plan in this case.
        return f"Error creating research plan: {str(e)}"
|
|
|
def process_step(self, step_name, step_tasks, max_attempts=3):
    """
    Process a research step with multiple tasks using various agents to gather and organize information.

    This function handles the complete workflow for a research step:
    1. Determines required tools for all tasks
    2. Gathers information using the selected tools
    3. Summarizes the gathered information
    4. Evaluates if the information is sufficient
    5. Iteratively gathers more information if needed (the initial pass
       plus at most ``max_attempts - 1`` further rounds)
    6. Finalizes the step results

    Parameters:
        step_name: Name of the step; also the key in ``self.execution_results``.
        step_tasks: List of dicts with the keys "task_name" and "task_description".
        max_attempts: Upper bound for the attempt counter, which starts at 1.

    Returns:
        None. The result dict is stored in ``self.execution_results[step_name]``.
    """
    print_purple(f"\nProcessing Step: {step_name}")
    self.update_research_state(
        current_step=step_name, current_task="Processing entire step"
    )

    # 1. Determine tools needed for all tasks in the step
    print_blue("Determining tools for all tasks in step...")
    task_lines = "".join(
        f"- {task['task_name']}: {task['task_description']}\n" for task in step_tasks
    )
    all_tasks_description = (
        f"## Step: {step_name}\n"
        + task_lines
        + "\nWhat tools should I use to gather all necessary information for these tasks efficiently?"
    )

    tool_calls = self.tool_agent.task_tools(all_tasks_description)
    print_purple("Tools selected for the entire step:")
    for i, tool_call in enumerate(tool_calls, 1):
        args_dict = tool_call.function.arguments
        # Format the arguments as a comma-separated list of key=value pairs
        args_formatted = ", ".join(f"{k}={v}" for k, v in args_dict.items())
        print_purple(f"{i}. {tool_call.function.name} ({args_formatted})")

    # 2. Gather data according to the selected tools
    print_blue("Gathering data for all tasks...")
    gathered_info = self.archive_agent.use_tools(tool_calls, all_tasks_description)
    self.archive_agent.chroma_ids_retrieved += gathered_info.get_chroma_ids

    # 3. Summarize the gathered data
    print_blue("Summarizing gathered information...")
    self.archive_agent.reset_chat_history()
    print_yellow("Step description:")
    print_yellow(all_tasks_description)
    print()
    print("Gathered information:")
    for info in gathered_info:
        print(info)
    print_yellow("Summarizing all gathered information...")
    summary = self.archive_agent.read_and_summarize(
        gathered_info, all_tasks_description
    )
    summary = remove_thinking(summary)
    print_green("Step Information Summary:")
    print(summary)

    # Have the assistant agent track this step
    self.assistant_agent.summarize_step(step_name, summary)

    # 4. Evaluate if the data is sufficient for all tasks
    print_blue("Evaluating if information is sufficient for all tasks...")
    evaluation = self.evaluate_step_completeness(step_tasks, summary)

    if self.report:
        self.report.log_evaluation(evaluation)

    # 5. If not enough information, gather more
    attempt = 1

    while not evaluation["status"] and attempt < max_attempts:
        print_yellow(
            f"Information not sufficient. Attempt {attempt}/{max_attempts} to gather more..."
        )

        # Create a query focusing on missing information
        additional_query = f"For step '{step_name}', I need additional information on:\n{evaluation['missing_info']}\n\nWhat tools should I use to fill these gaps?"

        # Get additional tools
        additional_tool_calls = self.tool_agent.task_tools(additional_query)

        # Use additional tools to gather more information
        # NOTE(review): the first round runs the tools on archive_agent and
        # records the retrieved chroma ids; this round uses tool_agent and
        # records nothing - confirm this difference is intended.
        additional_info = self.tool_agent.use_tools(
            additional_tool_calls, additional_query
        )

        # TODO Is it better to append or or make the LLM use the chat history?
        # The earlier results are replaced, not extended; the archive agent's
        # system message asks it to carry previous information over itself.
        gathered_info = additional_info

        # Update summary with all information
        updated_summary = self.archive_agent.read_and_summarize(
            gathered_info, all_tasks_description
        )
        summary = remove_thinking(updated_summary)
        print_green("Updated Summary:")
        print(summary)

        # Update assistant agent with new summary
        self.assistant_agent.summarize_step(step_name, summary)

        # Re-evaluate, and keep the report in sync with the latest verdict so
        # it does not retain the evaluation of the first, insufficient round.
        evaluation = self.evaluate_step_completeness(step_tasks, summary)
        if self.report:
            self.report.log_evaluation(evaluation)
        attempt += 1

    if self.report:
        self.report.update_step_summary(summary)

    # 6. Let the MasterAgent use the gathered data to finalize the step
    print_blue("Finalizing step results...")
    final_analysis = self.finalize_step_result(step_name, step_tasks, summary)

    # Pack and store results
    self.execution_results[step_name] = {
        "step_name": step_name,
        "tasks": [
            {
                "task_name": task["task_name"],
                "task_description": task["task_description"],
            }
            for task in step_tasks
        ],
        "information_gathered": gathered_info,
        "summary": summary,
        "evaluation": evaluation,
        "result": final_analysis,
    }
|
|
|
def execute_research_plan(self, structured_plan):
    """Execute the structured research plan step by step.

    Parameters:
        structured_plan: Object whose ``steps`` attribute maps each step name
            to an iterable of ``(task_name, task_description)`` pairs.

    Returns:
        ``self.execution_results``, filled with one entry per processed step.
    """
    print_blue("\n--- EXECUTING RESEARCH PLAN ---")

    for step_name, tasks in structured_plan.steps.items():
        print_blue(f"\n### Processing Step: {step_name}")
        # The report is optional (it defaults to None in the base class).
        if self.report:
            self.report.start_step(step_name)

        # Collect all task descriptions in this step
        step_tasks = [
            {"task_name": task_name, "task_description": task_description}
            for task_name, task_description in tasks
        ]

        # Process the entire step, starting from a clean list of retrieved ids
        self.archive_agent.reset_chroma_ids()
        self.process_step(step_name, step_tasks)

        # Finish the step in report
        if self.report:
            self.report.finish_step()

    # Evaluate if more steps are needed
    print_blue("\n--- EVALUATING RESEARCH PLAN ---")
    plan_evaluation = self.evaluate_plan(self.execution_results)
    if self.report:
        self.report.log_plan_evaluation(plan_evaluation)
    print_yellow("Plan Evaluation:")
    print(plan_evaluation["explanation"])

    return self.execution_results
|
|
|
def evaluate_step_completeness(self, step_tasks, summary):
    """Evaluate if the information is sufficient for all tasks in the step.

    Parameters:
        step_tasks: List of dicts with the keys "task_name" and "task_description".
        summary: The summarized information gathered for the step.

    Returns:
        A dict with the keys "status", "explanation" and "missing_info",
        taken from the structured LLM response ("missing_info" comes from
        its ``additional_info`` field, or is None when that is absent).
    """
    self.update_research_state(current_task="Evaluating step completeness")

    # Create a query to evaluate all tasks
    step_description = "\n".join(
        f"- {task['task_name']}: {task['task_description']}" for task in step_tasks
    )

    # The prompt itself contains triple double quotes, so it must be delimited
    # with triple single quotes; otherwise the literal ends at the first inner
    # delimiter and the summary never reaches the model.
    query = f'''
    You are evaluating if the gathered information is sufficient for completing ALL the following tasks:

    {step_description}

    Information gathered:
    """
    {summary}
    """

    Is this information sufficient to complete ALL tasks in this step?

    First, analyze each task individually and determine if the information is sufficient.
    Then, provide an overall assessment where "status" is True if all tasks are complete and False if not.
    Explain why the information is sufficient or not in the "explanation" field.
    If ANY task has insufficient information, specify exactly what additional information is needed.
    '''

    response = self.llm.generate(
        query=query,
        format=EvaluateFormat.model_json_schema(),
        model="standard",
        think=True,
    )
    structured_response = EvaluateFormat.model_validate_json(response.content)
    # Read the optional field without assigning to the model instance.
    missing_info = getattr(structured_response, "additional_info", None)

    if structured_response.status:
        print_green(
            f'\nEVALUATION PASSED\n"Step: {step_description}\n{structured_response.explanation}'
        )
    else:
        print_red(f"EVALUATION FAILED\n{structured_response.explanation}")

    return {
        "status": structured_response.status,
        "explanation": structured_response.explanation,
        "missing_info": missing_info,
    }
|
|
|
def evaluate_step(self, information, task_description):
    """Ask the LLM whether ``information`` suffices for a single task.

    Returns:
        A dict with the keys "status", "explanation" and "additional_info",
        copied from the structured LLM response.
    """
    self.update_research_state(current_task=f"Evaluating '{task_description}'")

    query = f'''
    You are evaluating if the gathered information is sufficient for completing this research task.

    Task: {task_description}

    Information gathered:
    """
    {information}
    """

    Is this information sufficient to complete the task? Respond in the format requested.
    If insufficient, explain exactly what additional information would be needed.
    '''

    response_schema = EvaluateFormat.model_json_schema()
    response = self.llm.generate(query=query, format=response_schema)
    verdict = EvaluateFormat.model_validate_json(response.content)

    # Report the outcome on the console before handing it back.
    if not verdict.status:
        print_red("EVALUATION FAILED")
        print_yellow(f"Task: {task_description}")
        print_rainbow(verdict.__dict__)
    else:
        print_green(
            f'\nEVALUATION PASSED\n"Task: {task_description}\n{verdict.explanation}'
        )

    return {
        "status": verdict.status,
        "explanation": verdict.explanation,
        "additional_info": verdict.additional_info,
    }
|
|
|
def evaluate_plan(self, execution_results):
    """Evaluate if more research steps are needed.

    Parameters:
        execution_results: Mapping of step name to the step's result dict;
            the "summary" and "tasks" entries are read from each.

    Returns:
        A dict with "status" ("no more information is needed" or
        "more information is needed") and "explanation" (the raw LLM answer).
    """
    import re  # local import: only the visible top-of-file imports are known

    self.update_research_state(
        current_step="Plan Evaluation",
        current_task="Evaluating overall research progress",
    )

    # Create a summary of completed research
    summary_parts = []
    for step_name, step_data in execution_results.items():
        summary_parts.append(f"\n## {step_name} \n")
        # Add the step's summary
        summary_parts.append(f"{step_data.get('summary', 'No summary available')} \n")

        # List the individual tasks of the step
        for task in step_data.get("tasks", []):
            task_name = task.get("task_name", "Unnamed task")
            summary_parts.append(f"- {task_name} \n")
    steps_summary = "".join(summary_parts)

    query = f'''
    Based on the research that has been conducted so far, determine if additional steps are needed
    to create a comprehensive report.

    Research completed:
    """
    {steps_summary}
    """"""

    Original question to answer: {self.research_state.get('original_question', 'No question provided')}

    Are additional research steps needed? Respond with COMPLETE or INCOMPLETE,
    followed by a brief explanation. If INCOMPLETE, suggest what additional steps would be valuable.
    '''

    response = self.llm.generate(query=query, think=True)
    evaluation = response.content if hasattr(response, "content") else str(response)

    # The first standalone COMPLETE/INCOMPLETE word is the verdict. Word
    # boundaries make punctuation or a newline after the word harmless, and
    # a later "COMPLETE" in the explanation cannot flip an INCOMPLETE verdict.
    verdict = re.search(r"\b(?:IN)?COMPLETE\b", evaluation.upper())
    if verdict is not None and verdict.group(0) == "COMPLETE":
        print_green(f'\nEVALUATION PASSED\n"Evaluation: {evaluation}')
        return {
            "status": "no more information is needed",
            "explanation": evaluation,
        }

    print_red(f'\nEVALUATION FAILED\n"Evaluation: {evaluation}')
    return {"status": "more information is needed", "explanation": evaluation}
|
|
|
def finalize_step_result(self, step_name, step_tasks, summary):
    """Ask the LLM for one consolidated analysis covering a whole step.

    Args:
        step_name: Name of the step being wrapped up.
        step_tasks: Iterable of mappings, each with ``task_name`` and
            ``task_description`` keys.
        summary: Text with the information gathered for the step.

    Returns:
        str: The model's analysis with any thinking section removed.
    """
    self.update_research_state(
        current_task=f"Finalizing results for step: {step_name}"
    )

    # One bullet per task: "- <name>: <description>".
    bullet_lines = []
    for entry in step_tasks:
        bullet_lines.append(f"- {entry['task_name']}: {entry['task_description']}")
    tasks_description = "\n".join(bullet_lines)

    prompt = f"""
    Based on the following information gathered for step "{step_name}",
    create a comprehensive analysis that addresses all the tasks in this step.

    Step tasks:
    {tasks_description}

    Information gathered:
    {summary}

    Your response should:
    1. Be structured with clear sections for each aspect of the analysis
    2. Draw connections between different pieces of information
    3. Highlight key insights relevant to the original research question
    4. Provide a comprehensive understanding of this step's contribution to the overall research

    Sometimes the information is limited, if so do not make up information, but rather say that the information is limited and write a shorter response.
    """

    llm_reply = self.llm.generate(query=prompt)
    step_result = remove_thinking(llm_reply)

    print_green("Step Result:")
    print(step_result)

    # Count this step as finished in the shared research state.
    finished_so_far = self.research_state.get("steps_completed", 0)
    self.update_research_state(steps_completed=finished_so_far + 1)
    return step_result
|
|
|
def write_report(self, execution_results):
    """Compose the final report from everything gathered during the research.

    Renders each step (summary, tasks and the sources found in its gathered
    information) into one markdown digest, then asks the LLM to write the
    report that answers the original question.

    Sources are numbered once for the whole report rather than restarting at
    1 in every step, so a ``[number]`` citation in the generated report
    points at exactly one source.

    Args:
        execution_results: Mapping of step name to step data. Step data may
            carry ``summary``, ``tasks`` (mappings with ``task_name`` /
            ``task_description``) and ``information_gathered`` (mappings
            whose ``result["content"]["chunks"]`` entries hold ``metadata``
            with ``title`` and optionally ``journal``). Entries that do not
            have that shape are skipped.

    Returns:
        str: The generated report with any thinking section removed.
    """
    self.update_research_state(
        current_step="Report Writing", current_task="Generating final report"
    )

    # Source label -> citation number, shared by all steps. Dicts keep
    # insertion order, so numbers follow first appearance.
    citation_numbers = {}
    sections = []

    for step_name, step_data in execution_results.items():
        sections.append(f"\n## {step_name}\n")
        sections.append(
            f"Step Summary: {step_data.get('summary', 'No summary available')}\n\n"
        )

        for task in step_data.get("tasks", []):
            task_name = task.get("task_name", "Unnamed task")
            task_description = task.get("task_description", "No description")
            sections.append(f"### {task_name}\n")
            sections.append(f"Description: {task_description}\n\n")

        # Sources referenced by this step, listed with their global numbers.
        step_sources = {}
        for info in step_data.get("information_gathered", []):
            if not isinstance(info, dict):
                continue
            result = info.get("result")
            if not isinstance(result, dict):
                continue
            content = result.get("content")
            if not isinstance(content, dict):
                continue
            for chunk in content.get("chunks") or []:
                if not isinstance(chunk, dict):
                    continue
                metadata = chunk.get("metadata") or {}
                source = f"{metadata.get('title', 'Unknown')}"
                if metadata.get("journal"):
                    source += f" ({metadata.get('journal')})"
                number = citation_numbers.setdefault(
                    source, len(citation_numbers) + 1
                )
                step_sources[source] = number

        if step_sources:
            sections.append("\n### Sources:\n")
            sections.extend(
                f"- [{number}] {source}\n"
                for source, number in step_sources.items()
            )

    gathered_info = "".join(sections)

    print_blue("\n\nGathered information:\n".upper())
    print(gathered_info, "\n")

    # Double quotes in the question would clash with the quotes wrapping it
    # in the prompt, so they are swapped for single quotes.
    question = self.research_state.get(
        "original_question", "No question provided"
    ).replace('"', "'")

    query = f'''
    Based on the following research information, write a extensive report that in detail answers the question:
    "{question}"

    Research Information:
    """
    {gathered_info}
    """

    The report should be well-structured with appropriate headings, present the information
    accurately, and highlight key insights. Cite sources using [number] notation when referencing specific information.
    As the report is for journalistic reseach, please be generous with details and cases that can be used when reporting on the subject!
    '''

    response = self.llm.generate(query=query)
    # remove_thinking accepts the raw response and extracts its text itself.
    report = remove_thinking(response)

    self.update_research_state(
        steps_completed=self.research_state.get("steps_completed", 0) + 1
    )
    return report
|
|
|
|
|
class StructureAgent(ResearchBase):
    """A small LLM for structuring text as JSON"""

    def __init__(self, username, model: str = "standard", **kwargs):
        """Set up the agent and its dedicated, non-chat LLM.

        Args:
            username: Forwarded to the base class.
            model: Model name used both for the LLM instance and for each
                ``generate`` call.
            **kwargs: Forwarded unchanged to the base class.
        """
        super().__init__(username=username, **kwargs)
        self.model = model
        self.system_message = """You are helping a researcher to structure a text. You will get a text and make it into structured data.
        Make sure not to change the meaning of the text and keeps all the details in the subtasks.
        The content and/or of each step and task should be understandable by itself. Therefore, if a task seems to refer to something that is not mentioned in the step, you should include the necessary information in the task itself. Example: if a task consists of "Collect relevant information for the subject", you should include what the subject is in the task itself.
        """
        # Hand the structuring instructions to the model. They used to be
        # stored on the instance only, while the LLM received a generic
        # "research assistant" system message.
        self.llm = LLM(
            system_message=self.system_message,
            model=self.model,
            chat=False,
            messages=[],
        )

    def make_structured(self, text, question=None):
        """Convert a free-text research plan into a validated ``Plan``.

        Args:
            text: The plan as plain text.
            question: Optional research question the plan is meant to answer;
                when given it is quoted in the prompt.

        Returns:
            Plan: The plan parsed from the model's JSON answer.

        Raises:
            SystemExit: With exit code 1 when generation or validation fails.
                The error and traceback are printed first. There is no
                fallback structure; the process is terminated.
        """
        self.update_research_state(
            current_step="Plan Structuring",
            current_task="Converting plan to structured format",
        )

        # Prepare query based on whether a question is provided
        if question:
            query = f'''This is a proposed plan for how to write a report on "{question}":\n"""{text}"""\nPlease make the plan into structured data with subtasks. Make sure to keep all the details in the subtasks.'''
        else:
            query = f'''This is a proposed plan for how to write a report:\n"""{text}"""\nPlease make the plan into structured data with subtasks. Make sure to keep all the details in the subtasks.'''

        try:
            response = self.llm.generate(
                query=query, format=Plan.model_json_schema(), model=self.model
            )
            response_content = (
                response.content if hasattr(response, "content") else str(response)
            )
            # Validate once and return that same object.
            structured_plan = Plan.model_validate_json(response_content)

            self.update_research_state(
                steps_completed=self.research_state.get("steps_completed", 0) + 1
            )
            return structured_plan

        except Exception as e:
            print_red(f"Error structuring plan: {e}")
            traceback.print_exc()
            # Same effect as sys.exit(1), with the original error chained as
            # the cause.
            # NOTE(review): exiting the whole process from an agent is
            # heavy-handed; consider raising a domain error instead.
            raise SystemExit(1) from e
|
|
|
|
|
class ToolAgent(ResearchBase):
    """An LLM specialized in choosing tools based on information needs"""

    def __init__(self, username, **kwargs):
        """Configure the tool-selection model and initialise the base class.

        Args:
            username: Forwarded to the base class.
            **kwargs: Forwarded to the base class. ``system_message`` and
                ``chat`` are read (without being removed) to build the
                ``model_config`` entry that is added to ``kwargs``.
        """
        # Initialize the LLM configuration
        kwargs["model_config"] = {
            "system_message": kwargs.get(
                "system_message",
                """
                You are a helpful assistant with tools.
                Your task is to choose one or multiple tools to answer a user's query.
                DON'T come up with your own tools, only use the ones provided.
                """,
            ),
            "temperature": 0.1,
            "model": "tools",
            "chat": kwargs.get("chat", True),
        }

        super().__init__(username=username, **kwargs)

    def task_tools(self, task_description):
        """Ask the model which tools to call for a research task.

        Args:
            task_description: Text describing the task that needs information.

        Returns:
            The tool calls from the model's response, or an empty list when
            the response has no ``tool_calls`` attribute or that attribute is
            empty or ``None``.
        """
        self.update_research_state(current_task="Selecting tools for task")

        query = f'''Research task description:
        """
        {task_description}
        """
        You have to choose one or many tools in order fetch information neccessary to complete the task.
        It's important that you think of what information is needed, and choose the right tool for the job considering the tools descriptions.
        Make sure to read the description of the tools carefully before choosing!
        You can ONLY chose a tool you are provided with, don't make up a tool!
        You HAVE TO CHOOSE A TOOL, even if you think you can answer without it. Don't answer the question without choosing a tool.
        '''

        response = self.llm.generate(query=query, tools=self.tools, model="tools")

        # A response can expose tool_calls yet leave it as None when the model
        # picked nothing; always hand callers something they can iterate.
        return getattr(response, "tool_calls", None) or []
|
|
|
|
|
class AssistantAgent(ResearchBase):
    """A small LLM agent for summarizing steps and keeping track of the research process by managing "research notes"/common memory.

    This agent is designed to work with smaller language models and maintain a structured
    record of the research process through its Notes system.
    """

    class Notes:
        """Store and retrieve notes for the steps of a process.

        Attributes:
            step_notes (list): Dictionaries with the keys ``step_name``,
                ``step_information`` and ``summary``, in insertion order.
        """

        def __init__(self):
            self.step_notes = []

        def add_step_note(self, step_name, step_information, step_summary):
            """Append a note for a specific step.

            Args:
                step_name: Name of the step the note belongs to.
                step_information: The information the summary was built from.
                step_summary: The summary text, stored under ``summary``.
            """
            self.step_notes.append(
                {
                    "step_name": step_name,
                    "step_information": step_information,
                    "summary": step_summary,
                }
            )

        def get_step_notes(self, step_name):
            """Get notes for a specific step.

            Args:
                step_name (str): The name of the step to retrieve notes for.

            Returns:
                list: Every stored note whose ``step_name`` equals the
                argument (empty when there is none).
            """
            return [note for note in self.step_notes if note["step_name"] == step_name]

    def __init__(self, username: str, system_message: str, **kwargs):
        """Configure the small model and create an empty notes store.

        Args:
            username: Forwarded to the base class.
            system_message: System prompt for the model; also kept on the
                instance.
            **kwargs: Forwarded to the base class. ``chat`` is read (default
                ``True``) for the ``model_config`` entry added to ``kwargs``.
        """
        # Configure for small model
        kwargs["model_config"] = {
            "temperature": 0.1,
            "system_message": system_message,
            "model": "small",
            "chat": kwargs.get("chat", True),
        }
        super().__init__(username=username, **kwargs)
        self.system_message = system_message
        self.notes = self.Notes()

    def summarize_step(self, step_name, step_tasks):
        """Summarize the results of a step and record the summary as a note.

        Args:
            step_name: Name of the step being summarized.
            step_tasks: The step's tasks/results; interpolated into the
                prompt as-is and stored with the note.

        Returns:
            str: The summary text. It used to be stored in the notes only,
            leaving callers with ``None``.
        """
        self.update_research_state(current_task=f"Summarizing step '{step_name}'")

        # Create a query to summarize the step
        query = f"""
        You are summarizing the results of this research step:

        Step name: {step_name}

        Tasks:
        {step_tasks}

        Summarize the results of the tasks in a clear and concise manner. Focus on the facts, and mention sources for reference.
        """

        response = self.llm.generate(query=query)
        summary = response.content if hasattr(response, "content") else str(response)
        self.notes.add_step_note(
            step_name=step_name,
            step_information=step_tasks,
            step_summary=summary,
        )
        return summary
|
|
|
|
|
class ArchiveAgent(ResearchBase):
    """A small LLM for summarizing large amounts of text"""

    def __init__(self, username: str, system_message: str, **kwargs):
        """Configure the small model and start with no retrieved chroma IDs.

        Args:
            username: Forwarded to the base class.
            system_message: System prompt for the model; also kept on the
                instance so the chat history can be reset to it.
            **kwargs: Forwarded to the base class. ``chat`` is read (default
                ``True``) for the ``model_config`` entry added to ``kwargs``.
        """
        small_model_settings = {
            "temperature": 0.1,
            "system_message": system_message,
            "model": "small",
            "chat": kwargs.get("chat", True),
        }
        kwargs["model_config"] = small_model_settings
        super().__init__(username=username, **kwargs)
        self.system_message = system_message
        self.chroma_ids_retrieved = []

    def reset_chroma_ids(self):
        """Forget every chroma ID recorded so far."""
        self.chroma_ids_retrieved = list()

    def read_and_summarize(self, information: UnifiedToolResponse, step_information):
        """Summarize tool output for the current research step.

        Chunks of type ``full_article`` whose metadata sets
        ``requires_full_summary`` are summarized per article through
        ``fetch_and_summarize_full_article_tool``; if that yields at least
        one summary, those summaries (joined by a ``---`` rule) become the
        text to condense. Otherwise ``information.to_text`` is used; it is
        read as an attribute, presumably a property (confirm against
        ``models.UnifiedToolResponse``).

        Args:
            information: The unified response returned by the tools.
            step_information: Description of the current step, given to the
                model as background only.

        Returns:
            The model's summary text.
        """
        self.update_research_state(current_task=f"Summarizing gathered information")

        # Metadata of every full article flagged for its own summary.
        results = information.search_results
        chunks = results.chunks if results else None
        pending_articles = [
            piece.metadata
            for piece in (chunks or [])
            if piece.source_type == "full_article"
            and piece.metadata.get("requires_full_summary", False)
        ]

        article_summaries = []
        if pending_articles:
            if hasattr(self, "research_state"):
                question = self.research_state.get("original_question", "")
            else:
                question = None

            for meta in pending_articles:
                article_id = meta.get("article_id")
                if not article_id:
                    continue
                article_summaries.append(
                    self.fetch_and_summarize_full_article_tool(article_id, question)
                )

        if article_summaries:
            info_text = "\n\n---\n\n".join(article_summaries)
        else:
            # No full article was summarized: fall back to the regular text.
            info_text = information.to_text

        print_purple(f"INFO TEXT for summarization:\n{info_text}\n")

        prompt = f'''
        Below is the description of the current research step. *It is only for your information, nothing you should reference in the summary*.

        """
        {step_information}
        """

        Please read the following information to make a summary for the researcher.

        """
        {info_text}
        """

        Focus on *information and facts* that are important and relevant for the research step.
        Ensure no important details are lost.
        Reference the sources in the summary so it's clear where each piece of information comes from.
        Some pieces of information might not be of use in this research step, if so, just ignore it without mentioning it.
        You are only allowed to use the sources provided in the information, don't make up any sources.
        You should only make a summary of the information, not the step itself or any kind of evaluation!
        '''

        reply = self.llm.generate(query=prompt)
        if hasattr(reply, "content"):
            return reply.content
        return str(reply)

    def reset_chat_history(self):
        """Reset the LLM's messages to just the system message."""
        system_entry = {"role": "system", "content": self.system_message}
        self.llm.messages = [system_entry]
|
|
|
|
|
def main(
    question,
    username="lasse",
    project_name="Electric Cars",
    reports_dir="/home/lasse/sci/reports",
):
    """Run the whole research workflow for one question.

    Counts the project's documents per source kind, withholds tools whose
    sources are empty, lets the master agent plan and execute the research,
    writes the final report and saves it as JSON plus a markdown companion.

    Args:
        question: The research question to answer.
        username: User whose database and project are used.
        project_name: Name of the project to research in.
        reports_dir: Directory the JSON and markdown reports are written to.
            Defaults to the previously hard-coded location.

    Returns:
        dict: ``question``, ``research_plan``, ``structured_plan`` (the same
        object as ``research_plan``), ``execution_results``,
        ``final_report``, ``full_report``, ``report_path`` and
        ``markdown_path``.
    """
    from pathlib import Path  # only needed here; keeps file-level imports untouched

    # Initialize base and project
    base = BaseClass(username=username)
    project: Project = Project(
        username=username, project_name=project_name, user_arango=base.get_arango()
    )

    # Map what kind of sources there are in bot.arango_ids
    number_of_documents = {
        "other_documents": 0,
        "science_articles": 0,
        "notes": 0,
        "interviews": 0,
    }
    # (ID prefix, source kind) pairs, checked in this order; first match wins.
    id_prefixes = (
        ("other_documents", "other_documents"),
        ("sci_articles", "science_articles"),
        ("note", "notes"),
        ("interview", "interviews"),
    )

    bot = Bot(
        username=username, project=project, user_arango=base.get_arango(), tools="all"
    )
    for arango_id in bot.arango_ids:
        for prefix, source_kind in id_prefixes:
            if arango_id.startswith(prefix):
                number_of_documents[source_kind] += 1
                break

    tool_sources = {
        "fetch_other_documents_tool": ["other_documents"],
        "fetch_science_articles_tool": ["science_articles"],
        "fetch_science_articles_and_other_documents_tool": [
            "other_documents",
            "science_articles",
        ],
        "fetch_notes_tool": ["notes"],
    }

    # Create a list of tools that is to be given to the bots
    bot_tools: list = [tool.__name__ for tool in bot.tools if callable(tool)]
    for tool, sources in tool_sources.items():
        print(tool, sources)
        documents = sum(number_of_documents[source] for source in sources)
        # A tool may be missing from the bot already; removing it blindly
        # would raise ValueError.
        if documents == 0 and tool in bot_tools:
            print_yellow(
                f"Removing {tool} from bot, as there are no documents in the source: {sources}"
            )
            bot_tools.remove(tool)

    # Initialize report tracking
    report: ResearchReport = ResearchReport(
        question=question, username=username, project_name=project_name
    )

    # Initialize agents with report tracking
    master_agent = MasterAgent(
        username=username, project=project, report=report, chat=True, tools=bot_tools
    )

    # Track the research state in the master agent
    master_agent.research_state["original_question"] = question

    print_blue(f"Starting research on: {question}")

    # Research plan creation
    print_blue("\n--- CREATING RESEARCH PLAN ---")
    research_plan = master_agent.make_plan(question)

    # The plan is logged as both the original text and the structured plan;
    # one call records both.
    report.log_plan(research_plan, research_plan)

    # Execute the plan step by step
    execution_results = master_agent.execute_research_plan(research_plan)

    # Write the final report
    print_blue("\n--- WRITING FINAL REPORT ---")
    final_report = master_agent.write_report(execution_results)
    report.log_final_report(final_report)
    print_green("Final Report:")
    print(final_report)

    # Save the full research report
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    json_target = Path(reports_dir) / f"research_report_{username}_{timestamp}.json"
    report_path = report.save_to_file(str(json_target))

    # Create a more readable markdown version next to the JSON file. Swapping
    # only the final suffix means the JSON is never overwritten, even when the
    # saved path does not end in ".json".
    markdown_report = report.get_markdown_report()
    markdown_path = str(Path(report_path).with_suffix(".md"))
    with open(markdown_path, "w", encoding="utf-8") as f:
        f.write(markdown_report)
    print_green(f"Markdown report saved to {markdown_path}")

    return {
        "question": question,
        "research_plan": research_plan,
        "structured_plan": research_plan,
        "execution_results": execution_results,
        "final_report": final_report,
        "full_report": report.get_full_report(),
        "report_path": report_path,
        "markdown_path": markdown_path,
    }
|
|
|
|
|
def remove_thinking(response):
    """Strip a model's thinking preamble from a response.

    Args:
        response: Either an object with a ``content`` attribute or any value
            that can be passed to ``str()``.

    Returns:
        str: Everything after the first ``</think>`` tag, stripped of
        surrounding whitespace. When the tag is absent the text is returned
        unchanged (not stripped).
    """
    response_text = response.content if hasattr(response, "content") else str(response)
    # Split at the FIRST closing tag only, so an answer that itself contains
    # "</think>" is kept whole instead of being cut at the second tag.
    _, closing_tag, answer = response_text.partition("</think>")
    if closing_tag:
        return answer.strip()
    return response_text
|
|
|
|
|
if __name__ == "__main__":
    # Fixed smoke-test parameters: use these to test the code, don't change!
    question = (
        "What are the problems around lithium mining? "
        "I'm interested in social, technical and economical aspects."
    )
    result = main(question, username="lasse", project_name="Electric Cars")
|
|
|