summaryrefslogtreecommitdiff
path: root/backend/app/services
diff options
context:
space:
mode:
Diffstat (limited to 'backend/app/services')
-rw-r--r--backend/app/services/llm.py171
1 file changed, 153 insertions(+), 18 deletions(-)
diff --git a/backend/app/services/llm.py b/backend/app/services/llm.py
index 660a69d..2eb69ed 100644
--- a/backend/app/services/llm.py
+++ b/backend/app/services/llm.py
@@ -1,9 +1,13 @@
+import logging
import os
from typing import AsyncGenerator, List, Dict, Any, Optional
import openai
import google.generativeai as genai
+import anthropic
from app.schemas import LLMConfig, Message, Role, Context
+logger = logging.getLogger("contextflow.llm")
+
# Cache OpenAI clients by API key to avoid re-initializing constantly
# In a real app, use dependency injection or singletons
_openai_clients: dict[str, openai.AsyncOpenAI] = {}
@@ -17,6 +21,42 @@ def get_openai_client(api_key: str = None):
_openai_clients[key] = openai.AsyncOpenAI(api_key=key)
return _openai_clients[key]
# Cache Anthropic clients by API key to avoid re-initializing constantly.
# NOTE(review): entries are never evicted; acceptable for a handful of keys.
_anthropic_clients: dict[str, anthropic.AsyncAnthropic] = {}

def get_anthropic_client(api_key: str | None = None) -> anthropic.AsyncAnthropic:
    """Return a cached ``AsyncAnthropic`` client for *api_key*.

    Falls back to the ``ANTHROPIC_API_KEY`` environment variable when no
    key is supplied explicitly.

    Raises:
        ValueError: if no API key is available from either source.
    """
    key = api_key or os.getenv("ANTHROPIC_API_KEY")
    if not key:
        raise ValueError("Anthropic API Key not found")
    # No `global` needed: the cache dict is only mutated, never rebound.
    if key not in _anthropic_clients:
        _anthropic_clients[key] = anthropic.AsyncAnthropic(api_key=key)
    return _anthropic_clients[key]
+
# Cache OpenRouter clients (OpenAI-compatible with custom base_url).
# NOTE(review): entries are never evicted; acceptable for a handful of keys.
_openrouter_clients: dict[str, openai.AsyncOpenAI] = {}

def get_openrouter_client(api_key: str) -> openai.AsyncOpenAI:
    """Return a cached OpenAI-compatible client pointed at OpenRouter.

    Unlike the other providers there is no environment-variable fallback:
    the caller must supply the key explicitly.

    Raises:
        ValueError: if *api_key* is empty or None.
    """
    if not api_key:
        raise ValueError("OpenRouter API Key not found")
    # No `global` needed: the cache dict is only mutated, never rebound.
    if api_key not in _openrouter_clients:
        _openrouter_clients[api_key] = openai.AsyncOpenAI(
            api_key=api_key,
            base_url="https://openrouter.ai/api/v1",
        )
    return _openrouter_clients[api_key]
+
# Maps our internal provider ids to OpenRouter's model-name prefixes.
OPENROUTER_PROVIDER_PREFIX = {
    "openai": "openai/",
    "google": "google/",
    "claude": "anthropic/",
}

def to_openrouter_model(provider: str, model_name: str) -> str:
    """Translate an internal (provider, model) pair to an OpenRouter model id.

    Unknown providers pass the model name through unchanged. A model name
    that already carries a ``vendor/`` prefix is returned as-is, so callers
    may supply fully-qualified OpenRouter ids without double-prefixing
    (previously "openai/gpt-4o" would become "openai/openai/gpt-4o").
    """
    if "/" in model_name:
        return model_name
    prefix = OPENROUTER_PROVIDER_PREFIX.get(provider, "")
    return f"{prefix}{model_name}"
+
def configure_google(api_key: str = None):
key = api_key or os.getenv("GOOGLE_API_KEY")
if not key:
@@ -132,7 +172,7 @@ async def stream_openai(
resp_params["instructions"] = config.system_prompt
# Debug: print final tools being sent
- print(f"[responses debug] final tools: {resp_params.get('tools')}")
+ logger.debug("responses: final tools: %s", resp_params.get('tools'))
# 1. Create Response (non-background)
initial_resp = await client.responses.create(**resp_params)
@@ -147,35 +187,35 @@ async def stream_openai(
# Debug: log outputs and tool calls
try:
outs = getattr(final_resp, "output", [])
- print(f"[responses debug] output items: {[getattr(o, 'type', None) for o in outs]}")
+ logger.debug("responses: output items: %s", [getattr(o, 'type', None) for o in outs])
for o in outs:
if getattr(o, "type", None) == "file_search_call":
- print(f"[responses debug] file_search_call: {o}")
+ logger.debug("responses: file_search_call: %s", o)
except Exception as e:
- print(f"[responses debug] failed to inspect output: {e}")
+ logger.debug("responses: failed to inspect output: %s", e)
found_content = False
if hasattr(final_resp, 'output'):
for out in final_resp.output:
out_type = getattr(out, 'type', None)
out_content = getattr(out, 'content', None)
- print(f"[responses debug] output item: type={out_type}, content={out_content}")
+ logger.debug("responses: output item: type=%s, content=%s", out_type, out_content)
if out_type == 'message' and out_content:
for c in out_content:
c_type = getattr(c, 'type', None)
c_text = getattr(c, 'text', None)
- print(f"[responses debug] content item: type={c_type}, text={c_text[:100] if c_text else None}...")
+ logger.debug("responses: content item: type=%s, text=%s...", c_type, c_text[:100] if c_text else None)
if c_type == 'output_text':
text_val = getattr(c, 'text', None)
if text_val:
- print(f"[responses debug] YIELDING text: {text_val[:50]}...")
+ logger.debug("responses: yielding text: %s...", text_val[:50])
yield text_val
- print(f"[responses debug] YIELDED successfully")
+ logger.debug("responses: yielded successfully")
found_content = True
if not found_content:
- print(f"[responses debug] No content found! final_resp.output={final_resp.output}")
+ logger.warning("responses: no content found! output=%s", final_resp.output)
yield f"\n[Debug: Completed but no content extracted]"
return
@@ -273,7 +313,7 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments:
parts.append(types.Part(text=f"[file attached: {uri}]"))
for msg in messages:
parts.append(types.Part(text=msg.content))
- print(f"[gemini] sending attachments: {[att.get('uri') for att in attachments]}")
+ logger.debug("gemini: sending attachments: %s", [att.get('uri') for att in attachments])
try:
response = await client.aio.models.generate_content(
model=config.model_name,
@@ -318,12 +358,91 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments:
if chunk.text:
yield chunk.text
async def stream_claude(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
    """Yield completion text chunks from Anthropic's streaming Messages API.

    System text (config.system_prompt plus any SYSTEM-role messages) is
    passed via Claude's top-level ``system`` parameter; the remaining turns
    are normalized to the strict user/assistant alternation Claude requires.
    """
    client = get_anthropic_client(config.api_key)

    # Gather system text separately from the conversational turns.
    system_parts: list[str] = []
    if config.system_prompt:
        system_parts.append(config.system_prompt)

    # Convert to Claude's role vocabulary, merging consecutive same-role
    # turns in the same pass (the API rejects back-to-back same-role messages).
    merged: list[dict] = []
    for msg in messages:
        if msg.role == Role.SYSTEM:
            system_parts.append(msg.content)
            continue
        role = "user" if msg.role == Role.USER else "assistant"
        if merged and merged[-1]["role"] == role:
            merged[-1]["content"] += "\n\n" + msg.content
        else:
            merged.append({"role": role, "content": msg.content})

    # The conversation must open with a user turn.
    if merged and merged[0]["role"] == "assistant":
        merged.insert(0, {"role": "user", "content": "(continued)"})
    # An empty conversation still needs at least one message.
    if not merged:
        merged.append({"role": "user", "content": "Hello"})

    system_text = "\n\n".join(system_parts) if system_parts else anthropic.NOT_GIVEN

    async with client.messages.stream(
        model=config.model_name,
        max_tokens=config.max_tokens,
        temperature=config.temperature,
        system=system_text,
        messages=merged,
    ) as stream:
        async for text in stream.text_stream:
            yield text
+
+
async def stream_openrouter(
    messages: list[Message],
    config: LLMConfig,
    openrouter_api_key: str,
) -> AsyncGenerator[str, None]:
    """Stream via OpenRouter fallback using OpenAI-compatible Chat Completions API."""
    client = get_openrouter_client(openrouter_api_key)

    # config.provider may be an enum (use .value) or a plain string.
    provider_str = getattr(config.provider, "value", str(config.provider))
    openrouter_model = to_openrouter_model(provider_str, config.model_name)

    # Rebuild the conversation in OpenAI chat format, system prompt first.
    openai_messages: list[dict] = []
    if config.system_prompt:
        openai_messages.append({"role": "system", "content": config.system_prompt})
    openai_messages.extend(
        {"role": m.role.value, "content": m.content} for m in messages
    )

    stream = await client.chat.completions.create(
        model=openrouter_model,
        messages=openai_messages,
        stream=True,
        max_tokens=config.max_tokens,
        temperature=config.temperature,
    )

    # Guard-clause loop: skip keep-alive chunks with no choices/delta/content.
    async for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        if delta and delta.content:
            yield delta.content
+
+
async def llm_streamer(
context: Context,
user_prompt: str,
config: LLMConfig,
attachments: List[Dict[str, Any]] | None = None,
tools: List[Dict[str, Any]] | None = None,
+ openrouter_api_key: Optional[str] = None,
) -> AsyncGenerator[str, None]:
# 1. Merge Context + New User Prompt
# We create a temporary list of messages for this inference
@@ -345,10 +464,26 @@ async def llm_streamer(
elif config.provider == "google":
async for chunk in stream_google(messages_to_send, config, attachments):
yield chunk
+ elif config.provider == "claude":
+ async for chunk in stream_claude(messages_to_send, config):
+ yield chunk
else:
yield f"Error: Unsupported provider {config.provider}"
except Exception as e:
- yield f"Error calling LLM: {str(e)}"
+ primary_error = str(e)
+ logger.warning("Primary provider failed: %s. Checking OpenRouter fallback...", primary_error)
+
+ if not openrouter_api_key:
+ yield f"Error calling LLM: {primary_error}"
+ return
+
+ try:
+ logger.info("Falling back to OpenRouter for %s/%s", config.provider, config.model_name)
+ async for chunk in stream_openrouter(messages_to_send, config, openrouter_api_key):
+ yield chunk
+ except Exception as fallback_error:
+ logger.error("OpenRouter fallback also failed: %s", fallback_error)
+ yield f"Error calling LLM: {primary_error} (OpenRouter fallback also failed: {fallback_error})"
async def generate_title(user_prompt: str, response: str, api_key: str = None) -> str:
@@ -381,7 +516,7 @@ Q: "What's the weather in NYC?" -> "NYC Weather\""""
input_text = f"Question: {truncated_prompt}\n\nAnswer: {truncated_response}"
try:
- print(f"[generate_title] Called with prompt: {truncated_prompt[:50]}...")
+ logger.debug("generate_title: called with prompt: %s...", truncated_prompt[:50])
# Use Responses API for gpt-5-nano (synchronous, no background)
# Note: max_output_tokens includes reasoning tokens, so needs to be higher
@@ -394,8 +529,8 @@ Q: "What's the weather in NYC?" -> "NYC Weather\""""
stream=False
)
- print(f"[generate_title] Response status: {getattr(resp, 'status', 'unknown')}")
- print(f"[generate_title] Response output: {getattr(resp, 'output', 'no output')}")
+ logger.debug("generate_title: response status: %s", getattr(resp, 'status', 'unknown'))
+ logger.debug("generate_title: response output: %s", getattr(resp, 'output', 'no output'))
# Response should be completed immediately (no polling needed)
if hasattr(resp, 'output'):
@@ -407,15 +542,15 @@ Q: "What's the weather in NYC?" -> "NYC Weather\""""
title = getattr(c, 'text', '').strip()
# Clean up
title = title.strip('"\'')
- print(f"[generate_title] Extracted title: {title}")
+ logger.debug("generate_title: extracted title: %s", title)
if title:
return title
- print("[generate_title] No title found, returning default")
+ logger.warning("generate_title: no title found, returning default")
return "New Question"
except Exception as e:
- print(f"Title generation error: {e}")
+ logger.error("Title generation error: %s", e)
return "New Question"
@@ -503,5 +638,5 @@ Output only the summary, no preamble."""
return result.choices[0].message.content or "No summary generated"
except Exception as e:
- print(f"Summarization error: {e}")
+ logger.error("Summarization error: %s", e)
return f"Error: {str(e)}"