diff options
| author | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-14 03:40:31 +0000 |
|---|---|---|
| committer | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-14 03:40:31 +0000 |
| commit | bdf381a2c8a0337f7459000f487a80f9cbbbdd2f (patch) | |
| tree | b3c72c85f3e7c47b4c98a1301acc7fa7d23a6d05 | |
| parent | ded75a5c19ad4aa8dc832fc4c138b68093e22ee8 (diff) | |
Add background task persistence for debate & council operations
Decouple debate/council execution from SSE connection lifecycle so tasks
survive browser disconnects. Backend runs work as asyncio.Tasks with
progressive disk persistence; frontend can reconnect and recover state.
- New backend/app/services/tasks.py: task registry, broadcast pattern,
disk persistence at milestones, stale task cleanup on startup
- New endpoints: POST start_debate/start_council, GET task stream/poll
- Frontend stores taskId on nodes, recovers running tasks on page load
- _applyPartialEvents rebuilds stage text + data from accumulated events
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| -rw-r--r-- | backend/app/main.py | 355 | ||||
| -rw-r--r-- | backend/app/services/tasks.py | 304 | ||||
| -rw-r--r-- | frontend/src/components/LeftSidebar.tsx | 3 | ||||
| -rw-r--r-- | frontend/src/components/Sidebar.tsx | 324 | ||||
| -rw-r--r-- | frontend/src/store/flowStore.ts | 487 |
5 files changed, 1208 insertions, 265 deletions
diff --git a/backend/app/main.py b/backend/app/main.py index 89c5dd0..746b731 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -12,6 +12,10 @@ from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Me from app.services.llm import llm_streamer, generate_title, get_openai_client, get_anthropic_client, resolve_provider from app.services.council import council_event_stream from app.services.debate import debate_event_stream +from app.services.tasks import ( + register_task, get_task, get_task_status, run_task_in_background, + cleanup_stale_tasks, TaskStatus, _CHUNK_EVENT_TYPES, _task_registry, +) from app.auth import auth_router, get_current_user, get_current_user_optional, init_db, User, get_db from app.auth.utils import get_password_hash from dotenv import load_dotenv @@ -57,7 +61,12 @@ app.add_middleware( async def startup_event(): """Initialize database and create default test user if not exists""" init_db() - + + # Mark any in-progress tasks from prior run as interrupted + stale_count = cleanup_stale_tasks(DATA_ROOT) + if stale_count: + logger.info("Cleaned up %d stale background tasks", stale_count) + # Create test user if not exists from app.auth.models import SessionLocal db = SessionLocal() @@ -711,6 +720,350 @@ async def run_debate_stream( ) +# --------------- Background Task Helpers --------------- + +async def _prepare_council_args(request: CouncilRunRequest, resolved: User | None, username: str) -> dict: + """Extract shared council request processing into a reusable helper. + Returns kwargs suitable for council_event_stream().""" + raw_messages = [] + for ctx in request.incoming_contexts: + raw_messages.extend(ctx.messages) + if request.merge_strategy == MergeStrategy.SMART: + final_messages = smart_merge_messages(raw_messages) + else: + final_messages = raw_messages + execution_context = Context(messages=final_messages) + + images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids) + openrouter_key = get_user_api_key(resolved, "openrouter") + + member_configs: list[LLMConfig] = [] + attachments_per_model: list[list[dict] | None] = [] + tools_per_model: list[list[dict] | None] = [] + contexts_per_model: list[Context | None] = [] + + for member in request.council_models: + provider = resolve_provider(member.model_name) + provider_str = provider.value + api_key = get_user_api_key(resolved, provider_str) + config = LLMConfig( + provider=provider, + model_name=member.model_name, + temperature=member.temperature if member.temperature is not None else request.temperature, + system_prompt=request.system_prompt, + api_key=api_key, + reasoning_effort=member.reasoning_effort if member.reasoning_effort is not None else request.reasoning_effort, + enable_google_search=member.enable_google_search if member.enable_google_search is not None else request.enable_google_search, + ) + member_configs.append(config) + + tools: list[dict] = [] + attachments: list[dict] = [] + scoped_file_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids) + + if provider == ModelProvider.OPENAI: + vs_ids, debug_refs, filters = await prepare_openai_vector_search( + user=username, attached_ids=non_image_file_ids, + scopes=request.scopes, llm_config=config, + ) + if not vs_ids: + try: + client = get_openai_client(config.api_key) + vs_id = await ensure_user_vector_store(username, client) + if vs_id: + vs_ids = [vs_id] + except Exception: + pass + if vs_ids: + tool_def = {"type": "file_search", "vector_store_ids": vs_ids} + if filters: + tool_def["filters"] = filters + tools.append(tool_def) + elif provider == ModelProvider.GOOGLE: + attachments = await prepare_attachments( + user=username, target_provider=provider, + attached_ids=scoped_file_ids, llm_config=config, + ) + elif provider == ModelProvider.CLAUDE: + attachments = await prepare_attachments( + user=username, target_provider=provider, + attached_ids=scoped_file_ids, llm_config=config, + ) + + attachments_per_model.append(attachments or None) + tools_per_model.append(tools or None) + + if member.incoming_contexts: + raw = [] + for ctx in member.incoming_contexts: + raw.extend(ctx.messages) + if request.merge_strategy == MergeStrategy.SMART: + merged = smart_merge_messages(raw) + else: + merged = raw + contexts_per_model.append(Context(messages=merged)) + else: + contexts_per_model.append(None) + + chairman = request.chairman_model + chairman_provider = resolve_provider(chairman.model_name) + chairman_api_key = get_user_api_key(resolved, chairman_provider.value) + chairman_config = LLMConfig( + provider=chairman_provider, + model_name=chairman.model_name, + temperature=chairman.temperature if chairman.temperature is not None else request.temperature, + system_prompt=request.system_prompt, + api_key=chairman_api_key, + reasoning_effort=chairman.reasoning_effort if chairman.reasoning_effort is not None else request.reasoning_effort, + enable_google_search=chairman.enable_google_search if chairman.enable_google_search is not None else request.enable_google_search, + ) + + return dict( + user_prompt=request.user_prompt, + context=execution_context, + member_configs=member_configs, + chairman_config=chairman_config, + attachments_per_model=attachments_per_model, + tools_per_model=tools_per_model, + openrouter_api_key=openrouter_key, + images=images, + contexts_per_model=contexts_per_model, + ) + + +async def _prepare_debate_args(request: DebateRunRequest, resolved: User | None, username: str) -> dict: + """Extract shared debate request processing into a reusable helper. + Returns kwargs suitable for debate_event_stream().""" + raw_messages = [] + for ctx in request.incoming_contexts: + raw_messages.extend(ctx.messages) + if request.merge_strategy == MergeStrategy.SMART: + final_messages = smart_merge_messages(raw_messages) + else: + final_messages = raw_messages + execution_context = Context(messages=final_messages) + + images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids) + openrouter_key = get_user_api_key(resolved, "openrouter") + + member_configs: list[LLMConfig] = [] + attachments_per_model: list[list[dict] | None] = [] + tools_per_model: list[list[dict] | None] = [] + + for member in request.debate_models: + provider = resolve_provider(member.model_name) + provider_str = provider.value + api_key = get_user_api_key(resolved, provider_str) + config = LLMConfig( + provider=provider, + model_name=member.model_name, + temperature=member.temperature if member.temperature is not None else request.temperature, + system_prompt=request.system_prompt, + api_key=api_key, + reasoning_effort=member.reasoning_effort if member.reasoning_effort is not None else request.reasoning_effort, + enable_google_search=member.enable_google_search if member.enable_google_search is not None else request.enable_google_search, + ) + member_configs.append(config) + + tools: list[dict] = [] + attachments: list[dict] = [] + scoped_file_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids) + + if provider == ModelProvider.OPENAI: + vs_ids, debug_refs, filters = await prepare_openai_vector_search( + user=username, attached_ids=non_image_file_ids, + scopes=request.scopes, llm_config=config, + ) + if not vs_ids: + try: + client = get_openai_client(config.api_key) + vs_id = await ensure_user_vector_store(username, client) + if vs_id: + vs_ids = [vs_id] + except Exception: + pass + if vs_ids: + tool_def = {"type": "file_search", "vector_store_ids": vs_ids} + if filters: + tool_def["filters"] = filters + tools.append(tool_def) + elif provider == ModelProvider.GOOGLE: + attachments = await prepare_attachments( + user=username, target_provider=provider, + attached_ids=scoped_file_ids, llm_config=config, + ) + elif provider == ModelProvider.CLAUDE: + attachments = await prepare_attachments( + user=username, target_provider=provider, + attached_ids=scoped_file_ids, llm_config=config, + ) + + attachments_per_model.append(attachments or None) + tools_per_model.append(tools or None) + + judge_config = None + if request.judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and request.judge_model: + judge = request.judge_model + judge_provider = resolve_provider(judge.model_name) + judge_api_key = get_user_api_key(resolved, judge_provider.value) + judge_config = LLMConfig( + provider=judge_provider, + model_name=judge.model_name, + temperature=judge.temperature if judge.temperature is not None else request.temperature, + system_prompt=request.system_prompt, + api_key=judge_api_key, + reasoning_effort=judge.reasoning_effort if judge.reasoning_effort is not None else request.reasoning_effort, + enable_google_search=judge.enable_google_search if judge.enable_google_search is not None else request.enable_google_search, + ) + + return dict( + user_prompt=request.user_prompt, + context=execution_context, + member_configs=member_configs, + judge_config=judge_config, + judge_mode=request.judge_mode, + debate_format=request.debate_format, + max_rounds=request.max_rounds, + custom_format_prompt=request.custom_format_prompt, + attachments_per_model=attachments_per_model, + tools_per_model=tools_per_model, + openrouter_api_key=openrouter_key, + images=images, + ) + + +# --------------- Background Task Endpoints --------------- + +@app.post("/api/task/start_council") +async def start_council_task( + request: CouncilRunRequest, + user: str = DEFAULT_USER, + current_user: User | None = Depends(get_current_user_optional), +): + """Start council as a background task. Returns {task_id}.""" + resolved = resolve_user(current_user, user) + username = resolved.username if resolved else DEFAULT_USER + + # Cancel existing task on same node if still running + for existing in list(_task_registry.values()): + if existing.node_id == request.node_id and existing.user == username and existing.status == TaskStatus.RUNNING: + if existing.asyncio_task and not existing.asyncio_task.done(): + existing.asyncio_task.cancel() + logger.info("Cancelled previous task %s for node %s", existing.task_id, request.node_id) + + kwargs = await _prepare_council_args(request, resolved, username) + generator = council_event_stream(**kwargs) + + task_id = str(uuid4()) + info = register_task(task_id, username, request.node_id, "council") + info.asyncio_task = asyncio.create_task(run_task_in_background(info, generator)) + return {"task_id": task_id} + + +@app.post("/api/task/start_debate") +async def start_debate_task( + request: DebateRunRequest, + user: str = DEFAULT_USER, + current_user: User | None = Depends(get_current_user_optional), +): + """Start debate as a background task. Returns {task_id}.""" + resolved = resolve_user(current_user, user) + username = resolved.username if resolved else DEFAULT_USER + + # Cancel existing task on same node if still running + for existing in list(_task_registry.values()): + if existing.node_id == request.node_id and existing.user == username and existing.status == TaskStatus.RUNNING: + if existing.asyncio_task and not existing.asyncio_task.done(): + existing.asyncio_task.cancel() + logger.info("Cancelled previous task %s for node %s", existing.task_id, request.node_id) + + kwargs = await _prepare_debate_args(request, resolved, username) + generator = debate_event_stream(**kwargs) + + task_id = str(uuid4()) + info = register_task(task_id, username, request.node_id, "debate") + info.asyncio_task = asyncio.create_task(run_task_in_background(info, generator)) + return {"task_id": task_id} + + +@app.get("/api/task/{task_id}/stream") +async def stream_task_events( + task_id: str, + from_event: int = 0, + user: str = DEFAULT_USER, + current_user: User | None = Depends(get_current_user_optional), +): + """SSE stream: replay missed events then stream live.""" + resolved = resolve_user(current_user, user) + username = resolved.username if resolved else DEFAULT_USER + + info = get_task(task_id) + if not info: + raise HTTPException(status_code=404, detail="Task not found") + if info.user != username: + raise HTTPException(status_code=403, detail="Not your task") + + async def event_generator(): + cursor = from_event + # 1. Replay accumulated events from cursor + while cursor < len(info.events): + evt = info.events[cursor] + yield f"data: {json.dumps(evt)}\n\n" + cursor += 1 + + # 2. If already done, send terminal and return + if info.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.INTERRUPTED): + yield f"data: {json.dumps({'type': 'task_status', 'data': {'status': info.status.value}})}\n\n" + return + + # 3. Stream live events + while True: + # Wait for new events with timeout (keepalive) + try: + await asyncio.wait_for( + asyncio.shield(info.new_event_signal.wait()), + timeout=15.0, + ) + except asyncio.TimeoutError: + # Send keepalive comment + yield ": keepalive\n\n" + # Check if task finished while waiting + if info.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.INTERRUPTED): + break + continue + + # Yield any new events since cursor + while cursor < len(info.events): + evt = info.events[cursor] + yield f"data: {json.dumps(evt)}\n\n" + cursor += 1 + + if info.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.INTERRUPTED): + break + + yield f"data: {json.dumps({'type': 'task_status', 'data': {'status': info.status.value}})}\n\n" + + return StreamingResponse(event_generator(), media_type="text/event-stream") + + +@app.get("/api/task/{task_id}") +async def poll_task( + task_id: str, + user: str = DEFAULT_USER, + current_user: User | None = Depends(get_current_user_optional), +): + """Poll: return status + accumulated results.""" + resolved = resolve_user(current_user, user) + username = resolved.username if resolved else DEFAULT_USER + + status_data = get_task_status(username, task_id) + if not status_data: + raise HTTPException(status_code=404, detail="Task not found") + if status_data.get("user") != username: + raise HTTPException(status_code=403, detail="Not your task") + return status_data + + class TitleRequest(BaseModel): user_prompt: str response: str diff --git a/backend/app/services/tasks.py b/backend/app/services/tasks.py new file mode 100644 index 0000000..003c02d --- /dev/null +++ b/backend/app/services/tasks.py @@ -0,0 +1,304 @@ +""" +Background task management for debate & council operations. + +Decouples execution from SSE delivery so tasks survive browser disconnects. +Results are persisted progressively to disk. +""" + +import asyncio +import json +import logging +import os +import time +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, AsyncGenerator, Dict, List, Optional + +logger = logging.getLogger("contextflow.tasks") + +DATA_ROOT = os.path.abspath(os.getenv("DATA_ROOT", os.path.join(os.getcwd(), "data"))) + + +class TaskStatus(str, Enum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + INTERRUPTED = "interrupted" + + +# Chunk event types that are NOT persisted to disk (redundant with *_complete events) +_CHUNK_EVENT_TYPES = {"final_chunk", "stage3_chunk"} + + +@dataclass +class TaskInfo: + task_id: str + user: str + node_id: str + task_type: str # "debate" | "council" + status: TaskStatus + created_at: float + updated_at: float + events: List[Dict] = field(default_factory=list) + result: Optional[Dict] = None + error: Optional[str] = None + # In-memory only (not serialized): + new_event_signal: asyncio.Event = field(default_factory=asyncio.Event, repr=False) + asyncio_task: Optional[asyncio.Task] = field(default=None, repr=False) + + +# --------------- In-memory registry --------------- +_task_registry: Dict[str, TaskInfo] = {} + + +def register_task(task_id: str, user: str, node_id: str, task_type: str) -> TaskInfo: + """Create and register a new task in memory + persist initial state to disk.""" + now = time.time() + info = TaskInfo( + task_id=task_id, + user=user, + node_id=node_id, + task_type=task_type, + status=TaskStatus.PENDING, + created_at=now, + updated_at=now, + ) + _task_registry[task_id] = info + _persist_task(info) + return info + + +def get_task(task_id: str) -> Optional[TaskInfo]: + """Get task from in-memory registry.""" + return _task_registry.get(task_id) + + +def get_task_status(user: str, task_id: str) -> Optional[Dict]: + """ + Get task status dict. In-memory first, fallback to disk. + If disk says "running" but not in memory → report "interrupted". + """ + info = _task_registry.get(task_id) + if info: + return _task_to_dict(info) + + # Fallback: read from disk + disk_data = _load_task_from_disk(user, task_id) + if not disk_data: + return None + + # If disk says running but it's not in memory, backend must have restarted + if disk_data.get("status") == TaskStatus.RUNNING: + disk_data["status"] = TaskStatus.INTERRUPTED + return disk_data + + +async def run_task_in_background(info: TaskInfo, event_generator: AsyncGenerator[str, None]) -> None: + """ + Consume the async generator, appending events to info.events. + Signals new_event_signal for SSE consumers. + Persists at milestones, skips chunk events from persistence. + """ + info.status = TaskStatus.RUNNING + info.updated_at = time.time() + _persist_task(info) + + try: + async for raw_sse in event_generator: + # raw_sse is a string like 'data: {"type": "...", ...}\n\n' + evt = _parse_sse_event(raw_sse) + if evt is None: + continue + + evt_type = evt.get("type", "") + + # Always store in memory (even chunks, for live streaming) + info.events.append(evt) + info.updated_at = time.time() + + # Signal waiting SSE consumers + info.new_event_signal.set() + info.new_event_signal.clear() + + # Persist at milestones (skip chunk events) + if evt_type not in _CHUNK_EVENT_TYPES: + _persist_task(info) + + # Detect completion / error + if evt_type in ("complete", "debate_complete"): + info.result = _extract_result(info.task_type, info.events) + info.status = TaskStatus.COMPLETED + info.updated_at = time.time() + _persist_task(info) + return + if evt_type == "error": + info.error = evt.get("data", {}).get("message", "Unknown error") if isinstance(evt.get("data"), dict) else str(evt.get("data", "Unknown error")) + info.status = TaskStatus.FAILED + info.updated_at = time.time() + _persist_task(info) + return + + # Generator exhausted without explicit complete/error + if info.status == TaskStatus.RUNNING: + info.result = _extract_result(info.task_type, info.events) + info.status = TaskStatus.COMPLETED + info.updated_at = time.time() + _persist_task(info) + + except asyncio.CancelledError: + info.status = TaskStatus.INTERRUPTED + info.updated_at = time.time() + _persist_task(info) + raise + except Exception as e: + logger.exception("Background task %s failed: %s", info.task_id, e) + info.error = str(e) + info.status = TaskStatus.FAILED + info.updated_at = time.time() + _persist_task(info) + + +def cleanup_stale_tasks(data_root: Optional[str] = None) -> int: + """ + On startup, scan task files on disk and mark any "running" as "interrupted". + Returns count of tasks marked interrupted. + """ + root = data_root or DATA_ROOT + count = 0 + if not os.path.exists(root): + return count + + for user_dir in os.listdir(root): + tasks_dir = os.path.join(root, user_dir, "tasks") + if not os.path.isdir(tasks_dir): + continue + for fname in os.listdir(tasks_dir): + if not fname.endswith(".json"): + continue + fpath = os.path.join(tasks_dir, fname) + try: + with open(fpath, "r", encoding="utf-8") as f: + data = json.load(f) + if data.get("status") == TaskStatus.RUNNING: + data["status"] = TaskStatus.INTERRUPTED + data["updated_at"] = time.time() + with open(fpath, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False) + count += 1 + logger.info("Marked stale task as interrupted: %s", fname) + except Exception as e: + logger.warning("Failed to process stale task file %s: %s", fpath, e) + + return count + + +# --------------- Persistence helpers --------------- + +def _task_dir(user: str) -> str: + d = os.path.join(DATA_ROOT, user, "tasks") + os.makedirs(d, exist_ok=True) + return d + + +def _task_file_path(user: str, task_id: str) -> str: + return os.path.join(_task_dir(user), f"{task_id}.json") + + +def _persist_task(info: TaskInfo) -> None: + """Write task state to disk. Excludes chunk events for smaller files.""" + try: + data = _task_to_dict(info, exclude_chunks=True) + path = _task_file_path(info.user, info.task_id) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False) + except Exception as e: + logger.warning("Failed to persist task %s: %s", info.task_id, e) + + +def _load_task_from_disk(user: str, task_id: str) -> Optional[Dict]: + path = _task_file_path(user, task_id) + if not os.path.exists(path): + return None + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + logger.warning("Failed to load task %s from disk: %s", task_id, e) + return None + + +def _task_to_dict(info: TaskInfo, exclude_chunks: bool = False) -> Dict: + events = info.events + if exclude_chunks: + events = [e for e in events if e.get("type") not in _CHUNK_EVENT_TYPES] + return { + "task_id": info.task_id, + "user": info.user, + "node_id": info.node_id, + "task_type": info.task_type, + "status": info.status.value if isinstance(info.status, TaskStatus) else info.status, + "created_at": info.created_at, + "updated_at": info.updated_at, + "events": events, + "result": info.result, + "error": info.error, + } + + +def _parse_sse_event(raw: str) -> Optional[Dict]: + """Parse a raw SSE string like 'data: {...}\\n\\n' into a dict.""" + raw = raw.strip() + if not raw: + return None + for line in raw.split("\n"): + line = line.strip() + if line.startswith("data: "): + try: + return json.loads(line[6:]) + except json.JSONDecodeError: + return None + return None + + +def _extract_result(task_type: str, events: List[Dict]) -> Dict: + """Reconstruct final result data from accumulated events.""" + if task_type == "council": + stage1 = None + stage2 = None + stage3 = None + for evt in events: + t = evt.get("type", "") + if t == "stage1_complete": + stage1 = evt.get("data") + elif t == "stage1_model_complete": + if stage1 is None: + stage1 = [] + stage1.append(evt.get("data")) + elif t == "stage2_complete": + stage2 = evt.get("data") + elif t == "stage3_complete": + stage3 = evt.get("data") + return {"councilData": {"stage1": stage1, "stage2": stage2, "stage3": stage3}} + + elif task_type == "debate": + rounds: List[Dict] = [] + final_verdict = None + for evt in events: + t = evt.get("type", "") + if t == "round_complete": + rounds.append(evt.get("data", {})) + elif t == "final_complete": + final_verdict = evt.get("data") + elif t == "judge_decision": + # Attach to last round + if rounds: + rounds[-1]["judgeDecision"] = evt.get("data") + elif t == "model_eliminated": + if rounds: + if "eliminated" not in rounds[-1]: + rounds[-1]["eliminated"] = [] + rounds[-1]["eliminated"].append(evt.get("data")) + return {"debateRounds": rounds, "finalVerdict": final_verdict} + + return {} diff --git a/frontend/src/components/LeftSidebar.tsx b/frontend/src/components/LeftSidebar.tsx index 54c2527..441b7e0 100644 --- a/frontend/src/components/LeftSidebar.tsx +++ b/frontend/src/components/LeftSidebar.tsx @@ -30,6 +30,7 @@ const LeftSidebar: React.FC<LeftSidebarProps> = ({ isOpen, onToggle }) => { deleteFile, readBlueprintFile, loadBlueprint, + recoverBackgroundTasks, saveBlueprintFile, saveCurrentBlueprint, createProjectFolder, @@ -266,6 +267,8 @@ const LeftSidebar: React.FC<LeftSidebarProps> = ({ isOpen, onToggle }) => { if (vp) { setViewport(vp); } + // Recover any background tasks that were running before page refresh + recoverBackgroundTasks(); } catch (e) { console.error(e); alert('Not a valid blueprint JSON.'); diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx index 8bb5fcb..474a969 100644 --- a/frontend/src/components/Sidebar.tsx +++ b/frontend/src/components/Sidebar.tsx @@ -387,6 +387,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }) }); + if (!response.ok) throw new Error(await response.text() || `HTTP ${response.status}`); if (!response.body) return; const reader = response.body.getReader(); const decoder = new TextDecoder(); @@ -511,7 +512,8 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }; try { - const response = await fetch(`/api/run_council_stream?user=${encodeURIComponent(user?.username || 'test')}`, { + // Step 1: Start background task + const startRes = await fetch(`/api/task/start_council?user=${encodeURIComponent(user?.username || 'test')}`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...getAuthHeader() }, body: JSON.stringify({ @@ -546,101 +548,35 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }), }); - if (!response.body) return; - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let sseBuffer = ''; - let stage1Results: Array<{ model: string; response: string }> = []; - let stage2Data: any = null; - let stage3Full = ''; - let stage3Model = ''; + if (!startRes.ok) throw new Error(await startRes.text() || `HTTP ${startRes.status}`); + const { task_id } = await startRes.json(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - sseBuffer += decoder.decode(value, { stream: true }); - - // Parse SSE events (data: {...}\n\n) - const parts = sseBuffer.split('\n\n'); - sseBuffer = parts.pop() || ''; - - for (const part of parts) { - const line = part.trim(); - if (!line.startsWith('data: ')) continue; - let evt: any; - try { - evt = JSON.parse(line.slice(6)); - } catch { continue; } - - switch (evt.type) { - case 'stage1_start': - setCouncilStage('Stage 1: Collecting responses...'); - break; - case 'stage1_model_complete': - stage1Results = [...stage1Results, evt.data]; - setCouncilStage(`Stage 1: ${stage1Results.length}/${councilModels.length} models done`); - updateNodeData(runningNodeId, { - councilData: { stage1: [...stage1Results], stage2: null, stage3: null }, - }); - break; - case 'stage1_complete': - stage1Results = evt.data; - updateNodeData(runningNodeId, { - councilData: { stage1: stage1Results, stage2: null, stage3: null }, - }); - break; - case 'stage2_start': - setCouncilStage('Stage 2: Peer ranking...'); - break; - case 'stage2_complete': - stage2Data = evt.data; - updateNodeData(runningNodeId, { - councilData: { stage1: stage1Results, stage2: stage2Data, stage3: null }, - }); - break; - case 'stage3_start': - setCouncilStage('Stage 3: Chairman synthesizing...'); - setCouncilStreamBuffer(''); - break; - case 'stage3_chunk': - stage3Full += evt.data.chunk; - setCouncilStreamBuffer(stage3Full); - setStreamBuffer(stage3Full); - break; - case 'stage3_complete': - stage3Model = evt.data.model; - stage3Full = evt.data.response; - break; - case 'complete': { - const responseReceivedAt = Date.now(); - const councilData: CouncilData = { - stage1: stage1Results, - stage2: stage2Data, - stage3: { model: stage3Model, response: stage3Full }, - }; - const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt }; - const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: stage3Full }; - updateNodeData(runningNodeId, { - status: 'success', - response: stage3Full, - responseReceivedAt, - councilData, - messages: [...context, newUserMsg, newAssistantMsg] as any, - }); - setCouncilStage(''); - generateTitle(runningNodeId, runningPrompt, stage3Full); - break; - } - case 'error': - updateNodeData(runningNodeId, { status: 'error' }); - setCouncilStage(''); - break; - } - } + // Step 2: Store taskId on node for recovery + force save + updateNodeData(runningNodeId, { taskId: task_id }); + if (currentBlueprintPath) { + saveCurrentBlueprint(currentBlueprintPath).catch(console.error); } + + // Step 3: Consume SSE stream from task + const { _consumeTaskStream } = await import('../store/flowStore'); + await _consumeTaskStream( + task_id, runningNodeId, 'council', 0, updateNodeData, user?.username || 'test', + { + onStage: setCouncilStage, + onStreamBuffer: (text) => { setCouncilStreamBuffer(text); setStreamBuffer(text); }, + onComplete: () => { + setCouncilStage(''); + // Fetch final response for title generation + const currentNode = nodes.find(n => n.id === runningNodeId); + const finalResponse = currentNode?.data.response || ''; + if (finalResponse) generateTitle(runningNodeId, runningPrompt, finalResponse); + }, + onError: () => { setCouncilStage(''); }, + }, + ); } catch (error) { console.error(error); - updateNodeData(runningNodeId, { status: 'error' }); + updateNodeData(runningNodeId, { status: 'error', taskId: undefined }); setCouncilStage(''); } finally { setStreamingNodeId(prev => prev === runningNodeId ? null : prev); @@ -707,7 +643,9 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { try { const judgeModelConfig = selectedNode.data.judgeModel || debateModels[0]; - const response = await fetch(`/api/run_debate_stream?user=${encodeURIComponent(user?.username || 'test')}`, { + + // Step 1: Start background task + const startRes = await fetch(`/api/task/start_debate?user=${encodeURIComponent(user?.username || 'test')}`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...getAuthHeader() }, body: JSON.stringify({ @@ -740,176 +678,34 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }), }); - if (!response.body) return; - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let sseBuffer = ''; - const debateRounds: DebateRound[] = []; - let currentRound = 0; - let currentRoundResponses: Array<{ model: string; response: string }> = []; - let finalModel = ''; - let finalFull = ''; + if (!startRes.ok) throw new Error(await startRes.text() || `HTTP ${startRes.status}`); + const { task_id } = await startRes.json(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - sseBuffer += decoder.decode(value, { stream: true }); - - const parts = sseBuffer.split('\n\n'); - sseBuffer = parts.pop() || ''; - - for (const part of parts) { - const line = part.trim(); - if (!line.startsWith('data: ')) continue; - let evt: any; - try { - evt = JSON.parse(line.slice(6)); - } catch { continue; } - - switch (evt.type) { - case 'debate_start': - setDebateStage(`Debate started (${evt.data.models.length} models, max ${evt.data.max_rounds} rounds)`); - break; - case 'round_start': - currentRound = evt.data.round; - currentRoundResponses = []; - setDebateStage(`Round ${currentRound}/${maxRounds}: Collecting responses...`); - break; - case 'round_model_complete': - currentRoundResponses = [...currentRoundResponses, { model: evt.data.model, response: evt.data.response }]; - setDebateStage(`Round ${currentRound}/${maxRounds}: ${currentRoundResponses.length}/${debateModels.length} models done`); - break; - case 'round_complete': { - const roundData: DebateRound = { round: evt.data.round, responses: evt.data.responses }; - debateRounds.push(roundData); - updateNodeData(runningNodeId, { - debateData: { - rounds: [...debateRounds], - finalVerdict: null, - config: { judgeMode, format: debateFormat, maxRounds }, - }, - }); - break; - } - case 'judge_decision': { - const lastRound = debateRounds[debateRounds.length - 1]; - if (lastRound) { - lastRound.judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning }; - updateNodeData(runningNodeId, { - debateData: { - rounds: [...debateRounds], - finalVerdict: null, - config: { judgeMode, format: debateFormat, maxRounds }, - }, - }); - } - if (!evt.data.continue) { - setDebateStage('Judge stopped debate. Generating final verdict...'); - } else { - setDebateStage(`Judge: Continue to round ${currentRound + 1}...`); - } - break; - } - case 'model_eliminated': { - const lastRound2 = debateRounds[debateRounds.length - 1]; - if (lastRound2) { - if (!lastRound2.eliminated) lastRound2.eliminated = []; - lastRound2.eliminated.push({ - model: evt.data.model, - convincedBy: evt.data.convinced_by, - reasoning: evt.data.reasoning, - }); - } - setDebateStage(`${evt.data.model} concedes to ${evt.data.convinced_by || 'another'}...`); - break; - } - case 'convergence_status': { - const remaining = evt.data.remaining as string[]; - updateNodeData(runningNodeId, { - debateData: { - rounds: [...debateRounds], - finalVerdict: null, - config: { judgeMode, format: debateFormat, maxRounds }, - }, - }); - if (remaining.length <= 1) { - setDebateStage(`${remaining[0] || 'Winner'} is the last one standing!`); - } else { - setDebateStage(`${remaining.length} models remaining...`); - } - break; - } - case 'final_start': - finalModel = evt.data.model; - setDebateStage('Judge synthesizing final verdict...'); - setDebateStreamBuffer(''); - break; - case 'final_chunk': - finalFull += evt.data.chunk; - setDebateStreamBuffer(finalFull); - setStreamBuffer(finalFull); - break; - case 'final_complete': { - finalModel = evt.data.model; - finalFull = evt.data.response; - const responseReceivedAt = Date.now(); - const debateData: DebateData = { - rounds: debateRounds, - finalVerdict: { model: finalModel, response: finalFull }, - config: { judgeMode, format: debateFormat, maxRounds }, - }; - const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt }; - const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: finalFull }; - updateNodeData(runningNodeId, { - status: 'success', - response: finalFull, - responseReceivedAt, - debateData, - messages: [...context, newUserMsg, newAssistantMsg] as any, - }); - setDebateStage(''); - generateTitle(runningNodeId, runningPrompt, finalFull); - break; - } - case 'debate_complete': { - // If no final verdict (display_only or self_convergence without explicit final_complete) - const currentNode = nodes.find(n => n.id === runningNodeId); - if (currentNode?.data.status === 'loading') { - const responseReceivedAt = Date.now(); - const lastRoundResp = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].responses : []; - const bestResponse = lastRoundResp.length > 0 - ? lastRoundResp.reduce((a, b) => a.response.length > b.response.length ? a : b).response - : ''; - const debateData: DebateData = { - rounds: debateRounds, - finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null, - config: { judgeMode, format: debateFormat, maxRounds }, - }; - const displayResponse = finalFull || bestResponse; - const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt }; - const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: displayResponse }; - updateNodeData(runningNodeId, { - status: 'success', - response: displayResponse, - responseReceivedAt, - debateData, - messages: [...context, newUserMsg, newAssistantMsg] as any, - }); - setDebateStage(''); - if (displayResponse) generateTitle(runningNodeId, runningPrompt, displayResponse); - } - break; - } - case 'error': - updateNodeData(runningNodeId, { status: 'error' }); - setDebateStage(''); - break; - } - } + // Step 2: Store taskId on node for recovery + force save + updateNodeData(runningNodeId, { taskId: task_id }); + if (currentBlueprintPath) { + saveCurrentBlueprint(currentBlueprintPath).catch(console.error); } + + // Step 3: Consume SSE stream from task + const { _consumeTaskStream } = await import('../store/flowStore'); + await _consumeTaskStream( + task_id, runningNodeId, 'debate', 0, updateNodeData, user?.username || 'test', + { + onStage: setDebateStage, + onStreamBuffer: (text) => { setDebateStreamBuffer(text); setStreamBuffer(text); }, + onComplete: () => { + setDebateStage(''); + const currentNode = nodes.find(n => n.id === runningNodeId); + const finalResponse = currentNode?.data.response || ''; + if (finalResponse) generateTitle(runningNodeId, runningPrompt, finalResponse); + }, + onError: () => { setDebateStage(''); }, + }, + ); } catch (error) { console.error(error); - updateNodeData(runningNodeId, { status: 'error' }); + updateNodeData(runningNodeId, { status: 'error', taskId: undefined }); setDebateStage(''); } finally { setStreamingNodeId(prev => prev === runningNodeId ? null : prev); @@ -2968,7 +2764,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }`} > {selectedNode.data.status === 'loading' ? <Loader2 className="animate-spin" size={16} /> : <Users size={16} />} - {selectedNode.data.status === 'loading' && councilStage ? councilStage : `Run Council (${(selectedNode.data.councilModels || []).length})`} + {selectedNode.data.status === 'loading' && (councilStage || selectedNode.data.taskStage) ? (councilStage || selectedNode.data.taskStage) : `Run Council (${(selectedNode.data.councilModels || []).length})`} </button> <div className={`text-center text-[10px] mt-0.5 leading-tight ${isDark ? 'text-gray-600' : 'text-gray-400'}`}> Inspired by <a href="https://github.com/karpathy/llm-council" target="_blank" rel="noopener noreferrer" className={`underline decoration-dotted ${isDark ? 'text-gray-500 hover:text-gray-400' : 'text-gray-500 hover:text-gray-600'}`}>karpathy/llm-council</a> @@ -2985,7 +2781,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }`} > {selectedNode.data.status === 'loading' ? <Loader2 className="animate-spin" size={16} /> : <MessageSquare size={16} />} - {selectedNode.data.status === 'loading' && debateStage ? debateStage : `Run Debate (${(selectedNode.data.debateModels || []).length})`} + {selectedNode.data.status === 'loading' && (debateStage || selectedNode.data.taskStage) ? (debateStage || selectedNode.data.taskStage) : `Run Debate (${(selectedNode.data.debateModels || []).length})`} </button> ) : ( <button @@ -3086,7 +2882,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { <div> {selectedNode.data.councilData.stage3 ? ( <div className={`text-xs mb-1 ${isDark ? 'text-gray-500' : 'text-gray-400'}`}>Chairman: {selectedNode.data.councilData.stage3.model}</div> - ) : selectedNode.data.status === 'loading' && councilStage.includes('Stage 3') ? ( + ) : selectedNode.data.status === 'loading' && (councilStage.includes('Stage 3') || (selectedNode.data.taskStage || '').includes('Stage 3')) ? ( <div className={`text-xs mb-1 flex items-center gap-1 ${isDark ? 'text-amber-400' : 'text-amber-600'}`}><Loader2 className="animate-spin" size={10} /> Synthesizing...</div> ) : null} <div className={`p-3 rounded-md border min-h-[150px] text-sm prose prose-sm max-w-none ${ @@ -3333,9 +3129,9 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { </div> ); })} - {selectedNode.data.status === 'loading' && debateStage && ( + {selectedNode.data.status === 'loading' && (debateStage || selectedNode.data.taskStage) && ( <div className={`flex items-center gap-2 p-2 text-xs ${isDark ? 'text-cyan-400' : 'text-cyan-600'}`}> - <Loader2 className="animate-spin" size={12} /> {debateStage} + <Loader2 className="animate-spin" size={12} /> {debateStage || selectedNode.data.taskStage} </div> )} </div> diff --git a/frontend/src/store/flowStore.ts b/frontend/src/store/flowStore.ts index 5bb3e22..ec96185 100644 --- a/frontend/src/store/flowStore.ts +++ b/frontend/src/store/flowStore.ts @@ -137,6 +137,10 @@ export interface NodeData { debateMaxRounds?: number; debateData?: DebateData; + // Background task persistence + taskId?: string; // Background task ID for recovery + taskStage?: string; // Live stage text (e.g. "Round 2: Collecting responses...") + // Traces logic traces: Trace[]; // INCOMING Traces outgoingTraces: Trace[]; // ALL Outgoing (inherited + self + forks + merged) @@ -293,6 +297,9 @@ interface FlowState { ) => Message[]; propagateTraces: () => void; + + // Background task recovery + recoverBackgroundTasks: () => Promise<void>; } // Hash string to color @@ -341,6 +348,401 @@ const jsonFetch = async <T>(url: string, options?: RequestInit): Promise<T> => { return res.json() as Promise<T>; }; +// --------------- Background Task Helpers --------------- + +/** Apply partial state from accumulated events (for recovery of in-progress tasks) */ +function _applyPartialEvents( + updateNodeData: (nodeId: string, data: Partial<NodeData>) => void, + nodeId: string, + taskType: string, + events: any[], + currentData: NodeData, +) { + // Compute taskStage from the last relevant event + let taskStage = 'Recovering...'; + + if (taskType === 'council') { + let stage1: any[] | null = null; + let stage2: any = null; + let stage3Full = ''; + for (const evt of events) { + if (evt.type === 'stage1_start') { + taskStage = 'Stage 1: Collecting responses...'; + } else if (evt.type === 'stage1_model_complete') { + if (!stage1) stage1 = []; + stage1.push(evt.data); + taskStage = `Stage 1: ${stage1.length} models done`; + } else if (evt.type === 'stage1_complete') { + stage1 = evt.data; + taskStage = 'Stage 1 complete'; + } else if (evt.type === 'stage2_start') { + taskStage = 'Stage 2: Peer ranking...'; + } else if (evt.type === 'stage2_complete') { + stage2 = evt.data; + taskStage = 'Stage 2 complete'; + } else if (evt.type === 'stage3_start') { + taskStage = 'Stage 3: Chairman synthesizing...'; + } else if (evt.type === 'stage3_complete') { + stage3Full = evt.data.response; + taskStage = 'Stage 3: Chairman synthesizing...'; + } + } + updateNodeData(nodeId, { + councilData: { stage1, stage2, stage3: stage3Full ? { model: '', response: stage3Full } : null }, + taskStage, + }); + } else if (taskType === 'debate') { + const rounds: DebateRound[] = []; + let finalFull = ''; + let finalModel = ''; + let currentRound = 0; + for (const evt of events) { + if (evt.type === 'debate_start') { + taskStage = `Debate started (${evt.data?.models?.length || '?'} models)`; + } else if (evt.type === 'round_start') { + currentRound = evt.data.round; + taskStage = `Round ${currentRound}: Collecting responses...`; + } else if (evt.type === 'round_model_complete') { + taskStage = `Round ${currentRound}: ${evt.data.model} done`; + } else if (evt.type === 'round_complete') { + rounds.push({ round: evt.data.round, responses: evt.data.responses }); + taskStage = `Round ${evt.data.round} complete`; + } else if (evt.type === 'judge_decision' && rounds.length > 0) { + rounds[rounds.length - 1].judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning }; + } else if (evt.type === 'model_eliminated' && rounds.length > 0) { + if (!rounds[rounds.length - 1].eliminated) rounds[rounds.length - 1].eliminated = []; + rounds[rounds.length - 1].eliminated!.push({ + model: evt.data.model, convincedBy: evt.data.convinced_by, reasoning: evt.data.reasoning, + }); + } else if (evt.type === 'convergence_status') { + const remaining = evt.data?.remaining as string[] || []; + taskStage = remaining.length <= 1 + ? `${remaining[0] || 'Winner'} is the last one standing!` + : `${remaining.length} models remaining...`; + } else if (evt.type === 'final_start') { + finalModel = evt.data.model; + taskStage = 'Judge synthesizing final verdict...'; + } else if (evt.type === 'final_complete') { + finalModel = evt.data.model; + finalFull = evt.data.response; + } + } + const config = currentData.debateData?.config || { + judgeMode: currentData.debateJudgeMode || 'external_judge', + format: currentData.debateFormat || 'free_discussion', + maxRounds: currentData.debateMaxRounds || 5, + }; + updateNodeData(nodeId, { + debateData: { rounds, finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null, config: config as any }, + taskStage, + }); + } +} + +/** + * Connect to the task SSE stream and process events, updating node data. + * Used by both initial task start (Sidebar) and recovery (recoverBackgroundTasks). + * + * Initializes accumulators from existing node data so that recovery after reconnect + * doesn't lose previously accumulated rounds/stages. + */ +async function _consumeTaskStream( + taskId: string, + nodeId: string, + taskType: string, + fromEvent: number, + updateNodeData: (nodeId: string, data: Partial<NodeData>) => void, + username: string, + callbacks?: { + onStage?: (stage: string) => void; + onStreamBuffer?: (text: string) => void; + onComplete?: () => void; + onError?: () => void; + }, +): Promise<void> { + const authHeaders = getAuthHeaders(); + + // Read current node data to initialize accumulators (critical for recovery) + const nodeData = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data; + + // Helper: read debate config from node data (never hardcode) + const getDebateConfig = () => { + const nd = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data; + return nd?.debateData?.config || { + judgeMode: nd?.debateJudgeMode || 'external_judge', + format: nd?.debateFormat || 'free_discussion', + maxRounds: nd?.debateMaxRounds || 5, + }; + }; + + // Helper: trigger auto-save after milestone events + const triggerSave = () => { + try { useFlowStore.getState().triggerAutoSave(); } catch { /* ignore */ } + }; + + // Helper: update stage text on both node data and optional callback + const setStage = (text: string) => { + updateNodeData(nodeId, { taskStage: text }); + callbacks?.onStage?.(text); + }; + + try { + const response = await fetch( + `${API_BASE}/api/task/${encodeURIComponent(taskId)}/stream?from_event=${fromEvent}&user=${encodeURIComponent(username)}`, + { headers: authHeaders }, + ); + if (!response.ok || !response.body) { + callbacks?.onError?.(); + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + return; + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let sseBuffer = ''; + + // Initialize accumulators from existing node data (preserves state across reconnect) + let stage1Results: Array<{ model: string; response: string }> = + (nodeData?.councilData?.stage1 as any[]) || []; + let stage2Data: any = nodeData?.councilData?.stage2 || null; + let stage3Full = nodeData?.councilData?.stage3?.response || ''; + let stage3Model = nodeData?.councilData?.stage3?.model || ''; + + const debateRounds: DebateRound[] = [...(nodeData?.debateData?.rounds || [])]; + let currentRound = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].round : 0; + let finalModel = nodeData?.debateData?.finalVerdict?.model || ''; + let finalFull = nodeData?.debateData?.finalVerdict?.response || ''; + + while (true) { + const { value, done } = await reader.read(); + if (done) break; + sseBuffer += decoder.decode(value, { stream: true }); + + const parts = sseBuffer.split('\n\n'); + sseBuffer = parts.pop() || ''; + + for (const part of parts) { + const line = part.trim(); + if (!line.startsWith('data: ')) continue; + let evt: any; + try { evt = JSON.parse(line.slice(6)); } catch { continue; } + + // Handle task_status terminal event + if (evt.type === 'task_status') { + if (evt.data?.status === 'completed') { + callbacks?.onComplete?.(); + } else { + callbacks?.onError?.(); + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + } + continue; + } + + if (taskType === 'council') { + switch (evt.type) { + case 'stage1_start': + setStage('Stage 1: Collecting responses...'); + break; + case 'stage1_model_complete': + stage1Results = [...stage1Results, evt.data]; + setStage(`Stage 1: ${stage1Results.length} models done`); + updateNodeData(nodeId, { + councilData: { stage1: [...stage1Results], stage2: null, stage3: null }, + }); + triggerSave(); + break; + case 'stage1_complete': + stage1Results = evt.data; + updateNodeData(nodeId, { + councilData: { stage1: stage1Results, stage2: null, stage3: null }, + }); + triggerSave(); + break; + case 'stage2_start': + setStage('Stage 2: Peer ranking...'); + break; + case 'stage2_complete': + stage2Data = evt.data; + updateNodeData(nodeId, { + councilData: { stage1: stage1Results, stage2: stage2Data, stage3: null }, + }); + triggerSave(); + break; + case 'stage3_start': + setStage('Stage 3: Chairman synthesizing...'); + callbacks?.onStreamBuffer?.(''); + break; + case 'stage3_chunk': + stage3Full += evt.data.chunk; + callbacks?.onStreamBuffer?.(stage3Full); + break; + case 'stage3_complete': + stage3Model = evt.data.model; + stage3Full = evt.data.response; + triggerSave(); + break; + case 'complete': { + const councilData: CouncilData = { + stage1: stage1Results, + stage2: stage2Data, + stage3: { model: stage3Model, response: stage3Full }, + }; + updateNodeData(nodeId, { + status: 'success', + response: stage3Full, + responseReceivedAt: Date.now(), + councilData, + taskId: undefined, + }); + setStage(''); + callbacks?.onComplete?.(); + triggerSave(); + break; + } + case 'error': + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + setStage(''); + callbacks?.onError?.(); + break; + } + } else if (taskType === 'debate') { + switch (evt.type) { + case 'debate_start': + setStage(`Debate started (${evt.data.models.length} models, max ${evt.data.max_rounds} rounds)`); + break; + case 'round_start': + currentRound = evt.data.round; + setStage(`Round ${currentRound}: Collecting responses...`); + break; + case 'round_model_complete': + setStage(`Round ${currentRound}: ${evt.data.model} done`); + break; + case 'round_complete': { + const roundData: DebateRound = { round: evt.data.round, responses: evt.data.responses }; + debateRounds.push(roundData); + updateNodeData(nodeId, { + debateData: { + rounds: [...debateRounds], + finalVerdict: null, + config: getDebateConfig() as any, + }, + }); + triggerSave(); + break; + } + case 'judge_decision': { + const lastRound = debateRounds[debateRounds.length - 1]; + if (lastRound) { + lastRound.judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning }; + updateNodeData(nodeId, { + debateData: { + rounds: [...debateRounds], + finalVerdict: null, + config: getDebateConfig() as any, + }, + }); + triggerSave(); + } + break; + } + case 'model_eliminated': { + const lastRound2 = debateRounds[debateRounds.length - 1]; + if (lastRound2) { + if (!lastRound2.eliminated) lastRound2.eliminated = []; + lastRound2.eliminated.push({ + model: evt.data.model, + convincedBy: evt.data.convinced_by, + reasoning: evt.data.reasoning, + }); + } + break; + } + case 'convergence_status': { + updateNodeData(nodeId, { + debateData: { + rounds: [...debateRounds], + finalVerdict: null, + config: getDebateConfig() as any, + }, + }); + const remaining = evt.data.remaining as string[]; + if (remaining.length <= 1) { + setStage(`${remaining[0] || 'Winner'} is the last one standing!`); + } else { + setStage(`${remaining.length} models remaining...`); + } + break; + } + case 'final_start': + finalModel = evt.data.model; + setStage('Judge synthesizing final verdict...'); + callbacks?.onStreamBuffer?.(''); + break; + case 'final_chunk': + finalFull += evt.data.chunk; + callbacks?.onStreamBuffer?.(finalFull); + break; + case 'final_complete': { + finalModel = evt.data.model; + finalFull = evt.data.response; + updateNodeData(nodeId, { + status: 'success', + response: finalFull, + responseReceivedAt: Date.now(), + debateData: { + rounds: debateRounds, + finalVerdict: { model: finalModel, response: finalFull }, + config: getDebateConfig() as any, + }, + taskId: undefined, + }); + setStage(''); + callbacks?.onComplete?.(); + triggerSave(); + break; + } + case 'debate_complete': { + const store = useFlowStore.getState(); + const currentNode = store.nodes.find(n => n.id === nodeId); + if (currentNode?.data.status === 'loading') { + const lastResp = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].responses : []; + const bestResponse = lastResp.length > 0 + ? lastResp.reduce((a, b) => a.response.length > b.response.length ? a : b).response + : ''; + const displayResponse = finalFull || bestResponse; + updateNodeData(nodeId, { + status: 'success', + response: displayResponse, + responseReceivedAt: Date.now(), + debateData: { + rounds: debateRounds, + finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null, + config: getDebateConfig() as any, + }, + taskId: undefined, + }); + setStage(''); + callbacks?.onComplete?.(); + triggerSave(); + } + break; + } + case 'error': + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + setStage(''); + callbacks?.onError?.(); + break; + } + } + } + } + } catch (error) { + console.error(`Task stream error for ${taskId}:`, error); + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + callbacks?.onError?.(); + } +} + const useFlowStore = create<FlowState>((set, get) => { const validateBlueprint = (doc: any): BlueprintDocument => { @@ -1615,6 +2017,8 @@ const useFlowStore = create<FlowState>((set, get) => { traces: undefined, outgoingTraces: undefined, messages: undefined, + // Drop transient runtime state + taskStage: undefined, // Keep merged/forked trace definitions but strip their computed messages mergedTraces: (n.data.mergedTraces || []).map((m: any) => ({ id: m.id, @@ -1651,6 +2055,13 @@ const useFlowStore = create<FlowState>((set, get) => { if (n.data?.chairmanModel && typeof n.data.chairmanModel === 'string') { n.data.chairmanModel = { model: n.data.chairmanModel }; } + // Reset stale "loading" status — unless there's a background task to recover + if (n.data?.status === 'loading') { + if (!n.data.taskId) { + n.data.status = 'idle'; // Legacy/no task — reset + } + // If taskId exists, keep loading — recoverBackgroundTasks() will handle it + } return n; }); set({ @@ -2273,6 +2684,81 @@ const useFlowStore = create<FlowState>((set, get) => { setTimeout(() => get().propagateTraces(), 50); }, + recoverBackgroundTasks: async () => { + const nodes = get().nodes; + const user = getCurrentUser(); + + for (const node of nodes) { + if (!node.data.taskId) continue; + + try { + const res = await fetch( + `${API_BASE}/api/task/${encodeURIComponent(node.data.taskId)}?user=${encodeURIComponent(user)}`, + { headers: getAuthHeaders() } + ); + if (!res.ok) { + // Task not found — reset node + get().updateNodeData(node.id, { status: 'idle', taskId: undefined }); + continue; + } + const taskData = await res.json(); + const status = taskData.status; + + if (status === 'completed') { + // Apply completed results + const result = taskData.result || {}; + if (taskData.task_type === 'council') { + const cd = result.councilData; + const response = cd?.stage3?.response || ''; + get().updateNodeData(node.id, { + status: 'success', + response, + councilData: cd, + taskId: undefined, + }); + } else if (taskData.task_type === 'debate') { + const rounds = result.debateRounds || []; + const finalVerdict = result.finalVerdict; + const response = finalVerdict?.response || ''; + get().updateNodeData(node.id, { + status: 'success', + response, + debateData: { + rounds, + finalVerdict, + config: node.data.debateData?.config || { + judgeMode: node.data.debateJudgeMode || 'external_judge', + format: node.data.debateFormat || 'free_discussion', + maxRounds: node.data.debateMaxRounds || 5, + }, + }, + taskId: undefined, + }); + } + } else if (status === 'running') { + // Reconnect to live stream from where we left off + const fromEvent = (taskData.events || []).length; + // Apply any partial state from accumulated events first + _applyPartialEvents(get().updateNodeData, node.id, taskData.task_type, taskData.events || [], node.data); + // Then connect to live stream + _consumeTaskStream( + node.data.taskId!, node.id, taskData.task_type, fromEvent, + get().updateNodeData, user + ); + } else { + // failed / interrupted / unknown — reset + get().updateNodeData(node.id, { + status: status === 'interrupted' ? 'error' : 'error', + taskId: undefined, + }); + } + } catch (err) { + console.error(`Failed to recover task for node ${node.id}:`, err); + get().updateNodeData(node.id, { status: 'idle', taskId: undefined }); + } + } + }, + propagateTraces: () => { const { nodes, edges } = get(); @@ -2740,4 +3226,5 @@ useFlowStore.subscribe((state, prev) => { if (state.theme !== prev.theme) syncThemeClass(state.theme); }); +export { _consumeTaskStream, _applyPartialEvents }; export default useFlowStore; |
