"""ContextFlow backend: LLM streaming endpoints plus per-user file storage.

Storage layout (all under ``DATA_ROOT``)::

    DATA_ROOT/{user}/projects   blueprint JSON files and folders
    DATA_ROOT/{user}/archive    archived nodes (single JSON file)
    DATA_ROOT/{user}/files      uploaded files plus an index.json
"""
import json
import logging
import os
import shutil
from typing import List, Literal, Optional
from uuid import uuid4

from dotenv import load_dotenv
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel

from app.schemas import (
    Context,
    LLMConfig,
    MergeStrategy,
    Message,
    ModelProvider,
    NodeRunRequest,
    NodeRunResponse,
    ReasoningEffort,
    Role,
)
from app.services.llm import generate_title, llm_streamer

load_dotenv()

logger = logging.getLogger(__name__)

app = FastAPI(title="ContextFlow Backend")

# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive CORS policy; confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --------- Project / Blueprint storage ---------

DATA_ROOT = os.path.abspath(os.getenv("DATA_ROOT", os.path.join(os.getcwd(), "data")))
DEFAULT_USER = "test"
ARCHIVE_FILENAME = "archived_nodes.json"


def _validate_user(user: str) -> str:
    """Return *user* unchanged if it is usable as a single directory name.

    The user name comes straight from request parameters and is joined into
    filesystem paths, so anything that could escape ``DATA_ROOT`` is rejected.

    Raises:
        HTTPException: 400 if the name is empty, not a string, is ``.``/``..``,
            or contains a path separator or NUL byte.
    """
    if (
        not isinstance(user, str)
        or not user
        or user in (".", "..")
        or any(ch in user for ch in ("/", "\\", "\x00"))
    ):
        raise HTTPException(status_code=400, detail="Invalid user")
    return user


def ensure_user_root(user: str) -> str:
    """Create (if needed) and return the user's data root.

    Ensures the new data root structure:
        DATA_ROOT/{user}/projects
        DATA_ROOT/{user}/archive

    Raises:
        HTTPException: 400 if *user* is not a safe directory name.
    """
    user_root = os.path.join(DATA_ROOT, _validate_user(user))
    os.makedirs(os.path.join(user_root, "projects"), exist_ok=True)
    os.makedirs(os.path.join(user_root, "archive"), exist_ok=True)
    return user_root


def projects_root(user: str) -> str:
    """Return the absolute path of the user's projects directory."""
    return os.path.join(ensure_user_root(user), "projects")


def archive_root(user: str) -> str:
    """Return the absolute path of the user's archive directory."""
    return os.path.join(ensure_user_root(user), "archive")


def files_root(user: str) -> str:
    """Return the user's uploads directory, creating it if missing."""
    root = os.path.join(ensure_user_root(user), "files")
    os.makedirs(root, exist_ok=True)
    return root


def migrate_legacy_layout(user: str):
    """Best-effort migration from the legacy on-disk layout.

    Migrates from legacy ``./projects/{user}`` and legacy archive folders
    (``archived/`` or ``.cf_archived/`` inside it) to the new
    ``DATA_ROOT/{user}/`` structure. Failures are logged and otherwise
    ignored so that a broken legacy tree never blocks a request.
    """
    new_projects = projects_root(user)  # also validates ``user``
    legacy_root = os.path.abspath(os.path.join(os.getcwd(), "projects", user))

    # Migrate the legacy archive file FIRST. The project move below relocates
    # every entry of legacy_root (including the archive folders), after which
    # the legacy archive file could no longer be found at its old location.
    legacy_archives = [
        os.path.join(legacy_root, "archived", ARCHIVE_FILENAME),
        os.path.join(legacy_root, ".cf_archived", ARCHIVE_FILENAME),
    ]
    new_archive_file = archived_path(user)
    for legacy in legacy_archives:
        if os.path.exists(new_archive_file):
            break
        if not os.path.exists(legacy):
            continue
        try:
            os.makedirs(os.path.dirname(new_archive_file), exist_ok=True)
            shutil.move(legacy, new_archive_file)
        except OSError:
            logger.warning("Could not migrate legacy archive %s", legacy, exc_info=True)

    # Move legacy projects only into an empty new projects root, so an already
    # populated root is never mixed with legacy content.
    try:
        should_move = os.path.exists(legacy_root) and not os.listdir(new_projects)
        names = os.listdir(legacy_root) if should_move else []
    except OSError:
        logger.warning("Could not inspect legacy projects at %s", legacy_root, exc_info=True)
        return

    for name in names:
        src = os.path.join(legacy_root, name)
        dst = os.path.join(new_projects, name)
        if os.path.exists(dst):
            continue
        try:
            shutil.move(src, dst)
        except OSError:
            # Keep going: one unmovable entry should not strand the rest.
            logger.warning("Could not migrate legacy item %s", src, exc_info=True)


def safe_path(user: str, relative_path: str) -> str:
    """Resolve *relative_path* inside the user's projects root.

    Returns:
        The absolute path, guaranteed to be the projects root itself or a
        descendant of it.

    Raises:
        HTTPException: 400 if the path escapes the projects root or contains
            a NUL byte.
    """
    if "\x00" in relative_path:
        raise HTTPException(status_code=400, detail="Invalid path")
    root = projects_root(user)
    norm = os.path.normpath(relative_path).lstrip(os.sep)
    full = os.path.abspath(os.path.join(root, norm))
    # A bare startswith(root) would also accept siblings such as
    # ".../projects_evil"; require the separator after the root prefix.
    if full != root and not full.startswith(root + os.sep):
        raise HTTPException(status_code=400, detail="Invalid path")
    return full


class FSItem(BaseModel):
    """One node of the project tree returned by ``list_tree``."""

    name: str
    path: str  # path relative to the user's projects root
    type: Literal["file", "folder"]
    size: Optional[int] = None  # bytes; None for folders
    mtime: Optional[float] = None  # st_mtime of the entry
    children: Optional[List["FSItem"]] = None  # None for files


FSItem.model_rebuild()


def _list_dir(base: str, directory: str) -> List[FSItem]:
    """Recursively list *directory*, reporting paths relative to *base*."""
    items: List[FSItem] = []
    for name in sorted(os.listdir(directory)):
        full = os.path.join(directory, name)
        rel = os.path.relpath(full, base)
        try:
            info = os.stat(full)
        except OSError:
            # Broken symlink or entry removed mid-listing: skip it rather
            # than failing the whole tree.
            continue
        if os.path.isdir(full):
            items.append(FSItem(
                name=name,
                path=rel,
                type="folder",
                size=None,
                mtime=info.st_mtime,
                children=_list_dir(base, full),
            ))
        else:
            items.append(FSItem(
                name=name,
                path=rel,
                type="file",
                size=info.st_size,
                mtime=info.st_mtime,
                children=None,
            ))
    return items


def list_tree(user: str, relative_path: str = ".") -> List[FSItem]:
    """Return the sorted file/folder tree under *relative_path* for *user*.

    Runs the legacy migration once, then walks the directory. Item paths are
    relative to the user's projects root.

    Raises:
        HTTPException: 400 if *relative_path* escapes the projects root.
        OSError: if the resolved path does not exist or is not a directory.
    """
    migrate_legacy_layout(user)
    start = safe_path(user, relative_path)
    return _list_dir(projects_root(user), start)


class SaveBlueprintRequest(BaseModel):
    """Body of POST /api/projects/save_blueprint."""

    user: str = DEFAULT_USER
    path: str  # relative path including filename.json
    content: dict


class RenameRequest(BaseModel):
    """Body of POST /api/projects/rename; give either new_name or new_path."""

    user: str = DEFAULT_USER
    path: str
    new_name: Optional[str] = None
    new_path: Optional[str] = None


class FileMeta(BaseModel):
    """Index entry describing one uploaded file."""

    id: str
    name: str
    size: int
    mime: str
    created_at: float
    provider: Optional[str] = None
    provider_file_id: Optional[str] = None


class FolderRequest(BaseModel):
    """Body of POST /api/projects/create_folder."""

    user: str = DEFAULT_USER
    path: str  # relative folder path


class DeleteRequest(BaseModel):
    """Body of POST /api/projects/delete."""

    user: str = DEFAULT_USER
    path: str
    is_folder: bool = False


# -----------------------------------------------


@app.get("/")
def read_root():
    """Health-check endpoint."""
    return {"message": "ContextFlow Backend is running"}


def smart_merge_messages(messages: list[Message]) -> list[Message]:
    """Merge messages in two steps.

    1. Deduplication by ID (to handle diamond dependencies), keeping the
       first occurrence and the original order.
    2. Merging consecutive messages from the same role: contents are joined
       with a blank line and the latest timestamp is kept.

    The input messages are not mutated; merged messages are copies.
    """
    if not messages:
        return []

    # 1. Deduplicate by ID, keeping order
    seen_ids = set()
    deduplicated = []
    for msg in messages:
        if msg.id not in seen_ids:
            deduplicated.append(msg)
            seen_ids.add(msg.id)

    # 2. Merge consecutive roles (deduplicated is non-empty here because the
    #    first message is always kept).
    merged = []
    current_msg = deduplicated[0].model_copy()
    for next_msg in deduplicated[1:]:
        if next_msg.role == current_msg.role:
            # Merge content
            current_msg.content += f"\n\n{next_msg.content}"
            # Keep the latest timestamp
            current_msg.timestamp = next_msg.timestamp
        else:
            merged.append(current_msg)
            current_msg = next_msg.model_copy()
    merged.append(current_msg)
    return merged


@app.post("/api/run_node_stream")
async def run_node_stream(request: NodeRunRequest):
    """Stream the response from the LLM as server-sent events."""
    # 1. Concatenate all incoming contexts first
    raw_messages = []
    for ctx in request.incoming_contexts:
        raw_messages.extend(ctx.messages)

    # 2. Apply Merge Strategy (anything other than SMART keeps messages as is)
    if request.merge_strategy == MergeStrategy.SMART:
        final_messages = smart_merge_messages(raw_messages)
    else:
        final_messages = raw_messages

    execution_context = Context(messages=final_messages)
    return StreamingResponse(
        llm_streamer(execution_context, request.user_prompt, request.config),
        media_type="text/event-stream",
    )


class TitleRequest(BaseModel):
    """Body of POST /api/generate_title."""

    user_prompt: str
    response: str


class TitleResponse(BaseModel):
    """Response of POST /api/generate_title."""

    title: str


@app.post("/api/generate_title", response_model=TitleResponse)
async def generate_title_endpoint(request: TitleRequest):
    """Generate a short title for a Q-A pair.

    Delegates to ``generate_title``. NOTE(review): the earlier docstring said
    this uses gpt-5-nano and returns 3-4 short English words; that is decided
    in app.services.llm, so confirm there.
    """
    title = await generate_title(request.user_prompt, request.response)
    return TitleResponse(title=title)


class SummarizeRequest(BaseModel):
    """Body of POST /api/summarize."""

    content: str
    model: str  # Model to use for summarization


class SummarizeResponse(BaseModel):
    """Response of POST /api/summarize."""

    summary: str


@app.post("/api/summarize", response_model=SummarizeResponse)
async def summarize_endpoint(request: SummarizeRequest):
    """Summarize the given content using the specified model."""
    # Imported locally, as in the original module.
    from app.services.llm import summarize_content

    summary = await summarize_content(request.content, request.model)
    return SummarizeResponse(summary=summary)


# ---------------- Project / Blueprint APIs ----------------


@app.get("/api/projects/tree", response_model=List[FSItem])
def get_project_tree(user: str = DEFAULT_USER):
    """List all files/folders for the user under the projects root."""
    ensure_user_root(user)
    return list_tree(user)


@app.post("/api/projects/create_folder")
def create_folder(req: FolderRequest):
    """Create a folder (and parents) under the user's project root."""
    try:
        folder_path = safe_path(req.user, req.path)
        os.makedirs(folder_path, exist_ok=True)
        return {"ok": True}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/projects/save_blueprint")
def save_blueprint(req: SaveBlueprintRequest):
    """Save a blueprint JSON to disk, creating parent folders as needed."""
    try:
        full_path = safe_path(req.user, req.path)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, "w", encoding="utf-8") as f:
            json.dump(req.content, f, ensure_ascii=False, indent=2)
        return {"ok": True}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/projects/file")
def read_blueprint(user: str = DEFAULT_USER, path: str = ""):
    """Read a blueprint JSON file and return it as ``{"content": ...}``."""
    if not path:
        raise HTTPException(status_code=400, detail="path is required")
    full_path = safe_path(user, path)
    if not os.path.isfile(full_path):
        raise HTTPException(status_code=404, detail="file not found")
    try:
        with open(full_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return {"content": data}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/projects/download")
def download_blueprint(user: str = DEFAULT_USER, path: str = ""):
    """Download a blueprint file as an attachment."""
    if not path:
        raise HTTPException(status_code=400, detail="path is required")
    full_path = safe_path(user, path)
    if not os.path.isfile(full_path):
        raise HTTPException(status_code=404, detail="file not found")
    return FileResponse(
        full_path,
        filename=os.path.basename(full_path),
        media_type="application/json",
    )


@app.post("/api/projects/rename")
def rename_item(req: RenameRequest):
    """Rename or move a file or folder.

    - If new_path is provided, it is treated as the target relative path (move).
    - Else, new_name is used within the same directory.

    Raises:
        HTTPException: 404 if the source is missing; 400 if neither target is
            given or the target falls outside the user's projects root; 500
            on any other failure.
    """
    try:
        src = safe_path(req.user, req.path)
        if not os.path.exists(src):
            raise HTTPException(status_code=404, detail="source not found")
        if req.new_path:
            dst = safe_path(req.user, req.new_path)
        else:
            if not req.new_name:
                raise HTTPException(status_code=400, detail="new_name or new_path required")
            candidate = os.path.join(os.path.dirname(src), req.new_name)
            # Validate against the PROJECTS root (the same base safe_path
            # uses) and rename to the validated path, so a new_name such as
            # "../x" cannot move the item out of the projects tree.
            dst = safe_path(req.user, os.path.relpath(candidate, projects_root(req.user)))
        os.rename(src, dst)
        return {"ok": True}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/projects/delete")
def delete_item(req: DeleteRequest):
    """Delete a file or folder (folders require ``is_folder=True``)."""
    try:
        target = safe_path(req.user, req.path)
        if not os.path.exists(target):
            raise HTTPException(status_code=404, detail="not found")
        if os.path.isdir(target):
            if not req.is_folder:
                # Prevent deleting folder accidentally unless flagged
                raise HTTPException(status_code=400, detail="set is_folder=True to delete folder")
            shutil.rmtree(target)
        else:
            os.remove(target)
        return {"ok": True}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# ----------------------------------------------------------

# --------------- Archived Nodes APIs ----------------------


def archived_path(user: str) -> str:
    """Return the path of the user's archived-nodes JSON file."""
    root = archive_root(user)
    return os.path.join(root, ARCHIVE_FILENAME)


# ---------------- Files (uploads) ----------------


def files_index_path(user: str) -> str:
    """Return the path of the user's uploads index (index.json)."""
    return os.path.join(files_root(user), "index.json")


def load_files_index(user: str) -> List[FileMeta]:
    """Load the uploads index; an absent index file yields an empty list."""
    path = files_index_path(user)
    if not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return [FileMeta(**item) for item in data]


def save_files_index(user: str, items: List[FileMeta]):
    """Overwrite the uploads index with *items*."""
    path = files_index_path(user)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump([item.model_dump() for item in items], f, ensure_ascii=False, indent=2)


# -------------------------------------------------


@app.get("/api/projects/archived")
def get_archived_nodes(user: str = DEFAULT_USER):
    """Return the user's archived nodes, or an empty list if none are saved."""
    migrate_legacy_layout(user)
    path = archived_path(user)
    if not os.path.exists(path):
        return {"archived": []}
    try:
        with open(path, "r", encoding="utf-8") as f:
            return {"archived": json.load(f)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/projects/archived")
def save_archived_nodes(payload: dict):
    """Persist archived nodes.

    Reads ``user`` (default ``DEFAULT_USER``) and ``archived`` (default
    ``[]``) from the JSON body and overwrites the user's archive file.
    """
    user = payload.get("user", DEFAULT_USER)
    data = payload.get("archived", [])
    try:
        path = archived_path(user)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        return {"ok": True}
    except HTTPException:
        # Keep 4xx responses (e.g. invalid user) instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/files")
def list_files(user: str = DEFAULT_USER):
    """List metadata for all of the user's uploaded files."""
    migrate_legacy_layout(user)
    items = load_files_index(user)
    return {"files": [item.model_dump() for item in items]}


@app.post("/api/files/upload")
async def upload_file(user: str = DEFAULT_USER, file: UploadFile = File(...)):
    """Store an uploaded file under a fresh UUID and record it in the index."""
    migrate_legacy_layout(user)
    items = load_files_index(user)
    file_id = str(uuid4())
    dest_root = files_root(user)
    dest_path = os.path.join(dest_root, file_id)
    try:
        content = await file.read()
        size = len(content)
        with open(dest_path, "wb") as f:
            f.write(content)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    meta = FileMeta(
        id=file_id,
        # filename may be absent on the upload; FileMeta.name must be a str,
        # so fall back to the generated id instead of failing validation
        # after the bytes are already on disk.
        name=file.filename or file_id,
        size=size,
        mime=file.content_type or "application/octet-stream",
        created_at=os.path.getmtime(dest_path),
    )
    items.append(meta)
    save_files_index(user, items)
    return {"file": meta}


@app.get("/api/files/download")
def download_file(user: str = DEFAULT_USER, file_id: str = ""):
    """Download an uploaded file by id, using its original name and MIME type."""
    migrate_legacy_layout(user)
    items = load_files_index(user)
    # Only ids present in the index are served, so file_id is never used as
    # an arbitrary path component.
    meta = next((i for i in items if i.id == file_id), None)
    if not meta:
        raise HTTPException(status_code=404, detail="file not found")
    path = os.path.join(files_root(user), file_id)
    if not os.path.exists(path):
        raise HTTPException(status_code=404, detail="file missing on disk")
    return FileResponse(path, filename=meta.name, media_type=meta.mime)


@app.post("/api/files/delete")
def delete_file(user: str = DEFAULT_USER, file_id: str = ""):
    """Delete an uploaded file by id and remove it from the index."""
    migrate_legacy_layout(user)
    items = load_files_index(user)
    meta = next((i for i in items if i.id == file_id), None)
    if not meta:
        raise HTTPException(status_code=404, detail="file not found")
    path = os.path.join(files_root(user), file_id)
    if os.path.exists(path):
        os.remove(path)
    items = [i for i in items if i.id != file_id]
    save_files_index(user, items)
    return {"ok": True}