summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYurenHao0426 <blackhao0426@gmail.com>2026-02-13 05:45:13 +0000
committerYurenHao0426 <blackhao0426@gmail.com>2026-02-13 05:45:13 +0000
commit61293147c1d6f1cdde689c36faad923b600a4f6e (patch)
tree9c773b13bd4f488ca0cbd1f5d646ba9ff7ab43ef
parent257b5bcbd09d4a6b7b1b27d7db4cc2aeed766c39 (diff)
Add Anthropic Files API and persistent Google file caching for all providers
- Add anthropic_file_id/google_file_uri fields to FileMeta (backend + frontend)
- Eager upload to Anthropic and Google at file upload time (like OpenAI)
- Cache and reuse file references in prepare_attachments for all 3 providers
- Add document content block injection in stream_claude (file_id, base64, text fallback)
- Conditional beta streaming for Anthropic Files API references
- Persist on-demand upload results (changed flag + save_files_index)
- Clean up file deletion for all providers (Anthropic warn-only, Google deduplicated)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
-rw-r--r--backend/app/main.py310
-rw-r--r--backend/app/services/llm.py325
-rw-r--r--frontend/src/store/flowStore.ts2
3 files changed, 470 insertions, 167 deletions
diff --git a/backend/app/main.py b/backend/app/main.py
index b0d6138..75491bb 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -1,4 +1,5 @@
import asyncio
+import base64
import logging
import tempfile
import time
@@ -8,7 +9,7 @@ from fastapi.responses import StreamingResponse, FileResponse
from fastapi import UploadFile, File, Form
from pydantic import BaseModel
from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Message, Context, LLMConfig, ModelProvider, ReasoningEffort
-from app.services.llm import llm_streamer, generate_title, get_openai_client
+from app.services.llm import llm_streamer, generate_title, get_openai_client, get_anthropic_client
from app.auth import auth_router, get_current_user, get_current_user_optional, init_db, User, get_db
from app.auth.utils import get_password_hash
from dotenv import load_dotenv
@@ -232,6 +233,8 @@ class FileMeta(BaseModel):
provider_file_id: Optional[str] = None
openai_file_id: Optional[str] = None
openai_vector_store_id: Optional[str] = None
+ anthropic_file_id: Optional[str] = None
+ google_file_uri: Optional[str] = None
# Scopes for filtering: "project_path/node_id" composite keys
scopes: List[str] = []
@@ -287,6 +290,53 @@ def smart_merge_messages(messages: list[Message]) -> list[Message]:
merged.append(current_msg)
return merged
+# MIME types handled as images: these files are sent inline to the model
+# instead of being uploaded or indexed as documents.
+IMAGE_MIME_TYPES = {
+    "image/jpeg",
+    "image/png",
+    "image/gif",
+    "image/webp",
+}
+
+# MIME types that ensure_anthropic_file_upload will send to the Anthropic
+# Files API; anything else is skipped by that helper.
+ANTHROPIC_DOCUMENT_MIME_TYPES = {
+    "application/pdf",
+    "text/plain",
+    "text/html",
+    "text/csv",
+    "text/xml",
+    "text/markdown",
+    "text/css",
+    "text/javascript",
+    "application/javascript",
+    "application/xml",
+}
+
+def extract_image_attachments(user: str, attached_ids: List[str]) -> tuple[List[dict], List[str]]:
+    """
+    Separate image files from non-image files in attached_ids.
+
+    Each id is looked up in the user's files index. Files whose MIME type
+    (compared case-insensitively) is in IMAGE_MIME_TYPES are read from disk
+    and base64-encoded. Every other id, including ids that are not in the
+    index, is passed through unchanged. An image that is indexed but missing
+    on disk is logged and dropped from both lists.
+
+    Returns (images, remaining_non_image_ids).
+    Each image dict: {"mime": str (lower-cased), "data": str (base64), "name": str}
+    """
+    if not attached_ids:
+        return [], []
+
+    items_map = {item.id: item for item in load_files_index(user)}
+    images: List[dict] = []
+    non_image_ids: List[str] = []
+
+    for fid in attached_ids:
+        meta = items_map.get(fid)
+        # Lower-case before comparing so "image/PNG" is classified the same
+        # way the upload helpers classify it (they lower-case the MIME too).
+        mime = (meta.mime or "").lower() if meta else ""
+        if mime not in IMAGE_MIME_TYPES:
+            non_image_ids.append(fid)
+            continue
+
+        path = os.path.join(files_root(user), fid)
+        try:
+            # Open directly instead of checking existence first: avoids a
+            # race with a concurrent delete between the check and the read.
+            with open(path, "rb") as f:
+                raw = f.read()
+        except FileNotFoundError:
+            logger.warning("Image file missing on disk, skipping: %s", fid)
+            continue
+
+        images.append({
+            "mime": mime,
+            "data": base64.b64encode(raw).decode("utf-8"),
+            "name": meta.name,
+        })
+
+    logger.debug("extract_image_attachments: %d images, %d non-image files", len(images), len(non_image_ids))
+    return images, non_image_ids
+
+
@app.post("/api/run_node_stream")
async def run_node_stream(
request: NodeRunRequest,
@@ -304,7 +354,10 @@ async def run_node_stream(
# Get username for file operations
username = current_user.username if current_user else DEFAULT_USER
-
+
+ # Extract images from attached files (separate from non-image files)
+ images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids)
+
# 1. Concatenate all incoming contexts first
raw_messages = []
for ctx in request.incoming_contexts:
@@ -326,7 +379,7 @@ async def run_node_stream(
if request.config.provider == ModelProvider.OPENAI:
vs_ids, debug_refs, filters = await prepare_openai_vector_search(
user=username,
- attached_ids=request.attached_file_ids,
+ attached_ids=non_image_file_ids,
scopes=request.scopes,
llm_config=request.config,
)
@@ -352,14 +405,21 @@ async def run_node_stream(
attachments = await prepare_attachments(
user=username,
target_provider=request.config.provider,
- attached_ids=request.attached_file_ids,
+ attached_ids=non_image_file_ids,
+ llm_config=request.config,
+ )
+ elif request.config.provider == ModelProvider.CLAUDE:
+ attachments = await prepare_attachments(
+ user=username,
+ target_provider=request.config.provider,
+ attached_ids=non_image_file_ids,
llm_config=request.config,
)
openrouter_key = get_user_api_key(current_user, "openrouter")
return StreamingResponse(
- llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools, openrouter_api_key=openrouter_key),
+ llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools, openrouter_api_key=openrouter_key, images=images),
media_type="text/event-stream"
)
@@ -602,6 +662,95 @@ async def ensure_openai_file_and_index(user: str, meta: FileMeta, path: str, llm
await add_file_to_vector_store(vs_id, file_id, client=client)
return file_id, vs_id
+async def ensure_anthropic_file_upload(meta: FileMeta, file_path: str, api_key: Optional[str] = None) -> Optional[str]:
+    """
+    Upload a file to the Anthropic Files API (beta).
+
+    Args:
+        meta: File metadata; `mime` decides eligibility, `name` is sent as the filename.
+        file_path: Path of the file on local disk.
+        api_key: Optional key passed to get_anthropic_client.
+
+    Returns the anthropic file_id on success, or None on skip/failure.
+    Never raises: every failure is logged and reported as None so callers
+    can fall back to another attachment strategy.
+    """
+    import inspect
+
+    # Cheap eligibility check first, before building a client.
+    mime = (meta.mime or "").lower()
+    if mime in IMAGE_MIME_TYPES or mime not in ANTHROPIC_DOCUMENT_MIME_TYPES:
+        logger.debug("Skipping Anthropic file upload for unsupported mime: %s", mime)
+        return None
+
+    try:
+        client = get_anthropic_client(api_key)
+    except Exception as e:
+        logger.debug("Skipping Anthropic file upload: no API key available (%s)", e)
+        return None
+
+    def _read_and_upload():
+        # Runs in a worker thread so the disk read (and, for a synchronous
+        # client, the HTTP upload) does not block the event loop.
+        with open(file_path, "rb") as f:
+            content = f.read()
+        return client.beta.files.upload(
+            file=(meta.name or "upload.bin", content, mime),
+        )
+
+    try:
+        resp = await asyncio.to_thread(_read_and_upload)
+        # The same client factory is used with `async with client.messages.stream(...)`
+        # elsewhere, so upload() may return a coroutine rather than a result.
+        # to_thread alone would leave it un-awaited and the upload would never run.
+        if inspect.isawaitable(resp):
+            resp = await resp
+    except Exception as e:
+        logger.warning("Anthropic file upload failed for %s: %s", meta.name, e)
+        return None
+
+    file_id = getattr(resp, "id", None)
+    if file_id:
+        logger.info("Anthropic file uploaded: %s -> %s", meta.name, file_id)
+    else:
+        logger.warning("Anthropic file upload returned no id for %s", meta.name)
+    return file_id
+
+
+async def ensure_google_file_upload(meta: FileMeta, file_path: str, api_key: Optional[str] = None) -> Optional[str]:
+    """
+    Upload a file to Google GenAI Files API.
+
+    Args:
+        meta: File metadata; `mime` is sent as the upload MIME type.
+        file_path: Path of the file on local disk.
+        api_key: Optional key; falls back to the GOOGLE_API_KEY environment variable.
+
+    Returns the Google file URI (or the file name when no URI is reported)
+    on success, or None on skip/failure. Image files are skipped. Never
+    raises: every failure is logged and reported as None.
+    """
+    key = api_key or os.getenv("GOOGLE_API_KEY")
+    if not key:
+        logger.debug("Skipping Google file upload: no API key available")
+        return None
+
+    mime = (meta.mime or "").lower()
+    if mime in IMAGE_MIME_TYPES:
+        logger.debug("Skipping Google file upload for image: %s", mime)
+        return None
+
+    try:
+        client = genai.Client(api_key=key)
+        # Upload straight from file_path in a worker thread. The file is
+        # already on disk and the MIME type is passed explicitly, so reading
+        # it into memory and copying it to a temp file first is unnecessary.
+        google_resp = await asyncio.to_thread(
+            client.files.upload,
+            file=file_path,
+            config={"mimeType": meta.mime or "application/octet-stream"},
+        )
+        google_name = getattr(google_resp, "name", None)
+        google_uri = getattr(google_resp, "uri", None)
+
+        # Poll (up to ~10s) until the file leaves the processing state.
+        if google_name:
+            for _ in range(10):
+                try:
+                    info = await asyncio.to_thread(client.files.get, name=google_name)
+                except Exception:
+                    await asyncio.sleep(1)
+                    continue
+                # Keep the previous URI when the poll response carries none.
+                google_uri = getattr(info, "uri", None) or google_uri
+                # str() covers both plain strings and enum values such as FileState.ACTIVE.
+                state = str(getattr(info, "state", None)).upper()
+                if state.endswith("ACTIVE"):
+                    break
+                if state.endswith("FAILED"):
+                    # A failed file cannot be referenced later; do not cache its URI.
+                    logger.warning("Google file processing failed for %s", meta.name)
+                    return None
+                await asyncio.sleep(1)
+
+        uri = google_uri or google_name
+        if uri:
+            logger.info("Google file uploaded: %s -> %s", meta.name, uri)
+        return uri
+    except Exception as e:
+        logger.warning("Google file upload failed for %s: %s", meta.name, e)
+        return None
+
+
async def remove_file_from_vector_store(vs_id: str, file_id: str, client=None):
if not vs_id or not file_id:
return
@@ -677,6 +826,7 @@ async def prepare_attachments(
items = load_files_index(user)
items_map = {item.id: item for item in items}
attachments: list[dict] = []
+ changed = False
if isinstance(target_provider, ModelProvider):
provider_norm = target_provider.value.lower()
@@ -732,71 +882,73 @@ async def prepare_attachments(
raise HTTPException(status_code=500, detail=f"OpenAI upload failed: {str(e)}")
elif provider_norm == ModelProvider.GOOGLE or provider_norm == "google":
- # Reuse uri/name if available and looks like a URI
- if meta.provider == "google" and meta.provider_file_id and "://" in meta.provider_file_id:
+ # Reuse cached google_file_uri
+ cached_uri = meta.google_file_uri or (
+ meta.provider_file_id if meta.provider == "google" and meta.provider_file_id and "://" in meta.provider_file_id else None
+ )
+ if cached_uri:
attachments.append({
"provider": "google",
- "uri": meta.provider_file_id,
+ "uri": cached_uri,
"name": meta.name,
"mime": meta.mime,
})
continue
- key = llm_config.api_key or os.getenv("GOOGLE_API_KEY")
- if not key:
- raise HTTPException(status_code=500, detail="Google API Key not found")
- client = genai.Client(api_key=key)
-
- tmp_path = None
- try:
- with open(path, "rb") as f:
- content = f.read()
- with tempfile.NamedTemporaryFile(delete=False) as tmp:
- tmp.write(content)
- tmp_path = tmp.name
-
- google_resp = await asyncio.to_thread(
- client.files.upload,
- file=tmp_path,
- config={"mimeType": meta.mime or "application/octet-stream"},
- )
- google_name = getattr(google_resp, "name", None)
- google_uri = getattr(google_resp, "uri", None)
-
- # Poll for ACTIVE and uri if missing
- if google_name:
- for _ in range(10):
- try:
- info = await asyncio.to_thread(client.files.get, name=google_name)
- state = getattr(info, "state", None)
- google_uri = getattr(info, "uri", google_uri)
- if str(state).upper().endswith("ACTIVE") or state == "ACTIVE":
- break
- await asyncio.sleep(1)
- except Exception:
- await asyncio.sleep(1)
- logger.debug("google upload: name=%s uri=%s", google_name, google_uri)
+ # On-demand upload via shared helper
+ uri = await ensure_google_file_upload(meta, path, llm_config.api_key)
+ if not uri:
+ raise HTTPException(status_code=500, detail=f"Google upload failed for {meta.name}")
+ meta.google_file_uri = uri
+ changed = True
+ attachments.append({
+ "provider": "google",
+ "uri": uri,
+ "name": meta.name,
+ "mime": meta.mime,
+ })
+
+ elif provider_norm == ModelProvider.CLAUDE or provider_norm == "claude":
+ # Reuse cached anthropic_file_id if available
+ if meta.anthropic_file_id:
+ attachments.append({
+ "provider": "claude",
+ "file_id": meta.anthropic_file_id,
+ "name": meta.name,
+ "mime": meta.mime,
+ })
+ continue
- uri = google_uri or google_name
- if not uri:
- raise HTTPException(status_code=500, detail="Google upload returned no uri/name")
+ # Try on-demand upload
+ anthropic_id = await ensure_anthropic_file_upload(meta, path, llm_config.api_key)
+ if anthropic_id:
+ meta.anthropic_file_id = anthropic_id
+ changed = True
attachments.append({
- "provider": "google",
- "uri": uri,
+ "provider": "claude",
+ "file_id": anthropic_id,
"name": meta.name,
"mime": meta.mime,
})
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"Google upload failed: {str(e)}")
- finally:
- if tmp_path and os.path.exists(tmp_path):
- os.remove(tmp_path)
+ continue
+
+ # Fallback: inline base64 data
+ with open(path, "rb") as f:
+ raw = f.read()
+ attachments.append({
+ "provider": "claude",
+ "data_base64": base64.b64encode(raw).decode("utf-8"),
+ "name": meta.name,
+ "mime": meta.mime,
+ })
else:
raise HTTPException(status_code=400, detail=f"Unsupported provider for attachments: {target_provider}")
+ # Persist index if any on-demand uploads updated meta (applies to Claude and Google)
+ if changed:
+ save_files_index(user, list(items_map.values()))
+
# Debug log
logger.debug("attachments: provider=%s count=%d detail=%s", provider_norm, len(attachments), [{'name': a.get('name'), 'id': a.get('file_id', a.get('uri'))} for a in attachments])
return attachments
@@ -824,9 +976,11 @@ async def prepare_openai_vector_search(
# Determine which files to include - combine scopes AND attached_ids
relevant_files_map: dict[str, FileMeta] = {}
- # First: add files matching scopes
+ # First: add files matching scopes (skip image files — they are handled inline)
if scopes:
for item in items:
+ if item.mime in IMAGE_MIME_TYPES:
+ continue
if item.scopes and any(s in scopes for s in item.scopes):
relevant_files_map[item.id] = item
logger.debug("file_search: scopes=%s matched_files=%s", scopes, [f.name for f in relevant_files_map.values()])
@@ -1026,7 +1180,11 @@ async def upload_file(
)
# Always try to index into OpenAI vector store (if <=50MB)
- if size <= OPENAI_MAX_FILE_SIZE:
+ # Skip image files — they are sent inline to vision-capable models, not via file_search
+ mime_for_check = (file.content_type or "application/octet-stream").lower()
+ if mime_for_check in IMAGE_MIME_TYPES:
+ logger.debug("Skipping OpenAI vector store indexing for image file: %s", file_name)
+ elif size <= OPENAI_MAX_FILE_SIZE:
try:
openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, dest_path, None)
meta.openai_file_id = openai_file_id
@@ -1038,6 +1196,19 @@ async def upload_file(
else:
logger.warning("Skipping OpenAI indexing for %s: exceeds 50MB", file_name)
+ # Anthropic Files API upload (eager, like OpenAI)
+ anthropic_id = await ensure_anthropic_file_upload(meta, dest_path)
+ if anthropic_id:
+ meta.anthropic_file_id = anthropic_id
+
+ # Google Files API upload (eager, like OpenAI)
+ google_uri = await ensure_google_file_upload(meta, dest_path)
+ if google_uri:
+ meta.google_file_uri = google_uri
+ # Also backfill provider_file_id for legacy compat if provider was google
+ if provider_normalized == "google" and not meta.provider_file_id:
+ meta.provider_file_id = google_uri
+
items.append(meta)
save_files_index(user, items)
return {"file": meta}
@@ -1073,17 +1244,34 @@ async def delete_file(user: str = DEFAULT_USER, file_id: str = ""):
await client.files.delete(meta.provider_file_id)
except Exception as e:
raise HTTPException(status_code=500, detail=f"OpenAI delete failed: {str(e)}")
+ # Delete from Google Files API — collect all distinct Google refs
+ google_refs = set()
if meta.provider == "google" and meta.provider_file_id:
+ google_refs.add(meta.provider_file_id)
+ if meta.google_file_uri:
+ google_refs.add(meta.google_file_uri)
+ for g_ref in google_refs:
try:
key = os.getenv("GOOGLE_API_KEY")
if not key:
- raise HTTPException(status_code=500, detail="Google API Key not found")
+ logger.warning("Skipping Google file deletion: no API key")
+ break
client = genai.Client(api_key=key)
- await asyncio.to_thread(client.files.delete, meta.provider_file_id)
- except HTTPException:
- raise
+ await asyncio.to_thread(client.files.delete, g_ref)
+ logger.info("Google file deleted: %s", g_ref)
+ except Exception as e:
+ logger.warning("Google file deletion failed for %s: %s", g_ref, e)
+
+ # Delete from Anthropic Files API if present
+ if meta.anthropic_file_id:
+ try:
+ client = get_anthropic_client()
+ await asyncio.to_thread(
+ client.beta.files.delete, meta.anthropic_file_id
+ )
+ logger.info("Anthropic file deleted: %s", meta.anthropic_file_id)
except Exception as e:
- raise HTTPException(status_code=500, detail=f"Google delete failed: {str(e)}")
+ logger.warning("Anthropic file deletion failed for %s: %s", meta.anthropic_file_id, e)
path = os.path.join(files_root(user), file_id)
if os.path.exists(path):
diff --git a/backend/app/services/llm.py b/backend/app/services/llm.py
index 2eb69ed..7efdce0 100644
--- a/backend/app/services/llm.py
+++ b/backend/app/services/llm.py
@@ -68,6 +68,7 @@ async def stream_openai(
config: LLMConfig,
attachments: Optional[List[Dict[str, Any]]] = None,
tools: Optional[List[Dict[str, Any]]] = None,
+ images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
client = get_openai_client(config.api_key)
attachments = attachments or []
@@ -98,9 +99,10 @@ async def stream_openai(
# 2. User wants web search AND model is capable of Responses API
# 3. Attachments are present (Responses supports input_file)
use_responses_api = (
- config.model_name in responses_only_models or
+ config.model_name in responses_only_models or
(config.enable_google_search and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
(attachments and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
+ (images and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
(tools)
)
@@ -126,6 +128,18 @@ async def stream_openai(
]
})
+ # Inject images into last user message
+ if images and input_messages:
+ # Find the last user message to inject images into
+ for i in range(len(input_messages) - 1, -1, -1):
+ if input_messages[i]["role"] == "user":
+ for img in images:
+ input_messages[i]["content"].append({
+ "type": "input_image",
+ "image_url": f"data:{img['mime']};base64,{img['data']}"
+ })
+ break
+
# Append attachments as separate user message (files only)
file_parts = []
for att in attachments:
@@ -143,90 +157,52 @@ async def stream_openai(
resp_params = {
"model": config.model_name,
"input": input_messages, # Full conversation history
- "stream": False, # Get full output in one call
- "background": False,
+ "stream": True,
"store": True,
"tool_choice": "auto",
}
if tools:
resp_params["tools"] = tools
resp_params["tool_choice"] = "auto"
- # Optional: include results for debugging / citations
- resp_params["include"] = ["file_search_call.results"]
-
+
# Add reasoning effort (not supported by chat-latest models)
models_without_effort = ['gpt-5-chat-latest', 'gpt-5.1-chat-latest']
if config.model_name not in models_without_effort:
resp_params["reasoning"] = {"effort": config.reasoning_effort.value}
-
- # Enable Web Search if requested (Reusing enable_google_search flag as generic web_search flag)
- # IMPORTANT: Append to existing tools instead of overwriting
+
+ # Enable Web Search if requested
if config.enable_google_search:
if resp_params.get("tools"):
resp_params["tools"].append({"type": "web_search"})
else:
resp_params["tools"] = [{"type": "web_search"}]
resp_params["tool_choice"] = "auto"
-
+
if config.system_prompt:
resp_params["instructions"] = config.system_prompt
- # Debug: print final tools being sent
- logger.debug("responses: final tools: %s", resp_params.get('tools'))
-
- # 1. Create Response (non-background)
- initial_resp = await client.responses.create(**resp_params)
- response_id = initial_resp.id
-
- # 2. Poll for Completion
- import asyncio
- for _ in range(300):
- final_resp = await client.responses.retrieve(response_id)
-
- if final_resp.status == 'completed':
- # Debug: log outputs and tool calls
- try:
- outs = getattr(final_resp, "output", [])
- logger.debug("responses: output items: %s", [getattr(o, 'type', None) for o in outs])
- for o in outs:
- if getattr(o, "type", None) == "file_search_call":
- logger.debug("responses: file_search_call: %s", o)
- except Exception as e:
- logger.debug("responses: failed to inspect output: %s", e)
-
- found_content = False
- if hasattr(final_resp, 'output'):
- for out in final_resp.output:
- out_type = getattr(out, 'type', None)
- out_content = getattr(out, 'content', None)
- logger.debug("responses: output item: type=%s, content=%s", out_type, out_content)
-
- if out_type == 'message' and out_content:
- for c in out_content:
- c_type = getattr(c, 'type', None)
- c_text = getattr(c, 'text', None)
- logger.debug("responses: content item: type=%s, text=%s...", c_type, c_text[:100] if c_text else None)
- if c_type == 'output_text':
- text_val = getattr(c, 'text', None)
- if text_val:
- logger.debug("responses: yielding text: %s...", text_val[:50])
- yield text_val
- logger.debug("responses: yielded successfully")
- found_content = True
-
- if not found_content:
- logger.warning("responses: no content found! output=%s", final_resp.output)
- yield f"\n[Debug: Completed but no content extracted]"
- return
-
- elif final_resp.status in ['failed', 'cancelled', 'expired']:
- error_msg = getattr(final_resp, 'error', 'Unknown error')
- yield f"\n[Error: Response generation {final_resp.status}: {error_msg}]"
- return
-
- await asyncio.sleep(2)
-
- yield "\n[Error: Polling timed out]"
+ logger.debug("responses: streaming, tools: %s", resp_params.get('tools'))
+
+ # Stream the response — yields text deltas as they arrive
+ stream = await client.responses.create(**resp_params)
+ async for event in stream:
+ evt_type = getattr(event, 'type', None)
+ if evt_type == 'response.output_text.delta':
+ delta = getattr(event, 'delta', '')
+ if delta:
+ yield delta
+ elif evt_type == 'response.completed':
+ resp_obj = getattr(event, 'response', None)
+ if resp_obj:
+ for out in getattr(resp_obj, 'output', []):
+ if getattr(out, 'type', None) == 'file_search_call':
+ logger.debug("responses: file_search_call: %s", out)
+ break
+ elif evt_type == 'response.failed':
+ resp_obj = getattr(event, 'response', None)
+ error_msg = getattr(resp_obj, 'error', None) if resp_obj else None
+ yield f"\n[Error: {error_msg or 'Response generation failed'}]"
+ break
return
# Standard Chat Completions API (attachments not supported here)
@@ -234,6 +210,19 @@ async def stream_openai(
yield "[Error] Attachments are only supported for Responses API-capable models."
return
+ # Inject images into last user message for Chat Completions format
+ if images and openai_messages:
+ for i in range(len(openai_messages) - 1, -1, -1):
+ if openai_messages[i]["role"] == "user":
+ text_content = openai_messages[i]["content"]
+ openai_messages[i]["content"] = [
+ {"type": "text", "text": text_content},
+ ] + [
+ {"type": "image_url", "image_url": {"url": f"data:{img['mime']};base64,{img['data']}"}}
+ for img in images
+ ]
+ break
+
# Prepare parameters
req_params = {
"model": config.model_name,
@@ -255,7 +244,8 @@ async def stream_openai(
# IMPORTANT: Reasoning models often DO NOT support 'temperature'.
# We skip adding it.
else:
- req_params["max_tokens"] = config.max_tokens
+ if config.max_tokens:
+ req_params["max_tokens"] = config.max_tokens
req_params["temperature"] = config.temperature
stream = await client.chat.completions.create(**req_params)
@@ -274,7 +264,7 @@ async def stream_openai(
elif getattr(delta, 'refusal', None):
yield f"[Refusal: {delta.refusal}]"
-async def stream_google(messages: list[Message], config: LLMConfig, attachments: List[Dict[str, Any]] | None = None) -> AsyncGenerator[str, None]:
+async def stream_google(messages: list[Message], config: LLMConfig, attachments: List[Dict[str, Any]] | None = None, images: Optional[List[Dict[str, Any]]] = None) -> AsyncGenerator[str, None]:
attachments = attachments or []
# Use new Google GenAI SDK (google-genai)
from google import genai
@@ -293,31 +283,56 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments:
tools = [types.Tool(google_search=types.GoogleSearch())]
# Configure Generation
- gen_config = types.GenerateContentConfig(
- temperature=config.temperature,
- max_output_tokens=config.max_tokens,
- system_instruction=config.system_prompt,
- tools=tools
- )
+ gen_config_kwargs = {
+ "temperature": config.temperature,
+ "system_instruction": config.system_prompt,
+ "tools": tools,
+ }
+ if config.max_tokens:
+ gen_config_kwargs["max_output_tokens"] = config.max_tokens
+ gen_config = types.GenerateContentConfig(**gen_config_kwargs)
- # If attachments present, send as a single generate_content call (non-streaming)
- if attachments:
- parts = []
- for att in attachments:
- uri = att.get("uri")
- mime = att.get("mime") or "application/octet-stream"
- if uri:
- try:
- parts.append(types.Part.from_uri(uri, mime_type=mime))
- except Exception:
- parts.append(types.Part(text=f"[file attached: {uri}]"))
+ # If attachments or images present, use non-streaming generate_content
+ # but preserve multi-turn conversation structure
+ if attachments or images:
+ import base64 as _b64
+
+ # Build proper multi-turn contents with images in the last user message
+ contents = []
for msg in messages:
- parts.append(types.Part(text=msg.content))
- logger.debug("gemini: sending attachments: %s", [att.get('uri') for att in attachments])
+ role = "user" if msg.role == Role.USER else "model"
+ contents.append(types.Content(
+ role=role,
+ parts=[types.Part(text=msg.content)]
+ ))
+
+ # Find last user message and inject images + attachments into its parts
+ for i in range(len(contents) - 1, -1, -1):
+ if contents[i].role == "user":
+ extra_parts = []
+ for att in attachments:
+ uri = att.get("uri")
+ mime = att.get("mime") or "application/octet-stream"
+ if uri:
+ try:
+ extra_parts.append(types.Part.from_uri(uri, mime_type=mime))
+ except Exception:
+ extra_parts.append(types.Part(text=f"[file attached: {uri}]"))
+ if images:
+ for img in images:
+ raw_bytes = _b64.b64decode(img["data"])
+ extra_parts.append(types.Part(inline_data=types.Blob(mime_type=img["mime"], data=raw_bytes)))
+ contents[i] = types.Content(
+ role="user",
+ parts=list(contents[i].parts) + extra_parts
+ )
+ break
+
+ logger.debug("gemini: sending attachments=%d images=%d contents=%d", len(attachments), len(images or []), len(contents))
try:
response = await client.aio.models.generate_content(
model=config.model_name,
- contents=[types.Content(role="user", parts=parts)],
+ contents=contents,
config=gen_config
)
if response and getattr(response, "text", None):
@@ -358,8 +373,9 @@ async def stream_google(messages: list[Message], config: LLMConfig, attachments:
if chunk.text:
yield chunk.text
-async def stream_claude(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
+async def stream_claude(messages: list[Message], config: LLMConfig, attachments: Optional[List[Dict[str, Any]]] = None, images: Optional[List[Dict[str, Any]]] = None) -> AsyncGenerator[str, None]:
client = get_anthropic_client(config.api_key)
+ attachments = attachments or []
# Separate system messages from conversation messages
system_parts = []
@@ -391,23 +407,101 @@ async def stream_claude(messages: list[Message], config: LLMConfig) -> AsyncGene
if not merged:
merged.append({"role": "user", "content": "Hello"})
+ # Inject images into last user message (Claude vision format)
+ if images and merged:
+ for i in range(len(merged) - 1, -1, -1):
+ if merged[i]["role"] == "user":
+ text_content = merged[i]["content"]
+ # Convert from string to content blocks array
+ content_blocks = [{"type": "text", "text": text_content}]
+ for img in images:
+ content_blocks.append({
+ "type": "image",
+ "source": {
+ "type": "base64",
+ "media_type": img["mime"],
+ "data": img["data"],
+ }
+ })
+ merged[i]["content"] = content_blocks
+ break
+
+ # Inject document attachments into last user message
+ has_file_references = False
+ if attachments and merged:
+ import base64 as _b64
+ for i in range(len(merged) - 1, -1, -1):
+ if merged[i]["role"] == "user":
+ # Ensure content is a list of blocks (images may have already converted it)
+ if isinstance(merged[i]["content"], str):
+ merged[i]["content"] = [{"type": "text", "text": merged[i]["content"]}]
+
+ for att in attachments:
+ file_id = att.get("file_id")
+ data_b64 = att.get("data_base64")
+ mime = (att.get("mime") or "").lower()
+ name = att.get("name", "file")
+
+ if file_id:
+ # Use Anthropic Files API reference (requires beta)
+ merged[i]["content"].append({
+ "type": "document",
+ "source": {"type": "file", "file_id": file_id},
+ "title": name,
+ })
+ has_file_references = True
+ elif data_b64 and mime == "application/pdf":
+ # Inline base64 PDF
+ merged[i]["content"].append({
+ "type": "document",
+ "source": {
+ "type": "base64",
+ "media_type": "application/pdf",
+ "data": data_b64,
+ },
+ "title": name,
+ })
+ elif data_b64:
+ # Text-like file: decode and inject as text block
+ try:
+ text = _b64.b64decode(data_b64).decode("utf-8", errors="replace")
+ merged[i]["content"].append({
+ "type": "text",
+ "text": f"--- {name} ---\n{text}",
+ })
+ except Exception:
+ logger.warning("Failed to decode attachment %s as text", name)
+ break
+
system_text = "\n\n".join(system_parts) if system_parts else anthropic.NOT_GIVEN
- async with client.messages.stream(
+ stream_params = dict(
model=config.model_name,
- max_tokens=config.max_tokens,
+ max_tokens=config.max_tokens or 16384,
temperature=config.temperature,
system=system_text,
messages=merged,
- ) as stream:
- async for text in stream.text_stream:
- yield text
+ )
+
+ if has_file_references:
+ # Use beta endpoint for Files API references
+ async with client.beta.messages.stream(
+ **stream_params,
+ betas=["files-api-2025-04-14"],
+ ) as stream:
+ async for text in stream.text_stream:
+ yield text
+ else:
+ async with client.messages.stream(**stream_params) as stream:
+ async for text in stream.text_stream:
+ yield text
async def stream_openrouter(
messages: list[Message],
config: LLMConfig,
openrouter_api_key: str,
+ images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
"""Stream via OpenRouter fallback using OpenAI-compatible Chat Completions API."""
client = get_openrouter_client(openrouter_api_key)
@@ -421,13 +515,28 @@ async def stream_openrouter(
for msg in messages:
openai_messages.append({"role": msg.role.value, "content": msg.content})
- stream = await client.chat.completions.create(
- model=openrouter_model,
- messages=openai_messages,
- stream=True,
- max_tokens=config.max_tokens,
- temperature=config.temperature,
- )
+ # Inject images into last user message (OpenAI Chat Completions format)
+ if images and openai_messages:
+ for i in range(len(openai_messages) - 1, -1, -1):
+ if openai_messages[i]["role"] == "user":
+ text_content = openai_messages[i]["content"]
+ openai_messages[i]["content"] = [
+ {"type": "text", "text": text_content},
+ ] + [
+ {"type": "image_url", "image_url": {"url": f"data:{img['mime']};base64,{img['data']}"}}
+ for img in images
+ ]
+ break
+
+ or_params = {
+ "model": openrouter_model,
+ "messages": openai_messages,
+ "stream": True,
+ "temperature": config.temperature,
+ }
+ if config.max_tokens:
+ or_params["max_tokens"] = config.max_tokens
+ stream = await client.chat.completions.create(**or_params)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta:
@@ -443,6 +552,7 @@ async def llm_streamer(
attachments: List[Dict[str, Any]] | None = None,
tools: List[Dict[str, Any]] | None = None,
openrouter_api_key: Optional[str] = None,
+ images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
# 1. Merge Context + New User Prompt
# We create a temporary list of messages for this inference
@@ -457,21 +567,24 @@ async def llm_streamer(
))
# 2. Call Provider
+ logger.debug("llm_streamer: provider=%s model=%s messages=%d images=%d",
+ config.provider, config.model_name, len(messages_to_send), len(images or []))
try:
if config.provider == "openai":
- async for chunk in stream_openai(messages_to_send, config, attachments, tools):
+ async for chunk in stream_openai(messages_to_send, config, attachments, tools, images=images):
yield chunk
elif config.provider == "google":
- async for chunk in stream_google(messages_to_send, config, attachments):
+ async for chunk in stream_google(messages_to_send, config, attachments, images=images):
yield chunk
elif config.provider == "claude":
- async for chunk in stream_claude(messages_to_send, config):
+ async for chunk in stream_claude(messages_to_send, config, attachments=attachments, images=images):
yield chunk
else:
yield f"Error: Unsupported provider {config.provider}"
except Exception as e:
primary_error = str(e)
- logger.warning("Primary provider failed: %s. Checking OpenRouter fallback...", primary_error)
+ logger.warning("Primary provider %s/%s failed: %s. Checking OpenRouter fallback...",
+ config.provider, config.model_name, primary_error)
if not openrouter_api_key:
yield f"Error calling LLM: {primary_error}"
@@ -479,7 +592,7 @@ async def llm_streamer(
try:
logger.info("Falling back to OpenRouter for %s/%s", config.provider, config.model_name)
- async for chunk in stream_openrouter(messages_to_send, config, openrouter_api_key):
+ async for chunk in stream_openrouter(messages_to_send, config, openrouter_api_key, images=images):
yield chunk
except Exception as fallback_error:
logger.error("OpenRouter fallback also failed: %s", fallback_error)
diff --git a/frontend/src/store/flowStore.ts b/frontend/src/store/flowStore.ts
index 665030b..6140339 100644
--- a/frontend/src/store/flowStore.ts
+++ b/frontend/src/store/flowStore.ts
@@ -129,6 +129,8 @@ export interface FileMeta {
created_at: number;
provider?: string;
provider_file_id?: string;
+ anthropic_file_id?: string;
+ google_file_uri?: string;
scopes?: string[]; // "project_path/node_id" composite keys
}