Refactor public API to include register_tool and get_tools; enhance LLM class with response normalization

main
lasseedfast 3 months ago
parent b72df20b03
commit d2319209e1
  1. __init__.py (3)
  2. _llm/__init__.py (7)
  3. _llm/llm.py (404)
  4. llm_client.py (4)

@@ -3,5 +3,6 @@ llm_client: A Python package for interacting with LLM models through Ollama.
"""
from _llm._llm.llm import LLM
from _llm._llm.tool_registy import register_tool, get_tools
__all__ = ["LLM"]
__all__ = ["LLM", "register_tool", "get_tools"]

@@ -1,7 +1,6 @@
# ...existing code...
# Export the LLM class so "from _llm import LLM" works.
from .llm import LLM # re-export the class from the module
from .tool_registy import register_tool, get_tools
# Define public API
__all__ = ["LLM"]
# ...existing code...
__all__ = ["LLM", "register_tool", "get_tools"]

@@ -8,10 +8,15 @@ from openai import OpenAI, AsyncOpenAI
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from openai.types.chat.chat_completion import Choice
from openai.types.chat.chat_completion_message import ChatCompletionMessage
from openai.types.responses import ParsedResponse
import backoff
import env_manager
from tool_registy import get_tools, register_tool
import json
try:
    from .tool_registy import get_tools, register_tool
except ImportError:
    from tool_registy import get_tools, register_tool
try:
    from colorprinter.print_color import *
@@ -232,17 +237,203 @@ class LLM:
        """Build model options, setting temperature and other parameters."""
        temp = temperature if temperature is not None else self.options["temperature"]
        return {"temperature": temp}
    def _normalize_parsed_response(self, resp: Any) -> Any:
        """
        Normalize a ParsedResponse into a ChatCompletion-like object while:
        - Promoting the original parsed object (Pydantic instance or dict) to message.content when present.
        - Providing message.content_text (string/JSON) for safe history/logging.
        - Preserving reasoning_text into message.reasoning_content.
        - Falling back to plain string content when no parsed object exists.
        """
        import json
        from typing import Any, List, Optional
        # Optional: detect Pydantic BaseModel if available to use JSON helpers
        try:
            from pydantic import BaseModel as _PydanticBaseModel
        except Exception:
            _PydanticBaseModel = None
        # lightweight shims to mimic ChatCompletion shape used downstream
        class _Msg:
            def __init__(self, content: Any = "", role: str = "assistant", reasoning: Optional[str] = None):
                # content will be either:
                # - the original parsed object (Pydantic instance or dict) if present, OR
                # - a plain string when no parsed object
                self.content = content
                self.role = role
                self.reasoning_content = reasoning
                # produce a safe string representation for history/logging:
                try:
                    if _PydanticBaseModel is not None and isinstance(content, _PydanticBaseModel):
                        # pydantic v2: model_dump_json, v1: json()
                        if hasattr(content, "model_dump_json"):
                            self.content_text = content.model_dump_json()
                        elif hasattr(content, "json"):
                            self.content_text = content.json()
                        else:
                            # fallback: try to dump to dict then JSON
                            try:
                                dumped = content.model_dump() if hasattr(content, "model_dump") else content.dict()
                                self.content_text = json.dumps(dumped)
                            except Exception:
                                self.content_text = str(content)
                    else:
                        if isinstance(content, (dict, list)):
                            self.content_text = json.dumps(content, default=str)
                        elif isinstance(content, str):
                            self.content_text = content
                        else:
                            # unknown object: try to JSON roundtrip, else str()
                            try:
                                self.content_text = json.dumps(content, default=str)
                            except Exception:
                                self.content_text = str(content)
                except Exception:
                    self.content_text = str(content)
                # compatibility alias
                self.content_str = self.content_text
        class _Choice:
            def __init__(self, message: _Msg, finish_reason: Optional[str] = None, index: int = 0):
                self.message = message
                self.finish_reason = finish_reason
                self.index = index
        class _ChatCompletion:
            def __init__(self, id: str, choices: List[_Choice], created: Optional[float] = None, model: Optional[str] = None, usage: Any = None):
                self.id = id
                self.choices = choices
                self.created = created
                self.model = model
                self.usage = usage
                self.object = "chat.completion"
        # --- metadata ---
        resp_id = getattr(resp, "id", "") or getattr(resp, "response_id", "") or getattr(resp, "responseId", "")
        created = getattr(resp, "created_at", None) or getattr(resp, "created", None)
        model = getattr(resp, "model", None)
        usage = getattr(resp, "usage", None)
        # --- gather outputs and attempt to find parsed object ---
        outputs = getattr(resp, "output", None) or getattr(resp, "outputs", None)
        reasoning_texts: List[str] = []
        role = "assistant"
        finish_reason = None
        # Helper: return original parsed object as-is
        def _keep_original(parsed_obj: Any) -> Any:
            return parsed_obj
        # Search priority:
        # 1) top-level resp.output_parsed / resp.parsed
        top_parsed = getattr(resp, "output_parsed", None) or getattr(resp, "parsed", None) or getattr(resp, "outputParsed", None)
        if top_parsed is not None:
            # promote the original object
            message_content = _keep_original(top_parsed)
            # No need to inspect outputs for nested parsed in this case, but still collect reasoning if present in outputs
            if outputs and isinstance(outputs, (list, tuple)):
                for out in outputs:
                    contents = getattr(out, "content", None) or getattr(out, "contents", None)
                    if not contents:
                        continue
                    for c in contents:
                        c_type = c.get("type") if isinstance(c, dict) else getattr(c, "type", None)
                        c_text = (c.get("text") if isinstance(c, dict) else getattr(c, "text", None)) or (c.get("content") if isinstance(c, dict) else getattr(c, "content", None))
                        if isinstance(c_type, str) and "reasoning" in c_type and c_text:
                            reasoning_texts.append(c_text)
            # Build message now
            reasoning_combined = "\n".join(reasoning_texts).strip() if reasoning_texts else None
            msg = _Msg(content=message_content, role=role, reasoning=reasoning_combined)
            choice = _Choice(message=msg, finish_reason=finish_reason)
            normalized = _ChatCompletion(id=resp_id or "", choices=[choice], created=created, model=model, usage=usage)
            return normalized
        # 2) Inspect outputs list for content items that contain a parsed object
        found_parsed = None
        main_text_acc: List[str] = []
        if outputs and isinstance(outputs, (list, tuple)):
            chosen_contents = None
            for out in outputs:
                contents = getattr(out, "content", None) or getattr(out, "contents", None)
                if contents:
                    chosen_contents = contents
                    # capture role/status if present
                    role = getattr(out, "role", role)
                    finish_reason = getattr(out, "status", finish_reason)
                    break
            if chosen_contents is not None:
                # chosen_contents is typically a list; scan items for 'parsed'
                for c in chosen_contents:
                    if isinstance(c, dict):
                        # dict-like content item
                        c_parsed = c.get("parsed") or c.get("parsed_output")
                        c_type = c.get("type")
                        c_text = c.get("text") or c.get("content")
                    else:
                        # object-like content item
                        c_parsed = getattr(c, "parsed", None) or getattr(c, "parsed_output", None)
                        c_type = getattr(c, "type", None)
                        c_text = getattr(c, "text", None) or getattr(c, "content", None)
                    # collect reasoning text
                    if isinstance(c_type, str) and "reasoning" in c_type and c_text:
                        reasoning_texts.append(c_text)
                    if c_parsed is not None and found_parsed is None:
                        # promote the original parsed object (do not convert)
                        found_parsed = _keep_original(c_parsed)
                    # If no parsed, but text exists, keep for potential fallback
                    if c_parsed is None and c_text:
                        main_text_acc.append(c_text)
        # If we found a parsed nested in content, promote it
        if found_parsed is not None:
            message_content = found_parsed
            reasoning_combined = "\n".join(reasoning_texts).strip() if reasoning_texts else None
            msg = _Msg(content=message_content, role=role or "assistant", reasoning=reasoning_combined)
            choice = _Choice(message=msg, finish_reason=finish_reason)
            normalized = _ChatCompletion(id=resp_id or "", choices=[choice], created=created, model=model, usage=usage)
            return normalized
        # 3) No parsed object found — use textual output if present (mimic chat.create)
        # Prefer joined main_text_acc from outputs, else resp.output_text / resp.text, else empty string
        if main_text_acc:
            message_text = "".join(main_text_acc).strip()
        else:
            message_text = getattr(resp, "output_text", None) or getattr(resp, "text", None) or ""
        # If still empty and resp.choices exist, try to get the chat-like message text
        if not message_text:
            try:
                choices = getattr(resp, "choices", None)
                if choices and len(choices) > 0:
                    c0 = choices[0]
                    m = getattr(c0, "message", None) or getattr(c0, "text", None)
                    if m is not None:
                        message_text = getattr(m, "content", None) or getattr(m, "text", None) or message_text
                        # pull reasoning_content if present
                        reasoning = getattr(m, "reasoning_content", None)
                        if reasoning:
                            reasoning_texts.append(reasoning)
                        role = getattr(m, "role", role)
                    finish_reason = getattr(c0, "finish_reason", finish_reason)
            except Exception:
                pass
        reasoning_combined = "\n".join(reasoning_texts).strip() if reasoning_texts else None
        msg = _Msg(content=message_text, role=role or "assistant", reasoning=reasoning_combined)
        choice = _Choice(message=msg, finish_reason=finish_reason)
        normalized = _ChatCompletion(id=resp_id or "", choices=[choice], created=created, model=model, usage=usage)
        return normalized
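Note (reviewer sketch, not part of this commit): the shim objects above mean callers can keep reading choices[0].message as with a real ChatCompletion. A quick illustration with a fake ParsedResponse; it relies on the fact that _normalize_parsed_response never touches self, and the Weather model is hypothetical:

from types import SimpleNamespace
from pydantic import BaseModel
from _llm import LLM  # assumes the package imports cleanly

class Weather(BaseModel):  # hypothetical structured-output schema
    city: str
    temp_c: float

# Fake ParsedResponse carrying a top-level parsed object (search priority 1 above).
fake = SimpleNamespace(
    id="resp_123",
    model="demo-model",
    created_at=None,
    usage=None,
    output=None,
    output_parsed=Weather(city="Oslo", temp_c=7.5),
)

normalized = LLM._normalize_parsed_response(None, fake)  # self is unused, so None works for illustration
msg = normalized.choices[0].message
print(type(msg.content))   # the original Weather instance is preserved on content
print(msg.content_text)    # JSON string, safe to drop into message history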
    # @backoff.on_exception(
    # backoff.expo,
    # (Exception, TimeoutError),
    # max_tries=3,
    # factor=2,
    # base=10,
    # on_backoff=lambda details: print_yellow(
    # f"Retrying due to error: {details['exception']}"
    # ),
    # )
    def _call_remote_api(
        self, model, tools, stream, options, format, headers, think=False
    ) -> ChatCompletion:
@@ -277,12 +468,25 @@ class LLM:
        # Add response format if provided
        if format:
            kwargs["response_format"] = {"type": format}
            kwargs['input'] = kwargs['messages']
            del kwargs['messages']
            del kwargs['max_tokens']
            kwargs["text_format"] = format
            response: ParsedResponse = self.client.responses.parse(**kwargs)
            # NORMALIZE the ParsedResponse into a ChatCompletion-like object
            try:
                normalized: ChatCompletion = self._normalize_parsed_response(response)  # This is not really a ChatCompletion, but close enough
                # Optionally print normalized for debugging
                # print("Normalized:", normalized.choices[0].message.content)
                return normalized
            except Exception:
                # If normalization fails, fallback to returning raw response object
                traceback.print_exc()
                return response
        # Call the OpenAI API
        print_rainbow(kwargs)
        response: ChatCompletion = self.client.chat.completions.create(**kwargs)
        else:
            response: ChatCompletion = self.client.chat.completions.create(**kwargs)
        # Try to extract backend information if available
        try:
            response_headers = getattr(response, "_headers", {})
@@ -294,16 +498,6 @@ class LLM:
        return response
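Note (reviewer sketch, not part of this commit): the format branch above rewrites the Chat Completions kwargs into the shape client.responses.parse() expects. Roughly this, with the model name and schema being placeholders:

from pydantic import BaseModel

class Intro(BaseModel):  # hypothetical schema passed as `format`
    name: str
    summary: str

chat_kwargs = {
    "model": "some-model",  # placeholder
    "messages": [{"role": "user", "content": "Introduce yourself."}],
    "max_tokens": 512,      # placeholder
}

responses_kwargs = dict(chat_kwargs)
responses_kwargs["input"] = responses_kwargs.pop("messages")  # Responses API takes `input`
responses_kwargs.pop("max_tokens")                            # dropped in this code path
responses_kwargs["text_format"] = Intro                       # the Pydantic class given as `format`

# With a live OpenAI-compatible client this would then be:
# response = client.responses.parse(**responses_kwargs)
# normalized = llm._normalize_parsed_response(response)
print(sorted(responses_kwargs))  # ['input', 'model', 'text_format']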
    # @backoff.on_exception(
    # backoff.expo,
    # (Exception, TimeoutError),
    # max_tries=3,
    # factor=2,
    # base=10,
    # on_backoff=lambda details: print_yellow(
    # f"Retrying due to error: {details['exception']}"
    # ),
    # )
    async def _call_remote_api_async(
        self, model, tools, stream, options, format, headers, think=False
    ):
@@ -350,36 +544,22 @@ class LLM:
        # Add response format if provided
        if format:
            kwargs["response_format"] = {"type": format}
        # Handle thinking mode through system messages or tool calls
        if think and model == self.get_model("reasoning"):
            # Implement thinking through tool calls if supported by the server
            thinking_tool = {
                "type": "function",
                "function": {
                    "name": "thinking",
                    "description": "Share your step-by-step reasoning process",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "thinking": {
                                "type": "string",
                                "description": "Your step-by-step reasoning"
                            }
                        },
                        "required": ["thinking"]
                    }
                }
            }
            if "tools" not in kwargs:
                kwargs["tools"] = [thinking_tool]
            else:
                kwargs["tools"].append(thinking_tool)
            kwargs['input'] = kwargs['messages']
            del kwargs['messages']
            del kwargs['max_tokens']
            response = await self.async_client.responses.parse(**kwargs)
            # Normalize the response the same way as the sync path
            try:
                normalized = self._normalize_parsed_response(response)
                return normalized
            except Exception:
                traceback.print_exc()
                return response
        else:
            # Call the OpenAI API
            response = await self.async_client.chat.completions.create(**kwargs)
        response = await self.async_client.chat.completions.create(**kwargs)
        return response
    def _call_local_ollama(self, model, stream, temperature, think=False):
@@ -402,28 +582,6 @@ class LLM:
            "max_tokens": self.max_length_answer
        }
        # Handle thinking mode through system messages or tool calls
        if think and model == self.get_model("reasoning"):
            # Implement thinking through tool calls if supported
            thinking_tool = {
                "type": "function",
                "function": {
                    "name": "thinking",
                    "description": "Share your step-by-step reasoning process",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "thinking": {
                                "type": "string",
                                "description": "Your step-by-step reasoning"
                            }
                        },
                        "required": ["thinking"]
                    }
                }
            }
            kwargs["tools"] = [thinking_tool]
        if stream:
            response_stream = local_client.chat.completions.create(**kwargs)
@@ -462,29 +620,6 @@ class LLM:
            "max_tokens": self.max_length_answer
        }
        # Handle thinking mode through system messages or tool calls
        if think and model == self.get_model("reasoning"):
            # Implement thinking through tool calls if supported
            thinking_tool = {
                "type": "function",
                "function": {
                    "name": "thinking",
                    "description": "Share your step-by-step reasoning process",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "thinking": {
                                "type": "string",
                                "description": "Your step-by-step reasoning"
                            }
                        },
                        "required": ["thinking"]
                    }
                }
            }
            kwargs["tools"] = [thinking_tool]
        if stream:
            response_stream = await local_client.chat.completions.create(**kwargs)
@@ -525,33 +660,37 @@ class LLM:
        force_local: bool = False,
    ) -> ChatCompletionMessage:
        """
        Attempts to generate a response using a remote API first, then falls back to
        local Ollama if the remote call fails or if force_local is True.
        Generate a response using either a remote API or local Ollama server.
        This method handles text generation with support for various models, streaming,
        tool usage, and image inputs. It first attempts to use a remote OpenAI-compatible
        API and falls back to a local Ollama server if the remote call fails or if
        force_local is True.
        Args:
            query (str, optional): The main query or prompt for generation.
            user_input (str, optional): Alternative user input if query is not provided.
            context (str, optional): Additional context to include in the generation.
            query (str, optional): The main query or prompt text.
            user_input (str, optional): Additional user input to include.
            context (str, optional): Context information to prepend to the query.
            stream (bool, optional): Whether to stream the response. Defaults to False.
            tools (list, optional): List of tools to make available for the model.
            tools (list, optional): List of tools/functions available to the model.
            images (list, optional): List of images to include in the request.
            model (Literal["small", "standard", "vision", "reasoning", "tools", "embeddings"], optional):
                The model type to use. Defaults to "standard".
            temperature (float, optional): Temperature parameter for generation randomness.
                Uses instance default if not provided.
            messages (list[dict], optional): Pre-formatted message history.
                The model type to use. If None, uses instance model or "standard".
            temperature (float, optional): Sampling temperature. Uses instance default if None.
            messages (list[dict], optional): Pre-formatted message history to use.
            format (optional): Response format specification.
            think (bool, optional): Whether to enable thinking mode. Defaults to None.
            force_local (bool, optional): Force use of local Ollama instead of remote API.
                Defaults to False.
            think (optional): Whether to enable reasoning/thinking mode. Uses instance default if None.
            force_local (bool, optional): Force use of local Ollama server. Defaults to False.
        Returns:
            The generated response. Type varies based on stream parameter and success:
            - For streaming: Returns stream reader object
            - For non-streaming remote success: Returns response message object
            - For local fallback: Returns local response
            - For complete failure: Returns error message string
            ChatCompletionMessage: The generated response message containing the model's output.
            str: Error message if both remote and local generation fail.
        Raises:
            Exception: Catches and prints exceptions from both remote API and local server calls.
        Note:
            - Automatically appends assistant responses to message history
            - Resets message history to system message only if not in chat mode
            - Prioritizes remote API unless force_local is True
            - Falls back gracefully when services are unavailable
        """
        if model is None and self.model:
            model = self.model
        elif model is None:
@@ -580,18 +719,14 @@ class LLM:
            return self.read_stream(response)
        else:
            choice = response.choices[0]
            print('---')
            print_rainbow(choice.__dict__, single_line=True)
            print('---')
            message: ChatCompletionMessage = choice.message
            result: str = message.content
            if hasattr(message, 'content_text'):
                result: str = message.content_text
            # Store in message history (without tool calls for clean history)
            self.messages.append({"role": "assistant", "content": result})
            if not self.chat:
                self.messages = [self.messages[0]]
            return message
        return message
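Note (reviewer sketch, not part of this commit): what the content_text preference above does in isolation; message here is a stand-in object, not a real ChatCompletionMessage:

from types import SimpleNamespace

message = SimpleNamespace(
    content={"name": "Ada", "age": 36},         # parsed object promoted to content
    content_text='{"name": "Ada", "age": 36}',  # string form produced by the normalizer
)

result = message.content
if hasattr(message, "content_text"):
    result = message.content_text  # prefer the string so the stored history stays JSON-serializable

print({"role": "assistant", "content": result})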
@@ -953,6 +1088,9 @@ class LLM:
        return message
# ------------------- TESTS ---------------------------------------------------------
if __name__ == "__main__":
    import asyncio
    from pydantic import BaseModel
@@ -970,6 +1108,20 @@ if __name__ == "__main__":
        final_answer: float
        explanation: str
    class NameResponse(BaseModel):
        name: str
        age: int
        occupation: str
        hobbies: List[str]
    llm = LLM(silent=False, chat=False)  # Don't persist chat history
    response = llm.generate("Hello! Can you introduce yourself briefly?", model='vllm', format=NameResponse)
    print(response.__dict__)
    response = llm.generate("What's the weather like in San Francisco? Also calculate 15 * 7 for me.", model='vllm')
    print(response.__dict__)
    exit()
    # Define a tool for calculations
    @register_tool
    def calculate_tool(number: int, multiply_factor: int) -> int:

@@ -2,6 +2,6 @@
llm_client package entry point to simplify imports
"""
from _llm import LLM
from _llm import LLM, register_tool, get_tools
__all__ = ["LLM"]
__all__ = ["LLM", "register_tool", "get_tools"]