From bdf381a2c8a0337f7459000f487a80f9cbbbdd2f Mon Sep 17 00:00:00 2001 From: YurenHao0426 Date: Sat, 14 Feb 2026 03:40:31 +0000 Subject: Add background task persistence for debate & council operations Decouple debate/council execution from SSE connection lifecycle so tasks survive browser disconnects. Backend runs work as asyncio.Tasks with progressive disk persistence; frontend can reconnect and recover state. - New backend/app/services/tasks.py: task registry, broadcast pattern, disk persistence at milestones, stale task cleanup on startup - New endpoints: POST start_debate/start_council, GET task stream/poll - Frontend stores taskId on nodes, recovers running tasks on page load - _applyPartialEvents rebuilds stage text + data from accumulated events Co-Authored-By: Claude Opus 4.6 --- frontend/src/store/flowStore.ts | 487 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 487 insertions(+) (limited to 'frontend/src/store') diff --git a/frontend/src/store/flowStore.ts b/frontend/src/store/flowStore.ts index 5bb3e22..ec96185 100644 --- a/frontend/src/store/flowStore.ts +++ b/frontend/src/store/flowStore.ts @@ -137,6 +137,10 @@ export interface NodeData { debateMaxRounds?: number; debateData?: DebateData; + // Background task persistence + taskId?: string; // Background task ID for recovery + taskStage?: string; // Live stage text (e.g. "Round 2: Collecting responses...") + // Traces logic traces: Trace[]; // INCOMING Traces outgoingTraces: Trace[]; // ALL Outgoing (inherited + self + forks + merged) @@ -293,6 +297,9 @@ interface FlowState { ) => Message[]; propagateTraces: () => void; + + // Background task recovery + recoverBackgroundTasks: () => Promise; } // Hash string to color @@ -341,6 +348,401 @@ const jsonFetch = async (url: string, options?: RequestInit): Promise => { return res.json() as Promise; }; +// --------------- Background Task Helpers --------------- + +/** Apply partial state from accumulated events (for recovery of in-progress tasks) */ +function _applyPartialEvents( + updateNodeData: (nodeId: string, data: Partial) => void, + nodeId: string, + taskType: string, + events: any[], + currentData: NodeData, +) { + // Compute taskStage from the last relevant event + let taskStage = 'Recovering...'; + + if (taskType === 'council') { + let stage1: any[] | null = null; + let stage2: any = null; + let stage3Full = ''; + for (const evt of events) { + if (evt.type === 'stage1_start') { + taskStage = 'Stage 1: Collecting responses...'; + } else if (evt.type === 'stage1_model_complete') { + if (!stage1) stage1 = []; + stage1.push(evt.data); + taskStage = `Stage 1: ${stage1.length} models done`; + } else if (evt.type === 'stage1_complete') { + stage1 = evt.data; + taskStage = 'Stage 1 complete'; + } else if (evt.type === 'stage2_start') { + taskStage = 'Stage 2: Peer ranking...'; + } else if (evt.type === 'stage2_complete') { + stage2 = evt.data; + taskStage = 'Stage 2 complete'; + } else if (evt.type === 'stage3_start') { + taskStage = 'Stage 3: Chairman synthesizing...'; + } else if (evt.type === 'stage3_complete') { + stage3Full = evt.data.response; + taskStage = 'Stage 3: Chairman synthesizing...'; + } + } + updateNodeData(nodeId, { + councilData: { stage1, stage2, stage3: stage3Full ? { model: '', response: stage3Full } : null }, + taskStage, + }); + } else if (taskType === 'debate') { + const rounds: DebateRound[] = []; + let finalFull = ''; + let finalModel = ''; + let currentRound = 0; + for (const evt of events) { + if (evt.type === 'debate_start') { + taskStage = `Debate started (${evt.data?.models?.length || '?'} models)`; + } else if (evt.type === 'round_start') { + currentRound = evt.data.round; + taskStage = `Round ${currentRound}: Collecting responses...`; + } else if (evt.type === 'round_model_complete') { + taskStage = `Round ${currentRound}: ${evt.data.model} done`; + } else if (evt.type === 'round_complete') { + rounds.push({ round: evt.data.round, responses: evt.data.responses }); + taskStage = `Round ${evt.data.round} complete`; + } else if (evt.type === 'judge_decision' && rounds.length > 0) { + rounds[rounds.length - 1].judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning }; + } else if (evt.type === 'model_eliminated' && rounds.length > 0) { + if (!rounds[rounds.length - 1].eliminated) rounds[rounds.length - 1].eliminated = []; + rounds[rounds.length - 1].eliminated!.push({ + model: evt.data.model, convincedBy: evt.data.convinced_by, reasoning: evt.data.reasoning, + }); + } else if (evt.type === 'convergence_status') { + const remaining = evt.data?.remaining as string[] || []; + taskStage = remaining.length <= 1 + ? `${remaining[0] || 'Winner'} is the last one standing!` + : `${remaining.length} models remaining...`; + } else if (evt.type === 'final_start') { + finalModel = evt.data.model; + taskStage = 'Judge synthesizing final verdict...'; + } else if (evt.type === 'final_complete') { + finalModel = evt.data.model; + finalFull = evt.data.response; + } + } + const config = currentData.debateData?.config || { + judgeMode: currentData.debateJudgeMode || 'external_judge', + format: currentData.debateFormat || 'free_discussion', + maxRounds: currentData.debateMaxRounds || 5, + }; + updateNodeData(nodeId, { + debateData: { rounds, finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null, config: config as any }, + taskStage, + }); + } +} + +/** + * Connect to the task SSE stream and process events, updating node data. + * Used by both initial task start (Sidebar) and recovery (recoverBackgroundTasks). + * + * Initializes accumulators from existing node data so that recovery after reconnect + * doesn't lose previously accumulated rounds/stages. + */ +async function _consumeTaskStream( + taskId: string, + nodeId: string, + taskType: string, + fromEvent: number, + updateNodeData: (nodeId: string, data: Partial) => void, + username: string, + callbacks?: { + onStage?: (stage: string) => void; + onStreamBuffer?: (text: string) => void; + onComplete?: () => void; + onError?: () => void; + }, +): Promise { + const authHeaders = getAuthHeaders(); + + // Read current node data to initialize accumulators (critical for recovery) + const nodeData = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data; + + // Helper: read debate config from node data (never hardcode) + const getDebateConfig = () => { + const nd = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data; + return nd?.debateData?.config || { + judgeMode: nd?.debateJudgeMode || 'external_judge', + format: nd?.debateFormat || 'free_discussion', + maxRounds: nd?.debateMaxRounds || 5, + }; + }; + + // Helper: trigger auto-save after milestone events + const triggerSave = () => { + try { useFlowStore.getState().triggerAutoSave(); } catch { /* ignore */ } + }; + + // Helper: update stage text on both node data and optional callback + const setStage = (text: string) => { + updateNodeData(nodeId, { taskStage: text }); + callbacks?.onStage?.(text); + }; + + try { + const response = await fetch( + `${API_BASE}/api/task/${encodeURIComponent(taskId)}/stream?from_event=${fromEvent}&user=${encodeURIComponent(username)}`, + { headers: authHeaders }, + ); + if (!response.ok || !response.body) { + callbacks?.onError?.(); + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + return; + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let sseBuffer = ''; + + // Initialize accumulators from existing node data (preserves state across reconnect) + let stage1Results: Array<{ model: string; response: string }> = + (nodeData?.councilData?.stage1 as any[]) || []; + let stage2Data: any = nodeData?.councilData?.stage2 || null; + let stage3Full = nodeData?.councilData?.stage3?.response || ''; + let stage3Model = nodeData?.councilData?.stage3?.model || ''; + + const debateRounds: DebateRound[] = [...(nodeData?.debateData?.rounds || [])]; + let currentRound = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].round : 0; + let finalModel = nodeData?.debateData?.finalVerdict?.model || ''; + let finalFull = nodeData?.debateData?.finalVerdict?.response || ''; + + while (true) { + const { value, done } = await reader.read(); + if (done) break; + sseBuffer += decoder.decode(value, { stream: true }); + + const parts = sseBuffer.split('\n\n'); + sseBuffer = parts.pop() || ''; + + for (const part of parts) { + const line = part.trim(); + if (!line.startsWith('data: ')) continue; + let evt: any; + try { evt = JSON.parse(line.slice(6)); } catch { continue; } + + // Handle task_status terminal event + if (evt.type === 'task_status') { + if (evt.data?.status === 'completed') { + callbacks?.onComplete?.(); + } else { + callbacks?.onError?.(); + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + } + continue; + } + + if (taskType === 'council') { + switch (evt.type) { + case 'stage1_start': + setStage('Stage 1: Collecting responses...'); + break; + case 'stage1_model_complete': + stage1Results = [...stage1Results, evt.data]; + setStage(`Stage 1: ${stage1Results.length} models done`); + updateNodeData(nodeId, { + councilData: { stage1: [...stage1Results], stage2: null, stage3: null }, + }); + triggerSave(); + break; + case 'stage1_complete': + stage1Results = evt.data; + updateNodeData(nodeId, { + councilData: { stage1: stage1Results, stage2: null, stage3: null }, + }); + triggerSave(); + break; + case 'stage2_start': + setStage('Stage 2: Peer ranking...'); + break; + case 'stage2_complete': + stage2Data = evt.data; + updateNodeData(nodeId, { + councilData: { stage1: stage1Results, stage2: stage2Data, stage3: null }, + }); + triggerSave(); + break; + case 'stage3_start': + setStage('Stage 3: Chairman synthesizing...'); + callbacks?.onStreamBuffer?.(''); + break; + case 'stage3_chunk': + stage3Full += evt.data.chunk; + callbacks?.onStreamBuffer?.(stage3Full); + break; + case 'stage3_complete': + stage3Model = evt.data.model; + stage3Full = evt.data.response; + triggerSave(); + break; + case 'complete': { + const councilData: CouncilData = { + stage1: stage1Results, + stage2: stage2Data, + stage3: { model: stage3Model, response: stage3Full }, + }; + updateNodeData(nodeId, { + status: 'success', + response: stage3Full, + responseReceivedAt: Date.now(), + councilData, + taskId: undefined, + }); + setStage(''); + callbacks?.onComplete?.(); + triggerSave(); + break; + } + case 'error': + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + setStage(''); + callbacks?.onError?.(); + break; + } + } else if (taskType === 'debate') { + switch (evt.type) { + case 'debate_start': + setStage(`Debate started (${evt.data.models.length} models, max ${evt.data.max_rounds} rounds)`); + break; + case 'round_start': + currentRound = evt.data.round; + setStage(`Round ${currentRound}: Collecting responses...`); + break; + case 'round_model_complete': + setStage(`Round ${currentRound}: ${evt.data.model} done`); + break; + case 'round_complete': { + const roundData: DebateRound = { round: evt.data.round, responses: evt.data.responses }; + debateRounds.push(roundData); + updateNodeData(nodeId, { + debateData: { + rounds: [...debateRounds], + finalVerdict: null, + config: getDebateConfig() as any, + }, + }); + triggerSave(); + break; + } + case 'judge_decision': { + const lastRound = debateRounds[debateRounds.length - 1]; + if (lastRound) { + lastRound.judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning }; + updateNodeData(nodeId, { + debateData: { + rounds: [...debateRounds], + finalVerdict: null, + config: getDebateConfig() as any, + }, + }); + triggerSave(); + } + break; + } + case 'model_eliminated': { + const lastRound2 = debateRounds[debateRounds.length - 1]; + if (lastRound2) { + if (!lastRound2.eliminated) lastRound2.eliminated = []; + lastRound2.eliminated.push({ + model: evt.data.model, + convincedBy: evt.data.convinced_by, + reasoning: evt.data.reasoning, + }); + } + break; + } + case 'convergence_status': { + updateNodeData(nodeId, { + debateData: { + rounds: [...debateRounds], + finalVerdict: null, + config: getDebateConfig() as any, + }, + }); + const remaining = evt.data.remaining as string[]; + if (remaining.length <= 1) { + setStage(`${remaining[0] || 'Winner'} is the last one standing!`); + } else { + setStage(`${remaining.length} models remaining...`); + } + break; + } + case 'final_start': + finalModel = evt.data.model; + setStage('Judge synthesizing final verdict...'); + callbacks?.onStreamBuffer?.(''); + break; + case 'final_chunk': + finalFull += evt.data.chunk; + callbacks?.onStreamBuffer?.(finalFull); + break; + case 'final_complete': { + finalModel = evt.data.model; + finalFull = evt.data.response; + updateNodeData(nodeId, { + status: 'success', + response: finalFull, + responseReceivedAt: Date.now(), + debateData: { + rounds: debateRounds, + finalVerdict: { model: finalModel, response: finalFull }, + config: getDebateConfig() as any, + }, + taskId: undefined, + }); + setStage(''); + callbacks?.onComplete?.(); + triggerSave(); + break; + } + case 'debate_complete': { + const store = useFlowStore.getState(); + const currentNode = store.nodes.find(n => n.id === nodeId); + if (currentNode?.data.status === 'loading') { + const lastResp = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].responses : []; + const bestResponse = lastResp.length > 0 + ? lastResp.reduce((a, b) => a.response.length > b.response.length ? a : b).response + : ''; + const displayResponse = finalFull || bestResponse; + updateNodeData(nodeId, { + status: 'success', + response: displayResponse, + responseReceivedAt: Date.now(), + debateData: { + rounds: debateRounds, + finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null, + config: getDebateConfig() as any, + }, + taskId: undefined, + }); + setStage(''); + callbacks?.onComplete?.(); + triggerSave(); + } + break; + } + case 'error': + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + setStage(''); + callbacks?.onError?.(); + break; + } + } + } + } + } catch (error) { + console.error(`Task stream error for ${taskId}:`, error); + updateNodeData(nodeId, { status: 'error', taskId: undefined }); + callbacks?.onError?.(); + } +} + const useFlowStore = create((set, get) => { const validateBlueprint = (doc: any): BlueprintDocument => { @@ -1615,6 +2017,8 @@ const useFlowStore = create((set, get) => { traces: undefined, outgoingTraces: undefined, messages: undefined, + // Drop transient runtime state + taskStage: undefined, // Keep merged/forked trace definitions but strip their computed messages mergedTraces: (n.data.mergedTraces || []).map((m: any) => ({ id: m.id, @@ -1651,6 +2055,13 @@ const useFlowStore = create((set, get) => { if (n.data?.chairmanModel && typeof n.data.chairmanModel === 'string') { n.data.chairmanModel = { model: n.data.chairmanModel }; } + // Reset stale "loading" status — unless there's a background task to recover + if (n.data?.status === 'loading') { + if (!n.data.taskId) { + n.data.status = 'idle'; // Legacy/no task — reset + } + // If taskId exists, keep loading — recoverBackgroundTasks() will handle it + } return n; }); set({ @@ -2273,6 +2684,81 @@ const useFlowStore = create((set, get) => { setTimeout(() => get().propagateTraces(), 50); }, + recoverBackgroundTasks: async () => { + const nodes = get().nodes; + const user = getCurrentUser(); + + for (const node of nodes) { + if (!node.data.taskId) continue; + + try { + const res = await fetch( + `${API_BASE}/api/task/${encodeURIComponent(node.data.taskId)}?user=${encodeURIComponent(user)}`, + { headers: getAuthHeaders() } + ); + if (!res.ok) { + // Task not found — reset node + get().updateNodeData(node.id, { status: 'idle', taskId: undefined }); + continue; + } + const taskData = await res.json(); + const status = taskData.status; + + if (status === 'completed') { + // Apply completed results + const result = taskData.result || {}; + if (taskData.task_type === 'council') { + const cd = result.councilData; + const response = cd?.stage3?.response || ''; + get().updateNodeData(node.id, { + status: 'success', + response, + councilData: cd, + taskId: undefined, + }); + } else if (taskData.task_type === 'debate') { + const rounds = result.debateRounds || []; + const finalVerdict = result.finalVerdict; + const response = finalVerdict?.response || ''; + get().updateNodeData(node.id, { + status: 'success', + response, + debateData: { + rounds, + finalVerdict, + config: node.data.debateData?.config || { + judgeMode: node.data.debateJudgeMode || 'external_judge', + format: node.data.debateFormat || 'free_discussion', + maxRounds: node.data.debateMaxRounds || 5, + }, + }, + taskId: undefined, + }); + } + } else if (status === 'running') { + // Reconnect to live stream from where we left off + const fromEvent = (taskData.events || []).length; + // Apply any partial state from accumulated events first + _applyPartialEvents(get().updateNodeData, node.id, taskData.task_type, taskData.events || [], node.data); + // Then connect to live stream + _consumeTaskStream( + node.data.taskId!, node.id, taskData.task_type, fromEvent, + get().updateNodeData, user + ); + } else { + // failed / interrupted / unknown — reset + get().updateNodeData(node.id, { + status: status === 'interrupted' ? 'error' : 'error', + taskId: undefined, + }); + } + } catch (err) { + console.error(`Failed to recover task for node ${node.id}:`, err); + get().updateNodeData(node.id, { status: 'idle', taskId: undefined }); + } + } + }, + propagateTraces: () => { const { nodes, edges } = get(); @@ -2740,4 +3226,5 @@ useFlowStore.subscribe((state, prev) => { if (state.theme !== prev.theme) syncThemeClass(state.theme); }); +export { _consumeTaskStream, _applyPartialEvents }; export default useFlowStore; -- cgit v1.2.3