From b72df20b03e91faf1122d66a311e5a6be505cba3 Mon Sep 17 00:00:00 2001 From: lasseedfast Date: Wed, 24 Sep 2025 07:08:36 +0200 Subject: [PATCH] New version to use OpenAI Python API and vLLM --- _llm/llm.py | 1216 ++++++++++++++++++++++++++++-------------- _llm/tool_registy.py | 214 ++++++++ 2 files changed, 1037 insertions(+), 393 deletions(-) create mode 100644 _llm/tool_registy.py diff --git a/_llm/llm.py b/_llm/llm.py index 3705721..5d72e1b 100644 --- a/_llm/llm.py +++ b/_llm/llm.py @@ -2,45 +2,24 @@ import os import base64 import re import traceback -from typing import Literal, Optional +from typing import Literal, Optional, Any, Dict, AsyncGenerator, Generator, Tuple, List, Union import tiktoken -import ollama -from ollama import ( - Client, - AsyncClient, - ResponseError, - Options, -) - -from ollama._types import ChatResponse -from pydantic import Field - -class ChatResponseWithHeaders(ChatResponse): - headers: dict = Field(default_factory=dict) - -def patched_request(self, cls, *args, stream=False, **kwargs): - if stream: - return self._original_request(cls, *args, stream=stream, **kwargs) - raw_response = self._request_raw(*args, **kwargs) - # Use the subclass if cls is ChatResponse - if cls.__name__ == "ChatResponse": - obj = ChatResponseWithHeaders(**raw_response.json()) - else: - obj = cls(**raw_response.json()) - obj.headers = dict(raw_response.headers) - return obj - -ollama.Client._original_request = ollama.Client._request -ollama.Client._request = patched_request +from openai import OpenAI, AsyncOpenAI +from openai.types.chat import ChatCompletion, ChatCompletionChunk +from openai.types.chat.chat_completion import Choice +from openai.types.chat.chat_completion_message import ChatCompletionMessage import backoff import env_manager +from tool_registy import get_tools, register_tool try: - from colorprinter.colorprinter.print_color import * + from colorprinter.print_color import * except ImportError: from colorprinter.print_color import * + + env_manager.set_env() tokenizer = tiktoken.get_encoding("cl100k_base") @@ -48,7 +27,7 @@ tokenizer = tiktoken.get_encoding("cl100k_base") class LLM: """ - LLM class for interacting with an instance of Ollama. + LLM class for interacting with OpenAI-compatible APIs (Ollama, vLLM). Attributes: model (str): The model to be used for response generation. @@ -58,8 +37,8 @@ class LLM: max_length_answer (int): Maximum length of the generated answer. chat (bool): Whether the chat mode is enabled. chosen_backend (str): The chosen backend server for the API. - client (Client): The client for synchronous API calls. - async_client (AsyncClient): The client for asynchronous API calls. + client (OpenAI): The client for synchronous API calls. + async_client (AsyncOpenAI): The client for asynchronous API calls. tools (list): List of tools to be used in generating the response. Note: @@ -133,13 +112,28 @@ class LLM: # If connected over VPN self.on_vpn = on_vpn if self.on_vpn: - self.host_url = f"{os.getenv('LLM_URL')}:{os.getenv('LLM_PORT')}" + self.host_url = f"{os.getenv('LLM_URL')}:{os.getenv('LLM_PORT')}/vllm/v1" else: - self.host_url = os.getenv("LLM_API_URL").rstrip("/api/chat/") - self.client: Client = Client( - host=self.host_url, headers=headers, timeout=timeout + self.host_url = os.getenv("LLM_API_URL").rstrip("/api/chat/")+'/vllm/v1' + + self._make_clients(headers, timeout) + + def _make_clients(self, headers=None, timeout=300): + headers = headers or {"Authorization": f"Basic {self.get_credentials()}"} + # Sync client + self.client = OpenAI( + base_url=self.host_url, + api_key="NONE", + default_headers=headers, + timeout=timeout, + ) + # Async client - fix the double /v1 issue + self.async_client = AsyncOpenAI( + base_url=self.host_url, + api_key="NONE", + default_headers=headers, + timeout=timeout, ) - self.async_client: AsyncClient = AsyncClient() def get_credentials(self): # Initialize the client with the host and default headers @@ -147,6 +141,9 @@ class LLM: return base64.b64encode(credentials.encode()).decode() def get_model(self, model_alias): + + if model_alias == "vllm": + return "/mnt/model_drive/models/gpt-oss-20b" models = { "standard": "LLM_MODEL", "small": "LLM_MODEL_SMALL", @@ -193,7 +190,10 @@ class LLM: query = re.sub(r"\s*\n\s*", "\n", query) message = {"role": "user", "content": query} - if images: + if model == 'vllm': #TODO This should not be like this... + model = self.get_model('vllm') + + elif images: message = self.prepare_images(images, message) model = self.get_model("vision") print_blue(f"Using vision model: {model}") @@ -226,90 +226,100 @@ class LLM: headers["X-Model-Type"] = "tools" if model == self.get_model("embeddings"): headers["X-Model-Type"] = "embeddings" - # No longer need to modify message content for thinking - handled by native API return headers def _get_options(self, temperature): """Build model options, setting temperature and other parameters.""" - options = Options(**self.options) - options.temperature = ( - temperature if temperature is not None else self.options["temperature"] - ) - return options + temp = temperature if temperature is not None else self.options["temperature"] + return {"temperature": temp} # @backoff.on_exception( # backoff.expo, - # (ResponseError, TimeoutError), + # (Exception, TimeoutError), # max_tries=3, # factor=2, # base=10, # on_backoff=lambda details: print_yellow( # f"Retrying due to error: {details['exception']}" - # ) + # ), # ) def _call_remote_api( self, model, tools, stream, options, format, headers, think=False - ): - """Call the remote Ollama API synchronously.""" + ) -> ChatCompletion: + """Call the remote API using OpenAI client.""" self.call_model = model - self.client: Client = Client(host=self.host_url, headers=headers, timeout=300) + # Update the client with the latest headers + self.client = OpenAI( + base_url=f"{self.host_url}", + api_key="ollama", + default_headers=headers, + timeout=300 + ) + if not self.silent: if self.on_vpn: print_yellow(f"🤖 Generating using {model} (remote, on VPN)...") else: print_yellow(f"🤖 Generating using {model} (remote)...") - # If this is an embeddings model, call the embed endpoint instead of chat. - if model == self.get_model("embeddings"): - # Find the last user message content to embed - input_text = "" - for m in reversed(self.messages): - if m.get("role") == "user" and m.get("content"): - input_text = m["content"] - break - if not input_text and self.messages: - input_text = self.messages[-1].get("content", "") - - # Use the embed API (synchronous) - response = self.client.embed( - model=model, input=input_text, keep_alive=3600 * 24 * 7 - ) - return response - - response = self.client.chat( - model=model, - messages=self.messages, - tools=tools, - stream=stream, - options=options, - keep_alive=3600 * 24 * 7, - format=format, - think=think, - ) - if hasattr(response, "headers") and "x-chosen-backend" in response.headers: - self.chosen_backend = response.headers["x-chosen-backend"] - print_blue(f"Backend used: {self.chosen_backend}") - self.chosen_backend = response.headers.get("x-chosen-backend", None) + # Build kwargs for chat completion + kwargs = { + "model": model, + "messages": self.messages, + "temperature": options["temperature"], + "stream": stream, + "max_tokens": self.max_length_answer + } + + # Add tools if provided + if tools: + kwargs["tools"] = tools + + # Add response format if provided + if format: + kwargs["response_format"] = {"type": format} + + # Call the OpenAI API + print_rainbow(kwargs) + response: ChatCompletion = self.client.chat.completions.create(**kwargs) + + # Try to extract backend information if available + try: + response_headers = getattr(response, "_headers", {}) + if response_headers and "x-chosen-backend" in response_headers: + self.chosen_backend = response_headers["x-chosen-backend"] + print_blue(f"Backend used: {self.chosen_backend}") + except: + pass + return response - @backoff.on_exception( - backoff.expo, - (ResponseError, TimeoutError), - max_tries=3, - factor=2, - base=10, - on_backoff=lambda details: print_yellow( - f"Retrying due to error: {details['exception']}" - ), - ) + # @backoff.on_exception( + # backoff.expo, + # (Exception, TimeoutError), + # max_tries=3, + # factor=2, + # base=10, + # on_backoff=lambda details: print_yellow( + # f"Retrying due to error: {details['exception']}" + # ), + # ) async def _call_remote_api_async( self, model, tools, stream, options, format, headers, think=False ): - """Call the remote Ollama API asynchronously.""" + """Call the remote API asynchronously using OpenAI async client.""" + # Update the async client with the latest headers + self.async_client = AsyncOpenAI( + base_url=self.host_url, # Remove the extra /v1 + api_key="ollama", + default_headers=headers, + timeout=300 + ) + if not self.silent: print_yellow(f"🤖 Generating using {model} (remote, async)...") - # If embedding model, use async embed endpoint + # If embedding model, use embeddings endpoint if model == self.get_model("embeddings"): input_text = "" for m in reversed(self.messages): @@ -319,187 +329,183 @@ class LLM: if not input_text and self.messages: input_text = self.messages[-1].get("content", "") - response = await self.async_client.embed(model=model, input=input_text) + response = await self.async_client.embeddings.create( + model=model, + input=input_text, + ) return response - response = await self.async_client.chat( - model=model, - messages=self.messages, - headers=headers, - tools=tools, - stream=stream, - options=options, - keep_alive=3600 * 24 * 7, - think=think, # Use native Ollama thinking support - ) + # Build kwargs for chat completion + kwargs = { + "model": model, + "messages": self.messages, + "temperature": options["temperature"], + "stream": stream, + "max_tokens": self.max_length_answer + } + + # Add tools if provided + if tools: + kwargs["tools"] = tools + + # Add response format if provided + if format: + kwargs["response_format"] = {"type": format} + + # Handle thinking mode through system messages or tool calls + if think and model == self.get_model("reasoning"): + # Implement thinking through tool calls if supported by the server + thinking_tool = { + "type": "function", + "function": { + "name": "thinking", + "description": "Share your step-by-step reasoning process", + "parameters": { + "type": "object", + "properties": { + "thinking": { + "type": "string", + "description": "Your step-by-step reasoning" + } + }, + "required": ["thinking"] + } + } + } + + if "tools" not in kwargs: + kwargs["tools"] = [thinking_tool] + else: + kwargs["tools"].append(thinking_tool) + + # Call the OpenAI API + response = await self.async_client.chat.completions.create(**kwargs) return response def _call_local_ollama(self, model, stream, temperature, think=False): - """Call the local Ollama instance synchronously.""" - import ollama + """Call a local OpenAI-compatible server.""" if not self.silent: print_yellow(f"🤖 Generating using {model} (local)...") - options = {"temperature": temperature} - if stream: - response_stream = ollama.chat( - model=model, - messages=self.messages, - options=options, - stream=True, - think=think, # Pass thinking parameter to local ollama - ) - - def local_stream_adapter(): - for chunk in response_stream: - # Handle both content and thinking in streaming chunks - chunk_message = chunk["message"] - content = chunk_message.get("content", "") - thinking = chunk_message.get("thinking", None) - - yield type( - "OllamaResponse", - (), - { - "message": type( - "Message", - (), - { - "content": content, - "thinking": thinking, # Include thinking in stream chunks - }, - ), - "done": chunk.get("done", False), + + # Create a local client pointed at localhost + local_client = OpenAI( + base_url="http://localhost:11434/v1", + api_key="ollama", + ) + + # Build kwargs for chat completion + kwargs = { + "model": model, + "messages": self.messages, + "temperature": temperature, + "stream": stream, + "max_tokens": self.max_length_answer + } + + # Handle thinking mode through system messages or tool calls + if think and model == self.get_model("reasoning"): + # Implement thinking through tool calls if supported + thinking_tool = { + "type": "function", + "function": { + "name": "thinking", + "description": "Share your step-by-step reasoning process", + "parameters": { + "type": "object", + "properties": { + "thinking": { + "type": "string", + "description": "Your step-by-step reasoning" + } }, - ) - - return self.read_stream(local_stream_adapter()) + "required": ["thinking"] + } + } + } + + kwargs["tools"] = [thinking_tool] + + if stream: + response_stream = local_client.chat.completions.create(**kwargs) + return self.read_stream(response_stream) else: - response = ollama.chat( - model=model, - messages=self.messages, - options=options, - think=think, # Pass thinking parameter to local ollama - ) - result = response["message"]["content"] - - # Handle thinking content if present with native support - thinking_content = response["message"].get("thinking", None) - - response_obj = type( - "LocalChatResponse", - (), - { - "message": type( - "Message", - (), - { - "content": result, - "thinking": thinking_content, - "get": lambda x: None, - }, - ) - }, - ) - - # Store only the main content in message history + response = local_client.chat.completions.create(**kwargs) + result = response.choices[0].message.content + + # Create a response object that matches the structure expected by the rest of the code + message_obj = response.choices[0].message + + # Store in message history self.messages.append({"role": "assistant", "content": result}) if not self.chat: self.messages = [self.messages[0]] - return response_obj.message - - # @backoff.on_exception( - # backoff.expo, - # (ResponseError, TimeoutError), - # max_tries=3, - # factor=2, - # base=10, - # on_backoff=lambda details: print_yellow( - # f"Retrying due to error: {details['exception']}" - # ) - # ) + + return message_obj async def _call_local_ollama_async(self, model, stream, temperature, think=False): - """Call the local Ollama instance asynchronously (using a thread pool).""" - import ollama - import asyncio + """Call a local OpenAI-compatible server asynchronously.""" if not self.silent: print_yellow(f"🤖 Generating using {model} (local, async)...") - options = {"temperature": temperature} - loop = asyncio.get_event_loop() - if stream: - - def run_stream(): - return ollama.chat( - model=model, - messages=self.messages, - options=options, - stream=True, - think=think, # Pass thinking parameter to local ollama - ) - - response_stream = await loop.run_in_executor(None, run_stream) - - async def local_stream_adapter(): - for chunk in response_stream: - # Handle both content and thinking in async streaming - chunk_message = chunk["message"] - content = chunk_message.get("content", "") - thinking = chunk_message.get("thinking", None) - - yield type( - "OllamaResponse", - (), - { - "message": type( - "Message", - (), - { - "content": content, - "thinking": thinking, # Include thinking in async stream chunks - }, - ), - "done": chunk.get("done", False), + + # Create a local async client pointed at localhost + local_client = AsyncOpenAI( + base_url="http://localhost:11434/v1", + api_key="ollama", + ) + + # Build kwargs for chat completion + kwargs = { + "model": model, + "messages": self.messages, + "temperature": temperature, + "stream": stream, + "max_tokens": self.max_length_answer + } + + # Handle thinking mode through system messages or tool calls + if think and model == self.get_model("reasoning"): + # Implement thinking through tool calls if supported + thinking_tool = { + "type": "function", + "function": { + "name": "thinking", + "description": "Share your step-by-step reasoning process", + "parameters": { + "type": "object", + "properties": { + "thinking": { + "type": "string", + "description": "Your step-by-step reasoning" + } }, - ) - - return local_stream_adapter() + "required": ["thinking"] + } + } + } + + kwargs["tools"] = [thinking_tool] + + if stream: + response_stream = await local_client.chat.completions.create(**kwargs) + + async def stream_adapter(): + async for chunk in response_stream: + yield chunk + + return await self.read_stream_async(stream_adapter()) else: - - def run_chat(): - return ollama.chat( - model=model, - messages=self.messages, - options=options, - think=think, # Pass thinking parameter to local ollama - ) - - response_dict = await loop.run_in_executor(None, run_chat) - result = response_dict["message"]["content"] - - # Handle thinking content if present with native support - thinking_content = response_dict["message"].get("thinking", None) - - # Create response object with thinking support - response_obj = type( - "LocalChatResponse", - (), - { - "message": type( - "Message", - (), - { - "content": result, - "thinking": thinking_content, - "get": lambda x: None, - }, - ) - }, - ) - + response = await local_client.chat.completions.create(**kwargs) + result = response.choices[0].message.content + + # Create a response object that matches the structure expected by the rest of the code + message_obj = response.choices[0].message + + # Store in message history self.messages.append({"role": "assistant", "content": result}) if not self.chat: self.messages = [self.messages[0]] - return response_obj.message + + return message_obj def generate( self, @@ -517,7 +523,7 @@ class LLM: format=None, think=None, force_local: bool = False, - ): + ) -> ChatCompletionMessage: """ Attempts to generate a response using a remote API first, then falls back to local Ollama if the remote call fails or if force_local is True. @@ -538,18 +544,13 @@ class LLM: think (bool, optional): Whether to enable thinking mode. Defaults to None. force_local (bool, optional): Force use of local Ollama instead of remote API. Defaults to False. - local_available (bool, optional): Whether local Ollama is available. Returns: The generated response. Type varies based on stream parameter and success: - For streaming: Returns stream reader object - For non-streaming remote success: Returns response message object - - For local fallback: Returns local Ollama response + - For local fallback: Returns local response - For complete failure: Returns error message string - - Raises: - Prints stack trace for exceptions but doesn't propagate them, instead - returning error messages or attempting fallback to local processing. """ if model is None and self.model: model = self.model @@ -558,41 +559,42 @@ class LLM: model = self._prepare_messages_and_model( query, user_input, context, messages, images, model ) + print_red(model) temperature = temperature if temperature else self.options["temperature"] if think is None: think = self.think + + options = self._get_options(temperature) + if not force_local: try: headers = self._build_headers(model) - options = self._get_options(temperature) - # Call Ollama server) - response: ChatResponse = self._call_remote_api( + # Call OpenAI-compatible API + response: ChatCompletion = self._call_remote_api( model, tools, stream, options, format, headers, think=think ) - - # If using embeddings model, the response is an embed result (not a ChatResponse). - if model == self.get_model("embeddings"): - return response - if stream: return self.read_stream(response) else: - if isinstance(response, ChatResponse): - # With native thinking support, content is already clean - result = response.message.content.strip('"') - - self.messages.append({"role": "assistant", "content": result}) + choice = response.choices[0] + print('---') + print_rainbow(choice.__dict__, single_line=True) + print('---') + message: ChatCompletionMessage = choice.message + result: str = message.content + + + # Store in message history (without tool calls for clean history) + self.messages.append({"role": "assistant", "content": result}) + if not self.chat: + self.messages = [self.messages[0]] + return message + + return message - if not self.chat: - self.messages = [self.messages[0]] - - Warning = "Please use reposen.message.content when ising _llm" - return response - else: - return "An error occurred." except Exception as e: traceback.print_exc() @@ -601,7 +603,9 @@ class LLM: return self._call_local_ollama(model, stream, temperature, think=think) except Exception as e: traceback.print_exc() - return "Both remote API and local Ollama failed. An error occurred." + return "Both remote API and local server failed. An error occurred." + + return "Remote API failed and local server not available. An error occurred." async def async_generate( self, @@ -635,7 +639,7 @@ class LLM: messages (list[dict], optional): List of messages to use instead of building from query. format: Format specification for the response. think (bool, optional): Whether to use thinking mode for reasoning models. - force_local (bool, optional): Force using local Ollama instead of remote API. + force_local (bool, optional): Force using local server instead of remote API. Returns: The generated response message or an error message if an exception occurs. @@ -644,12 +648,13 @@ class LLM: query, user_input, context, messages, images, model ) temperature = temperature if temperature else self.options["temperature"] + options = self._get_options(temperature) # First try with remote API if not force_local: try: headers = self._build_headers(model) - options = self._get_options(temperature) + response = await self._call_remote_api_async( model, tools, stream, options, format, headers, think=think ) @@ -659,35 +664,79 @@ class LLM: return response if stream: - return self.read_stream(response) + # Return the async generator directly + return self.read_stream_async(response) else: - if isinstance(response, ChatResponse): - # With native thinking support, content is already clean - result = response.message.content.strip('"') - + # Handle the OpenAI response format + if hasattr(response, 'choices') and len(response.choices) > 0: + choice = response.choices[0] + message = choice.message + result = message.content + + # Extract thinking from reasoning_content if present + thinking_content = None + if hasattr(message, 'reasoning_content') and message.reasoning_content: + thinking_content = message.reasoning_content + + # Extract thinking from tool calls if present + if not thinking_content and hasattr(message, 'tool_calls') and message.tool_calls: + for tool_call in message.tool_calls: + if hasattr(tool_call, 'function') and tool_call.function.name == "thinking": + try: + import json + thinking_data = json.loads(tool_call.function.arguments) + thinking_content = thinking_data.get("thinking", "") + except (json.JSONDecodeError, AttributeError): + thinking_content = tool_call.function.arguments + + # Store in message history (without tool calls for clean history) self.messages.append({"role": "assistant", "content": result}) - if not self.chat: self.messages = [self.messages[0]] - - return response.message + + # Add thinking attribute to message object if present + if thinking_content: + # Create a copy of the message to avoid modifying the original + class MessageWithThinking: + def __init__(self, original_message, thinking): + self.content = original_message.content + self.role = original_message.role + self.thinking = thinking + # Copy other attributes from original message + for attr in dir(original_message): + if not attr.startswith('_') and attr not in ['content', 'role']: + try: + setattr(self, attr, getattr(original_message, attr)) + except: + pass + + return MessageWithThinking(message, thinking_content) + + return message else: - return "An error occurred." - + return "An error occurred processing the response." except Exception as e: traceback.print_exc() + # Return error message instead of trying to access .content on string + if self.local_available: + pass # Fall through to local attempt + else: + return "Remote API failed and local server not available. An error occurred." - # Fallback to local Ollama or if force_local is True - try: - return await self._call_local_ollama_async( - model, stream, temperature, think=think - ) - except Exception as e: - traceback.print_exc() - return "Both remote API and local Ollama failed. An error occurred." + # Fallback to local server or if force_local is True + if self.local_available: + try: + return await self._call_local_ollama_async( + model, stream, temperature, think=think + ) + except Exception as e: + traceback.print_exc() + return "Both remote API and local server failed. An error occurred." + + return "Remote API failed and local server not available. An error occurred." def make_summary(self, text): - # Implement your summary logic using self.client.chat() + """Generate a summary using the OpenAI-compatible API.""" summary_message = { "role": "user", "content": f'Summarize the text below:\n"""{text}"""\nRemember to be concise and detailed. Answer in English.', @@ -700,22 +749,28 @@ class LLM: summary_message, ] try: - response = self.client.chat( + response = self.client.chat.completions.create( model=self.get_model("small"), messages=messages, - options=Options(temperature=0.01), - keep_alive=3600 * 24 * 7, + temperature=0.01, + max_tokens=self.max_length_answer ) - summary = response.message.content.strip() + summary = response.choices[0].message.content.strip() return summary - except ResponseError as e: + except Exception as e: print_red("Error generating summary:", e) return "Summary generation failed." - def read_stream(self, response): + def read_stream(self, response) -> Generator[Tuple[str, str], None, None]: """ - Read streaming response and handle thinking content with native Ollama v0.9.0+ support. - Thinking content is separate from main content and yielded as different chunk types. + Read streaming response from OpenAI API. + + Args: + response: Stream of ChatCompletionChunks from the OpenAI API + + Yields: + Tuples of (chunk_type, chunk_content) where chunk_type is either + "content", "thinking", or "thinking_complete" """ accumulated_content = "" accumulated_thinking = "" @@ -724,31 +779,123 @@ class LLM: if not chunk: continue - # Handle thinking content (native v0.9.0+ support) - thinking_content = getattr(chunk.message, "thinking", None) - if thinking_content: - accumulated_thinking += thinking_content - yield ("thinking", thinking_content) + # Check if this is a chat completion chunk with choices + if hasattr(chunk, 'choices') and len(chunk.choices) > 0: + choice = chunk.choices[0] + + # Get the delta content if available + if hasattr(choice, 'delta'): + delta = choice.delta + content = getattr(delta, 'content', None) or "" + + if content: + accumulated_content += content + yield ("content", content) + + # Check for reasoning content in delta + if hasattr(delta, 'reasoning_content') and delta.reasoning_content: + thinking = delta.reasoning_content + accumulated_thinking += thinking + yield ("thinking", thinking) + + # Check for tool calls which might contain thinking content + if hasattr(delta, 'tool_calls') and delta.tool_calls: + for tool_call in delta.tool_calls: + if hasattr(tool_call, 'function') and tool_call.function.name == "thinking": + # Extract arguments from the function call + thinking = getattr(tool_call.function, 'arguments', None) or "" + if thinking: + accumulated_thinking += thinking + yield ("thinking", thinking) + elif hasattr(choice, 'message'): + # Non-delta format + message = choice.message + content = getattr(message, 'content', None) or "" + if content: + accumulated_content += content + yield ("content", content) + else: + # Fallback for non-standard responses + content = getattr(chunk, 'content', None) or getattr(getattr(chunk, 'message', {}), 'content', '') + if content: + accumulated_content += content + yield ("content", content) + + # Store the complete response in message history + if accumulated_content: + self.messages.append({"role": "assistant", "content": accumulated_content}) + if not self.chat: + self.messages = [self.messages[0]] - # Handle regular content - content = chunk.message.content - if content: - # Clean up quotes that sometimes appear in streaming - if content.startswith('"') and len(accumulated_content) == 0: - content = content[1:] - if chunk.done and content.endswith('"'): - content = content[:-1] + # Yield complete thinking summary if accumulated + if accumulated_thinking: + yield ("thinking_complete", accumulated_thinking) - accumulated_content += content - yield ("content", content) + async def read_stream_async(self, response) -> AsyncGenerator[Tuple[str, str], None]: + """ + Asynchronously read streaming response from OpenAI API. + + Args: + response: Async stream of ChatCompletionChunks from the OpenAI API + + Yields: + Tuples of (chunk_type, chunk_content) where chunk_type is either + "content", "thinking", or "thinking_complete" + """ + accumulated_content = "" + accumulated_thinking = "" - if chunk.done: - break + async for chunk in response: + if not chunk: + continue - # Store the complete response in message history (without thinking content) - self.messages.append({"role": "assistant", "content": accumulated_content}) - if not self.chat: - self.messages = [self.messages[0]] + # Check if this is a chat completion chunk with choices + if hasattr(chunk, 'choices') and len(chunk.choices) > 0: + choice = chunk.choices[0] + + # Get the delta content if available + if hasattr(choice, 'delta'): + delta = choice.delta + content = getattr(delta, 'content', None) or "" + + if content: + accumulated_content += content + yield ("content", content) + + # Check for reasoning content in delta + if hasattr(delta, 'reasoning_content') and delta.reasoning_content: + thinking = delta.reasoning_content + accumulated_thinking += thinking + yield ("thinking", thinking) + + # Check for tool calls which might contain thinking content + if hasattr(delta, 'tool_calls') and delta.tool_calls: + for tool_call in delta.tool_calls: + if hasattr(tool_call, 'function') and tool_call.function.name == "thinking": + # Extract arguments from the function call + thinking = getattr(tool_call.function, 'arguments', None) or "" + if thinking: + accumulated_thinking += thinking + yield ("thinking", thinking) + elif hasattr(choice, 'message'): + # Non-delta format + message = choice.message + content = getattr(message, 'content', None) or "" + if content: + accumulated_content += content + yield ("content", content) + else: + # Fallback for non-standard responses + content = getattr(chunk, 'content', None) or getattr(getattr(chunk, 'message', {}), 'content', '') + if content: + accumulated_content += content + yield ("content", content) + + # Store the complete response in message history + if accumulated_content: + self.messages.append({"role": "assistant", "content": accumulated_content}) + if not self.chat: + self.messages = [self.messages[0]] # Yield complete thinking summary if accumulated if accumulated_thinking: @@ -756,82 +903,365 @@ class LLM: def prepare_images(self, images, message): """ - Prepares a list of images by converting them to base64 encoded strings and adds them to the provided message dictionary. + Prepares images for OpenAI vision models using the expected format. + Args: - images (list): A list of images, where each image can be a file path (str), a base64 encoded string (str), or bytes. - message (dict): A dictionary to which the base64 encoded images will be added under the key "images". + images (list): A list of images, where each image can be a file path (str), + a base64 encoded string (str), or bytes. + message (dict): The user message to which images will be added + Returns: - dict: The updated message dictionary with the base64 encoded images added under the key "images". - Raises: - ValueError: If an image is not a string or bytes. + dict: The updated message with content reformatted to include images """ - import base64 - - base64_images = [] - # base64 pattern: must be divisible by 4, only valid chars, and proper padding + # Start with the text content + text_content = message["content"] + + # Create a multimodal content array starting with text + content = [{"type": "text", "text": text_content}] + + # Pattern to check if a string is base64 encoded base64_pattern = re.compile(r"^[A-Za-z0-9+/]+={0,2}$") - + + # Process each image for image in images: + image_base64 = None + if isinstance(image, str): - # If it looks like base64, just pass it through + # If it looks like base64, just use it if base64_pattern.match(image) and len(image) % 4 == 0: - base64_images.append(image) + image_base64 = image else: + # Read file and convert to base64 with open(image, "rb") as image_file: - base64_images.append( - base64.b64encode(image_file.read()).decode("utf-8") - ) + image_base64 = base64.b64encode(image_file.read()).decode("utf-8") elif isinstance(image, bytes): - base64_images.append(base64.b64encode(image).decode("utf-8")) + image_base64 = base64.b64encode(image).decode("utf-8") else: print_red("Invalid image type") - - message["images"] = base64_images + continue + + # Add to content with proper OpenAI format + content.append({ + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{image_base64}" + } + }) + + # Replace the original content with the multi-modal content + message["content"] = content return message if __name__ == "__main__": - # Example usage of the LLM class with thinking mode - system_message = "You are an extraction assistant. You will get one page at a time from a PDF document, and your task is to extract subcontractors and close clues from the text.\nAssume the state oil company in Uganda is UNOC even if not named. Extract subcontractors (entities or individuals contracted to perform work for UNOC) and close clues.\n\nMANDATES:\n- Do NOT hallucinate. Extract only names or clear clues present in the text.\n- Include \"clues\" such as 'awarded to', 'won the tender', 'appointed', 'supplier', 'contractor', 'consultant', 'tender', 'procured by'.\n- Return EXACT supporting text snippets (<= 300 chars) that justify each extraction.\n- Provide a brief explanation where you explain why the entity is a subcontractor.\n- Returned named entities should be real entities (companies or persons) that could plausibly be subcontractors." - llm = LLM(system_message=system_message) + import asyncio + from pydantic import BaseModel + from typing import List + + # Define structured output models + class CalculationStep(BaseModel): + step_number: int + description: str + calculation: str + result: float + + class MathSolution(BaseModel): + steps: List[CalculationStep] + final_answer: float + explanation: str + + # Define a tool for calculations + @register_tool + def calculate_tool(number: int, multiply_factor: int) -> int: + '''Multiply a number by a factor + Args: + number (int): The number to be multiplied + multiply_factor (int): The factor to multiply by + Returns: + int: The result of the multiplication + ''' + return number * multiply_factor + - prompt = "Page 1 of document:\n-----START OF PAGE-----\n**GUIDELINES FOR THE 2019 REGISTRATION ON THE NATIONAL OIL AND GAS**\n**TALENT REGISTER**\n**Welcome to the 2019 National Oil and Gas Talent Register (NOGTR).**\nThe Petroleum Authority of Uganda has developed a National Oil and Gas Talent Register\nto capture all talent that can potentially work in the oil and gas sector as required by law.\nThe NOGTR is a register classified into the demand and supply side users. The demand\nside users consist of companies/government agencies which meet the eligibility criteria\nseeking to recruit human resource across projects in the oil and gas sector and\nredeployment of the same to other sectors.\nOn the other hand, the supply side users include individuals who meet the eligibility criteria\nfor the workforce demands of the oil and gas and are likely to be recruited by the demand\nside users. The NOGTR registration process shall be maintained annually for both the\ndemand and supply side users and the 2019 process will be in line with these guidelines.\nApplicants are encouraged to visit our website and read the submission guidelines carefully before\nregistering.\n**1.** **Annual Registration Calendar**\nThe Authority shall publish the list of entities and persons willing to provide and supply\nlabour force in the oil and gas sector every 31 [st] December of the applicable year. To\nachieve this process the following timelines shall apply.\n**a) Demand side use**\n1. The window for registration shall open on the 1 [st] February 2019 and remain open\nthroughout the year.\n2. The demand side users shall post all available job openings in the Oil and Gas\nSector on the window provided for this purpose by PAU in addition to other\nchannels of advertising that they may opt to use to give wider publicity as required\nby different guidelines.\n3. The demand side users shall have access to the window for purposes of identifying\nand offering employment to any person from the supply side users who meets their\njob description. A considered supply side user shall be contacted and notified of\nthe offer.\n4. Priority shall be given to potential applicants registered on the NOGTR.\n5. The demand side users are encouraged to verify documents uploaded on the\nwindow with the awarding institutions before relying upon them to award jobs.\n6. The Authority shall publish the list of the demand side users that have qualified to\nthe system every 31 [st] December of the applicable year.\n7. The list shall be updated three times; on 31 [st] March 2019, 30 [th] June 2019 and 31 [st]\nAugust 2019 in line with the National Suppliers Database.\n8. The demand side user shall remain on the NOGTR automatically when they\nreapply on the NSD unless if they no longer meet the criteria.\n-----END OF PAGE-----\nPlease extract subcontractors as per the mandates given." - response = llm.generate(query=prompt) - - print(response.message.content) - - # Basic usage - result = llm.generate( - query="I want to add 2 and 2", - ) - print("Basic result:", result.message.content) - - # Example with thinking mode (for reasoning models) - print("\n--- Thinking Mode Example ---") - thinking_result = llm.generate( - query="Solve this step by step: If I have 15 apples and give away 7, then buy 3 more, how many do I have?", - model="reasoning", - think=True, - ) - print("Answer:", thinking_result.message.content) - if hasattr(thinking_result, "thinking") and thinking_result.thinking: - print("Model's reasoning:", thinking_result.thinking) - - # Example with streaming and thinking - print("\n--- Streaming with Thinking Example ---") - for chunk_type, chunk_content in llm.generate( - query="Write a short explanation of photosynthesis", - model="reasoning", - stream=True, - think=True, - ): - if chunk_type == "thinking": - # Use print with blue color escape codes since print_blue doesn't support 'end' parameter - print( - f"\033[94m {chunk_content}\033[0m", end="" - ) # Show reasoning process in blue - elif chunk_type == "content": - print(chunk_content, end="") # Show final answer - elif chunk_type == "thinking_complete": - print_green(f"\n💭 Complete reasoning available") - print() # Final newline + + async def run_tests(): + print("🧪 Testing LLM class with vLLM model") + print("=" * 50) + + # Initialize LLM instance - use fresh instance for each test + def get_fresh_llm(): + return LLM(silent=False, chat=False) # Don't persist chat history + + # Test 1: Basic vLLM generation + print("\n1️⃣ Basic vLLM Generation Test") + print("-" * 30) + try: + llm = get_fresh_llm() + response = llm.generate( + query="Hello! Can you introduce yourself briefly?", + model='vllm' + ) + print(f"✅ Basic response: {response.content[:100]}...") + except Exception as e: + print(f"❌ Basic test failed: {e}") + + # Test 2: Tools usage + print("\n2️⃣ Tools Usage Test") + print("-" * 30) + try: + llm = get_fresh_llm() + tools = get_tools() + response = llm.generate( + query="What's the weather like in San Francisco? Also calculate 15 * 7 for me.", + model='vllm', + tools=tools + ) + print(f"✅ Tools response: {response.content[:100]}...") + + # Enhanced tool call detection + tool_calls_found = False + if hasattr(response, 'tool_calls') and response.tool_calls: + print(f"🔧 OpenAI-style tool calls detected: {len(response.tool_calls)} calls") + for i, tool_call in enumerate(response.tool_calls): + print(f" Tool {i+1}: {tool_call.function.name}") + print(f" Arguments: {tool_call.function.arguments}") + tool_calls_found = True + + # Check if response contains JSON that might be tool-like + if not tool_calls_found: + try: + import json + # Try to parse the content as JSON + content_json = json.loads(response.content) + if isinstance(content_json, dict): + print("🔧 JSON-formatted response detected (not OpenAI tool calls)") + print(f" Keys: {list(content_json.keys())}") + + # Check if it looks like a tool call + if any(key in content_json for key in ['location', 'expression', 'function', 'name']): + print(" ℹ️ This appears to be tool-like output in JSON format") + except json.JSONDecodeError: + print("ℹ️ No structured tool calls or JSON found") + + except Exception as e: + print(f"❌ Tools test failed: {e}") + + # Test 3: Thinking mode (use vllm model since reasoning model doesn't exist) + print("\n3️⃣ Thinking Mode Test (using vllm)") + print("-" * 30) + try: + llm = get_fresh_llm() + response: ChatCompletionMessage = llm.generate( + query="Solve this step by step: If I have 20 apples, eat 3, give away 5, then buy 8 more, how many do I have?", + model='vllm', # Use vllm instead of reasoning + think=True + ) + print(f"✅ Thinking response: {response.content[:100]}...") + + if hasattr(response, 'reasoning_content') and response.reasoning_content: + print(f"🧠 Thinking content: {response.reasoning_content[:100]}...") + else: + print("ℹ️ No explicit thinking content found") + + except Exception as e: + print(f"❌ Thinking test failed: {e}") + + # Test 4: Streaming (simplified test) + print("\n4️⃣ Streaming Test") + print("-" * 30) + try: + llm = get_fresh_llm() + print("Streaming response: ", end="") + + stream = llm.generate( + query="Explain photosynthesis in 2 sentences", + model='vllm', + stream=True + ) + + content_parts = [] + try: + for chunk_type, chunk_content in stream: + if chunk_type == "content": + content_parts.append(chunk_content) + print(chunk_content, end="") + elif chunk_type == "thinking": + print(f"\033[94m{chunk_content}\033[0m", end="") # Blue for thinking + + print(f"\n✅ Streaming completed - Content: {len(content_parts)} chunks") + except Exception as stream_error: + print(f"\n❌ Stream processing failed: {stream_error}") + + except Exception as e: + print(f"❌ Streaming test failed: {e}") + + # Test 5: Structured output (JSON mode) + print("\n5️⃣ Structured Output Test") + print("-" * 30) + try: + llm = get_fresh_llm() + response = llm.generate( + query="""Create a simple math problem solution in JSON format with this structure: + { + "problem": "the math problem", + "steps": ["step 1", "step 2", "step 3"], + "answer": "final answer" + } + + Problem: What is 12 * 8 + 15?""", + model='vllm', + format="json_object" # Request JSON format + ) + print(f"✅ Structured response: {response.content[:150]}...") + + # Try to parse as JSON to verify structure + try: + import json + parsed = json.loads(response.content) + print(f"🎯 Valid JSON with keys: {list(parsed.keys())}") + except json.JSONDecodeError: + print("⚠️ Response is not valid JSON") + + except Exception as e: + print(f"❌ Structured output test failed: {e}") + + # Test 6: Async generation + print("\n6️⃣ Async Generation Test") + print("-" * 30) + try: + llm = get_fresh_llm() + response = await llm.async_generate( + query="What's the capital of France? Answer briefly.", + model='vllm' + ) + print(f"✅ Async response: {response.content[:100]}...") + except Exception as e: + print(f"❌ Async test failed: {e}") + + # Test 7: Multiple tools with vllm (enhanced debugging) + print("\n7️⃣ Complex Integration Test") + print("-" * 30) + try: + llm = get_fresh_llm() + tools = get_tools() # Get all registered tools + response = llm.generate( + query="I need to multiply 12 by 11", + model='vllm', + tools=tools, + think=True + ) + print(f"✅ Complex response: {response.content[:100]}...") + + # Enhanced checking for both thinking and tool usage + has_thinking = hasattr(response, 'thinking') and response.reasoning_content + has_tool_calls = hasattr(response, 'tool_calls') and response.tool_calls + + print(f"🧠 Has thinking: {has_thinking}") + if has_thinking: + print(f" Thinking content: {response.reasoning_content[:50]}...") + + print(f"🔧 Has OpenAI tool calls: {has_tool_calls}") + if has_tool_calls: + print(f" Tool calls count: {len(response.tool_calls)}") + for i, tool_call in enumerate(response.tool_calls): + print(f" Tool {i+1}: {tool_call.function.name}") + + # Check for JSON-style tool responses + try: + import json + content_json = json.loads(response.content) + if isinstance(content_json, dict) and any(key in content_json for key in ['expression', 'calculation', 'result']): + print("🔧 JSON-style tool response detected:") + print(f" Content: {content_json}") + except json.JSONDecodeError: + pass + + except Exception as e: + print(f"❌ Complex test failed: {e}") + + # New Test 8: Tool Call Format Analysis + print("\n8️⃣ Tool Call Format Analysis") + print("-" * 30) + try: + llm = get_fresh_llm() + tools = get_tools() + + # Test with explicit tool instruction + response = llm.generate( + query="Use the calculate tool to compute 25 * 4. Make sure to call the function.", + model='vllm', + tools=tools + ) + + print(f"Response content: {response.content}") + print(f"Response type: {type(response)}") + print(f"Has tool_calls attribute: {hasattr(response, 'tool_calls')}") + + if hasattr(response, 'tool_calls') and response.tool_calls: + print(f"Tool calls count: {len(response.tool_calls)}") + print(f"Tool calls type: {type(response.tool_calls)}") + + for i, tool_call in enumerate(response.tool_calls): + print(f"Tool {i+1}:") + print(f" ID: {tool_call.id}") + print(f" Type: {tool_call.type}") + print(f" Function name: {tool_call.function.name}") + print(f" Function arguments: {tool_call.function.arguments}") + else: + print("No tool calls found") + + except Exception as e: + print(f"❌ Tool format analysis failed: {e}") + + # New Test 9: vLLM Tool Response Conversion Test + print("\n9️⃣ vLLM Tool Response Conversion Test") + print("-" * 30) + try: + llm = get_fresh_llm() + tools = get_tools() # Get all registered tools + + # Test multiple tool scenarios + test_cases = [ + "Calculate 15 * 7 using the calculate tool", + "Get weather for New York using the weather tool", + "Use both tools: calculate 20 + 5 and get weather for London" + ] + + for i, test_query in enumerate(test_cases, 1): + print(f"\n Test {i}: {test_query}") + response = llm.generate( + query=test_query, + model='vllm', + tools=tools + ) + + print(f" Response: {response.content[:60]}...") + + if hasattr(response, 'tool_calls') and response.tool_calls: + print(f" ✅ Converted to {len(response.tool_calls)} tool call(s)") + for j, tool_call in enumerate(response.tool_calls): + print(f" Tool {j+1}: {tool_call.function.name}") + else: + print(" ⚠️ No tool calls detected") + + except Exception as e: + print(f"❌ vLLM conversion test failed: {e}") + + print("\n" + "=" * 50) + print("🏁 Test suite completed!") + + # Helper function for non-async testing + def translate_to_spanish(text): + llm = LLM() + prompt = f"Translate the following text to Spanish:\n\n{text}" + response = llm.generate(query=prompt, model='vllm') + return response.content + + # Run the test suite + print("Starting comprehensive test suite...") + asyncio.run(run_tests()) + + # Quick translation test + print("\n🌍 Translation Test:") + spanish_text = translate_to_spanish("Hello, how are you today?") + print(f"Spanish translation: {spanish_text}") diff --git a/_llm/tool_registy.py b/_llm/tool_registy.py new file mode 100644 index 0000000..3eadaa9 --- /dev/null +++ b/_llm/tool_registy.py @@ -0,0 +1,214 @@ +# assume your client already has: import inspect, json +from typing import Callable, Dict, Any +import inspect, json +import re +from pydantic import BaseModel + +TOOL_REGISTRY: Dict[str, Dict[str, Any]] = {} + +def _parse_google_docstring(docstring: str) -> Dict[str, Any]: + """Parse Google-style docstring to extract description and parameter info.""" + if not docstring: + return {"description": "", "params": {}} + + # Split into lines and clean up + lines = [line.strip() for line in docstring.strip().split('\n')] + + # Find the main description (everything before Args:) + description_lines = [] + i = 0 + while i < len(lines): + if lines[i].lower().startswith('args:') or lines[i].lower().startswith('arguments:'): + break + description_lines.append(lines[i]) + i += 1 + + description = ' '.join(description_lines).strip() + + # Parse parameters section + params = {} + if i < len(lines): + i += 1 # Skip the "Args:" line + while i < len(lines): + line = lines[i] + if line.lower().startswith(('returns:', 'yields:', 'raises:', 'note:', 'example:')): + break + + # Match parameter format: param_name (type): description + match = re.match(r'^\s*(\w+)\s*(?:\(([^)]+)\))?\s*:\s*(.*)$', line) + if match: + param_name = match.group(1) + param_type = match.group(2) + param_desc = match.group(3) + + # Collect multi-line descriptions + j = i + 1 + while j < len(lines) and lines[j] and not re.match(r'^\s*\w+\s*(?:\([^)]+\))?\s*:', lines[j]): + param_desc += ' ' + lines[j].strip() + j += 1 + + params[param_name] = { + "description": param_desc.strip(), + "type": param_type.strip() if param_type else None + } + i = j - 1 + + i += 1 + + return {"description": description, "params": params} + +def _pytype_to_jsonschema(t): + # Very-small helper; extend as needed or use pydantic models for complex types + mapping = {str: {"type": "string"}, int: {"type": "integer"}, + float: {"type": "number"}, bool: {"type": "boolean"}, + dict: {"type": "object"}, list: {"type": "array"}} + return mapping.get(t, {"type": "string"}) # fallback to string + +def register_tool(func: Callable = None, *, name: str = None, description: str = None, schema: dict = None): + """ + Use as decorator or call directly: + @register_tool + def foo(x: int): ... + or + register_tool(func=myfunc, name="myfunc", schema=...) + """ + def _register(f): + fname = name or f.__name__ + + # Parse docstring for description and parameter info + docstring_info = _parse_google_docstring(f.__doc__) + func_description = description or docstring_info["description"] or "" + + # If explicit schema provided, use it + if schema is not None: + func_schema = schema + else: + sig = inspect.signature(f) + props = {} + required = [] + for param_name, param in sig.parameters.items(): + ann = param.annotation + # If user used a Pydantic BaseModel as a single arg, use its schema + if inspect.isclass(ann) and issubclass(ann, BaseModel): + func_schema = ann.schema() + # wrap into a single-arg object if necessary + props = func_schema.get("properties", {}) + required = func_schema.get("required", []) + # done early - for single-model param + break + + # Create property schema from type annotation + prop_schema = _pytype_to_jsonschema(ann) + + # Add description from docstring if available + if param_name in docstring_info["params"]: + prop_schema["description"] = docstring_info["params"][param_name]["description"] + + props[param_name] = prop_schema + if param.default is inspect._empty: + required.append(param_name) + + if 'func_schema' not in locals(): + func_schema = { + "type": "object", + "properties": props, + "required": required + } + + TOOL_REGISTRY[fname] = { + "callable": f, + "schema": { + "type": "function", + "function": { + "name": fname, + "description": func_description, + "parameters": func_schema + } + } + } + return f + + if func is None: + return _register + else: + return _register(func) + +def get_tools() -> list: + """Return list of function schemas (JSON) to send to the model""" + return [v["schema"] for v in TOOL_REGISTRY.values()] + +def handle_function_call_and_inject_result(response_choice, messages): + """ + Given the model choice (response.choices[0]) and your messages list: + - extracts function/tool call + - executes the registered python callable + - appends the tool result as a tool message and returns it + """ + # Support different shapes: some SDKs use .message.tool_calls, others .message.function_call + msg = getattr(response_choice, "message", None) or (response_choice.get("message") if isinstance(response_choice, dict) else None) + func_name = None + func_args = None + # try tool_calls style + if msg: + tool_calls = getattr(msg, "tool_calls", None) or (msg.get("tool_calls") if isinstance(msg, dict) else None) + if tool_calls: + tc = tool_calls[0] + fn = getattr(tc, "function", None) or (tc.get("function") if isinstance(tc, dict) else None) + func_name = getattr(fn, "name", None) or (fn.get("name") if isinstance(fn, dict) else None) + func_args = getattr(fn, "arguments", None) or (fn.get("arguments") if isinstance(fn, dict) else None) + # fallback to function_call + if func_name is None: + fc = getattr(msg, "function_call", None) or (msg.get("function_call") if isinstance(msg, dict) else None) + if fc: + func_name = getattr(fc, "name", None) or fc.get("name") + args_raw = getattr(fc, "arguments", None) or fc.get("arguments") + # arguments are often a JSON string depending on SDK shape + if isinstance(args_raw, str): + try: + func_args = json.loads(args_raw) + except Exception: + func_args = None + else: + func_args = args_raw + + if not func_name: + return None # no function call found + + entry = TOOL_REGISTRY.get(func_name) + if not entry: + raise RuntimeError(f"Function {func_name} not registered") + + result = entry["callable"](**(func_args or {})) + # convert result to string/JSON for tool message + tool_content = result if isinstance(result, str) else json.dumps(result) + # append tool message so model can see the result + messages.append({"role": "tool", "name": func_name, "content": tool_content}) + return tool_content + +if __name__ == "__main__": + # Example usage and test + @register_tool + def add(x: int, y: int) -> int: + """Add two integers + Args: + x (int): First integer + y (int): Second integer + Returns: + int: Sum of x and y + """ + return x + y + + @register_tool(name="echo", description="Echoes the input string") + def echo_message(message: str) -> str: + """Echo the input message + Args: + message (str): The message to echo + Returns: + str: The echoed message + """ + return message + + print("Registered tools:") + import pprint + for info in get_tools(): + pprint.pprint(info) \ No newline at end of file