"""Multi-round LLM Debate orchestration for ContextFlow.""" import asyncio import json import logging from typing import AsyncGenerator, Dict, List, Any, Optional from app.schemas import Context, LLMConfig, DebateFormat, DebateJudgeMode from app.services.llm import query_model_full, llm_streamer logger = logging.getLogger("contextflow.debate") def _sse_event(data: dict) -> str: """Format a dict as an SSE data line.""" return f"data: {json.dumps(data)}\n\n" def _format_history(debate_history: List[Dict[str, Any]]) -> str: """Format debate history into readable text.""" text = "" for past_round in debate_history: rn = past_round["round"] text += f"\n--- Round {rn} ---\n" for resp in past_round["responses"]: text += f"\n[{resp['model']}]:\n{resp['response']}\n" return text def _format_own_position(debate_history: List[Dict[str, Any]], model_name: str) -> str: """Extract this model's own previous responses across rounds.""" parts = [] for past_round in debate_history: for resp in past_round["responses"]: if resp["model"] == model_name: parts.append(f"Round {past_round['round']}:\n{resp['response']}") return "\n\n".join(parts) def build_debate_prompt( user_query: str, debate_history: List[Dict[str, Any]], model_name: str, round_num: int, debate_format: DebateFormat, custom_prompt: Optional[str] = None, model_index: int = 0, total_models: int = 2, current_round_so_far: Optional[List[Dict[str, Any]]] = None, ) -> str: """ Build the prompt for a debater based on format and history. Key principle: The user's question is an open-ended question, NOT a debate proposition. In Round 1, each model independently answers the question to form their own thesis. In Round 2+, models see others' positions and debate — defending their own viewpoint and critiquing others with evidence. For Round 2+, models are queried sequentially. current_round_so_far contains responses from models that have already spoken this round. """ history_text = _format_history(debate_history) if debate_history else "" own_position = _format_own_position(debate_history, model_name) if debate_history else "" # Format current round's earlier responses (turn-based context) current_round_text = "" if current_round_so_far: current_round_text = f"\n--- Round {round_num} (so far) ---\n" for resp in current_round_so_far: current_round_text += f"\n[{resp['model']}]:\n{resp['response']}\n" if debate_format == DebateFormat.FREE_DISCUSSION: if round_num == 1: return ( f"You are about to participate in a multi-model debate. " f"First, independently answer the following question. " f"Your answer will become your position in the debate.\n\n" f'Question: "{user_query}"\n\n' f"Provide a thorough, well-reasoned answer. This is your initial position." ) return ( f'You are in a multi-model debate about the question:\n"{user_query}"\n\n' f"Your position so far:\n{own_position}\n\n" f"Previous rounds:\n{history_text}\n" f"{current_round_text}\n" f"This is round {round_num}. It is now your turn to speak. " f"Review what the other participants have argued (including anyone who has " f"already spoken this round). Defend your position where you believe you are " f"right, acknowledge good points from others, and strengthen your argument " f"with additional evidence or reasoning. " f"You may refine your position but should not abandon it without strong justification." ) if debate_format == DebateFormat.STRUCTURED_OPPOSITION: if round_num == 1: return ( f"You are Debater #{model_index + 1} in a structured multi-model debate. " f"First, independently answer the following question. " f"Your answer will be YOUR unique position that you must defend.\n\n" f'Question: "{user_query}"\n\n' f"Provide a thorough, well-reasoned answer. Take a clear, distinctive stance." ) return ( f'You are Debater #{model_index + 1} in a structured multi-model debate.\n' f'Question: "{user_query}"\n\n' f"Your position so far:\n{own_position}\n\n" f"Previous rounds:\n{history_text}\n" f"{current_round_text}\n" f"This is round {round_num}. It is now your turn. Your task:\n" f"1. Defend YOUR position with concrete arguments and evidence\n" f"2. Directly critique each other debater's position — point out flaws, " f"gaps, or weaker reasoning in what they have said so far\n" f"3. Explain why your answer is superior to theirs\n" f"Be persuasive and specific. Reference the other debaters' actual claims." ) if debate_format == DebateFormat.ITERATIVE_IMPROVEMENT: if round_num == 1: return ( f"You are participating in an iterative improvement exercise. " f"First, independently answer the following question with your best effort.\n\n" f'Question: "{user_query}"\n\n' f"Provide a thorough, well-reasoned answer." ) return ( f'You are in an iterative improvement exercise.\n' f'Question: "{user_query}"\n\n' f"Your previous answer:\n{own_position}\n\n" f"All participants' previous answers:\n{history_text}\n" f"{current_round_text}\n" f"This is round {round_num}. Review ALL other participants' answers carefully " f"(including those who have already spoken this round). " f"Identify their best ideas, strongest arguments, and any insights you missed. " f"Also identify flaws or gaps in their reasoning. " f"Now produce an improved version of YOUR answer that incorporates the best " f"insights from everyone while fixing any weaknesses." ) if debate_format == DebateFormat.CUSTOM and custom_prompt: prompt = custom_prompt prompt = prompt.replace("{history}", history_text or "(No history yet)") prompt = prompt.replace("{own_position}", own_position or "(No position yet)") prompt = prompt.replace("{current_round}", current_round_text or "(You are first to speak)") prompt = prompt.replace("{round}", str(round_num)) prompt = prompt.replace("{model_name}", model_name) prompt = prompt.replace("{question}", user_query) prompt = prompt.replace("{debater_number}", str(model_index + 1)) return prompt # Fallback to free discussion if round_num == 1: return ( f"Answer the following question thoroughly. " f"Your answer will be your position in a multi-model debate.\n\n" f'Question: "{user_query}"' ) return ( f'Question: "{user_query}"\n\n' f"Your position so far:\n{own_position}\n\n" f"Previous discussion:\n{history_text}\n" f"{current_round_text}\n" f"Round {round_num}: It is your turn. Defend and refine your position, " f"responding to what others have argued." ) async def debate_round( configs: List[LLMConfig], context: Context, user_prompt: str, debate_history: List[Dict[str, Any]], round_num: int, debate_format: DebateFormat, custom_prompt: Optional[str] = None, attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None, tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None, openrouter_api_key: Optional[str] = None, images: Optional[List[Dict[str, Any]]] = None, original_indices: Optional[List[int]] = None, original_total: Optional[int] = None, ) -> AsyncGenerator[Dict[str, Any], None]: """ Query debate models for one round. Round 1: all models in parallel (independent initial positions). Round 2+: sequential turn-based (each model sees prior models' responses from the current round before responding). """ async def _query_one( idx: int, config: LLMConfig, current_round_so_far: Optional[List[Dict[str, Any]]] = None, ) -> Dict[str, Any]: mi = original_indices[idx] if original_indices else idx tm = original_total if original_total else len(configs) prompt = build_debate_prompt( user_prompt, debate_history, config.model_name, round_num, debate_format, custom_prompt, model_index=mi, total_models=tm, current_round_so_far=current_round_so_far, ) atts = attachments_per_model[idx] if attachments_per_model else None tls = tools_per_model[idx] if tools_per_model else None try: response = await query_model_full( context, prompt, config, attachments=atts, tools=tls, openrouter_api_key=openrouter_api_key, images=images if round_num == 1 else None, ) return {"model": config.model_name, "response": response} except Exception as e: logger.error("Debate round %d failed for %s: %s", round_num, config.model_name, e) return {"model": config.model_name, "response": f"[Error: {e}]"} if round_num == 1: # Round 1: parallel — all models form positions independently tasks = { asyncio.ensure_future(_query_one(i, cfg)): i for i, cfg in enumerate(configs) } for coro in asyncio.as_completed(tasks.keys()): result = await coro yield result else: # Round 2+: sequential turn-based — each model sees current round context current_round_responses: List[Dict[str, Any]] = [] for i, cfg in enumerate(configs): result = await _query_one(i, cfg, current_round_so_far=current_round_responses) current_round_responses.append(result) yield result async def judge_evaluate_round( judge_config: LLMConfig, debate_history: List[Dict[str, Any]], user_query: str, openrouter_api_key: Optional[str] = None, ) -> Dict[str, Any]: """Judge decides if debate should continue after a round.""" last_round = len(debate_history) history_text = "" for past_round in debate_history: rn = past_round["round"] history_text += f"\n--- Round {rn} ---\n" for resp in past_round["responses"]: history_text += f"\n[{resp['model']}]:\n{resp['response']}\n" prompt = ( f"You are the judge of a multi-model debate. Each model answered the question " f"independently and is now defending their position.\n\n" f'Original question: "{user_query}"\n\n' f"Debate transcript (Round 1 to {last_round}):\n{history_text}\n\n" f"Evaluate whether continuing the debate would produce meaningful new insights.\n" f"Consider:\n" f"- Are the participants still raising substantive new arguments?\n" f"- Has one position clearly emerged as strongest, or are there still valid competing views?\n" f"- Would another round help clarify remaining disagreements?\n\n" f"Respond with exactly one of:\n" f"CONTINUE - if there are still productive arguments to be made (explain why briefly)\n" f"STOP - if the debate has been thorough enough for a final verdict (explain why briefly)" ) empty_context = Context(messages=[]) try: response = await query_model_full( empty_context, prompt, judge_config, openrouter_api_key=openrouter_api_key, ) should_continue = "CONTINUE" in response.upper().split("\n")[0] return {"continue": should_continue, "reasoning": response} except Exception as e: logger.error("Judge evaluation failed: %s", e) return {"continue": False, "reasoning": f"[Judge error: {e}]"} async def check_model_convinced( config: LLMConfig, debate_history: List[Dict[str, Any]], user_query: str, other_model_names: List[str], openrouter_api_key: Optional[str] = None, ) -> Dict[str, Any]: """ Ask a single model whether it has been convinced by another participant. Returns {"convinced": bool, "convinced_by": str|None, "reasoning": str}. """ history_text = _format_history(debate_history) own_position = _format_own_position(debate_history, config.model_name) prompt = ( f"You are {config.model_name}, a participant in a multi-model debate.\n\n" f'Original question: "{user_query}"\n\n' f"Your position across rounds:\n{own_position}\n\n" f"Full debate transcript:\n{history_text}\n\n" f"After reviewing the debate so far, honestly evaluate:\n" f"Has another participant made arguments strong enough to convince you " f"that their answer is better than yours?\n\n" f"Respond in EXACTLY this format:\n" f"CONVINCED: - if you concede to another participant's position\n" f"NOT CONVINCED - if you still believe your position is strongest\n" f"Then briefly explain why.\n\n" f"Other participants: {', '.join(other_model_names)}\n" f"Be intellectually honest. If someone made a clearly stronger argument, " f"acknowledge it." ) empty_context = Context(messages=[]) try: response = await query_model_full( empty_context, prompt, config, openrouter_api_key=openrouter_api_key, ) first_line = response.strip().split("\n")[0].upper() if "CONVINCED:" in first_line or (first_line.startswith("CONVINCED") and "NOT" not in first_line): # Parse who convinced them convinced_by = None raw = response.strip().split("\n")[0] if ":" in raw: candidate = raw.split(":", 1)[1].strip().rstrip(".") for mn in other_model_names: if mn.lower() in candidate.lower() or candidate.lower() in mn.lower(): convinced_by = mn break return {"convinced": True, "convinced_by": convinced_by, "reasoning": response} return {"convinced": False, "convinced_by": None, "reasoning": response} except Exception as e: logger.error("Conviction check failed for %s: %s", config.model_name, e) return {"convinced": False, "convinced_by": None, "reasoning": f"[Check error: {e}]"} async def winner_final_summary( winner_config: LLMConfig, debate_history: List[Dict[str, Any]], user_query: str, openrouter_api_key: Optional[str] = None, ) -> AsyncGenerator[str, None]: """Stream a final summary from the winning model in self-convergence mode.""" history_text = _format_history(debate_history) prompt = ( f"You participated in a multi-model debate and your position was judged " f"the strongest.\n\n" f'Original question: "{user_query}"\n\n' f"Full debate transcript:\n{history_text}\n\n" f"Now provide a comprehensive final answer to the original question. " f"Incorporate the best insights and valid points raised by other participants " f"during the debate, while maintaining the core of your position. " f"This should be a polished, definitive answer — not a debate response." ) empty_context = Context(messages=[]) async for chunk in llm_streamer( empty_context, prompt, winner_config, openrouter_api_key=openrouter_api_key, ): yield chunk async def judge_final_verdict( judge_config: LLMConfig, debate_history: List[Dict[str, Any]], user_query: str, openrouter_api_key: Optional[str] = None, ) -> AsyncGenerator[str, None]: """Stream the judge's final verdict/synthesis.""" history_text = "" for past_round in debate_history: rn = past_round["round"] history_text += f"\n--- Round {rn} ---\n" for resp in past_round["responses"]: history_text += f"\n[{resp['model']}]:\n{resp['response']}\n" prompt = ( f"You are the judge of a multi-model debate. Each model independently answered " f"the question and then debated to defend their position.\n\n" f'Original question: "{user_query}"\n\n' f"Full debate transcript:\n{history_text}\n\n" f"As the judge, provide:\n" f"1. Each participant's core position and how it evolved through the debate\n" f"2. The strengths and weaknesses of each position, noting which arguments " f"were effectively challenged and which stood up to scrutiny\n" f"3. Your final verdict: synthesize the best answer to the original question, " f"drawing from the strongest arguments and evidence presented across all participants. " f"Clearly explain which positions or insights you drew from and why." ) empty_context = Context(messages=[]) async for chunk in llm_streamer( empty_context, prompt, judge_config, openrouter_api_key=openrouter_api_key, ): yield chunk async def debate_event_stream( user_prompt: str, context: Context, member_configs: List[LLMConfig], judge_config: Optional[LLMConfig], judge_mode: DebateJudgeMode, debate_format: DebateFormat, max_rounds: int = 5, custom_format_prompt: Optional[str] = None, attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None, tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None, openrouter_api_key: Optional[str] = None, images: Optional[List[Dict[str, Any]]] = None, ) -> AsyncGenerator[str, None]: """Master orchestrator yielding SSE JSON events through the debate process.""" model_names = [c.model_name for c in member_configs] yield _sse_event({ "type": "debate_start", "data": { "max_rounds": max_rounds, "format": debate_format.value, "judge_mode": judge_mode.value, "models": model_names, }, }) debate_history: List[Dict[str, Any]] = [] # Track active participants (for self_convergence elimination) active_indices = list(range(len(member_configs))) for round_num in range(1, max_rounds + 1): active_configs = [member_configs[i] for i in active_indices] active_atts = [attachments_per_model[i] if attachments_per_model else None for i in active_indices] if attachments_per_model else None active_tools = [tools_per_model[i] if tools_per_model else None for i in active_indices] if tools_per_model else None yield _sse_event({"type": "round_start", "data": { "round": round_num, "active_models": [c.model_name for c in active_configs], }}) round_responses: List[Dict[str, Any]] = [] async for result in debate_round( active_configs, context, user_prompt, debate_history, round_num, debate_format, custom_format_prompt, attachments_per_model=active_atts, tools_per_model=active_tools, openrouter_api_key=openrouter_api_key, images=images, original_indices=active_indices, original_total=len(member_configs), ): round_responses.append(result) yield _sse_event({ "type": "round_model_complete", "data": {"round": round_num, "model": result["model"], "response": result["response"]}, }) debate_history.append({"round": round_num, "responses": round_responses}) yield _sse_event({ "type": "round_complete", "data": {"round": round_num, "responses": round_responses}, }) if not round_responses: yield _sse_event({ "type": "error", "data": {"message": "All debate models failed to respond."}, }) return # Check stop condition (skip Round 1 — still forming positions, and last round) if round_num >= 2 and round_num < max_rounds: if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config: decision = await judge_evaluate_round( judge_config, debate_history, user_prompt, openrouter_api_key=openrouter_api_key, ) yield _sse_event({ "type": "judge_decision", "data": {"round": round_num, **decision}, }) if not decision["continue"]: break elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE: # Ask each active model if it's been convinced other_names = [c.model_name for c in active_configs] conviction_tasks = [] for cfg in active_configs: others = [n for n in other_names if n != cfg.model_name] conviction_tasks.append( check_model_convinced( cfg, debate_history, user_prompt, others, openrouter_api_key=openrouter_api_key, ) ) results = await asyncio.gather(*conviction_tasks) # Process eliminations eliminated_this_round = [] for cfg, result in zip(active_configs, results): if result["convinced"]: eliminated_this_round.append(cfg.model_name) yield _sse_event({ "type": "model_eliminated", "data": { "round": round_num, "model": cfg.model_name, "convinced_by": result.get("convinced_by"), "reasoning": result["reasoning"], }, }) # Remove eliminated models from active list if eliminated_this_round: active_indices = [ i for i in active_indices if member_configs[i].model_name not in eliminated_this_round ] remaining = [member_configs[i].model_name for i in active_indices] yield _sse_event({ "type": "convergence_status", "data": { "round": round_num, "eliminated": eliminated_this_round, "remaining": remaining, }, }) # If only one model left, debate is over if len(active_indices) <= 1: break # DISPLAY_ONLY: just continue to next round # === Final synthesis === if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config: yield _sse_event({ "type": "final_start", "data": {"model": judge_config.model_name}, }) full_verdict = "" async for chunk in judge_final_verdict( judge_config, debate_history, user_prompt, openrouter_api_key=openrouter_api_key, ): full_verdict += chunk yield _sse_event({"type": "final_chunk", "data": {"chunk": chunk}}) yield _sse_event({ "type": "final_complete", "data": {"model": judge_config.model_name, "response": full_verdict}, }) elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE and debate_history: # Winner = last remaining model winner_cfg = member_configs[active_indices[0]] if active_indices else member_configs[0] winner_name = winner_cfg.model_name yield _sse_event({ "type": "final_start", "data": {"model": winner_name}, }) full_summary = "" async for chunk in winner_final_summary( winner_cfg, debate_history, user_prompt, openrouter_api_key=openrouter_api_key, ): full_summary += chunk yield _sse_event({"type": "final_chunk", "data": {"chunk": chunk}}) yield _sse_event({ "type": "final_complete", "data": {"model": winner_name, "response": full_summary}, }) yield _sse_event({"type": "debate_complete"})