From c8fae0256c91a0ebe495270aa15baa2f27211268 Mon Sep 17 00:00:00 2001 From: haoyuren <13851610112@163.com> Date: Thu, 12 Feb 2026 12:45:24 -0600 Subject: Multi-turn conversation, stop generation, SSE fix, and UI improvements - Multi-turn context: all council stages now receive conversation history (user messages + Stage 3 chairman responses) for coherent follow-ups - Stop generation: abort streaming mid-request, recover query to input box - SSE parsing: buffer-based chunking to prevent JSON split across packets - Atomic storage: user + assistant messages saved together after completion, preventing dangling messages on abort - GFM markdown: tables, strikethrough via remark-gfm plugin + table styles - Performance: memo user messages and completed assistant messages, only re-render the active streaming message - Model config: gpt-5.2, claude-opus-4.6 as chairman - Always show input box for multi-turn conversations Co-Authored-By: Claude Opus 4.6 --- backend/config.py | 6 +++--- backend/council.py | 56 +++++++++++++++++++++++++++++++++++++++++++----------- backend/main.py | 40 ++++++++++++++++++++++++++------------ 3 files changed, 76 insertions(+), 26 deletions(-) (limited to 'backend') diff --git a/backend/config.py b/backend/config.py index a9cf7c4..cf8fcb4 100644 --- a/backend/config.py +++ b/backend/config.py @@ -10,14 +10,14 @@ OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") # Council members - list of OpenRouter model identifiers COUNCIL_MODELS = [ - "openai/gpt-5.1", + "openai/gpt-5.2", "google/gemini-3-pro-preview", - "anthropic/claude-sonnet-4.5", + "anthropic/claude-opus-4.6", "x-ai/grok-4", ] # Chairman model - synthesizes final response -CHAIRMAN_MODEL = "google/gemini-3-pro-preview" +CHAIRMAN_MODEL = "anthropic/claude-opus-4.6" # OpenRouter API endpoint OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions" diff --git a/backend/council.py b/backend/council.py index 5069abe..6facbd8 100644 --- a/backend/council.py +++ b/backend/council.py @@ -1,21 +1,46 @@ """3-stage LLM Council orchestration.""" -from typing import List, Dict, Any, Tuple +from typing import List, Dict, Any, Tuple, Optional from .openrouter import query_models_parallel, query_model from .config import COUNCIL_MODELS, CHAIRMAN_MODEL -async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]: +def _build_messages( + conversation_history: Optional[List[Dict[str, str]]], + current_content: str +) -> List[Dict[str, str]]: + """ + Build a messages list with conversation history + current user message. + + Args: + conversation_history: List of {"role": "user"/"assistant", "content": ...} dicts + current_content: The current message content to append as user + + Returns: + Messages list for the OpenRouter API + """ + messages = [] + if conversation_history: + messages.extend(conversation_history) + messages.append({"role": "user", "content": current_content}) + return messages + + +async def stage1_collect_responses( + user_query: str, + conversation_history: Optional[List[Dict[str, str]]] = None +) -> List[Dict[str, Any]]: """ Stage 1: Collect individual responses from all council models. 
Args: user_query: The user's question + conversation_history: Optional list of prior conversation messages Returns: List of dicts with 'model' and 'response' keys """ - messages = [{"role": "user", "content": user_query}] + messages = _build_messages(conversation_history, user_query) # Query all models in parallel responses = await query_models_parallel(COUNCIL_MODELS, messages) @@ -34,7 +59,8 @@ async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]: async def stage2_collect_rankings( user_query: str, - stage1_results: List[Dict[str, Any]] + stage1_results: List[Dict[str, Any]], + conversation_history: Optional[List[Dict[str, str]]] = None ) -> Tuple[List[Dict[str, Any]], Dict[str, str]]: """ Stage 2: Each model ranks the anonymized responses. @@ -42,6 +68,7 @@ async def stage2_collect_rankings( Args: user_query: The original user query stage1_results: Results from Stage 1 + conversation_history: Optional list of prior conversation messages Returns: Tuple of (rankings list, label_to_model mapping) @@ -92,7 +119,7 @@ FINAL RANKING: Now provide your evaluation and ranking:""" - messages = [{"role": "user", "content": ranking_prompt}] + messages = _build_messages(conversation_history, ranking_prompt) # Get rankings from all council models in parallel responses = await query_models_parallel(COUNCIL_MODELS, messages) @@ -115,7 +142,8 @@ Now provide your evaluation and ranking:""" async def stage3_synthesize_final( user_query: str, stage1_results: List[Dict[str, Any]], - stage2_results: List[Dict[str, Any]] + stage2_results: List[Dict[str, Any]], + conversation_history: Optional[List[Dict[str, str]]] = None ) -> Dict[str, Any]: """ Stage 3: Chairman synthesizes final response. @@ -124,6 +152,7 @@ async def stage3_synthesize_final( user_query: The original user query stage1_results: Individual model responses from Stage 1 stage2_results: Rankings from Stage 2 + conversation_history: Optional list of prior conversation messages Returns: Dict with 'model' and 'response' keys @@ -156,7 +185,7 @@ Your task as Chairman is to synthesize all of this information into a single, co Provide a clear, well-reasoned final answer that represents the council's collective wisdom:""" - messages = [{"role": "user", "content": chairman_prompt}] + messages = _build_messages(conversation_history, chairman_prompt) # Query the chairman model response = await query_model(CHAIRMAN_MODEL, messages) @@ -293,18 +322,22 @@ Title:""" return title -async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]: +async def run_full_council( + user_query: str, + conversation_history: Optional[List[Dict[str, str]]] = None +) -> Tuple[List, List, Dict, Dict]: """ Run the complete 3-stage council process. 
Args: user_query: The user's question + conversation_history: Optional list of prior conversation messages Returns: Tuple of (stage1_results, stage2_results, stage3_result, metadata) """ # Stage 1: Collect individual responses - stage1_results = await stage1_collect_responses(user_query) + stage1_results = await stage1_collect_responses(user_query, conversation_history) # If no models responded successfully, return error if not stage1_results: @@ -314,7 +347,7 @@ async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]: }, {} # Stage 2: Collect rankings - stage2_results, label_to_model = await stage2_collect_rankings(user_query, stage1_results) + stage2_results, label_to_model = await stage2_collect_rankings(user_query, stage1_results, conversation_history) # Calculate aggregate rankings aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model) @@ -323,7 +356,8 @@ async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]: stage3_result = await stage3_synthesize_final( user_query, stage1_results, - stage2_results + stage2_results, + conversation_history ) # Prepare metadata diff --git a/backend/main.py b/backend/main.py index e33ce59..40353dd 100644 --- a/backend/main.py +++ b/backend/main.py @@ -14,6 +14,20 @@ from .council import run_full_council, generate_conversation_title, stage1_colle app = FastAPI(title="LLM Council API") + +def _extract_conversation_history(conversation: Dict[str, Any]) -> List[Dict[str, str]]: + """ + Extract conversation history as a flat messages list for multi-turn context. + User messages use their content; assistant messages use the Stage 3 (chairman) response. + """ + history = [] + for msg in conversation["messages"]: + if msg["role"] == "user": + history.append({"role": "user", "content": msg["content"]}) + elif msg["role"] == "assistant" and msg.get("stage3"): + history.append({"role": "assistant", "content": msg["stage3"].get("response", "")}) + return history + # Enable CORS for local development app.add_middleware( CORSMiddleware, @@ -93,20 +107,21 @@ async def send_message(conversation_id: str, request: SendMessageRequest): # Check if this is the first message is_first_message = len(conversation["messages"]) == 0 - # Add user message - storage.add_user_message(conversation_id, request.content) - # If this is the first message, generate a title if is_first_message: title = await generate_conversation_title(request.content) storage.update_conversation_title(conversation_id, title) + # Build conversation history for multi-turn context + conversation_history = _extract_conversation_history(conversation) + # Run the 3-stage council process stage1_results, stage2_results, stage3_result, metadata = await run_full_council( - request.content + request.content, conversation_history ) - # Add assistant message with all stages + # Save user + assistant messages together only after full completion + storage.add_user_message(conversation_id, request.content) storage.add_assistant_message( conversation_id, stage1_results, @@ -137,11 +152,11 @@ async def send_message_stream(conversation_id: str, request: SendMessageRequest) # Check if this is the first message is_first_message = len(conversation["messages"]) == 0 + # Build conversation history for multi-turn context + conversation_history = _extract_conversation_history(conversation) + async def event_generator(): try: - # Add user message - storage.add_user_message(conversation_id, request.content) - # Start title generation in parallel (don't await yet) 
             title_task = None
             if is_first_message:
@@ -149,18 +164,18 @@ async def send_message_stream(conversation_id: str, request: SendMessageRequest)
 
             # Stage 1: Collect responses
             yield f"data: {json.dumps({'type': 'stage1_start'})}\n\n"
-            stage1_results = await stage1_collect_responses(request.content)
+            stage1_results = await stage1_collect_responses(request.content, conversation_history)
             yield f"data: {json.dumps({'type': 'stage1_complete', 'data': stage1_results})}\n\n"
 
             # Stage 2: Collect rankings
             yield f"data: {json.dumps({'type': 'stage2_start'})}\n\n"
-            stage2_results, label_to_model = await stage2_collect_rankings(request.content, stage1_results)
+            stage2_results, label_to_model = await stage2_collect_rankings(request.content, stage1_results, conversation_history)
             aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)
             yield f"data: {json.dumps({'type': 'stage2_complete', 'data': stage2_results, 'metadata': {'label_to_model': label_to_model, 'aggregate_rankings': aggregate_rankings}})}\n\n"
 
             # Stage 3: Synthesize final answer
             yield f"data: {json.dumps({'type': 'stage3_start'})}\n\n"
-            stage3_result = await stage3_synthesize_final(request.content, stage1_results, stage2_results)
+            stage3_result = await stage3_synthesize_final(request.content, stage1_results, stage2_results, conversation_history)
             yield f"data: {json.dumps({'type': 'stage3_complete', 'data': stage3_result})}\n\n"
 
             # Wait for title generation if it was started
@@ -169,7 +184,8 @@ async def send_message_stream(conversation_id: str, request: SendMessageRequest)
                 storage.update_conversation_title(conversation_id, title)
                 yield f"data: {json.dumps({'type': 'title_complete', 'data': {'title': title}})}\n\n"
 
-            # Save complete assistant message
+            # Save user + assistant messages together only after full completion
+            storage.add_user_message(conversation_id, request.content)
            storage.add_assistant_message(
                 conversation_id,
                 stage1_results,
--
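
For reference, this is how the new conversation-history plumbing composes a follow-up turn: _extract_conversation_history (main.py) flattens stored messages into user turns plus the Stage 3 chairman answer, and _build_messages (council.py) appends the current prompt. Illustrative sketch only; the history values are invented for the example:

    # Illustrative use of the helpers added in this patch (example values invented)
    history = [
        {"role": "user", "content": "What is the capital of France?"},
        {"role": "assistant", "content": "Paris."},  # Stage 3 chairman response from the prior turn
    ]
    messages = _build_messages(history, "And roughly how large is its population?")
    # messages == [
    #     {"role": "user", "content": "What is the capital of France?"},
    #     {"role": "assistant", "content": "Paris."},
    #     {"role": "user", "content": "And roughly how large is its population?"},
    # ]

All three stages receive the same history ahead of their stage-specific prompts, so follow-up questions can resolve references to earlier turns.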
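The SSE parsing fix named in the commit message lives in the frontend, which is outside this backend-limited diff. The idea is buffer-based: accumulate incoming chunks and only parse an event once its terminating blank line has arrived, so a JSON payload split across packets is never handed to the JSON parser half-finished. A minimal Python sketch of the same approach against this streaming endpoint (the URL path, port, and use of httpx are assumptions for illustration, not taken from this patch):

    import asyncio
    import json

    import httpx

    async def consume_stream(conversation_id: str, content: str) -> None:
        # Assumed endpoint path for illustration; see send_message_stream in main.py.
        url = f"http://localhost:8000/api/conversations/{conversation_id}/message/stream"
        buffer = ""
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream("POST", url, json={"content": content}) as response:
                async for chunk in response.aiter_text():
                    buffer += chunk
                    # Only complete events (terminated by "\n\n") are parsed;
                    # a partial JSON payload stays in the buffer until the rest arrives.
                    while "\n\n" in buffer:
                        event, buffer = buffer.split("\n\n", 1)
                        if event.startswith("data: "):
                            payload = json.loads(event[len("data: "):])
                            print(payload["type"])

    asyncio.run(consume_stream("<conversation-id>", "Hello, council"))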