Enhance LLM class: add 'think' parameter for reasoning models and improve message handling

legacy · lasseedfast, 9 months ago
parent c275704014 · commit 17d3335ff6

Changed files:
  1. __init__.py (2 changes)
  2. _llm/__init__.py (10 changes)
  3. _llm/llm.py (388 changes)

@@ -2,6 +2,6 @@
llm_client: A Python package for interacting with LLM models through Ollama.
"""
from _llm.llm import LLM
from _llm._llm.llm import LLM
__all__ = ["LLM"]

@@ -1,7 +1,7 @@
"""
llm_client: A Python package for interacting with LLM models through Ollama.
"""
# """
# llm_client: A Python package for interacting with LLM models through Ollama.
# """
from _llm.llm import LLM
# from ._llm.llm import LLM # Use relative import with dot prefix
__all__ = ["LLM"]
# __all__ = ["LLM"]

@@ -60,6 +60,7 @@ class LLM:
chat: bool = True,
chosen_backend: str = None,
tools: list = None,
think: bool = False,
) -> None:
"""
Initialize the assistant with the given parameters.
@@ -72,6 +73,7 @@ class LLM:
messages (list[dict], optional): A list of initial messages. Defaults to None.
chat (bool): Whether the assistant is in chat mode. Defaults to True.
chosen_backend (str, optional): The backend server to use. If not provided, the least connected server is chosen.
think (bool): Whether to use thinking mode for reasoning models. Defaults to False.
Returns:
None
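A hypothetical usage sketch of the new parameter. The import path, model alias, and environment variables are taken from the surrounding hunks and may differ in a real deployment; think is passed here at call time, overriding the constructor default.

# Hypothetical caller; requires LLM_API_URL, LLM_API_USER and LLM_API_PWD_LASSE
# in the environment. Import path is illustrative.
from _llm.llm import LLM

llm = LLM(chat=True)                    # think defaults to False
reply = llm.generate(
    query="Explain why the sky appears blue.",
    model="reasoning",                  # alias resolved via get_model()
    think=True,                         # request native thinking output
)
print(reply.content)                    # final answer, without <think> tags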
@@ -89,23 +91,22 @@ class LLM:
self.chosen_backend = chosen_backend
headers = {
"Authorization": f"Basic {self.get_credentials()}",
}
if self.chosen_backend:
headers["X-Chosen-Backend"] = self.chosen_backend
self.host_url = os.getenv("LLM_API_URL").rstrip("/api/chat/")
self.client: Client = Client(host=self.host_url, headers=headers, timeout=120)
self.client: Client = Client(host=self.host_url, headers=headers, timeout=120)
self.async_client: AsyncClient = AsyncClient()
def get_credentials(self):
# Initialize the client with the host and default headers
credentials = f"{os.getenv('LLM_API_USER')}:{os.getenv('LLM_API_PWD_LASSE')}"
return base64.b64encode(credentials.encode()).decode()
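The credential and client setup used throughout this file, reduced to a standalone sketch. Host and credentials are placeholders; ollama's Client forwards extra keyword arguments such as headers and timeout to its underlying HTTP client.

# Standalone sketch of the Basic-auth header pattern (values are placeholders).
import base64
import os

from ollama import Client

def basic_auth_header(user: str, password: str) -> dict:
    # HTTP Basic auth: base64-encode "user:password"
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return {"Authorization": f"Basic {token}"}

headers = basic_auth_header(os.getenv("LLM_API_USER", "user"),
                            os.getenv("LLM_API_PWD_LASSE", "secret"))
client = Client(host="http://localhost:11434", headers=headers, timeout=120)

One caveat in the hunk above: str.rstrip("/api/chat/") strips any trailing run of those characters rather than the literal suffix, so a host ending in, say, "ai" would be over-trimmed; str.removesuffix("/api/chat") is the stricter idiom if the suffix is guaranteed.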
def get_model(self, model_alias):
models = {
"standard": "LLM_MODEL",
@@ -130,7 +131,9 @@ class LLM:
num_tokens += len(tokens)
return int(num_tokens)
def _prepare_messages_and_model(self, query, user_input, context, messages, images, model):
def _prepare_messages_and_model(
self, query, user_input, context, messages, images, model
):
"""Prepare messages and select the appropriate model, handling images if present."""
if messages:
messages = [
@@ -144,38 +147,43 @@ class LLM:
else:
query = re.sub(r"\s*\n\s*", "\n", query)
message = {"role": "user", "content": query}
if images:
message = self.prepare_images(images, message)
model = self.get_model("vision")
else:
model = self.get_model(model)
self.messages.append(message)
return model
def _build_headers(self, model, tools, think):
"""Build HTTP headers for API requests, including auth and backend/model info."""
headers = {"Authorization": f"Basic {self.get_credentials()}"}
if self.chosen_backend and model not in [self.get_model("vision"), self.get_model("tools"), self.get_model("reasoning")]:
if self.chosen_backend and model not in [
self.get_model("vision"),
self.get_model("tools"),
self.get_model("reasoning"),
]:
headers["X-Chosen-Backend"] = self.chosen_backend
if model == self.get_model("small"):
headers["X-Model-Type"] = "small"
if model == self.get_model("tools"):
headers["X-Model-Type"] = "tools"
if think and model and any([m in model for m in ['qwen3', 'deepseek']]):
self.messages[-1]['content'] = f"/think\n{self.messages[-1]['content']}"
elif model and any([m in model for m in ['qwen3', 'deepseek']]):
self.messages[-1]['content'] = f"/no_think\n{self.messages[-1]['content']}"
# No longer need to modify message content for thinking - handled by native API
return headers
def _get_options(self, temperature):
"""Build model options, setting temperature and other parameters."""
options = Options(**self.options)
options.temperature = temperature if temperature is not None else self.options["temperature"]
options.temperature = (
temperature if temperature is not None else self.options["temperature"]
)
return options
def _call_remote_api(self, model, tools, stream, options, format, headers):
def _call_remote_api(
self, model, tools, stream, options, format, headers, think=False
):
"""Call the remote Ollama API synchronously."""
self.call_model = model
self.client: Client = Client(host=self.host_url, headers=headers, timeout=300)
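The /think and /no_think prompt prefixes removed above are superseded by the client's native think argument, which the calls below pass through. A minimal standalone sketch against a local server (model name is illustrative and must support thinking; requires an ollama-python release with native thinking support):

# Minimal sketch of the native thinking API.
from ollama import Client

client = Client(host="http://localhost:11434")
response = client.chat(
    model="qwen3",
    messages=[{"role": "user", "content": "Is 9.11 larger than 9.9?"}],
    think=True,  # reasoning is returned in a separate field
)
print(response.message.thinking)  # chain of thought, if the model emitted one
print(response.message.content)   # final answer, no <think> tags to strip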
@@ -187,11 +195,14 @@ class LLM:
stream=stream,
options=options,
keep_alive=3600 * 24 * 7,
format=format
format=format,
think=think,
)
return response
async def _call_remote_api_async(self, model, tools, stream, options, format, headers):
async def _call_remote_api_async(
self, model, tools, stream, options, format, headers, think=False
):
"""Call the remote Ollama API asynchronously."""
print_yellow(f"🤖 Generating using {model} (remote, async)...")
response = await self.async_client.chat(
@@ -202,12 +213,14 @@ class LLM:
stream=stream,
options=options,
keep_alive=3600 * 24 * 7,
think=think, # Use native Ollama thinking support
)
return response
def _call_local_ollama(self, model, stream, temperature):
def _call_local_ollama(self, model, stream, temperature, think=False):
"""Call the local Ollama instance synchronously."""
import ollama
print_yellow(f"🤖 Generating using {model} (local)...")
options = {"temperature": temperature}
if stream:
@@ -215,72 +228,130 @@ class LLM:
model=model,
messages=self.messages,
options=options,
stream=True
stream=True,
think=think, # Pass thinking parameter to local ollama
)
def local_stream_adapter():
for chunk in response_stream:
yield type('OllamaResponse', (), {
'message': type('Message', (), {'content': chunk['message']['content']}),
'done': chunk.get('done', False)
})
yield type(
"OllamaResponse",
(),
{
"message": type(
"Message", (), {"content": chunk["message"]["content"]}
),
"done": chunk.get("done", False),
},
)
return self.read_stream(local_stream_adapter())
else:
response = ollama.chat(
model=model,
messages=self.messages,
options=options
options=options,
think=think, # Pass thinking parameter to local ollama
)
result = response["message"]["content"]
# Handle thinking content if present (for backward compatibility)
thinking_content = response["message"].get("thinking", None)
response_obj = type(
"LocalChatResponse",
(),
{
"message": type(
"Message",
(),
{
"content": result,
"thinking": thinking_content,
"get": lambda x: None,
},
)
},
)
result = response['message']['content']
response_obj = type('LocalChatResponse', (), {
'message': type('Message', (), {
'content': result,
'get': lambda x: None
})
})
if '</think>' in result:
result = result.split('</think>')[-1].strip()
response_obj.message.content = result
# No longer need to manually parse </think> tags with native support
self.messages.append({"role": "assistant", "content": result})
if not self.chat:
self.messages = [self.messages[0]]
return response_obj.message
async def _call_local_ollama_async(self, model, stream, temperature):
async def _call_local_ollama_async(self, model, stream, temperature, think=False):
"""Call the local Ollama instance asynchronously (using a thread pool)."""
import ollama
import asyncio
print_yellow(f"🤖 Generating using {model} (local, async)...")
options = {"temperature": temperature}
loop = asyncio.get_event_loop()
if stream:
def run_stream():
return ollama.chat(
model=model,
messages=self.messages,
options=options,
stream=True
stream=True,
think=think, # Pass thinking parameter to local ollama
)
response_stream = await loop.run_in_executor(None, run_stream)
async def local_stream_adapter():
for chunk in response_stream:
yield type('OllamaResponse', (), {
'message': type('Message', (), {'content': chunk['message']['content']}),
'done': chunk.get('done', False)
})
yield type(
"OllamaResponse",
(),
{
"message": type(
"Message", (), {"content": chunk["message"]["content"]}
),
"done": chunk.get("done", False),
},
)
return local_stream_adapter()
else:
def run_chat():
return ollama.chat(
model=model,
messages=self.messages,
options=options
options=options,
think=think, # Pass thinking parameter to local ollama
)
response_dict = await loop.run_in_executor(None, run_chat)
result = response_dict['message']['content']
result = response_dict["message"]["content"]
# Handle thinking content if present (for backward compatibility)
thinking_content = response_dict["message"].get("thinking", None)
# Create response object with thinking support
response_obj = type(
"LocalChatResponse",
(),
{
"message": type(
"Message",
(),
{
"content": result,
"thinking": thinking_content,
"get": lambda x: None,
},
)
},
)
self.messages.append({"role": "assistant", "content": result})
if not self.chat:
self.messages = [self.messages[0]]
return result
return response_obj.message
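The anonymous type(...) adapters above exist so that dict chunks and responses from the local client expose the same attribute access as the remote ChatResponse. A sketch of the same idea with types.SimpleNamespace (not the committed code):

# Adapt local ollama dict chunks to attribute-style access.
from types import SimpleNamespace

def adapt_chunk(chunk: dict) -> SimpleNamespace:
    # Mirror the remote response shape: .message.content, .message.thinking, .done
    return SimpleNamespace(
        message=SimpleNamespace(
            content=chunk["message"]["content"],
            thinking=chunk["message"].get("thinking"),
        ),
        done=chunk.get("done", False),
    )

def local_stream_adapter(response_stream):
    for chunk in response_stream:
        yield adapt_chunk(chunk)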
def generate(
self,
@@ -292,44 +363,49 @@ class LLM:
images: list = None,
model: Optional[
Literal["small", "standard", "vision", "reasoning", "tools"]
] = 'standard',
] = "standard",
temperature: float = None,
messages: list[dict] = None,
format = None,
think = False,
force_local: bool = False
format=None,
think=False,
force_local: bool = False,
):
"""
Generate a response based on the provided query and context.
"""
model = self._prepare_messages_and_model(query, user_input, context, messages, images, model)
model = self._prepare_messages_and_model(
query, user_input, context, messages, images, model
)
temperature = temperature if temperature else self.options["temperature"]
if not force_local:
try:
headers = self._build_headers(model, tools, think)
options = self._get_options(temperature)
response = self._call_remote_api(model, tools, stream, options, format, headers)
response = self._call_remote_api(
model, tools, stream, options, format, headers, think=think
)
print_rainbow(response)
if stream:
return self.read_stream(response)
else:
if isinstance(response, ChatResponse):
result = response.message.content.strip('"')
if '</think>' in result:
result = result.split('</think>')[-1]
self.messages.append({"role": "assistant", "content": result.strip('"')})
if tools and not response.message.get("tool_calls"):
pass
message_content = result.strip('"')
self.messages.append(
{"role": "assistant", "content": message_content}
)
if not self.chat:
self.messages = [self.messages[0]]
if not think:
response.message.content = remove_thinking(response.message.content)
return response.message
else:
return "An error occurred."
except Exception as e:
traceback.print_exc()
try:
return self._call_local_ollama(model, stream, temperature)
return self._call_local_ollama(model, stream, temperature, think=think)
except Exception as e:
traceback.print_exc()
return "Both remote API and local Ollama failed. An error occurred."
@@ -344,29 +420,84 @@ class LLM:
images: list = None,
model: Optional[
Literal["small", "standard", "vision", "reasoning", "tools"]
] = 'standard',
] = "standard",
temperature: float = None,
messages: list[dict] = None,
format=None,
think=False,
force_local: bool = False,
):
"""
Asynchronously generates a response based on the provided query and other parameters.
Args:
query (str, optional): The query string to generate a response for.
user_input (str, optional): Additional user input to be included in the response.
context (str, optional): Context information to be used in generating the response.
stream (bool, optional): Whether to stream the response. Defaults to False.
tools (list, optional): List of tools to be used in generating the response.
images (list, optional): List of images to be included in the response.
model (Optional[Literal["small", "standard", "vision", "reasoning", "tools"]], optional): The model to be used for generating the response.
temperature (float, optional): The temperature setting for the model.
messages (list[dict], optional): List of messages to use instead of building from query.
format: Format specification for the response.
think (bool, optional): Whether to use thinking mode for reasoning models.
force_local (bool, optional): Force using local Ollama instead of remote API.
Returns:
The generated response message or an error message if an exception occurs.
"""
model = self._prepare_messages_and_model(query, user_input, context, None, images, model)
model = self._prepare_messages_and_model(
query, user_input, context, messages, images, model
)
temperature = temperature if temperature else self.options["temperature"]
# First try with remote API
if not force_local:
try:
headers = self._build_headers(model, tools, False)
headers = self._build_headers(model, tools, think)
options = self._get_options(temperature)
response = await self._call_remote_api_async(model, tools, stream, options, None, headers)
# You can add async-specific response handling here if needed
except Exception as e:
traceback.print_exc()
if force_local or 'response' not in locals():
try:
return await self._call_local_ollama_async(model, stream, temperature)
response = await self._call_remote_api_async(
model, tools, stream, options, format, headers, think=think
)
if stream:
return self.read_stream(response)
else:
if isinstance(response, ChatResponse):
# Handle native thinking mode with separate thinking field
result = response.message.content.strip('"')
thinking_content = getattr(response.message, "thinking", None)
# Store both content and thinking in message history
message_content = result.strip('"')
self.messages.append(
{"role": "assistant", "content": message_content}
)
if not self.chat:
self.messages = [self.messages[0]]
# Return response with both content and thinking accessible
if thinking_content and think:
# Add thinking as an attribute for access if needed
response.message.thinking = thinking_content
return response.message
else:
return "An error occurred."
except Exception as e:
traceback.print_exc()
return "Both remote API and local Ollama failed. An error occurred."
# Fallback to local Ollama or if force_local is True
try:
return await self._call_local_ollama_async(
model, stream, temperature, think=think
)
except Exception as e:
traceback.print_exc()
return "Both remote API and local Ollama failed. An error occurred."
def make_summary(self, text):
# Implement your summary logic using self.client.chat()
@@ -396,108 +527,41 @@ class LLM:
def read_stream(self, response):
"""
Yields tuples of (chunk_type, text). The first tuple is ('thinking', ...)
if in_thinking is True and stops at </think>. After that, yields ('normal', ...)
for the rest of the text.
Read streaming response and handle thinking content appropriately.
With native thinking mode, the thinking content is separate from the main content.
"""
thinking_buffer = ""
in_thinking = self.call_model == self.get_model("reasoning")
first_chunk = True
prev_content = None
accumulated_content = ""
accumulated_thinking = ""
for chunk in response:
if not chunk:
continue
# Handle thinking content (if present in streaming)
thinking_content = getattr(chunk.message, "thinking", None)
if thinking_content:
accumulated_thinking += thinking_content
yield ("thinking", thinking_content)
# Handle regular content
content = chunk.message.content
if content:
# Remove leading/trailing quotes that sometimes appear
if content.startswith('"') and len(accumulated_content) == 0:
content = content[1:]
if chunk.done and content.endswith('"'):
content = content[:-1]
# Remove leading quote if it's the first chunk
if first_chunk and content.startswith('"'):
content = content[1:]
first_chunk = False
if in_thinking:
thinking_buffer += content
if "</think>" in thinking_buffer:
end_idx = thinking_buffer.index("</think>") + len("</think>")
yield ("thinking", thinking_buffer[:end_idx])
remaining = thinking_buffer[end_idx:].strip('"')
if chunk.done and remaining:
yield ("normal", remaining)
break
else:
prev_content = remaining
in_thinking = False
else:
if prev_content:
yield ("normal", prev_content)
prev_content = content
accumulated_content += content
yield ("normal", content)
if chunk.done:
if prev_content and prev_content.endswith('"'):
prev_content = prev_content[:-1]
if prev_content:
yield ("normal", prev_content)
break
self.messages.append({"role": "assistant", "content": ""})
async def async_generate(
self,
query: str = None,
user_input: str = None,
context: str = None,
stream: bool = False,
tools: list = None,
images: list = None,
model: Optional[Literal["small", "standard", "vision"]] = None,
temperature: float = None,
force_local: bool = False, # New parameter to force local Ollama
):
"""
Asynchronously generates a response based on the provided query and other parameters.
Args:
query (str, optional): The query string to generate a response for.
user_input (str, optional): Additional user input to be included in the response.
context (str, optional): Context information to be used in generating the response.
stream (bool, optional): Whether to stream the response. Defaults to False.
tools (list, optional): List of tools to be used in generating the response. Will set the model to 'tools'.
images (list, optional): List of images to be included in the response.
model (Optional[Literal["small", "standard", "vision", "tools"]], optional): The model to be used for generating the response.
temperature (float, optional): The temperature setting for the model.
force_local (bool, optional): Force using local Ollama instead of remote API.
Returns:
str: The generated response or an error message if an exception occurs.
"""
print_yellow("ASYNC GENERATE")
# Prepare the model and temperature
model = self._prepare_messages_and_model(query, user_input, context, None, images, model)
temperature = temperature if temperature else self.options["temperature"]
# First try with remote API
if not force_local:
try:
headers = self._build_headers(model, tools, False)
options = self._get_options(temperature)
response = await self._call_remote_api_async(model, tools, stream, options, None, headers)
# Process response from async client
# [Rest of the response processing code as in the original method]
except Exception as e:
print_red(f"Remote API error: {str(e)}")
print_yellow("Falling back to local Ollama...")
# Fallback to local Ollama (for async we'll need to use the sync version)
if force_local or 'response' not in locals():
try:
return await self._call_local_ollama_async(model, stream, temperature)
except Exception as e:
print_red(f"Local Ollama error: {str(e)}")
return "Both remote API and local Ollama failed. An error occurred."
# Store the complete response in message history
self.messages.append({"role": "assistant", "content": accumulated_content})
if not self.chat:
self.messages = [self.messages[0]]
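When stream=True, read_stream yields (chunk_type, text) tuples. A hypothetical consumer, reusing the llm instance from the earlier sketch, could render reasoning and answer separately:

# Hypothetical streaming consumer of the (chunk_type, text) tuples above.
for kind, text in llm.generate(
    query="Think step by step: what is 17 * 24?",
    model="reasoning",
    stream=True,
    think=True,
):
    if kind == "thinking":
        print(f"[thinking] {text}", end="", flush=True)
    else:  # "normal"
        print(text, end="", flush=True)
print()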
def prepare_images(self, images, message):
"""
@@ -532,13 +596,7 @@ class LLM:
message["images"] = base64_images
return message
def remove_thinking(response):
"""Remove the thinking section from the response"""
response_text = response.content if hasattr(response, "content") else str(response)
if "</think>" in response_text:
return response_text.split("</think>")[1].strip()
return response_text
if __name__ == "__main__":
llm = LLM()
