summaryrefslogtreecommitdiff
path: root/frontend/src
diff options
context:
space:
mode:
Diffstat (limited to 'frontend/src')
-rw-r--r--frontend/src/components/LeftSidebar.tsx3
-rw-r--r--frontend/src/components/Sidebar.tsx324
-rw-r--r--frontend/src/store/flowStore.ts487
3 files changed, 550 insertions, 264 deletions
diff --git a/frontend/src/components/LeftSidebar.tsx b/frontend/src/components/LeftSidebar.tsx
index 54c2527..441b7e0 100644
--- a/frontend/src/components/LeftSidebar.tsx
+++ b/frontend/src/components/LeftSidebar.tsx
@@ -30,6 +30,7 @@ const LeftSidebar: React.FC<LeftSidebarProps> = ({ isOpen, onToggle }) => {
deleteFile,
readBlueprintFile,
loadBlueprint,
+ recoverBackgroundTasks,
saveBlueprintFile,
saveCurrentBlueprint,
createProjectFolder,
@@ -266,6 +267,8 @@ const LeftSidebar: React.FC<LeftSidebarProps> = ({ isOpen, onToggle }) => {
if (vp) {
setViewport(vp);
}
+ // Recover any background tasks that were running before page refresh
+ recoverBackgroundTasks();
} catch (e) {
console.error(e);
alert('Not a valid blueprint JSON.');
diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx
index 8bb5fcb..474a969 100644
--- a/frontend/src/components/Sidebar.tsx
+++ b/frontend/src/components/Sidebar.tsx
@@ -387,6 +387,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
})
});
+ if (!response.ok) throw new Error(await response.text() || `HTTP ${response.status}`);
if (!response.body) return;
const reader = response.body.getReader();
const decoder = new TextDecoder();
@@ -511,7 +512,8 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
};
try {
- const response = await fetch(`/api/run_council_stream?user=${encodeURIComponent(user?.username || 'test')}`, {
+ // Step 1: Start background task
+ const startRes = await fetch(`/api/task/start_council?user=${encodeURIComponent(user?.username || 'test')}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...getAuthHeader() },
body: JSON.stringify({
@@ -546,101 +548,35 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
}),
});
- if (!response.body) return;
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let sseBuffer = '';
- let stage1Results: Array<{ model: string; response: string }> = [];
- let stage2Data: any = null;
- let stage3Full = '';
- let stage3Model = '';
+ if (!startRes.ok) throw new Error(await startRes.text() || `HTTP ${startRes.status}`);
+ const { task_id } = await startRes.json();
- while (true) {
- const { value, done } = await reader.read();
- if (done) break;
- sseBuffer += decoder.decode(value, { stream: true });
-
- // Parse SSE events (data: {...}\n\n)
- const parts = sseBuffer.split('\n\n');
- sseBuffer = parts.pop() || '';
-
- for (const part of parts) {
- const line = part.trim();
- if (!line.startsWith('data: ')) continue;
- let evt: any;
- try {
- evt = JSON.parse(line.slice(6));
- } catch { continue; }
-
- switch (evt.type) {
- case 'stage1_start':
- setCouncilStage('Stage 1: Collecting responses...');
- break;
- case 'stage1_model_complete':
- stage1Results = [...stage1Results, evt.data];
- setCouncilStage(`Stage 1: ${stage1Results.length}/${councilModels.length} models done`);
- updateNodeData(runningNodeId, {
- councilData: { stage1: [...stage1Results], stage2: null, stage3: null },
- });
- break;
- case 'stage1_complete':
- stage1Results = evt.data;
- updateNodeData(runningNodeId, {
- councilData: { stage1: stage1Results, stage2: null, stage3: null },
- });
- break;
- case 'stage2_start':
- setCouncilStage('Stage 2: Peer ranking...');
- break;
- case 'stage2_complete':
- stage2Data = evt.data;
- updateNodeData(runningNodeId, {
- councilData: { stage1: stage1Results, stage2: stage2Data, stage3: null },
- });
- break;
- case 'stage3_start':
- setCouncilStage('Stage 3: Chairman synthesizing...');
- setCouncilStreamBuffer('');
- break;
- case 'stage3_chunk':
- stage3Full += evt.data.chunk;
- setCouncilStreamBuffer(stage3Full);
- setStreamBuffer(stage3Full);
- break;
- case 'stage3_complete':
- stage3Model = evt.data.model;
- stage3Full = evt.data.response;
- break;
- case 'complete': {
- const responseReceivedAt = Date.now();
- const councilData: CouncilData = {
- stage1: stage1Results,
- stage2: stage2Data,
- stage3: { model: stage3Model, response: stage3Full },
- };
- const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt };
- const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: stage3Full };
- updateNodeData(runningNodeId, {
- status: 'success',
- response: stage3Full,
- responseReceivedAt,
- councilData,
- messages: [...context, newUserMsg, newAssistantMsg] as any,
- });
- setCouncilStage('');
- generateTitle(runningNodeId, runningPrompt, stage3Full);
- break;
- }
- case 'error':
- updateNodeData(runningNodeId, { status: 'error' });
- setCouncilStage('');
- break;
- }
- }
+ // Step 2: Store taskId on node for recovery + force save
+ updateNodeData(runningNodeId, { taskId: task_id });
+ if (currentBlueprintPath) {
+ saveCurrentBlueprint(currentBlueprintPath).catch(console.error);
}
+
+ // Step 3: Consume SSE stream from task
+ const { _consumeTaskStream } = await import('../store/flowStore');
+ await _consumeTaskStream(
+ task_id, runningNodeId, 'council', 0, updateNodeData, user?.username || 'test',
+ {
+ onStage: setCouncilStage,
+ onStreamBuffer: (text) => { setCouncilStreamBuffer(text); setStreamBuffer(text); },
+ onComplete: () => {
+ setCouncilStage('');
+ // Fetch final response for title generation
+ const currentNode = nodes.find(n => n.id === runningNodeId);
+ const finalResponse = currentNode?.data.response || '';
+ if (finalResponse) generateTitle(runningNodeId, runningPrompt, finalResponse);
+ },
+ onError: () => { setCouncilStage(''); },
+ },
+ );
} catch (error) {
console.error(error);
- updateNodeData(runningNodeId, { status: 'error' });
+ updateNodeData(runningNodeId, { status: 'error', taskId: undefined });
setCouncilStage('');
} finally {
setStreamingNodeId(prev => prev === runningNodeId ? null : prev);
@@ -707,7 +643,9 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
try {
const judgeModelConfig = selectedNode.data.judgeModel || debateModels[0];
- const response = await fetch(`/api/run_debate_stream?user=${encodeURIComponent(user?.username || 'test')}`, {
+
+ // Step 1: Start background task
+ const startRes = await fetch(`/api/task/start_debate?user=${encodeURIComponent(user?.username || 'test')}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', ...getAuthHeader() },
body: JSON.stringify({
@@ -740,176 +678,34 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
}),
});
- if (!response.body) return;
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let sseBuffer = '';
- const debateRounds: DebateRound[] = [];
- let currentRound = 0;
- let currentRoundResponses: Array<{ model: string; response: string }> = [];
- let finalModel = '';
- let finalFull = '';
+ if (!startRes.ok) throw new Error(await startRes.text() || `HTTP ${startRes.status}`);
+ const { task_id } = await startRes.json();
- while (true) {
- const { value, done } = await reader.read();
- if (done) break;
- sseBuffer += decoder.decode(value, { stream: true });
-
- const parts = sseBuffer.split('\n\n');
- sseBuffer = parts.pop() || '';
-
- for (const part of parts) {
- const line = part.trim();
- if (!line.startsWith('data: ')) continue;
- let evt: any;
- try {
- evt = JSON.parse(line.slice(6));
- } catch { continue; }
-
- switch (evt.type) {
- case 'debate_start':
- setDebateStage(`Debate started (${evt.data.models.length} models, max ${evt.data.max_rounds} rounds)`);
- break;
- case 'round_start':
- currentRound = evt.data.round;
- currentRoundResponses = [];
- setDebateStage(`Round ${currentRound}/${maxRounds}: Collecting responses...`);
- break;
- case 'round_model_complete':
- currentRoundResponses = [...currentRoundResponses, { model: evt.data.model, response: evt.data.response }];
- setDebateStage(`Round ${currentRound}/${maxRounds}: ${currentRoundResponses.length}/${debateModels.length} models done`);
- break;
- case 'round_complete': {
- const roundData: DebateRound = { round: evt.data.round, responses: evt.data.responses };
- debateRounds.push(roundData);
- updateNodeData(runningNodeId, {
- debateData: {
- rounds: [...debateRounds],
- finalVerdict: null,
- config: { judgeMode, format: debateFormat, maxRounds },
- },
- });
- break;
- }
- case 'judge_decision': {
- const lastRound = debateRounds[debateRounds.length - 1];
- if (lastRound) {
- lastRound.judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning };
- updateNodeData(runningNodeId, {
- debateData: {
- rounds: [...debateRounds],
- finalVerdict: null,
- config: { judgeMode, format: debateFormat, maxRounds },
- },
- });
- }
- if (!evt.data.continue) {
- setDebateStage('Judge stopped debate. Generating final verdict...');
- } else {
- setDebateStage(`Judge: Continue to round ${currentRound + 1}...`);
- }
- break;
- }
- case 'model_eliminated': {
- const lastRound2 = debateRounds[debateRounds.length - 1];
- if (lastRound2) {
- if (!lastRound2.eliminated) lastRound2.eliminated = [];
- lastRound2.eliminated.push({
- model: evt.data.model,
- convincedBy: evt.data.convinced_by,
- reasoning: evt.data.reasoning,
- });
- }
- setDebateStage(`${evt.data.model} concedes to ${evt.data.convinced_by || 'another'}...`);
- break;
- }
- case 'convergence_status': {
- const remaining = evt.data.remaining as string[];
- updateNodeData(runningNodeId, {
- debateData: {
- rounds: [...debateRounds],
- finalVerdict: null,
- config: { judgeMode, format: debateFormat, maxRounds },
- },
- });
- if (remaining.length <= 1) {
- setDebateStage(`${remaining[0] || 'Winner'} is the last one standing!`);
- } else {
- setDebateStage(`${remaining.length} models remaining...`);
- }
- break;
- }
- case 'final_start':
- finalModel = evt.data.model;
- setDebateStage('Judge synthesizing final verdict...');
- setDebateStreamBuffer('');
- break;
- case 'final_chunk':
- finalFull += evt.data.chunk;
- setDebateStreamBuffer(finalFull);
- setStreamBuffer(finalFull);
- break;
- case 'final_complete': {
- finalModel = evt.data.model;
- finalFull = evt.data.response;
- const responseReceivedAt = Date.now();
- const debateData: DebateData = {
- rounds: debateRounds,
- finalVerdict: { model: finalModel, response: finalFull },
- config: { judgeMode, format: debateFormat, maxRounds },
- };
- const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt };
- const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: finalFull };
- updateNodeData(runningNodeId, {
- status: 'success',
- response: finalFull,
- responseReceivedAt,
- debateData,
- messages: [...context, newUserMsg, newAssistantMsg] as any,
- });
- setDebateStage('');
- generateTitle(runningNodeId, runningPrompt, finalFull);
- break;
- }
- case 'debate_complete': {
- // If no final verdict (display_only or self_convergence without explicit final_complete)
- const currentNode = nodes.find(n => n.id === runningNodeId);
- if (currentNode?.data.status === 'loading') {
- const responseReceivedAt = Date.now();
- const lastRoundResp = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].responses : [];
- const bestResponse = lastRoundResp.length > 0
- ? lastRoundResp.reduce((a, b) => a.response.length > b.response.length ? a : b).response
- : '';
- const debateData: DebateData = {
- rounds: debateRounds,
- finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null,
- config: { judgeMode, format: debateFormat, maxRounds },
- };
- const displayResponse = finalFull || bestResponse;
- const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt };
- const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: displayResponse };
- updateNodeData(runningNodeId, {
- status: 'success',
- response: displayResponse,
- responseReceivedAt,
- debateData,
- messages: [...context, newUserMsg, newAssistantMsg] as any,
- });
- setDebateStage('');
- if (displayResponse) generateTitle(runningNodeId, runningPrompt, displayResponse);
- }
- break;
- }
- case 'error':
- updateNodeData(runningNodeId, { status: 'error' });
- setDebateStage('');
- break;
- }
- }
+ // Step 2: Store taskId on node for recovery + force save
+ updateNodeData(runningNodeId, { taskId: task_id });
+ if (currentBlueprintPath) {
+ saveCurrentBlueprint(currentBlueprintPath).catch(console.error);
}
+
+ // Step 3: Consume SSE stream from task
+ const { _consumeTaskStream } = await import('../store/flowStore');
+ await _consumeTaskStream(
+ task_id, runningNodeId, 'debate', 0, updateNodeData, user?.username || 'test',
+ {
+ onStage: setDebateStage,
+ onStreamBuffer: (text) => { setDebateStreamBuffer(text); setStreamBuffer(text); },
+ onComplete: () => {
+ setDebateStage('');
+ const currentNode = nodes.find(n => n.id === runningNodeId);
+ const finalResponse = currentNode?.data.response || '';
+ if (finalResponse) generateTitle(runningNodeId, runningPrompt, finalResponse);
+ },
+ onError: () => { setDebateStage(''); },
+ },
+ );
} catch (error) {
console.error(error);
- updateNodeData(runningNodeId, { status: 'error' });
+ updateNodeData(runningNodeId, { status: 'error', taskId: undefined });
setDebateStage('');
} finally {
setStreamingNodeId(prev => prev === runningNodeId ? null : prev);
@@ -2968,7 +2764,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
}`}
>
{selectedNode.data.status === 'loading' ? <Loader2 className="animate-spin" size={16} /> : <Users size={16} />}
- {selectedNode.data.status === 'loading' && councilStage ? councilStage : `Run Council (${(selectedNode.data.councilModels || []).length})`}
+ {selectedNode.data.status === 'loading' && (councilStage || selectedNode.data.taskStage) ? (councilStage || selectedNode.data.taskStage) : `Run Council (${(selectedNode.data.councilModels || []).length})`}
</button>
<div className={`text-center text-[10px] mt-0.5 leading-tight ${isDark ? 'text-gray-600' : 'text-gray-400'}`}>
Inspired by <a href="https://github.com/karpathy/llm-council" target="_blank" rel="noopener noreferrer" className={`underline decoration-dotted ${isDark ? 'text-gray-500 hover:text-gray-400' : 'text-gray-500 hover:text-gray-600'}`}>karpathy/llm-council</a>
@@ -2985,7 +2781,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
}`}
>
{selectedNode.data.status === 'loading' ? <Loader2 className="animate-spin" size={16} /> : <MessageSquare size={16} />}
- {selectedNode.data.status === 'loading' && debateStage ? debateStage : `Run Debate (${(selectedNode.data.debateModels || []).length})`}
+ {selectedNode.data.status === 'loading' && (debateStage || selectedNode.data.taskStage) ? (debateStage || selectedNode.data.taskStage) : `Run Debate (${(selectedNode.data.debateModels || []).length})`}
</button>
) : (
<button
@@ -3086,7 +2882,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
<div>
{selectedNode.data.councilData.stage3 ? (
<div className={`text-xs mb-1 ${isDark ? 'text-gray-500' : 'text-gray-400'}`}>Chairman: {selectedNode.data.councilData.stage3.model}</div>
- ) : selectedNode.data.status === 'loading' && councilStage.includes('Stage 3') ? (
+ ) : selectedNode.data.status === 'loading' && (councilStage.includes('Stage 3') || (selectedNode.data.taskStage || '').includes('Stage 3')) ? (
<div className={`text-xs mb-1 flex items-center gap-1 ${isDark ? 'text-amber-400' : 'text-amber-600'}`}><Loader2 className="animate-spin" size={10} /> Synthesizing...</div>
) : null}
<div className={`p-3 rounded-md border min-h-[150px] text-sm prose prose-sm max-w-none ${
@@ -3333,9 +3129,9 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => {
</div>
);
})}
- {selectedNode.data.status === 'loading' && debateStage && (
+ {selectedNode.data.status === 'loading' && (debateStage || selectedNode.data.taskStage) && (
<div className={`flex items-center gap-2 p-2 text-xs ${isDark ? 'text-cyan-400' : 'text-cyan-600'}`}>
- <Loader2 className="animate-spin" size={12} /> {debateStage}
+ <Loader2 className="animate-spin" size={12} /> {debateStage || selectedNode.data.taskStage}
</div>
)}
</div>
diff --git a/frontend/src/store/flowStore.ts b/frontend/src/store/flowStore.ts
index 5bb3e22..ec96185 100644
--- a/frontend/src/store/flowStore.ts
+++ b/frontend/src/store/flowStore.ts
@@ -137,6 +137,10 @@ export interface NodeData {
debateMaxRounds?: number;
debateData?: DebateData;
+ // Background task persistence
+ taskId?: string; // Background task ID for recovery
+ taskStage?: string; // Live stage text (e.g. "Round 2: Collecting responses...")
+
// Traces logic
traces: Trace[]; // INCOMING Traces
outgoingTraces: Trace[]; // ALL Outgoing (inherited + self + forks + merged)
@@ -293,6 +297,9 @@ interface FlowState {
) => Message[];
propagateTraces: () => void;
+
+ // Background task recovery
+ recoverBackgroundTasks: () => Promise<void>;
}
// Hash string to color
@@ -341,6 +348,401 @@ const jsonFetch = async <T>(url: string, options?: RequestInit): Promise<T> => {
return res.json() as Promise<T>;
};
+// --------------- Background Task Helpers ---------------
+
+/** Apply partial state from accumulated events (for recovery of in-progress tasks) */
+function _applyPartialEvents(
+ updateNodeData: (nodeId: string, data: Partial<NodeData>) => void,
+ nodeId: string,
+ taskType: string,
+ events: any[],
+ currentData: NodeData,
+) {
+ // Compute taskStage from the last relevant event
+ let taskStage = 'Recovering...';
+
+ if (taskType === 'council') {
+ let stage1: any[] | null = null;
+ let stage2: any = null;
+ let stage3Full = '';
+ for (const evt of events) {
+ if (evt.type === 'stage1_start') {
+ taskStage = 'Stage 1: Collecting responses...';
+ } else if (evt.type === 'stage1_model_complete') {
+ if (!stage1) stage1 = [];
+ stage1.push(evt.data);
+ taskStage = `Stage 1: ${stage1.length} models done`;
+ } else if (evt.type === 'stage1_complete') {
+ stage1 = evt.data;
+ taskStage = 'Stage 1 complete';
+ } else if (evt.type === 'stage2_start') {
+ taskStage = 'Stage 2: Peer ranking...';
+ } else if (evt.type === 'stage2_complete') {
+ stage2 = evt.data;
+ taskStage = 'Stage 2 complete';
+ } else if (evt.type === 'stage3_start') {
+ taskStage = 'Stage 3: Chairman synthesizing...';
+ } else if (evt.type === 'stage3_complete') {
+ stage3Full = evt.data.response;
+ taskStage = 'Stage 3: Chairman synthesizing...';
+ }
+ }
+ updateNodeData(nodeId, {
+ councilData: { stage1, stage2, stage3: stage3Full ? { model: '', response: stage3Full } : null },
+ taskStage,
+ });
+ } else if (taskType === 'debate') {
+ const rounds: DebateRound[] = [];
+ let finalFull = '';
+ let finalModel = '';
+ let currentRound = 0;
+ for (const evt of events) {
+ if (evt.type === 'debate_start') {
+ taskStage = `Debate started (${evt.data?.models?.length || '?'} models)`;
+ } else if (evt.type === 'round_start') {
+ currentRound = evt.data.round;
+ taskStage = `Round ${currentRound}: Collecting responses...`;
+ } else if (evt.type === 'round_model_complete') {
+ taskStage = `Round ${currentRound}: ${evt.data.model} done`;
+ } else if (evt.type === 'round_complete') {
+ rounds.push({ round: evt.data.round, responses: evt.data.responses });
+ taskStage = `Round ${evt.data.round} complete`;
+ } else if (evt.type === 'judge_decision' && rounds.length > 0) {
+ rounds[rounds.length - 1].judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning };
+ } else if (evt.type === 'model_eliminated' && rounds.length > 0) {
+ if (!rounds[rounds.length - 1].eliminated) rounds[rounds.length - 1].eliminated = [];
+ rounds[rounds.length - 1].eliminated!.push({
+ model: evt.data.model, convincedBy: evt.data.convinced_by, reasoning: evt.data.reasoning,
+ });
+ } else if (evt.type === 'convergence_status') {
+ const remaining = evt.data?.remaining as string[] || [];
+ taskStage = remaining.length <= 1
+ ? `${remaining[0] || 'Winner'} is the last one standing!`
+ : `${remaining.length} models remaining...`;
+ } else if (evt.type === 'final_start') {
+ finalModel = evt.data.model;
+ taskStage = 'Judge synthesizing final verdict...';
+ } else if (evt.type === 'final_complete') {
+ finalModel = evt.data.model;
+ finalFull = evt.data.response;
+ }
+ }
+ const config = currentData.debateData?.config || {
+ judgeMode: currentData.debateJudgeMode || 'external_judge',
+ format: currentData.debateFormat || 'free_discussion',
+ maxRounds: currentData.debateMaxRounds || 5,
+ };
+ updateNodeData(nodeId, {
+ debateData: { rounds, finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null, config: config as any },
+ taskStage,
+ });
+ }
+}
+
+/**
+ * Connect to the task SSE stream and process events, updating node data.
+ * Used by both initial task start (Sidebar) and recovery (recoverBackgroundTasks).
+ *
+ * Initializes accumulators from existing node data so that recovery after reconnect
+ * doesn't lose previously accumulated rounds/stages.
+ */
+async function _consumeTaskStream(
+ taskId: string,
+ nodeId: string,
+ taskType: string,
+ fromEvent: number,
+ updateNodeData: (nodeId: string, data: Partial<NodeData>) => void,
+ username: string,
+ callbacks?: {
+ onStage?: (stage: string) => void;
+ onStreamBuffer?: (text: string) => void;
+ onComplete?: () => void;
+ onError?: () => void;
+ },
+): Promise<void> {
+ const authHeaders = getAuthHeaders();
+
+ // Read current node data to initialize accumulators (critical for recovery)
+ const nodeData = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data;
+
+ // Helper: read debate config from node data (never hardcode)
+ const getDebateConfig = () => {
+ const nd = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data;
+ return nd?.debateData?.config || {
+ judgeMode: nd?.debateJudgeMode || 'external_judge',
+ format: nd?.debateFormat || 'free_discussion',
+ maxRounds: nd?.debateMaxRounds || 5,
+ };
+ };
+
+ // Helper: trigger auto-save after milestone events
+ const triggerSave = () => {
+ try { useFlowStore.getState().triggerAutoSave(); } catch { /* ignore */ }
+ };
+
+ // Helper: update stage text on both node data and optional callback
+ const setStage = (text: string) => {
+ updateNodeData(nodeId, { taskStage: text });
+ callbacks?.onStage?.(text);
+ };
+
+ try {
+ const response = await fetch(
+ `${API_BASE}/api/task/${encodeURIComponent(taskId)}/stream?from_event=${fromEvent}&user=${encodeURIComponent(username)}`,
+ { headers: authHeaders },
+ );
+ if (!response.ok || !response.body) {
+ callbacks?.onError?.();
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ return;
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let sseBuffer = '';
+
+ // Initialize accumulators from existing node data (preserves state across reconnect)
+ let stage1Results: Array<{ model: string; response: string }> =
+ (nodeData?.councilData?.stage1 as any[]) || [];
+ let stage2Data: any = nodeData?.councilData?.stage2 || null;
+ let stage3Full = nodeData?.councilData?.stage3?.response || '';
+ let stage3Model = nodeData?.councilData?.stage3?.model || '';
+
+ const debateRounds: DebateRound[] = [...(nodeData?.debateData?.rounds || [])];
+ let currentRound = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].round : 0;
+ let finalModel = nodeData?.debateData?.finalVerdict?.model || '';
+ let finalFull = nodeData?.debateData?.finalVerdict?.response || '';
+
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ sseBuffer += decoder.decode(value, { stream: true });
+
+ const parts = sseBuffer.split('\n\n');
+ sseBuffer = parts.pop() || '';
+
+ for (const part of parts) {
+ const line = part.trim();
+ if (!line.startsWith('data: ')) continue;
+ let evt: any;
+ try { evt = JSON.parse(line.slice(6)); } catch { continue; }
+
+ // Handle task_status terminal event
+ if (evt.type === 'task_status') {
+ if (evt.data?.status === 'completed') {
+ callbacks?.onComplete?.();
+ } else {
+ callbacks?.onError?.();
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ }
+ continue;
+ }
+
+ if (taskType === 'council') {
+ switch (evt.type) {
+ case 'stage1_start':
+ setStage('Stage 1: Collecting responses...');
+ break;
+ case 'stage1_model_complete':
+ stage1Results = [...stage1Results, evt.data];
+ setStage(`Stage 1: ${stage1Results.length} models done`);
+ updateNodeData(nodeId, {
+ councilData: { stage1: [...stage1Results], stage2: null, stage3: null },
+ });
+ triggerSave();
+ break;
+ case 'stage1_complete':
+ stage1Results = evt.data;
+ updateNodeData(nodeId, {
+ councilData: { stage1: stage1Results, stage2: null, stage3: null },
+ });
+ triggerSave();
+ break;
+ case 'stage2_start':
+ setStage('Stage 2: Peer ranking...');
+ break;
+ case 'stage2_complete':
+ stage2Data = evt.data;
+ updateNodeData(nodeId, {
+ councilData: { stage1: stage1Results, stage2: stage2Data, stage3: null },
+ });
+ triggerSave();
+ break;
+ case 'stage3_start':
+ setStage('Stage 3: Chairman synthesizing...');
+ callbacks?.onStreamBuffer?.('');
+ break;
+ case 'stage3_chunk':
+ stage3Full += evt.data.chunk;
+ callbacks?.onStreamBuffer?.(stage3Full);
+ break;
+ case 'stage3_complete':
+ stage3Model = evt.data.model;
+ stage3Full = evt.data.response;
+ triggerSave();
+ break;
+ case 'complete': {
+ const councilData: CouncilData = {
+ stage1: stage1Results,
+ stage2: stage2Data,
+ stage3: { model: stage3Model, response: stage3Full },
+ };
+ updateNodeData(nodeId, {
+ status: 'success',
+ response: stage3Full,
+ responseReceivedAt: Date.now(),
+ councilData,
+ taskId: undefined,
+ });
+ setStage('');
+ callbacks?.onComplete?.();
+ triggerSave();
+ break;
+ }
+ case 'error':
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ setStage('');
+ callbacks?.onError?.();
+ break;
+ }
+ } else if (taskType === 'debate') {
+ switch (evt.type) {
+ case 'debate_start':
+ setStage(`Debate started (${evt.data.models.length} models, max ${evt.data.max_rounds} rounds)`);
+ break;
+ case 'round_start':
+ currentRound = evt.data.round;
+ setStage(`Round ${currentRound}: Collecting responses...`);
+ break;
+ case 'round_model_complete':
+ setStage(`Round ${currentRound}: ${evt.data.model} done`);
+ break;
+ case 'round_complete': {
+ const roundData: DebateRound = { round: evt.data.round, responses: evt.data.responses };
+ debateRounds.push(roundData);
+ updateNodeData(nodeId, {
+ debateData: {
+ rounds: [...debateRounds],
+ finalVerdict: null,
+ config: getDebateConfig() as any,
+ },
+ });
+ triggerSave();
+ break;
+ }
+ case 'judge_decision': {
+ const lastRound = debateRounds[debateRounds.length - 1];
+ if (lastRound) {
+ lastRound.judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning };
+ updateNodeData(nodeId, {
+ debateData: {
+ rounds: [...debateRounds],
+ finalVerdict: null,
+ config: getDebateConfig() as any,
+ },
+ });
+ triggerSave();
+ }
+ break;
+ }
+ case 'model_eliminated': {
+ const lastRound2 = debateRounds[debateRounds.length - 1];
+ if (lastRound2) {
+ if (!lastRound2.eliminated) lastRound2.eliminated = [];
+ lastRound2.eliminated.push({
+ model: evt.data.model,
+ convincedBy: evt.data.convinced_by,
+ reasoning: evt.data.reasoning,
+ });
+ }
+ break;
+ }
+ case 'convergence_status': {
+ updateNodeData(nodeId, {
+ debateData: {
+ rounds: [...debateRounds],
+ finalVerdict: null,
+ config: getDebateConfig() as any,
+ },
+ });
+ const remaining = evt.data.remaining as string[];
+ if (remaining.length <= 1) {
+ setStage(`${remaining[0] || 'Winner'} is the last one standing!`);
+ } else {
+ setStage(`${remaining.length} models remaining...`);
+ }
+ break;
+ }
+ case 'final_start':
+ finalModel = evt.data.model;
+ setStage('Judge synthesizing final verdict...');
+ callbacks?.onStreamBuffer?.('');
+ break;
+ case 'final_chunk':
+ finalFull += evt.data.chunk;
+ callbacks?.onStreamBuffer?.(finalFull);
+ break;
+ case 'final_complete': {
+ finalModel = evt.data.model;
+ finalFull = evt.data.response;
+ updateNodeData(nodeId, {
+ status: 'success',
+ response: finalFull,
+ responseReceivedAt: Date.now(),
+ debateData: {
+ rounds: debateRounds,
+ finalVerdict: { model: finalModel, response: finalFull },
+ config: getDebateConfig() as any,
+ },
+ taskId: undefined,
+ });
+ setStage('');
+ callbacks?.onComplete?.();
+ triggerSave();
+ break;
+ }
+ case 'debate_complete': {
+ const store = useFlowStore.getState();
+ const currentNode = store.nodes.find(n => n.id === nodeId);
+ if (currentNode?.data.status === 'loading') {
+ const lastResp = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].responses : [];
+ const bestResponse = lastResp.length > 0
+ ? lastResp.reduce((a, b) => a.response.length > b.response.length ? a : b).response
+ : '';
+ const displayResponse = finalFull || bestResponse;
+ updateNodeData(nodeId, {
+ status: 'success',
+ response: displayResponse,
+ responseReceivedAt: Date.now(),
+ debateData: {
+ rounds: debateRounds,
+ finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null,
+ config: getDebateConfig() as any,
+ },
+ taskId: undefined,
+ });
+ setStage('');
+ callbacks?.onComplete?.();
+ triggerSave();
+ }
+ break;
+ }
+ case 'error':
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ setStage('');
+ callbacks?.onError?.();
+ break;
+ }
+ }
+ }
+ }
+ } catch (error) {
+ console.error(`Task stream error for ${taskId}:`, error);
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ callbacks?.onError?.();
+ }
+}
+
const useFlowStore = create<FlowState>((set, get) => {
const validateBlueprint = (doc: any): BlueprintDocument => {
@@ -1615,6 +2017,8 @@ const useFlowStore = create<FlowState>((set, get) => {
traces: undefined,
outgoingTraces: undefined,
messages: undefined,
+ // Drop transient runtime state
+ taskStage: undefined,
// Keep merged/forked trace definitions but strip their computed messages
mergedTraces: (n.data.mergedTraces || []).map((m: any) => ({
id: m.id,
@@ -1651,6 +2055,13 @@ const useFlowStore = create<FlowState>((set, get) => {
if (n.data?.chairmanModel && typeof n.data.chairmanModel === 'string') {
n.data.chairmanModel = { model: n.data.chairmanModel };
}
+ // Reset stale "loading" status — unless there's a background task to recover
+ if (n.data?.status === 'loading') {
+ if (!n.data.taskId) {
+ n.data.status = 'idle'; // Legacy/no task — reset
+ }
+ // If taskId exists, keep loading — recoverBackgroundTasks() will handle it
+ }
return n;
});
set({
@@ -2273,6 +2684,81 @@ const useFlowStore = create<FlowState>((set, get) => {
setTimeout(() => get().propagateTraces(), 50);
},
+ recoverBackgroundTasks: async () => {
+ const nodes = get().nodes;
+ const user = getCurrentUser();
+
+ for (const node of nodes) {
+ if (!node.data.taskId) continue;
+
+ try {
+ const res = await fetch(
+ `${API_BASE}/api/task/${encodeURIComponent(node.data.taskId)}?user=${encodeURIComponent(user)}`,
+ { headers: getAuthHeaders() }
+ );
+ if (!res.ok) {
+ // Task not found — reset node
+ get().updateNodeData(node.id, { status: 'idle', taskId: undefined });
+ continue;
+ }
+ const taskData = await res.json();
+ const status = taskData.status;
+
+ if (status === 'completed') {
+ // Apply completed results
+ const result = taskData.result || {};
+ if (taskData.task_type === 'council') {
+ const cd = result.councilData;
+ const response = cd?.stage3?.response || '';
+ get().updateNodeData(node.id, {
+ status: 'success',
+ response,
+ councilData: cd,
+ taskId: undefined,
+ });
+ } else if (taskData.task_type === 'debate') {
+ const rounds = result.debateRounds || [];
+ const finalVerdict = result.finalVerdict;
+ const response = finalVerdict?.response || '';
+ get().updateNodeData(node.id, {
+ status: 'success',
+ response,
+ debateData: {
+ rounds,
+ finalVerdict,
+ config: node.data.debateData?.config || {
+ judgeMode: node.data.debateJudgeMode || 'external_judge',
+ format: node.data.debateFormat || 'free_discussion',
+ maxRounds: node.data.debateMaxRounds || 5,
+ },
+ },
+ taskId: undefined,
+ });
+ }
+ } else if (status === 'running') {
+ // Reconnect to live stream from where we left off
+ const fromEvent = (taskData.events || []).length;
+ // Apply any partial state from accumulated events first
+ _applyPartialEvents(get().updateNodeData, node.id, taskData.task_type, taskData.events || [], node.data);
+ // Then connect to live stream
+ _consumeTaskStream(
+ node.data.taskId!, node.id, taskData.task_type, fromEvent,
+ get().updateNodeData, user
+ );
+ } else {
+ // failed / interrupted / unknown — reset
+ get().updateNodeData(node.id, {
+ status: status === 'interrupted' ? 'error' : 'error',
+ taskId: undefined,
+ });
+ }
+ } catch (err) {
+ console.error(`Failed to recover task for node ${node.id}:`, err);
+ get().updateNodeData(node.id, { status: 'idle', taskId: undefined });
+ }
+ }
+ },
+
propagateTraces: () => {
const { nodes, edges } = get();
@@ -2740,4 +3226,5 @@ useFlowStore.subscribe((state, prev) => {
if (state.theme !== prev.theme) syncThemeClass(state.theme);
});
+export { _consumeTaskStream, _applyPartialEvents };
export default useFlowStore;