Diffstat (limited to 'backend')
-rw-r--r--  backend/app/main.py            | 355
-rw-r--r--  backend/app/services/tasks.py  | 304
2 files changed, 658 insertions(+), 1 deletion(-)
diff --git a/backend/app/main.py b/backend/app/main.py
index 89c5dd0..746b731 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -12,6 +12,10 @@ from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Me
from app.services.llm import llm_streamer, generate_title, get_openai_client, get_anthropic_client, resolve_provider
from app.services.council import council_event_stream
from app.services.debate import debate_event_stream
+from app.services.tasks import (
+ register_task, get_task, get_task_status, run_task_in_background,
+    cleanup_stale_tasks, TaskStatus, _task_registry,
+)
from app.auth import auth_router, get_current_user, get_current_user_optional, init_db, User, get_db
from app.auth.utils import get_password_hash
from dotenv import load_dotenv
@@ -57,7 +61,12 @@ app.add_middleware(
async def startup_event():
"""Initialize database and create default test user if not exists"""
init_db()
-
+
+ # Mark any in-progress tasks from prior run as interrupted
+ stale_count = cleanup_stale_tasks(DATA_ROOT)
+ if stale_count:
+ logger.info("Cleaned up %d stale background tasks", stale_count)
+
# Create test user if not exists
from app.auth.models import SessionLocal
db = SessionLocal()
@@ -711,6 +720,350 @@ async def run_debate_stream(
)
+# --------------- Background Task Helpers ---------------
+
+async def _prepare_council_args(request: CouncilRunRequest, resolved: User | None, username: str) -> dict:
+ """Extract shared council request processing into a reusable helper.
+ Returns kwargs suitable for council_event_stream()."""
+ raw_messages = []
+ for ctx in request.incoming_contexts:
+ raw_messages.extend(ctx.messages)
+ if request.merge_strategy == MergeStrategy.SMART:
+ final_messages = smart_merge_messages(raw_messages)
+ else:
+ final_messages = raw_messages
+ execution_context = Context(messages=final_messages)
+
+ images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids)
+ openrouter_key = get_user_api_key(resolved, "openrouter")
+
+ member_configs: list[LLMConfig] = []
+ attachments_per_model: list[list[dict] | None] = []
+ tools_per_model: list[list[dict] | None] = []
+ contexts_per_model: list[Context | None] = []
+
+ for member in request.council_models:
+ provider = resolve_provider(member.model_name)
+ provider_str = provider.value
+ api_key = get_user_api_key(resolved, provider_str)
+ config = LLMConfig(
+ provider=provider,
+ model_name=member.model_name,
+ temperature=member.temperature if member.temperature is not None else request.temperature,
+ system_prompt=request.system_prompt,
+ api_key=api_key,
+ reasoning_effort=member.reasoning_effort if member.reasoning_effort is not None else request.reasoning_effort,
+ enable_google_search=member.enable_google_search if member.enable_google_search is not None else request.enable_google_search,
+ )
+ member_configs.append(config)
+
+ tools: list[dict] = []
+ attachments: list[dict] = []
+ scoped_file_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids)
+
+ if provider == ModelProvider.OPENAI:
+ vs_ids, debug_refs, filters = await prepare_openai_vector_search(
+ user=username, attached_ids=non_image_file_ids,
+ scopes=request.scopes, llm_config=config,
+ )
+ if not vs_ids:
+ try:
+ client = get_openai_client(config.api_key)
+ vs_id = await ensure_user_vector_store(username, client)
+ if vs_id:
+ vs_ids = [vs_id]
+            except Exception:
+                # Best-effort fallback; a missing per-user vector store is non-fatal.
+                pass
+ if vs_ids:
+ tool_def = {"type": "file_search", "vector_store_ids": vs_ids}
+ if filters:
+ tool_def["filters"] = filters
+ tools.append(tool_def)
+        elif provider in (ModelProvider.GOOGLE, ModelProvider.CLAUDE):
+            attachments = await prepare_attachments(
+                user=username, target_provider=provider,
+                attached_ids=scoped_file_ids, llm_config=config,
+            )
+
+ attachments_per_model.append(attachments or None)
+ tools_per_model.append(tools or None)
+
+ if member.incoming_contexts:
+ raw = []
+ for ctx in member.incoming_contexts:
+ raw.extend(ctx.messages)
+ if request.merge_strategy == MergeStrategy.SMART:
+ merged = smart_merge_messages(raw)
+ else:
+ merged = raw
+ contexts_per_model.append(Context(messages=merged))
+ else:
+ contexts_per_model.append(None)
+
+ chairman = request.chairman_model
+ chairman_provider = resolve_provider(chairman.model_name)
+ chairman_api_key = get_user_api_key(resolved, chairman_provider.value)
+ chairman_config = LLMConfig(
+ provider=chairman_provider,
+ model_name=chairman.model_name,
+ temperature=chairman.temperature if chairman.temperature is not None else request.temperature,
+ system_prompt=request.system_prompt,
+ api_key=chairman_api_key,
+ reasoning_effort=chairman.reasoning_effort if chairman.reasoning_effort is not None else request.reasoning_effort,
+ enable_google_search=chairman.enable_google_search if chairman.enable_google_search is not None else request.enable_google_search,
+ )
+
+ return dict(
+ user_prompt=request.user_prompt,
+ context=execution_context,
+ member_configs=member_configs,
+ chairman_config=chairman_config,
+ attachments_per_model=attachments_per_model,
+ tools_per_model=tools_per_model,
+ openrouter_api_key=openrouter_key,
+ images=images,
+ contexts_per_model=contexts_per_model,
+ )
+
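+# Illustrative payload for _prepare_council_args (values hypothetical; field
+# names follow how CouncilRunRequest is read above):
+#
+#   {
+#     "node_id": "node-1",
+#     "user_prompt": "Summarize the findings.",
+#     "incoming_contexts": [{"messages": []}],
+#     "attached_file_ids": [],
+#     "council_models": [{"model_name": "gpt-4o"}],
+#     "chairman_model": {"model_name": "gpt-4o"}
+#   }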
+
+async def _prepare_debate_args(request: DebateRunRequest, resolved: User | None, username: str) -> dict:
+ """Extract shared debate request processing into a reusable helper.
+ Returns kwargs suitable for debate_event_stream()."""
+ raw_messages = []
+ for ctx in request.incoming_contexts:
+ raw_messages.extend(ctx.messages)
+ if request.merge_strategy == MergeStrategy.SMART:
+ final_messages = smart_merge_messages(raw_messages)
+ else:
+ final_messages = raw_messages
+ execution_context = Context(messages=final_messages)
+
+ images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids)
+ openrouter_key = get_user_api_key(resolved, "openrouter")
+
+ member_configs: list[LLMConfig] = []
+ attachments_per_model: list[list[dict] | None] = []
+ tools_per_model: list[list[dict] | None] = []
+
+ for member in request.debate_models:
+ provider = resolve_provider(member.model_name)
+ provider_str = provider.value
+ api_key = get_user_api_key(resolved, provider_str)
+ config = LLMConfig(
+ provider=provider,
+ model_name=member.model_name,
+ temperature=member.temperature if member.temperature is not None else request.temperature,
+ system_prompt=request.system_prompt,
+ api_key=api_key,
+ reasoning_effort=member.reasoning_effort if member.reasoning_effort is not None else request.reasoning_effort,
+ enable_google_search=member.enable_google_search if member.enable_google_search is not None else request.enable_google_search,
+ )
+ member_configs.append(config)
+
+ tools: list[dict] = []
+ attachments: list[dict] = []
+ scoped_file_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids)
+
+ if provider == ModelProvider.OPENAI:
+ vs_ids, debug_refs, filters = await prepare_openai_vector_search(
+ user=username, attached_ids=non_image_file_ids,
+ scopes=request.scopes, llm_config=config,
+ )
+ if not vs_ids:
+ try:
+ client = get_openai_client(config.api_key)
+ vs_id = await ensure_user_vector_store(username, client)
+ if vs_id:
+ vs_ids = [vs_id]
+            except Exception:
+                # Best-effort fallback; a missing per-user vector store is non-fatal.
+                pass
+ if vs_ids:
+ tool_def = {"type": "file_search", "vector_store_ids": vs_ids}
+ if filters:
+ tool_def["filters"] = filters
+ tools.append(tool_def)
+        elif provider in (ModelProvider.GOOGLE, ModelProvider.CLAUDE):
+            attachments = await prepare_attachments(
+                user=username, target_provider=provider,
+                attached_ids=scoped_file_ids, llm_config=config,
+            )
+
+ attachments_per_model.append(attachments or None)
+ tools_per_model.append(tools or None)
+
+ judge_config = None
+ if request.judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and request.judge_model:
+ judge = request.judge_model
+ judge_provider = resolve_provider(judge.model_name)
+ judge_api_key = get_user_api_key(resolved, judge_provider.value)
+ judge_config = LLMConfig(
+ provider=judge_provider,
+ model_name=judge.model_name,
+ temperature=judge.temperature if judge.temperature is not None else request.temperature,
+ system_prompt=request.system_prompt,
+ api_key=judge_api_key,
+ reasoning_effort=judge.reasoning_effort if judge.reasoning_effort is not None else request.reasoning_effort,
+ enable_google_search=judge.enable_google_search if judge.enable_google_search is not None else request.enable_google_search,
+ )
+
+ return dict(
+ user_prompt=request.user_prompt,
+ context=execution_context,
+ member_configs=member_configs,
+ judge_config=judge_config,
+ judge_mode=request.judge_mode,
+ debate_format=request.debate_format,
+ max_rounds=request.max_rounds,
+ custom_format_prompt=request.custom_format_prompt,
+ attachments_per_model=attachments_per_model,
+ tools_per_model=tools_per_model,
+ openrouter_api_key=openrouter_key,
+ images=images,
+ )
+
+
+# --------------- Background Task Endpoints ---------------
+
+@app.post("/api/task/start_council")
+async def start_council_task(
+ request: CouncilRunRequest,
+ user: str = DEFAULT_USER,
+ current_user: User | None = Depends(get_current_user_optional),
+):
+ """Start council as a background task. Returns {task_id}."""
+ resolved = resolve_user(current_user, user)
+ username = resolved.username if resolved else DEFAULT_USER
+
+ # Cancel existing task on same node if still running
+ for existing in list(_task_registry.values()):
+ if existing.node_id == request.node_id and existing.user == username and existing.status == TaskStatus.RUNNING:
+ if existing.asyncio_task and not existing.asyncio_task.done():
+ existing.asyncio_task.cancel()
+ logger.info("Cancelled previous task %s for node %s", existing.task_id, request.node_id)
+
+ kwargs = await _prepare_council_args(request, resolved, username)
+ generator = council_event_stream(**kwargs)
+
+ task_id = str(uuid4())
+ info = register_task(task_id, username, request.node_id, "council")
+ info.asyncio_task = asyncio.create_task(run_task_in_background(info, generator))
+ return {"task_id": task_id}
+
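+# Client flow sketch (assumes the httpx package, which this backend does not
+# itself depend on):
+#
+#   import httpx
+#   r = httpx.post("http://localhost:8000/api/task/start_council", json=payload)
+#   task_id = r.json()["task_id"]
+#   # then stream /api/task/{task_id}/stream, or poll /api/task/{task_id}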
+
+@app.post("/api/task/start_debate")
+async def start_debate_task(
+ request: DebateRunRequest,
+ user: str = DEFAULT_USER,
+ current_user: User | None = Depends(get_current_user_optional),
+):
+ """Start debate as a background task. Returns {task_id}."""
+ resolved = resolve_user(current_user, user)
+ username = resolved.username if resolved else DEFAULT_USER
+
+ # Cancel existing task on same node if still running
+ for existing in list(_task_registry.values()):
+ if existing.node_id == request.node_id and existing.user == username and existing.status == TaskStatus.RUNNING:
+ if existing.asyncio_task and not existing.asyncio_task.done():
+ existing.asyncio_task.cancel()
+ logger.info("Cancelled previous task %s for node %s", existing.task_id, request.node_id)
+
+ kwargs = await _prepare_debate_args(request, resolved, username)
+ generator = debate_event_stream(**kwargs)
+
+ task_id = str(uuid4())
+ info = register_task(task_id, username, request.node_id, "debate")
+ info.asyncio_task = asyncio.create_task(run_task_in_background(info, generator))
+ return {"task_id": task_id}
+
+
+@app.get("/api/task/{task_id}/stream")
+async def stream_task_events(
+ task_id: str,
+ from_event: int = 0,
+ user: str = DEFAULT_USER,
+ current_user: User | None = Depends(get_current_user_optional),
+):
+ """SSE stream: replay missed events then stream live."""
+ resolved = resolve_user(current_user, user)
+ username = resolved.username if resolved else DEFAULT_USER
+
+ info = get_task(task_id)
+ if not info:
+ raise HTTPException(status_code=404, detail="Task not found")
+ if info.user != username:
+ raise HTTPException(status_code=403, detail="Not your task")
+
+ async def event_generator():
+ cursor = from_event
+ # 1. Replay accumulated events from cursor
+ while cursor < len(info.events):
+ evt = info.events[cursor]
+ yield f"data: {json.dumps(evt)}\n\n"
+ cursor += 1
+
+ # 2. If already done, send terminal and return
+ if info.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.INTERRUPTED):
+ yield f"data: {json.dumps({'type': 'task_status', 'data': {'status': info.status.value}})}\n\n"
+ return
+
+ # 3. Stream live events
+ while True:
+            # Wait for the next event pulse; the timeout doubles as a keepalive.
+            # Event.wait() is safe to cancel on timeout, so no shield is needed;
+            # cancelling the waiter also avoids accumulating stale wait() tasks.
+            try:
+                await asyncio.wait_for(info.new_event_signal.wait(), timeout=15.0)
+            except asyncio.TimeoutError:
+                # No pulse: emit an SSE comment to keep the connection alive,
+                # then fall through to the drain below. The producer sets and
+                # immediately clears the signal, so a consumer that was not
+                # awaiting at that instant misses the pulse; draining here
+                # bounds that delay to one keepalive interval.
+                yield ": keepalive\n\n"
+
+ # Yield any new events since cursor
+ while cursor < len(info.events):
+ evt = info.events[cursor]
+ yield f"data: {json.dumps(evt)}\n\n"
+ cursor += 1
+
+ if info.status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.INTERRUPTED):
+ break
+
+ yield f"data: {json.dumps({'type': 'task_status', 'data': {'status': info.status.value}})}\n\n"
+
+ return StreamingResponse(event_generator(), media_type="text/event-stream")
+
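+# Reconnect sketch (assumes httpx): from_event is a cursor into info.events,
+# so a client that already saw N events resumes without duplicates:
+#
+#   seen = 0
+#   with httpx.stream("GET", f"{base}/api/task/{task_id}/stream",
+#                     params={"from_event": seen}, timeout=None) as resp:
+#       for line in resp.iter_lines():
+#           if line.startswith("data: "):
+#               evt = json.loads(line[len("data: "):])
+#               seen += 1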
+
+@app.get("/api/task/{task_id}")
+async def poll_task(
+ task_id: str,
+ user: str = DEFAULT_USER,
+ current_user: User | None = Depends(get_current_user_optional),
+):
+ """Poll: return status + accumulated results."""
+ resolved = resolve_user(current_user, user)
+ username = resolved.username if resolved else DEFAULT_USER
+
+ status_data = get_task_status(username, task_id)
+ if not status_data:
+ raise HTTPException(status_code=404, detail="Task not found")
+ if status_data.get("user") != username:
+ raise HTTPException(status_code=403, detail="Not your task")
+ return status_data
+
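+# Example poll response (shape mirrors tasks._task_to_dict; values illustrative):
+#
+#   {"task_id": "1f2e...", "user": "alice", "node_id": "node-1",
+#    "task_type": "council", "status": "running",
+#    "created_at": 1700000000.0, "updated_at": 1700000042.5,
+#    "events": [...], "result": null, "error": null}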
+
class TitleRequest(BaseModel):
user_prompt: str
response: str
diff --git a/backend/app/services/tasks.py b/backend/app/services/tasks.py
new file mode 100644
index 0000000..003c02d
--- /dev/null
+++ b/backend/app/services/tasks.py
@@ -0,0 +1,304 @@
+"""
+Background task management for debate & council operations.
+
+Decouples execution from SSE delivery so tasks survive browser disconnects.
+Results are persisted progressively to disk.
+"""
+
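+# Typical flow, as wired up in app.main:
+#
+#   info = register_task(task_id, username, node_id, "council")
+#   info.asyncio_task = asyncio.create_task(run_task_in_background(info, gen))
+#   # consumers then read info.events live over SSE, or call
+#   # get_task_status(username, task_id) after a disconnect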
+import asyncio
+import json
+import logging
+import os
+import time
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import AsyncGenerator, Dict, List, Optional
+
+logger = logging.getLogger("contextflow.tasks")
+
+DATA_ROOT = os.path.abspath(os.getenv("DATA_ROOT", os.path.join(os.getcwd(), "data")))
+
+
+class TaskStatus(str, Enum):
+ PENDING = "pending"
+ RUNNING = "running"
+ COMPLETED = "completed"
+ FAILED = "failed"
+ INTERRUPTED = "interrupted"
+
+
+# Chunk event types that are NOT persisted to disk (redundant with *_complete events)
+_CHUNK_EVENT_TYPES = {"final_chunk", "stage3_chunk"}
+
+
+@dataclass
+class TaskInfo:
+ task_id: str
+ user: str
+ node_id: str
+ task_type: str # "debate" | "council"
+ status: TaskStatus
+ created_at: float
+ updated_at: float
+ events: List[Dict] = field(default_factory=list)
+ result: Optional[Dict] = None
+ error: Optional[str] = None
+ # In-memory only (not serialized):
+ new_event_signal: asyncio.Event = field(default_factory=asyncio.Event, repr=False)
+ asyncio_task: Optional[asyncio.Task] = field(default=None, repr=False)
+
+
+# --------------- In-memory registry ---------------
+_task_registry: Dict[str, TaskInfo] = {}
+
+
+def register_task(task_id: str, user: str, node_id: str, task_type: str) -> TaskInfo:
+ """Create and register a new task in memory + persist initial state to disk."""
+ now = time.time()
+ info = TaskInfo(
+ task_id=task_id,
+ user=user,
+ node_id=node_id,
+ task_type=task_type,
+ status=TaskStatus.PENDING,
+ created_at=now,
+ updated_at=now,
+ )
+ _task_registry[task_id] = info
+ _persist_task(info)
+ return info
+
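+# e.g. register_task("1f2e...", "alice", "node-1", "debate") writes an initial
+# "pending" snapshot to <DATA_ROOT>/alice/tasks/1f2e....json (_task_file_path).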
+
+def get_task(task_id: str) -> Optional[TaskInfo]:
+ """Get task from in-memory registry."""
+ return _task_registry.get(task_id)
+
+
+def get_task_status(user: str, task_id: str) -> Optional[Dict]:
+ """
+    Get the task status dict, checking the in-memory registry first and
+    falling back to disk. A disk record still marked "running" with no
+    in-memory entry is reported as "interrupted".
+ """
+ info = _task_registry.get(task_id)
+ if info:
+ return _task_to_dict(info)
+
+ # Fallback: read from disk
+ disk_data = _load_task_from_disk(user, task_id)
+ if not disk_data:
+ return None
+
+ # If disk says running but it's not in memory, backend must have restarted
+ if disk_data.get("status") == TaskStatus.RUNNING:
+ disk_data["status"] = TaskStatus.INTERRUPTED
+ return disk_data
+
+
+async def run_task_in_background(info: TaskInfo, event_generator: AsyncGenerator[str, None]) -> None:
+ """
+    Consume the async generator, appending each event to info.events and
+    pulsing new_event_signal for SSE consumers. State is persisted at
+    milestones; chunk events are kept in memory only.
+ """
+ info.status = TaskStatus.RUNNING
+ info.updated_at = time.time()
+ _persist_task(info)
+
+ try:
+ async for raw_sse in event_generator:
+ # raw_sse is a string like 'data: {"type": "...", ...}\n\n'
+ evt = _parse_sse_event(raw_sse)
+ if evt is None:
+ continue
+
+ evt_type = evt.get("type", "")
+
+ # Always store in memory (even chunks, for live streaming)
+ info.events.append(evt)
+ info.updated_at = time.time()
+
+            # Pulse waiting SSE consumers: set-then-clear wakes every coroutine
+            # currently awaiting the event without leaving the flag latched.
+            info.new_event_signal.set()
+            info.new_event_signal.clear()
+
+ # Persist at milestones (skip chunk events)
+ if evt_type not in _CHUNK_EVENT_TYPES:
+ _persist_task(info)
+
+ # Detect completion / error
+ if evt_type in ("complete", "debate_complete"):
+ info.result = _extract_result(info.task_type, info.events)
+ info.status = TaskStatus.COMPLETED
+ info.updated_at = time.time()
+ _persist_task(info)
+ return
+ if evt_type == "error":
+                info.error = (
+                    evt.get("data", {}).get("message", "Unknown error")
+                    if isinstance(evt.get("data"), dict)
+                    else str(evt.get("data", "Unknown error"))
+                )
+ info.status = TaskStatus.FAILED
+ info.updated_at = time.time()
+ _persist_task(info)
+ return
+
+ # Generator exhausted without explicit complete/error
+ if info.status == TaskStatus.RUNNING:
+ info.result = _extract_result(info.task_type, info.events)
+ info.status = TaskStatus.COMPLETED
+ info.updated_at = time.time()
+ _persist_task(info)
+
+ except asyncio.CancelledError:
+ info.status = TaskStatus.INTERRUPTED
+ info.updated_at = time.time()
+ _persist_task(info)
+ raise
+ except Exception as e:
+ logger.exception("Background task %s failed: %s", info.task_id, e)
+ info.error = str(e)
+ info.status = TaskStatus.FAILED
+ info.updated_at = time.time()
+ _persist_task(info)
+
+
+def cleanup_stale_tasks(data_root: Optional[str] = None) -> int:
+ """
+ On startup, scan task files on disk and mark any "running" as "interrupted".
+ Returns count of tasks marked interrupted.
+ """
+ root = data_root or DATA_ROOT
+ count = 0
+ if not os.path.exists(root):
+ return count
+
+ for user_dir in os.listdir(root):
+ tasks_dir = os.path.join(root, user_dir, "tasks")
+ if not os.path.isdir(tasks_dir):
+ continue
+ for fname in os.listdir(tasks_dir):
+ if not fname.endswith(".json"):
+ continue
+ fpath = os.path.join(tasks_dir, fname)
+ try:
+ with open(fpath, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ if data.get("status") == TaskStatus.RUNNING:
+ data["status"] = TaskStatus.INTERRUPTED
+ data["updated_at"] = time.time()
+ with open(fpath, "w", encoding="utf-8") as f:
+ json.dump(data, f, ensure_ascii=False)
+ count += 1
+ logger.info("Marked stale task as interrupted: %s", fname)
+ except Exception as e:
+ logger.warning("Failed to process stale task file %s: %s", fpath, e)
+
+ return count
+
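+# e.g. a file left as {"status": "running", ...} by an unclean shutdown is
+# rewritten with {"status": "interrupted"}, so pollers see a terminal state.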
+
+# --------------- Persistence helpers ---------------
+
+def _task_dir(user: str) -> str:
+ d = os.path.join(DATA_ROOT, user, "tasks")
+ os.makedirs(d, exist_ok=True)
+ return d
+
+
+def _task_file_path(user: str, task_id: str) -> str:
+ return os.path.join(_task_dir(user), f"{task_id}.json")
+
+
+def _persist_task(info: TaskInfo) -> None:
+ """Write task state to disk. Excludes chunk events for smaller files."""
+ try:
+ data = _task_to_dict(info, exclude_chunks=True)
+ path = _task_file_path(info.user, info.task_id)
+ with open(path, "w", encoding="utf-8") as f:
+ json.dump(data, f, ensure_ascii=False)
+ except Exception as e:
+ logger.warning("Failed to persist task %s: %s", info.task_id, e)
+
+
+def _load_task_from_disk(user: str, task_id: str) -> Optional[Dict]:
+ path = _task_file_path(user, task_id)
+ if not os.path.exists(path):
+ return None
+ try:
+ with open(path, "r", encoding="utf-8") as f:
+ return json.load(f)
+ except Exception as e:
+ logger.warning("Failed to load task %s from disk: %s", task_id, e)
+ return None
+
+
+def _task_to_dict(info: TaskInfo, exclude_chunks: bool = False) -> Dict:
+ events = info.events
+ if exclude_chunks:
+ events = [e for e in events if e.get("type") not in _CHUNK_EVENT_TYPES]
+ return {
+ "task_id": info.task_id,
+ "user": info.user,
+ "node_id": info.node_id,
+ "task_type": info.task_type,
+ "status": info.status.value if isinstance(info.status, TaskStatus) else info.status,
+ "created_at": info.created_at,
+ "updated_at": info.updated_at,
+ "events": events,
+ "result": info.result,
+ "error": info.error,
+ }
+
+
+def _parse_sse_event(raw: str) -> Optional[Dict]:
+ """Parse a raw SSE string like 'data: {...}\\n\\n' into a dict."""
+ raw = raw.strip()
+ if not raw:
+ return None
+ for line in raw.split("\n"):
+ line = line.strip()
+ if line.startswith("data: "):
+ try:
+ return json.loads(line[6:])
+ except json.JSONDecodeError:
+ return None
+ return None
+
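+# Examples:
+#   _parse_sse_event('data: {"type": "complete", "data": {}}\n\n')
+#       -> {"type": "complete", "data": {}}
+#   _parse_sse_event(": keepalive\n\n")
+#       -> None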
+
+def _extract_result(task_type: str, events: List[Dict]) -> Dict:
+ """Reconstruct final result data from accumulated events."""
+ if task_type == "council":
+ stage1 = None
+ stage2 = None
+ stage3 = None
+ for evt in events:
+ t = evt.get("type", "")
+ if t == "stage1_complete":
+ stage1 = evt.get("data")
+ elif t == "stage1_model_complete":
+ if stage1 is None:
+ stage1 = []
+ stage1.append(evt.get("data"))
+ elif t == "stage2_complete":
+ stage2 = evt.get("data")
+ elif t == "stage3_complete":
+ stage3 = evt.get("data")
+ return {"councilData": {"stage1": stage1, "stage2": stage2, "stage3": stage3}}
+
+ elif task_type == "debate":
+ rounds: List[Dict] = []
+ final_verdict = None
+ for evt in events:
+ t = evt.get("type", "")
+ if t == "round_complete":
+ rounds.append(evt.get("data", {}))
+ elif t == "final_complete":
+ final_verdict = evt.get("data")
+ elif t == "judge_decision":
+ # Attach to last round
+ if rounds:
+ rounds[-1]["judgeDecision"] = evt.get("data")
+ elif t == "model_eliminated":
+ if rounds:
+ if "eliminated" not in rounds[-1]:
+ rounds[-1]["eliminated"] = []
+ rounds[-1]["eliminated"].append(evt.get("data"))
+ return {"debateRounds": rounds, "finalVerdict": final_verdict}
+
+ return {}
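+
+# Illustrative results (shape only; per-event payloads come from the event
+# producers in app.services.council and app.services.debate):
+#   council: {"councilData": {"stage1": [...], "stage2": ..., "stage3": ...}}
+#   debate:  {"debateRounds": [...], "finalVerdict": ...}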