first commit

legacy
lasseedfast 7 months ago
commit 6c807c8744
7 changed files:

  .gitignore        (+36)
  LICENSE           (+0)
  README.md         (+109)
  _llm/__init__.py  (+4)
  _llm/__main__.py  (+0)
  _llm/llm.py       (+554)
  setup.py          (+26)

.gitignore (vendored)

@@ -0,0 +1,36 @@
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Environment variables
.env
.venv
.envrc
# IDE specific files
.idea/
.vscode/
*.swp
*.swo
# OS specific files
.DS_Store
Thumbs.db

README.md

@@ -0,0 +1,109 @@
# _llm
A Python package for interacting with LLMs through Ollama, supporting both a remote Ollama API and a local Ollama instance.
## Installation
Install directly from GitHub:
```bash
pip install git+https://github.com/lasseedfast/_llm.git
```
Or clone and install for development:
```bash
git clone https://github.com/lasseedfast/_llm.git
cd _llm
pip install -e .
```
## Dependencies
This package requires:
- env_manager: `pip install git+https://github.com/lasseedfast/env_manager.git`
- colorprinter: `pip install git+https://github.com/lasseedfast/colorprinter.git`
- ollama: For local model inference
- tiktoken: For token counting
- requests: For API communication
## Environment Variables
The package requires several environment variables to be set:
- `LLM_API_URL`: URL of the Ollama API
- `LLM_API_USER`: Username for API authentication
- `LLM_API_PWD_LASSE`: Password for API authentication
- `LLM_MODEL`: Standard model name
- `LLM_MODEL_SMALL`: Small model name
- `LLM_MODEL_VISION`: Vision model name
- `LLM_MODEL_LARGE`: Large context model name
- `LLM_MODEL_REASONING`: Reasoning model name
- `LLM_MODEL_TOOLS`: Tools model name
These can be set in a `.env` file in your project directory or in the ArangoDB environment document in the div database.
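For example, a `.env` file could look like this (every value below is a placeholder; use the URL, credentials, and model names that match your own Ollama setup):
```
LLM_API_URL=https://ollama.example.com/api/chat/
LLM_API_USER=your-username
LLM_API_PWD_LASSE=your-password
LLM_MODEL=llama3.1
LLM_MODEL_SMALL=llama3.2
LLM_MODEL_VISION=llava
LLM_MODEL_LARGE=llama3.1-64k
LLM_MODEL_REASONING=deepseek-r1
LLM_MODEL_TOOLS=llama3.1
```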
## Basic Usage
```python
from _llm import LLM
# Initialize the LLM
llm = LLM()
# Generate a response
result = llm.generate(
    query="I want to add 2 and 2",
)
print(result.content)
```
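Since `chat=True` by default, the conversation history is kept on the `LLM` instance (`llm.messages`), so a follow-up call continues the same conversation. A minimal sketch (the prompts are just illustrative):
```python
from _llm import LLM

llm = LLM()
first = llm.generate(query="I want to add 2 and 2")
print(first.content)

# The next call sees the previous exchange because chat mode keeps llm.messages.
followup = llm.generate(query="Now multiply that result by 3")
print(followup.content)
```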
## Advanced Usage
### Working with Images
```python
from _llm import LLM
llm = LLM()
response = llm.generate(
    query="What's in this image?",
    images=["path/to/image.jpg"],
    model="vision"
)
```
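Besides file paths, `prepare_images` also accepts base64-encoded strings and raw bytes, so already-loaded image data can be passed directly. A sketch using bytes (the file name is just an example):
```python
from _llm import LLM

llm = LLM()
with open("path/to/image.jpg", "rb") as f:
    image_bytes = f.read()

response = llm.generate(
    query="Describe this image",
    images=[image_bytes],  # bytes are base64-encoded internally by prepare_images
    model="vision"
)
print(response.content)
```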
### Streaming Responses
```python
from _llm import LLM
llm = LLM()
for chunk_type, chunk in llm.generate(
    query="Write a paragraph about AI",
    stream=True
):
    print(f"{chunk_type}: {chunk}")
```
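The stream yields `(chunk_type, text)` tuples: with the reasoning model, chunks inside the `<think>...</think>` section arrive as `'thinking'`, and everything after as `'normal'` (see `read_stream`). A small sketch that prints only the final answer:
```python
from _llm import LLM

llm = LLM()
for chunk_type, chunk in llm.generate(
    query="Explain why the sky is blue",
    model="reasoning",
    stream=True
):
    if chunk_type == "normal":  # skip the model's 'thinking' chunks
        print(chunk, end="")
```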
### Using Async API
```python
import asyncio
from _llm import LLM
async def main():
    llm = LLM()
    response = await llm.async_generate(
        query="What is machine learning?",
        model="standard"
    )
    print(response)

asyncio.run(main())
```
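Both `generate` and `async_generate` also accept `force_local=True` to skip the remote API and call a locally running Ollama server directly (the remote API is otherwise tried first, with an automatic local fallback on failure). A small sketch:
```python
from _llm import LLM

llm = LLM()
# Requires a locally running Ollama server; the remote API is skipped entirely.
response = llm.generate(
    query="What is machine learning?",
    force_local=True
)
print(response.content)
```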
## License
MIT

_llm/__init__.py

@@ -0,0 +1,4 @@
from .llm import LLM, remove_thinking
__version__ = "0.1.0"
__all__ = ["LLM", "remove_thinking"]

_llm/llm.py

@@ -0,0 +1,554 @@
import os
import base64
import re
import traceback
from typing import Literal, Optional

import requests
import tiktoken
from ollama import (
    Client,
    AsyncClient,
    ResponseError,
    ChatResponse,
    Tool,
    Options,
)

import env_manager
from colorprinter.print_color import *

env_manager.set_env()
print(os.environ)

tokenizer = tiktoken.get_encoding("cl100k_base")


class LLM:
    """
    LLM class for interacting with an instance of Ollama.

    Attributes:
        model (str): The model to be used for response generation.
        system_message (str): The system message to be used in the chat.
        options (dict): Options for the model, such as temperature.
        messages (list): List of messages in the chat.
        max_length_answer (int): Maximum length of the generated answer.
        chat (bool): Whether chat mode is enabled.
        chosen_backend (str): The chosen backend server for the API.
        client (Client): The client for synchronous API calls.
        async_client (AsyncClient): The client for asynchronous API calls.
        tools (list): List of tools to be used in generating the response.

    Note:
        Most logic for message preparation, header building, and API calls is handled by private helper methods.
        The main public methods are:
        - generate: Synchronous generation (see method docstring)
        - async_generate: Asynchronous generation (see method docstring)
        - make_summary: Generate a summary of a text
        - read_stream: Handle streaming responses
        - prepare_images: Prepare images for vision models
    """

    def __init__(
        self,
        system_message: str = "You are an assistant.",
        temperature: float = 0.01,
        model: Optional[
            Literal["small", "standard", "vision", "reasoning", "tools"]
        ] = "standard",
        max_length_answer: int = 4096,
        messages: list[dict] = None,
        chat: bool = True,
        chosen_backend: str = None,
        tools: list = None,
    ) -> None:
        """
        Initialize the assistant with the given parameters.

        Args:
            system_message (str): The initial system message for the assistant. Defaults to "You are an assistant.".
            temperature (float): The temperature setting for the model, affecting randomness. Defaults to 0.01.
            model (Optional[Literal["small", "standard", "vision", "reasoning", "tools"]]): The model type to use. Defaults to "standard".
            max_length_answer (int): The maximum length of the generated answer. Defaults to 4096.
            messages (list[dict], optional): A list of initial messages. Defaults to None.
            chat (bool): Whether the assistant is in chat mode. Defaults to True.
            chosen_backend (str, optional): The backend server to use. If not provided, the least connected server is chosen.

        Returns:
            None
        """
        self.model = self.get_model(model)
        self.call_model = self.model  # Updated per call to record which model was actually used
        self.system_message = system_message
        self.options = {"temperature": temperature}
        self.messages = messages or [{"role": "system", "content": self.system_message}]
        self.max_length_answer = max_length_answer
        self.chat = chat
        self.chosen_backend = chosen_backend
        headers = {
            "Authorization": f"Basic {self.get_credentials()}",
        }
        if self.chosen_backend:
            headers["X-Chosen-Backend"] = self.chosen_backend
        # Strip a trailing "/api/chat" path (if present) to get the base host URL.
        # str.rstrip would remove a character set rather than the suffix, so use a regex.
        self.host_url = re.sub(r"/api/chat/?$", "", os.getenv("LLM_API_URL"))
        self.client: Client = Client(host=self.host_url, headers=headers, timeout=120)
        self.async_client: AsyncClient = AsyncClient()

    def get_credentials(self):
        # Build the Basic auth credentials used in the clients' default headers
        credentials = f"{os.getenv('LLM_API_USER')}:{os.getenv('LLM_API_PWD_LASSE')}"
        return base64.b64encode(credentials.encode()).decode()

    def get_model(self, model_alias):
        models = {
            "standard": "LLM_MODEL",
            "small": "LLM_MODEL_SMALL",
            "vision": "LLM_MODEL_VISION",
            "standard_64k": "LLM_MODEL_LARGE",
            "reasoning": "LLM_MODEL_REASONING",
            "tools": "LLM_MODEL_TOOLS",
        }
        model = os.getenv(models.get(model_alias, "LLM_MODEL"))
        print_purple(f"Using model: {model}")
        return model

    def count_tokens(self):
        num_tokens = 0
        for i in self.messages:
            for k, v in i.items():
                if k == "content":
                    if not isinstance(v, str):
                        v = str(v)
                    tokens = tokenizer.encode(v)
                    num_tokens += len(tokens)
        return int(num_tokens)

    def _prepare_messages_and_model(self, query, user_input, context, messages, images, model):
        """Prepare messages and select the appropriate model, handling images if present."""
        print_red('model', model)
        if messages:
            messages = [
                {"role": i["role"], "content": re.sub(r"\s*\n\s*", "\n", i["content"])}
                for i in messages
            ]
            message = messages.pop(-1)
            query = message["content"]
            self.messages = messages
        else:
            query = re.sub(r"\s*\n\s*", "\n", query)
            message = {"role": "user", "content": query}
        if images:
            message = self.prepare_images(images, message)
            model = self.get_model("vision")
        else:
            model = self.get_model(model)
        self.messages.append(message)
        print_red('return model', model)
        return model

    def _build_headers(self, model, tools, think):
        """Build HTTP headers for API requests, including auth and backend/model info."""
        headers = {"Authorization": f"Basic {self.get_credentials()}"}
        if self.chosen_backend and model not in [self.get_model("vision"), self.get_model("tools"), self.get_model("reasoning")]:
            headers["X-Chosen-Backend"] = self.chosen_backend
        if model == self.get_model("small"):
            headers["X-Model-Type"] = "small"
        if model == self.get_model("tools"):
            headers["X-Model-Type"] = "tools"
        if think and model and any([m in model for m in ['qwen3', 'deepseek']]):
            self.messages[-1]['content'] = f"/think\n{self.messages[-1]['content']}"
        elif model and any([m in model for m in ['qwen3', 'deepseek']]):
            self.messages[-1]['content'] = f"/no_think\n{self.messages[-1]['content']}"
        return headers

    def _get_options(self, temperature):
        """Build model options, setting temperature and other parameters."""
        options = Options(**self.options)
        options.temperature = temperature if temperature is not None else self.options["temperature"]
        return options

    def _call_remote_api(self, model, tools, stream, options, format, headers):
        """Call the remote Ollama API synchronously."""
        self.call_model = model
        self.client: Client = Client(host=self.host_url, headers=headers, timeout=300)
        print_yellow(f"🤖 Generating using {model} (remote)...")
        response = self.client.chat(
            model=model,
            messages=self.messages,
            tools=tools,
            stream=stream,
            options=options,
            keep_alive=3600 * 24 * 7,
            format=format,
        )
        return response

    async def _call_remote_api_async(self, model, tools, stream, options, format, headers):
        """Call the remote Ollama API asynchronously."""
        # Recreate the async client with the per-call headers (mirroring the sync path);
        # AsyncClient.chat() does not accept a headers keyword itself.
        self.async_client: AsyncClient = AsyncClient(host=self.host_url, headers=headers, timeout=300)
        print_yellow(f"🤖 Generating using {model} (remote, async)...")
        response = await self.async_client.chat(
            model=model,
            messages=self.messages,
            tools=tools,
            stream=stream,
            options=options,
            keep_alive=3600 * 24 * 7,
        )
        return response

    def _call_local_ollama(self, model, stream, temperature):
        """Call the local Ollama instance synchronously."""
        import ollama

        print_yellow(f"🤖 Generating using {model} (local)...")
        options = {"temperature": temperature}
        if stream:
            response_stream = ollama.chat(
                model=model,
                messages=self.messages,
                options=options,
                stream=True,
            )

            def local_stream_adapter():
                for chunk in response_stream:
                    yield type('OllamaResponse', (), {
                        'message': type('Message', (), {'content': chunk['message']['content']}),
                        'done': chunk.get('done', False),
                    })

            return self.read_stream(local_stream_adapter())
        else:
            response = ollama.chat(
                model=model,
                messages=self.messages,
                options=options,
            )
            result = response['message']['content']
            response_obj = type('LocalChatResponse', (), {
                'message': type('Message', (), {
                    'content': result,
                    'get': lambda x: None,
                })
            })
            if '</think>' in result:
                result = result.split('</think>')[-1].strip()
                response_obj.message.content = result
            self.messages.append({"role": "assistant", "content": result})
            if not self.chat:
                self.messages = [self.messages[0]]
            return response_obj.message

    async def _call_local_ollama_async(self, model, stream, temperature):
        """Call the local Ollama instance asynchronously (using a thread pool)."""
        import ollama
        import asyncio

        print_yellow(f"🤖 Generating using {model} (local, async)...")
        options = {"temperature": temperature}
        loop = asyncio.get_event_loop()
        if stream:
            def run_stream():
                return ollama.chat(
                    model=model,
                    messages=self.messages,
                    options=options,
                    stream=True,
                )

            response_stream = await loop.run_in_executor(None, run_stream)

            async def local_stream_adapter():
                for chunk in response_stream:
                    yield type('OllamaResponse', (), {
                        'message': type('Message', (), {'content': chunk['message']['content']}),
                        'done': chunk.get('done', False),
                    })

            return local_stream_adapter()
        else:
            def run_chat():
                return ollama.chat(
                    model=model,
                    messages=self.messages,
                    options=options,
                )

            response_dict = await loop.run_in_executor(None, run_chat)
            result = response_dict['message']['content']
            self.messages.append({"role": "assistant", "content": result})
            if not self.chat:
                self.messages = [self.messages[0]]
            return result

    def generate(
        self,
        query: str = None,
        user_input: str = None,
        context: str = None,
        stream: bool = False,
        tools: list = None,
        images: list = None,
        model: Optional[
            Literal["small", "standard", "vision", "reasoning", "tools"]
        ] = 'standard',
        temperature: float = None,
        messages: list[dict] = None,
        format=None,
        think=False,
        force_local: bool = False,
    ):
        """
        Generate a response based on the provided query and context.
        """
        model = self._prepare_messages_and_model(query, user_input, context, messages, images, model)
        print(f"[generate] model after _prepare_messages_and_model: {model}")
        temperature = temperature if temperature else self.options["temperature"]
        if not force_local:
            try:
                headers = self._build_headers(model, tools, think)
                options = self._get_options(temperature)
                response = self._call_remote_api(model, tools, stream, options, format, headers)
                if stream:
                    return self.read_stream(response)
                else:
                    if isinstance(response, ChatResponse):
                        result = response.message.content.strip('"')
                        if '</think>' in result:
                            result = result.split('</think>')[-1]
                        self.messages.append({"role": "assistant", "content": result.strip('"')})
                        if tools and not response.message.get("tool_calls"):
                            pass
                        if not self.chat:
                            self.messages = [self.messages[0]]
                        if not think:
                            response.message.content = remove_thinking(response.message.content)
                        return response.message
                    else:
                        return "An error occurred."
            except Exception as e:
                traceback.print_exc()
        try:
            return self._call_local_ollama(model, stream, temperature)
        except Exception as e:
            traceback.print_exc()
            return "Both remote API and local Ollama failed. An error occurred."

    async def async_generate(
        self,
        query: str = None,
        user_input: str = None,
        context: str = None,
        stream: bool = False,
        tools: list = None,
        images: list = None,
        model: Optional[
            Literal["small", "standard", "vision", "reasoning", "tools"]
        ] = 'standard',
        temperature: float = None,
        force_local: bool = False,
    ):
        """
        Asynchronously generates a response based on the provided query and other parameters.
        """
        model = self._prepare_messages_and_model(query, user_input, context, None, images, model)
        temperature = temperature if temperature else self.options["temperature"]
        if not force_local:
            try:
                headers = self._build_headers(model, tools, False)
                options = self._get_options(temperature)
                response = await self._call_remote_api_async(model, tools, stream, options, None, headers)
                # You can add async-specific response handling here if needed
            except Exception as e:
                traceback.print_exc()
        if force_local or 'response' not in locals():
            try:
                return await self._call_local_ollama_async(model, stream, temperature)
            except Exception as e:
                traceback.print_exc()
                return "Both remote API and local Ollama failed. An error occurred."

    def make_summary(self, text):
        # Summarize the given text with the small model via self.client.chat()
        summary_message = {
            "role": "user",
            "content": f'Summarize the text below:\n"""{text}"""\nRemember to be concise and detailed. Answer in English.',
        }
        messages = [
            {
                "role": "system",
                "content": "You are summarizing a text. Make it detailed and concise. Answer ONLY with the summary. Don't add any new information.",
            },
            summary_message,
        ]
        try:
            response = self.client.chat(
                model=self.get_model("small"),
                messages=messages,
                options=Options(temperature=0.01),
                keep_alive=3600 * 24 * 7,
            )
            summary = response.message.content.strip()
            print_blue("Summary:", summary)
            return summary
        except ResponseError as e:
            print_red("Error generating summary:", e)
            return "Summary generation failed."

    def read_stream(self, response):
        """
        Yields tuples of (chunk_type, text). The first tuple is ('thinking', ...)
        if in_thinking is True and stops at </think>. After that, yields ('normal', ...)
        for the rest of the text.
        """
        thinking_buffer = ""
        in_thinking = self.call_model == self.get_model("reasoning")
        first_chunk = True
        prev_content = None
        for chunk in response:
            if not chunk:
                continue
            content = chunk.message.content
            # Remove leading quote if it's the first chunk
            if first_chunk and content.startswith('"'):
                content = content[1:]
            first_chunk = False
            if in_thinking:
                thinking_buffer += content
                if "</think>" in thinking_buffer:
                    end_idx = thinking_buffer.index("</think>") + len("</think>")
                    yield ("thinking", thinking_buffer[:end_idx])
                    remaining = thinking_buffer[end_idx:].strip('"')
                    if chunk.done and remaining:
                        yield ("normal", remaining)
                        break
                    else:
                        prev_content = remaining
                        in_thinking = False
            else:
                if prev_content:
                    yield ("normal", prev_content)
                prev_content = content
                if chunk.done:
                    if prev_content and prev_content.endswith('"'):
                        prev_content = prev_content[:-1]
                    if prev_content:
                        yield ("normal", prev_content)
                    break
        self.messages.append({"role": "assistant", "content": ""})

    async def async_generate(
        self,
        query: str = None,
        user_input: str = None,
        context: str = None,
        stream: bool = False,
        tools: list = None,
        images: list = None,
        model: Optional[Literal["small", "standard", "vision"]] = None,
        temperature: float = None,
        force_local: bool = False,  # Force using local Ollama instead of the remote API
    ):
        """
        Asynchronously generates a response based on the provided query and other parameters.

        Args:
            query (str, optional): The query string to generate a response for.
            user_input (str, optional): Additional user input to be included in the response.
            context (str, optional): Context information to be used in generating the response.
            stream (bool, optional): Whether to stream the response. Defaults to False.
            tools (list, optional): List of tools to be used in generating the response. Will set the model to 'tools'.
            images (list, optional): List of images to be included in the response.
            model (Optional[Literal["small", "standard", "vision", "tools"]], optional): The model to be used for generating the response.
            temperature (float, optional): The temperature setting for the model.
            force_local (bool, optional): Force using local Ollama instead of the remote API.

        Returns:
            str: The generated response or an error message if an exception occurs.
        """
        print_yellow("ASYNC GENERATE")
        # Prepare the model and temperature
        model = self._prepare_messages_and_model(query, user_input, context, None, images, model)
        temperature = temperature if temperature else self.options["temperature"]
        # First try with remote API
        if not force_local:
            try:
                headers = self._build_headers(model, tools, False)
                options = self._get_options(temperature)
                response = await self._call_remote_api_async(model, tools, stream, options, None, headers)
                # Process the response the same way generate() does
                if stream:
                    # Streamed responses are returned as-is (an async iterator of chunks)
                    return response
                if isinstance(response, ChatResponse):
                    result = response.message.content.strip('"')
                    if '</think>' in result:
                        result = result.split('</think>')[-1]
                    self.messages.append({"role": "assistant", "content": result.strip('"')})
                    if not self.chat:
                        self.messages = [self.messages[0]]
                    return response.message
            except Exception as e:
                print_red(f"Remote API error: {str(e)}")
                print_yellow("Falling back to local Ollama...")
        # Fall back to local Ollama (the sync client is run in a thread pool)
        if force_local or 'response' not in locals():
            try:
                return await self._call_local_ollama_async(model, stream, temperature)
            except Exception as e:
                print_red(f"Local Ollama error: {str(e)}")
                return "Both remote API and local Ollama failed. An error occurred."

    def prepare_images(self, images, message):
        """
        Prepares a list of images by converting them to base64 encoded strings and adds them to the provided message dictionary.

        Args:
            images (list): A list of images, where each image can be a file path (str), a base64 encoded string (str), or bytes.
            message (dict): A dictionary to which the base64 encoded images will be added under the key "images".

        Returns:
            dict: The updated message dictionary with the base64 encoded images added under the key "images".

        Raises:
            ValueError: If an image is not a string or bytes.
        """
        import base64

        base64_images = []
        base64_pattern = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")
        for image in images:
            if isinstance(image, str):
                if base64_pattern.match(image):
                    base64_images.append(image)
                else:
                    with open(image, "rb") as image_file:
                        base64_images.append(
                            base64.b64encode(image_file.read()).decode("utf-8")
                        )
            elif isinstance(image, bytes):
                base64_images.append(base64.b64encode(image).decode("utf-8"))
            else:
                print_red("Invalid image type")
        message["images"] = base64_images
        return message


def remove_thinking(response):
    """Remove the thinking section from the response."""
    response_text = response.content if hasattr(response, "content") else str(response)
    if "</think>" in response_text:
        return response_text.split("</think>")[1].strip()
    return response_text


if __name__ == "__main__":
    llm = LLM()
    result = llm.generate(
        query="I want to add 2 and 2",
    )
    print(result.content)

setup.py

@@ -0,0 +1,26 @@
from setuptools import setup, find_packages

setup(
    name="_llm",
    version="0.1.0",
    packages=find_packages(),
    install_requires=[
        "requests",
        "tiktoken",
        "ollama",
        "env_manager @ git+https://github.com/lasseedfast/env_manager.git",
        "colorprinter @ git+https://github.com/lasseedfast/colorprinter.git",
    ],
    author="Lasse Edfast",
    author_email="lasse@edfast.se",
    description="A tool for interacting with LLM models",
    long_description=open("README.md").read(),
    long_description_content_type="text/markdown",
    url="https://github.com/lasseedfast/_llm",
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    python_requires=">=3.6",
)