author    YurenHao0426 <blackhao0426@gmail.com>  2026-02-14 03:40:31 +0000
committer YurenHao0426 <blackhao0426@gmail.com>  2026-02-14 03:40:31 +0000
commit    bdf381a2c8a0337f7459000f487a80f9cbbbdd2f (patch)
tree      b3c72c85f3e7c47b4c98a1301acc7fa7d23a6d05
parent    ded75a5c19ad4aa8dc832fc4c138b68093e22ee8 (diff)
Add background task persistence for debate & council operations
Decouple debate/council execution from the SSE connection lifecycle so tasks survive browser disconnects. Backend runs work as asyncio.Tasks with progressive disk persistence; frontend can reconnect and recover state.

- New backend/app/services/tasks.py: task registry, broadcast pattern, disk persistence at milestones, stale task cleanup on startup
- New endpoints: POST start_debate/start_council, GET task stream/poll
- Frontend stores taskId on nodes, recovers running tasks on page load
- _applyPartialEvents rebuilds stage text + data from accumulated events

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
-rw-r--r--  backend/app/main.py                      | 355
-rw-r--r--  backend/app/services/tasks.py            | 304
-rw-r--r--  frontend/src/components/LeftSidebar.tsx  |   3
-rw-r--r--  frontend/src/components/Sidebar.tsx      | 324
-rw-r--r--  frontend/src/store/flowStore.ts          | 487
5 files changed, 1208 insertions, 265 deletions
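The lifecycle introduced here (start a task, stream it with replay, fall back to polling) can be exercised directly over HTTP. The sketch below is illustrative only and not part of the commit: it assumes a backend at http://localhost:8000, uses placeholder field values, and abbreviates the CouncilRunRequest payload; only the /api/task endpoints and the task_id, from_event, and status fields come from the diff itself.

import json
import requests  # any HTTP client works; requests shown for brevity

BASE = "http://localhost:8000"  # hypothetical local backend

# 1. Start a council run as a background task (payload heavily abbreviated).
start = requests.post(
    f"{BASE}/api/task/start_council",
    params={"user": "test"},
    json={
        "node_id": "node_1",
        "user_prompt": "Compare the two proposals.",
        "council_models": [{"model_name": "placeholder-model"}],
        "chairman_model": {"model_name": "placeholder-model"},
        "incoming_contexts": [],
    },
)
task_id = start.json()["task_id"]

# 2. Stream events; `from_event` lets a reconnecting client replay missed ones.
cursor = 0
with requests.get(
    f"{BASE}/api/task/{task_id}/stream",
    params={"from_event": cursor, "user": "test"},
    stream=True,
) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if not line or not line.startswith("data: "):
            continue  # blank separators and ": keepalive" comments
        evt = json.loads(line[len("data: "):])
        if evt.get("type") == "task_status":
            break  # terminal status appended by the stream endpoint
        cursor += 1  # a later reconnect would pass from_event=cursor

# 3. If the SSE connection is lost, poll for status + accumulated results.
status = requests.get(f"{BASE}/api/task/{task_id}", params={"user": "test"}).json()
print(status["status"], len(status.get("events", [])))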
diff --git a/backend/app/main.py b/backend/app/main.py
index 89c5dd0..746b731 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -12,6 +12,10 @@ from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Me
from app.services.llm import llm_streamer, generate_title, get_openai_client, get_anthropic_client, resolve_provider
from app.services.council import council_event_stream
from app.services.debate import debate_event_stream
+from app.services.tasks import (
+ register_task, get_task, get_task_status, run_task_in_background,
+ cleanup_stale_tasks, TaskStatus, _CHUNK_EVENT_TYPES, _task_registry,
+)
from app.auth import auth_router, get_current_user, get_current_user_optional, init_db, User, get_db
from app.auth.utils import get_password_hash
from dotenv import load_dotenv
@@ -57,7 +61,12 @@ app.add_middleware(
async def startup_event():
"""Initialize database and create default test user if not exists"""
init_db()
-
+
+ # Mark any in-progress tasks from prior run as interrupted
+ stale_count = cleanup_stale_tasks(DATA_ROOT)
+ if stale_count:
+ logger.info("Cleaned up %d stale background tasks", stale_count)
+
# Create test user if not exists
from app.auth.models import SessionLocal
db = SessionLocal()
@@ -711,6 +720,350 @@ async def run_debate_stream(
)
+# --------------- Background Task Helpers ---------------
+
+async def _prepare_council_args(request: CouncilRunRequest, resolved: User | None, username: str) -> dict:
+ """Extract shared council request processing into a reusable helper.
+ Returns kwargs suitable for council_event_stream()."""
+ raw_messages = []
+ for ctx in request.incoming_contexts:
+ raw_messages.extend(ctx.messages)
+ if request.merge_strategy == MergeStrategy.SMART:
+ final_messages = smart_merge_messages(raw_messages)
+ else:
+ final_messages = raw_messages
+ execution_context = Context(messages=final_messages)
+
+ images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids)
+ openrouter_key = get_user_api_key(resolved, "openrouter")
+
+ member_configs: list[LLMConfig] = []
+ attachments_per_model: list[list[dict] | None] = []
+ tools_per_model: list[list[dict] | None] = []
+ contexts_per_model: list[Context | None] = []
+
+ for member in request.council_models:
+ provider = resolve_provider(member.model_name)
+ provider_str = provider.value
+ api_key = get_user_api_key(resolved, provider_str)
+ config = LLMConfig(
+ provider=provider,
+ model_name=member.model_name,
+ temperature=member.temperature if member.temperature is not None else request.temperature,
+ system_prompt=request.system_prompt,
+ api_key=api_key,
+ reasoning_effort=member.reasoning_effort if member.reasoning_effort is not None else request.reasoning_effort,
+ enable_google_search=member.enable_google_search if member.enable_google_search is not None else request.enable_google_search,
+ )
+ member_configs.append(config)
+
+ tools: list[dict] = []
+ attachments: list[dict] = []
+ scoped_file_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids)
+
+ if provider == ModelProvider.OPENAI:
+ vs_ids, debug_refs, filters = await prepare_openai_vector_search(
+ user=username, attached_ids=non_image_file_ids,
+ scopes=request.scopes, llm_config=config,
+ )
+ if not vs_ids:
+ try:
+ client = get_openai_client(config.api_key)
+ vs_id = await ensure_user_vector_store(username, client)
+ if vs_id:
+ vs_ids = [vs_id]
+ except Exception:
+ pass
+ if vs_ids:
+ tool_def = {"type": "file_search", "vector_store_ids": vs_ids}
+ if filters:
+ tool_def["filters"] = filters
+ tools.append(tool_def)
+ elif provider == ModelProvider.GOOGLE:
+ attachments = await prepare_attachments(
+ user=username, target_provider=provider,
+ attached_ids=scoped_file_ids, llm_config=config,
+ )
+ elif provider == ModelProvider.CLAUDE:
+ attachments = await prepare_attachments(
+ user=username, target_provider=provider,
+ attached_ids=scoped_file_ids, llm_config=config,
+ )
+
+ attachments_per_model.append(attachments or None)
+ tools_per_model.append(tools or None)
+
+ if member.incoming_contexts:
+ raw = []
+ for ctx in member.incoming_contexts:
+ raw.extend(ctx.messages)
+ if request.merge_strategy == MergeStrategy.SMART:
+ merged = smart_merge_messages(raw)
+ else:
+ merged = raw
+ contexts_per_model.append(Context(messages=merged))
+ else:
+ contexts_per_model.append(None)
+
+ chairman = request.chairman_model
+ chairman_provider = resolve_provider(chairman.model_name)
+ chairman_api_key = get_user_api_key(resolved, chairman_provider.value)
+ chairman_config = LLMConfig(
+ provider=chairman_provider,
+ model_name=chairman.model_name,
+ temperature=chairman.temperature if chairman.temperature is not None else request.temperature,
+ system_prompt=request.system_prompt,
+ api_key=chairman_api_key,
+ reasoning_effort=chairman.reasoning_effort if chairman.reasoning_effort is not None else request.reasoning_effort,
+ enable_google_search=chairman.enable_google_search if chairman.enable_google_search is not None else request.enable_google_search,
+ )
+
+ return dict(
+ user_prompt=request.user_prompt,
+ context=execution_context,
+ member_configs=member_configs,
+ chairman_config=chairman_config,
+ attachments_per_model=attachments_per_model,
+ tools_per_model=tools_per_model,
+ openrouter_api_key=openrouter_key,
+ images=images,
+ contexts_per_model=contexts_per_model,
+ )
+
+
+async def _prepare_debate_args(request: DebateRunRequest, resolved: User | None, username: str) -> dict:
+ """Extract shared debate request processing into a reusable helper.
+ Returns kwargs suitable for debate_event_stream()."""
+ raw_messages = []
+ for ctx in request.incoming_contexts:
+ raw_messages.extend(ctx.messages)
+ if request.merge_strategy == MergeStrategy.SMART:
+ final_messages = smart_merge_messages(raw_messages)
+ else:
+ final_messages = raw_messages
+ execution_context = Context(messages=final_messages)
+
+ images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids)
+ openrouter_key = get_user_api_key(resolved, "openrouter")
+
+ member_configs: list[LLMConfig] = []
+ attachments_per_model: list[list[dict] | None] = []
+ tools_per_model: list[list[dict] | None] = []
+
+ for member in request.debate_models:
+ provider = resolve_provider(member.model_name)
+ provider_str = provider.value
+ api_key = get_user_api_key(resolved, provider_str)
+ config = LLMConfig(
+ provider=provider,
+ model_name=member.model_name,
+ temperature=member.temperature if member.temperature is not None else request.temperature,
+ system_prompt=request.system_prompt,
+ api_key=api_key,
+ reasoning_effort=member.reasoning_effort if member.reasoning_effort is not None else request.reasoning_effort,
+ enable_google_search=member.enable_google_search if member.enable_google_search is not None else request.enable_google_search,
+ )
+ member_configs.append(config)
+
+ tools: list[dict] = []
+ attachments: list[dict] = []
+ scoped_file_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids)
+
+ if provider == ModelProvider.OPENAI:
+ vs_ids, debug_refs, filters = await prepare_openai_vector_search(
+ user=username, attached_ids=non_image_file_ids,
+ scopes=request.scopes, llm_config=config,
+ )
+ if not vs_ids:
+ try:
+ client = get_openai_client(config.api_key)
+ vs_id = await ensure_user_vector_store(username, client)
+ if vs_id:
+ vs_ids = [vs_id]
+ except Exception:
+ pass
+ if vs_ids:
+ tool_def = {"type": "file_search", "vector_store_ids": vs_ids}
+ if filters:
+ tool_def["filters"] = filters
+ tools.append(tool_def)
+ elif provider == ModelProvider.GOOGLE:
+ attachments = await prepare_attachments(
+ user=username, target_provider=provider,
+ attached_ids=scoped_file_ids, llm_config=config,
+ )
+ elif provider == ModelProvider.CLAUDE:
+ attachments = await prepare_attachments(
+ user=username, target_provider=provider,
+ attached_ids=scoped_file_ids, llm_config=config,
+ )
+
+ attachments_per_model.append(attachments or None)
+ tools_per_model.append(tools or None)
+
+ judge_config = None
+ if request.judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and request.judge_model:
+ judge = request.judge_model
+ judge_provider = resolve_provider(judge.model_name)
+ judge_api_key = get_user_api_key(resolved, judge_provider.value)
+ judge_config = LLMConfig(
+ provider=judge_provider,
+ model_name=judge.model_name,
+ temperature=judge.temperature if judge.temperature is not None else request.temperature,
+ system_prompt=request.system_prompt,
+ api_key=judge_api_key,
+ reasoning_effort=judge.reasoning_effort if judge.reasoning_effort is not None else request.reasoning_effort,
+ enable_google_search=judge.enable_google_search if judge.enable_google_search is not None else request.enable_google_search,
+ )
+
+ return dict(
+ user_prompt=request.user_prompt,
+ context=execution_context,
+ member_configs=member_configs,
+ judge_config=judge_config,
+ judge_mode=request.judge_mode,
+ debate_format=request.debate_format,
+ max_rounds=request.max_rounds,
+ custom_format_prompt=request.custom_format_prompt,
+ attachments_per_model=attachments_per_model,
+ tools_per_model=tools_per_model,
+ openrouter_api_key=openrouter_key,
+ images=images,
+ )
+
+
+# --------------- Background Task Endpoints ---------------
+
+@app.post("/api/task/start_council")
+async def start_council_task(
+ request: CouncilRunRequest,
+ user: str = DEFAULT_USER,
+ current_user: User | None = Depends(get_current_user_optional),
+):
+ """Start council as a background task. Returns {task_id}."""
+ resolved = resolve_user(current_user, user)
+ username = resolved.username if resolved else DEFAULT_USER
+
+ # Cancel existing task on same node if still running
+ for existing in list(_task_registry.values()):
+ if existing.node_id == request.node_id and existing.user == username and existing.status == TaskStatus.RUNNING:
+ if existing.asyncio_task and not existing.asyncio_task.done():
+ existing.asyncio_task.cancel()
+ logger.info("Cancelled previous task %s for node %s", existing.task_id, request.node_id)
+
+ kwargs = await _prepare_council_args(request, resolved, username)
+ generator = council_event_stream(**kwargs)
+
+ task_id = str(uuid4())
+ info = register_task(task_id, username, request.node_id, "council")
+ info.asyncio_task = asyncio.create_task(run_task_in_background(info, generator))
+ return {"task_id": task_id}
+
+
+@app.post("/api/task/start_debate")
+async def start_debate_task(
+ request: DebateRunRequest,
+ user: str = DEFAULT_USER,
+ current_user: User | None = Depends(get_current_user_optional),
+):
+ """Start debate as a background task. Returns {task_id}."""
+ resolved = resolve_user(current_user, user)
+ username = resolved.username if resolved else DEFAULT_USER
+
+ # Cancel existing task on same node if still running
+ for existing in list(_task_registry.values()):
+ if existing.node_id == request.node_id and existing.user == username and existing.status == TaskStatus.RUNNING:
+ if existing.asyncio_task and not existing.asyncio_task.done():
+ existing.asyncio_task.cancel()
+ logger.info("Cancelled previous task %s for node %s", existing.task_id, request.node_id)
+
+ kwargs = await _prepare_debate_args(request, resolved, username)
+ generator = debate_event_stream(**kwargs)
+
+ task_id = str(uuid4())
+ info = register_task(task_id, username, request.node_id, "debate")
+ info.asyncio_task = asyncio.create_task(run_task_in_background(info, generator))
+ return {"task_id": task_id}
+
+
+@app.get("/api/task/{task_id}/stream")
+async def stream_task_events(
+ task_id: str,
+ from_event: int = 0,
+ user: str = DEFAULT_USER,
+ current_user: User | None = Depends(get_current_user_optional),
+):
+ """SSE stream: replay missed events then stream live."""
+ resolved = resolve_user(current_user, user)
+ username = resolved.username if resolved else DEFAULT_USER
+
+ info = get_task(task_id)
+ if not info:
+ raise HTTPException(status_code=404, detail="Task not found")
+ if info.user != username:
+ raise HTTPException(status_code=403, detail="Not your task")
+
+ async def event_generator():
+ cursor = from_event
+ # 1. Replay accumulated events from cursor
+ while cursor < len(info.events):
+ evt = info.events[cursor]
+ yield f"data: {json.dumps(evt)}\n\n"
+ cursor += 1
+
+ # 2. If already done, send terminal and return
+ if info.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.INTERRUPTED):
+ yield f"data: {json.dumps({'type': 'task_status', 'data': {'status': info.status.value}})}\n\n"
+ return
+
+ # 3. Stream live events
+ while True:
+ # Wait for new events with timeout (keepalive)
+ try:
+ await asyncio.wait_for(
+ asyncio.shield(info.new_event_signal.wait()),
+ timeout=15.0,
+ )
+ except asyncio.TimeoutError:
+ # Send keepalive comment
+ yield ": keepalive\n\n"
+ # Check if task finished while waiting
+ if info.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.INTERRUPTED):
+ break
+ continue
+
+ # Yield any new events since cursor
+ while cursor < len(info.events):
+ evt = info.events[cursor]
+ yield f"data: {json.dumps(evt)}\n\n"
+ cursor += 1
+
+ if info.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.INTERRUPTED):
+ break
+
+ yield f"data: {json.dumps({'type': 'task_status', 'data': {'status': info.status.value}})}\n\n"
+
+ return StreamingResponse(event_generator(), media_type="text/event-stream")
+
+
+@app.get("/api/task/{task_id}")
+async def poll_task(
+ task_id: str,
+ user: str = DEFAULT_USER,
+ current_user: User | None = Depends(get_current_user_optional),
+):
+ """Poll: return status + accumulated results."""
+ resolved = resolve_user(current_user, user)
+ username = resolved.username if resolved else DEFAULT_USER
+
+ status_data = get_task_status(username, task_id)
+ if not status_data:
+ raise HTTPException(status_code=404, detail="Task not found")
+ if status_data.get("user") != username:
+ raise HTTPException(status_code=403, detail="Not your task")
+ return status_data
+
+
class TitleRequest(BaseModel):
user_prompt: str
response: str
diff --git a/backend/app/services/tasks.py b/backend/app/services/tasks.py
new file mode 100644
index 0000000..003c02d
--- /dev/null
+++ b/backend/app/services/tasks.py
@@ -0,0 +1,304 @@
+"""
+Background task management for debate & council operations.
+
+Decouples execution from SSE delivery so tasks survive browser disconnects.
+Results are persisted progressively to disk.
+"""
+
+import asyncio
+import json
+import logging
+import os
+import time
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Any, AsyncGenerator, Dict, List, Optional
+
+logger = logging.getLogger("contextflow.tasks")
+
+DATA_ROOT = os.path.abspath(os.getenv("DATA_ROOT", os.path.join(os.getcwd(), "data")))
+
+
+class TaskStatus(str, Enum):
+ PENDING = "pending"
+ RUNNING = "running"
+ COMPLETED = "completed"
+ FAILED = "failed"
+ INTERRUPTED = "interrupted"
+
+
+# Chunk event types that are NOT persisted to disk (redundant with *_complete events)
+_CHUNK_EVENT_TYPES = {"final_chunk", "stage3_chunk"}
+
+
+@dataclass
+class TaskInfo:
+ task_id: str
+ user: str
+ node_id: str
+ task_type: str # "debate" | "council"
+ status: TaskStatus
+ created_at: float
+ updated_at: float
+ events: List[Dict] = field(default_factory=list)
+ result: Optional[Dict] = None
+ error: Optional[str] = None
+ # In-memory only (not serialized):
+ new_event_signal: asyncio.Event = field(default_factory=asyncio.Event, repr=False)
+ asyncio_task: Optional[asyncio.Task] = field(default=None, repr=False)
+
+
+# --------------- In-memory registry ---------------
+_task_registry: Dict[str, TaskInfo] = {}
+
+
+def register_task(task_id: str, user: str, node_id: str, task_type: str) -> TaskInfo:
+ """Create and register a new task in memory + persist initial state to disk."""
+ now = time.time()
+ info = TaskInfo(
+ task_id=task_id,
+ user=user,
+ node_id=node_id,
+ task_type=task_type,
+ status=TaskStatus.PENDING,
+ created_at=now,
+ updated_at=now,
+ )
+ _task_registry[task_id] = info
+ _persist_task(info)
+ return info
+
+
+def get_task(task_id: str) -> Optional[TaskInfo]:
+ """Get task from in-memory registry."""
+ return _task_registry.get(task_id)
+
+
+def get_task_status(user: str, task_id: str) -> Optional[Dict]:
+ """
+ Get task status dict. In-memory first, fallback to disk.
+ If disk says "running" but not in memory → report "interrupted".
+ """
+ info = _task_registry.get(task_id)
+ if info:
+ return _task_to_dict(info)
+
+ # Fallback: read from disk
+ disk_data = _load_task_from_disk(user, task_id)
+ if not disk_data:
+ return None
+
+ # If disk says running but it's not in memory, backend must have restarted
+ if disk_data.get("status") == TaskStatus.RUNNING:
+ disk_data["status"] = TaskStatus.INTERRUPTED
+ return disk_data
+
+
+async def run_task_in_background(info: TaskInfo, event_generator: AsyncGenerator[str, None]) -> None:
+ """
+ Consume the async generator, appending events to info.events.
+ Signals new_event_signal for SSE consumers.
+ Persists at milestones, skips chunk events from persistence.
+ """
+ info.status = TaskStatus.RUNNING
+ info.updated_at = time.time()
+ _persist_task(info)
+
+ try:
+ async for raw_sse in event_generator:
+ # raw_sse is a string like 'data: {"type": "...", ...}\n\n'
+ evt = _parse_sse_event(raw_sse)
+ if evt is None:
+ continue
+
+ evt_type = evt.get("type", "")
+
+ # Always store in memory (even chunks, for live streaming)
+ info.events.append(evt)
+ info.updated_at = time.time()
+
+ # Signal waiting SSE consumers
+ info.new_event_signal.set()
+ info.new_event_signal.clear()
+
+ # Persist at milestones (skip chunk events)
+ if evt_type not in _CHUNK_EVENT_TYPES:
+ _persist_task(info)
+
+ # Detect completion / error
+ if evt_type in ("complete", "debate_complete"):
+ info.result = _extract_result(info.task_type, info.events)
+ info.status = TaskStatus.COMPLETED
+ info.updated_at = time.time()
+ _persist_task(info)
+ return
+ if evt_type == "error":
+ info.error = evt.get("data", {}).get("message", "Unknown error") if isinstance(evt.get("data"), dict) else str(evt.get("data", "Unknown error"))
+ info.status = TaskStatus.FAILED
+ info.updated_at = time.time()
+ _persist_task(info)
+ return
+
+ # Generator exhausted without explicit complete/error
+ if info.status == TaskStatus.RUNNING:
+ info.result = _extract_result(info.task_type, info.events)
+ info.status = TaskStatus.COMPLETED
+ info.updated_at = time.time()
+ _persist_task(info)
+
+ except asyncio.CancelledError:
+ info.status = TaskStatus.INTERRUPTED
+ info.updated_at = time.time()
+ _persist_task(info)
+ raise
+ except Exception as e:
+ logger.exception("Background task %s failed: %s", info.task_id, e)
+ info.error = str(e)
+ info.status = TaskStatus.FAILED
+ info.updated_at = time.time()
+ _persist_task(info)
+
+
+def cleanup_stale_tasks(data_root: Optional[str] = None) -> int:
+ """
+ On startup, scan task files on disk and mark any "running" as "interrupted".
+ Returns count of tasks marked interrupted.
+ """
+ root = data_root or DATA_ROOT
+ count = 0
+ if not os.path.exists(root):
+ return count
+
+ for user_dir in os.listdir(root):
+ tasks_dir = os.path.join(root, user_dir, "tasks")
+ if not os.path.isdir(tasks_dir):
+ continue
+ for fname in os.listdir(tasks_dir):
+ if not fname.endswith(".json"):
+ continue
+ fpath = os.path.join(tasks_dir, fname)
+ try:
+ with open(fpath, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ if data.get("status") == TaskStatus.RUNNING:
+ data["status"] = TaskStatus.INTERRUPTED
+ data["updated_at"] = time.time()
+ with open(fpath, "w", encoding="utf-8") as f:
+ json.dump(data, f, ensure_ascii=False)
+ count += 1
+ logger.info("Marked stale task as interrupted: %s", fname)
+ except Exception as e:
+ logger.warning("Failed to process stale task file %s: %s", fpath, e)
+
+ return count
+
+
+# --------------- Persistence helpers ---------------
+
+def _task_dir(user: str) -> str:
+ d = os.path.join(DATA_ROOT, user, "tasks")
+ os.makedirs(d, exist_ok=True)
+ return d
+
+
+def _task_file_path(user: str, task_id: str) -> str:
+ return os.path.join(_task_dir(user), f"{task_id}.json")
+
+
+def _persist_task(info: TaskInfo) -> None:
+ """Write task state to disk. Excludes chunk events for smaller files."""
+ try:
+ data = _task_to_dict(info, exclude_chunks=True)
+ path = _task_file_path(info.user, info.task_id)
+ with open(path, "w", encoding="utf-8") as f:
+ json.dump(data, f, ensure_ascii=False)
+ except Exception as e:
+ logger.warning("Failed to persist task %s: %s", info.task_id, e)
+
+
+def _load_task_from_disk(user: str, task_id: str) -> Optional[Dict]:
+ path = _task_file_path(user, task_id)
+ if not os.path.exists(path):
+ return None
+ try:
+ with open(path, "r", encoding="utf-8") as f:
+ return json.load(f)
+ except Exception as e:
+ logger.warning("Failed to load task %s from disk: %s", task_id, e)
+ return None
+
+
+def _task_to_dict(info: TaskInfo, exclude_chunks: bool = False) -> Dict:
+ events = info.events
+ if exclude_chunks:
+ events = [e for e in events if e.get("type") not in _CHUNK_EVENT_TYPES]
+ return {
+ "task_id": info.task_id,
+ "user": info.user,
+ "node_id": info.node_id,
+ "task_type": info.task_type,
+ "status": info.status.value if isinstance(info.status, TaskStatus) else info.status,
+ "created_at": info.created_at,
+ "updated_at": info.updated_at,
+ "events": events,
+ "result": info.result,
+ "error": info.error,
+ }
+
+
+def _parse_sse_event(raw: str) -> Optional[Dict]:
+ """Parse a raw SSE string like 'data: {...}\\n\\n' into a dict."""
+ raw = raw.strip()
+ if not raw:
+ return None
+ for line in raw.split("\n"):
+ line = line.strip()
+ if line.startswith("data: "):
+ try:
+ return json.loads(line[6:])
+ except json.JSONDecodeError:
+ return None
+ return None
+
+
+def _extract_result(task_type: str, events: List[Dict]) -> Dict:
+ """Reconstruct final result data from accumulated events."""
+ if task_type == "council":
+ stage1 = None
+ stage2 = None
+ stage3 = None
+ for evt in events:
+ t = evt.get("type", "")
+ if t == "stage1_complete":
+ stage1 = evt.get("data")
+ elif t == "stage1_model_complete":
+ if stage1 is None:
+ stage1 = []
+ stage1.append(evt.get("data"))
+ elif t == "stage2_complete":
+ stage2 = evt.get("data")
+ elif t == "stage3_complete":
+ stage3 = evt.get("data")
+ return {"councilData": {"stage1": stage1, "stage2": stage2, "stage3": stage3}}
+
+ elif task_type == "debate":
+ rounds: List[Dict] = []
+ final_verdict = None
+ for evt in events:
+ t = evt.get("type", "")
+ if t == "round_complete":
+ rounds.append(evt.get("data", {}))
+ elif t == "final_complete":
+ final_verdict = evt.get("data")
+ elif t == "judge_decision":
+ # Attach to last round
+ if rounds:
+ rounds[-1]["judgeDecision"] = evt.get("data")
+ elif t == "model_eliminated":
+ if rounds:
+ if "eliminated" not in rounds[-1]:
+ rounds[-1]["eliminated"] = []
+ rounds[-1]["eliminated"].append(evt.get("data"))
+ return {"debateRounds": rounds, "finalVerdict": final_verdict}
+
+ return {}
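The broadcast pattern used above (an append-only event list plus an asyncio.Event that is set and immediately cleared on each append, with every SSE consumer holding its own cursor) can be reduced to a small self-contained sketch. The Broadcast class below is hypothetical and only illustrates the replay-then-wait mechanics; the timeout is shortened for the demo, whereas stream_task_events waits 15 seconds and emits a keepalive, and in both cases the timeout also covers the race where an event lands between the length check and the wait() call.

import asyncio

class Broadcast:
    """Hypothetical stand-in for TaskInfo: append-only log + wake-up signal."""
    def __init__(self) -> None:
        self.events: list[dict] = []
        self.signal = asyncio.Event()
        self.done = False

    def publish(self, evt: dict) -> None:
        self.events.append(evt)
        self.signal.set()    # wake everyone currently waiting...
        self.signal.clear()  # ...then re-arm for the next append

async def reader(b: Broadcast, cursor: int = 0):
    """Replay events from `cursor`, then wait for new ones until `done`."""
    while True:
        while cursor < len(b.events):
            yield b.events[cursor]
            cursor += 1
        if b.done:
            return
        try:
            await asyncio.wait_for(b.signal.wait(), timeout=0.5)
        except asyncio.TimeoutError:
            continue  # the real endpoint sends ": keepalive" here

async def main():
    b = Broadcast()

    async def produce():
        for i in range(3):
            b.publish({"type": "tick", "n": i})
            await asyncio.sleep(0.1)
        b.done = True
        b.signal.set()  # final wake-up so readers observe `done`

    asyncio.create_task(produce())
    async for evt in reader(b):  # a reader started with cursor > 0 replays less
        print(evt)

asyncio.run(main())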
diff --git a/frontend/src/components/LeftSidebar.tsx b/frontend/src/components/LeftSidebar.tsx
index 54c2527..441b7e0 100644
--- a/frontend/src/components/LeftSidebar.tsx
+++ b/frontend/src/components/LeftSidebar.tsx
@@ -30,6 +30,7 @@ const LeftSidebar: React.FC<LeftSidebarProps> = ({ isOpen, onToggle }) => {
deleteFile,
readBlueprintFile,
loadBlueprint,
+ recoverBackgroundTasks,
saveBlueprintFile,
saveCurrentBlueprint,
createProjectFolder,
@@ -266,6 +267,8 @@ const LeftSidebar: React.FC<LeftSidebarProps> = ({ isOpen, onToggle }) => {
if (vp) {
setViewport(vp);
}
+ // Recover any background tasks that were running before page refresh
+ recoverBackgroundTasks();
} catch (e) {
console.error(e);
alert('Not a valid blueprint JSON.');
diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx
index 8bb5fcb..474a969 100644
--- a/frontend/src/components/Sidebar.tsx
+++ b/frontend/src/components/Sidebar.tsx
@@ -387,6 +387,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
})
});
+ if (!response.ok) throw new Error(await response.text() || `HTTP ${response.status}`);
if (!response.body) return;
const reader = response.body.getReader();
const decoder = new TextDecoder();
@@ -511,7 +512,8 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
};
try {
- const response = await fetch(`/api/run_council_stream?user=${encodeURIComponent(user?.username || 'test')}`, {
+ // Step 1: Start background task
+ const startRes = await fetch(`/api/task/start_council?user=${encodeURIComponent(user?.username || 'test')}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...getAuthHeader() },
body: JSON.stringify({
@@ -546,101 +548,35 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
}),
});
- if (!response.body) return;
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let sseBuffer = '';
- let stage1Results: Array<{ model: string; response: string }> = [];
- let stage2Data: any = null;
- let stage3Full = '';
- let stage3Model = '';
+ if (!startRes.ok) throw new Error(await startRes.text() || `HTTP ${startRes.status}`);
+ const { task_id } = await startRes.json();
- while (true) {
- const { value, done } = await reader.read();
- if (done) break;
- sseBuffer += decoder.decode(value, { stream: true });
-
- // Parse SSE events (data: {...}\n\n)
- const parts = sseBuffer.split('\n\n');
- sseBuffer = parts.pop() || '';
-
- for (const part of parts) {
- const line = part.trim();
- if (!line.startsWith('data: ')) continue;
- let evt: any;
- try {
- evt = JSON.parse(line.slice(6));
- } catch { continue; }
-
- switch (evt.type) {
- case 'stage1_start':
- setCouncilStage('Stage 1: Collecting responses...');
- break;
- case 'stage1_model_complete':
- stage1Results = [...stage1Results, evt.data];
- setCouncilStage(`Stage 1: ${stage1Results.length}/${councilModels.length} models done`);
- updateNodeData(runningNodeId, {
- councilData: { stage1: [...stage1Results], stage2: null, stage3: null },
- });
- break;
- case 'stage1_complete':
- stage1Results = evt.data;
- updateNodeData(runningNodeId, {
- councilData: { stage1: stage1Results, stage2: null, stage3: null },
- });
- break;
- case 'stage2_start':
- setCouncilStage('Stage 2: Peer ranking...');
- break;
- case 'stage2_complete':
- stage2Data = evt.data;
- updateNodeData(runningNodeId, {
- councilData: { stage1: stage1Results, stage2: stage2Data, stage3: null },
- });
- break;
- case 'stage3_start':
- setCouncilStage('Stage 3: Chairman synthesizing...');
- setCouncilStreamBuffer('');
- break;
- case 'stage3_chunk':
- stage3Full += evt.data.chunk;
- setCouncilStreamBuffer(stage3Full);
- setStreamBuffer(stage3Full);
- break;
- case 'stage3_complete':
- stage3Model = evt.data.model;
- stage3Full = evt.data.response;
- break;
- case 'complete': {
- const responseReceivedAt = Date.now();
- const councilData: CouncilData = {
- stage1: stage1Results,
- stage2: stage2Data,
- stage3: { model: stage3Model, response: stage3Full },
- };
- const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt };
- const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: stage3Full };
- updateNodeData(runningNodeId, {
- status: 'success',
- response: stage3Full,
- responseReceivedAt,
- councilData,
- messages: [...context, newUserMsg, newAssistantMsg] as any,
- });
- setCouncilStage('');
- generateTitle(runningNodeId, runningPrompt, stage3Full);
- break;
- }
- case 'error':
- updateNodeData(runningNodeId, { status: 'error' });
- setCouncilStage('');
- break;
- }
- }
+ // Step 2: Store taskId on node for recovery + force save
+ updateNodeData(runningNodeId, { taskId: task_id });
+ if (currentBlueprintPath) {
+ saveCurrentBlueprint(currentBlueprintPath).catch(console.error);
}
+
+ // Step 3: Consume SSE stream from task
+ const { _consumeTaskStream } = await import('../store/flowStore');
+ await _consumeTaskStream(
+ task_id, runningNodeId, 'council', 0, updateNodeData, user?.username || 'test',
+ {
+ onStage: setCouncilStage,
+ onStreamBuffer: (text) => { setCouncilStreamBuffer(text); setStreamBuffer(text); },
+ onComplete: () => {
+ setCouncilStage('');
+ // Fetch final response for title generation
+ const currentNode = nodes.find(n => n.id === runningNodeId);
+ const finalResponse = currentNode?.data.response || '';
+ if (finalResponse) generateTitle(runningNodeId, runningPrompt, finalResponse);
+ },
+ onError: () => { setCouncilStage(''); },
+ },
+ );
} catch (error) {
console.error(error);
- updateNodeData(runningNodeId, { status: 'error' });
+ updateNodeData(runningNodeId, { status: 'error', taskId: undefined });
setCouncilStage('');
} finally {
setStreamingNodeId(prev => prev === runningNodeId ? null : prev);
@@ -707,7 +643,9 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
try {
const judgeModelConfig = selectedNode.data.judgeModel || debateModels[0];
- const response = await fetch(`/api/run_debate_stream?user=${encodeURIComponent(user?.username || 'test')}`, {
+
+ // Step 1: Start background task
+ const startRes = await fetch(`/api/task/start_debate?user=${encodeURIComponent(user?.username || 'test')}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...getAuthHeader() },
body: JSON.stringify({
@@ -740,176 +678,34 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
}),
});
- if (!response.body) return;
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let sseBuffer = '';
- const debateRounds: DebateRound[] = [];
- let currentRound = 0;
- let currentRoundResponses: Array<{ model: string; response: string }> = [];
- let finalModel = '';
- let finalFull = '';
+ if (!startRes.ok) throw new Error(await startRes.text() || `HTTP ${startRes.status}`);
+ const { task_id } = await startRes.json();
- while (true) {
- const { value, done } = await reader.read();
- if (done) break;
- sseBuffer += decoder.decode(value, { stream: true });
-
- const parts = sseBuffer.split('\n\n');
- sseBuffer = parts.pop() || '';
-
- for (const part of parts) {
- const line = part.trim();
- if (!line.startsWith('data: ')) continue;
- let evt: any;
- try {
- evt = JSON.parse(line.slice(6));
- } catch { continue; }
-
- switch (evt.type) {
- case 'debate_start':
- setDebateStage(`Debate started (${evt.data.models.length} models, max ${evt.data.max_rounds} rounds)`);
- break;
- case 'round_start':
- currentRound = evt.data.round;
- currentRoundResponses = [];
- setDebateStage(`Round ${currentRound}/${maxRounds}: Collecting responses...`);
- break;
- case 'round_model_complete':
- currentRoundResponses = [...currentRoundResponses, { model: evt.data.model, response: evt.data.response }];
- setDebateStage(`Round ${currentRound}/${maxRounds}: ${currentRoundResponses.length}/${debateModels.length} models done`);
- break;
- case 'round_complete': {
- const roundData: DebateRound = { round: evt.data.round, responses: evt.data.responses };
- debateRounds.push(roundData);
- updateNodeData(runningNodeId, {
- debateData: {
- rounds: [...debateRounds],
- finalVerdict: null,
- config: { judgeMode, format: debateFormat, maxRounds },
- },
- });
- break;
- }
- case 'judge_decision': {
- const lastRound = debateRounds[debateRounds.length - 1];
- if (lastRound) {
- lastRound.judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning };
- updateNodeData(runningNodeId, {
- debateData: {
- rounds: [...debateRounds],
- finalVerdict: null,
- config: { judgeMode, format: debateFormat, maxRounds },
- },
- });
- }
- if (!evt.data.continue) {
- setDebateStage('Judge stopped debate. Generating final verdict...');
- } else {
- setDebateStage(`Judge: Continue to round ${currentRound + 1}...`);
- }
- break;
- }
- case 'model_eliminated': {
- const lastRound2 = debateRounds[debateRounds.length - 1];
- if (lastRound2) {
- if (!lastRound2.eliminated) lastRound2.eliminated = [];
- lastRound2.eliminated.push({
- model: evt.data.model,
- convincedBy: evt.data.convinced_by,
- reasoning: evt.data.reasoning,
- });
- }
- setDebateStage(`${evt.data.model} concedes to ${evt.data.convinced_by || 'another'}...`);
- break;
- }
- case 'convergence_status': {
- const remaining = evt.data.remaining as string[];
- updateNodeData(runningNodeId, {
- debateData: {
- rounds: [...debateRounds],
- finalVerdict: null,
- config: { judgeMode, format: debateFormat, maxRounds },
- },
- });
- if (remaining.length <= 1) {
- setDebateStage(`${remaining[0] || 'Winner'} is the last one standing!`);
- } else {
- setDebateStage(`${remaining.length} models remaining...`);
- }
- break;
- }
- case 'final_start':
- finalModel = evt.data.model;
- setDebateStage('Judge synthesizing final verdict...');
- setDebateStreamBuffer('');
- break;
- case 'final_chunk':
- finalFull += evt.data.chunk;
- setDebateStreamBuffer(finalFull);
- setStreamBuffer(finalFull);
- break;
- case 'final_complete': {
- finalModel = evt.data.model;
- finalFull = evt.data.response;
- const responseReceivedAt = Date.now();
- const debateData: DebateData = {
- rounds: debateRounds,
- finalVerdict: { model: finalModel, response: finalFull },
- config: { judgeMode, format: debateFormat, maxRounds },
- };
- const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt };
- const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: finalFull };
- updateNodeData(runningNodeId, {
- status: 'success',
- response: finalFull,
- responseReceivedAt,
- debateData,
- messages: [...context, newUserMsg, newAssistantMsg] as any,
- });
- setDebateStage('');
- generateTitle(runningNodeId, runningPrompt, finalFull);
- break;
- }
- case 'debate_complete': {
- // If no final verdict (display_only or self_convergence without explicit final_complete)
- const currentNode = nodes.find(n => n.id === runningNodeId);
- if (currentNode?.data.status === 'loading') {
- const responseReceivedAt = Date.now();
- const lastRoundResp = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].responses : [];
- const bestResponse = lastRoundResp.length > 0
- ? lastRoundResp.reduce((a, b) => a.response.length > b.response.length ? a : b).response
- : '';
- const debateData: DebateData = {
- rounds: debateRounds,
- finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null,
- config: { judgeMode, format: debateFormat, maxRounds },
- };
- const displayResponse = finalFull || bestResponse;
- const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt };
- const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: displayResponse };
- updateNodeData(runningNodeId, {
- status: 'success',
- response: displayResponse,
- responseReceivedAt,
- debateData,
- messages: [...context, newUserMsg, newAssistantMsg] as any,
- });
- setDebateStage('');
- if (displayResponse) generateTitle(runningNodeId, runningPrompt, displayResponse);
- }
- break;
- }
- case 'error':
- updateNodeData(runningNodeId, { status: 'error' });
- setDebateStage('');
- break;
- }
- }
+ // Step 2: Store taskId on node for recovery + force save
+ updateNodeData(runningNodeId, { taskId: task_id });
+ if (currentBlueprintPath) {
+ saveCurrentBlueprint(currentBlueprintPath).catch(console.error);
}
+
+ // Step 3: Consume SSE stream from task
+ const { _consumeTaskStream } = await import('../store/flowStore');
+ await _consumeTaskStream(
+ task_id, runningNodeId, 'debate', 0, updateNodeData, user?.username || 'test',
+ {
+ onStage: setDebateStage,
+ onStreamBuffer: (text) => { setDebateStreamBuffer(text); setStreamBuffer(text); },
+ onComplete: () => {
+ setDebateStage('');
+ const currentNode = nodes.find(n => n.id === runningNodeId);
+ const finalResponse = currentNode?.data.response || '';
+ if (finalResponse) generateTitle(runningNodeId, runningPrompt, finalResponse);
+ },
+ onError: () => { setDebateStage(''); },
+ },
+ );
} catch (error) {
console.error(error);
- updateNodeData(runningNodeId, { status: 'error' });
+ updateNodeData(runningNodeId, { status: 'error', taskId: undefined });
setDebateStage('');
} finally {
setStreamingNodeId(prev => prev === runningNodeId ? null : prev);
@@ -2968,7 +2764,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
}`}
>
{selectedNode.data.status === 'loading' ? <Loader2 className="animate-spin" size={16} /> : <Users size={16} />}
- {selectedNode.data.status === 'loading' && councilStage ? councilStage : `Run Council (${(selectedNode.data.councilModels || []).length})`}
+ {selectedNode.data.status === 'loading' && (councilStage || selectedNode.data.taskStage) ? (councilStage || selectedNode.data.taskStage) : `Run Council (${(selectedNode.data.councilModels || []).length})`}
</button>
<div className={`text-center text-[10px] mt-0.5 leading-tight ${isDark ? 'text-gray-600' : 'text-gray-400'}`}>
Inspired by <a href="https://github.com/karpathy/llm-council" target="_blank" rel="noopener noreferrer" className={`underline decoration-dotted ${isDark ? 'text-gray-500 hover:text-gray-400' : 'text-gray-500 hover:text-gray-600'}`}>karpathy/llm-council</a>
@@ -2985,7 +2781,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
}`}
>
{selectedNode.data.status === 'loading' ? <Loader2 className="animate-spin" size={16} /> : <MessageSquare size={16} />}
- {selectedNode.data.status === 'loading' && debateStage ? debateStage : `Run Debate (${(selectedNode.data.debateModels || []).length})`}
+ {selectedNode.data.status === 'loading' && (debateStage || selectedNode.data.taskStage) ? (debateStage || selectedNode.data.taskStage) : `Run Debate (${(selectedNode.data.debateModels || []).length})`}
</button>
) : (
<button
@@ -3086,7 +2882,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
<div>
{selectedNode.data.councilData.stage3 ? (
<div className={`text-xs mb-1 ${isDark ? 'text-gray-500' : 'text-gray-400'}`}>Chairman: {selectedNode.data.councilData.stage3.model}</div>
- ) : selectedNode.data.status === 'loading' && councilStage.includes('Stage 3') ? (
+ ) : selectedNode.data.status === 'loading' && (councilStage.includes('Stage 3') || (selectedNode.data.taskStage || '').includes('Stage 3')) ? (
<div className={`text-xs mb-1 flex items-center gap-1 ${isDark ? 'text-amber-400' : 'text-amber-600'}`}><Loader2 className="animate-spin" size={10} /> Synthesizing...</div>
) : null}
<div className={`p-3 rounded-md border min-h-[150px] text-sm prose prose-sm max-w-none ${
@@ -3333,9 +3129,9 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
</div>
);
})}
- {selectedNode.data.status === 'loading' && debateStage && (
+ {selectedNode.data.status === 'loading' && (debateStage || selectedNode.data.taskStage) && (
<div className={`flex items-center gap-2 p-2 text-xs ${isDark ? 'text-cyan-400' : 'text-cyan-600'}`}>
- <Loader2 className="animate-spin" size={12} /> {debateStage}
+ <Loader2 className="animate-spin" size={12} /> {debateStage || selectedNode.data.taskStage}
</div>
)}
</div>
diff --git a/frontend/src/store/flowStore.ts b/frontend/src/store/flowStore.ts
index 5bb3e22..ec96185 100644
--- a/frontend/src/store/flowStore.ts
+++ b/frontend/src/store/flowStore.ts
@@ -137,6 +137,10 @@ export interface NodeData {
debateMaxRounds?: number;
debateData?: DebateData;
+ // Background task persistence
+ taskId?: string; // Background task ID for recovery
+ taskStage?: string; // Live stage text (e.g. "Round 2: Collecting responses...")
+
// Traces logic
traces: Trace[]; // INCOMING Traces
outgoingTraces: Trace[]; // ALL Outgoing (inherited + self + forks + merged)
@@ -293,6 +297,9 @@ interface FlowState {
) => Message[];
propagateTraces: () => void;
+
+ // Background task recovery
+ recoverBackgroundTasks: () => Promise<void>;
}
// Hash string to color
@@ -341,6 +348,401 @@ const jsonFetch = async <T>(url: string, options?: RequestInit): Promise<T> => {
return res.json() as Promise<T>;
};
+// --------------- Background Task Helpers ---------------
+
+/** Apply partial state from accumulated events (for recovery of in-progress tasks) */
+function _applyPartialEvents(
+ updateNodeData: (nodeId: string, data: Partial<NodeData>) => void,
+ nodeId: string,
+ taskType: string,
+ events: any[],
+ currentData: NodeData,
+) {
+ // Compute taskStage from the last relevant event
+ let taskStage = 'Recovering...';
+
+ if (taskType === 'council') {
+ let stage1: any[] | null = null;
+ let stage2: any = null;
+ let stage3Full = '';
+ for (const evt of events) {
+ if (evt.type === 'stage1_start') {
+ taskStage = 'Stage 1: Collecting responses...';
+ } else if (evt.type === 'stage1_model_complete') {
+ if (!stage1) stage1 = [];
+ stage1.push(evt.data);
+ taskStage = `Stage 1: ${stage1.length} models done`;
+ } else if (evt.type === 'stage1_complete') {
+ stage1 = evt.data;
+ taskStage = 'Stage 1 complete';
+ } else if (evt.type === 'stage2_start') {
+ taskStage = 'Stage 2: Peer ranking...';
+ } else if (evt.type === 'stage2_complete') {
+ stage2 = evt.data;
+ taskStage = 'Stage 2 complete';
+ } else if (evt.type === 'stage3_start') {
+ taskStage = 'Stage 3: Chairman synthesizing...';
+ } else if (evt.type === 'stage3_complete') {
+ stage3Full = evt.data.response;
+ taskStage = 'Stage 3: Chairman synthesizing...';
+ }
+ }
+ updateNodeData(nodeId, {
+ councilData: { stage1, stage2, stage3: stage3Full ? { model: '', response: stage3Full } : null },
+ taskStage,
+ });
+ } else if (taskType === 'debate') {
+ const rounds: DebateRound[] = [];
+ let finalFull = '';
+ let finalModel = '';
+ let currentRound = 0;
+ for (const evt of events) {
+ if (evt.type === 'debate_start') {
+ taskStage = `Debate started (${evt.data?.models?.length || '?'} models)`;
+ } else if (evt.type === 'round_start') {
+ currentRound = evt.data.round;
+ taskStage = `Round ${currentRound}: Collecting responses...`;
+ } else if (evt.type === 'round_model_complete') {
+ taskStage = `Round ${currentRound}: ${evt.data.model} done`;
+ } else if (evt.type === 'round_complete') {
+ rounds.push({ round: evt.data.round, responses: evt.data.responses });
+ taskStage = `Round ${evt.data.round} complete`;
+ } else if (evt.type === 'judge_decision' && rounds.length > 0) {
+ rounds[rounds.length - 1].judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning };
+ } else if (evt.type === 'model_eliminated' && rounds.length > 0) {
+ if (!rounds[rounds.length - 1].eliminated) rounds[rounds.length - 1].eliminated = [];
+ rounds[rounds.length - 1].eliminated!.push({
+ model: evt.data.model, convincedBy: evt.data.convinced_by, reasoning: evt.data.reasoning,
+ });
+ } else if (evt.type === 'convergence_status') {
+ const remaining = evt.data?.remaining as string[] || [];
+ taskStage = remaining.length <= 1
+ ? `${remaining[0] || 'Winner'} is the last one standing!`
+ : `${remaining.length} models remaining...`;
+ } else if (evt.type === 'final_start') {
+ finalModel = evt.data.model;
+ taskStage = 'Judge synthesizing final verdict...';
+ } else if (evt.type === 'final_complete') {
+ finalModel = evt.data.model;
+ finalFull = evt.data.response;
+ }
+ }
+ const config = currentData.debateData?.config || {
+ judgeMode: currentData.debateJudgeMode || 'external_judge',
+ format: currentData.debateFormat || 'free_discussion',
+ maxRounds: currentData.debateMaxRounds || 5,
+ };
+ updateNodeData(nodeId, {
+ debateData: { rounds, finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null, config: config as any },
+ taskStage,
+ });
+ }
+}
+
+/**
+ * Connect to the task SSE stream and process events, updating node data.
+ * Used by both initial task start (Sidebar) and recovery (recoverBackgroundTasks).
+ *
+ * Initializes accumulators from existing node data so that recovery after reconnect
+ * doesn't lose previously accumulated rounds/stages.
+ */
+async function _consumeTaskStream(
+ taskId: string,
+ nodeId: string,
+ taskType: string,
+ fromEvent: number,
+ updateNodeData: (nodeId: string, data: Partial<NodeData>) => void,
+ username: string,
+ callbacks?: {
+ onStage?: (stage: string) => void;
+ onStreamBuffer?: (text: string) => void;
+ onComplete?: () => void;
+ onError?: () => void;
+ },
+): Promise<void> {
+ const authHeaders = getAuthHeaders();
+
+ // Read current node data to initialize accumulators (critical for recovery)
+ const nodeData = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data;
+
+ // Helper: read debate config from node data (never hardcode)
+ const getDebateConfig = () => {
+ const nd = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data;
+ return nd?.debateData?.config || {
+ judgeMode: nd?.debateJudgeMode || 'external_judge',
+ format: nd?.debateFormat || 'free_discussion',
+ maxRounds: nd?.debateMaxRounds || 5,
+ };
+ };
+
+ // Helper: trigger auto-save after milestone events
+ const triggerSave = () => {
+ try { useFlowStore.getState().triggerAutoSave(); } catch { /* ignore */ }
+ };
+
+ // Helper: update stage text on both node data and optional callback
+ const setStage = (text: string) => {
+ updateNodeData(nodeId, { taskStage: text });
+ callbacks?.onStage?.(text);
+ };
+
+ try {
+ const response = await fetch(
+ `${API_BASE}/api/task/${encodeURIComponent(taskId)}/stream?from_event=${fromEvent}&user=${encodeURIComponent(username)}`,
+ { headers: authHeaders },
+ );
+ if (!response.ok || !response.body) {
+ callbacks?.onError?.();
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ return;
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let sseBuffer = '';
+
+ // Initialize accumulators from existing node data (preserves state across reconnect)
+ let stage1Results: Array<{ model: string; response: string }> =
+ (nodeData?.councilData?.stage1 as any[]) || [];
+ let stage2Data: any = nodeData?.councilData?.stage2 || null;
+ let stage3Full = nodeData?.councilData?.stage3?.response || '';
+ let stage3Model = nodeData?.councilData?.stage3?.model || '';
+
+ const debateRounds: DebateRound[] = [...(nodeData?.debateData?.rounds || [])];
+ let currentRound = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].round : 0;
+ let finalModel = nodeData?.debateData?.finalVerdict?.model || '';
+ let finalFull = nodeData?.debateData?.finalVerdict?.response || '';
+
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ sseBuffer += decoder.decode(value, { stream: true });
+
+ const parts = sseBuffer.split('\n\n');
+ sseBuffer = parts.pop() || '';
+
+ for (const part of parts) {
+ const line = part.trim();
+ if (!line.startsWith('data: ')) continue;
+ let evt: any;
+ try { evt = JSON.parse(line.slice(6)); } catch { continue; }
+
+ // Handle task_status terminal event
+ if (evt.type === 'task_status') {
+ if (evt.data?.status === 'completed') {
+ callbacks?.onComplete?.();
+ } else {
+ callbacks?.onError?.();
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ }
+ continue;
+ }
+
+ if (taskType === 'council') {
+ switch (evt.type) {
+ case 'stage1_start':
+ setStage('Stage 1: Collecting responses...');
+ break;
+ case 'stage1_model_complete':
+ stage1Results = [...stage1Results, evt.data];
+ setStage(`Stage 1: ${stage1Results.length} models done`);
+ updateNodeData(nodeId, {
+ councilData: { stage1: [...stage1Results], stage2: null, stage3: null },
+ });
+ triggerSave();
+ break;
+ case 'stage1_complete':
+ stage1Results = evt.data;
+ updateNodeData(nodeId, {
+ councilData: { stage1: stage1Results, stage2: null, stage3: null },
+ });
+ triggerSave();
+ break;
+ case 'stage2_start':
+ setStage('Stage 2: Peer ranking...');
+ break;
+ case 'stage2_complete':
+ stage2Data = evt.data;
+ updateNodeData(nodeId, {
+ councilData: { stage1: stage1Results, stage2: stage2Data, stage3: null },
+ });
+ triggerSave();
+ break;
+ case 'stage3_start':
+ setStage('Stage 3: Chairman synthesizing...');
+ callbacks?.onStreamBuffer?.('');
+ break;
+ case 'stage3_chunk':
+ stage3Full += evt.data.chunk;
+ callbacks?.onStreamBuffer?.(stage3Full);
+ break;
+ case 'stage3_complete':
+ stage3Model = evt.data.model;
+ stage3Full = evt.data.response;
+ triggerSave();
+ break;
+ case 'complete': {
+ const councilData: CouncilData = {
+ stage1: stage1Results,
+ stage2: stage2Data,
+ stage3: { model: stage3Model, response: stage3Full },
+ };
+ updateNodeData(nodeId, {
+ status: 'success',
+ response: stage3Full,
+ responseReceivedAt: Date.now(),
+ councilData,
+ taskId: undefined,
+ });
+ setStage('');
+ callbacks?.onComplete?.();
+ triggerSave();
+ break;
+ }
+ case 'error':
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ setStage('');
+ callbacks?.onError?.();
+ break;
+ }
+ } else if (taskType === 'debate') {
+ switch (evt.type) {
+ case 'debate_start':
+ setStage(`Debate started (${evt.data.models.length} models, max ${evt.data.max_rounds} rounds)`);
+ break;
+ case 'round_start':
+ currentRound = evt.data.round;
+ setStage(`Round ${currentRound}: Collecting responses...`);
+ break;
+ case 'round_model_complete':
+ setStage(`Round ${currentRound}: ${evt.data.model} done`);
+ break;
+ case 'round_complete': {
+ const roundData: DebateRound = { round: evt.data.round, responses: evt.data.responses };
+ debateRounds.push(roundData);
+ updateNodeData(nodeId, {
+ debateData: {
+ rounds: [...debateRounds],
+ finalVerdict: null,
+ config: getDebateConfig() as any,
+ },
+ });
+ triggerSave();
+ break;
+ }
+ case 'judge_decision': {
+ const lastRound = debateRounds[debateRounds.length - 1];
+ if (lastRound) {
+ lastRound.judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning };
+ updateNodeData(nodeId, {
+ debateData: {
+ rounds: [...debateRounds],
+ finalVerdict: null,
+ config: getDebateConfig() as any,
+ },
+ });
+ triggerSave();
+ }
+ break;
+ }
+ case 'model_eliminated': {
+ const lastRound2 = debateRounds[debateRounds.length - 1];
+ if (lastRound2) {
+ if (!lastRound2.eliminated) lastRound2.eliminated = [];
+ lastRound2.eliminated.push({
+ model: evt.data.model,
+ convincedBy: evt.data.convinced_by,
+ reasoning: evt.data.reasoning,
+ });
+ }
+ break;
+ }
+ case 'convergence_status': {
+ updateNodeData(nodeId, {
+ debateData: {
+ rounds: [...debateRounds],
+ finalVerdict: null,
+ config: getDebateConfig() as any,
+ },
+ });
+ const remaining = evt.data.remaining as string[];
+ if (remaining.length <= 1) {
+ setStage(`${remaining[0] || 'Winner'} is the last one standing!`);
+ } else {
+ setStage(`${remaining.length} models remaining...`);
+ }
+ break;
+ }
+ case 'final_start':
+ finalModel = evt.data.model;
+ setStage('Judge synthesizing final verdict...');
+ callbacks?.onStreamBuffer?.('');
+ break;
+ case 'final_chunk':
+ finalFull += evt.data.chunk;
+ callbacks?.onStreamBuffer?.(finalFull);
+ break;
+ case 'final_complete': {
+ finalModel = evt.data.model;
+ finalFull = evt.data.response;
+ updateNodeData(nodeId, {
+ status: 'success',
+ response: finalFull,
+ responseReceivedAt: Date.now(),
+ debateData: {
+ rounds: debateRounds,
+ finalVerdict: { model: finalModel, response: finalFull },
+ config: getDebateConfig() as any,
+ },
+ taskId: undefined,
+ });
+ setStage('');
+ callbacks?.onComplete?.();
+ triggerSave();
+ break;
+ }
+ case 'debate_complete': {
+ const store = useFlowStore.getState();
+ const currentNode = store.nodes.find(n => n.id === nodeId);
+ if (currentNode?.data.status === 'loading') {
+ const lastResp = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].responses : [];
+ const bestResponse = lastResp.length > 0
+ ? lastResp.reduce((a, b) => a.response.length > b.response.length ? a : b).response
+ : '';
+ const displayResponse = finalFull || bestResponse;
+ updateNodeData(nodeId, {
+ status: 'success',
+ response: displayResponse,
+ responseReceivedAt: Date.now(),
+ debateData: {
+ rounds: debateRounds,
+ finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null,
+ config: getDebateConfig() as any,
+ },
+ taskId: undefined,
+ });
+ setStage('');
+ callbacks?.onComplete?.();
+ triggerSave();
+ }
+ break;
+ }
+ case 'error':
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ setStage('');
+ callbacks?.onError?.();
+ break;
+ }
+ }
+ }
+ }
+ } catch (error) {
+ console.error(`Task stream error for ${taskId}:`, error);
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ callbacks?.onError?.();
+ }
+}
+
const useFlowStore = create<FlowState>((set, get) => {
const validateBlueprint = (doc: any): BlueprintDocument => {
@@ -1615,6 +2017,8 @@ const useFlowStore = create<FlowState>((set, get) => {
traces: undefined,
outgoingTraces: undefined,
messages: undefined,
+ // Drop transient runtime state
+ taskStage: undefined,
// Keep merged/forked trace definitions but strip their computed messages
mergedTraces: (n.data.mergedTraces || []).map((m: any) => ({
id: m.id,
@@ -1651,6 +2055,13 @@ const useFlowStore = create<FlowState>((set, get) => {
if (n.data?.chairmanModel && typeof n.data.chairmanModel === 'string') {
n.data.chairmanModel = { model: n.data.chairmanModel };
}
+ // Reset stale "loading" status — unless there's a background task to recover
+ if (n.data?.status === 'loading') {
+ if (!n.data.taskId) {
+ n.data.status = 'idle'; // Legacy/no task — reset
+ }
+ // If taskId exists, keep loading — recoverBackgroundTasks() will handle it
+ }
return n;
});
set({
@@ -2273,6 +2684,81 @@ const useFlowStore = create<FlowState>((set, get) => {
setTimeout(() => get().propagateTraces(), 50);
},
+ recoverBackgroundTasks: async () => {
+ const nodes = get().nodes;
+ const user = getCurrentUser();
+
+ for (const node of nodes) {
+ if (!node.data.taskId) continue;
+
+ try {
+ const res = await fetch(
+ `${API_BASE}/api/task/${encodeURIComponent(node.data.taskId)}?user=${encodeURIComponent(user)}`,
+ { headers: getAuthHeaders() }
+ );
+ if (!res.ok) {
+ // Task not found — reset node
+ get().updateNodeData(node.id, { status: 'idle', taskId: undefined });
+ continue;
+ }
+ const taskData = await res.json();
+ const status = taskData.status;
+
+ if (status === 'completed') {
+ // Apply completed results
+ const result = taskData.result || {};
+ if (taskData.task_type === 'council') {
+ const cd = result.councilData;
+ const response = cd?.stage3?.response || '';
+ get().updateNodeData(node.id, {
+ status: 'success',
+ response,
+ councilData: cd,
+ taskId: undefined,
+ });
+ } else if (taskData.task_type === 'debate') {
+ const rounds = result.debateRounds || [];
+ const finalVerdict = result.finalVerdict;
+ const response = finalVerdict?.response || '';
+ get().updateNodeData(node.id, {
+ status: 'success',
+ response,
+ debateData: {
+ rounds,
+ finalVerdict,
+ config: node.data.debateData?.config || {
+ judgeMode: node.data.debateJudgeMode || 'external_judge',
+ format: node.data.debateFormat || 'free_discussion',
+ maxRounds: node.data.debateMaxRounds || 5,
+ },
+ },
+ taskId: undefined,
+ });
+ }
+ } else if (status === 'running') {
+ // Reconnect to live stream from where we left off
+ const fromEvent = (taskData.events || []).length;
+ // Apply any partial state from accumulated events first
+ _applyPartialEvents(get().updateNodeData, node.id, taskData.task_type, taskData.events || [], node.data);
+ // Then connect to live stream
+ _consumeTaskStream(
+ node.data.taskId!, node.id, taskData.task_type, fromEvent,
+ get().updateNodeData, user
+ );
+ } else {
+ // failed / interrupted / unknown — reset
+ get().updateNodeData(node.id, {
+ status: 'error',
+ taskId: undefined,
+ });
+ }
+ } catch (err) {
+ console.error(`Failed to recover task for node ${node.id}:`, err);
+ get().updateNodeData(node.id, { status: 'idle', taskId: undefined });
+ }
+ }
+ },
+
propagateTraces: () => {
const { nodes, edges } = get();
@@ -2740,4 +3226,5 @@ useFlowStore.subscribe((state, prev) => {
if (state.theme !== prev.theme) syncThemeClass(state.theme);
});
+export { _consumeTaskStream, _applyPartialEvents };
export default useFlowStore;