Diffstat (limited to 'backend/app/services/llm.py')
-rw-r--r--  backend/app/services/llm.py  159
1 file changed, 120 insertions(+), 39 deletions(-)
diff --git a/backend/app/services/llm.py b/backend/app/services/llm.py
index b372f9e..96b0514 100644
--- a/backend/app/services/llm.py
+++ b/backend/app/services/llm.py
@@ -1,5 +1,5 @@
import os
-from typing import AsyncGenerator
+from typing import AsyncGenerator, List, Dict, Any, Optional
import openai
import google.generativeai as genai
from app.schemas import LLMConfig, Message, Role, Context
@@ -23,8 +23,15 @@ def configure_google(api_key: str = None):
raise ValueError("Google API Key not found")
genai.configure(api_key=key)
-async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
+async def stream_openai(
+ messages: list[Message],
+ config: LLMConfig,
+ attachments: Optional[List[Dict[str, Any]]] = None,
+ tools: Optional[List[Dict[str, Any]]] = None,
+) -> AsyncGenerator[str, None]:
client = get_openai_client(config.api_key)
+ attachments = attachments or []
+ tools = tools or []
# Convert internal Message schema to OpenAI format
openai_messages = []
@@ -38,17 +45,23 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
responses_only_models = ['gpt-5-pro']
# Models that CAN use Responses API (and thus support web_search tool)
+ model_lower = config.model_name.lower()
responses_capable_models = [
- 'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
- 'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3'
+ 'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
+ 'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3',
+ 'o1', 'o1-preview', 'o1-mini',
+ 'gpt-4o', 'gpt-4o-mini', 'gpt-4o-realtime', 'gpt-4o-mini-tts'
]
# Use Responses API if:
# 1. Model ONLY supports Responses API, OR
# 2. User wants web search AND model is capable of Responses API
+    # 3. Attachments are present (Responses supports input_file), OR
+    # 4. Explicit tools were supplied by the caller (e.g. file_search)
use_responses_api = (
config.model_name in responses_only_models or
- (config.enable_google_search and config.model_name in responses_capable_models)
+ (config.enable_google_search and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
+ (attachments and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
+ (tools)
)
if use_responses_api:
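
For reference, the routing rule above can be restated in isolation roughly as follows (a readability sketch, not code from this diff; the example call at the end uses a hypothetical file_id):

def wants_responses_api(model_name, enable_search, attachments, tools):
    responses_only = {'gpt-5-pro'}
    capable = {'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
               'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3',
               'o1', 'o1-preview', 'o1-mini'}
    is_capable = model_name in capable or model_name.lower().startswith('gpt-4o')
    return (model_name in responses_only
            or (enable_search and is_capable)
            or (bool(attachments) and is_capable)
            or bool(tools))

# wants_responses_api('gpt-4o-mini', False, [{'file_id': 'file-abc123'}], []) -> True
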
@@ -56,25 +69,50 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
# yield f"[Debug: Config Search={config.enable_google_search}, Model={config.model_name}]\n"
# Use new client.responses.create API with Polling Strategy
- # Convert messages to Responses API format (same as Chat Completions)
- # Responses API accepts input as array of message objects
-
- # Filter out system messages (use instructions instead) and format for Responses API
+ # Build Responses API input
input_messages = []
for msg in openai_messages:
- if msg['role'] != 'system': # System prompt goes to instructions
- input_messages.append({
- "role": msg['role'],
- "content": msg['content']
+ if msg['role'] == 'system':
+ continue # goes to instructions
+ # User messages use input_text, assistant messages use output_text
+ content_type = "input_text" if msg['role'] == 'user' else "output_text"
+ input_messages.append({
+ "role": msg['role'],
+ "content": [
+ {
+ "type": content_type,
+ "text": msg['content']
+ }
+ ]
+ })
+
+ # Append attachments as separate user message (files only)
+ file_parts = []
+ for att in attachments:
+ if att.get("provider") == "openai" and att.get("file_id"):
+ file_parts.append({
+ "type": "input_file",
+ "file_id": att["file_id"]
})
+ if file_parts:
+ input_messages.append({
+ "role": "user",
+ "content": file_parts
+ })
resp_params = {
"model": config.model_name,
"input": input_messages, # Full conversation history
- "stream": False, # Disable stream to get immediate ID
- "background": True, # Enable background mode for async execution
- "store": True
+ "stream": False, # Get full output in one call
+ "background": False,
+ "store": True,
+ "tool_choice": "auto",
}
+ if tools:
+ resp_params["tools"] = tools
+ resp_params["tool_choice"] = "auto"
+ # Optional: include results for debugging / citations
+ resp_params["include"] = ["file_search_call.results"]
# Add reasoning effort (not supported by chat-latest models)
models_without_effort = ['gpt-5-chat-latest', 'gpt-5.1-chat-latest']
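
The attachment and tool dicts consumed above are assumed to arrive in roughly this shape (field names are taken from the loops above; the IDs are hypothetical placeholders, and the file_search entry follows the OpenAI Responses API tool format):

attachments = [{'provider': 'openai', 'file_id': 'file-abc123'}]
tools = [{'type': 'file_search', 'vector_store_ids': ['vs_abc123']}]

# With those inputs, the extra user message appended to input_messages would be:
# {'role': 'user', 'content': [{'type': 'input_file', 'file_id': 'file-abc123'}]}
# and resp_params would carry the file_search tool plus include=['file_search_call.results'].
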
@@ -82,28 +120,40 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
resp_params["reasoning"] = {"effort": config.reasoning_effort.value}
# Enable Web Search if requested (Reusing enable_google_search flag as generic web_search flag)
+ # IMPORTANT: Append to existing tools instead of overwriting
if config.enable_google_search:
- resp_params["tools"] = [{"type": "web_search"}]
+ if resp_params.get("tools"):
+ resp_params["tools"].append({"type": "web_search"})
+ else:
+ resp_params["tools"] = [{"type": "web_search"}]
resp_params["tool_choice"] = "auto"
- # Debugging tool injection
- # yield "[Debug: Web Search Tool Injected]" # Uncomment to debug
if config.system_prompt:
resp_params["instructions"] = config.system_prompt
+
+ # Debug: print final tools being sent
+ print(f"[responses debug] final tools: {resp_params.get('tools')}")
- # 1. Create Response (Async/Background)
- # This returns a Response object immediately with status 'queued' or 'in_progress'
+ # 1. Create Response (non-background)
initial_resp = await client.responses.create(**resp_params)
response_id = initial_resp.id
-
+
# 2. Poll for Completion
import asyncio
- # Poll for up to 10 minutes
- for _ in range(300):
+ for _ in range(300):
final_resp = await client.responses.retrieve(response_id)
-
+
if final_resp.status == 'completed':
- # Parse final response object
+ # Debug: log outputs and tool calls
+ try:
+ outs = getattr(final_resp, "output", [])
+ print(f"[responses debug] output items: {[getattr(o, 'type', None) for o in outs]}")
+ for o in outs:
+ if getattr(o, "type", None) == "file_search_call":
+ print(f"[responses debug] file_search_call: {o}")
+ except Exception as e:
+ print(f"[responses debug] failed to inspect output: {e}")
+
found_content = False
if hasattr(final_resp, 'output'):
for out in final_resp.output:
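
The text-extraction part of this loop falls outside the hunk; for orientation, a generic sketch of pulling assistant text out of a completed Responses object looks roughly like this (not the file's actual code; attribute names follow the OpenAI Responses API object model):

def extract_text(final_resp):
    texts = []
    for out in getattr(final_resp, 'output', []) or []:
        if getattr(out, 'type', None) == 'message':
            for part in getattr(out, 'content', []) or []:
                if getattr(part, 'type', None) == 'output_text':
                    texts.append(part.text)
    return ''.join(texts)
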
@@ -128,13 +178,16 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
yield f"\n[Error: Response generation {final_resp.status}: {error_msg}]"
return
- # Still in_progress
await asyncio.sleep(2)
yield "\n[Error: Polling timed out]"
return
- # Standard Chat Completions API
+ # Standard Chat Completions API (attachments not supported here)
+ if attachments:
+ yield "[Error] Attachments are only supported for Responses API-capable models."
+ return
+
# Prepare parameters
req_params = {
"model": config.model_name,
@@ -175,7 +228,8 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
elif getattr(delta, 'refusal', None):
yield f"[Refusal: {delta.refusal}]"
-async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
+async def stream_google(messages: list[Message], config: LLMConfig, attachments: Optional[List[Dict[str, Any]]] = None) -> AsyncGenerator[str, None]:
+ attachments = attachments or []
# Use new Google GenAI SDK (google-genai)
from google import genai
from google.genai import types
@@ -200,6 +254,34 @@ async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGene
tools=tools
)
+ # If attachments present, send as a single generate_content call (non-streaming)
+ if attachments:
+ parts = []
+ for att in attachments:
+ uri = att.get("uri")
+ mime = att.get("mime") or "application/octet-stream"
+ if uri:
+ try:
+                    parts.append(types.Part.from_uri(file_uri=uri, mime_type=mime))
+ except Exception:
+ parts.append(types.Part(text=f"[file attached: {uri}]"))
+ for msg in messages:
+ parts.append(types.Part(text=msg.content))
+ print(f"[gemini] sending attachments: {[att.get('uri') for att in attachments]}")
+ try:
+ response = await client.aio.models.generate_content(
+ model=config.model_name,
+ contents=[types.Content(role="user", parts=parts)],
+ config=gen_config
+ )
+ if response and getattr(response, "text", None):
+ yield response.text
+ else:
+ yield "[Error] Gemini response returned no text."
+ except Exception as e:
+ yield f"[Error] Gemini call failed: {str(e)}"
+ return
+
# Prepare History
# Extract last message as the prompt
prompt_msg = "..."
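
The Gemini attachment dicts above are assumed to carry the URI of a file already uploaded to the Files API; one way a caller might produce them with the same SDK is roughly as follows (the upload call and filename are assumptions about the caller, not part of this diff):

uploaded = client.files.upload(file='report.pdf')  # google-genai Files API; parameter name may vary by SDK version
attachment = {'uri': uploaded.uri, 'mime': uploaded.mime_type}
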
@@ -223,13 +305,6 @@ async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGene
config=gen_config
)
- # Streaming call
- # In google-genai SDK, streaming is usually via send_message_stream
-
- # Check if send_message_stream exists, otherwise use send_message with stream=True (but error says no)
- # Let's assume send_message_stream is the way.
-
- # Note: chat_session.send_message_stream returns an AsyncIterator (or a coroutine returning one)
response_stream = await chat_session.send_message_stream(prompt_msg)
async for chunk in response_stream:
@@ -237,7 +312,13 @@ async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGene
if chunk.text:
yield chunk.text
-async def llm_streamer(context: Context, user_prompt: str, config: LLMConfig) -> AsyncGenerator[str, None]:
+async def llm_streamer(
+ context: Context,
+ user_prompt: str,
+ config: LLMConfig,
+    attachments: Optional[List[Dict[str, Any]]] = None,
+    tools: Optional[List[Dict[str, Any]]] = None,
+) -> AsyncGenerator[str, None]:
# 1. Merge Context + New User Prompt
# We create a temporary list of messages for this inference
messages_to_send = context.messages.copy()
@@ -253,10 +334,10 @@ async def llm_streamer(context: Context, user_prompt: str, config: LLMConfig) ->
# 2. Call Provider
try:
if config.provider == "openai":
- async for chunk in stream_openai(messages_to_send, config):
+ async for chunk in stream_openai(messages_to_send, config, attachments, tools):
yield chunk
elif config.provider == "google":
- async for chunk in stream_google(messages_to_send, config):
+ async for chunk in stream_google(messages_to_send, config, attachments):
yield chunk
else:
yield f"Error: Unsupported provider {config.provider}"