diff options
| author | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-14 03:40:31 +0000 |
|---|---|---|
| committer | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-14 03:40:31 +0000 |
| commit | bdf381a2c8a0337f7459000f487a80f9cbbbdd2f (patch) | |
| tree | b3c72c85f3e7c47b4c98a1301acc7fa7d23a6d05 /frontend/src/components | |
| parent | ded75a5c19ad4aa8dc832fc4c138b68093e22ee8 (diff) | |
Add background task persistence for debate & council operations
Decouple debate/council execution from SSE connection lifecycle so tasks
survive browser disconnects. Backend runs work as asyncio.Tasks with
progressive disk persistence; frontend can reconnect and recover state.
- New backend/app/services/tasks.py: task registry, broadcast pattern,
disk persistence at milestones, stale task cleanup on startup
- New endpoints: POST start_debate/start_council, GET task stream/poll
- Frontend stores taskId on nodes, recovers running tasks on page load
- _applyPartialEvents rebuilds stage text + data from accumulated events
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'frontend/src/components')
| -rw-r--r-- | frontend/src/components/LeftSidebar.tsx | 3 | ||||
| -rw-r--r-- | frontend/src/components/Sidebar.tsx | 324 |
2 files changed, 63 insertions, 264 deletions
diff --git a/frontend/src/components/LeftSidebar.tsx b/frontend/src/components/LeftSidebar.tsx index 54c2527..441b7e0 100644 --- a/frontend/src/components/LeftSidebar.tsx +++ b/frontend/src/components/LeftSidebar.tsx @@ -30,6 +30,7 @@ const LeftSidebar: React.FC<LeftSidebarProps> = ({ isOpen, onToggle }) => { deleteFile, readBlueprintFile, loadBlueprint, + recoverBackgroundTasks, saveBlueprintFile, saveCurrentBlueprint, createProjectFolder, @@ -266,6 +267,8 @@ const LeftSidebar: React.FC<LeftSidebarProps> = ({ isOpen, onToggle }) => { if (vp) { setViewport(vp); } + // Recover any background tasks that were running before page refresh + recoverBackgroundTasks(); } catch (e) { console.error(e); alert('Not a valid blueprint JSON.'); diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx index 8bb5fcb..474a969 100644 --- a/frontend/src/components/Sidebar.tsx +++ b/frontend/src/components/Sidebar.tsx @@ -387,6 +387,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }) }); + if (!response.ok) throw new Error(await response.text() || `HTTP ${response.status}`); if (!response.body) return; const reader = response.body.getReader(); const decoder = new TextDecoder(); @@ -511,7 +512,8 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }; try { - const response = await fetch(`/api/run_council_stream?user=${encodeURIComponent(user?.username || 'test')}`, { + // Step 1: Start background task + const startRes = await fetch(`/api/task/start_council?user=${encodeURIComponent(user?.username || 'test')}`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...getAuthHeader() }, body: JSON.stringify({ @@ -546,101 +548,35 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }), }); - if (!response.body) return; - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let sseBuffer = ''; - let stage1Results: Array<{ model: string; response: string }> = []; - let stage2Data: any = null; - let stage3Full = ''; - let stage3Model = ''; + if (!startRes.ok) throw new Error(await startRes.text() || `HTTP ${startRes.status}`); + const { task_id } = await startRes.json(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - sseBuffer += decoder.decode(value, { stream: true }); - - // Parse SSE events (data: {...}\n\n) - const parts = sseBuffer.split('\n\n'); - sseBuffer = parts.pop() || ''; - - for (const part of parts) { - const line = part.trim(); - if (!line.startsWith('data: ')) continue; - let evt: any; - try { - evt = JSON.parse(line.slice(6)); - } catch { continue; } - - switch (evt.type) { - case 'stage1_start': - setCouncilStage('Stage 1: Collecting responses...'); - break; - case 'stage1_model_complete': - stage1Results = [...stage1Results, evt.data]; - setCouncilStage(`Stage 1: ${stage1Results.length}/${councilModels.length} models done`); - updateNodeData(runningNodeId, { - councilData: { stage1: [...stage1Results], stage2: null, stage3: null }, - }); - break; - case 'stage1_complete': - stage1Results = evt.data; - updateNodeData(runningNodeId, { - councilData: { stage1: stage1Results, stage2: null, stage3: null }, - }); - break; - case 'stage2_start': - setCouncilStage('Stage 2: Peer ranking...'); - break; - case 'stage2_complete': - stage2Data = evt.data; - updateNodeData(runningNodeId, { - councilData: { stage1: stage1Results, stage2: stage2Data, stage3: null }, - }); - break; - case 'stage3_start': - setCouncilStage('Stage 3: Chairman synthesizing...'); - setCouncilStreamBuffer(''); - break; - case 'stage3_chunk': - stage3Full += evt.data.chunk; - setCouncilStreamBuffer(stage3Full); - setStreamBuffer(stage3Full); - break; - case 'stage3_complete': - stage3Model = evt.data.model; - stage3Full = evt.data.response; - break; - case 'complete': { - const responseReceivedAt = Date.now(); - const councilData: CouncilData = { - stage1: stage1Results, - stage2: stage2Data, - stage3: { model: stage3Model, response: stage3Full }, - }; - const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt }; - const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: stage3Full }; - updateNodeData(runningNodeId, { - status: 'success', - response: stage3Full, - responseReceivedAt, - councilData, - messages: [...context, newUserMsg, newAssistantMsg] as any, - }); - setCouncilStage(''); - generateTitle(runningNodeId, runningPrompt, stage3Full); - break; - } - case 'error': - updateNodeData(runningNodeId, { status: 'error' }); - setCouncilStage(''); - break; - } - } + // Step 2: Store taskId on node for recovery + force save + updateNodeData(runningNodeId, { taskId: task_id }); + if (currentBlueprintPath) { + saveCurrentBlueprint(currentBlueprintPath).catch(console.error); } + + // Step 3: Consume SSE stream from task + const { _consumeTaskStream } = await import('../store/flowStore'); + await _consumeTaskStream( + task_id, runningNodeId, 'council', 0, updateNodeData, user?.username || 'test', + { + onStage: setCouncilStage, + onStreamBuffer: (text) => { setCouncilStreamBuffer(text); setStreamBuffer(text); }, + onComplete: () => { + setCouncilStage(''); + // Fetch final response for title generation + const currentNode = nodes.find(n => n.id === runningNodeId); + const finalResponse = currentNode?.data.response || ''; + if (finalResponse) generateTitle(runningNodeId, runningPrompt, finalResponse); + }, + onError: () => { setCouncilStage(''); }, + }, + ); } catch (error) { console.error(error); - updateNodeData(runningNodeId, { status: 'error' }); + updateNodeData(runningNodeId, { status: 'error', taskId: undefined }); setCouncilStage(''); } finally { setStreamingNodeId(prev => prev === runningNodeId ? null : prev); @@ -707,7 +643,9 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { try { const judgeModelConfig = selectedNode.data.judgeModel || debateModels[0]; - const response = await fetch(`/api/run_debate_stream?user=${encodeURIComponent(user?.username || 'test')}`, { + + // Step 1: Start background task + const startRes = await fetch(`/api/task/start_debate?user=${encodeURIComponent(user?.username || 'test')}`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...getAuthHeader() }, body: JSON.stringify({ @@ -740,176 +678,34 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }), }); - if (!response.body) return; - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let sseBuffer = ''; - const debateRounds: DebateRound[] = []; - let currentRound = 0; - let currentRoundResponses: Array<{ model: string; response: string }> = []; - let finalModel = ''; - let finalFull = ''; + if (!startRes.ok) throw new Error(await startRes.text() || `HTTP ${startRes.status}`); + const { task_id } = await startRes.json(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - sseBuffer += decoder.decode(value, { stream: true }); - - const parts = sseBuffer.split('\n\n'); - sseBuffer = parts.pop() || ''; - - for (const part of parts) { - const line = part.trim(); - if (!line.startsWith('data: ')) continue; - let evt: any; - try { - evt = JSON.parse(line.slice(6)); - } catch { continue; } - - switch (evt.type) { - case 'debate_start': - setDebateStage(`Debate started (${evt.data.models.length} models, max ${evt.data.max_rounds} rounds)`); - break; - case 'round_start': - currentRound = evt.data.round; - currentRoundResponses = []; - setDebateStage(`Round ${currentRound}/${maxRounds}: Collecting responses...`); - break; - case 'round_model_complete': - currentRoundResponses = [...currentRoundResponses, { model: evt.data.model, response: evt.data.response }]; - setDebateStage(`Round ${currentRound}/${maxRounds}: ${currentRoundResponses.length}/${debateModels.length} models done`); - break; - case 'round_complete': { - const roundData: DebateRound = { round: evt.data.round, responses: evt.data.responses }; - debateRounds.push(roundData); - updateNodeData(runningNodeId, { - debateData: { - rounds: [...debateRounds], - finalVerdict: null, - config: { judgeMode, format: debateFormat, maxRounds }, - }, - }); - break; - } - case 'judge_decision': { - const lastRound = debateRounds[debateRounds.length - 1]; - if (lastRound) { - lastRound.judgeDecision = { continue: evt.data.continue, reasoning: evt.data.reasoning }; - updateNodeData(runningNodeId, { - debateData: { - rounds: [...debateRounds], - finalVerdict: null, - config: { judgeMode, format: debateFormat, maxRounds }, - }, - }); - } - if (!evt.data.continue) { - setDebateStage('Judge stopped debate. Generating final verdict...'); - } else { - setDebateStage(`Judge: Continue to round ${currentRound + 1}...`); - } - break; - } - case 'model_eliminated': { - const lastRound2 = debateRounds[debateRounds.length - 1]; - if (lastRound2) { - if (!lastRound2.eliminated) lastRound2.eliminated = []; - lastRound2.eliminated.push({ - model: evt.data.model, - convincedBy: evt.data.convinced_by, - reasoning: evt.data.reasoning, - }); - } - setDebateStage(`${evt.data.model} concedes to ${evt.data.convinced_by || 'another'}...`); - break; - } - case 'convergence_status': { - const remaining = evt.data.remaining as string[]; - updateNodeData(runningNodeId, { - debateData: { - rounds: [...debateRounds], - finalVerdict: null, - config: { judgeMode, format: debateFormat, maxRounds }, - }, - }); - if (remaining.length <= 1) { - setDebateStage(`${remaining[0] || 'Winner'} is the last one standing!`); - } else { - setDebateStage(`${remaining.length} models remaining...`); - } - break; - } - case 'final_start': - finalModel = evt.data.model; - setDebateStage('Judge synthesizing final verdict...'); - setDebateStreamBuffer(''); - break; - case 'final_chunk': - finalFull += evt.data.chunk; - setDebateStreamBuffer(finalFull); - setStreamBuffer(finalFull); - break; - case 'final_complete': { - finalModel = evt.data.model; - finalFull = evt.data.response; - const responseReceivedAt = Date.now(); - const debateData: DebateData = { - rounds: debateRounds, - finalVerdict: { model: finalModel, response: finalFull }, - config: { judgeMode, format: debateFormat, maxRounds }, - }; - const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt }; - const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: finalFull }; - updateNodeData(runningNodeId, { - status: 'success', - response: finalFull, - responseReceivedAt, - debateData, - messages: [...context, newUserMsg, newAssistantMsg] as any, - }); - setDebateStage(''); - generateTitle(runningNodeId, runningPrompt, finalFull); - break; - } - case 'debate_complete': { - // If no final verdict (display_only or self_convergence without explicit final_complete) - const currentNode = nodes.find(n => n.id === runningNodeId); - if (currentNode?.data.status === 'loading') { - const responseReceivedAt = Date.now(); - const lastRoundResp = debateRounds.length > 0 ? debateRounds[debateRounds.length - 1].responses : []; - const bestResponse = lastRoundResp.length > 0 - ? lastRoundResp.reduce((a, b) => a.response.length > b.response.length ? a : b).response - : ''; - const debateData: DebateData = { - rounds: debateRounds, - finalVerdict: finalFull ? { model: finalModel, response: finalFull } : null, - config: { judgeMode, format: debateFormat, maxRounds }, - }; - const displayResponse = finalFull || bestResponse; - const newUserMsg = { id: `msg_${Date.now()}_u`, role: 'user', content: runningPrompt }; - const newAssistantMsg = { id: `msg_${Date.now()}_a`, role: 'assistant', content: displayResponse }; - updateNodeData(runningNodeId, { - status: 'success', - response: displayResponse, - responseReceivedAt, - debateData, - messages: [...context, newUserMsg, newAssistantMsg] as any, - }); - setDebateStage(''); - if (displayResponse) generateTitle(runningNodeId, runningPrompt, displayResponse); - } - break; - } - case 'error': - updateNodeData(runningNodeId, { status: 'error' }); - setDebateStage(''); - break; - } - } + // Step 2: Store taskId on node for recovery + force save + updateNodeData(runningNodeId, { taskId: task_id }); + if (currentBlueprintPath) { + saveCurrentBlueprint(currentBlueprintPath).catch(console.error); } + + // Step 3: Consume SSE stream from task + const { _consumeTaskStream } = await import('../store/flowStore'); + await _consumeTaskStream( + task_id, runningNodeId, 'debate', 0, updateNodeData, user?.username || 'test', + { + onStage: setDebateStage, + onStreamBuffer: (text) => { setDebateStreamBuffer(text); setStreamBuffer(text); }, + onComplete: () => { + setDebateStage(''); + const currentNode = nodes.find(n => n.id === runningNodeId); + const finalResponse = currentNode?.data.response || ''; + if (finalResponse) generateTitle(runningNodeId, runningPrompt, finalResponse); + }, + onError: () => { setDebateStage(''); }, + }, + ); } catch (error) { console.error(error); - updateNodeData(runningNodeId, { status: 'error' }); + updateNodeData(runningNodeId, { status: 'error', taskId: undefined }); setDebateStage(''); } finally { setStreamingNodeId(prev => prev === runningNodeId ? null : prev); @@ -2968,7 +2764,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }`} > {selectedNode.data.status === 'loading' ? <Loader2 className="animate-spin" size={16} /> : <Users size={16} />} - {selectedNode.data.status === 'loading' && councilStage ? councilStage : `Run Council (${(selectedNode.data.councilModels || []).length})`} + {selectedNode.data.status === 'loading' && (councilStage || selectedNode.data.taskStage) ? (councilStage || selectedNode.data.taskStage) : `Run Council (${(selectedNode.data.councilModels || []).length})`} </button> <div className={`text-center text-[10px] mt-0.5 leading-tight ${isDark ? 'text-gray-600' : 'text-gray-400'}`}> Inspired by <a href="https://github.com/karpathy/llm-council" target="_blank" rel="noopener noreferrer" className={`underline decoration-dotted ${isDark ? 'text-gray-500 hover:text-gray-400' : 'text-gray-500 hover:text-gray-600'}`}>karpathy/llm-council</a> @@ -2985,7 +2781,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { }`} > {selectedNode.data.status === 'loading' ? <Loader2 className="animate-spin" size={16} /> : <MessageSquare size={16} />} - {selectedNode.data.status === 'loading' && debateStage ? debateStage : `Run Debate (${(selectedNode.data.debateModels || []).length})`} + {selectedNode.data.status === 'loading' && (debateStage || selectedNode.data.taskStage) ? (debateStage || selectedNode.data.taskStage) : `Run Debate (${(selectedNode.data.debateModels || []).length})`} </button> ) : ( <button @@ -3086,7 +2882,7 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { <div> {selectedNode.data.councilData.stage3 ? ( <div className={`text-xs mb-1 ${isDark ? 'text-gray-500' : 'text-gray-400'}`}>Chairman: {selectedNode.data.councilData.stage3.model}</div> - ) : selectedNode.data.status === 'loading' && councilStage.includes('Stage 3') ? ( + ) : selectedNode.data.status === 'loading' && (councilStage.includes('Stage 3') || (selectedNode.data.taskStage || '').includes('Stage 3')) ? ( <div className={`text-xs mb-1 flex items-center gap-1 ${isDark ? 'text-amber-400' : 'text-amber-600'}`}><Loader2 className="animate-spin" size={10} /> Synthesizing...</div> ) : null} <div className={`p-3 rounded-md border min-h-[150px] text-sm prose prose-sm max-w-none ${ @@ -3333,9 +3129,9 @@ const Sidebar: React.FC<SidebarProps> = ({ isOpen, onToggle, onInteract }) => { </div> ); })} - {selectedNode.data.status === 'loading' && debateStage && ( + {selectedNode.data.status === 'loading' && (debateStage || selectedNode.data.taskStage) && ( <div className={`flex items-center gap-2 p-2 text-xs ${isDark ? 'text-cyan-400' : 'text-cyan-600'}`}> - <Loader2 className="animate-spin" size={12} /> {debateStage} + <Loader2 className="animate-spin" size={12} /> {debateStage || selectedNode.data.taskStage} </div> )} </div> |
