Diffstat (limited to 'backend/app/main.py')
-rw-r--r--  backend/app/main.py  553
1 file changed, 545 insertions, 8 deletions
diff --git a/backend/app/main.py b/backend/app/main.py
index 886bd9e..be30333 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -1,16 +1,20 @@
+import asyncio
+import tempfile
+import time
 from fastapi import FastAPI, HTTPException
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import StreamingResponse, FileResponse
-from fastapi import UploadFile, File
+from fastapi import UploadFile, File, Form
 from pydantic import BaseModel
 from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Message, Context, LLMConfig, ModelProvider, ReasoningEffort
-from app.services.llm import llm_streamer, generate_title
+from app.services.llm import llm_streamer, generate_title, get_openai_client
 from dotenv import load_dotenv
 import os
 import json
 import shutil
 from typing import List, Literal, Optional
 from uuid import uuid4
+from google import genai
 
 load_dotenv()
@@ -28,6 +32,9 @@ app.add_middleware(
 DATA_ROOT = os.path.abspath(os.getenv("DATA_ROOT", os.path.join(os.getcwd(), "data")))
 DEFAULT_USER = "test"
 ARCHIVE_FILENAME = "archived_nodes.json"
+VALID_FILE_PROVIDERS = {"local", "openai", "google"}
+OPENAI_MAX_FILE_SIZE = 50 * 1024 * 1024  # conservative app-side cap per file for OpenAI uploads
+OPENAI_DEFAULT_FILE_PURPOSE = os.getenv("OPENAI_FILE_PURPOSE", "user_data")
 
 def ensure_user_root(user: str) -> str:
     """
@@ -152,6 +159,10 @@ class FileMeta(BaseModel):
     created_at: float
     provider: Optional[str] = None
     provider_file_id: Optional[str] = None
+    openai_file_id: Optional[str] = None
+    openai_vector_store_id: Optional[str] = None
+    # Scopes for filtering: "project_path/node_id" composite keys
+    scopes: List[str] = []
 
 class FolderRequest(BaseModel):
     user: str = DEFAULT_USER
@@ -225,8 +236,44 @@
     execution_context = Context(messages=final_messages)
 
+    tools: List[dict] = []
+    attachments: List[dict] = []
+
+    if request.config.provider == ModelProvider.OPENAI:
+        vs_ids, debug_refs, filters = await prepare_openai_vector_search(
+            user=DEFAULT_USER,
+            attached_ids=request.attached_file_ids,
+            scopes=request.scopes,
+            llm_config=request.config,
+        )
+        # Always enable file_search when a vector store exists (even without
+        # explicit attachments) so a node can reach files attached in earlier
+        # nodes of the trace.
+        if not vs_ids:
+            # Fall back to the user's vector store, if one exists
+            try:
+                client = get_openai_client(request.config.api_key)
+                vs_id = await ensure_user_vector_store(DEFAULT_USER, client)
+                if vs_id:
+                    vs_ids = [vs_id]
+            except Exception as e:
+                print(f"[warn] Could not get vector store: {e}")
+
+        if vs_ids:
+            tool_def = {"type": "file_search", "vector_store_ids": vs_ids}
+            if filters:
+                tool_def["filters"] = filters
+            tools.append(tool_def)
+            print(f"[openai file_search] vs_ids={vs_ids} refs={debug_refs} filters={filters}")
+    elif request.config.provider == ModelProvider.GOOGLE:
+        attachments = await prepare_attachments(
+            user=DEFAULT_USER,
+            target_provider=request.config.provider,
+            attached_ids=request.attached_file_ids,
+            llm_config=request.config,
+        )
+
     return StreamingResponse(
-        llm_streamer(execution_context, request.user_prompt, request.config),
+        llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools),
         media_type="text/event-stream"
     )
@@ -396,6 +443,107 @@ def archived_path(user: str) -> str:
 
 def files_index_path(user: str) -> str:
     return os.path.join(files_root(user), "index.json")
 
+def user_vector_store_path(user: str) -> str:
+    return os.path.join(files_root(user), "vector_store.json")
+
+async def ensure_user_vector_store(user: str, client=None) -> str:
+    """
+    Ensure there is a vector store for the user (OpenAI).
+    Persist the id under data/<user>/files/vector_store.json.
+    """
+    path = user_vector_store_path(user)
+    if client is None:
+        client = get_openai_client()
+
+    # Try the cached id first
+    if os.path.exists(path):
+        try:
+            with open(path, "r", encoding="utf-8") as f:
+                data = json.load(f)
+            vs_id_cached = data.get("id")
+            if vs_id_cached:
+                try:
+                    await client.vector_stores.retrieve(vector_store_id=vs_id_cached)
+                    return vs_id_cached
+                except Exception:
+                    # Possibly deleted; recreate below
+                    pass
+        except Exception:
+            pass
+
+    # Create a new store and cache its id
+    vs = await client.vector_stores.create(name=f"{user}-vs")
+    vs_id = getattr(vs, "id", None)
+    if not vs_id:
+        raise HTTPException(status_code=500, detail="Failed to create vector store")
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    with open(path, "w", encoding="utf-8") as f:
+        json.dump({"id": vs_id}, f)
+    return vs_id
+
+async def ensure_openai_file_and_index(user: str, meta: FileMeta, path: str, llm_config: Optional[LLMConfig] = None) -> tuple[str, str]:
+    """
+    Ensure the file is uploaded to OpenAI Files and added to the user's vector store.
+    Returns (openai_file_id, vector_store_id).
+    """
+    client = get_openai_client(llm_config.api_key if llm_config else None)
+    vs_id = await ensure_user_vector_store(user, client)
+
+    file_id = meta.openai_file_id or (meta.provider_file_id if meta.provider == "openai" else None)
+    if not file_id:
+        with open(path, "rb") as f:
+            content = f.read()
+        resp = await client.files.create(
+            file=(meta.name or "upload.bin", content),
+            purpose="assistants",
+        )
+        file_id = getattr(resp, "id", None)
+        if not file_id:
+            raise HTTPException(status_code=500, detail="OpenAI file upload returned no file_id")
+
+    await add_file_to_vector_store(vs_id, file_id, client=client)
+    return file_id, vs_id
+
+async def remove_file_from_vector_store(vs_id: str, file_id: str, client=None):
+    if not vs_id or not file_id:
+        return
+    if client is None:
+        client = get_openai_client()
+    try:
+        await client.vector_stores.files.delete(vector_store_id=vs_id, file_id=file_id)
+    except Exception as e:
+        print(f"[warn] remove_file_from_vector_store failed: {e}")
+
+async def add_file_to_vector_store(vs_id: str, file_id: str, client=None):
+    """
+    Add a file to the vector store, storing file_id as an attribute so that
+    specific files can be targeted with filters at query time.
+    """
+    if client is None:
+        client = get_openai_client()
+
+    create_params = {
+        "vector_store_id": vs_id,
+        "file_id": file_id,
+        "attributes": {"file_id": file_id},  # enable filtering by file_id
+    }
+
+    await client.vector_stores.files.create(**create_params)
+    # Poll until ingestion completes (list limit is capped at 100 by the API)
+    for _ in range(20):
+        listing = await client.vector_stores.files.list(vector_store_id=vs_id, limit=100)
+        found = None
+        for item in getattr(listing, "data", []):
+            if getattr(item, "id", None) == file_id or getattr(item, "file_id", None) == file_id:
+                found = item
+                break
+        status = getattr(found, "status", None) if found else None
+        if status == "completed":
+            return
+        await asyncio.sleep(0.5)
+    # Ingestion not confirmed within the window; proceed anyway
+    return
 
 def load_files_index(user: str) -> List[FileMeta]:
     path = files_index_path(user)
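
ensure_user_vector_store caches the store id on disk so repeated calls reuse one store per user instead of creating duplicates; a stale id fails the retrieve() probe and the store is transparently recreated. A minimal sketch of the round-trip (user name and resulting id are hypothetical):

    import asyncio
    vs_id = asyncio.run(ensure_user_vector_store("test"))
    # data/test/files/vector_store.json now holds {"id": "<vs_id>"} for reuse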
@@ -412,6 +560,226 @@
     with open(path, "w", encoding="utf-8") as f:
         json.dump([item.model_dump() for item in items], f, ensure_ascii=False, indent=2)
 
+
+async def prepare_attachments(
+    user: str,
+    target_provider: str,
+    attached_ids: List[str],
+    llm_config: LLMConfig,
+) -> list[dict]:
+    """
+    For each attached file ID:
+    - If already uploaded to the target provider, reuse provider_file_id/uri.
+    - Otherwise, upload with the original filename (required by OpenAI).
+    Returns a list of dicts describing attachment references for the provider.
+    """
+    if not attached_ids:
+        return []
+
+    items = load_files_index(user)
+    items_map = {item.id: item for item in items}
+    attachments: list[dict] = []
+
+    if isinstance(target_provider, ModelProvider):
+        provider_norm = target_provider.value.lower()
+    else:
+        provider_norm = str(target_provider).lower()
+
+    for fid in attached_ids:
+        meta = items_map.get(fid)
+        if not meta:
+            print(f"[warn] Attached file id not found, skipping: {fid}")
+            continue
+
+        path = os.path.join(files_root(user), fid)
+        if not os.path.exists(path):
+            raise HTTPException(status_code=404, detail=f"Attached file missing on disk: {meta.name}")
+
+        if provider_norm == "openai":
+            # Reuse the provider file id if available
+            if meta.provider == "openai" and meta.provider_file_id:
+                attachments.append({
+                    "provider": "openai",
+                    "file_id": meta.provider_file_id,
+                    "name": meta.name,
+                    "mime": meta.mime,
+                })
+                continue
+
+            # Upload to OpenAI with the original filename
+            with open(path, "rb") as f:
+                content = f.read()
+            size = len(content)
+            if size > OPENAI_MAX_FILE_SIZE:
+                raise HTTPException(status_code=400, detail=f"File {meta.name} exceeds the 50MB upload cap")
+
+            try:
+                client = get_openai_client(llm_config.api_key)
+                resp = await client.files.create(
+                    file=(meta.name or "upload.bin", content),
+                    purpose=OPENAI_DEFAULT_FILE_PURPOSE,
+                )
+                openai_file_id = getattr(resp, "id", None)
+                if not openai_file_id:
+                    raise HTTPException(status_code=500, detail="OpenAI file upload returned no file_id")
+                attachments.append({
+                    "provider": "openai",
+                    "file_id": openai_file_id,
+                    "name": meta.name,
+                    "mime": meta.mime,
+                })
+            except HTTPException:
+                raise
+            except Exception as e:
+                raise HTTPException(status_code=500, detail=f"OpenAI upload failed: {str(e)}")
+
+        elif provider_norm == "google":
+            # Reuse the stored reference if it already looks like a URI
+            if meta.provider == "google" and meta.provider_file_id and "://" in meta.provider_file_id:
+                attachments.append({
+                    "provider": "google",
+                    "uri": meta.provider_file_id,
+                    "name": meta.name,
+                    "mime": meta.mime,
+                })
+                continue
+
+            key = llm_config.api_key or os.getenv("GOOGLE_API_KEY")
+            if not key:
+                raise HTTPException(status_code=500, detail="Google API Key not found")
+            client = genai.Client(api_key=key)
+
+            tmp_path = None
+            try:
+                with open(path, "rb") as f:
+                    content = f.read()
+                with tempfile.NamedTemporaryFile(delete=False) as tmp:
+                    tmp.write(content)
+                    tmp_path = tmp.name
+
+                google_resp = await asyncio.to_thread(
+                    client.files.upload,
+                    file=tmp_path,
+                    config={"mimeType": meta.mime or "application/octet-stream"},
+                )
+                google_name = getattr(google_resp, "name", None)
+                google_uri = getattr(google_resp, "uri", None)
+
+                # Poll for ACTIVE state and a uri if one is missing
+                if google_name:
+                    for _ in range(10):
+                        try:
+                            info = await asyncio.to_thread(client.files.get, name=google_name)
+                            state = getattr(info, "state", None)
+                            google_uri = getattr(info, "uri", google_uri)
+                            if str(state).upper().endswith("ACTIVE") or state == "ACTIVE":
+                                break
+                            await asyncio.sleep(1)
+                        except Exception:
+                            await asyncio.sleep(1)
+                print(f"[google upload] name={google_name} uri={google_uri}")
+
+                uri = google_uri or google_name
+                if not uri:
+                    raise HTTPException(status_code=500, detail="Google upload returned no uri/name")
+                attachments.append({
+                    "provider": "google",
+                    "uri": uri,
+                    "name": meta.name,
+                    "mime": meta.mime,
+                })
+            except HTTPException:
+                raise
+            except Exception as e:
+                raise HTTPException(status_code=500, detail=f"Google upload failed: {str(e)}")
+            finally:
+                if tmp_path and os.path.exists(tmp_path):
+                    os.remove(tmp_path)
+
+        else:
+            raise HTTPException(status_code=400, detail=f"Unsupported provider for attachments: {target_provider}")
+
+    # Debug log
+    print(f"[attachments] provider={provider_norm} count={len(attachments)} detail={[{'name': a.get('name'), 'id': a.get('file_id', a.get('uri'))} for a in attachments]}")
+    return attachments
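
For reference, the list prepare_attachments returns is what llm_streamer consumes; entries look like this (values hypothetical):

    attachments = [
        {"provider": "openai", "file_id": "file-abc123", "name": "spec.pdf", "mime": "application/pdf"},
        {"provider": "google", "uri": "https://generativelanguage.googleapis.com/v1beta/files/xyz", "name": "notes.txt", "mime": "text/plain"},
    ]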
+
+async def prepare_openai_vector_search(
+    user: str,
+    attached_ids: List[str],
+    scopes: List[str],
+    llm_config: LLMConfig,
+) -> tuple[List[str], List[dict], Optional[dict]]:
+    """
+    Ensure all relevant files are uploaded to OpenAI Files (purpose=assistants)
+    and added to the user's vector store.
+    Returns (vector_store_ids, openai_file_refs_for_debug, filters).
+
+    Filtering logic:
+    - If scopes are provided: select files whose scopes intersect the requested scopes.
+    - Else, if attached_ids are provided: use those specific files.
+    - Filters are built against the file_id attribute stored in the vector store.
+    """
+    items = load_files_index(user)
+    items_map = {item.id: item for item in items}
+
+    # Determine which files to include based on scopes or attached_ids
+    relevant_files: List[FileMeta] = []
+
+    if scopes:
+        # Select files whose scopes intersect the requested scopes
+        for item in items:
+            if item.scopes and any(s in scopes for s in item.scopes):
+                relevant_files.append(item)
+        print(f"[file_search] scopes={scopes} matched_files={[f.name for f in relevant_files]}")
+    elif attached_ids:
+        # Fallback: use explicitly attached files
+        for fid in attached_ids:
+            meta = items_map.get(fid)
+            if meta:
+                relevant_files.append(meta)
+
+    if not relevant_files:
+        return [], [], None
+
+    changed = False
+    vs_ids: List[str] = []
+    debug_refs: List[dict] = []
+    file_ids_for_filter: List[str] = []
+
+    for meta in relevant_files:
+        path = os.path.join(files_root(user), meta.id)
+        if not os.path.exists(path):
+            print(f"[warn] Attached file missing on disk, skipping: {meta.id}")
+            continue
+        # Enforce the 50MB upload cap
+        file_size = os.path.getsize(path)
+        if file_size > OPENAI_MAX_FILE_SIZE:
+            print(f"[warn] File {meta.name} exceeds the 50MB upload cap, skipping")
+            continue
+
+        openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, path, llm_config)
+        if meta.openai_file_id != openai_file_id or meta.openai_vector_store_id != vs_id:
+            meta.openai_file_id = openai_file_id
+            meta.openai_vector_store_id = vs_id
+            changed = True
+        vs_ids.append(vs_id)
+        debug_refs.append({"name": meta.name, "file_id": openai_file_id, "vs_id": vs_id})
+        if openai_file_id:
+            file_ids_for_filter.append(openai_file_id)
+
+    if changed:
+        save_files_index(user, list(items_map.values()))
+
+    # Deduplicate store ids
+    vs_ids_unique = list({vid for vid in vs_ids if vid})
+
+    # Build filters so the search targets only the relevant files
+    filters = None
+    if file_ids_for_filter:
+        filters = {"type": "in", "key": "file_id", "value": file_ids_for_filter}
+
+    return vs_ids_unique, debug_refs, filters
+
 # -------------------------------------------------
 
 @app.get("/api/projects/archived")
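
Scope matching is a plain intersection test on the composite "project_path/node_id" keys, e.g.:

    requested = ["projects/demo/node-42"]
    file_scopes = ["projects/demo/node-7", "projects/demo/node-42"]
    relevant = any(s in requested for s in file_scopes)  # True -> file is searchable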
@@ -449,27 +817,124 @@ def list_files(user: str = DEFAULT_USER):
 
 @app.post("/api/files/upload")
-async def upload_file(user: str = DEFAULT_USER, file: UploadFile = File(...)):
+async def upload_file(
+    user: str = DEFAULT_USER,
+    file: UploadFile = File(...),
+    provider: str = Form("local"),
+    purpose: Optional[str] = Form(None),
+):
     migrate_legacy_layout(user)
     items = load_files_index(user)
     file_id = str(uuid4())
     dest_root = files_root(user)
     dest_path = os.path.join(dest_root, file_id)
+    file_name = file.filename or "upload.bin"
+    provider_normalized = (provider or "local").lower()
+    if provider_normalized not in VALID_FILE_PROVIDERS:
+        raise HTTPException(status_code=400, detail="Unsupported provider")
+
     try:
         content = await file.read()
-        size = len(content)
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+    size = len(content)
+    if provider_normalized == "openai" and size > OPENAI_MAX_FILE_SIZE:
+        raise HTTPException(status_code=400, detail="OpenAI provider limit: max 50MB per file")
+
+    provider_file_id: Optional[str] = None
+    provider_created_at: Optional[float] = None
+
+    if provider_normalized == "openai":
+        try:
+            client = get_openai_client()
+            upload_purpose = purpose or OPENAI_DEFAULT_FILE_PURPOSE
+            resp = await client.files.create(
+                file=(file_name, content),
+                purpose=upload_purpose,
+            )
+            provider_file_id = getattr(resp, "id", None)
+            provider_created_at = getattr(resp, "created_at", None)
+        except Exception as e:
+            raise HTTPException(status_code=500, detail=f"OpenAI upload failed: {str(e)}")
+    elif provider_normalized == "google":
+        try:
+            key = os.getenv("GOOGLE_API_KEY")
+            if not key:
+                raise HTTPException(status_code=500, detail="Google API Key not found")
+            client = genai.Client(api_key=key)
+            # The Google GenAI SDK upload is synchronous; run it in a thread to
+            # avoid blocking the event loop.
+            tmp_path = None
+            try:
+                with tempfile.NamedTemporaryFile(delete=False) as tmp:
+                    tmp.write(content)
+                    tmp_path = tmp.name
+                google_resp = await asyncio.to_thread(
+                    client.files.upload,
+                    file=tmp_path,
+                    config={"mimeType": file.content_type or "application/octet-stream"},
+                )
+                google_name = getattr(google_resp, "name", None)
+                google_uri = getattr(google_resp, "uri", None)
+
+                # Poll for ACTIVE state and a uri if one is missing
+                if google_name:
+                    for _ in range(10):
+                        try:
+                            info = await asyncio.to_thread(client.files.get, name=google_name)
+                            state = getattr(info, "state", None)
+                            google_uri = getattr(info, "uri", google_uri)
+                            if str(state).upper().endswith("ACTIVE") or state == "ACTIVE":
+                                break
+                            await asyncio.sleep(1)
+                        except Exception:
+                            await asyncio.sleep(1)
+
+                provider_file_id = google_uri or google_name
+            finally:
+                if tmp_path and os.path.exists(tmp_path):
+                    os.remove(tmp_path)
+
+            provider_created_at = time.time()
+        except HTTPException:
+            raise
+        except Exception as e:
+            raise HTTPException(status_code=500, detail=f"Google upload failed: {str(e)}")
+
+    try:
+        os.makedirs(dest_root, exist_ok=True)
         with open(dest_path, "wb") as f:
             f.write(content)
     except Exception as e:
         raise HTTPException(status_code=500, detail=str(e))
 
+    created_at = provider_created_at or os.path.getmtime(dest_path)
+
     meta = FileMeta(
         id=file_id,
-        name=file.filename,
+        name=file_name,
         size=size,
         mime=file.content_type or "application/octet-stream",
-        created_at=os.path.getmtime(dest_path),
+        created_at=created_at,
+        provider=provider_normalized if provider_normalized != "local" else None,
+        provider_file_id=provider_file_id,
+        openai_file_id=None,
+        openai_vector_store_id=None,
     )
+
+    # Always try to index into the OpenAI vector store (files <= 50MB only)
+    if size <= OPENAI_MAX_FILE_SIZE:
+        try:
+            openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, dest_path, None)
+            meta.openai_file_id = openai_file_id
+            meta.openai_vector_store_id = vs_id
+            if provider_normalized == "openai" and not meta.provider_file_id:
+                meta.provider_file_id = openai_file_id
+        except Exception as e:
+            print(f"[warn] OpenAI indexing failed for {file_name}: {e}")
+    else:
+        print(f"[warn] Skipping OpenAI indexing for {file_name}: exceeds 50MB")
+
     items.append(meta)
     save_files_index(user, items)
     return {"file": meta}
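
A hypothetical client call against the extended endpoint; the multipart field names follow the signature above (host and filename are placeholders):

    import httpx
    with open("notes.pdf", "rb") as f:
        r = httpx.post(
            "http://localhost:8000/api/files/upload",
            params={"user": "test"},
            data={"provider": "openai"},
            files={"file": ("notes.pdf", f, "application/pdf")},
        )
    print(r.json()["file"]["id"])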
@@ -489,15 +954,87 @@ def download_file(user: str = DEFAULT_USER, file_id: str = ""):
 
 @app.post("/api/files/delete")
-def delete_file(user: str = DEFAULT_USER, file_id: str = ""):
+async def delete_file(user: str = DEFAULT_USER, file_id: str = ""):
     migrate_legacy_layout(user)
     items = load_files_index(user)
     meta = next((i for i in items if i.id == file_id), None)
     if not meta:
         raise HTTPException(status_code=404, detail="file not found")
+
+    # Remove from the vector store and provider file stores if present
+    if meta.openai_vector_store_id and meta.openai_file_id:
+        await remove_file_from_vector_store(meta.openai_vector_store_id, meta.openai_file_id)
+    if meta.provider == "openai" and meta.provider_file_id:
+        try:
+            client = get_openai_client()
+            await client.files.delete(meta.provider_file_id)
+        except Exception as e:
+            raise HTTPException(status_code=500, detail=f"OpenAI delete failed: {str(e)}")
+    if meta.provider == "google" and meta.provider_file_id:
+        try:
+            key = os.getenv("GOOGLE_API_KEY")
+            if not key:
+                raise HTTPException(status_code=500, detail="Google API Key not found")
+            client = genai.Client(api_key=key)
+            # files.delete takes a keyword-only name argument in the GenAI SDK
+            await asyncio.to_thread(client.files.delete, name=meta.provider_file_id)
+        except HTTPException:
+            raise
+        except Exception as e:
+            raise HTTPException(status_code=500, detail=f"Google delete failed: {str(e)}")
+
     path = os.path.join(files_root(user), file_id)
     if os.path.exists(path):
         os.remove(path)
     items = [i for i in items if i.id != file_id]
     save_files_index(user, items)
     return {"ok": True}
+
+
+class AddScopeRequest(BaseModel):
+    user: str = DEFAULT_USER
+    file_id: str
+    scope: str  # "project_path/node_id" composite key
+
+
+@app.post("/api/files/add_scope")
+def add_file_scope(request: AddScopeRequest):
+    """
+    Add a scope to a file's scopes list.
+    Called when the user attaches a file to a node.
+    """
+    migrate_legacy_layout(request.user)
+    items = load_files_index(request.user)
+    meta = next((i for i in items if i.id == request.file_id), None)
+    if not meta:
+        raise HTTPException(status_code=404, detail="file not found")
+
+    if request.scope not in meta.scopes:
+        meta.scopes.append(request.scope)
+        save_files_index(request.user, items)
+
+    return {"file": meta.model_dump()}
+
+
+class RemoveScopeRequest(BaseModel):
+    user: str = DEFAULT_USER
+    file_id: str
+    scope: str
+
+
+@app.post("/api/files/remove_scope")
+def remove_file_scope(request: RemoveScopeRequest):
+    """
+    Remove a scope from a file's scopes list.
+    Called when the user detaches a file from a node.
+    """
+    migrate_legacy_layout(request.user)
+    items = load_files_index(request.user)
+    meta = next((i for i in items if i.id == request.file_id), None)
+    if not meta:
+        raise HTTPException(status_code=404, detail="file not found")
+
+    if request.scope in meta.scopes:
+        meta.scopes.remove(request.scope)
+        save_files_index(request.user, items)
+
+    return {"file": meta.model_dump()}
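
A hypothetical attach/detach round-trip for the scope endpoints (host and ids are placeholders):

    import httpx
    base = "http://localhost:8000/api/files"
    payload = {"user": "test", "file_id": "a-file-uuid", "scope": "projects/demo/node-42"}
    httpx.post(f"{base}/add_scope", json=payload)     # attach file to a node
    httpx.post(f"{base}/remove_scope", json=payload)  # detach it again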
