import asyncio import base64 import logging import tempfile import time from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, FileResponse from fastapi import UploadFile, File, Form from pydantic import BaseModel from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Message, Context, LLMConfig, ModelProvider, ReasoningEffort, CouncilRunRequest, DebateRunRequest, DebateJudgeMode from app.services.llm import llm_streamer, generate_title, get_openai_client, get_anthropic_client, resolve_provider from app.services.council import council_event_stream from app.services.debate import debate_event_stream from app.auth import auth_router, get_current_user, get_current_user_optional, init_db, User, get_db from app.auth.utils import get_password_hash from dotenv import load_dotenv import os import json import shutil from typing import List, Literal, Optional from uuid import uuid4 from google import genai from sqlalchemy.orm import Session load_dotenv() # --------------- Logging Setup --------------- _LOG_DIR = os.path.join(os.path.abspath(os.getenv("DATA_ROOT", os.path.join(os.getcwd(), "data"))), "logs") os.makedirs(_LOG_DIR, exist_ok=True) logging.basicConfig( level=logging.DEBUG, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", handlers=[ logging.StreamHandler(), logging.FileHandler(os.path.join(_LOG_DIR, "contextflow.log"), encoding="utf-8"), ], ) logger = logging.getLogger("contextflow") app = FastAPI(title="ContextFlow Backend") # Include authentication router app.include_router(auth_router) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Initialize database on startup @app.on_event("startup") async def startup_event(): """Initialize database and create default test user if not exists""" init_db() # Create test user if not exists from app.auth.models import SessionLocal db = SessionLocal() try: existing = db.query(User).filter(User.username == "test").first() if not existing: test_user = User( username="test", email="test@contextflow.local", hashed_password=get_password_hash("114514") ) db.add(test_user) db.commit() logger.info("Created default test user (test/114514)") else: logger.info("Test user already exists") finally: db.close() # --------- Project / Blueprint storage --------- DATA_ROOT = os.path.abspath(os.getenv("DATA_ROOT", os.path.join(os.getcwd(), "data"))) DEFAULT_USER = "test" ARCHIVE_FILENAME = "archived_nodes.json" VALID_FILE_PROVIDERS = {"local", "openai", "google"} OPENAI_MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB limit per OpenAI docs OPENAI_DEFAULT_FILE_PURPOSE = os.getenv("OPENAI_FILE_PURPOSE", "user_data") def get_user_api_key(user: User | None, provider: str) -> str | None: """ Get API key for a provider from user's saved settings. Falls back to environment variable if user has no key set. """ if user: if provider == "openai" and user.openai_api_key: return user.openai_api_key if provider in ("google", "gemini") and user.gemini_api_key: return user.gemini_api_key if provider == "claude" and user.claude_api_key: return user.claude_api_key if provider == "openrouter" and user.openrouter_api_key: return user.openrouter_api_key # Fallback to environment variables if provider == "openai": return os.getenv("OPENAI_API_KEY") if provider in ("google", "gemini"): return os.getenv("GOOGLE_API_KEY") if provider == "claude": return os.getenv("ANTHROPIC_API_KEY") if provider == "openrouter": return os.getenv("OPENROUTER_API_KEY") return None def resolve_user(current_user: User | None, username: str | None) -> User | None: """ Resolve a User object: prefer authenticated current_user, fall back to DB lookup by username. This handles cases where the JWT token is expired but the username is still known from query params. """ if current_user: return current_user if username and username != DEFAULT_USER: try: db = next(get_db()) return db.query(User).filter(User.username == username).first() except Exception: return None return None def ensure_user_root(user: str) -> str: """ Ensures the new data root structure: data//projects data//archive """ user_root = os.path.join(DATA_ROOT, user) projects_root = os.path.join(user_root, "projects") archive_root = os.path.join(user_root, "archive") os.makedirs(projects_root, exist_ok=True) os.makedirs(archive_root, exist_ok=True) return user_root def projects_root(user: str) -> str: return os.path.join(ensure_user_root(user), "projects") def archive_root(user: str) -> str: return os.path.join(ensure_user_root(user), "archive") def files_root(user: str) -> str: root = os.path.join(ensure_user_root(user), "files") os.makedirs(root, exist_ok=True) return root def migrate_legacy_layout(user: str): """ Migrate from legacy ./projects/ and legacy archive folders to the new data// structure. """ legacy_root = os.path.abspath(os.path.join(os.getcwd(), "projects", user)) new_projects = projects_root(user) if os.path.exists(legacy_root) and not os.listdir(new_projects): try: for name in os.listdir(legacy_root): src = os.path.join(legacy_root, name) dst = os.path.join(new_projects, name) if not os.path.exists(dst): shutil.move(src, dst) except Exception: pass # migrate legacy archive (archived/ or .cf_archived/) legacy_archives = [ os.path.join(legacy_root, "archived", ARCHIVE_FILENAME), os.path.join(legacy_root, ".cf_archived", ARCHIVE_FILENAME), ] new_archive_file = archived_path(user) if not os.path.exists(new_archive_file): for legacy in legacy_archives: if os.path.exists(legacy): os.makedirs(os.path.dirname(new_archive_file), exist_ok=True) try: shutil.move(legacy, new_archive_file) except Exception: pass def safe_path(user: str, relative_path: str) -> str: root = projects_root(user) norm = os.path.normpath(relative_path).lstrip(os.sep) full = os.path.abspath(os.path.join(root, norm)) if not full.startswith(root): raise HTTPException(status_code=400, detail="Invalid path") return full class FSItem(BaseModel): name: str path: str # path relative to user root type: Literal["file", "folder"] size: Optional[int] = None mtime: Optional[float] = None children: Optional[List["FSItem"]] = None FSItem.model_rebuild() def list_tree(user: str, relative_path: str = ".") -> List[FSItem]: migrate_legacy_layout(user) root = safe_path(user, relative_path) items: List[FSItem] = [] for name in sorted(os.listdir(root)): full = os.path.join(root, name) rel = os.path.relpath(full, projects_root(user)) stat = os.stat(full) if os.path.isdir(full): items.append(FSItem( name=name, path=rel, type="folder", size=None, mtime=stat.st_mtime, children=list_tree(user, rel) )) else: items.append(FSItem( name=name, path=rel, type="file", size=stat.st_size, mtime=stat.st_mtime, children=None )) return items class SaveBlueprintRequest(BaseModel): user: str = DEFAULT_USER path: str # relative path including filename.json content: dict class RenameRequest(BaseModel): user: str = DEFAULT_USER path: str new_name: Optional[str] = None new_path: Optional[str] = None class FileMeta(BaseModel): id: str name: str size: int mime: str created_at: float provider: Optional[str] = None provider_file_id: Optional[str] = None openai_file_id: Optional[str] = None openai_vector_store_id: Optional[str] = None anthropic_file_id: Optional[str] = None google_file_uri: Optional[str] = None # Scopes for filtering: "project_path/node_id" composite keys scopes: List[str] = [] class FolderRequest(BaseModel): user: str = DEFAULT_USER path: str # relative folder path class DeleteRequest(BaseModel): user: str = DEFAULT_USER path: str is_folder: bool = False # ----------------------------------------------- @app.get("/") def read_root(): return {"message": "ContextFlow Backend is running"} def smart_merge_messages(messages: list[Message]) -> list[Message]: """ Merges messages using two steps: 1. Deduplication by ID (to handle diamond dependencies). 2. Merging consecutive messages from the same role. """ if not messages: return [] # 1. Deduplicate by ID, keeping order seen_ids = set() deduplicated = [] for msg in messages: if msg.id not in seen_ids: deduplicated.append(msg) seen_ids.add(msg.id) # 2. Merge consecutive roles if not deduplicated: return [] merged = [] current_msg = deduplicated[0].model_copy() for next_msg in deduplicated[1:]: if next_msg.role == current_msg.role: # Merge content current_msg.content += f"\n\n{next_msg.content}" # Keep the latest timestamp current_msg.timestamp = next_msg.timestamp else: merged.append(current_msg) current_msg = next_msg.model_copy() merged.append(current_msg) return merged IMAGE_MIME_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"} ANTHROPIC_DOCUMENT_MIME_TYPES = { "application/pdf", "text/plain", "text/html", "text/csv", "text/xml", "text/markdown", "text/css", "text/javascript", "application/javascript", "application/xml", } def extract_image_attachments(user: str, attached_ids: List[str]) -> tuple[List[dict], List[str]]: """ Separate image files from non-image files in attached_ids. Returns (images, remaining_non_image_ids). Each image dict: {"mime": str, "data": str (base64), "name": str} """ if not attached_ids: return [], [] items = load_files_index(user) items_map = {item.id: item for item in items} images: List[dict] = [] non_image_ids: List[str] = [] for fid in attached_ids: meta = items_map.get(fid) if not meta: non_image_ids.append(fid) continue if meta.mime in IMAGE_MIME_TYPES: path = os.path.join(files_root(user), fid) if not os.path.exists(path): logger.warning("Image file missing on disk, skipping: %s", fid) continue with open(path, "rb") as f: raw = f.read() images.append({ "mime": meta.mime, "data": base64.b64encode(raw).decode("utf-8"), "name": meta.name, }) else: non_image_ids.append(fid) logger.debug("extract_image_attachments: %d images, %d non-image files", len(images), len(non_image_ids)) return images, non_image_ids @app.post("/api/run_node_stream") async def run_node_stream( request: NodeRunRequest, user: str = DEFAULT_USER, current_user: User | None = Depends(get_current_user_optional) ): """ Stream the response from the LLM. """ resolved = resolve_user(current_user, user) # Get API key from user settings if not provided in request provider_name = request.config.provider.value if hasattr(request.config.provider, 'value') else str(request.config.provider) if not request.config.api_key: user_key = get_user_api_key(resolved, provider_name.lower()) if user_key: request.config.api_key = user_key # Get username for file operations username = resolved.username if resolved else DEFAULT_USER # Extract images from attached files (separate from non-image files) images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids) # 1. Concatenate all incoming contexts first raw_messages = [] for ctx in request.incoming_contexts: raw_messages.extend(ctx.messages) # 2. Apply Merge Strategy final_messages = [] if request.merge_strategy == MergeStrategy.SMART: final_messages = smart_merge_messages(raw_messages) else: # RAW strategy: just keep them as is final_messages = raw_messages execution_context = Context(messages=final_messages) tools: List[dict] = [] attachments: List[dict] = [] if request.config.provider == ModelProvider.OPENAI: vs_ids, debug_refs, filters = await prepare_openai_vector_search( user=username, attached_ids=non_image_file_ids, scopes=request.scopes, llm_config=request.config, ) # Always enable file_search if vector store exists (even without explicit attachments) # This allows nodes to access files attached in previous nodes of the trace if not vs_ids: # Try to get user's vector store anyway try: client = get_openai_client(request.config.api_key) vs_id = await ensure_user_vector_store(username, client) if vs_id: vs_ids = [vs_id] except Exception as e: logger.warning("Could not get vector store: %s", e) if vs_ids: tool_def = {"type": "file_search", "vector_store_ids": vs_ids} if filters: tool_def["filters"] = filters tools.append(tool_def) logger.debug("openai file_search: vs_ids=%s refs=%s filters=%s", vs_ids, debug_refs, filters) elif request.config.provider == ModelProvider.GOOGLE: scoped_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids) attachments = await prepare_attachments( user=username, target_provider=request.config.provider, attached_ids=scoped_ids, llm_config=request.config, ) elif request.config.provider == ModelProvider.CLAUDE: scoped_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids) attachments = await prepare_attachments( user=username, target_provider=request.config.provider, attached_ids=scoped_ids, llm_config=request.config, ) openrouter_key = get_user_api_key(resolved, "openrouter") return StreamingResponse( llm_streamer(execution_context, request.user_prompt, request.config, attachments, tools, openrouter_api_key=openrouter_key, images=images), media_type="text/event-stream" ) @app.post("/api/run_council_stream") async def run_council_stream( request: CouncilRunRequest, user: str = DEFAULT_USER, current_user: User | None = Depends(get_current_user_optional), ): """ Run the 3-stage LLM Council and stream SSE events. """ resolved = resolve_user(current_user, user) username = resolved.username if resolved else DEFAULT_USER # Merge incoming contexts (same logic as run_node_stream) raw_messages = [] for ctx in request.incoming_contexts: raw_messages.extend(ctx.messages) if request.merge_strategy == MergeStrategy.SMART: final_messages = smart_merge_messages(raw_messages) else: final_messages = raw_messages execution_context = Context(messages=final_messages) # Extract images from attached files images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids) openrouter_key = get_user_api_key(resolved, "openrouter") # Build LLMConfig + attachments + per-member contexts for each council member member_configs: list[LLMConfig] = [] attachments_per_model: list[list[dict] | None] = [] tools_per_model: list[list[dict] | None] = [] contexts_per_model: list[Context | None] = [] all_model_names = [m.model_name for m in request.council_models] + [request.chairman_model.model_name] for member in request.council_models: provider = resolve_provider(member.model_name) provider_str = provider.value api_key = get_user_api_key(resolved, provider_str) config = LLMConfig( provider=provider, model_name=member.model_name, temperature=member.temperature if member.temperature is not None else request.temperature, system_prompt=request.system_prompt, api_key=api_key, reasoning_effort=member.reasoning_effort if member.reasoning_effort is not None else request.reasoning_effort, enable_google_search=member.enable_google_search if member.enable_google_search is not None else request.enable_google_search, ) member_configs.append(config) # Prepare provider-specific file attachments tools: list[dict] = [] attachments: list[dict] = [] # For Google/Claude: resolve scope-based files so upstream attachments are visible scoped_file_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids) if provider == ModelProvider.OPENAI: vs_ids, debug_refs, filters = await prepare_openai_vector_search( user=username, attached_ids=non_image_file_ids, scopes=request.scopes, llm_config=config, ) if not vs_ids: try: client = get_openai_client(config.api_key) vs_id = await ensure_user_vector_store(username, client) if vs_id: vs_ids = [vs_id] except Exception: pass if vs_ids: tool_def = {"type": "file_search", "vector_store_ids": vs_ids} if filters: tool_def["filters"] = filters tools.append(tool_def) elif provider == ModelProvider.GOOGLE: attachments = await prepare_attachments( user=username, target_provider=provider, attached_ids=scoped_file_ids, llm_config=config, ) elif provider == ModelProvider.CLAUDE: attachments = await prepare_attachments( user=username, target_provider=provider, attached_ids=scoped_file_ids, llm_config=config, ) attachments_per_model.append(attachments or None) tools_per_model.append(tools or None) # Per-member context override if member.incoming_contexts: raw = [] for ctx in member.incoming_contexts: raw.extend(ctx.messages) if request.merge_strategy == MergeStrategy.SMART: merged = smart_merge_messages(raw) else: merged = raw contexts_per_model.append(Context(messages=merged)) else: contexts_per_model.append(None) # Use shared context # Build chairman config chairman = request.chairman_model chairman_provider = resolve_provider(chairman.model_name) chairman_api_key = get_user_api_key(resolved, chairman_provider.value) chairman_config = LLMConfig( provider=chairman_provider, model_name=chairman.model_name, temperature=chairman.temperature if chairman.temperature is not None else request.temperature, system_prompt=request.system_prompt, api_key=chairman_api_key, reasoning_effort=chairman.reasoning_effort if chairman.reasoning_effort is not None else request.reasoning_effort, enable_google_search=chairman.enable_google_search if chairman.enable_google_search is not None else request.enable_google_search, ) return StreamingResponse( council_event_stream( user_prompt=request.user_prompt, context=execution_context, member_configs=member_configs, chairman_config=chairman_config, attachments_per_model=attachments_per_model, tools_per_model=tools_per_model, openrouter_api_key=openrouter_key, images=images, contexts_per_model=contexts_per_model, ), media_type="text/event-stream", ) @app.post("/api/run_debate_stream") async def run_debate_stream( request: DebateRunRequest, user: str = DEFAULT_USER, current_user: User | None = Depends(get_current_user_optional), ): """ Run a multi-round LLM Debate and stream SSE events. """ resolved = resolve_user(current_user, user) username = resolved.username if resolved else DEFAULT_USER # Merge incoming contexts raw_messages = [] for ctx in request.incoming_contexts: raw_messages.extend(ctx.messages) if request.merge_strategy == MergeStrategy.SMART: final_messages = smart_merge_messages(raw_messages) else: final_messages = raw_messages execution_context = Context(messages=final_messages) # Extract images from attached files images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids) openrouter_key = get_user_api_key(resolved, "openrouter") # Build LLMConfig + attachments + tools for each debate member member_configs: list[LLMConfig] = [] attachments_per_model: list[list[dict] | None] = [] tools_per_model: list[list[dict] | None] = [] for member in request.debate_models: provider = resolve_provider(member.model_name) provider_str = provider.value api_key = get_user_api_key(resolved, provider_str) config = LLMConfig( provider=provider, model_name=member.model_name, temperature=member.temperature if member.temperature is not None else request.temperature, system_prompt=request.system_prompt, api_key=api_key, reasoning_effort=member.reasoning_effort if member.reasoning_effort is not None else request.reasoning_effort, enable_google_search=member.enable_google_search if member.enable_google_search is not None else request.enable_google_search, ) member_configs.append(config) # Prepare provider-specific file attachments tools: list[dict] = [] attachments: list[dict] = [] scoped_file_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids) if provider == ModelProvider.OPENAI: vs_ids, debug_refs, filters = await prepare_openai_vector_search( user=username, attached_ids=non_image_file_ids, scopes=request.scopes, llm_config=config, ) if not vs_ids: try: client = get_openai_client(config.api_key) vs_id = await ensure_user_vector_store(username, client) if vs_id: vs_ids = [vs_id] except Exception: pass if vs_ids: tool_def = {"type": "file_search", "vector_store_ids": vs_ids} if filters: tool_def["filters"] = filters tools.append(tool_def) elif provider == ModelProvider.GOOGLE: attachments = await prepare_attachments( user=username, target_provider=provider, attached_ids=scoped_file_ids, llm_config=config, ) elif provider == ModelProvider.CLAUDE: attachments = await prepare_attachments( user=username, target_provider=provider, attached_ids=scoped_file_ids, llm_config=config, ) attachments_per_model.append(attachments or None) tools_per_model.append(tools or None) # Build judge config (if external_judge mode) judge_config = None if request.judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and request.judge_model: judge = request.judge_model judge_provider = resolve_provider(judge.model_name) judge_api_key = get_user_api_key(resolved, judge_provider.value) judge_config = LLMConfig( provider=judge_provider, model_name=judge.model_name, temperature=judge.temperature if judge.temperature is not None else request.temperature, system_prompt=request.system_prompt, api_key=judge_api_key, reasoning_effort=judge.reasoning_effort if judge.reasoning_effort is not None else request.reasoning_effort, enable_google_search=judge.enable_google_search if judge.enable_google_search is not None else request.enable_google_search, ) return StreamingResponse( debate_event_stream( user_prompt=request.user_prompt, context=execution_context, member_configs=member_configs, judge_config=judge_config, judge_mode=request.judge_mode, debate_format=request.debate_format, max_rounds=request.max_rounds, custom_format_prompt=request.custom_format_prompt, attachments_per_model=attachments_per_model, tools_per_model=tools_per_model, openrouter_api_key=openrouter_key, images=images, ), media_type="text/event-stream", ) class TitleRequest(BaseModel): user_prompt: str response: str class TitleResponse(BaseModel): title: str @app.post("/api/generate_title", response_model=TitleResponse) async def generate_title_endpoint( request: TitleRequest, user: str = DEFAULT_USER, current_user: User | None = Depends(get_current_user_optional) ): """ Generate a short title for a Q-A pair using gpt-5-nano. Returns 3-4 short English words summarizing the topic. """ resolved = resolve_user(current_user, user) api_key = get_user_api_key(resolved, "openai") title = await generate_title(request.user_prompt, request.response, api_key) return TitleResponse(title=title) class SummarizeRequest(BaseModel): content: str model: str # Model to use for summarization class SummarizeResponse(BaseModel): summary: str @app.post("/api/summarize", response_model=SummarizeResponse) async def summarize_endpoint( request: SummarizeRequest, user: str = DEFAULT_USER, current_user: User | None = Depends(get_current_user_optional) ): """ Summarize the given content using the specified model. """ resolved = resolve_user(current_user, user) from app.services.llm import summarize_content openai_key = get_user_api_key(resolved, "openai") gemini_key = get_user_api_key(resolved, "gemini") summary = await summarize_content(request.content, request.model, openai_key, gemini_key) return SummarizeResponse(summary=summary) # ---------------- Project / Blueprint APIs ---------------- @app.get("/api/projects/tree", response_model=List[FSItem]) def get_project_tree(user: str = DEFAULT_USER): """ List all files/folders for the user under the projects root. """ ensure_user_root(user) return list_tree(user) @app.post("/api/projects/create_folder") def create_folder(req: FolderRequest): """ Create a folder (and parents) under the user's project root. """ try: folder_path = safe_path(req.user, req.path) os.makedirs(folder_path, exist_ok=True) return {"ok": True} except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/projects/save_blueprint") def save_blueprint(req: SaveBlueprintRequest): """ Save a blueprint JSON to disk. """ try: full_path = safe_path(req.user, req.path) os.makedirs(os.path.dirname(full_path), exist_ok=True) raw = json.dumps(req.content, ensure_ascii=False, indent=2) logger.info("save_blueprint: user=%s path=%s size=%d bytes", req.user, req.path, len(raw)) with open(full_path, "w", encoding="utf-8") as f: f.write(raw) return {"ok": True} except HTTPException: raise except Exception as e: logger.error("save_blueprint failed: user=%s path=%s error=%s", req.user, req.path, e) raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/projects/file") def read_blueprint(user: str = DEFAULT_USER, path: str = ""): """ Read a blueprint JSON file. """ if not path: raise HTTPException(status_code=400, detail="path is required") full_path = safe_path(user, path) if not os.path.isfile(full_path): raise HTTPException(status_code=404, detail="file not found") try: with open(full_path, "r", encoding="utf-8") as f: data = json.load(f) return {"content": data} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/projects/download") def download_blueprint(user: str = DEFAULT_USER, path: str = ""): """ Download a blueprint file. """ if not path: raise HTTPException(status_code=400, detail="path is required") full_path = safe_path(user, path) if not os.path.isfile(full_path): raise HTTPException(status_code=404, detail="file not found") return FileResponse(full_path, filename=os.path.basename(full_path), media_type="application/json") @app.post("/api/projects/rename") def rename_item(req: RenameRequest): """ Rename or move a file or folder. - If new_path is provided, it is treated as the target relative path (move). - Else, new_name is used within the same directory. """ try: src = safe_path(req.user, req.path) if not os.path.exists(src): raise HTTPException(status_code=404, detail="source not found") if req.new_path: dst = safe_path(req.user, req.new_path) else: if not req.new_name: raise HTTPException(status_code=400, detail="new_name or new_path required") base_dir = os.path.dirname(src) dst = os.path.join(base_dir, req.new_name) # Ensure still inside user root safe_path(req.user, os.path.relpath(dst, ensure_user_root(req.user))) os.rename(src, dst) return {"ok": True} except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/projects/delete") def delete_item(req: DeleteRequest): """ Delete a file or folder. """ try: target = safe_path(req.user, req.path) if not os.path.exists(target): raise HTTPException(status_code=404, detail="not found") if os.path.isdir(target): if not req.is_folder: # Prevent deleting folder accidentally unless flagged raise HTTPException(status_code=400, detail="set is_folder=True to delete folder") shutil.rmtree(target) else: os.remove(target) return {"ok": True} except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # ---------------------------------------------------------- # --------------- Archived Nodes APIs ---------------------- def archived_path(user: str) -> str: root = archive_root(user) return os.path.join(root, ARCHIVE_FILENAME) # ---------------- Files (uploads) ---------------- def files_index_path(user: str) -> str: return os.path.join(files_root(user), "index.json") def user_vector_store_path(user: str) -> str: return os.path.join(files_root(user), "vector_store.json") async def ensure_user_vector_store(user: str, client=None) -> str: """ Ensure there is a vector store for the user (OpenAI). Persist the id under data//files/vector_store.json. """ path = user_vector_store_path(user) if client is None: client = get_openai_client() # Try existing cached ID if os.path.exists(path): try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) vs_id_cached = data.get("id") if vs_id_cached: try: await client.vector_stores.retrieve(vector_store_id=vs_id_cached) return vs_id_cached except Exception: # Possibly deleted; recreate below pass except Exception: pass # create new vs = await client.vector_stores.create(name=f"{user}-vs") vs_id = getattr(vs, "id", None) if not vs_id: raise HTTPException(status_code=500, detail="Failed to create vector store") os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump({"id": vs_id}, f) return vs_id async def ensure_openai_file_and_index(user: str, meta: FileMeta, path: str, llm_config: Optional[LLMConfig] = None, api_key: str = None) -> tuple[str, str]: """ Ensure the file is uploaded to OpenAI Files and added to the user's vector store. Returns (openai_file_id, vector_store_id). """ key = api_key or (llm_config.api_key if llm_config else None) client = get_openai_client(key) vs_id = await ensure_user_vector_store(user, client) file_id = meta.openai_file_id or (meta.provider_file_id if meta.provider == "openai" else None) if not file_id: with open(path, "rb") as f: content = f.read() resp = await client.files.create( file=(meta.name or "upload.bin", content), purpose="assistants", ) file_id = getattr(resp, "id", None) if not file_id: raise HTTPException(status_code=500, detail="OpenAI file upload returned no file_id") await add_file_to_vector_store(vs_id, file_id, client=client) return file_id, vs_id async def ensure_anthropic_file_upload(meta: FileMeta, file_path: str, api_key: str = None) -> Optional[str]: """ Upload a file to the Anthropic Files API (beta). Returns the anthropic file_id on success, or None on skip/failure. """ try: client = get_anthropic_client(api_key) except Exception: logger.debug("Skipping Anthropic file upload: no API key available") return None mime = (meta.mime or "").lower() if mime in IMAGE_MIME_TYPES or mime not in ANTHROPIC_DOCUMENT_MIME_TYPES: logger.debug("Skipping Anthropic file upload for unsupported mime: %s", mime) return None try: with open(file_path, "rb") as f: content = f.read() resp = await asyncio.to_thread( client.beta.files.upload, file=(meta.name or "upload.bin", content, mime), ) file_id = getattr(resp, "id", None) if file_id: logger.info("Anthropic file uploaded: %s -> %s", meta.name, file_id) return file_id except Exception as e: logger.warning("Anthropic file upload failed for %s: %s", meta.name, e) return None async def ensure_google_file_upload(meta: FileMeta, file_path: str, api_key: str = None) -> Optional[str]: """ Upload a file to Google GenAI Files API. Returns the Google file URI on success, or None on skip/failure. """ key = api_key or os.getenv("GOOGLE_API_KEY") if not key: logger.debug("Skipping Google file upload: no API key available") return None mime = (meta.mime or "").lower() if mime in IMAGE_MIME_TYPES: logger.debug("Skipping Google file upload for image: %s", mime) return None tmp_path = None try: client = genai.Client(api_key=key) with open(file_path, "rb") as f: content = f.read() with tempfile.NamedTemporaryFile(delete=False) as tmp: tmp.write(content) tmp_path = tmp.name google_resp = await asyncio.to_thread( client.files.upload, file=tmp_path, config={"mimeType": meta.mime or "application/octet-stream"}, ) google_name = getattr(google_resp, "name", None) google_uri = getattr(google_resp, "uri", None) # Poll for ACTIVE if google_name: for _ in range(10): try: info = await asyncio.to_thread(client.files.get, name=google_name) state = getattr(info, "state", None) google_uri = getattr(info, "uri", google_uri) if str(state).upper().endswith("ACTIVE") or state == "ACTIVE": break await asyncio.sleep(1) except Exception: await asyncio.sleep(1) uri = google_uri or google_name if uri: logger.info("Google file uploaded: %s -> %s", meta.name, uri) return uri except Exception as e: logger.warning("Google file upload failed for %s: %s", meta.name, e) return None finally: if tmp_path and os.path.exists(tmp_path): os.remove(tmp_path) async def remove_file_from_vector_store(vs_id: str, file_id: str, client=None): if not vs_id or not file_id: return if client is None: client = get_openai_client() try: await client.vector_stores.files.delete(vector_store_id=vs_id, file_id=file_id) except Exception as e: logger.warning("remove_file_from_vector_store failed: %s", e) async def add_file_to_vector_store(vs_id: str, file_id: str, client=None): """ Add a file to vector store with file_id as attribute for filtering. We use file_id as the attribute so we can filter by specific files at query time. """ if client is None: client = get_openai_client() # Use file_id as attribute for filtering create_params = { "vector_store_id": vs_id, "file_id": file_id, "attributes": {"file_id": file_id} # Enable filtering by file_id } await client.vector_stores.files.create(**create_params) # Poll until completed (limit capped at 100 per API spec) for _ in range(20): listing = await client.vector_stores.files.list(vector_store_id=vs_id, limit=100) found = None for item in getattr(listing, "data", []): if getattr(item, "id", None) == file_id or getattr(item, "file_id", None) == file_id: found = item break status = getattr(found, "status", None) if found else None if status == "completed": return await asyncio.sleep(0.5) # If not confirmed, still continue return def load_files_index(user: str) -> List[FileMeta]: path = files_index_path(user) if not os.path.exists(path): return [] with open(path, "r", encoding="utf-8") as f: data = json.load(f) return [FileMeta(**item) for item in data] def save_files_index(user: str, items: List[FileMeta]): path = files_index_path(user) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump([item.model_dump() for item in items], f, ensure_ascii=False, indent=2) def resolve_scoped_file_ids(user: str, scopes: List[str], explicit_ids: List[str]) -> List[str]: """ Resolve file IDs that are relevant to the given scopes (upstream nodes). Combines scope-matched files with explicitly attached files. This gives Google/Claude the same scope awareness that OpenAI gets via file_search. """ if not scopes and not explicit_ids: return [] items = load_files_index(user) result_ids: dict[str, bool] = {} # Add explicitly attached files first for fid in explicit_ids: result_ids[fid] = True # Add files whose scopes intersect with requested scopes (skip images) if scopes: for item in items: if item.id in result_ids: continue if item.mime in IMAGE_MIME_TYPES: continue if item.scopes and any(s in scopes for s in item.scopes): result_ids[item.id] = True logger.debug("resolve_scoped_file_ids: scope match %s -> %s", item.name, item.id) return list(result_ids.keys()) async def _check_google_file_active(uri_or_name: str, api_key: str = None) -> bool: """Check if a Google file reference is still ACTIVE (not expired).""" key = api_key or os.getenv("GOOGLE_API_KEY") if not key: return False try: client = genai.Client(api_key=key) # Google file names look like "files/abc123", URIs like "https://..." # files.get() needs the name, but we can extract it or just try name = uri_or_name if "://" in uri_or_name: # Try to get by URI — extract the name from the path # URI format: https://generativelanguage.googleapis.com/v1beta/files/abc123 parts = uri_or_name.rstrip("/").split("/") name = f"files/{parts[-1]}" if parts else uri_or_name info = await asyncio.to_thread(client.files.get, name=name) state = str(getattr(info, "state", "")).upper() return state.endswith("ACTIVE") or state == "ACTIVE" except Exception as e: logger.debug("Google file check failed for %s: %s", uri_or_name, e) return False async def prepare_attachments( user: str, target_provider: str, attached_ids: List[str], llm_config: LLMConfig, ) -> list[dict]: """ For each attached file ID: - If already uploaded to the target provider, reuse provider_file_id/uri. - Otherwise, upload with the original filename (required by OpenAI). Returns a list of dicts describing attachment references for the provider. """ if not attached_ids: return [] items = load_files_index(user) items_map = {item.id: item for item in items} attachments: list[dict] = [] changed = False if isinstance(target_provider, ModelProvider): provider_norm = target_provider.value.lower() else: provider_norm = str(target_provider).lower() for fid in attached_ids: meta = items_map.get(fid) if not meta: logger.warning("Attached file id not found, skipping: %s", fid) continue path = os.path.join(files_root(user), fid) if not os.path.exists(path): raise HTTPException(status_code=404, detail=f"Attached file missing on disk: {meta.name}") if provider_norm == ModelProvider.OPENAI or provider_norm == "openai": # Reuse provider file id if available if meta.provider == "openai" and meta.provider_file_id: attachments.append({ "provider": "openai", "file_id": meta.provider_file_id, "name": meta.name, "mime": meta.mime, }) continue # Upload to OpenAI with original filename with open(path, "rb") as f: content = f.read() size = len(content) if size > OPENAI_MAX_FILE_SIZE: raise HTTPException(status_code=400, detail=f"File {meta.name} exceeds OpenAI 50MB limit") try: client = get_openai_client(llm_config.api_key) resp = await client.files.create( file=(meta.name or "upload.bin", content), purpose=OPENAI_DEFAULT_FILE_PURPOSE, ) openai_file_id = getattr(resp, "id", None) if not openai_file_id: raise HTTPException(status_code=500, detail="OpenAI file upload returned no file_id") attachments.append({ "provider": "openai", "file_id": openai_file_id, "name": meta.name, "mime": meta.mime, }) except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"OpenAI upload failed: {str(e)}") elif provider_norm == ModelProvider.GOOGLE or provider_norm == "google": # Reuse cached google_file_uri — but verify it's still ACTIVE cached_uri = meta.google_file_uri or ( meta.provider_file_id if meta.provider == "google" and meta.provider_file_id and "://" in meta.provider_file_id else None ) if cached_uri: still_valid = await _check_google_file_active(cached_uri, llm_config.api_key) if still_valid: attachments.append({ "provider": "google", "uri": cached_uri, "name": meta.name, "mime": meta.mime, }) continue # Expired — clear cache, fall through to re-upload logger.info("Google file expired, re-uploading: %s (%s)", meta.name, cached_uri) meta.google_file_uri = None changed = True # On-demand upload (or re-upload after expiry) uri = await ensure_google_file_upload(meta, path, llm_config.api_key) if not uri: raise HTTPException(status_code=500, detail=f"Google upload failed for {meta.name}") meta.google_file_uri = uri changed = True attachments.append({ "provider": "google", "uri": uri, "name": meta.name, "mime": meta.mime, }) elif provider_norm == ModelProvider.CLAUDE or provider_norm == "claude": # Reuse cached anthropic_file_id if available if meta.anthropic_file_id: attachments.append({ "provider": "claude", "file_id": meta.anthropic_file_id, "name": meta.name, "mime": meta.mime, }) continue # Try on-demand upload anthropic_id = await ensure_anthropic_file_upload(meta, path, llm_config.api_key) if anthropic_id: meta.anthropic_file_id = anthropic_id changed = True attachments.append({ "provider": "claude", "file_id": anthropic_id, "name": meta.name, "mime": meta.mime, }) continue # Fallback: inline base64 data with open(path, "rb") as f: raw = f.read() attachments.append({ "provider": "claude", "data_base64": base64.b64encode(raw).decode("utf-8"), "name": meta.name, "mime": meta.mime, }) else: raise HTTPException(status_code=400, detail=f"Unsupported provider for attachments: {target_provider}") # Persist index if any on-demand uploads updated meta (applies to Claude and Google) if changed: save_files_index(user, list(items_map.values())) # Debug log logger.debug("attachments: provider=%s count=%d detail=%s", provider_norm, len(attachments), [{'name': a.get('name'), 'id': a.get('file_id', a.get('uri'))} for a in attachments]) return attachments async def prepare_openai_vector_search( user: str, attached_ids: List[str], scopes: List[str], llm_config: LLMConfig, ) -> tuple[List[str], List[dict], Optional[dict]]: """ Ensure all attached files are uploaded to OpenAI Files (purpose=assistants) and added to the user's vector store. Returns (vector_store_ids, openai_file_refs_for_debug, filters). Filtering logic: - Include files whose scopes intersect with requested scopes - ALSO include explicitly attached files (attached_ids) - Deduplicate to avoid double-processing - Filters are constructed using file_id attribute in vector store """ items = load_files_index(user) items_map = {item.id: item for item in items} # Determine which files to include - combine scopes AND attached_ids relevant_files_map: dict[str, FileMeta] = {} # First: add files matching scopes (skip image files — they are handled inline) if scopes: for item in items: if item.mime in IMAGE_MIME_TYPES: continue if item.scopes and any(s in scopes for s in item.scopes): relevant_files_map[item.id] = item logger.debug("file_search: scopes=%s matched_files=%s", scopes, [f.name for f in relevant_files_map.values()]) # Second: also add explicitly attached files (they should always be searchable) if attached_ids: for fid in attached_ids: meta = items_map.get(fid) if meta and fid not in relevant_files_map: relevant_files_map[fid] = meta logger.debug("file_search: adding explicitly attached file: %s", meta.name) relevant_files = list(relevant_files_map.values()) if not relevant_files: return [], [], None changed = False vs_ids: List[str] = [] debug_refs: List[dict] = [] file_ids_for_filter: List[str] = [] for meta in relevant_files: path = os.path.join(files_root(user), meta.id) if not os.path.exists(path): logger.warning("Attached file missing on disk, skipping: %s", meta.id) continue # Enforce 50MB OpenAI limit file_size = os.path.getsize(path) if file_size > OPENAI_MAX_FILE_SIZE: logger.warning("File %s exceeds OpenAI 50MB limit, skipping", meta.name) continue openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, path, llm_config) if meta.openai_file_id != openai_file_id or meta.openai_vector_store_id != vs_id: meta.openai_file_id = openai_file_id meta.openai_vector_store_id = vs_id changed = True vs_ids.append(vs_id) debug_refs.append({"name": meta.name, "file_id": openai_file_id, "vs_id": vs_id}) if openai_file_id: file_ids_for_filter.append(openai_file_id) if changed: save_files_index(user, list(items_map.values())) # deduplicate vs_ids_unique = list({vid for vid in vs_ids if vid}) # Build filters to only search relevant files filters = None if file_ids_for_filter: filters = {"type": "in", "key": "file_id", "value": file_ids_for_filter} return vs_ids_unique, debug_refs, filters # ------------------------------------------------- @app.get("/api/projects/archived") def get_archived_nodes(user: str = DEFAULT_USER): migrate_legacy_layout(user) path = archived_path(user) if not os.path.exists(path): return {"archived": []} try: with open(path, "r", encoding="utf-8") as f: return {"archived": json.load(f)} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/projects/archived") def save_archived_nodes(payload: dict): user = payload.get("user", DEFAULT_USER) data = payload.get("archived", []) try: path = archived_path(user) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) return {"ok": True} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/files") def list_files(user: str = DEFAULT_USER): migrate_legacy_layout(user) items = load_files_index(user) return {"files": [item.model_dump() for item in items]} @app.post("/api/files/upload") async def upload_file( user: str = DEFAULT_USER, file: UploadFile = File(...), provider: str = Form("local"), purpose: Optional[str] = Form(None), current_user: User | None = Depends(get_current_user_optional), ): if current_user: user = current_user.username migrate_legacy_layout(user) items = load_files_index(user) file_id = str(uuid4()) dest_root = files_root(user) dest_path = os.path.join(dest_root, file_id) file_name = file.filename or "upload.bin" provider_normalized = (provider or "local").lower() if provider_normalized not in VALID_FILE_PROVIDERS: raise HTTPException(status_code=400, detail="Unsupported provider") try: content = await file.read() except Exception as e: raise HTTPException(status_code=500, detail=str(e)) size = len(content) if provider_normalized == "openai" and size > OPENAI_MAX_FILE_SIZE: raise HTTPException(status_code=400, detail="OpenAI provider limit: max 50MB per file") provider_file_id: Optional[str] = None provider_created_at: Optional[float] = None # Resolve user API keys for all providers openai_key = get_user_api_key(current_user, "openai") google_key = get_user_api_key(current_user, "google") anthropic_key = get_user_api_key(current_user, "claude") if provider_normalized == "openai": try: client = get_openai_client(openai_key) upload_purpose = purpose or OPENAI_DEFAULT_FILE_PURPOSE resp = await client.files.create( file=(file_name, content), purpose=upload_purpose, ) provider_file_id = getattr(resp, "id", None) provider_created_at = getattr(resp, "created_at", None) except Exception as e: raise HTTPException(status_code=500, detail=f"OpenAI upload failed: {str(e)}") elif provider_normalized == "google": try: g_key = google_key if not g_key: raise HTTPException(status_code=500, detail="Google API Key not found") client = genai.Client(api_key=g_key) # The Google GenAI SDK upload is synchronous; run in thread to avoid blocking the event loop. tmp_path = None try: with tempfile.NamedTemporaryFile(delete=False) as tmp: tmp.write(content) tmp_path = tmp.name google_resp = await asyncio.to_thread( client.files.upload, file=tmp_path, config={"mimeType": file.content_type or "application/octet-stream"}, ) google_name = getattr(google_resp, "name", None) google_uri = getattr(google_resp, "uri", None) # Poll for ACTIVE and uri if missing if google_name: for _ in range(10): try: info = await asyncio.to_thread(client.files.get, name=google_name) state = getattr(info, "state", None) google_uri = getattr(info, "uri", google_uri) if str(state).upper().endswith("ACTIVE") or state == "ACTIVE": break await asyncio.sleep(1) except Exception: await asyncio.sleep(1) provider_file_id = google_uri or google_name finally: if tmp_path and os.path.exists(tmp_path): os.remove(tmp_path) provider_created_at = time.time() except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Google upload failed: {str(e)}") try: os.makedirs(dest_root, exist_ok=True) with open(dest_path, "wb") as f: f.write(content) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) created_at = provider_created_at or os.path.getmtime(dest_path) meta = FileMeta( id=file_id, name=file_name, size=size, mime=file.content_type or "application/octet-stream", created_at=created_at, provider=provider_normalized if provider_normalized != "local" else None, provider_file_id=provider_file_id, openai_file_id=None, openai_vector_store_id=None, ) # Always try to index into OpenAI vector store (if <=50MB) # Skip image files — they are sent inline to vision-capable models, not via file_search mime_for_check = (file.content_type or "application/octet-stream").lower() if mime_for_check in IMAGE_MIME_TYPES: logger.debug("Skipping OpenAI vector store indexing for image file: %s", file_name) elif size <= OPENAI_MAX_FILE_SIZE: try: openai_file_id, vs_id = await ensure_openai_file_and_index(user, meta, dest_path, api_key=openai_key) meta.openai_file_id = openai_file_id meta.openai_vector_store_id = vs_id if provider_normalized == "openai" and not meta.provider_file_id: meta.provider_file_id = openai_file_id except Exception as e: logger.warning("OpenAI indexing failed for %s: %s", file_name, e) else: logger.warning("Skipping OpenAI indexing for %s: exceeds 50MB", file_name) # Anthropic Files API upload (eager, like OpenAI) anthropic_id = await ensure_anthropic_file_upload(meta, dest_path, api_key=anthropic_key) if anthropic_id: meta.anthropic_file_id = anthropic_id # Google Files API upload (eager, like OpenAI) google_uri = await ensure_google_file_upload(meta, dest_path, api_key=google_key) if google_uri: meta.google_file_uri = google_uri # Also backfill provider_file_id for legacy compat if provider was google if provider_normalized == "google" and not meta.provider_file_id: meta.provider_file_id = google_uri items.append(meta) save_files_index(user, items) return {"file": meta} @app.get("/api/files/download") def download_file(user: str = DEFAULT_USER, file_id: str = ""): migrate_legacy_layout(user) items = load_files_index(user) meta = next((i for i in items if i.id == file_id), None) if not meta: raise HTTPException(status_code=404, detail="file not found") path = os.path.join(files_root(user), file_id) if not os.path.exists(path): raise HTTPException(status_code=404, detail="file missing on disk") return FileResponse(path, filename=meta.name, media_type=meta.mime) @app.post("/api/files/delete") async def delete_file( user: str = DEFAULT_USER, file_id: str = "", current_user: User | None = Depends(get_current_user_optional), ): if current_user: user = current_user.username migrate_legacy_layout(user) items = load_files_index(user) meta = next((i for i in items if i.id == file_id), None) if not meta: raise HTTPException(status_code=404, detail="file not found") # Remove from vector store and OpenAI Files if present if meta.openai_vector_store_id and meta.openai_file_id: await remove_file_from_vector_store(meta.openai_vector_store_id, meta.openai_file_id) if meta.provider == "openai" and meta.provider_file_id: try: client = get_openai_client(get_user_api_key(current_user, "openai")) await client.files.delete(meta.provider_file_id) except Exception as e: raise HTTPException(status_code=500, detail=f"OpenAI delete failed: {str(e)}") # Delete from Google Files API — collect all distinct Google refs google_refs = set() if meta.provider == "google" and meta.provider_file_id: google_refs.add(meta.provider_file_id) if meta.google_file_uri: google_refs.add(meta.google_file_uri) for g_ref in google_refs: try: key = get_user_api_key(current_user, "google") if not key: logger.warning("Skipping Google file deletion: no API key") break client = genai.Client(api_key=key) await asyncio.to_thread(client.files.delete, g_ref) logger.info("Google file deleted: %s", g_ref) except Exception as e: logger.warning("Google file deletion failed for %s: %s", g_ref, e) # Delete from Anthropic Files API if present if meta.anthropic_file_id: try: client = get_anthropic_client(get_user_api_key(current_user, "claude")) await asyncio.to_thread( client.beta.files.delete, meta.anthropic_file_id ) logger.info("Anthropic file deleted: %s", meta.anthropic_file_id) except Exception as e: logger.warning("Anthropic file deletion failed for %s: %s", meta.anthropic_file_id, e) path = os.path.join(files_root(user), file_id) if os.path.exists(path): os.remove(path) items = [i for i in items if i.id != file_id] save_files_index(user, items) return {"ok": True} class AddScopeRequest(BaseModel): user: str = DEFAULT_USER file_id: str scope: str # "project_path/node_id" composite key @app.post("/api/files/add_scope") def add_file_scope(request: AddScopeRequest): """ Add a scope to a file's scopes list. Called when user attaches a file to a node. """ migrate_legacy_layout(request.user) items = load_files_index(request.user) meta = next((i for i in items if i.id == request.file_id), None) if not meta: raise HTTPException(status_code=404, detail="file not found") if request.scope not in meta.scopes: meta.scopes.append(request.scope) save_files_index(request.user, items) return {"file": meta.model_dump()} class RemoveScopeRequest(BaseModel): user: str = DEFAULT_USER file_id: str scope: str @app.post("/api/files/remove_scope") def remove_file_scope(request: RemoveScopeRequest): """ Remove a scope from a file's scopes list. Called when user detaches a file from a node. """ migrate_legacy_layout(request.user) items = load_files_index(request.user) meta = next((i for i in items if i.id == request.file_id), None) if not meta: raise HTTPException(status_code=404, detail="file not found") if request.scope in meta.scopes: meta.scopes.remove(request.scope) save_files_index(request.user, items) return {"file": meta.model_dump()}