Diffstat (limited to 'backend/app/services/llm.py')
| -rw-r--r-- | backend/app/services/llm.py | 159 |
1 files changed, 120 insertions, 39 deletions
diff --git a/backend/app/services/llm.py b/backend/app/services/llm.py
index b372f9e..96b0514 100644
--- a/backend/app/services/llm.py
+++ b/backend/app/services/llm.py
@@ -1,5 +1,5 @@
 import os
-from typing import AsyncGenerator
+from typing import AsyncGenerator, List, Dict, Any, Optional
 import openai
 import google.generativeai as genai
 from app.schemas import LLMConfig, Message, Role, Context
@@ -23,8 +23,15 @@ def configure_google(api_key: str = None):
         raise ValueError("Google API Key not found")
     genai.configure(api_key=key)

-async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
+async def stream_openai(
+    messages: list[Message],
+    config: LLMConfig,
+    attachments: Optional[List[Dict[str, Any]]] = None,
+    tools: Optional[List[Dict[str, Any]]] = None,
+) -> AsyncGenerator[str, None]:
     client = get_openai_client(config.api_key)
+    attachments = attachments or []
+    tools = tools or []

     # Convert internal Message schema to OpenAI format
     openai_messages = []
@@ -38,17 +45,23 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
     responses_only_models = ['gpt-5-pro']

     # Models that CAN use Responses API (and thus support web_search tool)
+    model_lower = config.model_name.lower()
     responses_capable_models = [
-        'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
-        'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3'
+        'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
+        'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3',
+        'o1', 'o1-preview', 'o1-mini',
+        'gpt-4o', 'gpt-4o-mini', 'gpt-4o-realtime', 'gpt-4o-mini-tts'
     ]

     # Use Responses API if:
     # 1. Model ONLY supports Responses API, OR
     # 2. User wants web search AND model is capable of Responses API
+    # 3. Attachments are present (Responses supports input_file)
     use_responses_api = (
         config.model_name in responses_only_models or
-        (config.enable_google_search and config.model_name in responses_capable_models)
+        (config.enable_google_search and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
+        (attachments and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
+        (tools)
     )

     if use_responses_api:
@@ -56,25 +69,50 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
         # yield f"[Debug: Config Search={config.enable_google_search}, Model={config.model_name}]\n"

         # Use new client.responses.create API with Polling Strategy
-        # Convert messages to Responses API format (same as Chat Completions)
-        # Responses API accepts input as array of message objects
-
-        # Filter out system messages (use instructions instead) and format for Responses API
+        # Build Responses API input
         input_messages = []
         for msg in openai_messages:
-            if msg['role'] != 'system':  # System prompt goes to instructions
-                input_messages.append({
-                    "role": msg['role'],
-                    "content": msg['content']
+            if msg['role'] == 'system':
+                continue  # goes to instructions
+            # User messages use input_text, assistant messages use output_text
+            content_type = "input_text" if msg['role'] == 'user' else "output_text"
+            input_messages.append({
+                "role": msg['role'],
+                "content": [
+                    {
+                        "type": content_type,
+                        "text": msg['content']
+                    }
+                ]
+            })
+
+        # Append attachments as separate user message (files only)
+        file_parts = []
+        for att in attachments:
+            if att.get("provider") == "openai" and att.get("file_id"):
+                file_parts.append({
+                    "type": "input_file",
+                    "file_id": att["file_id"]
                 })
+        if file_parts:
+            input_messages.append({
+                "role": "user",
+                "content": file_parts
+            })

         resp_params = {
             "model": config.model_name,
             "input": input_messages,  # Full conversation history
-            "stream": False,  # Disable stream to get immediate ID
-            "background": True,  # Enable background mode for async execution
-            "store": True
+            "stream": False,  # Get full output in one call
+            "background": False,
+            "store": True,
+            "tool_choice": "auto",
         }
+        if tools:
+            resp_params["tools"] = tools
+            resp_params["tool_choice"] = "auto"
+            # Optional: include results for debugging / citations
+            resp_params["include"] = ["file_search_call.results"]

         # Add reasoning effort (not supported by chat-latest models)
         models_without_effort = ['gpt-5-chat-latest', 'gpt-5.1-chat-latest']
@@ -82,28 +120,40 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
             resp_params["reasoning"] = {"effort": config.reasoning_effort.value}

         # Enable Web Search if requested (Reusing enable_google_search flag as generic web_search flag)
+        # IMPORTANT: Append to existing tools instead of overwriting
         if config.enable_google_search:
-            resp_params["tools"] = [{"type": "web_search"}]
+            if resp_params.get("tools"):
+                resp_params["tools"].append({"type": "web_search"})
+            else:
+                resp_params["tools"] = [{"type": "web_search"}]
             resp_params["tool_choice"] = "auto"
-            # Debugging tool injection
-            # yield "[Debug: Web Search Tool Injected]"  # Uncomment to debug

         if config.system_prompt:
             resp_params["instructions"] = config.system_prompt
+
+        # Debug: print final tools being sent
+        print(f"[responses debug] final tools: {resp_params.get('tools')}")

-        # 1. Create Response (Async/Background)
-        # This returns a Response object immediately with status 'queued' or 'in_progress'
+        # 1. Create Response (non-background)
         initial_resp = await client.responses.create(**resp_params)
         response_id = initial_resp.id
-
+
         # 2. Poll for Completion
         import asyncio
-        # Poll for up to 10 minutes
-        for _ in range(300):
+        for _ in range(300):
             final_resp = await client.responses.retrieve(response_id)
-
+
             if final_resp.status == 'completed':
-                # Parse final response object
+                # Debug: log outputs and tool calls
+                try:
+                    outs = getattr(final_resp, "output", [])
+                    print(f"[responses debug] output items: {[getattr(o, 'type', None) for o in outs]}")
+                    for o in outs:
+                        if getattr(o, "type", None) == "file_search_call":
+                            print(f"[responses debug] file_search_call: {o}")
+                except Exception as e:
+                    print(f"[responses debug] failed to inspect output: {e}")
+
                 found_content = False
                 if hasattr(final_resp, 'output'):
                     for out in final_resp.output:
@@ -128,13 +178,16 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
                 yield f"\n[Error: Response generation {final_resp.status}: {error_msg}]"
                 return
-            # Still in_progress
             await asyncio.sleep(2)

         yield "\n[Error: Polling timed out]"
         return

-    # Standard Chat Completions API
+    # Standard Chat Completions API (attachments not supported here)
+    if attachments:
+        yield "[Error] Attachments are only supported for Responses API-capable models."
+        return
+
     # Prepare parameters
     req_params = {
         "model": config.model_name,
@@ -175,7 +228,8 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
             elif getattr(delta, 'refusal', None):
                 yield f"[Refusal: {delta.refusal}]"

-async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
+async def stream_google(messages: list[Message], config: LLMConfig, attachments: List[Dict[str, Any]] | None = None) -> AsyncGenerator[str, None]:
+    attachments = attachments or []
     # Use new Google GenAI SDK (google-genai)
     from google import genai
     from google.genai import types
@@ -200,6 +254,34 @@ async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGene
         tools=tools
     )

+    # If attachments present, send as a single generate_content call (non-streaming)
+    if attachments:
+        parts = []
+        for att in attachments:
+            uri = att.get("uri")
+            mime = att.get("mime") or "application/octet-stream"
+            if uri:
+                try:
+                    parts.append(types.Part.from_uri(uri, mime_type=mime))
+                except Exception:
+                    parts.append(types.Part(text=f"[file attached: {uri}]"))
+        for msg in messages:
+            parts.append(types.Part(text=msg.content))
+        print(f"[gemini] sending attachments: {[att.get('uri') for att in attachments]}")
+        try:
+            response = await client.aio.models.generate_content(
+                model=config.model_name,
+                contents=[types.Content(role="user", parts=parts)],
+                config=gen_config
+            )
+            if response and getattr(response, "text", None):
+                yield response.text
+            else:
+                yield "[Error] Gemini response returned no text."
+        except Exception as e:
+            yield f"[Error] Gemini call failed: {str(e)}"
+        return
+
     # Prepare History
     # Extract last message as the prompt
     prompt_msg = "..."
@@ -223,13 +305,6 @@ async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGene
         config=gen_config
     )

-    # Streaming call
-    # In google-genai SDK, streaming is usually via send_message_stream
-
-    # Check if send_message_stream exists, otherwise use send_message with stream=True (but error says no)
-    # Let's assume send_message_stream is the way.
-
-    # Note: chat_session.send_message_stream returns an AsyncIterator (or a coroutine returning one)
     response_stream = await chat_session.send_message_stream(prompt_msg)

     async for chunk in response_stream:
@@ -237,7 +312,13 @@ async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGene
         if chunk.text:
             yield chunk.text

-async def llm_streamer(context: Context, user_prompt: str, config: LLMConfig) -> AsyncGenerator[str, None]:
+async def llm_streamer(
+    context: Context,
+    user_prompt: str,
+    config: LLMConfig,
+    attachments: List[Dict[str, Any]] | None = None,
+    tools: List[Dict[str, Any]] | None = None,
+) -> AsyncGenerator[str, None]:
     # 1. Merge Context + New User Prompt
     # We create a temporary list of messages for this inference
     messages_to_send = context.messages.copy()
@@ -253,10 +334,10 @@ async def llm_streamer(context: Context, user_prompt: str, config: LLMConfig) -
     # 2. Call Provider
     try:
         if config.provider == "openai":
-            async for chunk in stream_openai(messages_to_send, config):
+            async for chunk in stream_openai(messages_to_send, config, attachments, tools):
                 yield chunk
         elif config.provider == "google":
-            async for chunk in stream_google(messages_to_send, config):
+            async for chunk in stream_google(messages_to_send, config, attachments):
                 yield chunk
         else:
             yield f"Error: Unsupported provider {config.provider}"
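A minimal usage sketch (not part of this commit) showing how the updated llm_streamer signature could be driven with the new attachments/tools parameters. The LLMConfig/Context constructor arguments, the file and vector-store IDs, and the file_search tool payload are illustrative assumptions; the attachment dict shape simply mirrors the keys the diff reads (provider/file_id for OpenAI, uri/mime for Gemini).

import asyncio

from app.schemas import Context, LLMConfig
from app.services.llm import llm_streamer

async def main():
    # Assumed constructor fields; the real schemas may require more (api_key, system_prompt, ...).
    config = LLMConfig(provider="openai", model_name="gpt-4o", enable_google_search=False)
    context = Context(messages=[])  # llm_streamer copies this history and appends the prompt

    # OpenAI attachments are matched on provider == "openai" plus a file_id;
    # tools are forwarded to the Responses API as-is (hypothetical IDs below).
    attachments = [{"provider": "openai", "file_id": "file-abc123"}]
    tools = [{"type": "file_search", "vector_store_ids": ["vs_abc123"]}]

    async for chunk in llm_streamer(context, "Summarize the attached file.", config, attachments, tools):
        print(chunk, end="", flush=True)

asyncio.run(main())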
