diff options
| author | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-13 23:08:05 +0000 |
|---|---|---|
| committer | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-13 23:08:05 +0000 |
| commit | cb59ecf3ac3b38ba883fc74bf810ae9e82e2a469 (patch) | |
| tree | d0cab16f3ddb7708528ceb3cbb126d9437aed91b /backend | |
| parent | 2adacdbfa1d1049a0497e55f2b3ed00551bf876f (diff) | |
Add LLM Debate mode for multi-round iterative model discussions
Implements a debate feature alongside Council mode where 2-6 models
engage in multi-round discussions with configurable judge modes
(external judge, self-convergence, display-only), debate formats
(free discussion, structured opposition, iterative improvement, custom),
and early termination conditions.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'backend')
| -rw-r--r-- | backend/app/main.py | 129 | ||||
| -rw-r--r-- | backend/app/schemas.py | 29 | ||||
| -rw-r--r-- | backend/app/services/debate.py | 371 |
3 files changed, 528 insertions, 1 deletions
@app.post("/api/run_debate_stream")
async def run_debate_stream(
    request: DebateRunRequest,
    user: str = DEFAULT_USER,
    current_user: User | None = Depends(get_current_user_optional),
):
    """
    Run a multi-round LLM Debate and stream SSE events.

    Merges the incoming contexts, builds one LLMConfig (plus
    provider-specific attachments/tools) per debate member, optionally
    builds an external-judge config, and delegates orchestration to
    debate_event_stream.
    """
    resolved = resolve_user(current_user, user)
    username = resolved.username if resolved else DEFAULT_USER

    # Merge incoming contexts according to the requested strategy.
    raw_messages = []
    for ctx in request.incoming_contexts:
        raw_messages.extend(ctx.messages)
    if request.merge_strategy == MergeStrategy.SMART:
        final_messages = smart_merge_messages(raw_messages)
    else:
        final_messages = raw_messages
    execution_context = Context(messages=final_messages)

    # Split inline-image attachments from files handled by the providers.
    images, non_image_file_ids = extract_image_attachments(username, request.attached_file_ids)

    openrouter_key = get_user_api_key(resolved, "openrouter")

    # Hoisted out of the member loop: the scoped file ids depend only on the
    # user/request, not on the individual debater, so resolve them once
    # (previously recomputed identically for every member).
    scoped_file_ids = resolve_scoped_file_ids(username, request.scopes, non_image_file_ids)

    # Build LLMConfig + attachments + tools for each debate member.
    member_configs: list[LLMConfig] = []
    attachments_per_model: list[list[dict] | None] = []
    tools_per_model: list[list[dict] | None] = []

    for member in request.debate_models:
        provider = resolve_provider(member.model_name)
        api_key = get_user_api_key(resolved, provider.value)

        # Per-member overrides fall back to the request-level defaults.
        config = LLMConfig(
            provider=provider,
            model_name=member.model_name,
            temperature=member.temperature if member.temperature is not None else request.temperature,
            system_prompt=request.system_prompt,
            api_key=api_key,
            reasoning_effort=member.reasoning_effort if member.reasoning_effort is not None else request.reasoning_effort,
            enable_google_search=member.enable_google_search if member.enable_google_search is not None else request.enable_google_search,
        )
        member_configs.append(config)

        # Prepare provider-specific file attachments / tools.
        tools: list[dict] = []
        attachments: list[dict] = []

        if provider == ModelProvider.OPENAI:
            vs_ids, debug_refs, filters = await prepare_openai_vector_search(
                user=username,
                attached_ids=non_image_file_ids,
                scopes=request.scopes,
                llm_config=config,
            )
            if not vs_ids:
                # Best-effort fallback to the user's default vector store;
                # a failure here must not block the debate.
                try:
                    client = get_openai_client(config.api_key)
                    vs_id = await ensure_user_vector_store(username, client)
                    if vs_id:
                        vs_ids = [vs_id]
                except Exception:
                    pass
            if vs_ids:
                tool_def = {"type": "file_search", "vector_store_ids": vs_ids}
                if filters:
                    tool_def["filters"] = filters
                tools.append(tool_def)
        elif provider in (ModelProvider.GOOGLE, ModelProvider.CLAUDE):
            # Identical handling for both providers (previously two duplicated
            # branches): pass files as provider-native attachments.
            attachments = await prepare_attachments(
                user=username,
                target_provider=provider,
                attached_ids=scoped_file_ids,
                llm_config=config,
            )

        attachments_per_model.append(attachments or None)
        tools_per_model.append(tools or None)

    # Build the judge config only when an external judge is requested.
    judge_config = None
    if request.judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and request.judge_model:
        judge = request.judge_model
        judge_provider = resolve_provider(judge.model_name)
        judge_api_key = get_user_api_key(resolved, judge_provider.value)
        judge_config = LLMConfig(
            provider=judge_provider,
            model_name=judge.model_name,
            temperature=judge.temperature if judge.temperature is not None else request.temperature,
            system_prompt=request.system_prompt,
            api_key=judge_api_key,
            reasoning_effort=judge.reasoning_effort if judge.reasoning_effort is not None else request.reasoning_effort,
            enable_google_search=judge.enable_google_search if judge.enable_google_search is not None else request.enable_google_search,
        )

    return StreamingResponse(
        debate_event_stream(
            user_prompt=request.user_prompt,
            context=execution_context,
            member_configs=member_configs,
            judge_config=judge_config,
            judge_mode=request.judge_mode,
            debate_format=request.debate_format,
            max_rounds=request.max_rounds,
            custom_format_prompt=request.custom_format_prompt,
            attachments_per_model=attachments_per_model,
            tools_per_model=tools_per_model,
            openrouter_api_key=openrouter_key,
            images=images,
        ),
        media_type="text/event-stream",
    )
class DebateJudgeMode(str, Enum):
    """How a debate reaches (or skips) a final verdict."""

    EXTERNAL_JUDGE = "external_judge"      # a dedicated judge model decides stop/continue and synthesizes
    SELF_CONVERGENCE = "self_convergence"  # stop once the participants agree
    DISPLAY_ONLY = "display_only"          # run every round; no stop check, no verdict


class DebateFormat(str, Enum):
    """Prompting style used for each debate round."""

    FREE_DISCUSSION = "free_discussion"
    STRUCTURED_OPPOSITION = "structured_opposition"
    ITERATIVE_IMPROVEMENT = "iterative_improvement"
    CUSTOM = "custom"


class DebateRunRequest(BaseModel):
    """Payload for /api/run_debate_stream."""

    node_id: str
    # Fixed: use default_factory rather than a mutable literal default, for
    # consistency with the other list fields below.
    incoming_contexts: List[Context] = Field(default_factory=list)
    user_prompt: str
    # 2-6 models.  NOTE(review): this bound is not enforced here — consider a
    # validator once the pydantic version in use is confirmed.
    debate_models: List[CouncilMemberConfig]
    judge_model: Optional[CouncilMemberConfig] = None
    judge_mode: DebateJudgeMode = DebateJudgeMode.EXTERNAL_JUDGE
    debate_format: DebateFormat = DebateFormat.FREE_DISCUSSION
    # Template used only when debate_format == CUSTOM; supports {history},
    # {round}, {model_name}, {question} placeholders (see services/debate.py).
    custom_format_prompt: Optional[str] = None
    max_rounds: int = 5
    system_prompt: Optional[str] = None
    temperature: float = 0.7
    reasoning_effort: ReasoningEffort = ReasoningEffort.MEDIUM
    enable_google_search: bool = False
    merge_strategy: MergeStrategy = MergeStrategy.SMART
    attached_file_ids: List[str] = Field(default_factory=list)
    scopes: List[str] = Field(default_factory=list)
import Context, LLMConfig, DebateFormat, DebateJudgeMode +from app.services.llm import query_model_full, llm_streamer + +logger = logging.getLogger("contextflow.debate") + + +def _sse_event(data: dict) -> str: + """Format a dict as an SSE data line.""" + return f"data: {json.dumps(data)}\n\n" + + +def build_debate_prompt( + user_query: str, + debate_history: List[Dict[str, Any]], + model_name: str, + round_num: int, + debate_format: DebateFormat, + custom_prompt: Optional[str] = None, + model_index: int = 0, + total_models: int = 2, +) -> str: + """Build the prompt for a debater based on format and history.""" + history_text = "" + if debate_history: + for past_round in debate_history: + rn = past_round["round"] + history_text += f"\n--- Round {rn} ---\n" + for resp in past_round["responses"]: + history_text += f"\n[{resp['model']}]:\n{resp['response']}\n" + + if debate_format == DebateFormat.FREE_DISCUSSION: + if round_num == 1: + return ( + f"You are participating in a roundtable discussion about the following question:\n\n" + f'"{user_query}"\n\n' + f"Provide your perspective and answer to this question." + ) + return ( + f"You are participating in a roundtable discussion about the following question:\n\n" + f'"{user_query}"\n\n' + f"Here is the discussion so far:\n{history_text}\n\n" + f"This is round {round_num}. Consider what others have said, respond to their points, " + f"and refine or defend your position." + ) + + if debate_format == DebateFormat.STRUCTURED_OPPOSITION: + roles = ["FOR", "AGAINST", "DEVIL'S ADVOCATE", "MEDIATOR", "CRITIC", "SYNTHESIZER"] + role = roles[model_index % len(roles)] + if round_num == 1: + return ( + f"You are arguing {role} the following position in a structured debate:\n\n" + f'"{user_query}"\n\n' + f"Present your strongest arguments from the {role} perspective." 
+ ) + return ( + f"You are arguing {role} the following position in a structured debate:\n\n" + f'"{user_query}"\n\n' + f"Debate history:\n{history_text}\n\n" + f"This is round {round_num}. Respond to the other participants' arguments " + f"while maintaining your {role} position. Address their strongest points." + ) + + if debate_format == DebateFormat.ITERATIVE_IMPROVEMENT: + if round_num == 1: + return ( + f"You are participating in an iterative improvement exercise on the following question:\n\n" + f'"{user_query}"\n\n' + f"Provide your best answer." + ) + return ( + f"You are participating in an iterative improvement exercise on the following question:\n\n" + f'"{user_query}"\n\n' + f"Here are the previous answers from all participants:\n{history_text}\n\n" + f"This is round {round_num}. Critique the other participants' answers, identify flaws or gaps, " + f"and provide an improved answer that incorporates the best insights from everyone." + ) + + if debate_format == DebateFormat.CUSTOM and custom_prompt: + prompt = custom_prompt + prompt = prompt.replace("{history}", history_text or "(No history yet)") + prompt = prompt.replace("{round}", str(round_num)) + prompt = prompt.replace("{model_name}", model_name) + prompt = prompt.replace("{question}", user_query) + return prompt + + # Fallback to free discussion + if round_num == 1: + return f'Provide your answer to the following question:\n\n"{user_query}"' + return ( + f'Question: "{user_query}"\n\n' + f"Previous discussion:\n{history_text}\n\n" + f"Round {round_num}: Provide your updated response." 
async def debate_round(
    configs: List[LLMConfig],
    context: Context,
    user_prompt: str,
    debate_history: List[Dict[str, Any]],
    round_num: int,
    debate_format: DebateFormat,
    custom_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Query all debate models in parallel for one round, yielding each
    ``{"model": ..., "response": ...}`` dict as it completes.

    A failing model is reported with an ``[Error: ...]`` response rather than
    raised, so one bad model cannot abort the round.
    """

    async def _query_one(idx: int, config: LLMConfig) -> Dict[str, Any]:
        prompt = build_debate_prompt(
            user_prompt, debate_history, config.model_name,
            round_num, debate_format, custom_prompt,
            model_index=idx, total_models=len(configs),
        )
        atts = attachments_per_model[idx] if attachments_per_model else None
        tls = tools_per_model[idx] if tools_per_model else None
        try:
            response = await query_model_full(
                context, prompt, config,
                attachments=atts, tools=tls,
                openrouter_api_key=openrouter_api_key,
                # Images are only relevant for the opening statements.
                images=images if round_num == 1 else None,
            )
            return {"model": config.model_name, "response": response}
        except Exception as e:
            logger.error("Debate round %d failed for %s: %s", round_num, config.model_name, e)
            return {"model": config.model_name, "response": f"[Error: {e}]"}

    # The original kept a task -> index dict whose values were never read;
    # a plain list of tasks is sufficient.
    tasks = [asyncio.ensure_future(_query_one(i, cfg)) for i, cfg in enumerate(configs)]
    try:
        for finished in asyncio.as_completed(tasks):
            yield await finished
    finally:
        # If the consumer abandons this generator (e.g. client disconnect
        # mid-round), cancel any still-running model queries.
        for task in tasks:
            if not task.done():
                task.cancel()
async def judge_evaluate_round(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Ask the external judge whether the debate should continue.

    Returns ``{"continue": bool, "reasoning": str}``.  A judge failure stops
    the debate (``continue`` False) with the error text as the reasoning.
    """
    rounds_so_far = len(debate_history)

    # Render the full transcript for the judge.
    chunks: List[str] = []
    for entry in debate_history:
        chunks.append(f"\n--- Round {entry['round']} ---\n")
        for reply in entry["responses"]:
            chunks.append(f"\n[{reply['model']}]:\n{reply['response']}\n")
    history_text = "".join(chunks)

    prompt = (
        f"You are the judge of a multi-model debate on the following question:\n"
        f'"{user_query}"\n\n'
        f"Debate history (Round 1 to {rounds_so_far}):\n{history_text}\n\n"
        "Evaluate whether the debate has reached a satisfactory conclusion.\n"
        "Consider: Have the key points been thoroughly explored? Is there consensus?\n"
        "Are there unresolved disagreements worth continuing?\n\n"
        "Respond with exactly one of:\n"
        "CONTINUE - if the debate should go on (explain why briefly)\n"
        "STOP - if a clear conclusion has been reached (explain why briefly)"
    )

    try:
        verdict = await query_model_full(
            Context(messages=[]), prompt, judge_config,
            openrouter_api_key=openrouter_api_key,
        )
    except Exception as e:
        logger.error("Judge evaluation failed: %s", e)
        return {"continue": False, "reasoning": f"[Judge error: {e}]"}

    # Only the first line carries the CONTINUE/STOP keyword.
    first_line = verdict.upper().split("\n")[0]
    return {"continue": "CONTINUE" in first_line, "reasoning": verdict}
async def check_self_convergence(
    configs: List[LLMConfig],
    round_responses: List[Dict[str, Any]],
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Detect consensus in the latest round using the first debate model.

    Returns ``{"converged": bool, "reasoning": str}``.  Any checker failure
    counts as "not converged", so the debate keeps going.
    """
    responses_text = "\n\n".join(
        f"[{entry['model']}]:\n{entry['response']}" for entry in round_responses
    )
    prompt = (
        "Below are the responses from the latest round of a debate:\n\n"
        f"{responses_text}\n\n"
        "Do all participants essentially agree on the answer? Respond ONLY with:\n"
        "CONVERGED - if there is clear consensus\n"
        "DIVERGENT - if there are still significant disagreements"
    )

    # The first member doubles as the convergence checker.
    checker = configs[0]
    try:
        answer = await query_model_full(
            Context(messages=[]), prompt, checker,
            openrouter_api_key=openrouter_api_key,
        )
    except Exception as e:
        logger.error("Convergence check failed: %s", e)
        return {"converged": False, "reasoning": f"[Convergence check error: {e}]"}

    # Only the first line carries the CONVERGED/DIVERGENT keyword.
    first_line = answer.upper().split("\n")[0]
    return {"converged": "CONVERGED" in first_line, "reasoning": answer}
async def judge_final_verdict(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> AsyncGenerator[str, None]:
    """Stream the judge's final synthesis of the whole debate."""
    # Render the full transcript for the judge.
    chunks: List[str] = []
    for entry in debate_history:
        chunks.append(f"\n--- Round {entry['round']} ---\n")
        for reply in entry["responses"]:
            chunks.append(f"\n[{reply['model']}]:\n{reply['response']}\n")
    history_text = "".join(chunks)

    prompt = (
        "You are the judge of a multi-model debate. Below is the full debate transcript.\n\n"
        f'Question: "{user_query}"\n\n'
        f"{history_text}\n\n"
        "As the judge, provide:\n"
        "1. A summary of the key arguments from each participant\n"
        "2. An evaluation of the strengths and weaknesses of each position\n"
        "3. Your final verdict: the best, most accurate, and most comprehensive answer "
        "to the original question, synthesizing the best insights from the debate."
    )

    async for piece in llm_streamer(
        Context(messages=[]), prompt, judge_config,
        openrouter_api_key=openrouter_api_key,
    ):
        yield piece


async def debate_event_stream(
    user_prompt: str,
    context: Context,
    member_configs: List[LLMConfig],
    judge_config: Optional[LLMConfig],
    judge_mode: DebateJudgeMode,
    debate_format: DebateFormat,
    max_rounds: int = 5,
    custom_format_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
    """Master orchestrator yielding SSE JSON events through the debate.

    Event sequence: ``debate_start``; per round: ``round_start``,
    ``round_model_complete`` (one per model), ``round_complete``, then an
    optional ``judge_decision`` / ``convergence_check``; finally the verdict
    events and ``debate_complete``.
    """
    emit = _sse_event

    emit_payload = {
        "max_rounds": max_rounds,
        "format": debate_format.value,
        "judge_mode": judge_mode.value,
        "models": [cfg.model_name for cfg in member_configs],
    }
    yield emit({"type": "debate_start", "data": emit_payload})

    transcript: List[Dict[str, Any]] = []

    for rnd in range(1, max_rounds + 1):
        yield emit({"type": "round_start", "data": {"round": rnd}})

        replies: List[Dict[str, Any]] = []
        async for outcome in debate_round(
            member_configs, context, user_prompt,
            transcript, rnd, debate_format, custom_format_prompt,
            attachments_per_model=attachments_per_model,
            tools_per_model=tools_per_model,
            openrouter_api_key=openrouter_api_key,
            images=images,
        ):
            replies.append(outcome)
            yield emit({
                "type": "round_model_complete",
                "data": {"round": rnd, "model": outcome["model"], "response": outcome["response"]},
            })

        transcript.append({"round": rnd, "responses": replies})

        yield emit({
            "type": "round_complete",
            "data": {"round": rnd, "responses": replies},
        })

        if not replies:
            # Every model errored out this round; abort the debate.
            yield emit({
                "type": "error",
                "data": {"message": "All debate models failed to respond."},
            })
            return

        # Evaluate the stop condition, except after the final round.
        if rnd >= max_rounds:
            continue
        if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
            decision = await judge_evaluate_round(
                judge_config, transcript, user_prompt,
                openrouter_api_key=openrouter_api_key,
            )
            yield emit({
                "type": "judge_decision",
                "data": {"round": rnd, **decision},
            })
            if not decision["continue"]:
                break
        elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
            convergence = await check_self_convergence(
                member_configs, replies,
                openrouter_api_key=openrouter_api_key,
            )
            yield emit({
                "type": "convergence_check",
                "data": {"round": rnd, **convergence},
            })
            if convergence["converged"]:
                break
        # DISPLAY_ONLY: no stop condition — run every round.

    # ---- Final synthesis ----
    if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
        yield emit({
            "type": "final_start",
            "data": {"model": judge_config.model_name},
        })

        verdict_parts: List[str] = []
        async for piece in judge_final_verdict(
            judge_config, transcript, user_prompt,
            openrouter_api_key=openrouter_api_key,
        ):
            verdict_parts.append(piece)
            yield emit({"type": "final_chunk", "data": {"chunk": piece}})

        yield emit({
            "type": "final_complete",
            "data": {"model": judge_config.model_name, "response": "".join(verdict_parts)},
        })
    elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
        # Surface the longest answer from the last round as the convergent result.
        closing = transcript[-1]["responses"] if transcript else []
        if closing:
            winner = max(closing, key=lambda item: len(item.get("response", "")))
            yield emit({
                "type": "final_complete",
                "data": {"model": winner["model"], "response": winner["response"]},
            })

    yield emit({"type": "debate_complete"})
