diff options
Diffstat (limited to 'backend/app/services')
| -rw-r--r-- | backend/app/services/llm.py | 171 |
1 files changed, 153 insertions, 18 deletions
diff --git a/backend/app/services/llm.py b/backend/app/services/llm.py index 660a69d..2eb69ed 100644 --- a/backend/app/services/llm.py +++ b/backend/app/services/llm.py @@ -1,9 +1,13 @@ +import logging import os from typing import AsyncGenerator, List, Dict, Any, Optional import openai import google.generativeai as genai +import anthropic from app.schemas import LLMConfig, Message, Role, Context +logger = logging.getLogger("contextflow.llm") + # Cache OpenAI clients by API key to avoid re-initializing constantly # In a real app, use dependency injection or singletons _openai_clients: dict[str, openai.AsyncOpenAI] = {} @@ -17,6 +21,42 @@ def get_openai_client(api_key: str = None): _openai_clients[key] = openai.AsyncOpenAI(api_key=key) return _openai_clients[key] +# Cache Anthropic clients by API key +_anthropic_clients: dict[str, anthropic.AsyncAnthropic] = {} + +def get_anthropic_client(api_key: str = None): + global _anthropic_clients + key = api_key or os.getenv("ANTHROPIC_API_KEY") + if not key: + raise ValueError("Anthropic API Key not found") + if key not in _anthropic_clients: + _anthropic_clients[key] = anthropic.AsyncAnthropic(api_key=key) + return _anthropic_clients[key] + +# Cache OpenRouter clients (OpenAI-compatible with custom base_url) +_openrouter_clients: dict[str, openai.AsyncOpenAI] = {} + +def get_openrouter_client(api_key: str): + global _openrouter_clients + if not api_key: + raise ValueError("OpenRouter API Key not found") + if api_key not in _openrouter_clients: + _openrouter_clients[api_key] = openai.AsyncOpenAI( + api_key=api_key, + base_url="https://openrouter.ai/api/v1", + ) + return _openrouter_clients[api_key] + +OPENROUTER_PROVIDER_PREFIX = { + "openai": "openai/", + "google": "google/", + "claude": "anthropic/", +} + +def to_openrouter_model(provider: str, model_name: str) -> str: + prefix = OPENROUTER_PROVIDER_PREFIX.get(provider, "") + return f"{prefix}{model_name}" + def configure_google(api_key: str = None): key = api_key or os.getenv("GOOGLE_API_KEY") if not key: @@ -132,7 +172,7 @@ async def stream_openai( resp_params["instructions"] = config.system_prompt # Debug: print final tools being sent - print(f"[responses debug] final tools: {resp_params.get('tools')}") + logger.debug("responses: final tools: %s", resp_params.get('tools')) # 1. Create Response (non-background) initial_resp = await client.responses.create(**resp_params) @@ -147,35 +187,35 @@ async def stream_openai( # Debug: log outputs and tool calls try: outs = getattr(final_resp, "output", []) - print(f"[responses debug] output items: {[getattr(o, 'type', None) for o in outs]}") + logger.debug("responses: output items: %s", [getattr(o, 'type', None) for o in outs]) for o in outs: if getattr(o, "type", None) == "file_search_call": - print(f"[responses debug] file_search_call: {o}") + logger.debug("responses: file_search_call: %s", o) except Exception as e: - print(f"[responses debug] failed to inspect output: {e}") + logger.debug("responses: failed to inspect output: %s", e) found_content = False if hasattr(final_resp, 'output'): for out in final_resp.output: out_type = getattr(out, 'type', None) out_content = getattr(out, 'content', None) - print(f"[responses debug] output item: type={out_type}, content={out_content}") + logger.debug("responses: output item: type=%s, content=%s", out_type, out_content) if out_type == 'message' and out_content: for c in out_content: c_type = getattr(c, 'type', None) c_text = getattr(c, 'text', None) - print(f"[responses debug] content item: type={c_type}, text={c_text[:100] if c_text else None}...") + logger.debug("responses: content item: type=%s, text=%s...", c_type, c_text[:100] if c_text else None) if c_type == 'output_text': text_val = getattr(c, 'text', None) if text_val: - print(f"[responses debug] YIELDING text: {text_val[:50]}...") + logger.debug("responses: yielding text: %s...", text_val[:50]) yield text_val - print(f"[responses debug] YIELDED successfully") + logger.debug("responses: yielded successfully") found_content = True if not found_content: - print(f"[responses debug] No content found! final_resp.output={final_resp.output}") + logger.warning("responses: no content found! output=%s", final_resp.output) yield f"\n[Debug: Completed but no content extracted]" return @@ -273,7 +313,7 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments: parts.append(types.Part(text=f"[file attached: {uri}]")) for msg in messages: parts.append(types.Part(text=msg.content)) - print(f"[gemini] sending attachments: {[att.get('uri') for att in attachments]}") + logger.debug("gemini: sending attachments: %s", [att.get('uri') for att in attachments]) try: response = await client.aio.models.generate_content( model=config.model_name, @@ -318,12 +358,91 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments: if chunk.text: yield chunk.text +async def stream_claude(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]: + client = get_anthropic_client(config.api_key) + + # Separate system messages from conversation messages + system_parts = [] + if config.system_prompt: + system_parts.append(config.system_prompt) + + claude_messages = [] + for msg in messages: + if msg.role == Role.SYSTEM: + system_parts.append(msg.content) + else: + role = "user" if msg.role == Role.USER else "assistant" + claude_messages.append({"role": role, "content": msg.content}) + + # Claude requires messages to alternate user/assistant. + # Merge consecutive same-role messages. + merged = [] + for m in claude_messages: + if merged and merged[-1]["role"] == m["role"]: + merged[-1]["content"] += "\n\n" + m["content"] + else: + merged.append(m) + + # Claude requires the first message to be from "user" + if merged and merged[0]["role"] == "assistant": + merged.insert(0, {"role": "user", "content": "(continued)"}) + + # If no messages at all, add a placeholder + if not merged: + merged.append({"role": "user", "content": "Hello"}) + + system_text = "\n\n".join(system_parts) if system_parts else anthropic.NOT_GIVEN + + async with client.messages.stream( + model=config.model_name, + max_tokens=config.max_tokens, + temperature=config.temperature, + system=system_text, + messages=merged, + ) as stream: + async for text in stream.text_stream: + yield text + + +async def stream_openrouter( + messages: list[Message], + config: LLMConfig, + openrouter_api_key: str, +) -> AsyncGenerator[str, None]: + """Stream via OpenRouter fallback using OpenAI-compatible Chat Completions API.""" + client = get_openrouter_client(openrouter_api_key) + + provider_str = config.provider.value if hasattr(config.provider, 'value') else str(config.provider) + openrouter_model = to_openrouter_model(provider_str, config.model_name) + + openai_messages = [] + if config.system_prompt: + openai_messages.append({"role": "system", "content": config.system_prompt}) + for msg in messages: + openai_messages.append({"role": msg.role.value, "content": msg.content}) + + stream = await client.chat.completions.create( + model=openrouter_model, + messages=openai_messages, + stream=True, + max_tokens=config.max_tokens, + temperature=config.temperature, + ) + + async for chunk in stream: + if chunk.choices and chunk.choices[0].delta: + delta = chunk.choices[0].delta + if delta.content: + yield delta.content + + async def llm_streamer( context: Context, user_prompt: str, config: LLMConfig, attachments: List[Dict[str, Any]] | None = None, tools: List[Dict[str, Any]] | None = None, + openrouter_api_key: Optional[str] = None, ) -> AsyncGenerator[str, None]: # 1. Merge Context + New User Prompt # We create a temporary list of messages for this inference @@ -345,10 +464,26 @@ async def llm_streamer( elif config.provider == "google": async for chunk in stream_google(messages_to_send, config, attachments): yield chunk + elif config.provider == "claude": + async for chunk in stream_claude(messages_to_send, config): + yield chunk else: yield f"Error: Unsupported provider {config.provider}" except Exception as e: - yield f"Error calling LLM: {str(e)}" + primary_error = str(e) + logger.warning("Primary provider failed: %s. Checking OpenRouter fallback...", primary_error) + + if not openrouter_api_key: + yield f"Error calling LLM: {primary_error}" + return + + try: + logger.info("Falling back to OpenRouter for %s/%s", config.provider, config.model_name) + async for chunk in stream_openrouter(messages_to_send, config, openrouter_api_key): + yield chunk + except Exception as fallback_error: + logger.error("OpenRouter fallback also failed: %s", fallback_error) + yield f"Error calling LLM: {primary_error} (OpenRouter fallback also failed: {fallback_error})" async def generate_title(user_prompt: str, response: str, api_key: str = None) -> str: @@ -381,7 +516,7 @@ Q: "What's the weather in NYC?" -> "NYC Weather\"""" input_text = f"Question: {truncated_prompt}\n\nAnswer: {truncated_response}" try: - print(f"[generate_title] Called with prompt: {truncated_prompt[:50]}...") + logger.debug("generate_title: called with prompt: %s...", truncated_prompt[:50]) # Use Responses API for gpt-5-nano (synchronous, no background) # Note: max_output_tokens includes reasoning tokens, so needs to be higher @@ -394,8 +529,8 @@ Q: "What's the weather in NYC?" -> "NYC Weather\"""" stream=False ) - print(f"[generate_title] Response status: {getattr(resp, 'status', 'unknown')}") - print(f"[generate_title] Response output: {getattr(resp, 'output', 'no output')}") + logger.debug("generate_title: response status: %s", getattr(resp, 'status', 'unknown')) + logger.debug("generate_title: response output: %s", getattr(resp, 'output', 'no output')) # Response should be completed immediately (no polling needed) if hasattr(resp, 'output'): @@ -407,15 +542,15 @@ Q: "What's the weather in NYC?" -> "NYC Weather\"""" title = getattr(c, 'text', '').strip() # Clean up title = title.strip('"\'') - print(f"[generate_title] Extracted title: {title}") + logger.debug("generate_title: extracted title: %s", title) if title: return title - print("[generate_title] No title found, returning default") + logger.warning("generate_title: no title found, returning default") return "New Question" except Exception as e: - print(f"Title generation error: {e}") + logger.error("Title generation error: %s", e) return "New Question" @@ -503,5 +638,5 @@ Output only the summary, no preamble.""" return result.choices[0].message.content or "No summary generated" except Exception as e: - print(f"Summarization error: {e}") + logger.error("Summarization error: %s", e) return f"Error: {str(e)}" |
