"""LLM provider streaming helpers (OpenAI, Google Gemini, Anthropic, OpenRouter).

Each ``stream_*`` coroutine is an async generator yielding text chunks.
``llm_streamer`` is the dispatching entry point; ``generate_title`` and
``summarize_content`` are one-shot helpers.
"""

import asyncio
import logging
import os
from typing import AsyncGenerator, List, Dict, Any, Optional

import openai
import google.generativeai as genai
import anthropic

from app.schemas import LLMConfig, Message, Role, Context

logger = logging.getLogger("contextflow.llm")

# Models that ONLY support the Responses API (no Chat Completions fallback).
_RESPONSES_ONLY_MODELS = frozenset({"gpt-5-pro"})

# Models that CAN use the Responses API (and thus support the web_search tool).
_RESPONSES_CAPABLE_MODELS = frozenset({
    "gpt-5", "gpt-5-chat-latest", "gpt-5-mini", "gpt-5-nano", "gpt-5-pro",
    "gpt-5.1", "gpt-5.1-chat-latest",
    "o3", "o1", "o1-preview", "o1-mini",
    "gpt-4o", "gpt-4o-mini", "gpt-4o-realtime", "gpt-4o-mini-tts",
})

# Models handled as "reasoning models" on the Chat Completions path:
# they take max_completion_tokens and get no temperature.
_REASONING_MODELS = frozenset({
    "gpt-5", "gpt-5-chat-latest", "gpt-5-mini", "gpt-5-nano", "gpt-5-pro",
    "gpt-5.1", "gpt-5.1-chat-latest",
    "o3", "o1", "o1-mini", "o1-preview",
})

# Models for which no reasoning effort is sent on the Responses path.
_MODELS_WITHOUT_EFFORT = frozenset({"gpt-5-chat-latest", "gpt-5.1-chat-latest"})

# Models that summarize_content routes through the Responses API.
_SUMMARY_RESPONSES_MODELS = frozenset({
    "gpt-5", "gpt-5-chat-latest", "gpt-5-mini", "gpt-5-nano", "gpt-5-pro",
    "gpt-5.1", "gpt-5.1-chat-latest",
    "o3",
})

# Cache OpenAI clients by API key to avoid re-initializing constantly
# In a real app, use dependency injection or singletons
_openai_clients: dict[str, openai.AsyncOpenAI] = {}


def get_openai_client(api_key: Optional[str] = None):
    """Return a cached ``openai.AsyncOpenAI`` client for the given key.

    Falls back to the ``OPENAI_API_KEY`` environment variable when
    ``api_key`` is empty.

    Raises:
        ValueError: if no key is supplied and the environment variable is unset.
    """
    key = api_key or os.getenv("OPENAI_API_KEY")
    if not key:
        raise ValueError("OpenAI API Key not found")
    if key not in _openai_clients:
        _openai_clients[key] = openai.AsyncOpenAI(api_key=key)
    return _openai_clients[key]


# Cache Anthropic clients by API key
_anthropic_clients: dict[str, anthropic.AsyncAnthropic] = {}


def get_anthropic_client(api_key: Optional[str] = None):
    """Return a cached ``anthropic.AsyncAnthropic`` client for the given key.

    Falls back to the ``ANTHROPIC_API_KEY`` environment variable when
    ``api_key`` is empty.

    Raises:
        ValueError: if no key is supplied and the environment variable is unset.
    """
    key = api_key or os.getenv("ANTHROPIC_API_KEY")
    if not key:
        raise ValueError("Anthropic API Key not found")
    if key not in _anthropic_clients:
        _anthropic_clients[key] = anthropic.AsyncAnthropic(api_key=key)
    return _anthropic_clients[key]


# Cache OpenRouter clients (OpenAI-compatible with custom base_url)
_openrouter_clients: dict[str, openai.AsyncOpenAI] = {}


def get_openrouter_client(api_key: str):
    """Return a cached OpenAI-compatible client pointed at OpenRouter.

    Unlike the other client getters there is no environment fallback.

    Raises:
        ValueError: if ``api_key`` is empty.
    """
    if not api_key:
        raise ValueError("OpenRouter API Key not found")
    if api_key not in _openrouter_clients:
        _openrouter_clients[api_key] = openai.AsyncOpenAI(
            api_key=api_key,
            base_url="https://openrouter.ai/api/v1",
        )
    return _openrouter_clients[api_key]


OPENROUTER_PROVIDER_PREFIX = {
    "openai": "openai/",
    "google": "google/",
    "claude": "anthropic/",
}


def to_openrouter_model(provider: str, model_name: str) -> str:
    """Prefix ``model_name`` with the OpenRouter namespace for ``provider``.

    Unknown providers get no prefix.
    """
    prefix = OPENROUTER_PROVIDER_PREFIX.get(provider, "")
    return f"{prefix}{model_name}"


def configure_google(api_key: Optional[str] = None):
    """Configure the ``google.generativeai`` module with an API key.

    Falls back to the ``GOOGLE_API_KEY`` environment variable.

    Raises:
        ValueError: if no key is supplied and the environment variable is unset.
    """
    key = api_key or os.getenv("GOOGLE_API_KEY")
    if not key:
        raise ValueError("Google API Key not found")
    genai.configure(api_key=key)


async def stream_openai(
    messages: list[Message],
    config: LLMConfig,
    attachments: Optional[List[Dict[str, Any]]] = None,
    tools: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
    """Yield response text from OpenAI for the given conversation.

    Uses the Responses API (create, then poll ``retrieve`` until a terminal
    status) when the model requires it, when web search or attachments are
    requested on a capable model, or when ``tools`` are supplied. Otherwise
    streams through Chat Completions.

    Args:
        messages: Conversation history in the internal ``Message`` schema.
        config: Model name, API key, system prompt and sampling settings.
        attachments: Dicts; only entries with ``provider == "openai"`` and a
            ``file_id`` are forwarded (as ``input_file`` parts).
        tools: Responses API tool definitions. The list is copied, never
            mutated.

    Yields:
        Text chunks. API-level failures on the Responses path and unsupported
        attachments are reported as bracketed ``[Error ...]`` strings rather
        than raised.
    """
    client = get_openai_client(config.api_key)
    attachments = attachments or []
    tools = tools or []

    # Convert internal Message schema to OpenAI format
    openai_messages = []
    if config.system_prompt:
        openai_messages.append({"role": "system", "content": config.system_prompt})
    for msg in messages:
        openai_messages.append({"role": msg.role.value, "content": msg.content})

    model_lower = config.model_name.lower()
    responses_capable = (
        config.model_name in _RESPONSES_CAPABLE_MODELS
        or model_lower.startswith("gpt-4o")
    )

    # Use Responses API if:
    # 1. Model ONLY supports Responses API, OR
    # 2. User wants web search AND model is capable of Responses API, OR
    # 3. Attachments are present (Responses supports input_file), OR
    # 4. Tools were supplied
    use_responses_api = bool(
        config.model_name in _RESPONSES_ONLY_MODELS
        or (config.enable_google_search and responses_capable)
        or (attachments and responses_capable)
        or tools
    )

    if use_responses_api:
        # Build Responses API input
        input_messages = []
        for msg in openai_messages:
            if msg["role"] == "system":
                continue  # goes to instructions
            # User messages use input_text, assistant messages use output_text
            content_type = "input_text" if msg["role"] == "user" else "output_text"
            input_messages.append({
                "role": msg["role"],
                "content": [
                    {
                        "type": content_type,
                        "text": msg["content"],
                    }
                ],
            })

        # Append attachments as separate user message (files only)
        file_parts = [
            {"type": "input_file", "file_id": att["file_id"]}
            for att in attachments
            if att.get("provider") == "openai" and att.get("file_id")
        ]
        if file_parts:
            input_messages.append({
                "role": "user",
                "content": file_parts,
            })

        resp_params = {
            "model": config.model_name,
            "input": input_messages,  # Full conversation history
            "stream": False,  # Get full output in one call
            "background": False,
            "store": True,
            "tool_choice": "auto",
        }

        if tools:
            # Copy so that appending web_search below never mutates the
            # caller's list (which may be reused across requests).
            resp_params["tools"] = list(tools)
            resp_params["tool_choice"] = "auto"
            # Optional: include results for debugging / citations
            resp_params["include"] = ["file_search_call.results"]

        # Add reasoning effort. Skipped for chat-latest models and for the
        # GPT-4 family, which are not reasoning models and reject the
        # `reasoning` parameter.
        supports_effort = (
            config.model_name not in _MODELS_WITHOUT_EFFORT
            and not model_lower.startswith("gpt-4")
        )
        if supports_effort:
            resp_params["reasoning"] = {"effort": config.reasoning_effort.value}

        # Enable Web Search if requested (Reusing enable_google_search flag as
        # generic web_search flag).
        # IMPORTANT: Append to existing tools instead of overwriting
        if config.enable_google_search:
            resp_params.setdefault("tools", []).append({"type": "web_search"})
            resp_params["tool_choice"] = "auto"

        if config.system_prompt:
            resp_params["instructions"] = config.system_prompt

        # Debug: print final tools being sent
        logger.debug("responses: final tools: %s", resp_params.get("tools"))

        # 1. Create Response (non-background)
        initial_resp = await client.responses.create(**resp_params)
        response_id = initial_resp.id

        # 2. Poll for Completion (up to 300 polls, 2 s apart)
        for _ in range(300):
            final_resp = await client.responses.retrieve(response_id)

            if final_resp.status == "completed":
                # Debug: log outputs and tool calls
                try:
                    outs = getattr(final_resp, "output", [])
                    logger.debug(
                        "responses: output items: %s",
                        [getattr(o, "type", None) for o in outs],
                    )
                    for o in outs:
                        if getattr(o, "type", None) == "file_search_call":
                            logger.debug("responses: file_search_call: %s", o)
                except Exception as e:
                    logger.debug("responses: failed to inspect output: %s", e)

                found_content = False
                if hasattr(final_resp, "output"):
                    for out in final_resp.output:
                        out_type = getattr(out, "type", None)
                        out_content = getattr(out, "content", None)
                        logger.debug(
                            "responses: output item: type=%s, content=%s",
                            out_type, out_content,
                        )
                        if out_type == "message" and out_content:
                            for c in out_content:
                                c_type = getattr(c, "type", None)
                                c_text = getattr(c, "text", None)
                                logger.debug(
                                    "responses: content item: type=%s, text=%s...",
                                    c_type, c_text[:100] if c_text else None,
                                )
                                if c_type == "output_text" and c_text:
                                    logger.debug(
                                        "responses: yielding text: %s...",
                                        c_text[:50],
                                    )
                                    yield c_text
                                    logger.debug("responses: yielded successfully")
                                    found_content = True

                if not found_content:
                    logger.warning(
                        "responses: no content found! output=%s",
                        final_resp.output,
                    )
                    yield "\n[Debug: Completed but no content extracted]"
                return

            elif final_resp.status in ["failed", "cancelled", "expired"]:
                error_msg = getattr(final_resp, "error", "Unknown error")
                yield f"\n[Error: Response generation {final_resp.status}: {error_msg}]"
                return

            await asyncio.sleep(2)

        yield "\n[Error: Polling timed out]"
        return

    # Standard Chat Completions API (attachments not supported here)
    if attachments:
        yield "[Error] Attachments are only supported for Responses API-capable models."
        return

    # Prepare parameters
    req_params = {
        "model": config.model_name,
        "messages": openai_messages,
        "stream": True,
    }

    if config.model_name in _REASONING_MODELS:
        # Reasoning models use max_completion_tokens
        if config.max_tokens:
            req_params["max_completion_tokens"] = config.max_tokens
        # IMPORTANT: Reasoning models often DO NOT support 'temperature'.
        # We skip adding it.
    else:
        req_params["max_tokens"] = config.max_tokens
        req_params["temperature"] = config.temperature

    stream = await client.chat.completions.create(**req_params)

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta:
            delta = chunk.choices[0].delta
            if delta.content:
                yield delta.content
            elif delta.tool_calls:
                # No tools are sent on this path, so a tool-call delta is
                # unexpected; it is deliberately ignored.
                pass
            elif getattr(delta, "refusal", None):
                yield f"[Refusal: {delta.refusal}]"


async def stream_google(
    messages: list[Message],
    config: LLMConfig,
    attachments: List[Dict[str, Any]] | None = None,
) -> AsyncGenerator[str, None]:
    """Yield response text from Gemini via the google-genai SDK.

    With attachments, all message texts and file parts are sent as a single
    non-streaming ``generate_content`` call and failures are yielded as
    ``[Error] ...`` strings. Without attachments, the last user message is
    sent as the prompt of a streaming chat whose history is the remaining
    messages.

    Args:
        messages: Conversation history in the internal ``Message`` schema.
        config: Model name, API key, system prompt and sampling settings.
        attachments: Dicts read for ``uri`` and optional ``mime``.

    Raises:
        ValueError: if no API key is configured.
    """
    attachments = attachments or []

    # Use new Google GenAI SDK (google-genai). Imported locally; this
    # shadows the module-level legacy `genai` inside this function only.
    from google import genai
    from google.genai import types

    key = config.api_key or os.getenv("GOOGLE_API_KEY")
    if not key:
        raise ValueError("Google API Key not found")

    client = genai.Client(api_key=key)

    # Configure Tools (Google Search)
    tools = None
    if config.enable_google_search:
        # Enable Google Search Grounding
        tools = [types.Tool(google_search=types.GoogleSearch())]

    # Configure Generation
    gen_config = types.GenerateContentConfig(
        temperature=config.temperature,
        max_output_tokens=config.max_tokens,
        system_instruction=config.system_prompt,
        tools=tools,
    )

    # If attachments present, send as a single generate_content call (non-streaming)
    if attachments:
        parts = []
        for att in attachments:
            uri = att.get("uri")
            mime = att.get("mime") or "application/octet-stream"
            if uri:
                try:
                    # Keyword form: passing the URI positionally fails on SDK
                    # versions where `file_uri` is keyword-only, which the
                    # fallback below would otherwise hide.
                    parts.append(types.Part.from_uri(file_uri=uri, mime_type=mime))
                except Exception as e:
                    logger.warning(
                        "gemini: could not attach %s (%s); sending placeholder text",
                        uri, e,
                    )
                    parts.append(types.Part(text=f"[file attached: {uri}]"))
        for msg in messages:
            parts.append(types.Part(text=msg.content))
        logger.debug(
            "gemini: sending attachments: %s",
            [att.get("uri") for att in attachments],
        )
        try:
            response = await client.aio.models.generate_content(
                model=config.model_name,
                contents=[types.Content(role="user", parts=parts)],
                config=gen_config,
            )
            if response and getattr(response, "text", None):
                yield response.text
            else:
                yield "[Error] Gemini response returned no text."
        except Exception as e:
            yield f"[Error] Gemini call failed: {str(e)}"
        return

    # Prepare History
    # Extract last message as the prompt; "..." is the placeholder prompt
    # used when the conversation does not end with a user message.
    prompt_msg = "..."
    history_msgs = messages
    if messages and messages[-1].role == Role.USER:
        prompt_msg = messages[-1].content
        history_msgs = messages[:-1]

    history_content = [
        types.Content(
            role="user" if msg.role == Role.USER else "model",
            parts=[types.Part(text=msg.content)],
        )
        for msg in history_msgs
    ]

    # Use Async Client via .aio
    chat_session = client.aio.chats.create(
        model=config.model_name,
        history=history_content,
        config=gen_config,
    )

    response_stream = await chat_session.send_message_stream(prompt_msg)
    async for chunk in response_stream:
        # Access text safely
        if chunk.text:
            yield chunk.text


async def stream_claude(
    messages: list[Message], config: LLMConfig
) -> AsyncGenerator[str, None]:
    """Yield response text from Anthropic's Messages streaming API.

    System-role messages are folded into the ``system`` parameter together
    with ``config.system_prompt``. Consecutive same-role messages are merged,
    and a leading user turn is guaranteed.
    """
    client = get_anthropic_client(config.api_key)

    # Separate system messages from conversation messages
    system_parts = []
    if config.system_prompt:
        system_parts.append(config.system_prompt)

    claude_messages = []
    for msg in messages:
        if msg.role == Role.SYSTEM:
            system_parts.append(msg.content)
        else:
            role = "user" if msg.role == Role.USER else "assistant"
            claude_messages.append({"role": role, "content": msg.content})

    # Claude requires messages to alternate user/assistant.
    # Merge consecutive same-role messages.
    merged = []
    for m in claude_messages:
        if merged and merged[-1]["role"] == m["role"]:
            merged[-1]["content"] += "\n\n" + m["content"]
        else:
            merged.append(m)

    # Claude requires the first message to be from "user"
    if merged and merged[0]["role"] == "assistant":
        merged.insert(0, {"role": "user", "content": "(continued)"})

    # If no messages at all, add a placeholder
    if not merged:
        merged.append({"role": "user", "content": "Hello"})

    system_text = "\n\n".join(system_parts) if system_parts else anthropic.NOT_GIVEN

    async with client.messages.stream(
        model=config.model_name,
        max_tokens=config.max_tokens,
        temperature=config.temperature,
        system=system_text,
        messages=merged,
    ) as stream:
        async for text in stream.text_stream:
            yield text


async def stream_openrouter(
    messages: list[Message],
    config: LLMConfig,
    openrouter_api_key: str,
) -> AsyncGenerator[str, None]:
    """Stream via OpenRouter fallback using OpenAI-compatible Chat Completions API.

    The model name is prefixed with the OpenRouter namespace derived from
    ``config.provider`` (see ``to_openrouter_model``).
    """
    client = get_openrouter_client(openrouter_api_key)
    provider_str = (
        config.provider.value if hasattr(config.provider, "value") else str(config.provider)
    )
    openrouter_model = to_openrouter_model(provider_str, config.model_name)

    openai_messages = []
    if config.system_prompt:
        openai_messages.append({"role": "system", "content": config.system_prompt})
    for msg in messages:
        openai_messages.append({"role": msg.role.value, "content": msg.content})

    stream = await client.chat.completions.create(
        model=openrouter_model,
        messages=openai_messages,
        stream=True,
        max_tokens=config.max_tokens,
        temperature=config.temperature,
    )

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta:
            delta = chunk.choices[0].delta
            if delta.content:
                yield delta.content


async def llm_streamer(
    context: Context,
    user_prompt: str,
    config: LLMConfig,
    attachments: List[Dict[str, Any]] | None = None,
    tools: List[Dict[str, Any]] | None = None,
    openrouter_api_key: Optional[str] = None,
) -> AsyncGenerator[str, None]:
    """Stream an LLM answer for ``context`` plus an optional new user prompt.

    Dispatches on ``config.provider`` to the matching ``stream_*`` generator.
    If the primary provider raises before producing any output and an
    OpenRouter key is supplied, the request is retried through OpenRouter.

    Yields:
        Text chunks. Failures are reported as ``Error ...`` strings, never
        raised to the consumer.
    """
    # 1. Merge Context + New User Prompt
    # We create a temporary list of messages for this inference
    messages_to_send = context.messages.copy()

    # If user_prompt is provided (it should be for a Question Block)
    if user_prompt.strip():
        messages_to_send.append(Message(
            id="temp_user_prompt",  # ID doesn't matter for the API call
            role=Role.USER,
            content=user_prompt,
        ))

    # Normalize the same way stream_openrouter does, so the dispatch works
    # whether provider is a plain string or an enum member.
    provider = getattr(config.provider, "value", config.provider)

    # Tracks whether the consumer has already received primary output.
    emitted = False

    # 2. Call Provider
    try:
        if provider == "openai":
            primary = stream_openai(messages_to_send, config, attachments, tools)
        elif provider == "google":
            primary = stream_google(messages_to_send, config, attachments)
        elif provider == "claude":
            primary = stream_claude(messages_to_send, config)
        else:
            yield f"Error: Unsupported provider {config.provider}"
            return

        async for chunk in primary:
            emitted = True
            yield chunk
    except Exception as e:
        primary_error = str(e)
        logger.warning(
            "Primary provider failed: %s. Checking OpenRouter fallback...",
            primary_error,
        )
        # Restarting on OpenRouter after partial output would append a second
        # full answer to the partial one, so only fall back on a clean slate.
        if emitted or not openrouter_api_key:
            yield f"Error calling LLM: {primary_error}"
            return
        try:
            logger.info(
                "Falling back to OpenRouter for %s/%s",
                config.provider, config.model_name,
            )
            async for chunk in stream_openrouter(
                messages_to_send, config, openrouter_api_key
            ):
                yield chunk
        except Exception as fallback_error:
            logger.error("OpenRouter fallback also failed: %s", fallback_error)
            yield (
                f"Error calling LLM: {primary_error} "
                f"(OpenRouter fallback also failed: {fallback_error})"
            )


async def generate_title(
    user_prompt: str, response: str, api_key: Optional[str] = None
) -> str:
    """
    Generate a short title (3-4 words) for a Q-A pair using gpt-5-nano.
    Uses Responses API (required for gpt-5 series), synchronous mode (no background).

    Returns "New Question" when no title can be extracted or the call fails.
    """
    client = get_openai_client(api_key)

    instructions = """TASK: Extract a short topic title from the given Q&A. Do NOT answer the question - only extract the topic.

Rules:
- Output 2-3 short words OR 2 longer words
- No punctuation, no quotes, no explanation
- Capitalize each word
- Be specific to the topic discussed
- Output ONLY the title, nothing else

Examples:
Q: "How to sort a list in Python?" -> "Python Sorting"
Q: "What is React state?" -> "React State"
Q: "Explain AWS Lambda pricing" -> "Lambda Pricing"
Q: "Who are you?" -> "AI Identity"
Q: "What's the weather in NYC?" -> "NYC Weather\""""

    # Truncate to avoid token limits
    truncated_prompt = user_prompt[:300]
    truncated_response = response[:300]

    input_text = f"Question: {truncated_prompt}\n\nAnswer: {truncated_response}"

    try:
        logger.debug("generate_title: called with prompt: %s...", truncated_prompt[:50])

        # Use Responses API for gpt-5-nano (synchronous, no background)
        # Note: max_output_tokens includes reasoning tokens, so needs to be higher
        resp = await client.responses.create(
            model="gpt-5-nano",
            input=input_text,
            instructions=instructions,
            max_output_tokens=500,  # Higher to accommodate reasoning tokens
            reasoning={"effort": "low"},  # Minimize reasoning for simple task
            stream=False,
        )

        logger.debug("generate_title: response status: %s", getattr(resp, "status", "unknown"))
        logger.debug("generate_title: response output: %s", getattr(resp, "output", "no output"))

        # Response should be completed immediately (no polling needed)
        for out in getattr(resp, "output", None) or []:
            if getattr(out, "type", None) != "message":
                continue
            for c in getattr(out, "content", None) or []:
                if getattr(c, "type", None) != "output_text":
                    continue
                # `text` may be present but None; treat that as empty instead
                # of letting .strip() raise and abort the whole extraction.
                title = (getattr(c, "text", None) or "").strip()
                # Clean up
                title = title.strip('"\'')
                logger.debug("generate_title: extracted title: %s", title)
                if title:
                    return title

        logger.warning("generate_title: no title found, returning default")
        return "New Question"

    except Exception as e:
        logger.error("Title generation error: %s", e)
        return "New Question"


async def summarize_content(
    content: str,
    model: str,
    openai_api_key: Optional[str] = None,
    gemini_api_key: Optional[str] = None,
) -> str:
    """
    Summarize the given content using the specified model.
    Supports both OpenAI and Gemini models.

    Models whose name starts with ``gemini`` go to the google-genai SDK;
    everything else goes to OpenAI. Content longer than 8000 characters is
    truncated. Errors are returned as ``"Error: ..."`` strings, not raised.
    """
    instructions = """Summarize the following content concisely.
Keep the key points and main ideas.
Output only the summary, no preamble."""

    # Truncate very long content
    max_content = 8000
    if len(content) > max_content:
        content = content[:max_content] + "\n\n[Content truncated...]"

    try:
        if model.startswith("gemini"):
            # Use Gemini
            from google import genai
            from google.genai import types

            key = gemini_api_key or os.getenv("GOOGLE_API_KEY")
            if not key:
                return "Error: Google API Key not found"

            client = genai.Client(api_key=key)

            gen_config = types.GenerateContentConfig(
                temperature=0.3,
                max_output_tokens=1000,
                system_instruction=instructions,
            )

            response = await client.aio.models.generate_content(
                model=model,
                contents=content,
                config=gen_config,
            )

            return response.text or "No summary generated"

        # Use OpenAI
        client = get_openai_client(openai_api_key)

        if model in _SUMMARY_RESPONSES_MODELS:
            # Use Responses API
            resp = await client.responses.create(
                model=model,
                input=content,
                instructions=instructions,
                max_output_tokens=2000,
                stream=False,
            )

            for out in getattr(resp, "output", None) or []:
                if getattr(out, "type", None) != "message":
                    continue
                for c in getattr(out, "content", None) or []:
                    if getattr(c, "type", None) == "output_text":
                        return getattr(c, "text", "") or "No summary generated"

            return "No summary generated"

        # Use Chat Completions API
        chat_params = {
            "model": model,
            "messages": [
                {"role": "system", "content": instructions},
                {"role": "user", "content": content},
            ],
        }
        if model in _REASONING_MODELS:
            # Same handling as stream_openai: reasoning models (the o1 family
            # reaches this branch) take max_completion_tokens and no
            # temperature. The budget matches the Responses branch because it
            # also has to cover reasoning tokens.
            chat_params["max_completion_tokens"] = 2000
        else:
            chat_params["max_tokens"] = 1000
            chat_params["temperature"] = 0.3

        result = await client.chat.completions.create(**chat_params)

        return result.choices[0].message.content or "No summary generated"

    except Exception as e:
        logger.error("Summarization error: %s", e)
        return f"Error: {str(e)}"