Enhance ChatResponse handling: add headers support and refactor request method in LLM class

legacy
lasseedfast 5 months ago
parent ba75c7dc97
commit d3f347661b
  1. 107
      _llm/llm.py

@ -4,15 +4,38 @@ import re
import traceback
from typing import Literal, Optional
import tiktoken
import ollama
from ollama import (
Client,
AsyncClient,
ResponseError,
ChatResponse,
Options,
)
from ollama._types import ChatResponse
from pydantic import Field
class ChatResponseWithHeaders(ChatResponse):
headers: dict = Field(default_factory=dict)
def patched_request(self, cls, *args, stream=False, **kwargs):
if stream:
return self._original_request(cls, *args, stream=stream, **kwargs)
raw_response = self._request_raw(*args, **kwargs)
# Use the subclass if cls is ChatResponse
if cls.__name__ == "ChatResponse":
obj = ChatResponseWithHeaders(**raw_response.json())
else:
obj = cls(**raw_response.json())
obj.headers = dict(raw_response.headers)
return obj
ollama.Client._original_request = ollama.Client._request
ollama.Client._request = patched_request
import backoff
import env_manager
try:
from colorprinter.colorprinter.print_color import *
except ImportError:
@ -65,6 +88,7 @@ class LLM:
timeout: int = 240,
local_available: bool = False,
on_vpn: bool = False,
siltent: bool = False,
) -> None:
"""
Initialize the assistant with the given parameters.
@ -79,7 +103,7 @@ class LLM:
chosen_backend (str, optional): The backend server to use. If not provided, the least connected server is chosen.
think (bool): Whether to use thinking mode for reasoning models. Defaults to False.
on_vpn (bool): Whether the connection is over VPN and a local path to server can be used. Defaults to False.
silent (bool): If True, suppresses non-error prints. Defaults to False.
Returns:
None
"""
@ -97,6 +121,7 @@ class LLM:
self.tools = tools or []
self.local_available = local_available
self.chosen_backend = chosen_backend
self.silent = siltent
headers = {
"Authorization": f"Basic {self.get_credentials()}",
@ -111,7 +136,9 @@ class LLM:
self.host_url = f"{os.getenv('LLM_URL')}:{os.getenv('LLM_PORT')}"
else:
self.host_url = os.getenv("LLM_API_URL").rstrip("/api/chat/")
self.client: Client = Client(host=self.host_url, headers=headers, timeout=timeout)
self.client: Client = Client(
host=self.host_url, headers=headers, timeout=timeout
)
self.async_client: AsyncClient = AsyncClient()
def get_credentials(self):
@ -180,7 +207,6 @@ class LLM:
]:
model = self.get_model(model)
self.messages.append(message)
return model
@ -227,10 +253,11 @@ class LLM:
"""Call the remote Ollama API synchronously."""
self.call_model = model
self.client: Client = Client(host=self.host_url, headers=headers, timeout=300)
if self.on_vpn:
print_yellow(f"🤖 Generating using {model} (remote, on VPN)...")
else:
print_yellow(f"🤖 Generating using {model} (remote)...")
if not self.silent:
if self.on_vpn:
print_yellow(f"🤖 Generating using {model} (remote, on VPN)...")
else:
print_yellow(f"🤖 Generating using {model} (remote)...")
# If this is an embeddings model, call the embed endpoint instead of chat.
if model == self.get_model("embeddings"):
@ -244,7 +271,9 @@ class LLM:
input_text = self.messages[-1].get("content", "")
# Use the embed API (synchronous)
response = self.client.embed(model=model, input=input_text, keep_alive=3600 * 24 * 7)
response = self.client.embed(
model=model, input=input_text, keep_alive=3600 * 24 * 7
)
return response
response = self.client.chat(
@ -257,6 +286,10 @@ class LLM:
format=format,
think=think,
)
if hasattr(response, "headers") and "x-chosen-backend" in response.headers:
self.chosen_backend = response.headers["x-chosen-backend"]
print_blue(f"Backend used: {self.chosen_backend}")
self.chosen_backend = response.headers.get("x-chosen-backend", None)
return response
@backoff.on_exception(
@ -267,13 +300,14 @@ class LLM:
base=10,
on_backoff=lambda details: print_yellow(
f"Retrying due to error: {details['exception']}"
)
),
)
async def _call_remote_api_async(
self, model, tools, stream, options, format, headers, think=False
):
"""Call the remote Ollama API asynchronously."""
print_yellow(f"🤖 Generating using {model} (remote, async)...")
if not self.silent:
print_yellow(f"🤖 Generating using {model} (remote, async)...")
# If embedding model, use async embed endpoint
if model == self.get_model("embeddings"):
@ -303,8 +337,8 @@ class LLM:
def _call_local_ollama(self, model, stream, temperature, think=False):
"""Call the local Ollama instance synchronously."""
import ollama
print_yellow(f"🤖 Generating using {model} (local)...")
if not self.silent:
print_yellow(f"🤖 Generating using {model} (local)...")
options = {"temperature": temperature}
if stream:
response_stream = ollama.chat(
@ -331,8 +365,8 @@ class LLM:
(),
{
"content": content,
"thinking": thinking # Include thinking in stream chunks
}
"thinking": thinking, # Include thinking in stream chunks
},
),
"done": chunk.get("done", False),
},
@ -388,8 +422,8 @@ class LLM:
"""Call the local Ollama instance asynchronously (using a thread pool)."""
import ollama
import asyncio
print_yellow(f"🤖 Generating using {model} (local, async)...")
if not self.silent:
print_yellow(f"🤖 Generating using {model} (local, async)...")
options = {"temperature": temperature}
loop = asyncio.get_event_loop()
if stream:
@ -421,8 +455,8 @@ class LLM:
(),
{
"content": content,
"thinking": thinking # Include thinking in async stream chunks
}
"thinking": thinking, # Include thinking in async stream chunks
},
),
"done": chunk.get("done", False),
},
@ -533,11 +567,12 @@ class LLM:
headers = self._build_headers(model)
options = self._get_options(temperature)
# Call Ollama server
# Call Ollama server)
response: ChatResponse = self._call_remote_api(
model, tools, stream, options, format, headers, think=think
)
# If using embeddings model, the response is an embed result (not a ChatResponse).
if model == self.get_model("embeddings"):
return response
@ -549,14 +584,12 @@ class LLM:
# With native thinking support, content is already clean
result = response.message.content.strip('"')
self.messages.append(
{"role": "assistant", "content": result}
)
self.messages.append({"role": "assistant", "content": result})
if not self.chat:
self.messages = [self.messages[0]]
Warning = ("Please use reposen.message.content when ising _llm")
Warning = "Please use reposen.message.content when ising _llm"
return response
else:
return "An error occurred."
@ -632,9 +665,7 @@ class LLM:
# With native thinking support, content is already clean
result = response.message.content.strip('"')
self.messages.append(
{"role": "assistant", "content": result}
)
self.messages.append({"role": "assistant", "content": result})
if not self.chat:
self.messages = [self.messages[0]]
@ -761,23 +792,29 @@ class LLM:
if __name__ == "__main__":
# Example usage of the LLM class with thinking mode
llm = LLM()
system_message = "You are an extraction assistant. You will get one page at a time from a PDF document, and your task is to extract subcontractors and close clues from the text.\nAssume the state oil company in Uganda is UNOC even if not named. Extract subcontractors (entities or individuals contracted to perform work for UNOC) and close clues.\n\nMANDATES:\n- Do NOT hallucinate. Extract only names or clear clues present in the text.\n- Include \"clues\" such as 'awarded to', 'won the tender', 'appointed', 'supplier', 'contractor', 'consultant', 'tender', 'procured by'.\n- Return EXACT supporting text snippets (<= 300 chars) that justify each extraction.\n- Provide a brief explanation where you explain why the entity is a subcontractor.\n- Returned named entities should be real entities (companies or persons) that could plausibly be subcontractors."
llm = LLM(system_message=system_message)
prompt = "Page 1 of document:\n-----START OF PAGE-----\n**GUIDELINES FOR THE 2019 REGISTRATION ON THE NATIONAL OIL AND GAS**\n**TALENT REGISTER**\n**Welcome to the 2019 National Oil and Gas Talent Register (NOGTR).**\nThe Petroleum Authority of Uganda has developed a National Oil and Gas Talent Register\nto capture all talent that can potentially work in the oil and gas sector as required by law.\nThe NOGTR is a register classified into the demand and supply side users. The demand\nside users consist of companies/government agencies which meet the eligibility criteria\nseeking to recruit human resource across projects in the oil and gas sector and\nredeployment of the same to other sectors.\nOn the other hand, the supply side users include individuals who meet the eligibility criteria\nfor the workforce demands of the oil and gas and are likely to be recruited by the demand\nside users. The NOGTR registration process shall be maintained annually for both the\ndemand and supply side users and the 2019 process will be in line with these guidelines.\nApplicants are encouraged to visit our website and read the submission guidelines carefully before\nregistering.\n**1.** **Annual Registration Calendar**\nThe Authority shall publish the list of entities and persons willing to provide and supply\nlabour force in the oil and gas sector every 31 [st] December of the applicable year. To\nachieve this process the following timelines shall apply.\n**a) Demand side use**\n1. The window for registration shall open on the 1 [st] February 2019 and remain open\nthroughout the year.\n2. The demand side users shall post all available job openings in the Oil and Gas\nSector on the window provided for this purpose by PAU in addition to other\nchannels of advertising that they may opt to use to give wider publicity as required\nby different guidelines.\n3. The demand side users shall have access to the window for purposes of identifying\nand offering employment to any person from the supply side users who meets their\njob description. A considered supply side user shall be contacted and notified of\nthe offer.\n4. Priority shall be given to potential applicants registered on the NOGTR.\n5. The demand side users are encouraged to verify documents uploaded on the\nwindow with the awarding institutions before relying upon them to award jobs.\n6. The Authority shall publish the list of the demand side users that have qualified to\nthe system every 31 [st] December of the applicable year.\n7. The list shall be updated three times; on 31 [st] March 2019, 30 [th] June 2019 and 31 [st]\nAugust 2019 in line with the National Suppliers Database.\n8. The demand side user shall remain on the NOGTR automatically when they\nreapply on the NSD unless if they no longer meet the criteria.\n-----END OF PAGE-----\nPlease extract subcontractors as per the mandates given."
response = llm.generate(query=prompt)
print(response.message.content)
# Basic usage
result = llm.generate(
query="I want to add 2 and 2",
)
print("Basic result:", result.content)
print("Basic result:", result.message.content)
# Example with thinking mode (for reasoning models)
print("\n--- Thinking Mode Example ---")
thinking_result = llm.generate(
query="Solve this step by step: If I have 15 apples and give away 7, then buy 3 more, how many do I have?",
model="reasoning",
think=True
think=True,
)
print("Answer:", thinking_result.content)
if hasattr(thinking_result, 'thinking') and thinking_result.thinking:
print("Answer:", thinking_result.message.content)
if hasattr(thinking_result, "thinking") and thinking_result.thinking:
print("Model's reasoning:", thinking_result.thinking)
# Example with streaming and thinking
@ -786,11 +823,13 @@ if __name__ == "__main__":
query="Write a short explanation of photosynthesis",
model="reasoning",
stream=True,
think=True
think=True,
):
if chunk_type == "thinking":
# Use print with blue color escape codes since print_blue doesn't support 'end' parameter
print(f"\033[94m {chunk_content}\033[0m", end="") # Show reasoning process in blue
print(
f"\033[94m {chunk_content}\033[0m", end=""
) # Show reasoning process in blue
elif chunk_type == "content":
print(chunk_content, end="") # Show final answer
elif chunk_type == "thinking_complete":

Loading…
Cancel
Save