diff options
| author | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-13 05:45:13 +0000 |
|---|---|---|
| committer | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-13 05:45:13 +0000 |
| commit | 61293147c1d6f1cdde689c36faad923b600a4f6e (patch) | |
| tree | 9c773b13bd4f488ca0cbd1f5d646ba9ff7ab43ef | |
| parent | 257b5bcbd09d4a6b7b1b27d7db4cc2aeed766c39 (diff) | |
Add Anthropic Files API and persistent Google file caching for all providers
- Add anthropic_file_id/google_file_uri fields to FileMeta (backend + frontend)
- Eager upload to Anthropic and Google at file upload time (like OpenAI)
- Cache and reuse file references in prepare_attachments for all 3 providers
- Add document content block injection in stream_claude (file_id, base64, text fallback)
- Conditional beta streaming for Anthropic Files API references
- Persist on-demand upload results (changed flag + save_files_index)
- Clean up file deletion for all providers (Anthropic and Google deletion failures are warn-only; Google file references are deduplicated before deletion)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | backend/app/main.py | 310 |
| -rw-r--r-- | backend/app/services/llm.py | 325 |
| -rw-r--r-- | frontend/src/store/flowStore.ts | 2 |
3 files changed, 470 insertions, 167 deletions
diff --git a/backend/app/main.py b/backend/app/main.py index b0d6138..75491bb 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,4 +1,5 @@ import asyncio +import base64 import logging import tempfile import time @@ -8,7 +9,7 @@ from fastapi.responses import StreamingResponse, FileResponse from fastapi import UploadFile, File, Form from pydantic import BaseModel from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Message, Context, LLMConfig, ModelProvider, ReasoningEffort -from app.services.llm import llm_streamer, generate_title, get_openai_client +from app.services.llm import llm_streamer, generate_title, get_openai_client, get_anthropic_client from app.auth import auth_router, get_current_user, get_current_user_optional, init_db, User, get_db from app.auth.utils import get_password_hash from dotenv import load_dotenv @@ -232,6 +233,8 @@ class FileMeta(BaseModel): provider_file_id: Optional[str] = None openai_file_id: Optional[str] = None openai_vector_store_id: Optional[str] = None + anthropic_file_id: Optional[str] = None + google_file_uri: Optional[str] = None # Scopes for filtering: "project_path/node_id" composite keys scopes: List[str] = [] @@ -287,6 +290,53 @@ def smart_merge_messages(messages: list[Message]) -> list[Message]: merged.append(current_msg) return merged +IMAGE_MIME_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"} + +ANTHROPIC_DOCUMENT_MIME_TYPES = { + "application/pdf", "text/plain", "text/html", "text/csv", + "text/xml", "text/markdown", "text/css", "text/javascript", + "application/javascript", "application/xml", +} + +def extract_image_attachments(user: str, attached_ids: List[str]) -> tuple[List[dict], List[str]]: + """ + Separate image files from non-image files in attached_ids. + Returns (images, remaining_non_image_ids). 
+ Each image dict: {"mime": str, "data": str (base64), "name": str} + """ + if not attached_ids: + return [], [] + + items = load_files_index(user) + items_map = {item.id: item for item in items} + images: List[dict] = [] + non_image_ids: List[str] = [] + + for fid in attached_ids: + meta = items_map.get(fid) + if not meta: + non_image_ids.append(fid) + continue + + if meta.mime in IMAGE_MIME_TYPES: + path = os.path.join(files_root(user), fid) + if not os.path.exists(path): + logger.warning("Image file missing on disk, skipping: %s", fid) + continue + with open(path, "rb") as f: + raw = f.read() + images.append({ + "mime": meta.mime, + "data": base64.b64encode(raw).decode("utf-8"), + "name": meta.name, + }) + else: + non_image_ids.append(fid) + + logger.debug("extract_image_attachments: %d images, %d non-image files", len(images), len(non_image_ids)) + return images, non_image_ids + + @app.post("/api/run_node_stream") async def run_node_stream( request: NodeRunRequest, @@ -304,7 +354,10 @@ async def run_node_stream( # Get username for file operations username = current_user.username if current_user else DEFAULT_USER - + + # Extract images from attached files (separate from non-image files) + images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids) + # 1. 
Concatenate all incoming contexts first raw_messages = [] for ctx in request.incoming_contexts: @@ -326,7 +379,7 @@ async def run_node_stream( if request.config.provider == ModelProvider.OPENAI: vs_ids, debug_refs, filters = await prepare_openai_vector_search( user=username, - attached_ids=request.attached_file_ids, + attached_ids=non_image_file_ids, scopes=request.scopes, llm_config=request.config, ) @@ -352,14 +405,21 @@ async def run_node_stream( attachments = await prepare_attachments( user=username, target_provider=request.config.provider, - attached_ids=request.attached_file_ids, + attached_ids=non_image_file_ids, + llm_config=request.config, + ) + elif request.config.provider == ModelProvider.CLAUDE: + attachments = await prepare_attachments( + user=username, + target_provider=request.config.provider, + attached_ids=non_image_file_ids, llm_config=request.config, ) openrouter_key = get_user_api_key(current_user, "openrouter") return StreamingResponse( - llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools, openrouter_api_key=openrouter_key), + llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools, openrouter_api_key=openrouter_key, images=images), media_type="text/event-stream" ) @@ -602,6 +662,95 @@ async def ensure_openai_file_and_index(user: str, meta: FileMeta, path: str, llm await add_file_to_vector_store(vs_id, file_id, client=client) return file_id, vs_id +async def ensure_anthropic_file_upload(meta: FileMeta, file_path: str, api_key: str = None) -> Optional[str]: + """ + Upload a file to the Anthropic Files API (beta). + Returns the anthropic file_id on success, or None on skip/failure. 
+ """ + try: + client = get_anthropic_client(api_key) + except Exception: + logger.debug("Skipping Anthropic file upload: no API key available") + return None + + mime = (meta.mime or "").lower() + if mime in IMAGE_MIME_TYPES or mime not in ANTHROPIC_DOCUMENT_MIME_TYPES: + logger.debug("Skipping Anthropic file upload for unsupported mime: %s", mime) + return None + + try: + with open(file_path, "rb") as f: + content = f.read() + resp = await asyncio.to_thread( + client.beta.files.upload, + file=(meta.name or "upload.bin", content, mime), + ) + file_id = getattr(resp, "id", None) + if file_id: + logger.info("Anthropic file uploaded: %s -> %s", meta.name, file_id) + return file_id + except Exception as e: + logger.warning("Anthropic file upload failed for %s: %s", meta.name, e) + return None + + +async def ensure_google_file_upload(meta: FileMeta, file_path: str, api_key: str = None) -> Optional[str]: + """ + Upload a file to Google GenAI Files API. + Returns the Google file URI on success, or None on skip/failure. 
+ """ + key = api_key or os.getenv("GOOGLE_API_KEY") + if not key: + logger.debug("Skipping Google file upload: no API key available") + return None + + mime = (meta.mime or "").lower() + if mime in IMAGE_MIME_TYPES: + logger.debug("Skipping Google file upload for image: %s", mime) + return None + + tmp_path = None + try: + client = genai.Client(api_key=key) + with open(file_path, "rb") as f: + content = f.read() + with tempfile.NamedTemporaryFile(delete=False) as tmp: + tmp.write(content) + tmp_path = tmp.name + + google_resp = await asyncio.to_thread( + client.files.upload, + file=tmp_path, + config={"mimeType": meta.mime or "application/octet-stream"}, + ) + google_name = getattr(google_resp, "name", None) + google_uri = getattr(google_resp, "uri", None) + + # Poll for ACTIVE + if google_name: + for _ in range(10): + try: + info = await asyncio.to_thread(client.files.get, name=google_name) + state = getattr(info, "state", None) + google_uri = getattr(info, "uri", google_uri) + if str(state).upper().endswith("ACTIVE") or state == "ACTIVE": + break + await asyncio.sleep(1) + except Exception: + await asyncio.sleep(1) + + uri = google_uri or google_name + if uri: + logger.info("Google file uploaded: %s -> %s", meta.name, uri) + return uri + except Exception as e: + logger.warning("Google file upload failed for %s: %s", meta.name, e) + return None + finally: + if tmp_path and os.path.exists(tmp_path): + os.remove(tmp_path) + + async def remove_file_from_vector_store(vs_id: str, file_id: str, client=None): if not vs_id or not file_id: return @@ -677,6 +826,7 @@ async def prepare_attachments( items = load_files_index(user) items_map = {item.id: item for item in items} attachments: list[dict] = [] + changed = False if isinstance(target_provider, ModelProvider): provider_norm = target_provider.value.lower() @@ -732,71 +882,73 @@ async def prepare_attachments( raise HTTPException(status_code=500, detail=f"OpenAI upload failed: {str(e)}") elif provider_norm == 
ModelProvider.GOOGLE or provider_norm == "google": - # Reuse uri/name if available and looks like a URI - if meta.provider == "google" and meta.provider_file_id and "://" in meta.provider_file_id: + # Reuse cached google_file_uri + cached_uri = meta.google_file_uri or ( + meta.provider_file_id if meta.provider == "google" and meta.provider_file_id and "://" in meta.provider_file_id else None + ) + if cached_uri: attachments.append({ "provider": "google", - "uri": meta.provider_file_id, + "uri": cached_uri, "name": meta.name, "mime": meta.mime, }) continue - key = llm_config.api_key or os.getenv("GOOGLE_API_KEY") - if not key: - raise HTTPException(status_code=500, detail="Google API Key not found") - client = genai.Client(api_key=key) - - tmp_path = None - try: - with open(path, "rb") as f: - content = f.read() - with tempfile.NamedTemporaryFile(delete=False) as tmp: - tmp.write(content) - tmp_path = tmp.name - - google_resp = await asyncio.to_thread( - client.files.upload, - file=tmp_path, - config={"mimeType": meta.mime or "application/octet-stream"}, - ) - google_name = getattr(google_resp, "name", None) - google_uri = getattr(google_resp, "uri", None) - - # Poll for ACTIVE and uri if missing - if google_name: - for _ in range(10): - try: - info = await asyncio.to_thread(client.files.get, name=google_name) - state = getattr(info, "state", None) - google_uri = getattr(info, "uri", google_uri) - if str(state).upper().endswith("ACTIVE") or state == "ACTIVE": - break - await asyncio.sleep(1) - except Exception: - await asyncio.sleep(1) - logger.debug("google upload: name=%s uri=%s", google_name, google_uri) + # On-demand upload via shared helper + uri = await ensure_google_file_upload(meta, path, llm_config.api_key) + if not uri: + raise HTTPException(status_code=500, detail=f"Google upload failed for {meta.name}") + meta.google_file_uri = uri + changed = True + attachments.append({ + "provider": "google", + "uri": uri, + "name": meta.name, + "mime": meta.mime, + }) 
+ + elif provider_norm == ModelProvider.CLAUDE or provider_norm == "claude": + # Reuse cached anthropic_file_id if available + if meta.anthropic_file_id: + attachments.append({ + "provider": "claude", + "file_id": meta.anthropic_file_id, + "name": meta.name, + "mime": meta.mime, + }) + continue - uri = google_uri or google_name - if not uri: - raise HTTPException(status_code=500, detail="Google upload returned no uri/name") + # Try on-demand upload + anthropic_id = await ensure_anthropic_file_upload(meta, path, llm_config.api_key) + if anthropic_id: + meta.anthropic_file_id = anthropic_id + changed = True attachments.append({ - "provider": "google", - "uri": uri, + "provider": "claude", + "file_id": anthropic_id, "name": meta.name, "mime": meta.mime, }) - except HTTPException: - raise - except Exception as e: - raise HTTPException(status_code=500, detail=f"Google upload failed: {str(e)}") - finally: - if tmp_path and os.path.exists(tmp_path): - os.remove(tmp_path) + continue + + # Fallback: inline base64 data + with open(path, "rb") as f: + raw = f.read() + attachments.append({ + "provider": "claude", + "data_base64": base64.b64encode(raw).decode("utf-8"), + "name": meta.name, + "mime": meta.mime, + }) else: raise HTTPException(status_code=400, detail=f"Unsupported provider for attachments: {target_provider}") + # Persist index if any on-demand uploads updated meta (applies to Claude and Google) + if changed: + save_files_index(user, list(items_map.values())) + # Debug log logger.debug("attachments: provider=%s count=%d detail=%s", provider_norm, len(attachments), [{'name': a.get('name'), 'id': a.get('file_id', a.get('uri'))} for a in attachments]) return attachments @@ -824,9 +976,11 @@ async def prepare_openai_vector_search( # Determine which files to include - combine scopes AND attached_ids relevant_files_map: dict[str, FileMeta] = {} - # First: add files matching scopes + # First: add files matching scopes (skip image files — they are handled inline) if 
scopes: for item in items: + if item.mime in IMAGE_MIME_TYPES: + continue if item.scopes and any(s in scopes for s in item.scopes): relevant_files_map[item.id] = item logger.debug("file_search: scopes=%s matched_files=%s", scopes, [f.name for f in relevant_files_map.values()]) @@ -1026,7 +1180,11 @@ async def upload_file( ) # Always try to index into OpenAI vector store (if <=50MB) - if size <= OPENAI_MAX_FILE_SIZE: + # Skip image files — they are sent inline to vision-capable models, not via file_search + mime_for_check = (file.content_type or "application/octet-stream").lower() + if mime_for_check in IMAGE_MIME_TYPES: + logger.debug("Skipping OpenAI vector store indexing for image file: %s", file_name) + elif size <= OPENAI_MAX_FILE_SIZE: try: openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, dest_path, None) meta.openai_file_id = openai_file_id @@ -1038,6 +1196,19 @@ async def upload_file( else: logger.warning("Skipping OpenAI indexing for %s: exceeds 50MB", file_name) + # Anthropic Files API upload (eager, like OpenAI) + anthropic_id = await ensure_anthropic_file_upload(meta, dest_path) + if anthropic_id: + meta.anthropic_file_id = anthropic_id + + # Google Files API upload (eager, like OpenAI) + google_uri = await ensure_google_file_upload(meta, dest_path) + if google_uri: + meta.google_file_uri = google_uri + # Also backfill provider_file_id for legacy compat if provider was google + if provider_normalized == "google" and not meta.provider_file_id: + meta.provider_file_id = google_uri + items.append(meta) save_files_index(user, items) return {"file": meta} @@ -1073,17 +1244,34 @@ async def delete_file(user: str = DEFAULT_USER, file_id: str = ""): await client.files.delete(meta.provider_file_id) except Exception as e: raise HTTPException(status_code=500, detail=f"OpenAI delete failed: {str(e)}") + # Delete from Google Files API — collect all distinct Google refs + google_refs = set() if meta.provider == "google" and meta.provider_file_id: 
+ google_refs.add(meta.provider_file_id) + if meta.google_file_uri: + google_refs.add(meta.google_file_uri) + for g_ref in google_refs: try: key = os.getenv("GOOGLE_API_KEY") if not key: - raise HTTPException(status_code=500, detail="Google API Key not found") + logger.warning("Skipping Google file deletion: no API key") + break client = genai.Client(api_key=key) - await asyncio.to_thread(client.files.delete, meta.provider_file_id) - except HTTPException: - raise + await asyncio.to_thread(client.files.delete, g_ref) + logger.info("Google file deleted: %s", g_ref) + except Exception as e: + logger.warning("Google file deletion failed for %s: %s", g_ref, e) + + # Delete from Anthropic Files API if present + if meta.anthropic_file_id: + try: + client = get_anthropic_client() + await asyncio.to_thread( + client.beta.files.delete, meta.anthropic_file_id + ) + logger.info("Anthropic file deleted: %s", meta.anthropic_file_id) except Exception as e: - raise HTTPException(status_code=500, detail=f"Google delete failed: {str(e)}") + logger.warning("Anthropic file deletion failed for %s: %s", meta.anthropic_file_id, e) path = os.path.join(files_root(user), file_id) if os.path.exists(path): diff --git a/backend/app/services/llm.py b/backend/app/services/llm.py index 2eb69ed..7efdce0 100644 --- a/backend/app/services/llm.py +++ b/backend/app/services/llm.py @@ -68,6 +68,7 @@ async def stream_openai( config: LLMConfig, attachments: Optional[List[Dict[str, Any]]] = None, tools: Optional[List[Dict[str, Any]]] = None, + images: Optional[List[Dict[str, Any]]] = None, ) -> AsyncGenerator[str, None]: client = get_openai_client(config.api_key) attachments = attachments or [] @@ -98,9 +99,10 @@ async def stream_openai( # 2. User wants web search AND model is capable of Responses API # 3. 
Attachments are present (Responses supports input_file) use_responses_api = ( - config.model_name in responses_only_models or + config.model_name in responses_only_models or (config.enable_google_search and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or (attachments and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or + (images and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or (tools) ) @@ -126,6 +128,18 @@ async def stream_openai( ] }) + # Inject images into last user message + if images and input_messages: + # Find the last user message to inject images into + for i in range(len(input_messages) - 1, -1, -1): + if input_messages[i]["role"] == "user": + for img in images: + input_messages[i]["content"].append({ + "type": "input_image", + "image_url": f"data:{img['mime']};base64,{img['data']}" + }) + break + # Append attachments as separate user message (files only) file_parts = [] for att in attachments: @@ -143,90 +157,52 @@ async def stream_openai( resp_params = { "model": config.model_name, "input": input_messages, # Full conversation history - "stream": False, # Get full output in one call - "background": False, + "stream": True, "store": True, "tool_choice": "auto", } if tools: resp_params["tools"] = tools resp_params["tool_choice"] = "auto" - # Optional: include results for debugging / citations - resp_params["include"] = ["file_search_call.results"] - + # Add reasoning effort (not supported by chat-latest models) models_without_effort = ['gpt-5-chat-latest', 'gpt-5.1-chat-latest'] if config.model_name not in models_without_effort: resp_params["reasoning"] = {"effort": config.reasoning_effort.value} - - # Enable Web Search if requested (Reusing enable_google_search flag as generic web_search flag) - # IMPORTANT: Append to existing tools instead of overwriting + + # Enable Web Search if requested if config.enable_google_search: if 
resp_params.get("tools"): resp_params["tools"].append({"type": "web_search"}) else: resp_params["tools"] = [{"type": "web_search"}] resp_params["tool_choice"] = "auto" - + if config.system_prompt: resp_params["instructions"] = config.system_prompt - # Debug: print final tools being sent - logger.debug("responses: final tools: %s", resp_params.get('tools')) - - # 1. Create Response (non-background) - initial_resp = await client.responses.create(**resp_params) - response_id = initial_resp.id - - # 2. Poll for Completion - import asyncio - for _ in range(300): - final_resp = await client.responses.retrieve(response_id) - - if final_resp.status == 'completed': - # Debug: log outputs and tool calls - try: - outs = getattr(final_resp, "output", []) - logger.debug("responses: output items: %s", [getattr(o, 'type', None) for o in outs]) - for o in outs: - if getattr(o, "type", None) == "file_search_call": - logger.debug("responses: file_search_call: %s", o) - except Exception as e: - logger.debug("responses: failed to inspect output: %s", e) - - found_content = False - if hasattr(final_resp, 'output'): - for out in final_resp.output: - out_type = getattr(out, 'type', None) - out_content = getattr(out, 'content', None) - logger.debug("responses: output item: type=%s, content=%s", out_type, out_content) - - if out_type == 'message' and out_content: - for c in out_content: - c_type = getattr(c, 'type', None) - c_text = getattr(c, 'text', None) - logger.debug("responses: content item: type=%s, text=%s...", c_type, c_text[:100] if c_text else None) - if c_type == 'output_text': - text_val = getattr(c, 'text', None) - if text_val: - logger.debug("responses: yielding text: %s...", text_val[:50]) - yield text_val - logger.debug("responses: yielded successfully") - found_content = True - - if not found_content: - logger.warning("responses: no content found! 
output=%s", final_resp.output) - yield f"\n[Debug: Completed but no content extracted]" - return - - elif final_resp.status in ['failed', 'cancelled', 'expired']: - error_msg = getattr(final_resp, 'error', 'Unknown error') - yield f"\n[Error: Response generation {final_resp.status}: {error_msg}]" - return - - await asyncio.sleep(2) - - yield "\n[Error: Polling timed out]" + logger.debug("responses: streaming, tools: %s", resp_params.get('tools')) + + # Stream the response — yields text deltas as they arrive + stream = await client.responses.create(**resp_params) + async for event in stream: + evt_type = getattr(event, 'type', None) + if evt_type == 'response.output_text.delta': + delta = getattr(event, 'delta', '') + if delta: + yield delta + elif evt_type == 'response.completed': + resp_obj = getattr(event, 'response', None) + if resp_obj: + for out in getattr(resp_obj, 'output', []): + if getattr(out, 'type', None) == 'file_search_call': + logger.debug("responses: file_search_call: %s", out) + break + elif evt_type == 'response.failed': + resp_obj = getattr(event, 'response', None) + error_msg = getattr(resp_obj, 'error', None) if resp_obj else None + yield f"\n[Error: {error_msg or 'Response generation failed'}]" + break return # Standard Chat Completions API (attachments not supported here) @@ -234,6 +210,19 @@ async def stream_openai( yield "[Error] Attachments are only supported for Responses API-capable models." 
return + # Inject images into last user message for Chat Completions format + if images and openai_messages: + for i in range(len(openai_messages) - 1, -1, -1): + if openai_messages[i]["role"] == "user": + text_content = openai_messages[i]["content"] + openai_messages[i]["content"] = [ + {"type": "text", "text": text_content}, + ] + [ + {"type": "image_url", "image_url": {"url": f"data:{img['mime']};base64,{img['data']}"}} + for img in images + ] + break + # Prepare parameters req_params = { "model": config.model_name, @@ -255,7 +244,8 @@ async def stream_openai( # IMPORTANT: Reasoning models often DO NOT support 'temperature'. # We skip adding it. else: - req_params["max_tokens"] = config.max_tokens + if config.max_tokens: + req_params["max_tokens"] = config.max_tokens req_params["temperature"] = config.temperature stream = await client.chat.completions.create(**req_params) @@ -274,7 +264,7 @@ async def stream_openai( elif getattr(delta, 'refusal', None): yield f"[Refusal: {delta.refusal}]" -async def stream_google(messages: list[Message], config: LLMConfig, attachments: List[Dict[str, Any]] | None = None) -> AsyncGenerator[str, None]: +async def stream_google(messages: list[Message], config: LLMConfig, attachments: List[Dict[str, Any]] | None = None, images: Optional[List[Dict[str, Any]]] = None) -> AsyncGenerator[str, None]: attachments = attachments or [] # Use new Google GenAI SDK (google-genai) from google import genai @@ -293,31 +283,56 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments: tools = [types.Tool(google_search=types.GoogleSearch())] # Configure Generation - gen_config = types.GenerateContentConfig( - temperature=config.temperature, - max_output_tokens=config.max_tokens, - system_instruction=config.system_prompt, - tools=tools - ) + gen_config_kwargs = { + "temperature": config.temperature, + "system_instruction": config.system_prompt, + "tools": tools, + } + if config.max_tokens: + 
gen_config_kwargs["max_output_tokens"] = config.max_tokens + gen_config = types.GenerateContentConfig(**gen_config_kwargs) - # If attachments present, send as a single generate_content call (non-streaming) - if attachments: - parts = [] - for att in attachments: - uri = att.get("uri") - mime = att.get("mime") or "application/octet-stream" - if uri: - try: - parts.append(types.Part.from_uri(uri, mime_type=mime)) - except Exception: - parts.append(types.Part(text=f"[file attached: {uri}]")) + # If attachments or images present, use non-streaming generate_content + # but preserve multi-turn conversation structure + if attachments or images: + import base64 as _b64 + + # Build proper multi-turn contents with images in the last user message + contents = [] for msg in messages: - parts.append(types.Part(text=msg.content)) - logger.debug("gemini: sending attachments: %s", [att.get('uri') for att in attachments]) + role = "user" if msg.role == Role.USER else "model" + contents.append(types.Content( + role=role, + parts=[types.Part(text=msg.content)] + )) + + # Find last user message and inject images + attachments into its parts + for i in range(len(contents) - 1, -1, -1): + if contents[i].role == "user": + extra_parts = [] + for att in attachments: + uri = att.get("uri") + mime = att.get("mime") or "application/octet-stream" + if uri: + try: + extra_parts.append(types.Part.from_uri(uri, mime_type=mime)) + except Exception: + extra_parts.append(types.Part(text=f"[file attached: {uri}]")) + if images: + for img in images: + raw_bytes = _b64.b64decode(img["data"]) + extra_parts.append(types.Part(inline_data=types.Blob(mime_type=img["mime"], data=raw_bytes))) + contents[i] = types.Content( + role="user", + parts=list(contents[i].parts) + extra_parts + ) + break + + logger.debug("gemini: sending attachments=%d images=%d contents=%d", len(attachments), len(images or []), len(contents)) try: response = await client.aio.models.generate_content( model=config.model_name, - 
contents=[types.Content(role="user", parts=parts)], + contents=contents, config=gen_config ) if response and getattr(response, "text", None): @@ -358,8 +373,9 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments: if chunk.text: yield chunk.text -async def stream_claude(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]: +async def stream_claude(messages: list[Message], config: LLMConfig, attachments: Optional[List[Dict[str, Any]]] = None, images: Optional[List[Dict[str, Any]]] = None) -> AsyncGenerator[str, None]: client = get_anthropic_client(config.api_key) + attachments = attachments or [] # Separate system messages from conversation messages system_parts = [] @@ -391,23 +407,101 @@ async def stream_claude(messages: list[Message], config: LLMConfig) -> AsyncGene if not merged: merged.append({"role": "user", "content": "Hello"}) + # Inject images into last user message (Claude vision format) + if images and merged: + for i in range(len(merged) - 1, -1, -1): + if merged[i]["role"] == "user": + text_content = merged[i]["content"] + # Convert from string to content blocks array + content_blocks = [{"type": "text", "text": text_content}] + for img in images: + content_blocks.append({ + "type": "image", + "source": { + "type": "base64", + "media_type": img["mime"], + "data": img["data"], + } + }) + merged[i]["content"] = content_blocks + break + + # Inject document attachments into last user message + has_file_references = False + if attachments and merged: + import base64 as _b64 + for i in range(len(merged) - 1, -1, -1): + if merged[i]["role"] == "user": + # Ensure content is a list of blocks (images may have already converted it) + if isinstance(merged[i]["content"], str): + merged[i]["content"] = [{"type": "text", "text": merged[i]["content"]}] + + for att in attachments: + file_id = att.get("file_id") + data_b64 = att.get("data_base64") + mime = (att.get("mime") or "").lower() + name = att.get("name", "file") + 
+ if file_id: + # Use Anthropic Files API reference (requires beta) + merged[i]["content"].append({ + "type": "document", + "source": {"type": "file", "file_id": file_id}, + "title": name, + }) + has_file_references = True + elif data_b64 and mime == "application/pdf": + # Inline base64 PDF + merged[i]["content"].append({ + "type": "document", + "source": { + "type": "base64", + "media_type": "application/pdf", + "data": data_b64, + }, + "title": name, + }) + elif data_b64: + # Text-like file: decode and inject as text block + try: + text = _b64.b64decode(data_b64).decode("utf-8", errors="replace") + merged[i]["content"].append({ + "type": "text", + "text": f"--- {name} ---\n{text}", + }) + except Exception: + logger.warning("Failed to decode attachment %s as text", name) + break + system_text = "\n\n".join(system_parts) if system_parts else anthropic.NOT_GIVEN - async with client.messages.stream( + stream_params = dict( model=config.model_name, - max_tokens=config.max_tokens, + max_tokens=config.max_tokens or 16384, temperature=config.temperature, system=system_text, messages=merged, - ) as stream: - async for text in stream.text_stream: - yield text + ) + + if has_file_references: + # Use beta endpoint for Files API references + async with client.beta.messages.stream( + **stream_params, + betas=["files-api-2025-04-14"], + ) as stream: + async for text in stream.text_stream: + yield text + else: + async with client.messages.stream(**stream_params) as stream: + async for text in stream.text_stream: + yield text async def stream_openrouter( messages: list[Message], config: LLMConfig, openrouter_api_key: str, + images: Optional[List[Dict[str, Any]]] = None, ) -> AsyncGenerator[str, None]: """Stream via OpenRouter fallback using OpenAI-compatible Chat Completions API.""" client = get_openrouter_client(openrouter_api_key) @@ -421,13 +515,28 @@ async def stream_openrouter( for msg in messages: openai_messages.append({"role": msg.role.value, "content": msg.content}) - 
stream = await client.chat.completions.create( - model=openrouter_model, - messages=openai_messages, - stream=True, - max_tokens=config.max_tokens, - temperature=config.temperature, - ) + # Inject images into last user message (OpenAI Chat Completions format) + if images and openai_messages: + for i in range(len(openai_messages) - 1, -1, -1): + if openai_messages[i]["role"] == "user": + text_content = openai_messages[i]["content"] + openai_messages[i]["content"] = [ + {"type": "text", "text": text_content}, + ] + [ + {"type": "image_url", "image_url": {"url": f"data:{img['mime']};base64,{img['data']}"}} + for img in images + ] + break + + or_params = { + "model": openrouter_model, + "messages": openai_messages, + "stream": True, + "temperature": config.temperature, + } + if config.max_tokens: + or_params["max_tokens"] = config.max_tokens + stream = await client.chat.completions.create(**or_params) async for chunk in stream: if chunk.choices and chunk.choices[0].delta: @@ -443,6 +552,7 @@ async def llm_streamer( attachments: List[Dict[str, Any]] | None = None, tools: List[Dict[str, Any]] | None = None, openrouter_api_key: Optional[str] = None, + images: Optional[List[Dict[str, Any]]] = None, ) -> AsyncGenerator[str, None]: # 1. Merge Context + New User Prompt # We create a temporary list of messages for this inference @@ -457,21 +567,24 @@ async def llm_streamer( )) # 2. 
Call Provider + logger.debug("llm_streamer: provider=%s model=%s messages=%d images=%d", + config.provider, config.model_name, len(messages_to_send), len(images or [])) try: if config.provider == "openai": - async for chunk in stream_openai(messages_to_send, config, attachments, tools): + async for chunk in stream_openai(messages_to_send, config, attachments, tools, images=images): yield chunk elif config.provider == "google": - async for chunk in stream_google(messages_to_send, config, attachments): + async for chunk in stream_google(messages_to_send, config, attachments, images=images): yield chunk elif config.provider == "claude": - async for chunk in stream_claude(messages_to_send, config): + async for chunk in stream_claude(messages_to_send, config, attachments=attachments, images=images): yield chunk else: yield f"Error: Unsupported provider {config.provider}" except Exception as e: primary_error = str(e) - logger.warning("Primary provider failed: %s. Checking OpenRouter fallback...", primary_error) + logger.warning("Primary provider %s/%s failed: %s. 
Checking OpenRouter fallback...", + config.provider, config.model_name, primary_error) if not openrouter_api_key: yield f"Error calling LLM: {primary_error}" @@ -479,7 +592,7 @@ async def llm_streamer( try: logger.info("Falling back to OpenRouter for %s/%s", config.provider, config.model_name) - async for chunk in stream_openrouter(messages_to_send, config, openrouter_api_key): + async for chunk in stream_openrouter(messages_to_send, config, openrouter_api_key, images=images): yield chunk except Exception as fallback_error: logger.error("OpenRouter fallback also failed: %s", fallback_error) diff --git a/frontend/src/store/flowStore.ts b/frontend/src/store/flowStore.ts index 665030b..6140339 100644 --- a/frontend/src/store/flowStore.ts +++ b/frontend/src/store/flowStore.ts @@ -129,6 +129,8 @@ export interface FileMeta { created_at: number; provider?: string; provider_file_id?: string; + anthropic_file_id?: string; + google_file_uri?: string; scopes?: string[]; // "project_path/node_id" composite keys } |
