diff options
| author | blackhao <13851610112@163.com> | 2025-12-08 15:07:12 -0600 |
|---|---|---|
| committer | blackhao <13851610112@163.com> | 2025-12-08 15:07:12 -0600 |
| commit | f97b7a1bfa220a0947f2cd63c23f4faa9fcd42e7 (patch) | |
| tree | d1d6eb9e5196afb7bac8a22d0d7587aedcada450 /frontend/src/store | |
| parent | 93dbe11014cf967690727c25e89d9d1075008c24 (diff) | |
merge logic
Diffstat (limited to 'frontend/src/store')
| -rw-r--r-- | frontend/src/store/flowStore.ts | 1357 |
1 files changed, 1253 insertions, 104 deletions
diff --git a/frontend/src/store/flowStore.ts b/frontend/src/store/flowStore.ts index 0c90357..636113d 100644 --- a/frontend/src/store/flowStore.ts +++ b/frontend/src/store/flowStore.ts @@ -21,6 +21,8 @@ export interface Message { id?: string; role: 'user' | 'assistant' | 'system'; content: string; + sourceTraceId?: string; // For merged traces: which trace this message came from + sourceTraceColor?: string; // For merged traces: color of the source trace } export interface Trace { @@ -30,6 +32,20 @@ export interface Trace { messages: Message[]; } +// Merge strategy types +export type MergeStrategy = 'query_time' | 'response_time' | 'trace_order' | 'grouped' | 'interleaved' | 'summary'; + +// Merged trace - combines multiple traces with a strategy +export interface MergedTrace { + id: string; + sourceNodeId: string; + sourceTraceIds: string[]; // IDs of traces being merged (in order for trace_order) + strategy: MergeStrategy; + colors: string[]; // Colors from source traces (for alternating display) + messages: Message[]; // Computed merged messages + summarizedContent?: string; // For summary strategy, stores the LLM-generated summary +} + export interface NodeData { label: string; model: string; @@ -44,13 +60,19 @@ export interface NodeData { // Traces logic traces: Trace[]; // INCOMING Traces - outgoingTraces: Trace[]; // ALL Outgoing (inherited + self + forks) + outgoingTraces: Trace[]; // ALL Outgoing (inherited + self + forks + merged) forkedTraces: Trace[]; // Manually created forks from "New" handle + mergedTraces: MergedTrace[]; // Merged traces from multiple inputs activeTraceIds: string[]; response: string; status: NodeStatus; - inputs: number; + inputs: number; + + // Timestamps for merge logic + querySentAt?: number; // Unix timestamp when query was sent + responseReceivedAt?: number; // Unix timestamp when response was received + [key: string]: any; } @@ -71,12 +93,16 @@ interface FlowState { edges: Edge[]; selectedNodeId: string | null; archivedNodes: ArchivedNode[]; // Stored node templates + theme: 'light' | 'dark'; onNodesChange: OnNodesChange; onEdgesChange: OnEdgesChange; onConnect: OnConnect; addNode: (node: LLMNode) => void; + toggleTheme: () => void; + autoLayout: () => void; + findNonOverlappingPosition: (baseX: number, baseY: number) => { x: number; y: number }; updateNodeData: (nodeId: string, data: Partial<NodeData>) => void; setSelectedNode: (nodeId: string | null) => void; @@ -86,6 +112,7 @@ interface FlowState { deleteEdge: (edgeId: string) => void; deleteNode: (nodeId: string) => void; deleteBranch: (startNodeId?: string, startEdgeId?: string) => void; + deleteTrace: (startEdgeId: string) => void; // Archive actions toggleNodeDisabled: (nodeId: string) => void; @@ -97,6 +124,37 @@ interface FlowState { toggleTraceDisabled: (edgeId: string) => void; updateEdgeStyles: () => void; + // Quick Chat helpers + isTraceComplete: (trace: Trace) => boolean; + getTraceNodeIds: (trace: Trace) => string[]; + createQuickChatNode: ( + fromNodeId: string, + trace: Trace | null, + userPrompt: string, + response: string, + model: string, + config: Partial<NodeData> + ) => string; // Returns new node ID + + // Merge trace functions + createMergedTrace: ( + nodeId: string, + sourceTraceIds: string[], + strategy: MergeStrategy + ) => string; // Returns merged trace ID + updateMergedTrace: ( + nodeId: string, + mergedTraceId: string, + updates: { sourceTraceIds?: string[]; strategy?: MergeStrategy; summarizedContent?: string } + ) => void; + deleteMergedTrace: (nodeId: string, mergedTraceId: string) => void; + computeMergedMessages: ( + nodeId: string, + sourceTraceIds: string[], + strategy: MergeStrategy, + tracesOverride?: Trace[] + ) => Message[]; + propagateTraces: () => void; } @@ -115,11 +173,134 @@ const useFlowStore = create<FlowState>((set, get) => ({ edges: [], selectedNodeId: null, archivedNodes: [], + theme: 'light' as const, + + toggleTheme: () => { + const newTheme = get().theme === 'light' ? 'dark' : 'light'; + set({ theme: newTheme }); + // Update document class for global CSS + if (newTheme === 'dark') { + document.documentElement.classList.add('dark'); + } else { + document.documentElement.classList.remove('dark'); + } + }, + + findNonOverlappingPosition: (baseX: number, baseY: number) => { + const { nodes } = get(); + const nodeWidth = 220; + const nodeHeight = 80; + const padding = 10; + + let x = baseX; + let y = baseY; + let attempts = 0; + const maxAttempts = 30; + + const isOverlapping = (testX: number, testY: number) => { + return nodes.some(node => { + const nodeX = node.position.x; + const nodeY = node.position.y; + return !(testX + nodeWidth + padding < nodeX || + testX > nodeX + nodeWidth + padding || + testY + nodeHeight + padding < nodeY || + testY > nodeY + nodeHeight + padding); + }); + }; + + // Try positions in a tighter spiral pattern + while (isOverlapping(x, y) && attempts < maxAttempts) { + attempts++; + const angle = attempts * 0.7; + const radius = 30 + attempts * 15; + x = baseX + Math.cos(angle) * radius; + y = baseY + Math.sin(angle) * radius; + } + + return { x, y }; + }, + + autoLayout: () => { + const { nodes, edges } = get(); + if (nodes.length === 0) return; + + // Find root nodes (no incoming edges) + const nodesWithIncoming = new Set(edges.map(e => e.target)); + const rootNodes = nodes.filter(n => !nodesWithIncoming.has(n.id)); + + // BFS to layout nodes in levels + const nodePositions: Map<string, { x: number; y: number }> = new Map(); + const visited = new Set<string>(); + const queue: { id: string; level: number; index: number }[] = []; + + const horizontalSpacing = 350; + const verticalSpacing = 150; + + // Initialize with root nodes + rootNodes.forEach((node, index) => { + queue.push({ id: node.id, level: 0, index }); + visited.add(node.id); + }); + + // Track nodes per level for vertical positioning + const nodesPerLevel: Map<number, number> = new Map(); + + while (queue.length > 0) { + const { id, level, index } = queue.shift()!; + + // Count nodes at this level + const currentCount = nodesPerLevel.get(level) || 0; + nodesPerLevel.set(level, currentCount + 1); + + // Calculate position + const x = 100 + level * horizontalSpacing; + const y = 100 + currentCount * verticalSpacing; + nodePositions.set(id, { x, y }); + + // Find child nodes + const outgoingEdges = edges.filter(e => e.source === id); + outgoingEdges.forEach((edge, i) => { + if (!visited.has(edge.target)) { + visited.add(edge.target); + queue.push({ id: edge.target, level: level + 1, index: i }); + } + }); + } + + // Handle orphan nodes (not connected to anything) + let orphanY = 100; + nodes.forEach(node => { + if (!nodePositions.has(node.id)) { + nodePositions.set(node.id, { x: 100, y: orphanY }); + orphanY += verticalSpacing; + } + }); + + // Apply positions + set({ + nodes: nodes.map(node => ({ + ...node, + position: nodePositions.get(node.id) || node.position + })) + }); + }, onNodesChange: (changes: NodeChange[]) => { + // Check if any nodes are being removed + const hasRemovals = changes.some(c => c.type === 'remove'); + set({ nodes: applyNodeChanges(changes, get().nodes) as LLMNode[], }); + + // If nodes were removed, also clean up related edges and propagate traces + if (hasRemovals) { + const removedIds = changes.filter(c => c.type === 'remove').map(c => c.id); + set({ + edges: get().edges.filter(e => !removedIds.includes(e.source) && !removedIds.includes(e.target)) + }); + get().propagateTraces(); + } }, onEdgesChange: (changes: EdgeChange[]) => { set({ @@ -128,52 +309,276 @@ const useFlowStore = create<FlowState>((set, get) => ({ get().propagateTraces(); }, onConnect: (connection: Connection) => { - const { nodes } = get(); + const { nodes, edges } = get(); + const sourceNode = nodes.find(n => n.id === connection.source); + if (!sourceNode) return; + + // Handle prepend connections - only allow from 'new-trace' handle + // Prepend makes the source node become the NEW HEAD of an existing trace + if (connection.targetHandle?.startsWith('prepend-')) { + // Prepend connections MUST come from 'new-trace' handle + if (connection.sourceHandle !== 'new-trace') { + console.warn('Prepend connections only allowed from new-trace handle'); + return; + } + + const targetNode = nodes.find(n => n.id === connection.target); + if (!targetNode) return; + + // Get the trace ID from the prepend handle - this is the trace we're joining + const traceId = connection.targetHandle.replace('prepend-', '').replace('inherited-', ''); + const regularTrace = targetNode.data.outgoingTraces?.find(t => t.id === traceId); + const mergedTrace = targetNode.data.mergedTraces?.find(m => m.id === traceId); + const traceColor = regularTrace?.color || (mergedTrace?.colors?.[0]) || getStableColor(traceId); + + // Instead of creating a new trace, we JOIN the existing trace + // The source node (C) becomes the new head of the trace + // We use the SAME trace ID so it's truly the same trace + + // Add this trace as a "forked trace" on the source node so it shows up as an output handle + // But use the ORIGINAL trace ID (not a new prepend-xxx ID) + const inheritedTrace: Trace = { + id: traceId, // Use the SAME trace ID! + sourceNodeId: sourceNode.id, // C is now the source/head + color: traceColor, + messages: [] // Will be populated by propagateTraces + }; + + // Add to source node's forkedTraces + get().updateNodeData(sourceNode.id, { + forkedTraces: [...(sourceNode.data.forkedTraces || []), inheritedTrace] + }); + + // Update target node's forkedTrace to mark it as "prepended" + // by setting sourceNodeId to the new head (source node) + const targetForked = targetNode.data.forkedTraces || []; + const updatedForked = targetForked.map(t => + t.id === traceId ? { ...t, sourceNodeId: sourceNode.id } : t + ); + if (JSON.stringify(updatedForked) !== JSON.stringify(targetForked)) { + get().updateNodeData(targetNode.id, { + forkedTraces: updatedForked + }); + } + + // Create the edge using the SAME trace ID + // This connects C's output to A's prepend input + set({ + edges: addEdge({ + ...connection, + sourceHandle: `trace-${traceId}`, // Same trace ID! + style: { stroke: traceColor, strokeWidth: 2 } + }, get().edges), + }); + + setTimeout(() => get().propagateTraces(), 0); + return; + } + + // Helper to trace back the path of a trace by following edges upstream + const duplicateTracePath = (traceId: string, forkAtNodeId: string): { newTraceId: string, newEdges: Edge[], firstNodeId: string } | null => { + // Trace back from forkAtNodeId to find the origin of this trace + // We follow incoming edges that match the trace pattern + + const pathNodes: string[] = [forkAtNodeId]; + const pathEdges: Edge[] = []; + let currentNodeId = forkAtNodeId; + + // Trace backwards through incoming edges + while (true) { + // Find incoming edge to current node that's part of this trace + const incomingEdge = edges.find(e => + e.target === currentNodeId && + e.sourceHandle?.startsWith('trace-') + ); + + if (!incomingEdge) break; // Reached the start of the trace + + pathNodes.unshift(incomingEdge.source); + pathEdges.unshift(incomingEdge); + currentNodeId = incomingEdge.source; + } + + // If path only has one node, no upstream to duplicate + if (pathNodes.length <= 1) return null; + + const firstNodeId = pathNodes[0]; + const firstNode = nodes.find(n => n.id === firstNodeId); + if (!firstNode) return null; + + // Create a new trace ID for the duplicated path + const timestamp = Date.now(); + const newTraceId = `fork-${firstNodeId}-${timestamp}`; + const newTraceColor = getStableColor(newTraceId); + + // Create new edges for the entire path + const newEdges: Edge[] = []; + let evolvedTraceId = newTraceId; + + // Track which input handles we're creating for new edges + const newInputHandles: Map<string, number> = new Map(); + + for (let i = 0; i < pathEdges.length; i++) { + const originalEdge = pathEdges[i]; + const fromNodeId = pathNodes[i]; + const toNodeId = pathNodes[i + 1]; + + // Find the next available input handle for the target node + // Count existing edges to this node + any new edges we're creating + const existingEdgesToTarget = edges.filter(e => e.target === toNodeId).length; + const newEdgesToTarget = newInputHandles.get(toNodeId) || 0; + const nextInputIndex = existingEdgesToTarget + newEdgesToTarget; + newInputHandles.set(toNodeId, newEdgesToTarget + 1); + + newEdges.push({ + id: `edge-fork-${timestamp}-${i}`, + source: fromNodeId, + target: toNodeId, + sourceHandle: `trace-${evolvedTraceId}`, + targetHandle: `input-${nextInputIndex}`, + style: { stroke: newTraceColor, strokeWidth: 2 } + }); + + // Evolve trace ID for next edge + evolvedTraceId = `${evolvedTraceId}_${toNodeId}`; + } + + // Find the messages up to the fork point + const originalTrace = sourceNode.data.outgoingTraces?.find(t => t.id === traceId); + const messagesUpToFork = originalTrace?.messages || []; + + // Add the new trace as a forked trace on the first node + const newForkTrace: Trace = { + id: newTraceId, + sourceNodeId: firstNodeId, + color: newTraceColor, + messages: [...messagesUpToFork] + }; + + get().updateNodeData(firstNodeId, { + forkedTraces: [...(firstNode.data.forkedTraces || []), newForkTrace] + }); + + return { newTraceId, newEdges, firstNodeId }; + }; - // Check if connecting from "new-trace" handle + // Helper to create a simple forked trace (for new-trace handle or first connection) + const createSimpleForkTrace = () => { + let originalTraceMessages: Message[] = []; + + if (connection.sourceHandle?.startsWith('trace-')) { + const originalTraceId = connection.sourceHandle.replace('trace-', ''); + const originalTrace = sourceNode.data.outgoingTraces?.find(t => t.id === originalTraceId); + if (originalTrace) { + originalTraceMessages = [...originalTrace.messages]; + } + } + + if (originalTraceMessages.length === 0) { + if (sourceNode.data.userPrompt) { + originalTraceMessages.push({ id: `${sourceNode.id}-u`, role: 'user', content: sourceNode.data.userPrompt }); + } + if (sourceNode.data.response) { + originalTraceMessages.push({ id: `${sourceNode.id}-a`, role: 'assistant', content: sourceNode.data.response }); + } + } + + const newForkId = `fork-${sourceNode.id}-${Date.now()}`; + const newForkTrace: Trace = { + id: newForkId, + sourceNodeId: sourceNode.id, + color: getStableColor(newForkId), + messages: originalTraceMessages + }; + + get().updateNodeData(sourceNode.id, { + forkedTraces: [...(sourceNode.data.forkedTraces || []), newForkTrace] + }); + + return newForkTrace; + }; + + // Check if connecting from "new-trace" handle - always create simple fork if (connection.sourceHandle === 'new-trace') { - // Logic: Create a new Forked Trace on the source node - const sourceNode = nodes.find(n => n.id === connection.source); - if (sourceNode) { - // Generate the content for this new trace (it's essentially the Self Trace of this node) - const myResponseMsg: Message[] = []; - if (sourceNode.data.userPrompt) myResponseMsg.push({ id: `${sourceNode.id}-u`, role: 'user', content: sourceNode.data.userPrompt }); - if (sourceNode.data.response) myResponseMsg.push({ id: `${sourceNode.id}-a`, role: 'assistant', content: sourceNode.data.response }); - - const newForkId = `trace-${sourceNode.id}-fork-${Date.now()}`; - const newForkTrace: Trace = { - id: newForkId, - sourceNodeId: sourceNode.id, - color: getStableColor(newForkId), // Unique color for this fork - messages: [...myResponseMsg] - }; - - // Update Source Node to include this fork - get().updateNodeData(sourceNode.id, { - forkedTraces: [...(sourceNode.data.forkedTraces || []), newForkTrace] - }); - - // Redirect connection to the new handle - // Note: We must wait for propagateTraces to render the new handle? - // ReactFlow might complain if handle doesn't exist yet. - // But since we updateNodeData synchronously (mostly), it might work. - // Let's use the new ID for the connection. - - set({ - edges: addEdge({ - ...connection, - sourceHandle: `trace-${newForkId}`, // Redirect! - style: { stroke: newForkTrace.color, strokeWidth: 2 } - }, get().edges), - }); - - // Trigger propagation to update downstream - setTimeout(() => get().propagateTraces(), 0); - return; - } + const newForkTrace = createSimpleForkTrace(); + + set({ + edges: addEdge({ + ...connection, + sourceHandle: `trace-${newForkTrace.id}`, + style: { stroke: newForkTrace.color, strokeWidth: 2 } + }, get().edges), + }); + + setTimeout(() => get().propagateTraces(), 0); + return; + } + + // Check if this trace handle already has a downstream connection + const existingEdgeFromHandle = edges.find( + e => e.source === connection.source && e.sourceHandle === connection.sourceHandle + ); + + if (existingEdgeFromHandle && connection.sourceHandle?.startsWith('trace-')) { + // This handle already has a connection - need to duplicate the entire upstream trace path + const originalTraceId = connection.sourceHandle.replace('trace-', ''); + const duplicateResult = duplicateTracePath(originalTraceId, connection.source!); + + if (duplicateResult) { + // Add all the duplicated edges plus the new connection + const { newTraceId, newEdges, firstNodeId } = duplicateResult; + const newTraceColor = getStableColor(newTraceId); + + // Calculate the evolved trace ID at the fork node + // The trace evolves through each node: trace-A -> trace-A_B -> trace-A_B_C + // We need to build the evolved ID based on the path + let evolvedTraceId = newTraceId; + + // Find the path from first node to fork node by looking at the new edges + for (const edge of newEdges) { + if (edge.target === connection.source) { + // This edge ends at our fork node, so the evolved trace ID is after this edge + evolvedTraceId = `${evolvedTraceId}_${edge.target}`; + break; + } + evolvedTraceId = `${evolvedTraceId}_${edge.target}`; + } + + set({ + edges: [ + ...get().edges, + ...newEdges, + { + id: `edge-${connection.source}-${connection.target}-${Date.now()}`, + source: connection.source!, + target: connection.target!, + sourceHandle: `trace-${evolvedTraceId}`, + targetHandle: connection.targetHandle, + style: { stroke: newTraceColor, strokeWidth: 2 } + } as Edge + ], + }); + + setTimeout(() => get().propagateTraces(), 0); + return; + } else { + // Fallback to simple fork if path duplication fails + const newForkTrace = createSimpleForkTrace(); + + set({ + edges: addEdge({ + ...connection, + sourceHandle: `trace-${newForkTrace.id}`, + style: { stroke: newForkTrace.color, strokeWidth: 2 } + }, get().edges), + }); + + setTimeout(() => get().propagateTraces(), 0); + return; + } } - // Normal connection + // Normal connection - no existing edge from this handle set({ edges: addEdge({ ...connection, @@ -211,16 +616,52 @@ const useFlowStore = create<FlowState>((set, get) => ({ const node = get().nodes.find(n => n.id === nodeId); if (!node) return []; - // The traces stored in node.data.traces are the INCOMING traces. - // If we select one, we want its history. + const activeIds = node.data.activeTraceIds || []; + if (activeIds.length === 0) return []; - const activeTraces = node.data.traces.filter(t => - node.data.activeTraceIds?.includes(t.id) - ); + // Collect all traces by ID to avoid duplicates + const tracesById = new Map<string, Trace>(); + + // Add incoming traces + (node.data.traces || []).forEach((t: Trace) => { + if (activeIds.includes(t.id)) { + tracesById.set(t.id, t); + } + }); + + // Add outgoing traces (only if not already in incoming) + (node.data.outgoingTraces || []).forEach((t: Trace) => { + if (activeIds.includes(t.id) && !tracesById.has(t.id)) { + tracesById.set(t.id, t); + } + }); + // Collect messages from selected traces const contextMessages: Message[] = []; - activeTraces.forEach(t => { - contextMessages.push(...t.messages); + const nodePrefix = `${nodeId}-`; + + tracesById.forEach((t: Trace) => { + // For traces originated by this node, filter out this node's own messages + const isOriginated = t.id === `trace-${nodeId}` || + t.id.startsWith('fork-') || + (t.id.startsWith('prepend-') && t.id.includes(`-from-${nodeId}`)); + + if (isOriginated) { + // Only include prepended upstream messages + const prependedMessages = t.messages.filter(m => !m.id?.startsWith(nodePrefix)); + contextMessages.push(...prependedMessages); + } else { + // Include all messages for incoming traces + contextMessages.push(...t.messages); + } + }); + + // Check merged traces + const activeMerged = (node.data.mergedTraces || []).filter((m: MergedTrace) => + activeIds.includes(m.id) + ); + activeMerged.forEach((m: MergedTrace) => { + contextMessages.push(...m.messages); }); return contextMessages; @@ -299,6 +740,90 @@ const useFlowStore = create<FlowState>((set, get) => ({ get().propagateTraces(); }, + deleteTrace: (startEdgeId: string) => { + const { edges, nodes } = get(); + // Delete edges along the trace AND orphaned nodes (nodes with no remaining connections) + const edgesToDelete = new Set<string>(); + const nodesInTrace = new Set<string>(); + + // Helper to traverse downstream EDGES based on Trace Dependency + const traverse = (currentEdge: Edge) => { + if (edgesToDelete.has(currentEdge.id)) return; + edgesToDelete.add(currentEdge.id); + + const targetNodeId = currentEdge.target; + nodesInTrace.add(targetNodeId); + + // Identify the trace ID carried by this edge + const traceId = currentEdge.sourceHandle?.replace('trace-', ''); + if (!traceId) return; + + // Look for outgoing edges from the target node that carry the EVOLUTION of this trace. + const expectedNextTraceId = `${traceId}_${targetNodeId}`; + + const outgoing = edges.filter(e => e.source === targetNodeId); + outgoing.forEach(nextEdge => { + if (nextEdge.sourceHandle === `trace-${expectedNextTraceId}`) { + traverse(nextEdge); + } + }); + }; + + // Also traverse backwards to find upstream nodes + const traverseBackward = (currentEdge: Edge) => { + if (edgesToDelete.has(currentEdge.id)) return; + edgesToDelete.add(currentEdge.id); + + const sourceNodeId = currentEdge.source; + nodesInTrace.add(sourceNodeId); + + // Find the incoming edge to the source node that is part of the same trace + const traceId = currentEdge.sourceHandle?.replace('trace-', ''); + if (!traceId) return; + + // Find the parent trace ID by removing the last _nodeId suffix + const lastUnderscore = traceId.lastIndexOf('_'); + if (lastUnderscore > 0) { + const parentTraceId = traceId.substring(0, lastUnderscore); + const incoming = edges.filter(e => e.target === sourceNodeId); + incoming.forEach(prevEdge => { + if (prevEdge.sourceHandle === `trace-${parentTraceId}`) { + traverseBackward(prevEdge); + } + }); + } + }; + + const startEdge = edges.find(e => e.id === startEdgeId); + if (startEdge) { + // Traverse forward + traverse(startEdge); + // Traverse backward + traverseBackward(startEdge); + } + + // Filter remaining edges after deletion + const remainingEdges = edges.filter(e => !edgesToDelete.has(e.id)); + + // Find nodes that become orphaned (no connections at all after edge deletion) + const nodesToDelete = new Set<string>(); + nodesInTrace.forEach(nodeId => { + const hasRemainingEdges = remainingEdges.some( + e => e.source === nodeId || e.target === nodeId + ); + if (!hasRemainingEdges) { + nodesToDelete.add(nodeId); + } + }); + + set({ + nodes: nodes.filter(n => !nodesToDelete.has(n.id)), + edges: remainingEdges + }); + + get().propagateTraces(); + }, + toggleNodeDisabled: (nodeId: string) => { const node = get().nodes.find(n => n.id === nodeId); if (node) { @@ -365,6 +890,7 @@ const useFlowStore = create<FlowState>((set, get) => ({ traces: [], outgoingTraces: [], forkedTraces: [], + mergedTraces: [], activeTraceIds: [], response: '', status: 'idle', @@ -465,6 +991,415 @@ const useFlowStore = create<FlowState>((set, get) => ({ set({ edges: updatedEdges }); }, + // Check if all nodes in trace path have complete Q&A + isTraceComplete: (trace: Trace) => { + // A trace is complete if all nodes in the path have complete Q&A pairs + const messages = trace.messages; + + if (messages.length === 0) { + // Empty trace - check if the source node has content + const { nodes } = get(); + const sourceNode = nodes.find(n => n.id === trace.sourceNodeId); + if (sourceNode) { + return !!sourceNode.data.userPrompt && !!sourceNode.data.response; + } + return true; // No source node, assume complete + } + + // Extract node IDs from message IDs (format: nodeId-user or nodeId-assistant) + // Group messages by node and check each node has both user and assistant + const nodeMessages = new Map<string, { hasUser: boolean; hasAssistant: boolean }>(); + + for (const msg of messages) { + if (!msg.id) continue; + + // Parse nodeId from message ID (format: nodeId-user or nodeId-assistant) + const parts = msg.id.split('-'); + if (parts.length < 2) continue; + + // The nodeId is everything except the last part (user/assistant) + const lastPart = parts[parts.length - 1]; + const nodeId = lastPart === 'user' || lastPart === 'assistant' + ? parts.slice(0, -1).join('-') + : msg.id; // Fallback: use whole ID if format doesn't match + + if (!nodeMessages.has(nodeId)) { + nodeMessages.set(nodeId, { hasUser: false, hasAssistant: false }); + } + + const nodeData = nodeMessages.get(nodeId)!; + if (msg.role === 'user') nodeData.hasUser = true; + if (msg.role === 'assistant') nodeData.hasAssistant = true; + } + + // Check that ALL nodes in the trace have both user and assistant messages + for (const [nodeId, data] of nodeMessages) { + if (!data.hasUser || !data.hasAssistant) { + return false; // This node is incomplete + } + } + + // Must have at least one complete node + return nodeMessages.size > 0; + }, + + // Get all node IDs in trace path + getTraceNodeIds: (trace: Trace) => { + const traceId = trace.id; + const parts = traceId.replace('trace-', '').split('_'); + return parts.filter(p => p.startsWith('node') || get().nodes.some(n => n.id === p)); + }, + + // Create a new node for quick chat, with proper connection + createQuickChatNode: ( + fromNodeId: string, + trace: Trace | null, + userPrompt: string, + response: string, + model: string, + config: Partial<NodeData> + ) => { + const { nodes, edges, addNode, updateNodeData } = get(); + const fromNode = nodes.find(n => n.id === fromNodeId); + + if (!fromNode) return ''; + + // Check if current node is empty (no response) -> overwrite it + const isCurrentNodeEmpty = !fromNode.data.response; + + if (isCurrentNodeEmpty) { + // Overwrite current node + updateNodeData(fromNodeId, { + userPrompt, + response, + model, + status: 'success', + querySentAt: Date.now(), + responseReceivedAt: Date.now(), + ...config + }); + return fromNodeId; + } + + // Create new node to the right + const newNodeId = `node_${Date.now()}`; + const newPos = { + x: fromNode.position.x + 300, + y: fromNode.position.y + }; + + const newNode: LLMNode = { + id: newNodeId, + type: 'llmNode', + position: newPos, + data: { + label: 'Quick Chat', + model, + temperature: config.temperature || 0.7, + systemPrompt: '', + userPrompt, + mergeStrategy: 'smart', + reasoningEffort: config.reasoningEffort || 'medium', + enableGoogleSearch: config.enableGoogleSearch, + traces: [], + outgoingTraces: [], + forkedTraces: [], + mergedTraces: [], + activeTraceIds: [], + response, + status: 'success', + inputs: 1, + querySentAt: Date.now(), + responseReceivedAt: Date.now(), + ...config + } + }; + + addNode(newNode); + + // Connect from the source node using new-trace handle (creates a fork) + setTimeout(() => { + const store = useFlowStore.getState(); + store.onConnect({ + source: fromNodeId, + sourceHandle: 'new-trace', + target: newNodeId, + targetHandle: 'input-0' + }); + }, 50); + + return newNodeId; + }, + + // Compute merged messages based on strategy + // Optional tracesOverride parameter to use latest traces during propagation + computeMergedMessages: (nodeId: string, sourceTraceIds: string[], strategy: MergeStrategy, tracesOverride?: Trace[]): Message[] => { + const { nodes } = get(); + const node = nodes.find(n => n.id === nodeId); + if (!node) return []; + + // Use override traces if provided (for propagation), otherwise use node's stored traces + const availableTraces = tracesOverride || node.data.traces || []; + + // Get the source traces + const sourceTraces = sourceTraceIds + .map(id => availableTraces.find((t: Trace) => t.id === id)) + .filter((t): t is Trace => t !== undefined); + + if (sourceTraces.length === 0) return []; + + // Helper to add source trace info to a message + const tagMessage = (msg: Message, trace: Trace): Message => ({ + ...msg, + sourceTraceId: trace.id, + sourceTraceColor: trace.color + }); + + // Helper to get timestamp for a message based on node + const getMessageTimestamp = (msg: Message, type: 'query' | 'response'): number => { + // Extract node ID from message ID (format: nodeId-user or nodeId-assistant) + const msgNodeId = msg.id?.split('-')[0]; + if (!msgNodeId) return 0; + + const msgNode = nodes.find(n => n.id === msgNodeId); + if (!msgNode) return 0; + + if (type === 'query') { + return msgNode.data.querySentAt || 0; + } else { + return msgNode.data.responseReceivedAt || 0; + } + }; + + // Helper to get all messages with their timestamps and trace info + const getAllMessagesWithTime = () => { + const allMessages: { msg: Message; queryTime: number; responseTime: number; trace: Trace }[] = []; + + sourceTraces.forEach((trace) => { + trace.messages.forEach(msg => { + allMessages.push({ + msg: tagMessage(msg, trace), + queryTime: getMessageTimestamp(msg, 'query'), + responseTime: getMessageTimestamp(msg, 'response'), + trace + }); + }); + }); + + return allMessages; + }; + + switch (strategy) { + case 'query_time': { + // Sort by query time, keeping Q-A pairs together + const pairs: { user: Message | null; assistant: Message | null; time: number; trace: Trace }[] = []; + + sourceTraces.forEach(trace => { + for (let i = 0; i < trace.messages.length; i += 2) { + const user = trace.messages[i]; + const assistant = trace.messages[i + 1] || null; + const time = getMessageTimestamp(user, 'query'); + pairs.push({ + user: user ? tagMessage(user, trace) : null, + assistant: assistant ? tagMessage(assistant, trace) : null, + time, + trace + }); + } + }); + + pairs.sort((a, b) => a.time - b.time); + + const result: Message[] = []; + pairs.forEach(pair => { + if (pair.user) result.push(pair.user); + if (pair.assistant) result.push(pair.assistant); + }); + return result; + } + + case 'response_time': { + // Sort by response time, keeping Q-A pairs together + const pairs: { user: Message | null; assistant: Message | null; time: number; trace: Trace }[] = []; + + sourceTraces.forEach(trace => { + for (let i = 0; i < trace.messages.length; i += 2) { + const user = trace.messages[i]; + const assistant = trace.messages[i + 1] || null; + const time = getMessageTimestamp(assistant || user, 'response'); + pairs.push({ + user: user ? tagMessage(user, trace) : null, + assistant: assistant ? tagMessage(assistant, trace) : null, + time, + trace + }); + } + }); + + pairs.sort((a, b) => a.time - b.time); + + const result: Message[] = []; + pairs.forEach(pair => { + if (pair.user) result.push(pair.user); + if (pair.assistant) result.push(pair.assistant); + }); + return result; + } + + case 'trace_order': { + // Simply concatenate in the order of sourceTraceIds + const result: Message[] = []; + sourceTraces.forEach(trace => { + trace.messages.forEach(msg => { + result.push(tagMessage(msg, trace)); + }); + }); + return result; + } + + case 'grouped': { + // Same as trace_order but more explicitly grouped + const result: Message[] = []; + sourceTraces.forEach(trace => { + trace.messages.forEach(msg => { + result.push(tagMessage(msg, trace)); + }); + }); + return result; + } + + case 'interleaved': { + // True interleaving - sort ALL messages by their actual time + const allMessages = getAllMessagesWithTime(); + + // Sort by the earlier of query/response time for each message + allMessages.sort((a, b) => { + const aTime = a.msg.role === 'user' ? a.queryTime : a.responseTime; + const bTime = b.msg.role === 'user' ? b.queryTime : b.responseTime; + return aTime - bTime; + }); + + return allMessages.map(m => m.msg); + } + + case 'summary': { + // For summary, we return all messages in trace order with source tags + // The actual summarization is done asynchronously and stored in summarizedContent + const result: Message[] = []; + sourceTraces.forEach(trace => { + trace.messages.forEach(msg => { + result.push(tagMessage(msg, trace)); + }); + }); + return result; + } + + default: + return []; + } + }, + + // Create a new merged trace + createMergedTrace: (nodeId: string, sourceTraceIds: string[], strategy: MergeStrategy): string => { + const { nodes, updateNodeData, computeMergedMessages } = get(); + const node = nodes.find(n => n.id === nodeId); + if (!node) return ''; + + // Get colors from source traces + const colors = sourceTraceIds + .map(id => node.data.traces.find((t: Trace) => t.id === id)?.color) + .filter((c): c is string => c !== undefined); + + // Compute merged messages + const messages = computeMergedMessages(nodeId, sourceTraceIds, strategy); + + const mergedTraceId = `merged-${nodeId}-${Date.now()}`; + + const mergedTrace: MergedTrace = { + id: mergedTraceId, + sourceNodeId: nodeId, + sourceTraceIds, + strategy, + colors, + messages + }; + + const existingMerged = node.data.mergedTraces || []; + updateNodeData(nodeId, { + mergedTraces: [...existingMerged, mergedTrace] + }); + + // Trigger trace propagation to update outgoing traces + setTimeout(() => { + get().propagateTraces(); + }, 50); + + return mergedTraceId; + }, + + // Update an existing merged trace + updateMergedTrace: (nodeId: string, mergedTraceId: string, updates: { sourceTraceIds?: string[]; strategy?: MergeStrategy; summarizedContent?: string }) => { + const { nodes, updateNodeData, computeMergedMessages } = get(); + const node = nodes.find(n => n.id === nodeId); + if (!node) return; + + const existingMerged = node.data.mergedTraces || []; + const mergedIndex = existingMerged.findIndex((m: MergedTrace) => m.id === mergedTraceId); + if (mergedIndex === -1) return; + + const current = existingMerged[mergedIndex]; + const newSourceTraceIds = updates.sourceTraceIds || current.sourceTraceIds; + const newStrategy = updates.strategy || current.strategy; + + // Recompute colors if source traces changed + let newColors = current.colors; + if (updates.sourceTraceIds) { + newColors = updates.sourceTraceIds + .map(id => node.data.traces.find((t: Trace) => t.id === id)?.color) + .filter((c): c is string => c !== undefined); + } + + // Recompute messages if source or strategy changed + let newMessages = current.messages; + if (updates.sourceTraceIds || updates.strategy) { + newMessages = computeMergedMessages(nodeId, newSourceTraceIds, newStrategy); + } + + const updatedMerged = [...existingMerged]; + updatedMerged[mergedIndex] = { + ...current, + sourceTraceIds: newSourceTraceIds, + strategy: newStrategy, + colors: newColors, + messages: newMessages, + summarizedContent: updates.summarizedContent !== undefined ? updates.summarizedContent : current.summarizedContent + }; + + updateNodeData(nodeId, { mergedTraces: updatedMerged }); + + // Trigger trace propagation + setTimeout(() => { + get().propagateTraces(); + }, 50); + }, + + // Delete a merged trace + deleteMergedTrace: (nodeId: string, mergedTraceId: string) => { + const { nodes, updateNodeData } = get(); + const node = nodes.find(n => n.id === nodeId); + if (!node) return; + + const existingMerged = node.data.mergedTraces || []; + const filteredMerged = existingMerged.filter((m: MergedTrace) => m.id !== mergedTraceId); + + updateNodeData(nodeId, { mergedTraces: filteredMerged }); + + // Trigger trace propagation + setTimeout(() => { + get().propagateTraces(); + }, 50); + }, + propagateTraces: () => { const { nodes, edges } = get(); @@ -506,6 +1441,14 @@ const useFlowStore = create<FlowState>((set, get) => ({ const nodeOutgoingTraces = new Map<string, Trace[]>(); // Map<NodeID, Trace[]>: Traces ENTERING this node (to update NodeData) const nodeIncomingTraces = new Map<string, Trace[]>(); + // Map<NodeID, string[]>: Merged traces to delete from each node (disconnected sources) + const nodeMergedTracesToDelete = new Map<string, string[]>(); + // Map<NodeID, Map<MergedTraceId, UpdatedMergedTrace>>: Updated merged traces with new messages + const nodeUpdatedMergedTraces = new Map<string, Map<string, { messages: Message[], colors: string[] }>>(); + // Map<NodeID, Message[]>: Prepend messages for each node (for recursive prepend chains) + const nodePrependMessages = new Map<string, Message[]>(); + // Map<NodeID, Trace[]>: Forked traces to keep (cleanup unused ones) + const nodeForkedTracesToClean = new Map<string, Trace[]>(); // Also track Edge updates (Color AND SourceHandle) const updatedEdges = [...edges]; @@ -516,37 +1459,90 @@ const useFlowStore = create<FlowState>((set, get) => ({ const node = nodes.find(n => n.id === nodeId); if (!node) return; - // 1. Gather Incoming Traces + // 1. Gather Incoming Traces (regular and prepend) const incomingEdges = edges.filter(e => e.target === nodeId); const myIncomingTraces: Trace[] = []; - incomingEdges.forEach(edge => { + // Map: traceIdToPrepend -> Message[] (messages to prepend to that trace) + const prependMessages = new Map<string, Message[]>(); + + incomingEdges.forEach(edge => { const parentOutgoing = nodeOutgoingTraces.get(edge.source) || []; + const targetHandle = edge.targetHandle || ''; + + // Check if this is a prepend handle + const isPrependHandle = targetHandle.startsWith('prepend-'); + + // Special case: prepend connection (from a prepend trace created by onConnect) + if (isPrependHandle) { + // Find the source trace - it could be: + // 1. A prepend trace (prepend-xxx-from-yyy format) + // 2. Or any trace from the source handle + let matchedPrependTrace = parentOutgoing.find(t => edge.sourceHandle === `trace-${t.id}`); + + // Fallback: if no exact match, find by sourceHandle pattern + if (!matchedPrependTrace && edge.sourceHandle?.startsWith('trace-')) { + const sourceTraceId = edge.sourceHandle.replace('trace-', ''); + matchedPrependTrace = parentOutgoing.find(t => t.id === sourceTraceId); + } + + // Extract the original target trace ID from the prepend handle + // Format is "prepend-{traceId}" where traceId could be "fork-xxx" or "trace-xxx" + const targetTraceId = targetHandle.replace('prepend-', '').replace('inherited-', ''); + + if (matchedPrependTrace && matchedPrependTrace.messages.length > 0) { + // Prepend the messages from the source trace + const existing = prependMessages.get(targetTraceId) || []; + prependMessages.set(targetTraceId, [...existing, ...matchedPrependTrace.messages]); + + // Update edge color to match the trace + const edgeIndex = updatedEdges.findIndex(e => e.id === edge.id); + if (edgeIndex !== -1) { + const currentEdge = updatedEdges[edgeIndex]; + if (currentEdge.style?.stroke !== matchedPrependTrace.color) { + updatedEdges[edgeIndex] = { + ...currentEdge, + style: { ...currentEdge.style, stroke: matchedPrependTrace.color, strokeWidth: 2 } + }; + } + } + } else { + // Even if no messages yet, store the prepend trace info for later updates + // The prepend connection should still work when the source node gets content + const existing = prependMessages.get(targetTraceId) || []; + if (!prependMessages.has(targetTraceId)) { + prependMessages.set(targetTraceId, existing); + } + } + return; // Don't process prepend edges as regular incoming traces + } // Find match based on Handle ID - // EXACT match first - // Since we removed 'new-trace' handle, we only look for exact trace matches. let matchedTrace = parentOutgoing.find(t => edge.sourceHandle === `trace-${t.id}`); // If no exact match, try to find a "Semantic Match" (Auto-Reconnect) - // If edge.sourceHandle was 'trace-X', and now we have 'trace-X_Parent', that's a likely evolution. if (!matchedTrace && edge.sourceHandle?.startsWith('trace-')) { const oldId = edge.sourceHandle.replace('trace-', ''); matchedTrace = parentOutgoing.find(t => t.id === `${oldId}_${edge.source}`); } // Fallback: If still no match, and parent has traces, try to connect to the most logical one. - // If parent has only 1 trace, connect to it. - // This handles cases where edge.sourceHandle might be null or outdated. if (!matchedTrace && parentOutgoing.length > 0) { - // If edge has no handle ID, default to the last generated trace (usually Self Trace) if (!edge.sourceHandle) { matchedTrace = parentOutgoing[parentOutgoing.length - 1]; } } if (matchedTrace) { - myIncomingTraces.push(matchedTrace); + if (isPrependHandle) { + // This is a prepend connection from a trace handle - extract the target trace ID + const targetTraceId = targetHandle.replace('prepend-', '').replace('inherited-', ''); + const existing = prependMessages.get(targetTraceId) || []; + prependMessages.set(targetTraceId, [...existing, ...matchedTrace.messages]); + } else { + // Regular incoming trace + myIncomingTraces.push(matchedTrace); + } // Update Edge Visuals & Logical Connection const edgeIndex = updatedEdges.findIndex(e => e.id === edge.id); @@ -554,15 +1550,39 @@ const useFlowStore = create<FlowState>((set, get) => ({ const currentEdge = updatedEdges[edgeIndex]; const newHandleId = `trace-${matchedTrace.id}`; - // Check if we need to update - if (currentEdge.sourceHandle !== newHandleId || currentEdge.style?.stroke !== matchedTrace.color) { - updatedEdges[edgeIndex] = { - ...currentEdge, - sourceHandle: newHandleId, // Auto-update handle connection! - style: { ...currentEdge.style, stroke: matchedTrace.color, strokeWidth: 2 } - }; - edgesChanged = true; - } + // Check if this is a merged trace (need gradient) + const isMergedTrace = matchedTrace.id.startsWith('merged-'); + const parentNode = nodes.find(n => n.id === edge.source); + const mergedTraceData = isMergedTrace + ? parentNode?.data.mergedTraces?.find((m: MergedTrace) => m.id === matchedTrace.id) + : null; + + // Create gradient for merged traces + let gradient: string | undefined; + if (mergedTraceData && mergedTraceData.colors.length > 0) { + const colors = mergedTraceData.colors; + const gradientStops = colors.map((color: string, idx: number) => + `${color} ${(idx / colors.length) * 100}%, ${color} ${((idx + 1) / colors.length) * 100}%` + ).join(', '); + gradient = `linear-gradient(90deg, ${gradientStops})`; + } + + // Check if we need to update + if (currentEdge.sourceHandle !== newHandleId || currentEdge.style?.stroke !== matchedTrace.color) { + updatedEdges[edgeIndex] = { + ...currentEdge, + sourceHandle: newHandleId, + type: isMergedTrace ? 'merged' : undefined, + style: { ...currentEdge.style, stroke: matchedTrace.color, strokeWidth: 2 }, + data: { + ...currentEdge.data, + gradient, + isMerged: isMergedTrace, + colors: mergedTraceData?.colors || [] + } + }; + edgesChanged = true; + } } } }); @@ -593,7 +1613,7 @@ const useFlowStore = create<FlowState>((set, get) => ({ const myOutgoingTraces: Trace[] = []; - // A. Pass-through traces (append history) + // A. Pass-through traces (append history) - only if there's a downstream edge uniqueIncoming.forEach(t => { // When a trace passes through a node and gets modified, it effectively becomes a NEW branch of that trace. // We must append the current node ID to the trace ID to distinguish branches. @@ -601,46 +1621,141 @@ const useFlowStore = create<FlowState>((set, get) => ({ // If it passes Node B -> becomes "root_B" // Downstream Node D can then distinguish "root_A" from "root_B". - // Match Logic: - // We need to find if this edge was PREVIOUSLY connected to a trace that has now evolved into 'newTrace'. - // The edge.sourceHandle might be the OLD ID. - // We need a heuristic: if edge.sourceHandle contains the ROOT ID of this trace, we assume it's a match. - // But this is risky if multiple branches exist. - - // Better heuristic: - // When we extend a trace t -> t_new (with id t.id + '_' + node.id), - // we record this evolution mapping. - const newTraceId = `${t.id}_${node.id}`; - myOutgoingTraces.push({ - ...t, - id: newTraceId, - messages: [...t.messages, ...myResponseMsg] - }); + // Only create pass-through if there's actually a downstream edge using it + const hasDownstreamEdge = updatedEdges.some(e => + e.source === node.id && + (e.sourceHandle === `trace-${newTraceId}` || e.sourceHandle === `trace-${t.id}`) + ); + + if (hasDownstreamEdge) { + myOutgoingTraces.push({ + ...t, + id: newTraceId, + // Keep original sourceNodeId - this is a pass-through, not originated here + sourceNodeId: t.sourceNodeId, + messages: [...t.messages, ...myResponseMsg] + }); + } }); - // B. Self Trace (New Branch) -> This is the "Default" self trace (always there?) - // Actually, if we use Manual Forks, maybe we don't need an automatic self trace? - // Or maybe the "Default" self trace is just one of the outgoing ones. - // Let's keep it for compatibility if downstream picks it up automatically. - const selfTrace: Trace = { - id: `trace-${node.id}`, - sourceNodeId: node.id, - color: getStableColor(node.id), - messages: [...myResponseMsg] - }; - myOutgoingTraces.push(selfTrace); + // B. Self Trace (New Branch) -> Only create if there's a downstream edge using it + const selfTraceId = `trace-${node.id}`; + const selfPrepend = prependMessages.get(selfTraceId) || []; + + // Store prepend messages for this node (for recursive prepend chains) + // This includes any prepend from upstream and this node's own messages + if (selfPrepend.length > 0 || myResponseMsg.length > 0) { + nodePrependMessages.set(node.id, [...selfPrepend, ...myResponseMsg]); + } + + // Only add self trace to outgoing if there's actually a downstream edge using it + // Check if any edge uses this self trace as source + // Edge sourceHandle format: "trace-{traceId}" where traceId = "trace-{nodeId}" + // So sourceHandle would be "trace-trace-{nodeId}" + const hasSelfTraceEdge = updatedEdges.some(e => + e.source === node.id && + (e.sourceHandle === `trace-${selfTraceId}` || e.sourceHandle === selfTraceId) + ); + + if (hasSelfTraceEdge) { + const selfTrace: Trace = { + id: selfTraceId, + sourceNodeId: node.id, + color: getStableColor(node.id), + messages: [...selfPrepend, ...myResponseMsg] + }; + myOutgoingTraces.push(selfTrace); + } - // C. Manual Forks + // C. Manual Forks - only include forks that have downstream edges if (node.data.forkedTraces) { - // We need to keep them updated with the latest messages (if prompt changed) - // But keep their IDs and Colors stable. - const updatedForks = node.data.forkedTraces.map(fork => ({ - ...fork, - messages: [...myResponseMsg] // Re-sync messages - })); + // Filter to only forks that are actually being used (have edges) + const activeForks = node.data.forkedTraces.filter(fork => { + return updatedEdges.some(e => + e.source === node.id && e.sourceHandle === `trace-${fork.id}` + ); + }); + + // Update messages for active forks + const updatedForks = activeForks.map(fork => { + const forkPrepend = prependMessages.get(fork.id) || []; + return { + ...fork, + messages: [...forkPrepend, ...myResponseMsg] // Prepend + current messages + }; + }); myOutgoingTraces.push(...updatedForks); + + // Clean up unused forks from node data + if (activeForks.length !== node.data.forkedTraces.length) { + nodeForkedTracesToClean.set(nodeId, activeForks); + } + } + + // D. Merged Traces - recompute messages when source traces change + // Also check if any source traces are disconnected, and mark for deletion + const mergedTracesToDelete: string[] = []; + const updatedMergedMap = new Map<string, { messages: Message[], colors: string[] }>(); + + if (node.data.mergedTraces && node.data.mergedTraces.length > 0) { + const { computeMergedMessages } = get(); + + node.data.mergedTraces.forEach((merged: MergedTrace) => { + // Check if all source traces are still connected + const allSourcesConnected = merged.sourceTraceIds.every(id => + uniqueIncoming.some(t => t.id === id) + ); + + if (!allSourcesConnected) { + // Mark this merged trace for deletion + mergedTracesToDelete.push(merged.id); + return; // Don't add to outgoing traces + } + + // Recompute messages based on the current incoming traces (pass uniqueIncoming for latest data) + const updatedMessages = computeMergedMessages(node.id, merged.sourceTraceIds, merged.strategy, uniqueIncoming); + + // Filter out current node's messages from updatedMessages to avoid duplication + // (since myResponseMsg will be appended at the end) + const nodePrefix = `${node.id}-`; + const filteredMessages = updatedMessages.filter(m => !m.id?.startsWith(nodePrefix)); + + // Get prepend messages for this merged trace + const mergedPrepend = prependMessages.get(merged.id) || []; + + // Update colors from current traces + const updatedColors = merged.sourceTraceIds + .map(id => uniqueIncoming.find(t => t.id === id)?.color) + .filter((c): c is string => c !== undefined); + + // Combine all messages for this merged trace + const mergedMessages = [...mergedPrepend, ...filteredMessages, ...myResponseMsg]; + + // Store updated data for bulk update later + updatedMergedMap.set(merged.id, { messages: mergedMessages, colors: updatedColors }); + + // Create a trace-like object for merged output + const mergedOutgoing: Trace = { + id: merged.id, + sourceNodeId: node.id, + color: updatedColors[0] || getStableColor(merged.id), + messages: mergedMessages + }; + + myOutgoingTraces.push(mergedOutgoing); + }); + } + + // Store merged traces to delete for this node + if (mergedTracesToDelete.length > 0) { + nodeMergedTracesToDelete.set(nodeId, mergedTracesToDelete); + } + + // Store updated merged trace data for this node + if (updatedMergedMap.size > 0) { + nodeUpdatedMergedTraces.set(nodeId, updatedMergedMap); } nodeOutgoingTraces.set(nodeId, myOutgoingTraces); @@ -657,13 +1772,47 @@ const useFlowStore = create<FlowState>((set, get) => ({ nodes: state.nodes.map(n => { const traces = nodeIncomingTraces.get(n.id) || []; const outTraces = nodeOutgoingTraces.get(n.id) || []; + const mergedToDelete = nodeMergedTracesToDelete.get(n.id) || []; + const updatedMerged = nodeUpdatedMergedTraces.get(n.id); + const cleanedForks = nodeForkedTracesToClean.get(n.id); + + // Filter out disconnected merged traces and update messages for remaining ones + let filteredMergedTraces = (n.data.mergedTraces || []).filter( + (m: MergedTrace) => !mergedToDelete.includes(m.id) + ); + + // Apply updated messages and colors to merged traces + if (updatedMerged && updatedMerged.size > 0) { + filteredMergedTraces = filteredMergedTraces.map((m: MergedTrace) => { + const updated = updatedMerged.get(m.id); + if (updated) { + return { ...m, messages: updated.messages, colors: updated.colors }; + } + return m; + }); + } + + // Clean up unused forked traces + const filteredForkedTraces = cleanedForks !== undefined + ? cleanedForks + : (n.data.forkedTraces || []); + + // Also update activeTraceIds to remove deleted merged traces and orphaned fork traces + const activeForkIds = filteredForkedTraces.map(f => f.id); + const filteredActiveTraceIds = (n.data.activeTraceIds || []).filter( + (id: string) => !mergedToDelete.includes(id) && + (activeForkIds.includes(id) || !id.startsWith('fork-')) + ); + return { ...n, data: { ...n.data, traces, outgoingTraces: outTraces, - activeTraceIds: n.data.activeTraceIds + forkedTraces: filteredForkedTraces, + mergedTraces: filteredMergedTraces, + activeTraceIds: filteredActiveTraceIds } }; }) |
