from _llm import LLM


if __name__ == "__main__":
    llm = LLM()

    result = llm.generate(
        query="I want to add 2 and 2",
        think=True,
    )
    print(result)
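
    # Hedged sketch (not part of the original script): streaming usage. Based on
    # the commented-out reference implementation below, generate(stream=True)
    # returns a generator from read_stream() that yields ("thinking" | "normal",
    # text) tuples. Whether the current _llm.LLM behaves exactly like this is an
    # assumption, so this is left commented out as an illustration only.
    #
    # for chunk_type, text in llm.generate(
    #     query="Explain how you would add 2 and 2.",
    #     stream=True,
    #     think=True,
    # ):
    #     if chunk_type == "normal":
    #         print(text, end="", flush=True)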

# import os
# import base64
# import re
# from typing import Literal, Optional
# from pydantic import BaseModel
# import requests
# import tiktoken
# from ollama import (
#     Client,
#     AsyncClient,
#     ResponseError,
#     ChatResponse,
#     Tool,
#     Options,
# )

# import env_manager
# from colorprinter.print_color import *

# env_manager.set_env()

# tokenizer = tiktoken.get_encoding("cl100k_base")


# class LLM:
#     """
#     LLM class for interacting with an instance of Ollama.

#     Attributes:
#         model (str): The model to be used for response generation.
#         system_message (str): The system message to be used in the chat.
#         options (dict): Options for the model, such as temperature.
#         messages (list): List of messages in the chat.
#         max_length_answer (int): Maximum length of the generated answer.
#         chat (bool): Whether the chat mode is enabled.
#         chosen_backend (str): The chosen backend server for the API.
#         client (Client): The client for synchronous API calls.
#         async_client (AsyncClient): The client for asynchronous API calls.
#         tools (list): List of tools to be used in generating the response.

#     Methods:
#         __init__(self, system_message, temperature, model, max_length_answer, messages, chat, chosen_backend):
#             Initializes the LLM class with the provided parameters.

#         get_model(self, model_alias):
#             Retrieves the model name based on the provided alias.

#         count_tokens(self):
#             Counts the number of tokens in the messages.

#         get_least_conn_server(self):
#             Retrieves the least connected server from the backend.

#         generate(self, query, user_input, context, stream, tools, images, model, temperature):
#             Generates a response based on the provided query and options.

#         make_summary(self, text):
#             Generates a summary of the provided text.

#         read_stream(self, response):
#             Handles streaming responses.

#         async_generate(self, query, user_input, context, stream, tools, images, model, temperature):
#             Asynchronously generates a response based on the provided query and options.

#         prepare_images(self, images, message):
#             Prepares images as base64 strings and attaches them to a message.
#     """

#     def __init__(
#         self,
#         system_message: str = "You are an assistant.",
#         temperature: float = 0.01,
#         model: Optional[
#             Literal["small", "standard", "vision", "reasoning", "tools"]
#         ] = "standard",
#         max_length_answer: int = 4096,
#         messages: list[dict] = None,
#         chat: bool = True,
#         chosen_backend: str = None,
#         tools: list = None,
#     ) -> None:
#         """
#         Initialize the assistant with the given parameters.

#         Args:
#             system_message (str): The initial system message for the assistant. Defaults to "You are an assistant.".
#             temperature (float): The temperature setting for the model, affecting randomness. Defaults to 0.01.
#             model (Optional[Literal["small", "standard", "vision", "reasoning", "tools"]]): The model type to use. Defaults to "standard".
#             max_length_answer (int): The maximum length of the generated answer. Defaults to 4096.
#             messages (list[dict], optional): A list of initial messages. Defaults to None.
#             chat (bool): Whether the assistant is in chat mode. Defaults to True.
#             chosen_backend (str, optional): The backend server to use. If not provided, the least connected server is chosen.
#             tools (list, optional): Tools available to the model. Defaults to None.

#         Returns:
#             None
#         """

#         self.model = self.get_model(model)
#         self.call_model = (
#             self.model
#         )  # Updated on each call to record which model was actually used
#         self.system_message = system_message
#         self.options = {"temperature": temperature}
#         self.messages = messages or [{"role": "system", "content": self.system_message}]
#         self.max_length_answer = max_length_answer
#         self.chat = chat

#         if not chosen_backend:
#             chosen_backend = self.get_least_conn_server()
#         self.chosen_backend = chosen_backend

#         headers = {
#             "Authorization": f"Basic {self.get_credentials()}",
#             "X-Chosen-Backend": self.chosen_backend,
#         }
#         self.host_url = os.getenv("LLM_API_URL").rstrip("/api/chat/")
#         self.host_url = 'http://192.168.1.12:3300'  #! Change back when possible
#         self.client: Client = Client(host=self.host_url, headers=headers, timeout=240)
#         self.async_client: AsyncClient = AsyncClient()

#     def get_credentials(self):
#         # Build the Basic Auth credentials from environment variables
#         credentials = f"{os.getenv('LLM_API_USER')}:{os.getenv('LLM_API_PWD_LASSE')}"
#         return base64.b64encode(credentials.encode()).decode()

#     def get_model(self, model_alias):

#         models = {
#             "standard": "LLM_MODEL",
#             "small": "LLM_MODEL_SMALL",
#             "vision": "LLM_MODEL_VISION",
#             "standard_64k": "LLM_MODEL_LARGE",
#             "reasoning": "LLM_MODEL_REASONING",
#             "tools": "LLM_MODEL_TOOLS",
#         }
#         model = os.getenv(models.get(model_alias, "LLM_MODEL"))
#         self.model = model
#         return model
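
#     # Hedged sketch (not part of the original code): the class reads its
#     # configuration from environment variables via env_manager.set_env(). An
#     # illustrative .env might look like the following; every value here is a
#     # placeholder, not something confirmed by this repository.
#     #
#     #   LLM_API_URL=<base URL of the Ollama proxy>
#     #   LLM_API_USER=<user>
#     #   LLM_API_PWD_LASSE=<password>
#     #   LLM_MODEL=<default model tag>
#     #   LLM_MODEL_SMALL=<small model tag>
#     #   LLM_MODEL_VISION=<vision model tag>
#     #   LLM_MODEL_LARGE=<64k-context model tag>
#     #   LLM_MODEL_REASONING=<reasoning model tag>
#     #   LLM_MODEL_TOOLS=<tool-calling model tag>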

#     def count_tokens(self):
#         num_tokens = 0
#         for i in self.messages:
#             for k, v in i.items():
#                 if k == "content":
#                     if not isinstance(v, str):
#                         v = str(v)
#                     tokens = tokenizer.encode(v)
#                     num_tokens += len(tokens)
#         return int(num_tokens)

#     def get_least_conn_server(self):
#         try:
#             response = requests.get("http://192.168.1.12:5000/least_conn")
#             response.raise_for_status()
#             # Extract the least connected server from the response
#             least_conn_server = response.headers.get("X-Upstream-Address")
#             return least_conn_server
#         except requests.RequestException as e:
#             print_red("Error getting least connected server:", e)
#             return None

#     def generate(
#         self,
#         query: str = None,
#         user_input: str = None,
#         context: str = None,
#         stream: bool = False,
#         tools: list = None,
#         images: list = None,
#         model: Optional[
#             Literal["small", "standard", "vision", "reasoning", "tools"]
#         ] = None,
#         temperature: float = None,
#         messages: list[dict] = None,
#         format: BaseModel = None,
#         think: bool = False,
#     ):
#         """
#         Generate a response based on the provided query and context.

#         Parameters:
#             query (str): The query string from the user.
#             user_input (str): Additional user input to be appended to the last message.
#             context (str): Contextual information to be used in generating the response.
#             stream (bool): Whether to stream the response.
#             tools (list): List of tools to be used in generating the response.
#             images (list): List of images to be included in the response.
#             model (Optional[Literal["small", "standard", "vision", "reasoning", "tools"]]): The model type to be used.
#             temperature (float): The temperature setting for the model.
#             messages (list[dict]): List of previous messages in the conversation.
#             format (Optional[BaseModel]): The format of the response.
#             think (bool): Whether to enable the model's thinking mode.

#         Returns:
#             The generated response message, a stream generator when stream=True, or an error string if an exception occurs.
#         """

#         # Prepare the model and temperature

#         model = self.get_model(model) if model else self.model
#         # if model == self.get_model('tools'):
#         #     stream = False
#         temperature = temperature if temperature else self.options["temperature"]

#         if messages:
#             messages = [
#                 {"role": i["role"], "content": re.sub(r"\s*\n\s*", "\n", i["content"])}
#                 for i in messages
#             ]
#             message = messages.pop(-1)
#             query = message["content"]
#             self.messages = messages
#         else:
#             # Normalize whitespace and add the query to the messages
#             query = re.sub(r"\s*\n\s*", "\n", query)
#             message = {"role": "user", "content": query}

#         # Handle images if any
#         if images:
#             message = self.prepare_images(images, message)
#             model = self.get_model("vision")

#         self.messages.append(message)

#         # Prepare headers
#         headers = {"Authorization": f"Basic {self.get_credentials()}"}
#         if self.chosen_backend and model not in [self.get_model("vision"), self.get_model("tools"), self.get_model("reasoning")]:  # TODO Maybe reasoning shouldn't be here.
#             headers["X-Chosen-Backend"] = self.chosen_backend

#         if model == self.get_model("small"):
#             headers["X-Model-Type"] = "small"
#         if model == self.get_model("tools"):
#             headers["X-Model-Type"] = "tools"

#         reasoning_models = ['qwen3', 'deepseek']  # TODO Add more reasoning models here when added to ollama
#         if any(model_name in model for model_name in reasoning_models):
#             if think:
#                 self.messages[-1]['content'] = f"/think\n{self.messages[-1]['content']}"
#             else:
#                 self.messages[-1]['content'] = f"/no_think\n{self.messages[-1]['content']}"

#         # Prepare options
#         options = Options(**self.options)
#         options.temperature = temperature

#         # Call the client.chat method
#         try:
#             self.call_model = model
#             self.client: Client = Client(host=self.host_url, headers=headers, timeout=300)  #!
#             # print_rainbow(self.client._client.__dict__)
#             print_yellow(f"🤖 Generating using {model}...")
#             # if headers:
#             #     self.client.headers.update(headers)
#             response = self.client.chat(
#                 model=model,
#                 messages=self.messages,
#                 tools=tools,
#                 stream=stream,
#                 options=options,
#                 keep_alive=3600 * 24 * 7,
#                 format=format,
#             )

#         except ResponseError as e:
#             print_red("Error!")
#             print(e)
#             return "An error occurred."
#         # print_rainbow(response.__dict__)

#         # If user_input is provided, update the last message
#         if user_input:
#             if context:
#                 if len(context) > 2000:
#                     context = self.make_summary(context)
#                 user_input = (
#                     f"{user_input}\n\nUse the information below to answer the question.\n"
#                     f'"""{context}"""\n[This is a summary of the context provided in the original message.]'
#                 )
#                 system_message_info = "\nSometimes some of the messages in the chat history are summarised, then that is clearly indicated in the message."
#                 if system_message_info not in self.messages[0]["content"]:
#                     self.messages[0]["content"] += system_message_info
#             self.messages[-1] = {"role": "user", "content": user_input}

#         # self.chosen_backend = self.client.last_response.headers.get("X-Chosen-Backend")

#         # Handle streaming response
#         if stream:
#             print_purple("STREAMING")
#             return self.read_stream(response)
#         else:
#             # Process the response
#             if isinstance(response, ChatResponse):
#                 result = response.message.content.strip('"')
#                 if '</think>' in result:
#                     result = result.split('</think>')[-1]
#                 self.messages.append(
#                     {"role": "assistant", "content": result.strip('"')}
#                 )
#                 if tools and not response.message.get("tool_calls"):
#                     print_yellow("No tool calls in response".upper())
#                 if not self.chat:
#                     self.messages = [self.messages[0]]

#                 if not think:
#                     response.message.content = remove_thinking(response.message.content)
#                 return response.message
#             else:
#                 print_red("Unexpected response type")
#                 return "An error occurred."

#     def make_summary(self, text):
#         # Implement your summary logic using self.client.chat()
#         summary_message = {
#             "role": "user",
#             "content": f'Summarize the text below:\n"""{text}"""\nRemember to be concise and detailed. Answer in English.',
#         }
#         messages = [
#             {
#                 "role": "system",
#                 "content": "You are summarizing a text. Make it detailed and concise. Answer ONLY with the summary. Don't add any new information.",
#             },
#             summary_message,
#         ]
#         try:
#             response = self.client.chat(
#                 model=self.get_model("small"),
#                 messages=messages,
#                 options=Options(temperature=0.01),
#                 keep_alive=3600 * 24 * 7,
#             )
#             summary = response.message.content.strip()
#             print_blue("Summary:", summary)
#             return summary
#         except ResponseError as e:
#             print_red("Error generating summary:", e)
#             return "Summary generation failed."

#     def read_stream(self, response):
#         """
#         Yields tuples of (chunk_type, text). The first tuple is ('thinking', ...)
#         if in_thinking is True and stops at </think>. After that, yields ('normal', ...)
#         for the rest of the text.
#         """
#         thinking_buffer = ""
#         in_thinking = self.call_model == self.get_model("reasoning")
#         first_chunk = True
#         prev_content = None

#         for chunk in response:
#             if not chunk:
#                 continue
#             content = chunk.message.content

#             # Remove leading quote if it's the first chunk
#             if first_chunk and content.startswith('"'):
#                 content = content[1:]
#             first_chunk = False

#             if in_thinking:
#                 thinking_buffer += content
#                 if "</think>" in thinking_buffer:
#                     end_idx = thinking_buffer.index("</think>") + len("</think>")
#                     yield ("thinking", thinking_buffer[:end_idx])
#                     remaining = thinking_buffer[end_idx:].strip('"')
#                     if chunk.done and remaining:
#                         yield ("normal", remaining)
#                         break
#                     else:
#                         prev_content = remaining
#                     in_thinking = False
#             else:
#                 if prev_content:
#                     yield ("normal", prev_content)
#                 prev_content = content

#             if chunk.done:
#                 if prev_content and prev_content.endswith('"'):
#                     prev_content = prev_content[:-1]
#                 if prev_content:
#                     yield ("normal", prev_content)
#                 break

#         self.messages.append({"role": "assistant", "content": ""})

#     async def async_generate(
#         self,
#         query: str = None,
#         user_input: str = None,
#         context: str = None,
#         stream: bool = False,
#         tools: list = None,
#         images: list = None,
#         model: Optional[Literal["small", "standard", "vision"]] = None,
#         temperature: float = None,
#     ):
#         """
#         Asynchronously generates a response based on the provided query and other parameters.

#         Args:
#             query (str, optional): The query string to generate a response for.
#             user_input (str, optional): Additional user input to be included in the response.
#             context (str, optional): Context information to be used in generating the response.
#             stream (bool, optional): Whether to stream the response. Defaults to False.
#             tools (list, optional): List of tools to be used in generating the response. Will set the model to 'tools'.
#             images (list, optional): List of images to be included in the response.
#             model (Optional[Literal["small", "standard", "vision", "tools"]], optional): The model to be used for generating the response.
#             temperature (float, optional): The temperature setting for the model.

#         Returns:
#             str: The generated response or an error message if an exception occurs.

#         Raises:
#             ResponseError: If an error occurs during the response generation.

#         Notes:
#             - The function prepares the model and temperature settings.
#             - It normalizes whitespace in the query and handles images if provided.
#             - It prepares headers and options for the request.
#             - It adjusts options for long messages and calls the async client's chat method.
#             - If user_input is provided, it updates the last message.
#             - It updates the chosen backend based on the response headers.
#             - It handles streaming responses and processes the response accordingly.
#             - It is not necessary to set model to 'tools' if you provide tools as an argument.
#         """
#         print_yellow("ASYNC GENERATE")
#         # Normalize whitespace and add the query to the messages
#         query = re.sub(r"\s*\n\s*", "\n", query)
#         message = {"role": "user", "content": query}
#         self.messages.append(message)

#         # Prepare the model and temperature
#         model = self.get_model(model) if model else self.model
#         temperature = temperature if temperature else self.options["temperature"]

#         # Prepare options
#         options = Options(**self.options)
#         options.temperature = temperature

#         # Prepare headers
#         headers = {}

#         # Set model depending on the input
#         if images:
#             message = self.prepare_images(images, message)
#             model = self.get_model("vision")
#         elif tools:
#             model = self.get_model("tools")
#             headers["X-Model-Type"] = "tools"
#             tools = [Tool(**tool) if isinstance(tool, dict) else tool for tool in tools]
#         elif self.chosen_backend and model not in [self.get_model("vision"), self.get_model("tools"), self.get_model("reasoning")]:
#             headers["X-Chosen-Backend"] = self.chosen_backend
#         elif model == self.get_model("small"):
#             headers["X-Model-Type"] = "small"

#         # Adjust options for long messages
#         if self.chat or len(self.messages) > 15000:
#             num_tokens = self.count_tokens() + self.max_length_answer // 2
#             if num_tokens > 8000 and model not in [
#                 self.get_model("vision"),
#                 self.get_model("tools"),
#             ]:
#                 model = self.get_model("standard_64k")
#                 headers["X-Model-Type"] = "large"

#         # Call the async client's chat method
#         try:
#             response = await self.async_client.chat(
#                 model=model,
#                 messages=self.messages,
#                 headers=headers,
#                 tools=tools,
#                 stream=stream,
#                 options=options,
#                 keep_alive=3600 * 24 * 7,
#             )
#         except ResponseError as e:
#             print_red("Error!")
#             print(e)
#             return "An error occurred."

#         # If user_input is provided, update the last message
#         if user_input:
#             if context:
#                 if len(context) > 2000:
#                     context = self.make_summary(context)
#                 user_input = (
#                     f"{user_input}\n\nUse the information below to answer the question.\n"
#                     f'"""{context}"""\n[This is a summary of the context provided in the original message.]'
#                 )
#                 system_message_info = "\nSometimes some of the messages in the chat history are summarised, then that is clearly indicated in the message."
#                 if system_message_info not in self.messages[0]["content"]:
#                     self.messages[0]["content"] += system_message_info
#             self.messages[-1] = {"role": "user", "content": user_input}

#         print_red(self.async_client.last_response.headers.get("X-Chosen-Backend", "No backend"))
#         # Update chosen_backend
#         if model not in [self.get_model("vision"), self.get_model("tools"), self.get_model("reasoning")]:
#             self.chosen_backend = self.async_client.last_response.headers.get(
#                 "X-Chosen-Backend"
#             )

#         # Handle streaming response
#         if stream:
#             return self.read_stream(response)
#         else:
#             # Process the response
#             if isinstance(response, ChatResponse):
#                 result = response.message.content.strip('"')
#                 self.messages.append(
#                     {"role": "assistant", "content": result.strip('"')}
#                 )
#                 if tools and not response.message.get("tool_calls"):
#                     print_yellow("No tool calls in response".upper())
#                 if not self.chat:
#                     self.messages = [self.messages[0]]
#                 return result
#             else:
#                 print_red("Unexpected response type")
#                 return "An error occurred."

#     def prepare_images(self, images, message):
#         """
#         Prepares a list of images by converting them to base64 encoded strings and adds them to the provided message dictionary.

#         Args:
#             images (list): A list of images, where each image can be a file path (str), a base64 encoded string (str), or bytes.
#             message (dict): A dictionary to which the base64 encoded images will be added under the key "images".

#         Returns:
#             dict: The updated message dictionary with the base64 encoded images added under the key "images".

#         Raises:
#             ValueError: If an image is not a string or bytes.
#         """
#         import base64

#         base64_images = []
#         base64_pattern = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")

#         for image in images:
#             if isinstance(image, str):
#                 if base64_pattern.match(image):
#                     base64_images.append(image)
#                 else:
#                     with open(image, "rb") as image_file:
#                         base64_images.append(
#                             base64.b64encode(image_file.read()).decode("utf-8")
#                         )
#             elif isinstance(image, bytes):
#                 base64_images.append(base64.b64encode(image).decode("utf-8"))
#             else:
#                 print_red("Invalid image type")

#         message["images"] = base64_images
#         # Use the vision model

#         return message
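
#     # Hedged sketch (not part of the original code): sending an image to the
#     # vision model through generate(). The file path is a placeholder; bytes or
#     # an already base64-encoded string would also be accepted by prepare_images().
#     #
#     #   description = llm.generate(
#     #       query="Describe this image.",
#     #       images=["/path/to/image.png"],
#     #   )
#     #   # description.content holds the text reply.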

# def remove_thinking(response):
#     """Remove the thinking section from the response."""
#     response_text = response.content if hasattr(response, "content") else str(response)
#     if "</think>" in response_text:
#         return response_text.split("</think>")[1].strip()
#     return response_text

# if __name__ == "__main__":

#     llm = LLM()

#     result = llm.generate(
#         query="I want to add 2 and 2",
#     )
#     print(result.content)