summaryrefslogtreecommitdiff
path: root/backend/app/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/app/main.py')
-rw-r--r--backend/app/main.py310
1 files changed, 249 insertions, 61 deletions
diff --git a/backend/app/main.py b/backend/app/main.py
index b0d6138..75491bb 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -1,4 +1,5 @@
import asyncio
+import base64
import logging
import tempfile
import time
@@ -8,7 +9,7 @@ from fastapi.responses import StreamingResponse, FileResponse
from fastapi import UploadFile, File, Form
from pydantic import BaseModel
from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Message, Context, LLMConfig, ModelProvider, ReasoningEffort
-from app.services.llm import llm_streamer, generate_title, get_openai_client
+from app.services.llm import llm_streamer, generate_title, get_openai_client, get_anthropic_client
from app.auth import auth_router, get_current_user, get_current_user_optional, init_db, User, get_db
from app.auth.utils import get_password_hash
from dotenv import load_dotenv
@@ -232,6 +233,8 @@ class FileMeta(BaseModel):
provider_file_id: Optional[str] = None
openai_file_id: Optional[str] = None
openai_vector_store_id: Optional[str] = None
+ anthropic_file_id: Optional[str] = None
+ google_file_uri: Optional[str] = None
# Scopes for filtering: "project_path/node_id" composite keys
scopes: List[str] = []
@@ -287,6 +290,53 @@ def smart_merge_messages(messages: list[Message]) -> list[Message]:
merged.append(current_msg)
return merged
+IMAGE_MIME_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
+
+ANTHROPIC_DOCUMENT_MIME_TYPES = {
+ "application/pdf", "text/plain", "text/html", "text/csv",
+ "text/xml", "text/markdown", "text/css", "text/javascript",
+ "application/javascript", "application/xml",
+}
+
+def extract_image_attachments(user: str, attached_ids: List[str]) -> tuple[List[dict], List[str]]:
+ """
+ Separate image files from non-image files in attached_ids.
+ Returns (images, remaining_non_image_ids).
+ Each image dict: {"mime": str, "data": str (base64), "name": str}
+ """
+ if not attached_ids:
+ return [], []
+
+ items = load_files_index(user)
+ items_map = {item.id: item for item in items}
+ images: List[dict] = []
+ non_image_ids: List[str] = []
+
+ for fid in attached_ids:
+ meta = items_map.get(fid)
+ if not meta:
+ non_image_ids.append(fid)
+ continue
+
+ if meta.mime in IMAGE_MIME_TYPES:
+ path = os.path.join(files_root(user), fid)
+ if not os.path.exists(path):
+ logger.warning("Image file missing on disk, skipping: %s", fid)
+ continue
+ with open(path, "rb") as f:
+ raw = f.read()
+ images.append({
+ "mime": meta.mime,
+ "data": base64.b64encode(raw).decode("utf-8"),
+ "name": meta.name,
+ })
+ else:
+ non_image_ids.append(fid)
+
+ logger.debug("extract_image_attachments: %d images, %d non-image files", len(images), len(non_image_ids))
+ return images, non_image_ids
+
+
@app.post("/api/run_node_stream")
async def run_node_stream(
request: NodeRunRequest,
@@ -304,7 +354,10 @@ async def run_node_stream(
# Get username for file operations
username = current_user.username if current_user else DEFAULT_USER
-
+
+ # Extract images from attached files (separate from non-image files)
+ images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids)
+
# 1. Concatenate all incoming contexts first
raw_messages = []
for ctx in request.incoming_contexts:
@@ -326,7 +379,7 @@ async def run_node_stream(
if request.config.provider == ModelProvider.OPENAI:
vs_ids, debug_refs, filters = await prepare_openai_vector_search(
user=username,
- attached_ids=request.attached_file_ids,
+ attached_ids=non_image_file_ids,
scopes=request.scopes,
llm_config=request.config,
)
@@ -352,14 +405,21 @@ async def run_node_stream(
attachments = await prepare_attachments(
user=username,
target_provider=request.config.provider,
- attached_ids=request.attached_file_ids,
+ attached_ids=non_image_file_ids,
+ llm_config=request.config,
+ )
+ elif request.config.provider == ModelProvider.CLAUDE:
+ attachments = await prepare_attachments(
+ user=username,
+ target_provider=request.config.provider,
+ attached_ids=non_image_file_ids,
llm_config=request.config,
)
openrouter_key = get_user_api_key(current_user, "openrouter")
return StreamingResponse(
- llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools, openrouter_api_key=openrouter_key),
+ llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools, openrouter_api_key=openrouter_key, images=images),
media_type="text/event-stream"
)
@@ -602,6 +662,95 @@ async def ensure_openai_file_and_index(user: str, meta: FileMeta, path: str, llm
await add_file_to_vector_store(vs_id, file_id, client=client)
return file_id, vs_id
+async def ensure_anthropic_file_upload(meta: FileMeta, file_path: str, api_key: str = None) -> Optional[str]:
+ """
+ Upload a file to the Anthropic Files API (beta).
+ Returns the anthropic file_id on success, or None on skip/failure.
+ """
+ try:
+ client = get_anthropic_client(api_key)
+ except Exception:
+ logger.debug("Skipping Anthropic file upload: no API key available")
+ return None
+
+ mime = (meta.mime or "").lower()
+ if mime in IMAGE_MIME_TYPES or mime not in ANTHROPIC_DOCUMENT_MIME_TYPES:
+ logger.debug("Skipping Anthropic file upload for unsupported mime: %s", mime)
+ return None
+
+ try:
+ with open(file_path, "rb") as f:
+ content = f.read()
+ resp = await asyncio.to_thread(
+ client.beta.files.upload,
+ file=(meta.name or "upload.bin", content, mime),
+ )
+ file_id = getattr(resp, "id", None)
+ if file_id:
+ logger.info("Anthropic file uploaded: %s -> %s", meta.name, file_id)
+ return file_id
+ except Exception as e:
+ logger.warning("Anthropic file upload failed for %s: %s", meta.name, e)
+ return None
+
+
+async def ensure_google_file_upload(meta: FileMeta, file_path: str, api_key: str = None) -> Optional[str]:
+ """
+ Upload a file to Google GenAI Files API.
+ Returns the Google file URI on success, or None on skip/failure.
+ """
+ key = api_key or os.getenv("GOOGLE_API_KEY")
+ if not key:
+ logger.debug("Skipping Google file upload: no API key available")
+ return None
+
+ mime = (meta.mime or "").lower()
+ if mime in IMAGE_MIME_TYPES:
+ logger.debug("Skipping Google file upload for image: %s", mime)
+ return None
+
+ tmp_path = None
+ try:
+ client = genai.Client(api_key=key)
+ with open(file_path, "rb") as f:
+ content = f.read()
+ with tempfile.NamedTemporaryFile(delete=False) as tmp:
+ tmp.write(content)
+ tmp_path = tmp.name
+
+ google_resp = await asyncio.to_thread(
+ client.files.upload,
+ file=tmp_path,
+ config={"mimeType": meta.mime or "application/octet-stream"},
+ )
+ google_name = getattr(google_resp, "name", None)
+ google_uri = getattr(google_resp, "uri", None)
+
+ # Poll for ACTIVE
+ if google_name:
+ for _ in range(10):
+ try:
+ info = await asyncio.to_thread(client.files.get, name=google_name)
+ state = getattr(info, "state", None)
+ google_uri = getattr(info, "uri", google_uri)
+ if str(state).upper().endswith("ACTIVE") or state == "ACTIVE":
+ break
+ await asyncio.sleep(1)
+ except Exception:
+ await asyncio.sleep(1)
+
+ uri = google_uri or google_name
+ if uri:
+ logger.info("Google file uploaded: %s -> %s", meta.name, uri)
+ return uri
+ except Exception as e:
+ logger.warning("Google file upload failed for %s: %s", meta.name, e)
+ return None
+ finally:
+ if tmp_path and os.path.exists(tmp_path):
+ os.remove(tmp_path)
+
+
async def remove_file_from_vector_store(vs_id: str, file_id: str, client=None):
if not vs_id or not file_id:
return
@@ -677,6 +826,7 @@ async def prepare_attachments(
items = load_files_index(user)
items_map = {item.id: item for item in items}
attachments: list[dict] = []
+ changed = False
if isinstance(target_provider, ModelProvider):
provider_norm = target_provider.value.lower()
@@ -732,71 +882,73 @@ async def prepare_attachments(
raise HTTPException(status_code=500, detail=f"OpenAI upload failed: {str(e)}")
elif provider_norm == ModelProvider.GOOGLE or provider_norm == "google":
- # Reuse uri/name if available and looks like a URI
- if meta.provider == "google" and meta.provider_file_id and "://" in meta.provider_file_id:
+ # Reuse cached google_file_uri
+ cached_uri = meta.google_file_uri or (
+ meta.provider_file_id if meta.provider == "google" and meta.provider_file_id and "://" in meta.provider_file_id else None
+ )
+ if cached_uri:
attachments.append({
"provider": "google",
- "uri": meta.provider_file_id,
+ "uri": cached_uri,
"name": meta.name,
"mime": meta.mime,
})
continue
- key = llm_config.api_key or os.getenv("GOOGLE_API_KEY")
- if not key:
- raise HTTPException(status_code=500, detail="Google API Key not found")
- client = genai.Client(api_key=key)
-
- tmp_path = None
- try:
- with open(path, "rb") as f:
- content = f.read()
- with tempfile.NamedTemporaryFile(delete=False) as tmp:
- tmp.write(content)
- tmp_path = tmp.name
-
- google_resp = await asyncio.to_thread(
- client.files.upload,
- file=tmp_path,
- config={"mimeType": meta.mime or "application/octet-stream"},
- )
- google_name = getattr(google_resp, "name", None)
- google_uri = getattr(google_resp, "uri", None)
-
- # Poll for ACTIVE and uri if missing
- if google_name:
- for _ in range(10):
- try:
- info = await asyncio.to_thread(client.files.get, name=google_name)
- state = getattr(info, "state", None)
- google_uri = getattr(info, "uri", google_uri)
- if str(state).upper().endswith("ACTIVE") or state == "ACTIVE":
- break
- await asyncio.sleep(1)
- except Exception:
- await asyncio.sleep(1)
- logger.debug("google upload: name=%s uri=%s", google_name, google_uri)
+ # On-demand upload via shared helper
+ uri = await ensure_google_file_upload(meta, path, llm_config.api_key)
+ if not uri:
+ raise HTTPException(status_code=500, detail=f"Google upload failed for {meta.name}")
+ meta.google_file_uri = uri
+ changed = True
+ attachments.append({
+ "provider": "google",
+ "uri": uri,
+ "name": meta.name,
+ "mime": meta.mime,
+ })
+
+ elif provider_norm == ModelProvider.CLAUDE or provider_norm == "claude":
+ # Reuse cached anthropic_file_id if available
+ if meta.anthropic_file_id:
+ attachments.append({
+ "provider": "claude",
+ "file_id": meta.anthropic_file_id,
+ "name": meta.name,
+ "mime": meta.mime,
+ })
+ continue
- uri = google_uri or google_name
- if not uri:
- raise HTTPException(status_code=500, detail="Google upload returned no uri/name")
+ # Try on-demand upload
+ anthropic_id = await ensure_anthropic_file_upload(meta, path, llm_config.api_key)
+ if anthropic_id:
+ meta.anthropic_file_id = anthropic_id
+ changed = True
attachments.append({
- "provider": "google",
- "uri": uri,
+ "provider": "claude",
+ "file_id": anthropic_id,
"name": meta.name,
"mime": meta.mime,
})
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"Google upload failed: {str(e)}")
- finally:
- if tmp_path and os.path.exists(tmp_path):
- os.remove(tmp_path)
+ continue
+
+ # Fallback: inline base64 data
+ with open(path, "rb") as f:
+ raw = f.read()
+ attachments.append({
+ "provider": "claude",
+ "data_base64": base64.b64encode(raw).decode("utf-8"),
+ "name": meta.name,
+ "mime": meta.mime,
+ })
else:
raise HTTPException(status_code=400, detail=f"Unsupported provider for attachments: {target_provider}")
+ # Persist index if any on-demand uploads updated meta (applies to Claude and Google)
+ if changed:
+ save_files_index(user, list(items_map.values()))
+
# Debug log
logger.debug("attachments: provider=%s count=%d detail=%s", provider_norm, len(attachments), [{'name': a.get('name'), 'id': a.get('file_id', a.get('uri'))} for a in attachments])
return attachments
@@ -824,9 +976,11 @@ async def prepare_openai_vector_search(
# Determine which files to include - combine scopes AND attached_ids
relevant_files_map: dict[str, FileMeta] = {}
- # First: add files matching scopes
+ # First: add files matching scopes (skip image files — they are handled inline)
if scopes:
for item in items:
+ if item.mime in IMAGE_MIME_TYPES:
+ continue
if item.scopes and any(s in scopes for s in item.scopes):
relevant_files_map[item.id] = item
logger.debug("file_search: scopes=%s matched_files=%s", scopes, [f.name for f in relevant_files_map.values()])
@@ -1026,7 +1180,11 @@ async def upload_file(
)
# Always try to index into OpenAI vector store (if <=50MB)
- if size <= OPENAI_MAX_FILE_SIZE:
+ # Skip image files — they are sent inline to vision-capable models, not via file_search
+ mime_for_check = (file.content_type or "application/octet-stream").lower()
+ if mime_for_check in IMAGE_MIME_TYPES:
+ logger.debug("Skipping OpenAI vector store indexing for image file: %s", file_name)
+ elif size <= OPENAI_MAX_FILE_SIZE:
try:
openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, dest_path, None)
meta.openai_file_id = openai_file_id
@@ -1038,6 +1196,19 @@ async def upload_file(
else:
logger.warning("Skipping OpenAI indexing for %s: exceeds 50MB", file_name)
+ # Anthropic Files API upload (eager, like OpenAI)
+ anthropic_id = await ensure_anthropic_file_upload(meta, dest_path)
+ if anthropic_id:
+ meta.anthropic_file_id = anthropic_id
+
+ # Google Files API upload (eager, like OpenAI)
+ google_uri = await ensure_google_file_upload(meta, dest_path)
+ if google_uri:
+ meta.google_file_uri = google_uri
+ # Also backfill provider_file_id for legacy compat if provider was google
+ if provider_normalized == "google" and not meta.provider_file_id:
+ meta.provider_file_id = google_uri
+
items.append(meta)
save_files_index(user, items)
return {"file": meta}
@@ -1073,17 +1244,34 @@ async def delete_file(user: str = DEFAULT_USER, file_id: str = ""):
await client.files.delete(meta.provider_file_id)
except Exception as e:
raise HTTPException(status_code=500, detail=f"OpenAI delete failed: {str(e)}")
+ # Delete from Google Files API — collect all distinct Google refs
+ google_refs = set()
if meta.provider == "google" and meta.provider_file_id:
+ google_refs.add(meta.provider_file_id)
+ if meta.google_file_uri:
+ google_refs.add(meta.google_file_uri)
+ for g_ref in google_refs:
try:
key = os.getenv("GOOGLE_API_KEY")
if not key:
- raise HTTPException(status_code=500, detail="Google API Key not found")
+ logger.warning("Skipping Google file deletion: no API key")
+ break
client = genai.Client(api_key=key)
- await asyncio.to_thread(client.files.delete, meta.provider_file_id)
- except HTTPException:
- raise
+ await asyncio.to_thread(client.files.delete, g_ref)
+ logger.info("Google file deleted: %s", g_ref)
+ except Exception as e:
+ logger.warning("Google file deletion failed for %s: %s", g_ref, e)
+
+ # Delete from Anthropic Files API if present
+ if meta.anthropic_file_id:
+ try:
+ client = get_anthropic_client()
+ await asyncio.to_thread(
+ client.beta.files.delete, meta.anthropic_file_id
+ )
+ logger.info("Anthropic file deleted: %s", meta.anthropic_file_id)
except Exception as e:
- raise HTTPException(status_code=500, detail=f"Google delete failed: {str(e)}")
+ logger.warning("Anthropic file deletion failed for %s: %s", meta.anthropic_file_id, e)
path = os.path.join(files_root(user), file_id)
if os.path.exists(path):