summaryrefslogtreecommitdiff
path: root/backend/app/services
diff options
context:
space:
mode:
authorYurenHao0426 <blackhao0426@gmail.com>2026-02-13 23:08:05 +0000
committerYurenHao0426 <blackhao0426@gmail.com>2026-02-13 23:08:05 +0000
commitcb59ecf3ac3b38ba883fc74bf810ae9e82e2a469 (patch)
treed0cab16f3ddb7708528ceb3cbb126d9437aed91b /backend/app/services
parent2adacdbfa1d1049a0497e55f2b3ed00551bf876f (diff)
Add LLM Debate mode for multi-round iterative model discussions
Implements a debate feature alongside Council mode where 2-6 models engage in multi-round discussions with configurable judge modes (external judge, self-convergence, display-only), debate formats (free discussion, structured opposition, iterative improvement, custom), and early termination conditions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'backend/app/services')
-rw-r--r--backend/app/services/debate.py371
1 file changed, 371 insertions, 0 deletions
diff --git a/backend/app/services/debate.py b/backend/app/services/debate.py
new file mode 100644
index 0000000..d409cb9
--- /dev/null
+++ b/backend/app/services/debate.py
@@ -0,0 +1,371 @@
+"""Multi-round LLM Debate orchestration for ContextFlow."""
+
+import asyncio
+import json
+import logging
+from typing import AsyncGenerator, Dict, List, Any, Optional
+
+from app.schemas import Context, LLMConfig, DebateFormat, DebateJudgeMode
+from app.services.llm import query_model_full, llm_streamer
+
+logger = logging.getLogger("contextflow.debate")
+
+
+def _sse_event(data: dict) -> str:
+ """Format a dict as an SSE data line."""
+ return f"data: {json.dumps(data)}\n\n"
+
+
def build_debate_prompt(
    user_query: str,
    debate_history: List[Dict[str, Any]],
    model_name: str,
    round_num: int,
    debate_format: DebateFormat,
    custom_prompt: Optional[str] = None,
    model_index: int = 0,
    total_models: int = 2,
) -> str:
    """Build the prompt for one debater based on format and history.

    Args:
        user_query: The original question under debate.
        debate_history: Prior rounds, each a dict with "round" and "responses".
        model_name: Display name of the model receiving this prompt
            (only used by the CUSTOM format's {model_name} placeholder).
        round_num: 1-based round counter; round 1 gets an opening prompt.
        debate_format: Which debate style to render.
        custom_prompt: Template for DebateFormat.CUSTOM, supporting the
            {history}, {round}, {model_name} and {question} placeholders.
        model_index: Position of this debater; selects the role in
            STRUCTURED_OPPOSITION.
        total_models: Total number of debaters (accepted for symmetry;
            not referenced by any format here).

    Returns:
        The fully rendered prompt string.
    """
    # Flatten the history into a single transcript block.
    chunks: List[str] = []
    for past in debate_history:
        chunks.append(f"\n--- Round {past['round']} ---\n")
        for entry in past["responses"]:
            chunks.append(f"\n[{entry['model']}]:\n{entry['response']}\n")
    transcript = "".join(chunks)

    if debate_format == DebateFormat.FREE_DISCUSSION:
        opening = (
            "You are participating in a roundtable discussion about the following question:\n\n"
            f'"{user_query}"\n\n'
        )
        if round_num == 1:
            return opening + "Provide your perspective and answer to this question."
        return opening + (
            f"Here is the discussion so far:\n{transcript}\n\n"
            f"This is round {round_num}. Consider what others have said, respond to their points, "
            "and refine or defend your position."
        )

    if debate_format == DebateFormat.STRUCTURED_OPPOSITION:
        roles = ["FOR", "AGAINST", "DEVIL'S ADVOCATE", "MEDIATOR", "CRITIC", "SYNTHESIZER"]
        role = roles[model_index % len(roles)]
        header = (
            f"You are arguing {role} the following position in a structured debate:\n\n"
            f'"{user_query}"\n\n'
        )
        if round_num == 1:
            return header + f"Present your strongest arguments from the {role} perspective."
        return header + (
            f"Debate history:\n{transcript}\n\n"
            f"This is round {round_num}. Respond to the other participants' arguments "
            f"while maintaining your {role} position. Address their strongest points."
        )

    if debate_format == DebateFormat.ITERATIVE_IMPROVEMENT:
        header = (
            "You are participating in an iterative improvement exercise on the following question:\n\n"
            f'"{user_query}"\n\n'
        )
        if round_num == 1:
            return header + "Provide your best answer."
        return header + (
            f"Here are the previous answers from all participants:\n{transcript}\n\n"
            f"This is round {round_num}. Critique the other participants' answers, identify flaws or gaps, "
            "and provide an improved answer that incorporates the best insights from everyone."
        )

    if debate_format == DebateFormat.CUSTOM and custom_prompt:
        # Substitution order matters and is preserved: {history} first, so
        # placeholders occurring inside the history text behave identically.
        rendered = custom_prompt.replace("{history}", transcript or "(No history yet)")
        rendered = rendered.replace("{round}", str(round_num))
        rendered = rendered.replace("{model_name}", model_name)
        return rendered.replace("{question}", user_query)

    # Fallback to free discussion (also used for CUSTOM without a template).
    if round_num == 1:
        return f'Provide your answer to the following question:\n\n"{user_query}"'
    return (
        f'Question: "{user_query}"\n\n'
        f"Previous discussion:\n{transcript}\n\n"
        f"Round {round_num}: Provide your updated response."
    )
+
+
async def debate_round(
    configs: List[LLMConfig],
    context: Context,
    user_prompt: str,
    debate_history: List[Dict[str, Any]],
    round_num: int,
    debate_format: DebateFormat,
    custom_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Query all debate models in parallel for one round, yielding as each completes.

    Each yielded item is ``{"model": <name>, "response": <text>}``.  A model
    whose query raises is reported with an ``"[Error: ...]"`` response instead
    of aborting the round, so one failing provider never kills the debate.

    Args:
        configs: One LLMConfig per debater.
        context: Conversation context forwarded to every model.
        user_prompt: The question under debate.
        debate_history: Completed rounds used to build each prompt.
        round_num: 1-based round counter.
        debate_format: Debate style forwarded to build_debate_prompt.
        custom_prompt: Template for the CUSTOM format.
        attachments_per_model: Optional per-debater attachments, indexed like
            ``configs``.
        tools_per_model: Optional per-debater tool specs, indexed like
            ``configs``.
        openrouter_api_key: Optional key forwarded to the LLM layer.
        images: Optional images; only forwarded on round 1.
    """

    async def _query_one(idx: int, config: LLMConfig) -> Dict[str, Any]:
        # Render this debater's prompt from the shared history.
        prompt = build_debate_prompt(
            user_prompt, debate_history, config.model_name,
            round_num, debate_format, custom_prompt,
            model_index=idx, total_models=len(configs),
        )
        atts = attachments_per_model[idx] if attachments_per_model else None
        tls = tools_per_model[idx] if tools_per_model else None
        try:
            response = await query_model_full(
                context, prompt, config,
                attachments=atts, tools=tls,
                openrouter_api_key=openrouter_api_key,
                images=images if round_num == 1 else None,  # Only send images in round 1
            )
            return {"model": config.model_name, "response": response}
        except Exception as e:
            logger.error("Debate round %d failed for %s: %s", round_num, config.model_name, e)
            return {"model": config.model_name, "response": f"[Error: {e}]"}

    # Fix: the original built a {future: index} dict whose values were never
    # read; a plain list of tasks is all asyncio.as_completed needs.
    tasks = [
        asyncio.ensure_future(_query_one(i, cfg))
        for i, cfg in enumerate(configs)
    ]
    for next_done in asyncio.as_completed(tasks):
        yield await next_done
+
+
async def judge_evaluate_round(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Ask the external judge whether the debate should continue.

    Returns ``{"continue": bool, "reasoning": str}``.  The decision is read
    from the first line of the judge's reply; if the judge call fails, the
    debate is stopped and the error is surfaced in ``reasoning``.
    """
    rounds_seen = len(debate_history)
    # Flatten the history into a single transcript block.
    lines: List[str] = []
    for past in debate_history:
        lines.append(f"\n--- Round {past['round']} ---\n")
        for entry in past["responses"]:
            lines.append(f"\n[{entry['model']}]:\n{entry['response']}\n")
    history_text = "".join(lines)

    prompt = (
        "You are the judge of a multi-model debate on the following question:\n"
        f'"{user_query}"\n\n'
        f"Debate history (Round 1 to {rounds_seen}):\n{history_text}\n\n"
        "Evaluate whether the debate has reached a satisfactory conclusion.\n"
        "Consider: Have the key points been thoroughly explored? Is there consensus?\n"
        "Are there unresolved disagreements worth continuing?\n\n"
        "Respond with exactly one of:\n"
        "CONTINUE - if the debate should go on (explain why briefly)\n"
        "STOP - if a clear conclusion has been reached (explain why briefly)"
    )

    try:
        verdict = await query_model_full(
            Context(messages=[]), prompt, judge_config,
            openrouter_api_key=openrouter_api_key,
        )
    except Exception as e:
        logger.error("Judge evaluation failed: %s", e)
        return {"continue": False, "reasoning": f"[Judge error: {e}]"}
    first_line = verdict.upper().split("\n")[0]
    return {"continue": "CONTINUE" in first_line, "reasoning": verdict}
+
+
async def check_self_convergence(
    configs: List[LLMConfig],
    round_responses: List[Dict[str, Any]],
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Check if debate responses have converged using the first available model.

    Returns ``{"converged": bool, "reasoning": str}``; a failed check reports
    no convergence (so the debate simply keeps going) and carries the error
    text in ``reasoning``.
    """
    blocks = [f"[{item['model']}]:\n{item['response']}" for item in round_responses]
    responses_text = "\n\n".join(blocks)
    prompt = (
        "Below are the responses from the latest round of a debate:\n\n"
        f"{responses_text}\n\n"
        "Do all participants essentially agree on the answer? Respond ONLY with:\n"
        "CONVERGED - if there is clear consensus\n"
        "DIVERGENT - if there are still significant disagreements"
    )

    # The first debater doubles as the convergence checker.
    checker = configs[0]
    try:
        reply = await query_model_full(
            Context(messages=[]), prompt, checker,
            openrouter_api_key=openrouter_api_key,
        )
    except Exception as e:
        logger.error("Convergence check failed: %s", e)
        return {"converged": False, "reasoning": f"[Convergence check error: {e}]"}
    head = reply.upper().split("\n")[0]
    return {"converged": "CONVERGED" in head, "reasoning": reply}
+
+
async def judge_final_verdict(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> AsyncGenerator[str, None]:
    """Stream the judge's final verdict/synthesis chunk by chunk."""
    # Flatten the full debate into one transcript for the judge.
    pieces: List[str] = []
    for past in debate_history:
        pieces.append(f"\n--- Round {past['round']} ---\n")
        for entry in past["responses"]:
            pieces.append(f"\n[{entry['model']}]:\n{entry['response']}\n")
    transcript = "".join(pieces)

    prompt = (
        "You are the judge of a multi-model debate. Below is the full debate transcript.\n\n"
        f'Question: "{user_query}"\n\n'
        f"{transcript}\n\n"
        "As the judge, provide:\n"
        "1. A summary of the key arguments from each participant\n"
        "2. An evaluation of the strengths and weaknesses of each position\n"
        "3. Your final verdict: the best, most accurate, and most comprehensive answer "
        "to the original question, synthesizing the best insights from the debate."
    )

    # Relay the judge's stream directly to the caller.
    async for piece in llm_streamer(
        Context(messages=[]), prompt, judge_config,
        openrouter_api_key=openrouter_api_key,
    ):
        yield piece
+
+
async def debate_event_stream(
    user_prompt: str,
    context: Context,
    member_configs: List[LLMConfig],
    judge_config: Optional[LLMConfig],
    judge_mode: DebateJudgeMode,
    debate_format: DebateFormat,
    max_rounds: int = 5,
    custom_format_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
    """Master orchestrator yielding SSE JSON events through the debate process.

    Event order: ``debate_start``; then per round a ``round_start``, one
    ``round_model_complete`` per debater (in completion order), a
    ``round_complete``, and — between rounds — either a ``judge_decision``
    (EXTERNAL_JUDGE) or a ``convergence_check`` (SELF_CONVERGENCE); finally
    ``final_start``/``final_chunk``/``final_complete`` for an external judge,
    or a single ``final_complete`` for self-convergence, and always a closing
    ``debate_complete``.

    Args:
        user_prompt: The question to debate.
        context: Conversation context forwarded to every debater.
        member_configs: One LLMConfig per debater.
        judge_config: Judge model; required for EXTERNAL_JUDGE stopping and
            the streamed final verdict.
        judge_mode: How rounds are stopped and the final answer chosen.
        debate_format: Prompting style for the debaters.
        max_rounds: Hard cap on rounds regardless of judge/convergence.
        custom_format_prompt: Template used when the format is CUSTOM.
        attachments_per_model: Optional per-debater attachments.
        tools_per_model: Optional per-debater tool specs.
        openrouter_api_key: Optional key forwarded to the LLM layer.
        images: Optional images (debate_round only sends them in round 1).
    """

    model_names = [c.model_name for c in member_configs]
    # Announce the configuration up front so the client can render the shell.
    yield _sse_event({
        "type": "debate_start",
        "data": {
            "max_rounds": max_rounds,
            "format": debate_format.value,
            "judge_mode": judge_mode.value,
            "models": model_names,
        },
    })

    debate_history: List[Dict[str, Any]] = []

    for round_num in range(1, max_rounds + 1):
        yield _sse_event({"type": "round_start", "data": {"round": round_num}})

        # Fan out to all debaters; emit an event as each model finishes.
        round_responses: List[Dict[str, Any]] = []
        async for result in debate_round(
            member_configs, context, user_prompt,
            debate_history, round_num, debate_format, custom_format_prompt,
            attachments_per_model=attachments_per_model,
            tools_per_model=tools_per_model,
            openrouter_api_key=openrouter_api_key,
            images=images,
        ):
            round_responses.append(result)
            yield _sse_event({
                "type": "round_model_complete",
                "data": {"round": round_num, "model": result["model"], "response": result["response"]},
            })

        # Record the round so later prompts (and the judge) see the history.
        debate_history.append({"round": round_num, "responses": round_responses})

        yield _sse_event({
            "type": "round_complete",
            "data": {"round": round_num, "responses": round_responses},
        })

        # NOTE(review): debate_round converts per-model exceptions into
        # "[Error: ...]" entries, so this guard only trips when member_configs
        # is empty — confirm callers validate the member list beforehand.
        if not round_responses:
            yield _sse_event({
                "type": "error",
                "data": {"message": "All debate models failed to respond."},
            })
            return

        # Check stop condition (skip on last round — the loop ends anyway)
        if round_num < max_rounds:
            if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
                decision = await judge_evaluate_round(
                    judge_config, debate_history, user_prompt,
                    openrouter_api_key=openrouter_api_key,
                )
                yield _sse_event({
                    "type": "judge_decision",
                    "data": {"round": round_num, **decision},
                })
                # A "continue": False verdict (including judge errors) ends
                # the rounds early; the final synthesis below still runs.
                if not decision["continue"]:
                    break

            elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
                convergence = await check_self_convergence(
                    member_configs, round_responses,
                    openrouter_api_key=openrouter_api_key,
                )
                yield _sse_event({
                    "type": "convergence_check",
                    "data": {"round": round_num, **convergence},
                })
                if convergence["converged"]:
                    break
            # DISPLAY_ONLY: no stop check — just continue to the next round.

    # Final synthesis
    if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
        yield _sse_event({
            "type": "final_start",
            "data": {"model": judge_config.model_name},
        })

        # Stream the verdict chunk-by-chunk while accumulating the full text
        # for the closing final_complete event.
        full_verdict = ""
        async for chunk in judge_final_verdict(
            judge_config, debate_history, user_prompt,
            openrouter_api_key=openrouter_api_key,
        ):
            full_verdict += chunk
            yield _sse_event({"type": "final_chunk", "data": {"chunk": chunk}})

        yield _sse_event({
            "type": "final_complete",
            "data": {"model": judge_config.model_name, "response": full_verdict},
        })

    elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
        # Use the last round's responses as the final answer
        last_responses = debate_history[-1]["responses"] if debate_history else []
        # Pick the longest response as the "best" convergent answer
        # (a length heuristic — no extra model call is made here).
        if last_responses:
            best = max(last_responses, key=lambda r: len(r.get("response", "")))
            yield _sse_event({
                "type": "final_complete",
                "data": {"model": best["model"], "response": best["response"]},
            })

    # DISPLAY_ONLY mode emits no final synthesis, only the closing event.
    yield _sse_event({"type": "debate_complete"})