summaryrefslogtreecommitdiff
path: root/frontend/src/store/flowStore.ts
diff options
context:
space:
mode:
Diffstat (limited to 'frontend/src/store/flowStore.ts')
-rw-r--r--frontend/src/store/flowStore.ts487
1 files changed, 487 insertions, 0 deletions
diff --git a/frontend/src/store/flowStore.ts b/frontend/src/store/flowStore.ts
index 5bb3e22..ec96185 100644
--- a/frontend/src/store/flowStore.ts
+++ b/frontend/src/store/flowStore.ts
@@ -137,6 +137,10 @@ export interface NodeData {
debateMaxRounds?: number;
debateData?: DebateData;
+ // Background task persistence
+ taskId?: string; // Background task ID for recovery
+ taskStage?: string; // Live stage text (e.g. "Round 2: Collecting responses...")
+
// Traces logic
traces: Trace[]; // INCOMING Traces
outgoingTraces: Trace[]; // ALL Outgoing (inherited + self + forks + merged)
@@ -293,6 +297,9 @@ interface FlowState {
) => Message[];
propagateTraces: () => void;
+
+ // Background task recovery
+ recoverBackgroundTasks: () => Promise<void>;
}
// Hash string to color
@@ -341,6 +348,401 @@ const jsonFetch = async <T>(url: string, options?: RequestInit): Promise<T> => {
return res.json() as Promise<T>;
};
+// --------------- Background Task Helpers ---------------
+
+/** Apply partial state from accumulated events (for recovery of in-progress tasks) */
+function _applyPartialEvents(
+ updateNodeData: (nodeId: string, data: Partial<NodeData>) => void,
+ nodeId: string,
+ taskType: string,
+ events: any[],
+ currentData: NodeData,
+) {
+ // Compute taskStage from the last relevant event
+ let taskStage = 'Recovering...';
+
+ if (taskType === 'council') {
+ let stage1: any[] | null = null;
+ let stage2: any = null;
+ let stage3Full = '';
+ for (const evt of events) {
+ if (evt.type === 'stage1_start') {
+ taskStage = 'Stage 1: Collecting responses...';
+ } else if (evt.type === 'stage1_model_complete') {
+ if (!stage1) stage1 = [];
+ stage1.push(evt.data);
+ taskStage = `Stage 1: ${stage1.length} models done`;
+ } else if (evt.type === 'stage1_complete') {
+ stage1 = evt.data;
+ taskStage = 'Stage 1 complete';
+ } else if (evt.type === 'stage2_start') {
+ taskStage = 'Stage 2: Peer ranking...';
+ } else if (evt.type === 'stage2_complete') {
+ stage2 = evt.data;
+ taskStage = 'Stage 2 complete';
+ } else if (evt.type === 'stage3_start') {
+ taskStage = 'Stage 3: Chairman synthesizing...';
+ } else if (evt.type === 'stage3_complete') {
+ stage3Full = evt.data.response;
+ taskStage = 'Stage 3: Chairman synthesizing...';
+ }
+ }
+ updateNodeData(nodeId, {
+ councilData: { stage1, stage2, stage3: stage3Full ? { model: '', response: stage3Full } : null },
+ taskStage,
+ });
+ } else if (taskType === 'debate') {
+ const rounds: DebateRound[] = [];
+ let finalFull = '';
+ let finalModel = '';
+ let currentRound = 0;
+ for (const evt of events) {
+ if (evt.type === 'debate_start') {
+ taskStage = `Debate started (${evt.data?.models?.length || '?'} models)`;
+ } else if (evt.type === 'round_start') {
+ currentRound = evt.data.round;
+ taskStage = `Round ${currentRound}: Collecting responses...`;
+ } else if (evt.type === 'round_model_complete') {
+ taskStage = `Round ${currentRound}: ${evt.data.model} done`;
+ } else if (evt.type === 'round_complete') {
+ rounds.push({ round: evt.data.round, responses: evt.data.responses });
+ taskStage = `Round ${evt.data.round} complete`;
+ } else if (evt.type === 'judge_decision' && rounds.length > 0) {
+ rounds[rounds.length - 1].judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning };
+ } else if (evt.type === 'model_eliminated' && rounds.length > 0) {
+ if (!rounds[rounds.length - 1].eliminated) rounds[rounds.length - 1].eliminated = [];
+ rounds[rounds.length - 1].eliminated!.push({
+ model: evt.data.model, convincedBy: evt.data.convinced_by, reasoning: evt.data.reasoning,
+ });
+ } else if (evt.type === 'convergence_status') {
+ const remaining = evt.data?.remaining as string[] || [];
+ taskStage = remaining.length <= 1
+ ? `${remaining[0] || 'Winner'} is the last one standing!`
+ : `${remaining.length} models remaining...`;
+ } else if (evt.type === 'final_start') {
+ finalModel = evt.data.model;
+ taskStage = 'Judge synthesizing final verdict...';
+ } else if (evt.type === 'final_complete') {
+ finalModel = evt.data.model;
+ finalFull = evt.data.response;
+ }
+ }
+ const config = currentData.debateData?.config || {
+ judgeMode: currentData.debateJudgeMode || 'external_judge',
+ format: currentData.debateFormat || 'free_discussion',
+ maxRounds: currentData.debateMaxRounds || 5,
+ };
+ updateNodeData(nodeId, {
+ debateData: { rounds, finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null, config: config as any },
+ taskStage,
+ });
+ }
+}
+
+/**
+ * Connect to the task SSE stream and process events, updating node data.
+ * Used by both initial task start (Sidebar) and recovery (recoverBackgroundTasks).
+ *
+ * Initializes accumulators from existing node data so that recovery after reconnect
+ * doesn't lose previously accumulated rounds/stages.
+ */
+async function _consumeTaskStream(
+ taskId: string,
+ nodeId: string,
+ taskType: string,
+ fromEvent: number,
+ updateNodeData: (nodeId: string, data: Partial<NodeData>) => void,
+ username: string,
+ callbacks?: {
+ onStage?: (stage: string) => void;
+ onStreamBuffer?: (text: string) => void;
+ onComplete?: () => void;
+ onError?: () => void;
+ },
+): Promise<void> {
+ const authHeaders = getAuthHeaders();
+
+ // Read current node data to initialize accumulators (critical for recovery)
+ const nodeData = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data;
+
+ // Helper: read debate config from node data (never hardcode)
+ const getDebateConfig = () => {
+ const nd = useFlowStore.getState().nodes.find(n => n.id === nodeId)?.data;
+ return nd?.debateData?.config || {
+ judgeMode: nd?.debateJudgeMode || 'external_judge',
+ format: nd?.debateFormat || 'free_discussion',
+ maxRounds: nd?.debateMaxRounds || 5,
+ };
+ };
+
+ // Helper: trigger auto-save after milestone events
+ const triggerSave = () => {
+ try { useFlowStore.getState().triggerAutoSave(); } catch { /* ignore */ }
+ };
+
+ // Helper: update stage text on both node data and optional callback
+ const setStage = (text: string) => {
+ updateNodeData(nodeId, { taskStage: text });
+ callbacks?.onStage?.(text);
+ };
+
+ try {
+ const response = await fetch(
+ `${API_BASE}/api/task/${encodeURIComponent(taskId)}/stream?from_event=${fromEvent}&user=${encodeURIComponent(username)}`,
+ { headers: authHeaders },
+ );
+ if (!response.ok || !response.body) {
+ callbacks?.onError?.();
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ return;
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let sseBuffer = '';
+
+ // Initialize accumulators from existing node data (preserves state across reconnect)
+ let stage1Results: Array<{ model: string; response: string }> =
+ (nodeData?.councilData?.stage1 as any[]) || [];
+ let stage2Data: any = nodeData?.councilData?.stage2 || null;
+ let stage3Full = nodeData?.councilData?.stage3?.response || '';
+ let stage3Model = nodeData?.councilData?.stage3?.model || '';
+
+ const debateRounds: DebateRound[] = [...(nodeData?.debateData?.rounds || [])];
+ let currentRound = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].round : 0;
+ let finalModel = nodeData?.debateData?.finalVerdict?.model || '';
+ let finalFull = nodeData?.debateData?.finalVerdict?.response || '';
+
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ sseBuffer += decoder.decode(value, { stream: true });
+
+ const parts = sseBuffer.split('\n\n');
+ sseBuffer = parts.pop() || '';
+
+ for (const part of parts) {
+ const line = part.trim();
+ if (!line.startsWith('data: ')) continue;
+ let evt: any;
+ try { evt = JSON.parse(line.slice(6)); } catch { continue; }
+
+ // Handle task_status terminal event
+ if (evt.type === 'task_status') {
+ if (evt.data?.status === 'completed') {
+ callbacks?.onComplete?.();
+ } else {
+ callbacks?.onError?.();
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ }
+ continue;
+ }
+
+ if (taskType === 'council') {
+ switch (evt.type) {
+ case 'stage1_start':
+ setStage('Stage 1: Collecting responses...');
+ break;
+ case 'stage1_model_complete':
+ stage1Results = [...stage1Results, evt.data];
+ setStage(`Stage 1: ${stage1Results.length} models done`);
+ updateNodeData(nodeId, {
+ councilData: { stage1: [...stage1Results], stage2: null, stage3: null },
+ });
+ triggerSave();
+ break;
+ case 'stage1_complete':
+ stage1Results = evt.data;
+ updateNodeData(nodeId, {
+ councilData: { stage1: stage1Results, stage2: null, stage3: null },
+ });
+ triggerSave();
+ break;
+ case 'stage2_start':
+ setStage('Stage 2: Peer ranking...');
+ break;
+ case 'stage2_complete':
+ stage2Data = evt.data;
+ updateNodeData(nodeId, {
+ councilData: { stage1: stage1Results, stage2: stage2Data, stage3: null },
+ });
+ triggerSave();
+ break;
+ case 'stage3_start':
+ setStage('Stage 3: Chairman synthesizing...');
+ callbacks?.onStreamBuffer?.('');
+ break;
+ case 'stage3_chunk':
+ stage3Full += evt.data.chunk;
+ callbacks?.onStreamBuffer?.(stage3Full);
+ break;
+ case 'stage3_complete':
+ stage3Model = evt.data.model;
+ stage3Full = evt.data.response;
+ triggerSave();
+ break;
+ case 'complete': {
+ const councilData: CouncilData = {
+ stage1: stage1Results,
+ stage2: stage2Data,
+ stage3: { model: stage3Model, response: stage3Full },
+ };
+ updateNodeData(nodeId, {
+ status: 'success',
+ response: stage3Full,
+ responseReceivedAt: Date.now(),
+ councilData,
+ taskId: undefined,
+ });
+ setStage('');
+ callbacks?.onComplete?.();
+ triggerSave();
+ break;
+ }
+ case 'error':
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ setStage('');
+ callbacks?.onError?.();
+ break;
+ }
+ } else if (taskType === 'debate') {
+ switch (evt.type) {
+ case 'debate_start':
+ setStage(`Debate started (${evt.data.models.length} models, max ${evt.data.max_rounds} rounds)`);
+ break;
+ case 'round_start':
+ currentRound = evt.data.round;
+ setStage(`Round ${currentRound}: Collecting responses...`);
+ break;
+ case 'round_model_complete':
+ setStage(`Round ${currentRound}: ${evt.data.model} done`);
+ break;
+ case 'round_complete': {
+ const roundData: DebateRound = { round: evt.data.round, responses: evt.data.responses };
+ debateRounds.push(roundData);
+ updateNodeData(nodeId, {
+ debateData: {
+ rounds: [...debateRounds],
+ finalVerdict: null,
+ config: getDebateConfig() as any,
+ },
+ });
+ triggerSave();
+ break;
+ }
+ case 'judge_decision': {
+ const lastRound = debateRounds[debateRounds.length - 1];
+ if (lastRound) {
+ lastRound.judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning };
+ updateNodeData(nodeId, {
+ debateData: {
+ rounds: [...debateRounds],
+ finalVerdict: null,
+ config: getDebateConfig() as any,
+ },
+ });
+ triggerSave();
+ }
+ break;
+ }
+ case 'model_eliminated': {
+ const lastRound2 = debateRounds[debateRounds.length - 1];
+ if (lastRound2) {
+ if (!lastRound2.eliminated) lastRound2.eliminated = [];
+ lastRound2.eliminated.push({
+ model: evt.data.model,
+ convincedBy: evt.data.convinced_by,
+ reasoning: evt.data.reasoning,
+ });
+ }
+ break;
+ }
+ case 'convergence_status': {
+ updateNodeData(nodeId, {
+ debateData: {
+ rounds: [...debateRounds],
+ finalVerdict: null,
+ config: getDebateConfig() as any,
+ },
+ });
+ const remaining = evt.data.remaining as string[];
+ if (remaining.length <= 1) {
+ setStage(`${remaining[0] || 'Winner'} is the last one standing!`);
+ } else {
+ setStage(`${remaining.length} models remaining...`);
+ }
+ break;
+ }
+ case 'final_start':
+ finalModel = evt.data.model;
+ setStage('Judge synthesizing final verdict...');
+ callbacks?.onStreamBuffer?.('');
+ break;
+ case 'final_chunk':
+ finalFull += evt.data.chunk;
+ callbacks?.onStreamBuffer?.(finalFull);
+ break;
+ case 'final_complete': {
+ finalModel = evt.data.model;
+ finalFull = evt.data.response;
+ updateNodeData(nodeId, {
+ status: 'success',
+ response: finalFull,
+ responseReceivedAt: Date.now(),
+ debateData: {
+ rounds: debateRounds,
+ finalVerdict: { model: finalModel, response: finalFull },
+ config: getDebateConfig() as any,
+ },
+ taskId: undefined,
+ });
+ setStage('');
+ callbacks?.onComplete?.();
+ triggerSave();
+ break;
+ }
+ case 'debate_complete': {
+ const store = useFlowStore.getState();
+ const currentNode = store.nodes.find(n => n.id === nodeId);
+ if (currentNode?.data.status === 'loading') {
+ const lastResp = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].responses : [];
+ const bestResponse = lastResp.length > 0
+ ? lastResp.reduce((a, b) => a.response.length > b.response.length ? a : b).response
+ : '';
+ const displayResponse = finalFull || bestResponse;
+ updateNodeData(nodeId, {
+ status: 'success',
+ response: displayResponse,
+ responseReceivedAt: Date.now(),
+ debateData: {
+ rounds: debateRounds,
+ finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null,
+ config: getDebateConfig() as any,
+ },
+ taskId: undefined,
+ });
+ setStage('');
+ callbacks?.onComplete?.();
+ triggerSave();
+ }
+ break;
+ }
+ case 'error':
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ setStage('');
+ callbacks?.onError?.();
+ break;
+ }
+ }
+ }
+ }
+ } catch (error) {
+ console.error(`Task stream error for ${taskId}:`, error);
+ updateNodeData(nodeId, { status: 'error', taskId: undefined });
+ callbacks?.onError?.();
+ }
+}
+
const useFlowStore = create<FlowState>((set, get) => {
const validateBlueprint = (doc: any): BlueprintDocument => {
@@ -1615,6 +2017,8 @@ const useFlowStore = create<FlowState>((set, get) => {
traces: undefined,
outgoingTraces: undefined,
messages: undefined,
+ // Drop transient runtime state
+ taskStage: undefined,
// Keep merged/forked trace definitions but strip their computed messages
mergedTraces: (n.data.mergedTraces || []).map((m: any) => ({
id: m.id,
@@ -1651,6 +2055,13 @@ const useFlowStore = create<FlowState>((set, get) => {
if (n.data?.chairmanModel && typeof n.data.chairmanModel === 'string') {
n.data.chairmanModel = { model: n.data.chairmanModel };
}
+ // Reset stale "loading" status — unless there's a background task to recover
+ if (n.data?.status === 'loading') {
+ if (!n.data.taskId) {
+ n.data.status = 'idle'; // Legacy/no task — reset
+ }
+ // If taskId exists, keep loading — recoverBackgroundTasks() will handle it
+ }
return n;
});
set({
@@ -2273,6 +2684,81 @@ const useFlowStore = create<FlowState>((set, get) => {
setTimeout(() => get().propagateTraces(), 50);
},
+ recoverBackgroundTasks: async () => {
+ const nodes = get().nodes;
+ const user = getCurrentUser();
+
+ for (const node of nodes) {
+ if (!node.data.taskId) continue;
+
+ try {
+ const res = await fetch(
+ `${API_BASE}/api/task/${encodeURIComponent(node.data.taskId)}?user=${encodeURIComponent(user)}`,
+ { headers: getAuthHeaders() }
+ );
+ if (!res.ok) {
+ // Task not found — reset node
+ get().updateNodeData(node.id, { status: 'idle', taskId: undefined });
+ continue;
+ }
+ const taskData = await res.json();
+ const status = taskData.status;
+
+ if (status === 'completed') {
+ // Apply completed results
+ const result = taskData.result || {};
+ if (taskData.task_type === 'council') {
+ const cd = result.councilData;
+ const response = cd?.stage3?.response || '';
+ get().updateNodeData(node.id, {
+ status: 'success',
+ response,
+ councilData: cd,
+ taskId: undefined,
+ });
+ } else if (taskData.task_type === 'debate') {
+ const rounds = result.debateRounds || [];
+ const finalVerdict = result.finalVerdict;
+ const response = finalVerdict?.response || '';
+ get().updateNodeData(node.id, {
+ status: 'success',
+ response,
+ debateData: {
+ rounds,
+ finalVerdict,
+ config: node.data.debateData?.config || {
+ judgeMode: node.data.debateJudgeMode || 'external_judge',
+ format: node.data.debateFormat || 'free_discussion',
+ maxRounds: node.data.debateMaxRounds || 5,
+ },
+ },
+ taskId: undefined,
+ });
+ }
+ } else if (status === 'running') {
+ // Reconnect to live stream from where we left off
+ const fromEvent = (taskData.events || []).length;
+ // Apply any partial state from accumulated events first
+ _applyPartialEvents(get().updateNodeData, node.id, taskData.task_type, taskData.events || [], node.data);
+ // Then connect to live stream
+ _consumeTaskStream(
+ node.data.taskId!, node.id, taskData.task_type, fromEvent,
+ get().updateNodeData, user
+ );
+ } else {
+ // failed / interrupted / unknown — reset
+ get().updateNodeData(node.id, {
+ status: status === 'interrupted' ? 'error' : 'error',
+ taskId: undefined,
+ });
+ }
+ } catch (err) {
+ console.error(`Failed to recover task for node ${node.id}:`, err);
+ get().updateNodeData(node.id, { status: 'idle', taskId: undefined });
+ }
+ }
+ },
+
propagateTraces: () => {
const { nodes, edges } = get();
@@ -2740,4 +3226,5 @@ useFlowStore.subscribe((state, prev) => {
if (state.theme !== prev.theme) syncThemeClass(state.theme);
});
+export { _consumeTaskStream, _applyPartialEvents };
export default useFlowStore;