diff options
| author | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-13 03:02:36 +0000 |
|---|---|---|
| committer | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-13 03:02:36 +0000 |
| commit | 7d897ad9bb5ee46839ec91992cbbf4593168f119 (patch) | |
| tree | b4549f64176e93474b3b6c4b36294d30a46230b7 /backend | |
| parent | 2f19d8cb84598e0822b525f5fb5c456c07448fb7 (diff) | |
Add Claude provider, OpenRouter fallback, and GFM markdown support

- Add Claude (Anthropic) as third LLM provider with streaming support
- Add OpenRouter as transparent fallback when official API keys are missing or fail
- Add remark-gfm to ReactMarkdown for table/strikethrough rendering
- Claude models: sonnet-4.5, opus-4, opus-4.5, opus-4.6
- Backend: new stream_claude(), stream_openrouter(), provider routing, API key CRUD
- Frontend: model selectors, API key inputs for Claude and OpenRouter
- Auto-migration for new DB columns (claude_api_key, openrouter_api_key)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'backend')
| -rw-r--r-- | backend/app/auth/models.py | 12 | ||||
| -rw-r--r-- | backend/app/auth/routes.py | 14 | ||||
| -rw-r--r-- | backend/app/main.py | 60 | ||||
| -rw-r--r-- | backend/app/schemas.py | 1 | ||||
| -rw-r--r-- | backend/app/services/llm.py | 171 | ||||
| -rw-r--r-- | backend/requirements.txt | 1 |
6 files changed, 223 insertions, 36 deletions
diff --git a/backend/app/auth/models.py b/backend/app/auth/models.py index 8477ba2..14f5bb3 100644 --- a/backend/app/auth/models.py +++ b/backend/app/auth/models.py @@ -1,5 +1,5 @@ import os -from sqlalchemy import Column, Integer, String, DateTime, Text, create_engine +from sqlalchemy import Column, Integer, String, DateTime, Text, create_engine, text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import datetime @@ -26,12 +26,22 @@ class User(Base): # API Keys (stored encrypted in production, plain for simplicity here) openai_api_key = Column(Text, nullable=True) gemini_api_key = Column(Text, nullable=True) + claude_api_key = Column(Text, nullable=True) + openrouter_api_key = Column(Text, nullable=True) def init_db(): """Initialize database tables""" os.makedirs(DATA_ROOT, exist_ok=True) Base.metadata.create_all(bind=engine) + # Migrate: add columns that may be missing in existing SQLite databases + with engine.connect() as conn: + for col in ("claude_api_key", "openrouter_api_key"): + try: + conn.execute(text(f"ALTER TABLE users ADD COLUMN {col} TEXT")) + conn.commit() + except Exception: + pass # Column already exists def get_db(): diff --git a/backend/app/auth/routes.py b/backend/app/auth/routes.py index 3c906b5..eaf897e 100644 --- a/backend/app/auth/routes.py +++ b/backend/app/auth/routes.py @@ -228,14 +228,20 @@ async def get_api_keys(current_user: User = Depends(get_current_user)): return { "openai_api_key": mask_key(current_user.openai_api_key), "gemini_api_key": mask_key(current_user.gemini_api_key), + "claude_api_key": mask_key(current_user.claude_api_key), + "openrouter_api_key": mask_key(current_user.openrouter_api_key), "has_openai_key": bool(current_user.openai_api_key), "has_gemini_key": bool(current_user.gemini_api_key), + "has_claude_key": bool(current_user.claude_api_key), + "has_openrouter_key": bool(current_user.openrouter_api_key), } class ApiKeysUpdate(BaseModel): openai_api_key: 
Optional[str] = None gemini_api_key: Optional[str] = None + claude_api_key: Optional[str] = None + openrouter_api_key: Optional[str] = None @router.post("/api-keys") @@ -253,7 +259,13 @@ async def update_api_keys( if keys.gemini_api_key is not None: current_user.gemini_api_key = keys.gemini_api_key if keys.gemini_api_key else None - + + if keys.claude_api_key is not None: + current_user.claude_api_key = keys.claude_api_key if keys.claude_api_key else None + + if keys.openrouter_api_key is not None: + current_user.openrouter_api_key = keys.openrouter_api_key if keys.openrouter_api_key else None + db.commit() return {"message": "API keys updated successfully"} diff --git a/backend/app/main.py b/backend/app/main.py index c254652..b0d6138 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,4 +1,5 @@ import asyncio +import logging import tempfile import time from fastapi import FastAPI, HTTPException, Depends @@ -21,6 +22,20 @@ from sqlalchemy.orm import Session load_dotenv() +# --------------- Logging Setup --------------- +_LOG_DIR = os.path.join(os.path.abspath(os.getenv("DATA_ROOT", os.path.join(os.getcwd(), "data"))), "logs") +os.makedirs(_LOG_DIR, exist_ok=True) + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler(os.path.join(_LOG_DIR, "contextflow.log"), encoding="utf-8"), + ], +) +logger = logging.getLogger("contextflow") + app = FastAPI(title="ContextFlow Backend") # Include authentication router @@ -53,9 +68,9 @@ async def startup_event(): ) db.add(test_user) db.commit() - print("[startup] Created default test user (test/114514)") + logger.info("Created default test user (test/114514)") else: - print("[startup] Test user already exists") + logger.info("Test user already exists") finally: db.close() @@ -77,11 +92,19 @@ def get_user_api_key(user: User | None, provider: str) -> str | None: return user.openai_api_key if provider in 
("google", "gemini") and user.gemini_api_key: return user.gemini_api_key + if provider == "claude" and user.claude_api_key: + return user.claude_api_key + if provider == "openrouter" and user.openrouter_api_key: + return user.openrouter_api_key # Fallback to environment variables if provider == "openai": return os.getenv("OPENAI_API_KEY") if provider in ("google", "gemini"): return os.getenv("GOOGLE_API_KEY") + if provider == "claude": + return os.getenv("ANTHROPIC_API_KEY") + if provider == "openrouter": + return os.getenv("OPENROUTER_API_KEY") return None def ensure_user_root(user: str) -> str: @@ -317,14 +340,14 @@ async def run_node_stream( if vs_id: vs_ids = [vs_id] except Exception as e: - print(f"[warn] Could not get vector store: {e}") + logger.warning("Could not get vector store: %s", e) if vs_ids: tool_def = {"type": "file_search", "vector_store_ids": vs_ids} if filters: tool_def["filters"] = filters tools.append(tool_def) - print(f"[openai file_search] vs_ids={vs_ids} refs={debug_refs} filters={filters}") + logger.debug("openai file_search: vs_ids=%s refs=%s filters=%s", vs_ids, debug_refs, filters) elif request.config.provider == ModelProvider.GOOGLE: attachments = await prepare_attachments( user=username, @@ -333,8 +356,10 @@ async def run_node_stream( llm_config=request.config, ) + openrouter_key = get_user_api_key(current_user, "openrouter") + return StreamingResponse( - llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools), + llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools, openrouter_api_key=openrouter_key), media_type="text/event-stream" ) @@ -413,12 +438,15 @@ def save_blueprint(req: SaveBlueprintRequest): try: full_path = safe_path(req.user, req.path) os.makedirs(os.path.dirname(full_path), exist_ok=True) + raw = json.dumps(req.content, ensure_ascii=False, indent=2) + logger.info("save_blueprint: user=%s path=%s size=%d bytes", req.user, req.path, len(raw)) with 
open(full_path, "w", encoding="utf-8") as f: - json.dump(req.content, f, ensure_ascii=False, indent=2) + f.write(raw) return {"ok": True} except HTTPException: raise except Exception as e: + logger.error("save_blueprint failed: user=%s path=%s error=%s", req.user, req.path, e) raise HTTPException(status_code=500, detail=str(e)) @@ -582,7 +610,7 @@ async def remove_file_from_vector_store(vs_id: str, file_id: str, client=None): try: await client.vector_stores.files.delete(vector_store_id=vs_id, file_id=file_id) except Exception as e: - print(f"[warn] remove_file_from_vector_store failed: {e}") + logger.warning("remove_file_from_vector_store failed: %s", e) async def add_file_to_vector_store(vs_id: str, file_id: str, client=None): """ @@ -658,7 +686,7 @@ async def prepare_attachments( for fid in attached_ids: meta = items_map.get(fid) if not meta: - print(f"[warn] Attached file id not found, skipping: {fid}") + logger.warning("Attached file id not found, skipping: %s", fid) continue path = os.path.join(files_root(user), fid) @@ -747,7 +775,7 @@ async def prepare_attachments( await asyncio.sleep(1) except Exception: await asyncio.sleep(1) - print(f"[google upload] name={google_name} uri={google_uri}") + logger.debug("google upload: name=%s uri=%s", google_name, google_uri) uri = google_uri or google_name if not uri: @@ -770,7 +798,7 @@ async def prepare_attachments( raise HTTPException(status_code=400, detail=f"Unsupported provider for attachments: {target_provider}") # Debug log - print(f"[attachments] provider={provider_norm} count={len(attachments)} detail={[{'name': a.get('name'), 'id': a.get('file_id', a.get('uri'))} for a in attachments]}") + logger.debug("attachments: provider=%s count=%d detail=%s", provider_norm, len(attachments), [{'name': a.get('name'), 'id': a.get('file_id', a.get('uri'))} for a in attachments]) return attachments @@ -801,7 +829,7 @@ async def prepare_openai_vector_search( for item in items: if item.scopes and any(s in scopes for s in 
item.scopes): relevant_files_map[item.id] = item - print(f"[file_search] scopes={scopes} matched_files={[f.name for f in relevant_files_map.values()]}") + logger.debug("file_search: scopes=%s matched_files=%s", scopes, [f.name for f in relevant_files_map.values()]) # Second: also add explicitly attached files (they should always be searchable) if attached_ids: @@ -809,7 +837,7 @@ async def prepare_openai_vector_search( meta = items_map.get(fid) if meta and fid not in relevant_files_map: relevant_files_map[fid] = meta - print(f"[file_search] adding explicitly attached file: {meta.name}") + logger.debug("file_search: adding explicitly attached file: %s", meta.name) relevant_files = list(relevant_files_map.values()) @@ -824,12 +852,12 @@ async def prepare_openai_vector_search( for meta in relevant_files: path = os.path.join(files_root(user), meta.id) if not os.path.exists(path): - print(f"[warn] Attached file missing on disk, skipping: {meta.id}") + logger.warning("Attached file missing on disk, skipping: %s", meta.id) continue # Enforce 50MB OpenAI limit file_size = os.path.getsize(path) if file_size > OPENAI_MAX_FILE_SIZE: - print(f"[warn] File {meta.name} exceeds OpenAI 50MB limit, skipping") + logger.warning("File %s exceeds OpenAI 50MB limit, skipping", meta.name) continue openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, path, llm_config) @@ -1006,9 +1034,9 @@ async def upload_file( if provider_normalized == "openai" and not meta.provider_file_id: meta.provider_file_id = openai_file_id except Exception as e: - print(f"[warn] OpenAI indexing failed for {file_name}: {e}") + logger.warning("OpenAI indexing failed for %s: %s", file_name, e) else: - print(f"[warn] Skipping OpenAI indexing for {file_name}: exceeds 50MB") + logger.warning("Skipping OpenAI indexing for %s: exceeds 50MB", file_name) items.append(meta) save_files_index(user, items) diff --git a/backend/app/schemas.py b/backend/app/schemas.py index 54c0560..8e5f12c 100644 --- 
a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -23,6 +23,7 @@ class Context(BaseModel): class ModelProvider(str, Enum): OPENAI = "openai" GOOGLE = "google" + CLAUDE = "claude" class ReasoningEffort(str, Enum): LOW = "low" diff --git a/backend/app/services/llm.py b/backend/app/services/llm.py index 660a69d..2eb69ed 100644 --- a/backend/app/services/llm.py +++ b/backend/app/services/llm.py @@ -1,9 +1,13 @@ +import logging import os from typing import AsyncGenerator, List, Dict, Any, Optional import openai import google.generativeai as genai +import anthropic from app.schemas import LLMConfig, Message, Role, Context +logger = logging.getLogger("contextflow.llm") + # Cache OpenAI clients by API key to avoid re-initializing constantly # In a real app, use dependency injection or singletons _openai_clients: dict[str, openai.AsyncOpenAI] = {} @@ -17,6 +21,42 @@ def get_openai_client(api_key: str = None): _openai_clients[key] = openai.AsyncOpenAI(api_key=key) return _openai_clients[key] +# Cache Anthropic clients by API key +_anthropic_clients: dict[str, anthropic.AsyncAnthropic] = {} + +def get_anthropic_client(api_key: str = None): + global _anthropic_clients + key = api_key or os.getenv("ANTHROPIC_API_KEY") + if not key: + raise ValueError("Anthropic API Key not found") + if key not in _anthropic_clients: + _anthropic_clients[key] = anthropic.AsyncAnthropic(api_key=key) + return _anthropic_clients[key] + +# Cache OpenRouter clients (OpenAI-compatible with custom base_url) +_openrouter_clients: dict[str, openai.AsyncOpenAI] = {} + +def get_openrouter_client(api_key: str): + global _openrouter_clients + if not api_key: + raise ValueError("OpenRouter API Key not found") + if api_key not in _openrouter_clients: + _openrouter_clients[api_key] = openai.AsyncOpenAI( + api_key=api_key, + base_url="https://openrouter.ai/api/v1", + ) + return _openrouter_clients[api_key] + +OPENROUTER_PROVIDER_PREFIX = { + "openai": "openai/", + "google": "google/", + "claude": 
"anthropic/", +} + +def to_openrouter_model(provider: str, model_name: str) -> str: + prefix = OPENROUTER_PROVIDER_PREFIX.get(provider, "") + return f"{prefix}{model_name}" + def configure_google(api_key: str = None): key = api_key or os.getenv("GOOGLE_API_KEY") if not key: @@ -132,7 +172,7 @@ async def stream_openai( resp_params["instructions"] = config.system_prompt # Debug: print final tools being sent - print(f"[responses debug] final tools: {resp_params.get('tools')}") + logger.debug("responses: final tools: %s", resp_params.get('tools')) # 1. Create Response (non-background) initial_resp = await client.responses.create(**resp_params) @@ -147,35 +187,35 @@ async def stream_openai( # Debug: log outputs and tool calls try: outs = getattr(final_resp, "output", []) - print(f"[responses debug] output items: {[getattr(o, 'type', None) for o in outs]}") + logger.debug("responses: output items: %s", [getattr(o, 'type', None) for o in outs]) for o in outs: if getattr(o, "type", None) == "file_search_call": - print(f"[responses debug] file_search_call: {o}") + logger.debug("responses: file_search_call: %s", o) except Exception as e: - print(f"[responses debug] failed to inspect output: {e}") + logger.debug("responses: failed to inspect output: %s", e) found_content = False if hasattr(final_resp, 'output'): for out in final_resp.output: out_type = getattr(out, 'type', None) out_content = getattr(out, 'content', None) - print(f"[responses debug] output item: type={out_type}, content={out_content}") + logger.debug("responses: output item: type=%s, content=%s", out_type, out_content) if out_type == 'message' and out_content: for c in out_content: c_type = getattr(c, 'type', None) c_text = getattr(c, 'text', None) - print(f"[responses debug] content item: type={c_type}, text={c_text[:100] if c_text else None}...") + logger.debug("responses: content item: type=%s, text=%s...", c_type, c_text[:100] if c_text else None) if c_type == 'output_text': text_val = getattr(c, 'text', 
None) if text_val: - print(f"[responses debug] YIELDING text: {text_val[:50]}...") + logger.debug("responses: yielding text: %s...", text_val[:50]) yield text_val - print(f"[responses debug] YIELDED successfully") + logger.debug("responses: yielded successfully") found_content = True if not found_content: - print(f"[responses debug] No content found! final_resp.output={final_resp.output}") + logger.warning("responses: no content found! output=%s", final_resp.output) yield f"\n[Debug: Completed but no content extracted]" return @@ -273,7 +313,7 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments: parts.append(types.Part(text=f"[file attached: {uri}]")) for msg in messages: parts.append(types.Part(text=msg.content)) - print(f"[gemini] sending attachments: {[att.get('uri') for att in attachments]}") + logger.debug("gemini: sending attachments: %s", [att.get('uri') for att in attachments]) try: response = await client.aio.models.generate_content( model=config.model_name, @@ -318,12 +358,91 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments: if chunk.text: yield chunk.text +async def stream_claude(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]: + client = get_anthropic_client(config.api_key) + + # Separate system messages from conversation messages + system_parts = [] + if config.system_prompt: + system_parts.append(config.system_prompt) + + claude_messages = [] + for msg in messages: + if msg.role == Role.SYSTEM: + system_parts.append(msg.content) + else: + role = "user" if msg.role == Role.USER else "assistant" + claude_messages.append({"role": role, "content": msg.content}) + + # Claude requires messages to alternate user/assistant. + # Merge consecutive same-role messages. 
+ merged = [] + for m in claude_messages: + if merged and merged[-1]["role"] == m["role"]: + merged[-1]["content"] += "\n\n" + m["content"] + else: + merged.append(m) + + # Claude requires the first message to be from "user" + if merged and merged[0]["role"] == "assistant": + merged.insert(0, {"role": "user", "content": "(continued)"}) + + # If no messages at all, add a placeholder + if not merged: + merged.append({"role": "user", "content": "Hello"}) + + system_text = "\n\n".join(system_parts) if system_parts else anthropic.NOT_GIVEN + + async with client.messages.stream( + model=config.model_name, + max_tokens=config.max_tokens, + temperature=config.temperature, + system=system_text, + messages=merged, + ) as stream: + async for text in stream.text_stream: + yield text + + +async def stream_openrouter( + messages: list[Message], + config: LLMConfig, + openrouter_api_key: str, +) -> AsyncGenerator[str, None]: + """Stream via OpenRouter fallback using OpenAI-compatible Chat Completions API.""" + client = get_openrouter_client(openrouter_api_key) + + provider_str = config.provider.value if hasattr(config.provider, 'value') else str(config.provider) + openrouter_model = to_openrouter_model(provider_str, config.model_name) + + openai_messages = [] + if config.system_prompt: + openai_messages.append({"role": "system", "content": config.system_prompt}) + for msg in messages: + openai_messages.append({"role": msg.role.value, "content": msg.content}) + + stream = await client.chat.completions.create( + model=openrouter_model, + messages=openai_messages, + stream=True, + max_tokens=config.max_tokens, + temperature=config.temperature, + ) + + async for chunk in stream: + if chunk.choices and chunk.choices[0].delta: + delta = chunk.choices[0].delta + if delta.content: + yield delta.content + + async def llm_streamer( context: Context, user_prompt: str, config: LLMConfig, attachments: List[Dict[str, Any]] | None = None, tools: List[Dict[str, Any]] | None = None, + 
openrouter_api_key: Optional[str] = None, ) -> AsyncGenerator[str, None]: # 1. Merge Context + New User Prompt # We create a temporary list of messages for this inference @@ -345,10 +464,26 @@ async def llm_streamer( elif config.provider == "google": async for chunk in stream_google(messages_to_send, config, attachments): yield chunk + elif config.provider == "claude": + async for chunk in stream_claude(messages_to_send, config): + yield chunk else: yield f"Error: Unsupported provider {config.provider}" except Exception as e: - yield f"Error calling LLM: {str(e)}" + primary_error = str(e) + logger.warning("Primary provider failed: %s. Checking OpenRouter fallback...", primary_error) + + if not openrouter_api_key: + yield f"Error calling LLM: {primary_error}" + return + + try: + logger.info("Falling back to OpenRouter for %s/%s", config.provider, config.model_name) + async for chunk in stream_openrouter(messages_to_send, config, openrouter_api_key): + yield chunk + except Exception as fallback_error: + logger.error("OpenRouter fallback also failed: %s", fallback_error) + yield f"Error calling LLM: {primary_error} (OpenRouter fallback also failed: {fallback_error})" async def generate_title(user_prompt: str, response: str, api_key: str = None) -> str: @@ -381,7 +516,7 @@ Q: "What's the weather in NYC?" -> "NYC Weather\"""" input_text = f"Question: {truncated_prompt}\n\nAnswer: {truncated_response}" try: - print(f"[generate_title] Called with prompt: {truncated_prompt[:50]}...") + logger.debug("generate_title: called with prompt: %s...", truncated_prompt[:50]) # Use Responses API for gpt-5-nano (synchronous, no background) # Note: max_output_tokens includes reasoning tokens, so needs to be higher @@ -394,8 +529,8 @@ Q: "What's the weather in NYC?" 
-> "NYC Weather\"""" stream=False ) - print(f"[generate_title] Response status: {getattr(resp, 'status', 'unknown')}") - print(f"[generate_title] Response output: {getattr(resp, 'output', 'no output')}") + logger.debug("generate_title: response status: %s", getattr(resp, 'status', 'unknown')) + logger.debug("generate_title: response output: %s", getattr(resp, 'output', 'no output')) # Response should be completed immediately (no polling needed) if hasattr(resp, 'output'): @@ -407,15 +542,15 @@ Q: "What's the weather in NYC?" -> "NYC Weather\"""" title = getattr(c, 'text', '').strip() # Clean up title = title.strip('"\'') - print(f"[generate_title] Extracted title: {title}") + logger.debug("generate_title: extracted title: %s", title) if title: return title - print("[generate_title] No title found, returning default") + logger.warning("generate_title: no title found, returning default") return "New Question" except Exception as e: - print(f"Title generation error: {e}") + logger.error("Title generation error: %s", e) return "New Question" @@ -503,5 +638,5 @@ Output only the summary, no preamble.""" return result.choices[0].message.content or "No summary generated" except Exception as e: - print(f"Summarization error: {e}") + logger.error("Summarization error: %s", e) return f"Error: {str(e)}" diff --git a/backend/requirements.txt b/backend/requirements.txt index a9607fd..7260f95 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -3,6 +3,7 @@ uvicorn pydantic[email] openai google-generativeai +anthropic python-dotenv httpx python-multipart |
