Diffstat (limited to 'backend/app/services')
-rw-r--r--  backend/app/services/llm.py | 493
1 file changed, 442 insertions(+), 51 deletions(-)
diff --git a/backend/app/services/llm.py b/backend/app/services/llm.py
index 958ab4c..660a69d 100644
--- a/backend/app/services/llm.py
+++ b/backend/app/services/llm.py
@@ -1,21 +1,21 @@
 import os
-from typing import AsyncGenerator
+from typing import AsyncGenerator, List, Dict, Any, Optional
 import openai
 import google.generativeai as genai
 from app.schemas import LLMConfig, Message, Role, Context

-# Simple in-memory cache for clients to avoid re-initializing constantly
+# Cache OpenAI clients by API key to avoid re-initializing constantly
 # In a real app, use dependency injection or singletons
-_openai_client = None
+_openai_clients: dict[str, openai.AsyncOpenAI] = {}

 def get_openai_client(api_key: str = None):
-    global _openai_client
+    global _openai_clients
     key = api_key or os.getenv("OPENAI_API_KEY")
     if not key:
         raise ValueError("OpenAI API Key not found")
-    if not _openai_client:
-        _openai_client = openai.AsyncOpenAI(api_key=key)
-    return _openai_client
+    if key not in _openai_clients:
+        _openai_clients[key] = openai.AsyncOpenAI(api_key=key)
+    return _openai_clients[key]

 def configure_google(api_key: str = None):
     key = api_key or os.getenv("GOOGLE_API_KEY")
@@ -23,8 +23,15 @@ def configure_google(api_key: str = None):
         raise ValueError("Google API Key not found")
     genai.configure(api_key=key)

-async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
+async def stream_openai(
+    messages: list[Message],
+    config: LLMConfig,
+    attachments: Optional[List[Dict[str, Any]]] = None,
+    tools: Optional[List[Dict[str, Any]]] = None,
+) -> AsyncGenerator[str, None]:
     client = get_openai_client(config.api_key)
+    attachments = attachments or []
+    tools = tools or []

     # Convert internal Message schema to OpenAI format
     openai_messages = []
@@ -34,61 +41,290 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
     for msg in messages:
         openai_messages.append({"role": msg.role.value, "content": msg.content})

-    stream = await client.chat.completions.create(
-        model=config.model_name,
-        messages=openai_messages,
-        temperature=config.temperature,
-        max_tokens=config.max_tokens,
-        stream=True
+    # Models that ONLY support Responses API (no Chat Completions fallback)
+    responses_only_models = ['gpt-5-pro']
+
+    # Models that CAN use Responses API (and thus support web_search tool)
+    model_lower = config.model_name.lower()
+    responses_capable_models = [
+        'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
+        'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3',
+        'o1', 'o1-preview', 'o1-mini',
+        'gpt-4o', 'gpt-4o-mini', 'gpt-4o-realtime', 'gpt-4o-mini-tts'
+    ]
+
+    # Use Responses API if:
+    # 1. Model ONLY supports Responses API, OR
+    # 2. User wants web search AND model is capable of Responses API
+    # 3. Attachments are present (Responses supports input_file)
+    use_responses_api = (
+        config.model_name in responses_only_models or
+        (config.enable_google_search and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
+        (attachments and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
+        (tools)
     )

+    if use_responses_api:
+        # Debug: Confirm config reception
+        # yield f"[Debug: Config Search={config.enable_google_search}, Model={config.model_name}]\n"
+
+        # Use new client.responses.create API with Polling Strategy
+        # Build Responses API input
+        input_messages = []
+        for msg in openai_messages:
+            if msg['role'] == 'system':
+                continue # goes to instructions
+            # User messages use input_text, assistant messages use output_text
+            content_type = "input_text" if msg['role'] == 'user' else "output_text"
+            input_messages.append({
+                "role": msg['role'],
+                "content": [
+                    {
+                        "type": content_type,
+                        "text": msg['content']
+                    }
+                ]
+            })
+
+        # Append attachments as separate user message (files only)
+        file_parts = []
+        for att in attachments:
+            if att.get("provider") == "openai" and att.get("file_id"):
+                file_parts.append({
+                    "type": "input_file",
+                    "file_id": att["file_id"]
+                })
+        if file_parts:
+            input_messages.append({
+                "role": "user",
+                "content": file_parts
+            })
+
+        resp_params = {
+            "model": config.model_name,
+            "input": input_messages, # Full conversation history
+            "stream": False, # Get full output in one call
+            "background": False,
+            "store": True,
+            "tool_choice": "auto",
+        }
+        if tools:
+            resp_params["tools"] = tools
+            resp_params["tool_choice"] = "auto"
+            # Optional: include results for debugging / citations
+            resp_params["include"] = ["file_search_call.results"]
+
+        # Add reasoning effort (not supported by chat-latest models)
+        models_without_effort = ['gpt-5-chat-latest', 'gpt-5.1-chat-latest']
+        if config.model_name not in models_without_effort:
+            resp_params["reasoning"] = {"effort": config.reasoning_effort.value}
+
+        # Enable Web Search if requested (Reusing enable_google_search flag as generic web_search flag)
+        # IMPORTANT: Append to existing tools instead of overwriting
+        if config.enable_google_search:
+            if resp_params.get("tools"):
+                resp_params["tools"].append({"type": "web_search"})
+            else:
+                resp_params["tools"] = [{"type": "web_search"}]
+            resp_params["tool_choice"] = "auto"
+
+        if config.system_prompt:
+            resp_params["instructions"] = config.system_prompt
+
+        # Debug: print final tools being sent
+        print(f"[responses debug] final tools: {resp_params.get('tools')}")
+
+        # 1. Create Response (non-background)
+        initial_resp = await client.responses.create(**resp_params)
+        response_id = initial_resp.id
+
+        # 2. Poll for Completion
+        import asyncio
+        for _ in range(300):
+            final_resp = await client.responses.retrieve(response_id)
+
+            if final_resp.status == 'completed':
+                # Debug: log outputs and tool calls
+                try:
+                    outs = getattr(final_resp, "output", [])
+                    print(f"[responses debug] output items: {[getattr(o, 'type', None) for o in outs]}")
+                    for o in outs:
+                        if getattr(o, "type", None) == "file_search_call":
+                            print(f"[responses debug] file_search_call: {o}")
+                except Exception as e:
+                    print(f"[responses debug] failed to inspect output: {e}")
+
+                found_content = False
+                if hasattr(final_resp, 'output'):
+                    for out in final_resp.output:
+                        out_type = getattr(out, 'type', None)
+                        out_content = getattr(out, 'content', None)
+                        print(f"[responses debug] output item: type={out_type}, content={out_content}")
+
+                        if out_type == 'message' and out_content:
+                            for c in out_content:
+                                c_type = getattr(c, 'type', None)
+                                c_text = getattr(c, 'text', None)
+                                print(f"[responses debug] content item: type={c_type}, text={c_text[:100] if c_text else None}...")
+                                if c_type == 'output_text':
+                                    text_val = getattr(c, 'text', None)
+                                    if text_val:
+                                        print(f"[responses debug] YIELDING text: {text_val[:50]}...")
+                                        yield text_val
+                                        print(f"[responses debug] YIELDED successfully")
+                                        found_content = True
+
+                if not found_content:
+                    print(f"[responses debug] No content found! final_resp.output={final_resp.output}")
+                    yield f"\n[Debug: Completed but no content extracted]"
+                return
+
+            elif final_resp.status in ['failed', 'cancelled', 'expired']:
+                error_msg = getattr(final_resp, 'error', 'Unknown error')
+                yield f"\n[Error: Response generation {final_resp.status}: {error_msg}]"
+                return
+
+            await asyncio.sleep(2)
+
+        yield "\n[Error: Polling timed out]"
+        return
+
+    # Standard Chat Completions API (attachments not supported here)
+    if attachments:
+        yield "[Error] Attachments are only supported for Responses API-capable models."
+        return
+
+    # Prepare parameters
+    req_params = {
+        "model": config.model_name,
+        "messages": openai_messages,
+        "stream": True
+    }
+
+    # Identify reasoning models
+    is_reasoning_model = config.model_name in [
+        'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
+        'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3',
+        'o1', 'o1-mini', 'o1-preview'
+    ]
+
+    if is_reasoning_model:
+        # Reasoning models use max_completion_tokens
+        if config.max_tokens:
+            req_params["max_completion_tokens"] = config.max_tokens
+        # IMPORTANT: Reasoning models often DO NOT support 'temperature'.
+        # We skip adding it.
+    else:
+        req_params["max_tokens"] = config.max_tokens
+        req_params["temperature"] = config.temperature
+
+    stream = await client.chat.completions.create(**req_params)
+
     async for chunk in stream:
-        if chunk.choices[0].delta.content:
-            yield chunk.choices[0].delta.content
+        if chunk.choices and chunk.choices[0].delta:
+            delta = chunk.choices[0].delta
+            if delta.content:
+                yield delta.content
+            elif delta.tool_calls:
+                # If the model tries to call a tool (even if we didn't send any?)
+                # This shouldn't happen unless we sent tools.
+                # But let's notify the user.
+                # Or maybe it's just an empty delta at the start/end.
+                pass
+            elif getattr(delta, 'refusal', None):
+                yield f"[Refusal: {delta.refusal}]"

-async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
-    configure_google(config.api_key)
-    model = genai.GenerativeModel(config.model_name)
+async def stream_google(messages: list[Message], config: LLMConfig, attachments: List[Dict[str, Any]] | None = None) -> AsyncGenerator[str, None]:
+    attachments = attachments or []
+    # Use new Google GenAI SDK (google-genai)
+    from google import genai
+    from google.genai import types

-    # Google Generative AI history format:
-    # [{"role": "user", "parts": ["..."]}, {"role": "model", "parts": ["..."]}]
-    # System prompt is usually set on model init or prepended.
+    key = config.api_key or os.getenv("GOOGLE_API_KEY")
+    if not key:
+        raise ValueError("Google API Key not found")
+
+    client = genai.Client(api_key=key)

-    history = []
-    # If system prompt exists, we might prepend it to the first user message or use specific system instruction if supported
-    # Gemini 1.5 Pro supports system instructions. For simplicity, let's prepend to history if possible or context.
+    # Configure Tools (Google Search)
+    tools = None
+    if config.enable_google_search:
+        # Enable Google Search Grounding
+        tools = [types.Tool(google_search=types.GoogleSearch())]
+
+    # Configure Generation
+    gen_config = types.GenerateContentConfig(
+        temperature=config.temperature,
+        max_output_tokens=config.max_tokens,
+        system_instruction=config.system_prompt,
+        tools=tools
+    )

-    system_instruction = config.system_prompt
-    if system_instruction:
-        model = genai.GenerativeModel(config.model_name, system_instruction=system_instruction)
+    # If attachments present, send as a single generate_content call (non-streaming)
+    if attachments:
+        parts = []
+        for att in attachments:
+            uri = att.get("uri")
+            mime = att.get("mime") or "application/octet-stream"
+            if uri:
+                try:
+                    parts.append(types.Part.from_uri(uri, mime_type=mime))
+                except Exception:
+                    parts.append(types.Part(text=f"[file attached: {uri}]"))
+        for msg in messages:
+            parts.append(types.Part(text=msg.content))
+        print(f"[gemini] sending attachments: {[att.get('uri') for att in attachments]}")
+        try:
+            response = await client.aio.models.generate_content(
+                model=config.model_name,
+                contents=[types.Content(role="user", parts=parts)],
+                config=gen_config
+            )
+            if response and getattr(response, "text", None):
+                yield response.text
+            else:
+                yield "[Error] Gemini response returned no text."
+        except Exception as e:
+            yield f"[Error] Gemini call failed: {str(e)}"
+        return

-    # Convert messages
-    # Note: Gemini strictly requires user/model alternation in history usually.
-    # We will need to handle this. For MVP, we assume the input is clean or we blindly map.
-    for msg in messages:
+    # Prepare History
+    # Extract last message as the prompt
+    prompt_msg = "..."
+    history_msgs = messages
+    if messages and messages[-1].role == Role.USER:
+        prompt_msg = messages[-1].content
+        history_msgs = messages[:-1]
+
+    history_content = []
+    for msg in history_msgs:
         role = "user" if msg.role == Role.USER else "model"
-        history.append({"role": role, "parts": [msg.content]})
-
-    # The last message should be the prompt, strictly speaking, `chat.send_message` takes the new message
-    # But if we are treating everything as history...
-    # Let's separate the last user message as the prompt if possible.
+        history_content.append(types.Content(
+            role=role,
+            parts=[types.Part(text=msg.content)]
+        ))
+
+    # Use Async Client via .aio
+    chat_session = client.aio.chats.create(
+        model=config.model_name,
+        history=history_content,
+        config=gen_config
+    )
+
+    response_stream = await chat_session.send_message_stream(prompt_msg)

-    if history and history[-1]["role"] == "user":
-        last_msg = history.pop()
-        chat = model.start_chat(history=history)
-        response_stream = await chat.send_message_async(last_msg["parts"][0], stream=True)
-    else:
-        # If the last message is not user, we might be in a weird state.
-        # Just send an empty prompt or handle error?
-        # For now, assume the user always provides a prompt in the node.
-        chat = model.start_chat(history=history)
-        response_stream = await chat.send_message_async("...", stream=True) # Fallback
-
     async for chunk in response_stream:
+        # Access text safely
         if chunk.text:
             yield chunk.text

-async def llm_streamer(context: Context, user_prompt: str, config: LLMConfig) -> AsyncGenerator[str, None]:
+async def llm_streamer(
+    context: Context,
+    user_prompt: str,
+    config: LLMConfig,
+    attachments: List[Dict[str, Any]] | None = None,
+    tools: List[Dict[str, Any]] | None = None,
+) -> AsyncGenerator[str, None]:
     # 1. Merge Context + New User Prompt
     # We create a temporary list of messages for this inference
     messages_to_send = context.messages.copy()
@@ -104,13 +340,168 @@ async def llm_streamer(context: Context, user_prompt: str, config: LLMConfig) ->
     # 2. Call Provider
     try:
         if config.provider == "openai":
-            async for chunk in stream_openai(messages_to_send, config):
+            async for chunk in stream_openai(messages_to_send, config, attachments, tools):
                 yield chunk
         elif config.provider == "google":
-            async for chunk in stream_google(messages_to_send, config):
+            async for chunk in stream_google(messages_to_send, config, attachments):
                 yield chunk
         else:
             yield f"Error: Unsupported provider {config.provider}"
     except Exception as e:
         yield f"Error calling LLM: {str(e)}"

+
+async def generate_title(user_prompt: str, response: str, api_key: str = None) -> str:
+    """
+    Generate a short title (3-4 words) for a Q-A pair using gpt-5-nano.
+    Uses Responses API (required for gpt-5 series), synchronous mode (no background).
+    """
+    client = get_openai_client(api_key)
+
+    instructions = """TASK: Extract a short topic title from the given Q&A. Do NOT answer the question - only extract the topic.
+
+Rules:
+- Output 2-3 short words OR 2 longer words
+- No punctuation, no quotes, no explanation
+- Capitalize each word
+- Be specific to the topic discussed
+- Output ONLY the title, nothing else
+
+Examples:
+Q: "How to sort a list in Python?" -> "Python Sorting"
+Q: "What is React state?" -> "React State"
+Q: "Explain AWS Lambda pricing" -> "Lambda Pricing"
+Q: "Who are you?" -> "AI Identity"
+Q: "What's the weather in NYC?" -> "NYC Weather\""""
+
+    # Truncate to avoid token limits
+    truncated_prompt = user_prompt[:300] if len(user_prompt) > 300 else user_prompt
+    truncated_response = response[:300] if len(response) > 300 else response
+
+    input_text = f"Question: {truncated_prompt}\n\nAnswer: {truncated_response}"
+
+    try:
+        print(f"[generate_title] Called with prompt: {truncated_prompt[:50]}...")
+
+        # Use Responses API for gpt-5-nano (synchronous, no background)
+        # Note: max_output_tokens includes reasoning tokens, so needs to be higher
+        resp = await client.responses.create(
+            model="gpt-5-nano",
+            input=input_text,
+            instructions=instructions,
+            max_output_tokens=500, # Higher to accommodate reasoning tokens
+            reasoning={"effort": "low"}, # Minimize reasoning for simple task
+            stream=False
+        )
+
+        print(f"[generate_title] Response status: {getattr(resp, 'status', 'unknown')}")
+        print(f"[generate_title] Response output: {getattr(resp, 'output', 'no output')}")
+
+        # Response should be completed immediately (no polling needed)
+        if hasattr(resp, 'output'):
+            for out in resp.output:
+                if getattr(out, 'type', None) == 'message':
+                    content = getattr(out, 'content', [])
+                    for c in content:
+                        if getattr(c, 'type', None) == 'output_text':
+                            title = getattr(c, 'text', '').strip()
+                            # Clean up
+                            title = title.strip('"\'')
+                            print(f"[generate_title] Extracted title: {title}")
+                            if title:
+                                return title
+
+        print("[generate_title] No title found, returning default")
+        return "New Question"
+
+    except Exception as e:
+        print(f"Title generation error: {e}")
+        return "New Question"
+
+
+async def summarize_content(content: str, model: str, openai_api_key: str = None, gemini_api_key: str = None) -> str:
+    """
+    Summarize the given content using the specified model.
+    Supports both OpenAI and Gemini models.
+    """
+    instructions = """Summarize the following content concisely.
+Keep the key points and main ideas.
+Output only the summary, no preamble."""
+
+    # Truncate very long content
+    max_content = 8000
+    if len(content) > max_content:
+        content = content[:max_content] + "\n\n[Content truncated...]"
+
+    try:
+        if model.startswith('gemini'):
+            # Use Gemini
+            from google import genai
+            from google.genai import types
+            import os
+
+            key = gemini_api_key or os.getenv("GOOGLE_API_KEY")
+            if not key:
+                return "Error: Google API Key not found"
+
+            client = genai.Client(api_key=key)
+
+            gen_config = types.GenerateContentConfig(
+                temperature=0.3,
+                max_output_tokens=1000,
+                system_instruction=instructions
+            )
+
+            response = await client.aio.models.generate_content(
+                model=model,
+                contents=content,
+                config=gen_config
+            )
+
+            return response.text or "No summary generated"
+
+        else:
+            # Use OpenAI
+            client = get_openai_client(openai_api_key)
+
+            # Check if model needs Responses API
+            responses_api_models = [
+                'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
+                'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3'
+            ]
+
+            if model in responses_api_models:
+                # Use Responses API
+                resp = await client.responses.create(
+                    model=model,
+                    input=content,
+                    instructions=instructions,
+                    max_output_tokens=2000,
+                    stream=False
+                )
+
+                if hasattr(resp, 'output'):
+                    for out in resp.output:
+                        if getattr(out, 'type', None) == 'message':
+                            for c in getattr(out, 'content', []):
+                                if getattr(c, 'type', None) == 'output_text':
+                                    return getattr(c, 'text', '') or "No summary generated"
+
+                return "No summary generated"
+            else:
+                # Use Chat Completions API
+                result = await client.chat.completions.create(
+                    model=model,
+                    messages=[
+                        {"role": "system", "content": instructions},
+                        {"role": "user", "content": content}
+                    ],
+                    max_tokens=1000,
+                    temperature=0.3
+                )
+
+                return result.choices[0].message.content or "No summary generated"
+
+    except Exception as e:
+        print(f"Summarization error: {e}")
+        return f"Error: {str(e)}"
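
For orientation, here is a minimal caller sketch against the updated llm_streamer signature. This sketch is not part of the commit: it assumes the Context and LLMConfig models imported from app.schemas above, and it assumes LLMConfig can be constructed from just provider and model_name, with the other fields this module reads (temperature, max_tokens, api_key, system_prompt, enable_google_search, reasoning_effort) having defaults.

# Hypothetical usage sketch -- constructor fields are assumptions, not taken from this commit.
import asyncio

from app.schemas import Context, LLMConfig
from app.services.llm import llm_streamer


async def main() -> None:
    config = LLMConfig(provider="openai", model_name="gpt-4o-mini")  # other LLMConfig fields assumed to default
    context = Context(messages=[])  # empty history; llm_streamer appends the new user prompt itself

    # attachments and tools are optional and default to None in the new signature
    async for chunk in llm_streamer(context, "Summarize the latest changes in one sentence.", config):
        print(chunk, end="", flush=True)


asyncio.run(main())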
