from _llm import LLM

if __name__ == "__main__":
    llm = LLM()
    result = llm.generate(
        query="I want to add 2 and 2",
        think=True,
    )
    print(result)
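    # A follow-up sketch: streaming the same kind of call. This assumes _llm.LLM
    # mirrors the commented-out implementation below, where generate(stream=True)
    # yields (chunk_type, text) tuples with chunk_type "thinking" or "normal".
    for chunk_type, text in llm.generate(
        query="Now add 3 and 3, and explain the steps.",
        stream=True,
    ):
        if chunk_type == "normal":
            print(text, end="", flush=True)
    print()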
# import os
# import base64
# import re
# from typing import Literal, Optional
# from pydantic import BaseModel
# import requests
# import tiktoken
# from ollama import (
#     Client,
#     AsyncClient,
#     ResponseError,
#     ChatResponse,
#     Tool,
#     Options,
# )
# import env_manager
# from colorprinter.print_color import *
# env_manager.set_env()
# tokenizer = tiktoken.get_encoding("cl100k_base")
# class LLM:
# """
# LLM class for interacting with an instance of Ollama.
# Attributes:
# model (str): The model to be used for response generation.
# system_message (str): The system message to be used in the chat.
# options (dict): Options for the model, such as temperature.
# messages (list): List of messages in the chat.
# max_length_answer (int): Maximum length of the generated answer.
# chat (bool): Whether the chat mode is enabled.
# chosen_backend (str): The chosen backend server for the API.
# client (Client): The client for synchronous API calls.
# async_client (AsyncClient): The client for asynchronous API calls.
# tools (list): List of tools to be used in generating the response.
# Methods:
# __init__(self, system_message, temperature, model, max_length_answer, messages, chat, chosen_backend):
# Initializes the LLM class with the provided parameters.
# get_model(self, model_alias):
# Retrieves the model name based on the provided alias.
# count_tokens(self):
# Counts the number of tokens in the messages.
# get_least_conn_server(self):
# Retrieves the least connected server from the backend.
# generate(self, query, user_input, context, stream, tools, images, model, temperature):
# Generates a response based on the provided query and options.
# make_summary(self, text):
# Generates a summary of the provided text.
# read_stream(self, response):
# Handles streaming responses.
# async_generate(self, query, user_input, context, stream, tools, images, model, temperature):
# Asynchronously generates a response based on the provided query and options.
# prepare_images(self, images, message):
# """
#     def __init__(
#         self,
#         system_message: str = "You are an assistant.",
#         temperature: float = 0.01,
#         model: Optional[
#             Literal["small", "standard", "vision", "reasoning", "tools"]
#         ] = "standard",
#         max_length_answer: int = 4096,
#         messages: list[dict] = None,
#         chat: bool = True,
#         chosen_backend: str = None,
#         tools: list = None,
#     ) -> None:
#         """
#         Initialize the assistant with the given parameters.
#         Args:
#             system_message (str): The initial system message for the assistant. Defaults to "You are an assistant.".
#             temperature (float): The temperature setting for the model, affecting randomness. Defaults to 0.01.
#             model (Optional[Literal["small", "standard", "vision", "reasoning", "tools"]]): The model type to use. Defaults to "standard".
#             max_length_answer (int): The maximum length of the generated answer. Defaults to 4096.
#             messages (list[dict], optional): A list of initial messages. Defaults to None.
#             chat (bool): Whether the assistant is in chat mode. Defaults to True.
#             chosen_backend (str, optional): The backend server to use. If not provided, the least connected server is chosen.
#             tools (list, optional): Tools to be made available when generating responses. Defaults to None.
#         Returns:
#             None
#         """
#         self.model = self.get_model(model)
#         self.call_model = (
#             self.model
#         )  # Set per call to record which model was actually used
#         self.system_message = system_message
#         self.options = {"temperature": temperature}
#         self.messages = messages or [{"role": "system", "content": self.system_message}]
#         self.max_length_answer = max_length_answer
#         self.chat = chat
#         if not chosen_backend:
#             chosen_backend = self.get_least_conn_server()
#         self.chosen_backend = chosen_backend
#         headers = {
#             "Authorization": f"Basic {self.get_credentials()}",
#             "X-Chosen-Backend": self.chosen_backend,
#         }
#         self.host_url = os.getenv("LLM_API_URL").rstrip("/").removesuffix("/api/chat")  # strip the endpoint path; rstrip alone strips characters, not the suffix
#         self.host_url = 'http://192.168.1.12:3300'  #! Change back when possible
#         self.client: Client = Client(host=self.host_url, headers=headers, timeout=240)
#         self.async_client: AsyncClient = AsyncClient()
#     def get_credentials(self):
#         # Build the HTTP Basic Auth token from the configured API user and password
#         credentials = f"{os.getenv('LLM_API_USER')}:{os.getenv('LLM_API_PWD_LASSE')}"
#         return base64.b64encode(credentials.encode()).decode()
#     def get_model(self, model_alias):
#         models = {
#             "standard": "LLM_MODEL",
#             "small": "LLM_MODEL_SMALL",
#             "vision": "LLM_MODEL_VISION",
#             "standard_64k": "LLM_MODEL_LARGE",
#             "reasoning": "LLM_MODEL_REASONING",
#             "tools": "LLM_MODEL_TOOLS",
#         }
#         model = os.getenv(models.get(model_alias, "LLM_MODEL"))
#         self.model = model  # Note: updates self.model as a side effect of every lookup
#         return model
#     def count_tokens(self):
#         num_tokens = 0
#         for i in self.messages:
#             for k, v in i.items():
#                 if k == "content":
#                     if not isinstance(v, str):
#                         v = str(v)
#                     tokens = tokenizer.encode(v)
#                     num_tokens += len(tokens)
#         return int(num_tokens)
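#     # A minimal sketch of what count_tokens() relies on, assuming the
#     # cl100k_base tiktoken encoding loaded at module level:
#     #
#     #     tokens = tokenizer.encode("How many tokens is this?")
#     #     print(len(tokens))  # token count for the string, not its character length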
#     def get_least_conn_server(self):
#         try:
#             response = requests.get("http://192.168.1.12:5000/least_conn")
#             response.raise_for_status()
#             # Extract the least connected server from the response
#             least_conn_server = response.headers.get("X-Upstream-Address")
#             return least_conn_server
#         except requests.RequestException as e:
#             print_red("Error getting least connected server:", e)
#             return None
#     def generate(
#         self,
#         query: str = None,
#         user_input: str = None,
#         context: str = None,
#         stream: bool = False,
#         tools: list = None,
#         images: list = None,
#         model: Optional[
#             Literal["small", "standard", "vision", "reasoning", "tools"]
#         ] = None,
#         temperature: float = None,
#         messages: list[dict] = None,
#         format: BaseModel = None,
#         think: bool = False,
#     ):
#         """
#         Generate a response based on the provided query and context.
#         Parameters:
#             query (str): The query string from the user.
#             user_input (str): Additional user input to be appended to the last message.
#             context (str): Contextual information to be used in generating the response.
#             stream (bool): Whether to stream the response.
#             tools (list): List of tools to be used in generating the response.
#             images (list): List of images to be included in the response.
#             model (Optional[Literal["small", "standard", "vision", "reasoning", "tools"]]): The model type to be used.
#             temperature (float): The temperature setting for the model.
#             messages (list[dict]): List of previous messages in the conversation.
#             format (Optional[BaseModel]): The format of the response.
#             think (bool): Whether to enable thinking mode (prepends /think or /no_think for reasoning-capable models).
#         Returns:
#             Message | generator | str: The response message, a generator of (chunk_type, text) tuples when streaming, or an error string if an exception occurs.
#         """
#         # Prepare the model and temperature
#         model = self.get_model(model) if model else self.model
#         # if model == self.get_model('tools'):
#         #     stream = False
#         temperature = temperature if temperature else self.options["temperature"]
#         if messages:
#             messages = [
#                 {"role": i["role"], "content": re.sub(r"\s*\n\s*", "\n", i["content"])}
#                 for i in messages
#             ]
#             message = messages.pop(-1)
#             query = message["content"]
#             self.messages = messages
#         else:
#             # Normalize whitespace and add the query to the messages
#             query = re.sub(r"\s*\n\s*", "\n", query)
#             message = {"role": "user", "content": query}
#         # Handle images if any
#         if images:
#             message = self.prepare_images(images, message)
#             model = self.get_model("vision")
#         self.messages.append(message)
#         # Prepare headers
#         headers = {"Authorization": f"Basic {self.get_credentials()}"}
#         if self.chosen_backend and model not in [self.get_model("vision"), self.get_model("tools"), self.get_model("reasoning")]:  # TODO Maybe reasoning shouldn't be here.
#             headers["X-Chosen-Backend"] = self.chosen_backend
#         if model == self.get_model("small"):
#             headers["X-Model-Type"] = "small"
#         if model == self.get_model("tools"):
#             headers["X-Model-Type"] = "tools"
#         reasoning_models = ['qwen3', 'deepseek']  # TODO Add more reasoning models here when added to ollama
#         if any([model_name in model for model_name in reasoning_models]):
#             if think:
#                 self.messages[-1]['content'] = f"/think\n{self.messages[-1]['content']}"
#             else:
#                 self.messages[-1]['content'] = f"/no_think\n{self.messages[-1]['content']}"
#         # Prepare options
#         options = Options(**self.options)
#         options.temperature = temperature
#         # Call the client.chat method
#         try:
#             self.call_model = model
#             self.client: Client = Client(host=self.host_url, headers=headers, timeout=300)  #!
#             # print_rainbow(self.client._client.__dict__)
#             print_yellow(f"🤖 Generating using {model}...")
#             # if headers:
#             #     self.client.headers.update(headers)
#             response = self.client.chat(
#                 model=model,
#                 messages=self.messages,
#                 tools=tools,
#                 stream=stream,
#                 options=options,
#                 keep_alive=3600 * 24 * 7,
#                 format=format,
#             )
#         except ResponseError as e:
#             print_red("Error!")
#             print(e)
#             return "An error occurred."
#         # print_rainbow(response.__dict__)
#         # If user_input is provided, update the last message
#         if user_input:
#             if context:
#                 if len(context) > 2000:
#                     context = self.make_summary(context)
#                     user_input = (
#                         f"{user_input}\n\nUse the information below to answer the question.\n"
#                         f'"""{context}"""\n[This is a summary of the context provided in the original message.]'
#                     )
#                     system_message_info = "\nSometimes some of the messages in the chat history are summarised; when that happens, it is clearly indicated in the message."
#                     if system_message_info not in self.messages[0]["content"]:
#                         self.messages[0]["content"] += system_message_info
#             self.messages[-1] = {"role": "user", "content": user_input}
#         # self.chosen_backend = self.client.last_response.headers.get("X-Chosen-Backend")
#         # Handle streaming response
#         if stream:
#             print_purple("STREAMING")
#             return self.read_stream(response)
#         else:
#             # Process the response
#             if isinstance(response, ChatResponse):
#                 result = response.message.content.strip('"')
#                 if '</think>' in result:
#                     result = result.split('</think>')[-1]
#                 self.messages.append(
#                     {"role": "assistant", "content": result.strip('"')}
#                 )
#                 if tools and not response.message.get("tool_calls"):
#                     print_yellow("No tool calls in response".upper())
#                 if not self.chat:
#                     self.messages = [self.messages[0]]
#                 if not think:
#                     response.message.content = remove_thinking(response.message.content)
#                 return response.message
#             else:
#                 print_red("Unexpected response type")
#                 return "An error occurred."
#     def make_summary(self, text):
#         # Summarize long context with the small model via self.client.chat()
#         summary_message = {
#             "role": "user",
#             "content": f'Summarize the text below:\n"""{text}"""\nRemember to be concise and detailed. Answer in English.',
#         }
#         messages = [
#             {
#                 "role": "system",
#                 "content": "You are summarizing a text. Make it detailed and concise. Answer ONLY with the summary. Don't add any new information.",
#             },
#             summary_message,
#         ]
#         try:
#             response = self.client.chat(
#                 model=self.get_model("small"),
#                 messages=messages,
#                 options=Options(temperature=0.01),
#                 keep_alive=3600 * 24 * 7,
#             )
#             summary = response.message.content.strip()
#             print_blue("Summary:", summary)
#             return summary
#         except ResponseError as e:
#             print_red("Error generating summary:", e)
#             return "Summary generation failed."
#     def read_stream(self, response):
#         """
#         Yields tuples of (chunk_type, text). While in_thinking is True, chunks
#         are buffered and yielded as ('thinking', ...) up to and including the
#         closing </think> tag; after that, the remaining text is yielded as
#         ('normal', ...) chunks.
#         """
#         thinking_buffer = ""
#         in_thinking = self.call_model == self.get_model("reasoning")
#         first_chunk = True
#         prev_content = None
#         for chunk in response:
#             if not chunk:
#                 continue
#             content = chunk.message.content
#             # Remove leading quote if it's the first chunk
#             if first_chunk and content.startswith('"'):
#                 content = content[1:]
#             first_chunk = False
#             if in_thinking:
#                 thinking_buffer += content
#                 if "</think>" in thinking_buffer:
#                     end_idx = thinking_buffer.index("</think>") + len("</think>")
#                     yield ("thinking", thinking_buffer[:end_idx])
#                     remaining = thinking_buffer[end_idx:].strip('"')
#                     if chunk.done and remaining:
#                         yield ("normal", remaining)
#                         break
#                     else:
#                         prev_content = remaining
#                         in_thinking = False
#             else:
#                 if prev_content:
#                     yield ("normal", prev_content)
#                 prev_content = content
#                 if chunk.done:
#                     if prev_content and prev_content.endswith('"'):
#                         prev_content = prev_content[:-1]
#                     if prev_content:
#                         yield ("normal", prev_content)
#                     break
#         self.messages.append({"role": "assistant", "content": ""})
#     async def async_generate(
#         self,
#         query: str = None,
#         user_input: str = None,
#         context: str = None,
#         stream: bool = False,
#         tools: list = None,
#         images: list = None,
#         model: Optional[Literal["small", "standard", "vision", "tools"]] = None,
#         temperature: float = None,
#     ):
#         """
#         Asynchronously generates a response based on the provided query and other parameters.
#         Args:
#             query (str, optional): The query string to generate a response for.
#             user_input (str, optional): Additional user input to be included in the response.
#             context (str, optional): Context information to be used in generating the response.
#             stream (bool, optional): Whether to stream the response. Defaults to False.
#             tools (list, optional): List of tools to be used in generating the response. Will set the model to 'tools'.
#             images (list, optional): List of images to be included in the response.
#             model (Optional[Literal["small", "standard", "vision", "tools"]], optional): The model to be used for generating the response.
#             temperature (float, optional): The temperature setting for the model.
#         Returns:
#             str: The generated response or an error message if an exception occurs.
#         Raises:
#             ResponseError: If an error occurs during the response generation.
#         Notes:
#             - The function prepares the model and temperature settings.
#             - It normalizes whitespace in the query and handles images if provided.
#             - It prepares headers and options for the request.
#             - It adjusts options for long conversations and calls the async client's chat method.
#             - If user_input is provided, it updates the last message.
#             - It updates the chosen backend based on the response headers.
#             - It handles streaming responses and processes the response accordingly.
#             - It is not necessary to set the model to 'tools' if you provide tools as an argument.
#         """
# print_yellow("ASYNC GENERATE")
# # Normaliz e whitespace and add the query to the messages
# query = re.sub(r"\s*\n\s*", "\n", query)
# message = {"role": "user", "content": query}
# self.messages.append(message)
# # Prepare the model and temperature
# model = self.get_model(model) if model else self.model
# temperature = temperature if temperature else self.options["temperature"]
# # Prepare options
# options = Options(**self.options)
# options.temperature = temperature
# # Prepare headers
# headers = {}
# # Set model depending on the input
# if images:
# message = self.prepare_images(images, message)
# model = self.get_model("vision")
# elif tools:
# model = self.get_model("tools")
# headers["X-Model-Type"] = "tools"
# tools = [Tool(**tool) if isinstance(tool, dict) else tool for tool in tools]
# elif self.chosen_backend and model not in [self.get_model("vision"), self.get_model("tools"), self.get_model("reasoning")]:
# headers["X-Chosen-Backend"] = self.chosen_backend
# elif model == self.get_model("small"):
# headers["X-Model-Type"] = "small"
# # Adjust options for long messages
# if self.chat or len(self.messages) > 15000:
# num_tokens = self.count_tokens() + self.max_length_answer // 2
# if num_tokens > 8000 and model not in [
# self.get_model("vision"),
# self.get_model("tools"),
# ]:
# model = self.get_model("standard_64k")
# headers["X-Model-Type"] = "large"
# # Call the async client's chat method
# try:
# response = await self.async_client.chat(
# model=model,
# messages=self.messages,
# headers=headers,
# tools=tools,
# stream=stream,
# options=options,
# keep_alive=3600 * 24 * 7,
# )
# except ResponseError as e:
# print_red("Error!")
# print(e)
# return "An error occurred."
# # If user_input is provided, update the last message
# if user_input:
# if context:
# if len(context) > 2000:
# context = self.make_summary(context)
# user_input = (
# f"{user_input}\n\nUse the information below to answer the question.\n"
# f'"""{context}"""\n[This is a summary of the context provided in the original message.]'
# )
# system_message_info = "\nSometimes some of the messages in the chat history are summarised, then that is clearly indicated in the message."
# if system_message_info not in self.messages[0]["content"]:
# self.messages[0]["content"] += system_message_info
# self.messages[-1] = {"role": "user", "content": user_input}
# print_red(self.async_client.last_response.headers.get("X-Chosen-Backend", "No backend"))
# # Update chosen_backend
# if model not in [self.get_model("vision"), self.get_model("tools"), self.get_model("reasoning")]:
# self.chosen_backend = self.async_client.last_response.headers.get(
# "X-Chosen-Backend"
# )
# # Handle streaming response
# if stream:
# return self.read_stream(response)
# else:
# # Process the response
# if isinstance(response, ChatResponse):
# result = response.message.content.strip('"')
# self.messages.append(
# {"role": "assistant", "content": result.strip('"')}
# )
# if tools and not response.message.get("tool_calls"):
# print_yellow("No tool calls in response".upper())
# if not self.chat:
# self.messages = [self.messages[0]]
# return result
# else:
# print_red("Unexpected response type")
# return "An error occurred."
#     def prepare_images(self, images, message):
#         """
#         Prepares a list of images by converting them to base64 encoded strings and adds them to the provided message dictionary.
#         Args:
#             images (list): A list of images, where each image can be a file path (str), a base64 encoded string (str), or bytes.
#             message (dict): A dictionary to which the base64 encoded images will be added under the key "images".
#         Returns:
#             dict: The updated message dictionary with the base64 encoded images added under the key "images".
#         Notes:
#             Images that are neither strings nor bytes are skipped with a warning; no exception is raised.
#         """
#         base64_images = []
#         base64_pattern = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")
#         for image in images:
#             if isinstance(image, str):
#                 if base64_pattern.match(image):
#                     base64_images.append(image)
#                 else:
#                     with open(image, "rb") as image_file:
#                         base64_images.append(
#                             base64.b64encode(image_file.read()).decode("utf-8")
#                         )
#             elif isinstance(image, bytes):
#                 base64_images.append(base64.b64encode(image).decode("utf-8"))
#             else:
#                 print_red("Invalid image type")
#         message["images"] = base64_images
#         # The caller switches to the vision model for image messages
#         return message
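#     # A minimal vision sketch, assuming an LLM instance `llm` and a vision
#     # model behind the "vision" alias; the image path below is illustrative only:
#     #
#     #     message = llm.generate(
#     #         query="Describe this image.",
#     #         images=["/path/to/picture.jpg"],
#     #     )
#     #     print(message.content)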
# def remove_thinking(response):
#     """Remove the thinking section from the response"""
#     response_text = response.content if hasattr(response, "content") else str(response)
#     if "</think>" in response_text:
#         return response_text.split("</think>")[1].strip()
#     return response_text
# if __name__ == "__main__":
#     llm = LLM()
#     result = llm.generate(
#         query="I want to add 2 and 2",
#     )
#     print(result.content)