author     blackhao <13851610112@163.com>  2025-12-10 19:30:26 -0600
committer  blackhao <13851610112@163.com>  2025-12-10 19:30:26 -0600
commit     d9b17431a799a0354103ef390f6db15f34fb92be (patch)
tree       7e8f2d70bd39f6a32aa4eae8e9655afc339c6bc9 /backend/app
parent     0dcaf9d7da9fa5041fbd5489a60886ceb416b1d4 (diff)
init file sys
Diffstat (limited to 'backend/app')
-rw-r--r--  backend/app/main.py          | 553
-rw-r--r--  backend/app/schemas.py       |   4
-rw-r--r--  backend/app/services/llm.py  | 159
3 files changed, 669 insertions, 47 deletions
diff --git a/backend/app/main.py b/backend/app/main.py
index 886bd9e..be30333 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -1,16 +1,20 @@
+import asyncio
+import tempfile
+import time
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse
-from fastapi import UploadFile, File
+from fastapi import UploadFile, File, Form
from pydantic import BaseModel
from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Message, Context, LLMConfig, ModelProvider, ReasoningEffort
-from app.services.llm import llm_streamer, generate_title
+from app.services.llm import llm_streamer, generate_title, get_openai_client
from dotenv import load_dotenv
import os
import json
import shutil
from typing import List, Literal, Optional
from uuid import uuid4
+from google import genai
load_dotenv()
@@ -28,6 +32,9 @@ app.add_middleware(
DATA_ROOT = os.path.abspath(os.getenv("DATA_ROOT", os.path.join(os.getcwd(), "data")))
DEFAULT_USER = "test"
ARCHIVE_FILENAME = "archived_nodes.json"
+VALID_FILE_PROVIDERS = {"local", "openai", "google"}
+OPENAI_MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB limit per OpenAI docs
+OPENAI_DEFAULT_FILE_PURPOSE = os.getenv("OPENAI_FILE_PURPOSE", "user_data")
def ensure_user_root(user: str) -> str:
"""
@@ -152,6 +159,10 @@ class FileMeta(BaseModel):
created_at: float
provider: Optional[str] = None
provider_file_id: Optional[str] = None
+ openai_file_id: Optional[str] = None
+ openai_vector_store_id: Optional[str] = None
+ # Scopes for filtering: "project_path/node_id" composite keys
+ scopes: List[str] = []
class FolderRequest(BaseModel):
user: str = DEFAULT_USER
@@ -225,8 +236,44 @@ async def run_node_stream(request: NodeRunRequest):
execution_context = Context(messages=final_messages)
+ tools: List[dict] = []
+ attachments: List[dict] = []
+
+ if request.config.provider == ModelProvider.OPENAI:
+ vs_ids, debug_refs, filters = await prepare_openai_vector_search(
+ user=DEFAULT_USER,
+ attached_ids=request.attached_file_ids,
+ scopes=request.scopes,
+ llm_config=request.config,
+ )
+ # Always enable file_search if vector store exists (even without explicit attachments)
+ # This allows nodes to access files attached in previous nodes of the trace
+ if not vs_ids:
+ # Try to get user's vector store anyway
+ try:
+ client = get_openai_client(request.config.api_key)
+ vs_id = await ensure_user_vector_store(DEFAULT_USER, client)
+ if vs_id:
+ vs_ids = [vs_id]
+ except Exception as e:
+ print(f"[warn] Could not get vector store: {e}")
+
+ if vs_ids:
+ tool_def = {"type": "file_search", "vector_store_ids": vs_ids}
+ if filters:
+ tool_def["filters"] = filters
+ tools.append(tool_def)
+ print(f"[openai file_search] vs_ids={vs_ids} refs={debug_refs} filters={filters}")
+ elif request.config.provider == ModelProvider.GOOGLE:
+ attachments = await prepare_attachments(
+ user=DEFAULT_USER,
+ target_provider=request.config.provider,
+ attached_ids=request.attached_file_ids,
+ llm_config=request.config,
+ )
+
return StreamingResponse(
- llm_streamer(execution_context, request.user_prompt, request.config),
+ llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools),
media_type="text/event-stream"
)
@@ -396,6 +443,107 @@ def archived_path(user: str) -> str:
def files_index_path(user: str) -> str:
return os.path.join(files_root(user), "index.json")
+def user_vector_store_path(user: str) -> str:
+ return os.path.join(files_root(user), "vector_store.json")
+
+async def ensure_user_vector_store(user: str, client=None) -> str:
+ """
+ Ensure there is a vector store for the user (OpenAI).
+ Persist the id under data/<user>/files/vector_store.json.
+ """
+ path = user_vector_store_path(user)
+ if client is None:
+ client = get_openai_client()
+
+ # Try existing cached ID
+ if os.path.exists(path):
+ try:
+ with open(path, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ vs_id_cached = data.get("id")
+ if vs_id_cached:
+ try:
+ await client.vector_stores.retrieve(vector_store_id=vs_id_cached)
+ return vs_id_cached
+ except Exception:
+ # Possibly deleted; recreate below
+ pass
+ except Exception:
+ pass
+
+ # create new
+ vs = await client.vector_stores.create(name=f"{user}-vs")
+ vs_id = getattr(vs, "id", None)
+ if not vs_id:
+ raise HTTPException(status_code=500, detail="Failed to create vector store")
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ with open(path, "w", encoding="utf-8") as f:
+ json.dump({"id": vs_id}, f)
+ return vs_id
+
+async def ensure_openai_file_and_index(user: str, meta: FileMeta, path: str, llm_config: Optional[LLMConfig] = None) -> tuple[str, str]:
+ """
+ Ensure the file is uploaded to OpenAI Files and added to the user's vector store.
+ Returns (openai_file_id, vector_store_id).
+ """
+ client = get_openai_client(llm_config.api_key if llm_config else None)
+ vs_id = await ensure_user_vector_store(user, client)
+
+ file_id = meta.openai_file_id or (meta.provider_file_id if meta.provider == "openai" else None)
+ if not file_id:
+ with open(path, "rb") as f:
+ content = f.read()
+ resp = await client.files.create(
+ file=(meta.name or "upload.bin", content),
+ purpose="assistants",
+ )
+ file_id = getattr(resp, "id", None)
+ if not file_id:
+ raise HTTPException(status_code=500, detail="OpenAI file upload returned no file_id")
+
+ await add_file_to_vector_store(vs_id, file_id, client=client)
+ return file_id, vs_id
+
+async def remove_file_from_vector_store(vs_id: str, file_id: str, client=None):
+ if not vs_id or not file_id:
+ return
+ if client is None:
+ client = get_openai_client()
+ try:
+ await client.vector_stores.files.delete(vector_store_id=vs_id, file_id=file_id)
+ except Exception as e:
+ print(f"[warn] remove_file_from_vector_store failed: {e}")
+
+async def add_file_to_vector_store(vs_id: str, file_id: str, client=None):
+ """
+ Add a file to vector store with file_id as attribute for filtering.
+ We use file_id as the attribute so we can filter by specific files at query time.
+ """
+ if client is None:
+ client = get_openai_client()
+
+ # Use file_id as attribute for filtering
+ create_params = {
+ "vector_store_id": vs_id,
+ "file_id": file_id,
+ "attributes": {"file_id": file_id} # Enable filtering by file_id
+ }
+
+ await client.vector_stores.files.create(**create_params)
+ # Poll until ingestion completes (file listing page size is capped at 100 by the API)
+ for _ in range(20):
+ listing = await client.vector_stores.files.list(vector_store_id=vs_id, limit=100)
+ found = None
+ for item in getattr(listing, "data", []):
+ if getattr(item, "id", None) == file_id or getattr(item, "file_id", None) == file_id:
+ found = item
+ break
+ status = getattr(found, "status", None) if found else None
+ if status == "completed":
+ return
+ await asyncio.sleep(0.5)
+ # If completion was not confirmed within the poll window, proceed anyway
+ return
def load_files_index(user: str) -> List[FileMeta]:
path = files_index_path(user)
@@ -412,6 +560,226 @@ def save_files_index(user: str, items: List[FileMeta]):
with open(path, "w", encoding="utf-8") as f:
json.dump([item.model_dump() for item in items], f, ensure_ascii=False, indent=2)
+
+async def prepare_attachments(
+ user: str,
+ target_provider: str,
+ attached_ids: List[str],
+ llm_config: LLMConfig,
+) -> list[dict]:
+ """
+ For each attached file ID:
+ - If already uploaded to the target provider, reuse provider_file_id/uri.
+ - Otherwise, upload with the original filename (required by OpenAI).
+ Returns a list of dicts describing attachment references for the provider.
+ """
+ if not attached_ids:
+ return []
+
+ items = load_files_index(user)
+ items_map = {item.id: item for item in items}
+ attachments: list[dict] = []
+
+ if isinstance(target_provider, ModelProvider):
+ provider_norm = target_provider.value.lower()
+ else:
+ provider_norm = str(target_provider).lower()
+
+ for fid in attached_ids:
+ meta = items_map.get(fid)
+ if not meta:
+ print(f"[warn] Attached file id not found, skipping: {fid}")
+ continue
+
+ path = os.path.join(files_root(user), fid)
+ if not os.path.exists(path):
+ raise HTTPException(status_code=404, detail=f"Attached file missing on disk: {meta.name}")
+
+ if provider_norm == ModelProvider.OPENAI or provider_norm == "openai":
+ # Reuse provider file id if available
+ if meta.provider == "openai" and meta.provider_file_id:
+ attachments.append({
+ "provider": "openai",
+ "file_id": meta.provider_file_id,
+ "name": meta.name,
+ "mime": meta.mime,
+ })
+ continue
+
+ # Upload to OpenAI with original filename
+ with open(path, "rb") as f:
+ content = f.read()
+ size = len(content)
+ if size > OPENAI_MAX_FILE_SIZE:
+ raise HTTPException(status_code=400, detail=f"File {meta.name} exceeds OpenAI 50MB limit")
+
+ try:
+ client = get_openai_client(llm_config.api_key)
+ resp = await client.files.create(
+ file=(meta.name or "upload.bin", content),
+ purpose=OPENAI_DEFAULT_FILE_PURPOSE,
+ )
+ openai_file_id = getattr(resp, "id", None)
+ if not openai_file_id:
+ raise HTTPException(status_code=500, detail="OpenAI file upload returned no file_id")
+ attachments.append({
+ "provider": "openai",
+ "file_id": openai_file_id,
+ "name": meta.name,
+ "mime": meta.mime,
+ })
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"OpenAI upload failed: {str(e)}")
+
+ elif provider_norm == ModelProvider.GOOGLE or provider_norm == "google":
+ # Reuse uri/name if available and looks like a URI
+ if meta.provider == "google" and meta.provider_file_id and "://" in meta.provider_file_id:
+ attachments.append({
+ "provider": "google",
+ "uri": meta.provider_file_id,
+ "name": meta.name,
+ "mime": meta.mime,
+ })
+ continue
+
+ key = llm_config.api_key or os.getenv("GOOGLE_API_KEY")
+ if not key:
+ raise HTTPException(status_code=500, detail="Google API Key not found")
+ client = genai.Client(api_key=key)
+
+ tmp_path = None
+ try:
+ with open(path, "rb") as f:
+ content = f.read()
+ with tempfile.NamedTemporaryFile(delete=False) as tmp:
+ tmp.write(content)
+ tmp_path = tmp.name
+
+ google_resp = await asyncio.to_thread(
+ client.files.upload,
+ file=tmp_path,
+ config={"mimeType": meta.mime or "application/octet-stream"},
+ )
+ google_name = getattr(google_resp, "name", None)
+ google_uri = getattr(google_resp, "uri", None)
+
+ # Poll for ACTIVE and uri if missing
+ if google_name:
+ for _ in range(10):
+ try:
+ info = await asyncio.to_thread(client.files.get, name=google_name)
+ state = getattr(info, "state", None)
+ google_uri = getattr(info, "uri", google_uri)
+ if str(state).upper().endswith("ACTIVE") or state == "ACTIVE":
+ break
+ await asyncio.sleep(1)
+ except Exception:
+ await asyncio.sleep(1)
+ print(f"[google upload] name={google_name} uri={google_uri}")
+
+ uri = google_uri or google_name
+ if not uri:
+ raise HTTPException(status_code=500, detail="Google upload returned no uri/name")
+ attachments.append({
+ "provider": "google",
+ "uri": uri,
+ "name": meta.name,
+ "mime": meta.mime,
+ })
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"Google upload failed: {str(e)}")
+ finally:
+ if tmp_path and os.path.exists(tmp_path):
+ os.remove(tmp_path)
+
+ else:
+ raise HTTPException(status_code=400, detail=f"Unsupported provider for attachments: {target_provider}")
+
+ # Debug log
+ print(f"[attachments] provider={provider_norm} count={len(attachments)} detail={[{'name': a.get('name'), 'id': a.get('file_id', a.get('uri'))} for a in attachments]}")
+ return attachments
+
+
+async def prepare_openai_vector_search(
+ user: str,
+ attached_ids: List[str],
+ scopes: List[str],
+ llm_config: LLMConfig,
+) -> tuple[List[str], List[dict], Optional[dict]]:
+ """
+ Ensure all attached files are uploaded to OpenAI Files (purpose=assistants) and added to the user's vector store.
+ Returns (vector_store_ids, openai_file_refs_for_debug, filters).
+
+ Filtering logic:
+ - If scopes provided: find files whose scopes intersect with requested scopes
+ - If only attached_ids: use those specific files
+ - Filters are constructed using file_id attribute in vector store
+ """
+ items = load_files_index(user)
+ items_map = {item.id: item for item in items}
+
+ # Determine which files to include based on scopes or attached_ids
+ relevant_files: List[FileMeta] = []
+
+ if scopes:
+ # Find files whose scopes intersect with requested scopes
+ for item in items:
+ if item.scopes and any(s in scopes for s in item.scopes):
+ relevant_files.append(item)
+ print(f"[file_search] scopes={scopes} matched_files={[f.name for f in relevant_files]}")
+ elif attached_ids:
+ # Fallback: use explicitly attached files
+ for fid in attached_ids:
+ meta = items_map.get(fid)
+ if meta:
+ relevant_files.append(meta)
+
+ if not relevant_files:
+ return [], [], None
+
+ changed = False
+ vs_ids: List[str] = []
+ debug_refs: List[dict] = []
+ file_ids_for_filter: List[str] = []
+
+ for meta in relevant_files:
+ path = os.path.join(files_root(user), meta.id)
+ if not os.path.exists(path):
+ print(f"[warn] Attached file missing on disk, skipping: {meta.id}")
+ continue
+ # Enforce 50MB OpenAI limit
+ file_size = os.path.getsize(path)
+ if file_size > OPENAI_MAX_FILE_SIZE:
+ print(f"[warn] File {meta.name} exceeds OpenAI 50MB limit, skipping")
+ continue
+
+ openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, path, llm_config)
+ if meta.openai_file_id != openai_file_id or meta.openai_vector_store_id != vs_id:
+ meta.openai_file_id = openai_file_id
+ meta.openai_vector_store_id = vs_id
+ changed = True
+ vs_ids.append(vs_id)
+ debug_refs.append({"name": meta.name, "file_id": openai_file_id, "vs_id": vs_id})
+ if openai_file_id:
+ file_ids_for_filter.append(openai_file_id)
+
+ if changed:
+ save_files_index(user, list(items_map.values()))
+
+ # deduplicate
+ vs_ids_unique = list({vid for vid in vs_ids if vid})
+
+ # Build filters to only search relevant files
+ filters = None
+ if file_ids_for_filter:
+ filters = {"type": "in", "key": "file_id", "value": file_ids_for_filter}
+
+ return vs_ids_unique, debug_refs, filters
+
# -------------------------------------------------
@app.get("/api/projects/archived")
@@ -449,27 +817,124 @@ def list_files(user: str = DEFAULT_USER):
@app.post("/api/files/upload")
-async def upload_file(user: str = DEFAULT_USER, file: UploadFile = File(...)):
+async def upload_file(
+ user: str = DEFAULT_USER,
+ file: UploadFile = File(...),
+ provider: str = Form("local"),
+ purpose: Optional[str] = Form(None),
+):
migrate_legacy_layout(user)
items = load_files_index(user)
file_id = str(uuid4())
dest_root = files_root(user)
dest_path = os.path.join(dest_root, file_id)
+ file_name = file.filename or "upload.bin"
+ provider_normalized = (provider or "local").lower()
+ if provider_normalized not in VALID_FILE_PROVIDERS:
+ raise HTTPException(status_code=400, detail="Unsupported provider")
+
try:
content = await file.read()
- size = len(content)
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=str(e))
+
+ size = len(content)
+ if provider_normalized == "openai" and size > OPENAI_MAX_FILE_SIZE:
+ raise HTTPException(status_code=400, detail="OpenAI provider limit: max 50MB per file")
+
+ provider_file_id: Optional[str] = None
+ provider_created_at: Optional[float] = None
+
+ if provider_normalized == "openai":
+ try:
+ client = get_openai_client()
+ upload_purpose = purpose or OPENAI_DEFAULT_FILE_PURPOSE
+ resp = await client.files.create(
+ file=(file_name, content),
+ purpose=upload_purpose,
+ )
+ provider_file_id = getattr(resp, "id", None)
+ provider_created_at = getattr(resp, "created_at", None)
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"OpenAI upload failed: {str(e)}")
+ elif provider_normalized == "google":
+ try:
+ key = os.getenv("GOOGLE_API_KEY")
+ if not key:
+ raise HTTPException(status_code=500, detail="Google API Key not found")
+ client = genai.Client(api_key=key)
+ # The Google GenAI SDK upload is synchronous; run in thread to avoid blocking the event loop.
+ tmp_path = None
+ try:
+ with tempfile.NamedTemporaryFile(delete=False) as tmp:
+ tmp.write(content)
+ tmp_path = tmp.name
+ google_resp = await asyncio.to_thread(
+ client.files.upload,
+ file=tmp_path,
+ config={"mimeType": file.content_type or "application/octet-stream"},
+ )
+ google_name = getattr(google_resp, "name", None)
+ google_uri = getattr(google_resp, "uri", None)
+
+ # Poll for ACTIVE and uri if missing
+ if google_name:
+ for _ in range(10):
+ try:
+ info = await asyncio.to_thread(client.files.get, name=google_name)
+ state = getattr(info, "state", None)
+ google_uri = getattr(info, "uri", google_uri)
+ if str(state).upper().endswith("ACTIVE") or state == "ACTIVE":
+ break
+ await asyncio.sleep(1)
+ except Exception:
+ await asyncio.sleep(1)
+
+ provider_file_id = google_uri or google_name
+ finally:
+ if tmp_path and os.path.exists(tmp_path):
+ os.remove(tmp_path)
+
+ provider_created_at = time.time()
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"Google upload failed: {str(e)}")
+
+ try:
+ os.makedirs(dest_root, exist_ok=True)
with open(dest_path, "wb") as f:
f.write(content)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
+ created_at = provider_created_at or os.path.getmtime(dest_path)
+
meta = FileMeta(
id=file_id,
- name=file.filename,
+ name=file_name,
size=size,
mime=file.content_type or "application/octet-stream",
- created_at=os.path.getmtime(dest_path),
+ created_at=created_at,
+ provider=provider_normalized if provider_normalized != "local" else None,
+ provider_file_id=provider_file_id,
+ openai_file_id=None,
+ openai_vector_store_id=None,
)
+
+ # Always try to index into OpenAI vector store (if <=50MB)
+ if size <= OPENAI_MAX_FILE_SIZE:
+ try:
+ openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, dest_path, None)
+ meta.openai_file_id = openai_file_id
+ meta.openai_vector_store_id = vs_id
+ if provider_normalized == "openai" and not meta.provider_file_id:
+ meta.provider_file_id = openai_file_id
+ except Exception as e:
+ print(f"[warn] OpenAI indexing failed for {file_name}: {e}")
+ else:
+ print(f"[warn] Skipping OpenAI indexing for {file_name}: exceeds 50MB")
+
items.append(meta)
save_files_index(user, items)
return {"file": meta}
@@ -489,15 +954,87 @@ def download_file(user: str = DEFAULT_USER, file_id: str = ""):
@app.post("/api/files/delete")
-def delete_file(user: str = DEFAULT_USER, file_id: str = ""):
+async def delete_file(user: str = DEFAULT_USER, file_id: str = ""):
migrate_legacy_layout(user)
items = load_files_index(user)
meta = next((i for i in items if i.id == file_id), None)
if not meta:
raise HTTPException(status_code=404, detail="file not found")
+
+ # Remove from vector store and OpenAI Files if present
+ if meta.openai_vector_store_id and meta.openai_file_id:
+ await remove_file_from_vector_store(meta.openai_vector_store_id, meta.openai_file_id)
+ if meta.provider == "openai" and meta.provider_file_id:
+ try:
+ client = get_openai_client()
+ await client.files.delete(meta.provider_file_id)
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"OpenAI delete failed: {str(e)}")
+ if meta.provider == "google" and meta.provider_file_id:
+ try:
+ key = os.getenv("GOOGLE_API_KEY")
+ if not key:
+ raise HTTPException(status_code=500, detail="Google API Key not found")
+ client = genai.Client(api_key=key)
+ await asyncio.to_thread(client.files.delete, meta.provider_file_id)
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"Google delete failed: {str(e)}")
+
path = os.path.join(files_root(user), file_id)
if os.path.exists(path):
os.remove(path)
items = [i for i in items if i.id != file_id]
save_files_index(user, items)
return {"ok": True}
+
+
+class AddScopeRequest(BaseModel):
+ user: str = DEFAULT_USER
+ file_id: str
+ scope: str # "project_path/node_id" composite key
+
+
+@app.post("/api/files/add_scope")
+def add_file_scope(request: AddScopeRequest):
+ """
+ Add a scope to a file's scopes list.
+ Called when user attaches a file to a node.
+ """
+ migrate_legacy_layout(request.user)
+ items = load_files_index(request.user)
+ meta = next((i for i in items if i.id == request.file_id), None)
+ if not meta:
+ raise HTTPException(status_code=404, detail="file not found")
+
+ if request.scope not in meta.scopes:
+ meta.scopes.append(request.scope)
+ save_files_index(request.user, items)
+
+ return {"file": meta.model_dump()}
+
+
+class RemoveScopeRequest(BaseModel):
+ user: str = DEFAULT_USER
+ file_id: str
+ scope: str
+
+
+@app.post("/api/files/remove_scope")
+def remove_file_scope(request: RemoveScopeRequest):
+ """
+ Remove a scope from a file's scopes list.
+ Called when user detaches a file from a node.
+ """
+ migrate_legacy_layout(request.user)
+ items = load_files_index(request.user)
+ meta = next((i for i in items if i.id == request.file_id), None)
+ if not meta:
+ raise HTTPException(status_code=404, detail="file not found")
+
+ if request.scope in meta.scopes:
+ meta.scopes.remove(request.scope)
+ save_files_index(request.user, items)
+
+ return {"file": meta.model_dump()}
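For reference, a minimal client-side sketch of exercising the upload and scope endpoints added above. It assumes a local dev server at http://localhost:8000, the requests library, and the default user; the file name and scope value are illustrative placeholders, not part of this commit.

import requests

BASE = "http://localhost:8000"  # assumed local dev server

# Upload a file and ask the backend to register it with the OpenAI provider as well.
with open("report.pdf", "rb") as f:
    resp = requests.post(
        f"{BASE}/api/files/upload",
        files={"file": ("report.pdf", f, "application/pdf")},
        data={"provider": "openai"},  # accepted values: local, openai, google
    )
file_id = resp.json()["file"]["id"]

# Attaching the file to a node records a "project_path/node_id" scope on it.
requests.post(f"{BASE}/api/files/add_scope",
              json={"user": "test", "file_id": file_id, "scope": "my_project/node_1"})

# Detaching removes the scope again.
requests.post(f"{BASE}/api/files/remove_scope",
              json={"user": "test", "file_id": file_id, "scope": "my_project/node_1"})
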
diff --git a/backend/app/schemas.py b/backend/app/schemas.py
index bd8ebe7..54c0560 100644
--- a/backend/app/schemas.py
+++ b/backend/app/schemas.py
@@ -49,6 +49,10 @@ class NodeRunRequest(BaseModel):
user_prompt: str
config: LLMConfig
merge_strategy: MergeStrategy = MergeStrategy.SMART
+ attached_file_ids: List[str] = Field(default_factory=list)
+ # Scopes for file_search filtering: ["project_path/node_id", ...]
+ # Contains all project/node combinations in the current trace
+ scopes: List[str] = Field(default_factory=list)
class NodeRunResponse(BaseModel):
node_id: str
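To illustrate the new fields, a hypothetical run payload might carry them like this; the model name, file id, and scope values are placeholders, and NodeRunRequest fields outside this hunk are omitted.

payload = {
    "user_prompt": "Summarize the attached report.",
    "config": {"provider": "openai", "model_name": "gpt-5"},
    # One "project_path/node_id" entry per node in the current trace.
    "scopes": ["my_project/node_1", "my_project/node_2"],
    # Files explicitly attached to this node (ids from /api/files/upload).
    "attached_file_ids": ["<file_id>"],
}
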
diff --git a/backend/app/services/llm.py b/backend/app/services/llm.py
index b372f9e..96b0514 100644
--- a/backend/app/services/llm.py
+++ b/backend/app/services/llm.py
@@ -1,5 +1,5 @@
import os
-from typing import AsyncGenerator
+from typing import AsyncGenerator, List, Dict, Any, Optional
import openai
import google.generativeai as genai
from app.schemas import LLMConfig, Message, Role, Context
@@ -23,8 +23,15 @@ def configure_google(api_key: str = None):
raise ValueError("Google API Key not found")
genai.configure(api_key=key)
-async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
+async def stream_openai(
+ messages: list[Message],
+ config: LLMConfig,
+ attachments: Optional[List[Dict[str, Any]]] = None,
+ tools: Optional[List[Dict[str, Any]]] = None,
+) -> AsyncGenerator[str, None]:
client = get_openai_client(config.api_key)
+ attachments = attachments or []
+ tools = tools or []
# Convert internal Message schema to OpenAI format
openai_messages = []
@@ -38,17 +45,23 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
responses_only_models = ['gpt-5-pro']
# Models that CAN use Responses API (and thus support web_search tool)
+ model_lower = config.model_name.lower()
responses_capable_models = [
- 'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
- 'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3'
+ 'gpt-5', 'gpt-5-chat-latest', 'gpt-5-mini', 'gpt-5-nano',
+ 'gpt-5-pro', 'gpt-5.1', 'gpt-5.1-chat-latest', 'o3',
+ 'o1', 'o1-preview', 'o1-mini',
+ 'gpt-4o', 'gpt-4o-mini', 'gpt-4o-realtime', 'gpt-4o-mini-tts'
]
# Use Responses API if:
# 1. Model ONLY supports Responses API, OR
# 2. User wants web search AND model is capable of Responses API
+ # 3. Attachments are present (Responses supports input_file), OR
+ # 4. File-search tools were prepared upstream
use_responses_api = (
config.model_name in responses_only_models or
- (config.enable_google_search and config.model_name in responses_capable_models)
+ (config.enable_google_search and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
+ (attachments and (config.model_name in responses_capable_models or model_lower.startswith("gpt-4o"))) or
+ (tools)
)
if use_responses_api:
@@ -56,25 +69,50 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
# yield f"[Debug: Config Search={config.enable_google_search}, Model={config.model_name}]\n"
# Use new client.responses.create API with Polling Strategy
- # Convert messages to Responses API format (same as Chat Completions)
- # Responses API accepts input as array of message objects
-
- # Filter out system messages (use instructions instead) and format for Responses API
+ # Build Responses API input
input_messages = []
for msg in openai_messages:
- if msg['role'] != 'system': # System prompt goes to instructions
- input_messages.append({
- "role": msg['role'],
- "content": msg['content']
+ if msg['role'] == 'system':
+ continue # goes to instructions
+ # User messages use input_text, assistant messages use output_text
+ content_type = "input_text" if msg['role'] == 'user' else "output_text"
+ input_messages.append({
+ "role": msg['role'],
+ "content": [
+ {
+ "type": content_type,
+ "text": msg['content']
+ }
+ ]
+ })
+
+ # Append attachments as separate user message (files only)
+ file_parts = []
+ for att in attachments:
+ if att.get("provider") == "openai" and att.get("file_id"):
+ file_parts.append({
+ "type": "input_file",
+ "file_id": att["file_id"]
})
+ if file_parts:
+ input_messages.append({
+ "role": "user",
+ "content": file_parts
+ })
resp_params = {
"model": config.model_name,
"input": input_messages, # Full conversation history
- "stream": False, # Disable stream to get immediate ID
- "background": True, # Enable background mode for async execution
- "store": True
+ "stream": False, # Get full output in one call
+ "background": False,
+ "store": True,
+ "tool_choice": "auto",
}
+ if tools:
+ resp_params["tools"] = tools
+ resp_params["tool_choice"] = "auto"
+ # Optional: include results for debugging / citations
+ resp_params["include"] = ["file_search_call.results"]
# Add reasoning effort (not supported by chat-latest models)
models_without_effort = ['gpt-5-chat-latest', 'gpt-5.1-chat-latest']
@@ -82,28 +120,40 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
resp_params["reasoning"] = {"effort": config.reasoning_effort.value}
# Enable Web Search if requested (Reusing enable_google_search flag as generic web_search flag)
+ # IMPORTANT: Append to existing tools instead of overwriting
if config.enable_google_search:
- resp_params["tools"] = [{"type": "web_search"}]
+ if resp_params.get("tools"):
+ resp_params["tools"].append({"type": "web_search"})
+ else:
+ resp_params["tools"] = [{"type": "web_search"}]
resp_params["tool_choice"] = "auto"
- # Debugging tool injection
- # yield "[Debug: Web Search Tool Injected]" # Uncomment to debug
if config.system_prompt:
resp_params["instructions"] = config.system_prompt
+
+ # Debug: print final tools being sent
+ print(f"[responses debug] final tools: {resp_params.get('tools')}")
- # 1. Create Response (Async/Background)
- # This returns a Response object immediately with status 'queued' or 'in_progress'
+ # 1. Create Response (non-background)
initial_resp = await client.responses.create(**resp_params)
response_id = initial_resp.id
-
+
# 2. Poll for Completion
import asyncio
- # Poll for up to 10 minutes
- for _ in range(300):
+ for _ in range(300):
final_resp = await client.responses.retrieve(response_id)
-
+
if final_resp.status == 'completed':
- # Parse final response object
+ # Debug: log outputs and tool calls
+ try:
+ outs = getattr(final_resp, "output", [])
+ print(f"[responses debug] output items: {[getattr(o, 'type', None) for o in outs]}")
+ for o in outs:
+ if getattr(o, "type", None) == "file_search_call":
+ print(f"[responses debug] file_search_call: {o}")
+ except Exception as e:
+ print(f"[responses debug] failed to inspect output: {e}")
+
found_content = False
if hasattr(final_resp, 'output'):
for out in final_resp.output:
@@ -128,13 +178,16 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
yield f"\n[Error: Response generation {final_resp.status}: {error_msg}]"
return
- # Still in_progress
await asyncio.sleep(2)
yield "\n[Error: Polling timed out]"
return
- # Standard Chat Completions API
+ # Standard Chat Completions API (attachments not supported here)
+ if attachments:
+ yield "[Error] Attachments are only supported for Responses API-capable models."
+ return
+
# Prepare parameters
req_params = {
"model": config.model_name,
@@ -175,7 +228,8 @@ async def stream_openai(messages: list[Message], config: LLMConfig) -> AsyncGene
elif getattr(delta, 'refusal', None):
yield f"[Refusal: {delta.refusal}]"
-async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGenerator[str, None]:
+async def stream_google(messages: list[Message], config: LLMConfig, attachments: List[Dict[str, Any]] | None = None) -> AsyncGenerator[str, None]:
+ attachments = attachments or []
# Use new Google GenAI SDK (google-genai)
from google import genai
from google.genai import types
@@ -200,6 +254,34 @@ async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGene
tools=tools
)
+ # If attachments present, send as a single generate_content call (non-streaming)
+ if attachments:
+ parts = []
+ for att in attachments:
+ uri = att.get("uri")
+ mime = att.get("mime") or "application/octet-stream"
+ if uri:
+ try:
+ parts.append(types.Part.from_uri(uri, mime_type=mime))
+ except Exception:
+ parts.append(types.Part(text=f"[file attached: {uri}]"))
+ for msg in messages:
+ parts.append(types.Part(text=msg.content))
+ print(f"[gemini] sending attachments: {[att.get('uri') for att in attachments]}")
+ try:
+ response = await client.aio.models.generate_content(
+ model=config.model_name,
+ contents=[types.Content(role="user", parts=parts)],
+ config=gen_config
+ )
+ if response and getattr(response, "text", None):
+ yield response.text
+ else:
+ yield "[Error] Gemini response returned no text."
+ except Exception as e:
+ yield f"[Error] Gemini call failed: {str(e)}"
+ return
+
# Prepare History
# Extract last message as the prompt
prompt_msg = "..."
@@ -223,13 +305,6 @@ async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGene
config=gen_config
)
- # Streaming call
- # In google-genai SDK, streaming is usually via send_message_stream
-
- # Check if send_message_stream exists, otherwise use send_message with stream=True (but error says no)
- # Let's assume send_message_stream is the way.
-
- # Note: chat_session.send_message_stream returns an AsyncIterator (or a coroutine returning one)
response_stream = await chat_session.send_message_stream(prompt_msg)
async for chunk in response_stream:
@@ -237,7 +312,13 @@ async def stream_google(messages: list[Message], config: LLMConfig) -> AsyncGene
if chunk.text:
yield chunk.text
-async def llm_streamer(context: Context, user_prompt: str, config: LLMConfig) -> AsyncGenerator[str, None]:
+async def llm_streamer(
+ context: Context,
+ user_prompt: str,
+ config: LLMConfig,
+ attachments: List[Dict[str, Any]] | None = None,
+ tools: List[Dict[str, Any]] | None = None,
+) -> AsyncGenerator[str, None]:
# 1. Merge Context + New User Prompt
# We create a temporary list of messages for this inference
messages_to_send = context.messages.copy()
@@ -253,10 +334,10 @@ async def llm_streamer(context: Context, user_prompt: str, config: LLMConfig) ->
# 2. Call Provider
try:
if config.provider == "openai":
- async for chunk in stream_openai(messages_to_send, config):
+ async for chunk in stream_openai(messages_to_send, config, attachments, tools):
yield chunk
elif config.provider == "google":
- async for chunk in stream_google(messages_to_send, config):
+ async for chunk in stream_google(messages_to_send, config, attachments):
yield chunk
else:
yield f"Error: Unsupported provider {config.provider}"