import logging import os from typing import AsyncGenerator, List, Dict, Any, Optional import openai import google.generativeai as genai import anthropic from app.schemas import LLMConfig, Message, Role, Context, ModelProvider logger = logging.getLogger("contextflow.llm") # Cache OpenAI clients by API key to avoid re-initializing constantly # In a real app, use dependency injection or singletons _openai_clients: dict[str, openai.AsyncOpenAI] = {} def get_openai_client(api_key: str = None): global _openai_clients key = api_key or os.getenv("OPENAI_API_KEY") if not key: raise ValueError("OpenAI API Key not found") if key not in _openai_clients: _openai_clients[key] = openai.AsyncOpenAI(api_key=key) return _openai_clients[key] # Cache Anthropic clients by API key _anthropic_clients: dict[str, anthropic.AsyncAnthropic] = {} def get_anthropic_client(api_key: str = None): global _anthropic_clients key = api_key or os.getenv("ANTHROPIC_API_KEY") if not key: raise ValueError("Anthropic API Key not found") if key not in _anthropic_clients: _anthropic_clients[key] = anthropic.AsyncAnthropic(api_key=key) return _anthropic_clients[key] # Cache OpenRouter clients (OpenAI-compatible with custom base_url) _openrouter_clients: dict[str, openai.AsyncOpenAI] = {} def get_openrouter_client(api_key: str): global _openrouter_clients if not api_key: raise ValueError("OpenRouter API Key not found") if api_key not in _openrouter_clients: _openrouter_clients[api_key] = openai.AsyncOpenAI( api_key=api_key, base_url="https://openrouter.ai/api/v1", ) return _openrouter_clients[api_key] OPENROUTER_PROVIDER_PREFIX = { "openai": "openai/", "google": "google/", "claude": "anthropic/", } def to_openrouter_model(provider: str, model_name: str) -> str: prefix = OPENROUTER_PROVIDER_PREFIX.get(provider, "") return f"{prefix}{model_name}" def configure_google(api_key: str = None): key = api_key or os.getenv("GOOGLE_API_KEY") if not key: raise ValueError("Google API Key not found") genai.configure(api_key=key) async def stream_openai( messages: list[Message], config: LLMConfig, attachments: Optional[List[Dict[str, Any]]] = None, tools: Optional[List[Dict[str, Any]]] = None, images: Optional[List[Dict[str, Any]]] = None, ) -> AsyncGenerator[str, None]: client = get_openai_client(config.api_key) attachments = attachments or [] tools = tools or [] # Convert internal Message schema to OpenAI format openai_messages = [] if config.system_prompt: openai_messages.append({"role": "system", "content": config.system_prompt}) for msg in messages: openai_messages.append({"role": msg.role.value, "content": msg.content}) # Models that ONLY support Responses API (no Chat Completions fallback) responses_only_models = ['gpt-5-pro'] # Models that CAN use Responses API (and thus support web_search tool) model_lower = config.model_name.lower() responses_capable_models = [ 'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano', 'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3', 'o1', 'o1-preview', 'o1-mini', 'gpt-4o', 'gpt-4o-mini', 'gpt-4o-realtime', 'gpt-4o-mini-tts' ] # Use Responses API if: # 1. Model ONLY supports Responses API, OR # 2. User wants web search AND model is capable of Responses API # 3. Attachments are present (Responses supports input_file) use_responses_api = ( config.model_name in responses_only_models or (config.enable_google_search and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or (attachments and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or (images and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or (tools) ) if use_responses_api: # Debug: Confirm config reception # yield f"[Debug: Config Search={config.enable_google_search}, Model={config.model_name}]\n" # Use new client.responses.create API with Polling Strategy # Build Responses API input input_messages = [] for msg in openai_messages: if msg['role'] == 'system': continue # goes to instructions # User messages use input_text, assistant messages use output_text content_type = "input_text" if msg['role'] == 'user' else "output_text" input_messages.append({ "role": msg['role'], "content": [ { "type": content_type, "text": msg['content'] } ] }) # Inject images into last user message if images and input_messages: # Find the last user message to inject images into for i in range(len(input_messages) - 1, -1, -1): if input_messages[i]["role"] == "user": for img in images: input_messages[i]["content"].append({ "type": "input_image", "image_url": f"data:{img['mime']};base64,{img['data']}" }) break # Append attachments as separate user message (files only) file_parts = [] for att in attachments: if att.get("provider") == "openai" and att.get("file_id"): file_parts.append({ "type": "input_file", "file_id": att["file_id"] }) if file_parts: input_messages.append({ "role": "user", "content": file_parts }) resp_params = { "model": config.model_name, "input": input_messages, # Full conversation history "stream": True, "store": True, "tool_choice": "auto", } if tools: resp_params["tools"] = tools resp_params["tool_choice"] = "auto" # Add reasoning effort (not supported by chat-latest models) models_without_effort = ['gpt-5-chat-latest', 'gpt-5.1-chat-latest'] if config.model_name not in models_without_effort: resp_params["reasoning"] = {"effort": config.reasoning_effort.value} # Enable Web Search if requested if config.enable_google_search: if resp_params.get("tools"): resp_params["tools"].append({"type": "web_search"}) else: resp_params["tools"] = [{"type": "web_search"}] resp_params["tool_choice"] = "auto" if config.system_prompt: resp_params["instructions"] = config.system_prompt logger.debug("responses: streaming, tools: %s", resp_params.get('tools')) # Stream the response — yields text deltas as they arrive stream = await client.responses.create(**resp_params) async for event in stream: evt_type = getattr(event, 'type', None) if evt_type == 'response.output_text.delta': delta = getattr(event, 'delta', '') if delta: yield delta elif evt_type == 'response.completed': resp_obj = getattr(event, 'response', None) if resp_obj: for out in getattr(resp_obj, 'output', []): if getattr(out, 'type', None) == 'file_search_call': logger.debug("responses: file_search_call: %s", out) break elif evt_type == 'response.failed': resp_obj = getattr(event, 'response', None) error_msg = getattr(resp_obj, 'error', None) if resp_obj else None yield f"\n[Error: {error_msg or 'Response generation failed'}]" break return # Standard Chat Completions API (attachments not supported here) if attachments: yield "[Error] Attachments are only supported for Responses API-capable models." return # Inject images into last user message for Chat Completions format if images and openai_messages: for i in range(len(openai_messages) - 1, -1, -1): if openai_messages[i]["role"] == "user": text_content = openai_messages[i]["content"] openai_messages[i]["content"] = [ {"type": "text", "text": text_content}, ] + [ {"type": "image_url", "image_url": {"url": f"data:{img['mime']};base64,{img['data']}"}} for img in images ] break # Prepare parameters req_params = { "model": config.model_name, "messages": openai_messages, "stream": True } # Identify reasoning models is_reasoning_model = config.model_name in [ 'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano', 'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3', 'o1', 'o1-mini', 'o1-preview' ] if is_reasoning_model: # Reasoning models use max_completion_tokens if config.max_tokens: req_params["max_completion_tokens"] = config.max_tokens # IMPORTANT: Reasoning models often DO NOT support 'temperature'. # We skip adding it. else: if config.max_tokens: req_params["max_tokens"] = config.max_tokens req_params["temperature"] = config.temperature stream = await client.chat.completions.create(**req_params) async for chunk in stream: if chunk.choices and chunk.choices[0].delta: delta = chunk.choices[0].delta if delta.content: yield delta.content elif delta.tool_calls: # If the model tries to call a tool (even if we didn't send any?) # This shouldn't happen unless we sent tools. # But let's notify the user. # Or maybe it's just an empty delta at the start/end. pass elif getattr(delta, 'refusal', None): yield f"[Refusal: {delta.refusal}]" async def stream_google(messages: list[Message], config: LLMConfig, attachments: List[Dict[str, Any]] | None = None, images: Optional[List[Dict[str, Any]]] = None) -> AsyncGenerator[str, None]: attachments = attachments or [] # Use new Google GenAI SDK (google-genai) from google import genai from google.genai import types key = config.api_key or os.getenv("GOOGLE_API_KEY") if not key: raise ValueError("Google API Key not found") client = genai.Client(api_key=key) # Configure Tools (Google Search) tools = None if config.enable_google_search: # Enable Google Search Grounding tools = [types.Tool(google_search=types.GoogleSearch())] # Configure Generation gen_config_kwargs = { "temperature": config.temperature, "system_instruction": config.system_prompt, "tools": tools, } if config.max_tokens: gen_config_kwargs["max_output_tokens"] = config.max_tokens gen_config = types.GenerateContentConfig(**gen_config_kwargs) # If attachments or images present, use non-streaming generate_content # but preserve multi-turn conversation structure if attachments or images: import base64 as _b64 # Build proper multi-turn contents with images in the last user message contents = [] for msg in messages: role = "user" if msg.role == Role.USER else "model" contents.append(types.Content( role=role, parts=[types.Part(text=msg.content)] )) # Find last user message and inject images + attachments into its parts for i in range(len(contents) - 1, -1, -1): if contents[i].role == "user": extra_parts = [] for att in attachments: uri = att.get("uri") mime = att.get("mime") or "application/octet-stream" if uri: try: extra_parts.append(types.Part.from_uri(uri, mime_type=mime)) except Exception: extra_parts.append(types.Part(text=f"[file attached: {uri}]")) if images: for img in images: raw_bytes = _b64.b64decode(img["data"]) extra_parts.append(types.Part(inline_data=types.Blob(mime_type=img["mime"], data=raw_bytes))) contents[i] = types.Content( role="user", parts=list(contents[i].parts) + extra_parts ) break logger.debug("gemini: sending attachments=%d images=%d contents=%d", len(attachments), len(images or []), len(contents)) try: response = await client.aio.models.generate_content( model=config.model_name, contents=contents, config=gen_config ) if response and getattr(response, "text", None): yield response.text else: yield "[Error] Gemini response returned no text." except Exception as e: yield f"[Error] Gemini call failed: {str(e)}" return # Prepare History # Extract last message as the prompt prompt_msg = "..." history_msgs = messages if messages and messages[-1].role == Role.USER: prompt_msg = messages[-1].content history_msgs = messages[:-1] history_content = [] for msg in history_msgs: role = "user" if msg.role == Role.USER else "model" history_content.append(types.Content( role=role, parts=[types.Part(text=msg.content)] )) # Use Async Client via .aio chat_session = client.aio.chats.create( model=config.model_name, history=history_content, config=gen_config ) response_stream = await chat_session.send_message_stream(prompt_msg) async for chunk in response_stream: # Access text safely if chunk.text: yield chunk.text async def stream_claude(messages: list[Message], config: LLMConfig, attachments: Optional[List[Dict[str, Any]]] = None, images: Optional[List[Dict[str, Any]]] = None) -> AsyncGenerator[str, None]: client = get_anthropic_client(config.api_key) attachments = attachments or [] # Separate system messages from conversation messages system_parts = [] if config.system_prompt: system_parts.append(config.system_prompt) claude_messages = [] for msg in messages: if msg.role == Role.SYSTEM: system_parts.append(msg.content) else: role = "user" if msg.role == Role.USER else "assistant" claude_messages.append({"role": role, "content": msg.content}) # Claude requires messages to alternate user/assistant. # Merge consecutive same-role messages. merged = [] for m in claude_messages: if merged and merged[-1]["role"] == m["role"]: merged[-1]["content"] += "\n\n" + m["content"] else: merged.append(m) # Claude requires the first message to be from "user" if merged and merged[0]["role"] == "assistant": merged.insert(0, {"role": "user", "content": "(continued)"}) # If no messages at all, add a placeholder if not merged: merged.append({"role": "user", "content": "Hello"}) # Inject images into last user message (Claude vision format) if images and merged: for i in range(len(merged) - 1, -1, -1): if merged[i]["role"] == "user": text_content = merged[i]["content"] # Convert from string to content blocks array content_blocks = [{"type": "text", "text": text_content}] for img in images: content_blocks.append({ "type": "image", "source": { "type": "base64", "media_type": img["mime"], "data": img["data"], } }) merged[i]["content"] = content_blocks break # Inject document attachments into last user message has_file_references = False if attachments and merged: import base64 as _b64 for i in range(len(merged) - 1, -1, -1): if merged[i]["role"] == "user": # Ensure content is a list of blocks (images may have already converted it) if isinstance(merged[i]["content"], str): merged[i]["content"] = [{"type": "text", "text": merged[i]["content"]}] for att in attachments: file_id = att.get("file_id") data_b64 = att.get("data_base64") mime = (att.get("mime") or "").lower() name = att.get("name", "file") if file_id: # Use Anthropic Files API reference (requires beta) merged[i]["content"].append({ "type": "document", "source": {"type": "file", "file_id": file_id}, "title": name, }) has_file_references = True elif data_b64 and mime == "application/pdf": # Inline base64 PDF merged[i]["content"].append({ "type": "document", "source": { "type": "base64", "media_type": "application/pdf", "data": data_b64, }, "title": name, }) elif data_b64: # Text-like file: decode and inject as text block try: text = _b64.b64decode(data_b64).decode("utf-8", errors="replace") merged[i]["content"].append({ "type": "text", "text": f"--- {name} ---\n{text}", }) except Exception: logger.warning("Failed to decode attachment %s as text", name) break system_text = "\n\n".join(system_parts) if system_parts else anthropic.NOT_GIVEN stream_params = dict( model=config.model_name, max_tokens=config.max_tokens or 16384, temperature=config.temperature, system=system_text, messages=merged, ) if has_file_references: # Use beta endpoint for Files API references async with client.beta.messages.stream( **stream_params, betas=["files-api-2025-04-14"], ) as stream: async for text in stream.text_stream: yield text else: async with client.messages.stream(**stream_params) as stream: async for text in stream.text_stream: yield text async def stream_openrouter( messages: list[Message], config: LLMConfig, openrouter_api_key: str, images: Optional[List[Dict[str, Any]]] = None, ) -> AsyncGenerator[str, None]: """Stream via OpenRouter fallback using OpenAI-compatible Chat Completions API.""" client = get_openrouter_client(openrouter_api_key) provider_str = config.provider.value if hasattr(config.provider, 'value') else str(config.provider) openrouter_model = to_openrouter_model(provider_str, config.model_name) openai_messages = [] if config.system_prompt: openai_messages.append({"role": "system", "content": config.system_prompt}) for msg in messages: openai_messages.append({"role": msg.role.value, "content": msg.content}) # Inject images into last user message (OpenAI Chat Completions format) if images and openai_messages: for i in range(len(openai_messages) - 1, -1, -1): if openai_messages[i]["role"] == "user": text_content = openai_messages[i]["content"] openai_messages[i]["content"] = [ {"type": "text", "text": text_content}, ] + [ {"type": "image_url", "image_url": {"url": f"data:{img['mime']};base64,{img['data']}"}} for img in images ] break or_params = { "model": openrouter_model, "messages": openai_messages, "stream": True, "temperature": config.temperature, } if config.max_tokens: or_params["max_tokens"] = config.max_tokens stream = await client.chat.completions.create(**or_params) async for chunk in stream: if chunk.choices and chunk.choices[0].delta: delta = chunk.choices[0].delta if delta.content: yield delta.content async def llm_streamer( context: Context, user_prompt: str, config: LLMConfig, attachments: List[Dict[str, Any]] | None = None, tools: List[Dict[str, Any]] | None = None, openrouter_api_key: Optional[str] = None, images: Optional[List[Dict[str, Any]]] = None, ) -> AsyncGenerator[str, None]: # 1. Merge Context + New User Prompt # We create a temporary list of messages for this inference messages_to_send = context.messages.copy() # If user_prompt is provided (it should be for a Question Block) if user_prompt.strip(): messages_to_send.append(Message( id="temp_user_prompt", # ID doesn't matter for the API call role=Role.USER, content=user_prompt )) # 2. Call Provider logger.debug("llm_streamer: provider=%s model=%s messages=%d images=%d", config.provider, config.model_name, len(messages_to_send), len(images or [])) try: if config.provider == "openai": async for chunk in stream_openai(messages_to_send, config, attachments, tools, images=images): yield chunk elif config.provider == "google": async for chunk in stream_google(messages_to_send, config, attachments, images=images): yield chunk elif config.provider == "claude": async for chunk in stream_claude(messages_to_send, config, attachments=attachments, images=images): yield chunk else: yield f"Error: Unsupported provider {config.provider}" except Exception as e: primary_error = str(e) logger.warning("Primary provider %s/%s failed: %s. Checking OpenRouter fallback...", config.provider, config.model_name, primary_error) if not openrouter_api_key: yield f"Error calling LLM: {primary_error}" return try: logger.info("Falling back to OpenRouter for %s/%s", config.provider, config.model_name) async for chunk in stream_openrouter(messages_to_send, config, openrouter_api_key, images=images): yield chunk except Exception as fallback_error: logger.error("OpenRouter fallback also failed: %s", fallback_error) yield f"Error calling LLM: {primary_error} (OpenRouter fallback also failed: {fallback_error})" def resolve_provider(model_name: str) -> ModelProvider: """Determine the provider from a model name string.""" name = model_name.lower() if any(name.startswith(p) for p in ('claude',)): return ModelProvider.CLAUDE if any(name.startswith(p) for p in ('gemini',)): return ModelProvider.GOOGLE # Default to OpenAI for gpt-*, o1, o3, etc. return ModelProvider.OPENAI async def query_model_full( context: Context, user_prompt: str, config: LLMConfig, attachments=None, tools=None, openrouter_api_key=None, images=None, ) -> str: """Collect full response from llm_streamer (non-streaming wrapper).""" chunks = [] async for chunk in llm_streamer(context, user_prompt, config, attachments, tools, openrouter_api_key, images): chunks.append(chunk) return "".join(chunks) async def generate_title(user_prompt: str, response: str, api_key: str = None) -> str: """ Generate a short title (3-4 words) for a Q-A pair using gpt-5-nano. Uses Responses API (required for gpt-5 series), synchronous mode (no background). """ client = get_openai_client(api_key) instructions = """TASK: Extract a short topic title from the given Q&A. Do NOT answer the question - only extract the topic. Rules: - Output 2-3 short words OR 2 longer words - No punctuation, no quotes, no explanation - Capitalize each word - Be specific to the topic discussed - Output ONLY the title, nothing else Examples: Q: "How to sort a list in Python?" -> "Python Sorting" Q: "What is React state?" -> "React State" Q: "Explain AWS Lambda pricing" -> "Lambda Pricing" Q: "Who are you?" -> "AI Identity" Q: "What's the weather in NYC?" -> "NYC Weather\"""" # Truncate to avoid token limits truncated_prompt = user_prompt[:300] if len(user_prompt) > 300 else user_prompt truncated_response = response[:300] if len(response) > 300 else response input_text = f"Question: {truncated_prompt}\n\nAnswer: {truncated_response}" try: logger.debug("generate_title: called with prompt: %s...", truncated_prompt[:50]) # Use Responses API for gpt-5-nano (synchronous, no background) # Note: max_output_tokens includes reasoning tokens, so needs to be higher resp = await client.responses.create( model="gpt-5-nano", input=input_text, instructions=instructions, max_output_tokens=500, # Higher to accommodate reasoning tokens reasoning={"effort": "low"}, # Minimize reasoning for simple task stream=False ) logger.debug("generate_title: response status: %s", getattr(resp, 'status', 'unknown')) logger.debug("generate_title: response output: %s", getattr(resp, 'output', 'no output')) # Response should be completed immediately (no polling needed) if hasattr(resp, 'output'): for out in resp.output: if getattr(out, 'type', None) == 'message': content = getattr(out, 'content', []) for c in content: if getattr(c, 'type', None) == 'output_text': title = getattr(c, 'text', '').strip() # Clean up title = title.strip('"\'') logger.debug("generate_title: extracted title: %s", title) if title: return title logger.warning("generate_title: no title found, returning default") return "New Question" except Exception as e: logger.error("Title generation error: %s", e) return "New Question" async def summarize_content(content: str, model: str, openai_api_key: str = None, gemini_api_key: str = None) -> str: """ Summarize the given content using the specified model. Supports both OpenAI and Gemini models. """ instructions = """Summarize the following content concisely. Keep the key points and main ideas. Output only the summary, no preamble.""" # Truncate very long content max_content = 8000 if len(content) > max_content: content = content[:max_content] + "\n\n[Content truncated...]" try: if model.startswith('gemini'): # Use Gemini from google import genai from google.genai import types import os key = gemini_api_key or os.getenv("GOOGLE_API_KEY") if not key: return "Error: Google API Key not found" client = genai.Client(api_key=key) gen_config = types.GenerateContentConfig( temperature=0.3, max_output_tokens=1000, system_instruction=instructions ) response = await client.aio.models.generate_content( model=model, contents=content, config=gen_config ) return response.text or "No summary generated" else: # Use OpenAI client = get_openai_client(openai_api_key) # Check if model needs Responses API responses_api_models = [ 'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano', 'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3' ] if model in responses_api_models: # Use Responses API resp = await client.responses.create( model=model, input=content, instructions=instructions, max_output_tokens=2000, stream=False ) if hasattr(resp, 'output'): for out in resp.output: if getattr(out, 'type', None) == 'message': for c in getattr(out, 'content', []): if getattr(c, 'type', None) == 'output_text': return getattr(c, 'text', '') or "No summary generated" return "No summary generated" else: # Use Chat Completions API result = await client.chat.completions.create( model=model, messages=[ {"role": "system", "content": instructions}, {"role": "user", "content": content} ], max_tokens=1000, temperature=0.3 ) return result.choices[0].message.content or "No summary generated" except Exception as e: logger.error("Summarization error: %s", e) return f"Error: {str(e)}"