import asyncio
import os
from typing import AsyncGenerator

import openai
import google.generativeai as genai  # legacy SDK, used only by configure_google

from app.schemas import LLMConfig, Message, Role, Context

# Simple in-memory cache for the OpenAI client to avoid re-initializing constantly.
# In a real app, use dependency injection or singletons.
_openai_client = None
_openai_client_key = None


def get_openai_client(api_key: str | None = None):
    """Return a cached AsyncOpenAI client, re-creating it if the key changes."""
    global _openai_client, _openai_client_key
    key = api_key or os.getenv("OPENAI_API_KEY")
    if not key:
        raise ValueError("OpenAI API Key not found")
    # Re-create the client when a different key is supplied; the previous code
    # returned the cached client even if the caller passed a new key.
    if _openai_client is None or key != _openai_client_key:
        _openai_client = openai.AsyncOpenAI(api_key=key)
        _openai_client_key = key
    return _openai_client


def configure_google(api_key: str | None = None):
    key = api_key or os.getenv("GOOGLE_API_KEY")
    if not key:
        raise ValueError("Google API Key not found")
    genai.configure(api_key=key)


async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
    client = get_openai_client(config.api_key)

    # Convert internal Message schema to OpenAI format
    openai_messages = []
    if config.system_prompt:
        openai_messages.append({"role": "system", "content": config.system_prompt})
    for msg in messages:
        openai_messages.append({"role": msg.role.value, "content": msg.content})

    # Models that ONLY support the Responses API (no Chat Completions fallback)
    responses_only_models = ['gpt-5-pro']
    # Models that CAN use the Responses API (and thus support the web_search tool)
    responses_capable_models = [
        'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano', 'gpt-5-pro',
        'gpt-5.1', 'gpt-5.1-chat-latest', 'o3'
    ]
    # Use the Responses API if:
    # 1. the model ONLY supports the Responses API, or
    # 2. the user wants web search AND the model supports the Responses API.
    use_responses_api = (
        config.model_name in responses_only_models
        or (config.enable_google_search and config.model_name in responses_capable_models)
    )

    if use_responses_api:
        # Responses API with a polling strategy: create the response in
        # background mode, then poll until it completes.
        # The Responses API accepts input as an array of message objects;
        # system messages are filtered out and passed via `instructions`.
        input_messages = [
            {"role": msg['role'], "content": msg['content']}
            for msg in openai_messages
            if msg['role'] != 'system'
        ]
        resp_params = {
            "model": config.model_name,
            "input": input_messages,  # full conversation history
            "stream": False,          # disable streaming to get an ID immediately
            "background": True,       # background mode for async execution
            "store": True,            # background mode requires stored responses
        }
        # Add reasoning effort (not supported by chat-latest models)
        models_without_effort = ['gpt-5-chat-latest', 'gpt-5.1-chat-latest']
        if config.model_name not in models_without_effort:
            resp_params["reasoning"] = {"effort": config.reasoning_effort.value}
        # Enable web search if requested (the enable_google_search flag doubles
        # as a generic web-search flag for OpenAI models).
        if config.enable_google_search:
            resp_params["tools"] = [{"type": "web_search"}]
            resp_params["tool_choice"] = "auto"
        if config.system_prompt:
            resp_params["instructions"] = config.system_prompt

        # 1. Create the response. In background mode this returns immediately
        #    with status 'queued' or 'in_progress'.
        initial_resp = await client.responses.create(**resp_params)
        response_id = initial_resp.id

        # 2. Poll for completion (300 iterations x 2 s = up to 10 minutes).
        for _ in range(300):
            final_resp = await client.responses.retrieve(response_id)
            if final_resp.status == 'completed':
                # Parse the final response object
                found_content = False
                if hasattr(final_resp, 'output'):
                    for out in final_resp.output:
                        out_type = getattr(out, 'type', None)
                        out_content = getattr(out, 'content', None)
                        if out_type == 'message' and out_content:
                            for c in out_content:
                                if getattr(c, 'type', None) == 'output_text':
                                    text_val = getattr(c, 'text', None)
                                    if text_val:
                                        yield text_val
                                        found_content = True
                if not found_content:
                    yield f"\n[Debug: Completed but no content. Resp: {final_resp}]"
                return
            elif final_resp.status in ['failed', 'cancelled', 'expired']:
                error_msg = getattr(final_resp, 'error', 'Unknown error')
                yield f"\n[Error: Response generation {final_resp.status}: {error_msg}]"
                return
            # Still queued/in_progress
            await asyncio.sleep(2)
        yield "\n[Error: Polling timed out]"
        return

    # Standard Chat Completions API
    req_params = {
        "model": config.model_name,
        "messages": openai_messages,
        "stream": True
    }
    # Reasoning models take max_completion_tokens and often reject
    # 'temperature', so temperature is only sent to non-reasoning models.
    is_reasoning_model = config.model_name in [
        'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano', 'gpt-5-pro',
        'gpt-5.1', 'gpt-5.1-chat-latest', 'o3', 'o1', 'o1-mini', 'o1-preview'
    ]
    if is_reasoning_model:
        if config.max_tokens:
            req_params["max_completion_tokens"] = config.max_tokens
    else:
        if config.max_tokens:
            req_params["max_tokens"] = config.max_tokens
        req_params["temperature"] = config.temperature

    stream = await client.chat.completions.create(**req_params)
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta:
            delta = chunk.choices[0].delta
            if delta.content:
                yield delta.content
            elif delta.tool_calls:
                # No tools are sent on this path, so a tool-call delta is
                # unexpected; ignore it (it may just be an empty boundary delta).
                pass
            elif getattr(delta, 'refusal', None):
                yield f"[Refusal: {delta.refusal}]"

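# Illustrative sketch (not called by the app): the background-mode polling
# pattern used above, reduced to a standalone helper for manual testing.
# Assumes OPENAI_API_KEY is set; the default model name is a placeholder.
async def _demo_background_response(prompt: str, model: str = "gpt-5-mini") -> str:
    client = get_openai_client()
    resp = await client.responses.create(
        model=model,
        input=prompt,
        background=True,  # returns immediately with status 'queued'/'in_progress'
        store=True,       # background mode requires stored responses
    )
    while resp.status in ("queued", "in_progress"):
        await asyncio.sleep(2)
        resp = await client.responses.retrieve(resp.id)
    if resp.status != "completed":
        raise RuntimeError(f"Response {resp.status}: {getattr(resp, 'error', None)}")
    # output_text is the SDK's convenience accessor for the concatenated text parts
    return resp.output_text
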
async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
    # Use the new Google GenAI SDK (google-genai), not the legacy
    # google.generativeai package imported at module level.
    from google import genai
    from google.genai import types

    key = config.api_key or os.getenv("GOOGLE_API_KEY")
    if not key:
        raise ValueError("Google API Key not found")
    client = genai.Client(api_key=key)

    # Configure tools (Google Search grounding)
    tools = None
    if config.enable_google_search:
        tools = [types.Tool(google_search=types.GoogleSearch())]

    # Configure generation
    gen_config = types.GenerateContentConfig(
        temperature=config.temperature,
        max_output_tokens=config.max_tokens,
        system_instruction=config.system_prompt,
        tools=tools
    )

    # Prepare history: the last user message becomes the prompt, everything
    # before it becomes chat history. The "..." placeholder is only sent if
    # the conversation unexpectedly does not end with a user message.
    prompt_msg = "..."
    history_msgs = messages
    if messages and messages[-1].role == Role.USER:
        prompt_msg = messages[-1].content
        history_msgs = messages[:-1]

    history_content = []
    for msg in history_msgs:
        role = "user" if msg.role == Role.USER else "model"
        history_content.append(types.Content(
            role=role,
            parts=[types.Part(text=msg.content)]
        ))

    # Use the async client via .aio
    chat_session = client.aio.chats.create(
        model=config.model_name,
        history=history_content,
        config=gen_config
    )

    # Streaming call: in the google-genai SDK, the async chat streams via
    # send_message_stream, which resolves to an async iterator of chunks.
    response_stream = await chat_session.send_message_stream(prompt_msg)
    async for chunk in response_stream:
        # Access text safely; tool-only chunks may have no text
        if chunk.text:
            yield chunk.text

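# Illustrative sketch (not wired into the app): when Google Search grounding
# is enabled, response chunks may carry grounding metadata listing the web
# sources Gemini consulted. Field names follow the google-genai types as I
# understand them; treat this as an assumption to verify against the SDK
# version in use.
def _extract_grounding_sources(chunk) -> list[str]:
    sources = []
    for cand in getattr(chunk, "candidates", None) or []:
        meta = getattr(cand, "grounding_metadata", None)
        for gc in getattr(meta, "grounding_chunks", None) or []:
            web = getattr(gc, "web", None)
            if web and web.uri:
                sources.append(f"{web.title}: {web.uri}")
    return sources
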
async def llm_streamer(context: Context, user_prompt: str, config: LLMConfig) -> AsyncGenerator[str, None]:
    # 1. Merge context + new user prompt into a temporary message list used
    #    only for this inference.
    messages_to_send = context.messages.copy()
    # user_prompt should always be provided for a Question Block
    if user_prompt.strip():
        messages_to_send.append(Message(
            id="temp_user_prompt",  # ID doesn't matter for the API call
            role=Role.USER,
            content=user_prompt
        ))

    # 2. Call the provider
    try:
        if config.provider == "openai":
            async for chunk in stream_openai(messages_to_send, config):
                yield chunk
        elif config.provider == "google":
            async for chunk in stream_google(messages_to_send, config):
                yield chunk
        else:
            yield f"Error: Unsupported provider {config.provider}"
    except Exception as e:
        yield f"Error calling LLM: {str(e)}"


async def generate_title(user_prompt: str, response: str) -> str:
    """
    Generate a short title (2-3 words) for a Q-A pair using gpt-5-nano.
    Uses the Responses API (required for the gpt-5 series) in synchronous
    mode (no background).
    """
    client = get_openai_client()
    instructions = """TASK: Extract a short topic title from the given Q&A.
Do NOT answer the question - only extract the topic.

Rules:
- Output 2-3 short words OR 2 longer words
- No punctuation, no quotes, no explanation
- Capitalize each word
- Be specific to the topic discussed
- Output ONLY the title, nothing else

Examples:
Q: "How to sort a list in Python?" -> "Python Sorting"
Q: "What is React state?" -> "React State"
Q: "Explain AWS Lambda pricing" -> "Lambda Pricing"
Q: "Who are you?" -> "AI Identity"
Q: "What's the weather in NYC?" -> "NYC Weather\""""

    # Truncate to avoid token limits
    truncated_prompt = user_prompt[:300]
    truncated_response = response[:300]
    input_text = f"Question: {truncated_prompt}\n\nAnswer: {truncated_response}"

    try:
        print(f"[generate_title] Called with prompt: {truncated_prompt[:50]}...")
        # Responses API for gpt-5-nano (synchronous, no background).
        # Note: max_output_tokens includes reasoning tokens, so it must be
        # set well above the expected title length.
        resp = await client.responses.create(
            model="gpt-5-nano",
            input=input_text,
            instructions=instructions,
            max_output_tokens=500,        # headroom for reasoning tokens
            reasoning={"effort": "low"},  # minimize reasoning for a simple task
            stream=False
        )
        print(f"[generate_title] Response status: {getattr(resp, 'status', 'unknown')}")
        print(f"[generate_title] Response output: {getattr(resp, 'output', 'no output')}")
        # The response should complete immediately (no polling needed)
        if hasattr(resp, 'output'):
            for out in resp.output:
                if getattr(out, 'type', None) == 'message':
                    for c in getattr(out, 'content', []):
                        if getattr(c, 'type', None) == 'output_text':
                            # Clean up stray quotes around the title
                            title = getattr(c, 'text', '').strip().strip('"\'')
                            print(f"[generate_title] Extracted title: {title}")
                            if title:
                                return title
        print("[generate_title] No title found, returning default")
        return "New Question"
    except Exception as e:
        print(f"Title generation error: {e}")
        return "New Question"

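# Illustrative sketch (assumption: the route layer is FastAPI; the real
# endpoint lives elsewhere and may differ). llm_streamer's async generator
# maps directly onto a streaming HTTP response; request parsing/validation
# is elided. Kept commented out so this service module does not import fastapi.
#
# from fastapi import APIRouter
# from fastapi.responses import StreamingResponse
#
# router = APIRouter()
#
# @router.post("/blocks/answer")
# async def answer_block(context: Context, user_prompt: str, config: LLMConfig):
#     return StreamingResponse(
#         llm_streamer(context, user_prompt, config),
#         media_type="text/plain",
#     )
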
-> "NYC Weather\"""" # Truncate to avoid token limits truncated_prompt = user_prompt[:300] if len(user_prompt) > 300 else user_prompt truncated_response = response[:300] if len(response) > 300 else response input_text = f"Question: {truncated_prompt}\n\nAnswer: {truncated_response}" try: print(f"[generate_title] Called with prompt: {truncated_prompt[:50]}...") # Use Responses API for gpt-5-nano (synchronous, no background) # Note: max_output_tokens includes reasoning tokens, so needs to be higher resp = await client.responses.create( model="gpt-5-nano", input=input_text, instructions=instructions, max_output_tokens=500, # Higher to accommodate reasoning tokens reasoning={"effort": "low"}, # Minimize reasoning for simple task stream=False ) print(f"[generate_title] Response status: {getattr(resp, 'status', 'unknown')}") print(f"[generate_title] Response output: {getattr(resp, 'output', 'no output')}") # Response should be completed immediately (no polling needed) if hasattr(resp, 'output'): for out in resp.output: if getattr(out, 'type', None) == 'message': content = getattr(out, 'content', []) for c in content: if getattr(c, 'type', None) == 'output_text': title = getattr(c, 'text', '').strip() # Clean up title = title.strip('"\'') print(f"[generate_title] Extracted title: {title}") if title: return title print("[generate_title] No title found, returning default") return "New Question" except Exception as e: print(f"Title generation error: {e}") return "New Question" async def summarize_content(content: str, model: str) -> str: """ Summarize the given content using the specified model. Supports both OpenAI and Gemini models. """ instructions = """Summarize the following content concisely. Keep the key points and main ideas. Output only the summary, no preamble.""" # Truncate very long content max_content = 8000 if len(content) > max_content: content = content[:max_content] + "\n\n[Content truncated...]" try: if model.startswith('gemini'): # Use Gemini from google import genai from google.genai import types import os key = os.getenv("GOOGLE_API_KEY") if not key: return "Error: Google API Key not found" client = genai.Client(api_key=key) gen_config = types.GenerateContentConfig( temperature=0.3, max_output_tokens=1000, system_instruction=instructions ) response = await client.aio.models.generate_content( model=model, contents=content, config=gen_config ) return response.text or "No summary generated" else: # Use OpenAI client = get_openai_client() # Check if model needs Responses API responses_api_models = [ 'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano', 'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3' ] if model in responses_api_models: # Use Responses API resp = await client.responses.create( model=model, input=content, instructions=instructions, max_output_tokens=2000, stream=False ) if hasattr(resp, 'output'): for out in resp.output: if getattr(out, 'type', None) == 'message': for c in getattr(out, 'content', []): if getattr(c, 'type', None) == 'output_text': return getattr(c, 'text', '') or "No summary generated" return "No summary generated" else: # Use Chat Completions API result = await client.chat.completions.create( model=model, messages=[ {"role": "system", "content": instructions}, {"role": "user", "content": content} ], max_tokens=1000, temperature=0.3 ) return result.choices[0].message.content or "No summary generated" except Exception as e: print(f"Summarization error: {e}") return f"Error: {str(e)}"