"""Multi-round LLM Debate orchestration for ContextFlow.

The entry point is :func:`debate_event_stream`. It runs up to ``max_rounds``
rounds in which every member model is queried in parallel. After each round it
can ask an external judge (or the members themselves) whether to stop. At the
end it emits a final answer. Progress is reported as SSE ``data:`` lines.
"""

import asyncio
import json
import logging
import re
from typing import Any, AsyncGenerator, Dict, List, Optional

from app.schemas import Context, DebateFormat, DebateJudgeMode, LLMConfig
from app.services.llm import llm_streamer, query_model_full

logger = logging.getLogger("contextflow.debate")

# Roles assigned round-robin (by model index) in STRUCTURED_OPPOSITION debates.
_OPPOSITION_ROLES = ("FOR", "AGAINST", "DEVIL'S ADVOCATE", "MEDIATOR", "CRITIC", "SYNTHESIZER")

# Placeholders substituted into DebateFormat.CUSTOM prompt templates.
_CUSTOM_PLACEHOLDER_RE = re.compile(r"\{(history|round|model_name|question)\}")

# Decision keywords the judge / convergence checker are asked to lead with.
# Word boundaries keep e.g. "NONSTOP" from counting as "STOP".
_JUDGE_KEYWORD_RE = re.compile(r"\b(CONTINUE|STOP)\b")
_CONVERGENCE_KEYWORD_RE = re.compile(r"\b(CONVERGED|DIVERGENT)\b")


def _sse_event(data: dict) -> str:
    """Format a dict as an SSE data line."""
    return f"data: {json.dumps(data)}\n\n"


def _format_history(debate_history: List[Dict[str, Any]]) -> str:
    """Render prior rounds as a plain-text transcript for prompts.

    Each round becomes a ``--- Round N ---`` header followed by one
    ``[model]:`` section per response. Entries flagged with ``"error"``
    (failed model calls, see :func:`debate_round`) are skipped, so neither the
    other debaters nor the judge treat an error message as an argument.
    """
    parts: List[str] = []
    for past_round in debate_history:
        parts.append(f"\n--- Round {past_round['round']} ---\n")
        for resp in past_round["responses"]:
            if resp.get("error"):
                continue
            parts.append(f"\n[{resp['model']}]:\n{resp['response']}\n")
    return "".join(parts)


def _first_keyword(text: Optional[str], pattern: re.Pattern) -> Optional[str]:
    """Return the earliest keyword *pattern* finds on the first non-blank line.

    The line is upper-cased before matching. Returns ``None`` if *text* is
    empty/``None`` or the first non-blank line contains no keyword. Picking the
    *earliest* match means "STOP - no need to CONTINUE" resolves to STOP.
    """
    for line in (text or "").splitlines():
        if line.strip():
            match = pattern.search(line.upper())
            return match.group(1) if match else None
    return None


def build_debate_prompt(
    user_query: str,
    debate_history: List[Dict[str, Any]],
    model_name: str,
    round_num: int,
    debate_format: DebateFormat,
    custom_prompt: Optional[str] = None,
    model_index: int = 0,
    total_models: int = 2,
) -> str:
    """Build the prompt for one debater based on format and history.

    Args:
        user_query: The question being debated.
        debate_history: Completed rounds, each ``{"round": int, "responses": [...]}``.
        model_name: Name of the model the prompt is for (used by CUSTOM templates).
        round_num: 1-based round number; round 1 prompts omit the history.
        debate_format: Selects the prompt template.
        custom_prompt: Template for ``DebateFormat.CUSTOM``. It may contain
            ``{history}``, ``{round}``, ``{model_name}`` and ``{question}``.
            If it is empty, the generic fallback prompt is used.
        model_index: Position of the model; picks its STRUCTURED_OPPOSITION role.
        total_models: Accepted for API compatibility; currently unused.

    Returns:
        The prompt text.
    """
    history_text = _format_history(debate_history) if debate_history else ""

    if debate_format == DebateFormat.FREE_DISCUSSION:
        if round_num == 1:
            return (
                f"You are participating in a roundtable discussion about the following question:\n\n"
                f'"{user_query}"\n\n'
                f"Provide your perspective and answer to this question."
            )
        return (
            f"You are participating in a roundtable discussion about the following question:\n\n"
            f'"{user_query}"\n\n'
            f"Here is the discussion so far:\n{history_text}\n\n"
            f"This is round {round_num}. Consider what others have said, respond to their points, "
            f"and refine or defend your position."
        )

    if debate_format == DebateFormat.STRUCTURED_OPPOSITION:
        role = _OPPOSITION_ROLES[model_index % len(_OPPOSITION_ROLES)]
        if round_num == 1:
            return (
                f"You are arguing {role} the following position in a structured debate:\n\n"
                f'"{user_query}"\n\n'
                f"Present your strongest arguments from the {role} perspective."
            )
        return (
            f"You are arguing {role} the following position in a structured debate:\n\n"
            f'"{user_query}"\n\n'
            f"Debate history:\n{history_text}\n\n"
            f"This is round {round_num}. Respond to the other participants' arguments "
            f"while maintaining your {role} position. Address their strongest points."
        )

    if debate_format == DebateFormat.ITERATIVE_IMPROVEMENT:
        if round_num == 1:
            return (
                f"You are participating in an iterative improvement exercise on the following question:\n\n"
                f'"{user_query}"\n\n'
                f"Provide your best answer."
            )
        return (
            f"You are participating in an iterative improvement exercise on the following question:\n\n"
            f'"{user_query}"\n\n'
            f"Here are the previous answers from all participants:\n{history_text}\n\n"
            f"This is round {round_num}. Critique the other participants' answers, identify flaws or gaps, "
            f"and provide an improved answer that incorporates the best insights from everyone."
        )

    if debate_format == DebateFormat.CUSTOM and custom_prompt:
        values = {
            "history": history_text or "(No history yet)",
            "round": str(round_num),
            "model_name": model_name,
            "question": user_query,
        }
        # Single pass: placeholder-looking text inside substituted values (e.g. a
        # model quoting "{question}") is left alone instead of being expanded again.
        # A callable replacement also avoids re.sub's backslash-escape processing.
        return _CUSTOM_PLACEHOLDER_RE.sub(lambda m: values[m.group(1)], custom_prompt)

    # Fallback to free discussion
    if round_num == 1:
        return f'Provide your answer to the following question:\n\n"{user_query}"'
    return (
        f'Question: "{user_query}"\n\n'
        f"Previous discussion:\n{history_text}\n\n"
        f"Round {round_num}: Provide your updated response."
    )


async def debate_round(
    configs: List[LLMConfig],
    context: Context,
    user_prompt: str,
    debate_history: List[Dict[str, Any]],
    round_num: int,
    debate_format: DebateFormat,
    custom_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Query all debate models in parallel for one round, yielding as each completes.

    Each yielded dict is ``{"model": name, "response": text}``. If a model call
    raises, the dict instead carries ``"response": "[Error: ...]"`` and
    ``"error": True``, so callers can tell failures apart from real answers.
    ``images`` are sent only in round 1. Per-model attachment/tool lists that
    are missing or too short give ``None`` for the affected models.

    If the consumer stops iterating early (for example ``aclose()`` on client
    disconnect), the model queries still in flight are cancelled.
    """

    def _for_model(per_model: Optional[List[Any]], idx: int) -> Any:
        # Tolerate a missing or short per-model list instead of raising IndexError.
        if per_model and idx < len(per_model):
            return per_model[idx]
        return None

    async def _query_one(idx: int, config: LLMConfig) -> Dict[str, Any]:
        prompt = build_debate_prompt(
            user_prompt, debate_history, config.model_name,
            round_num, debate_format, custom_prompt,
            model_index=idx, total_models=len(configs),
        )
        try:
            response = await query_model_full(
                context, prompt, config,
                attachments=_for_model(attachments_per_model, idx),
                tools=_for_model(tools_per_model, idx),
                openrouter_api_key=openrouter_api_key,
                images=images if round_num == 1 else None,  # Only send images in round 1
            )
            return {"model": config.model_name, "response": response}
        except Exception as e:
            logger.error("Debate round %d failed for %s: %s", round_num, config.model_name, e)
            return {"model": config.model_name, "response": f"[Error: {e}]", "error": True}

    tasks = [asyncio.ensure_future(_query_one(i, cfg)) for i, cfg in enumerate(configs)]
    try:
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        # Do not leave model calls running after the consumer has gone away.
        for task in tasks:
            if not task.done():
                task.cancel()


async def judge_evaluate_round(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Ask the judge whether the debate should continue after the latest round.

    Returns:
        ``{"continue": bool, "reasoning": str}``. ``continue`` is ``True`` only
        when CONTINUE is the first keyword on the first non-blank line of the
        judge's reply. If the judge call fails, ``continue`` is ``False`` so the
        debate stops.
    """
    last_round = len(debate_history)
    history_text = _format_history(debate_history)

    prompt = (
        f"You are the judge of a multi-model debate on the following question:\n"
        f'"{user_query}"\n\n'
        f"Debate history (Round 1 to {last_round}):\n{history_text}\n\n"
        f"Evaluate whether the debate has reached a satisfactory conclusion.\n"
        f"Consider: Have the key points been thoroughly explored? Is there consensus?\n"
        f"Are there unresolved disagreements worth continuing?\n\n"
        f"Respond with exactly one of:\n"
        f"CONTINUE - if the debate should go on (explain why briefly)\n"
        f"STOP - if a clear conclusion has been reached (explain why briefly)"
    )

    try:
        response = await query_model_full(
            Context(messages=[]), prompt, judge_config,
            openrouter_api_key=openrouter_api_key,
        )
    except Exception as e:
        logger.error("Judge evaluation failed: %s", e)
        return {"continue": False, "reasoning": f"[Judge error: {e}]"}

    should_continue = _first_keyword(response, _JUDGE_KEYWORD_RE) == "CONTINUE"
    return {"continue": should_continue, "reasoning": response}


async def check_self_convergence(
    configs: List[LLMConfig],
    round_responses: List[Dict[str, Any]],
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Check whether debate responses have converged, using the first configured model.

    Responses flagged with ``"error"`` are left out of the comparison.

    Returns:
        ``{"converged": bool, "reasoning": str}``. ``converged`` is ``True``
        only when CONVERGED is the first keyword on the first non-blank line of
        the reply. It is ``False`` when there is no model to ask, no usable
        response, or the check itself fails.
    """
    if not configs:
        return {"converged": False, "reasoning": "[Convergence check error: no models configured]"}

    usable = [r for r in round_responses if not r.get("error")]
    if not usable:
        return {"converged": False, "reasoning": "[Convergence check error: no successful responses]"}

    responses_text = "\n\n".join(f"[{r['model']}]:\n{r['response']}" for r in usable)
    prompt = (
        f"Below are the responses from the latest round of a debate:\n\n"
        f"{responses_text}\n\n"
        f"Do all participants essentially agree on the answer? Respond ONLY with:\n"
        f"CONVERGED - if there is clear consensus\n"
        f"DIVERGENT - if there are still significant disagreements"
    )

    # Use the first config as the convergence checker
    check_config = configs[0]
    try:
        response = await query_model_full(
            Context(messages=[]), prompt, check_config,
            openrouter_api_key=openrouter_api_key,
        )
    except Exception as e:
        logger.error("Convergence check failed: %s", e)
        return {"converged": False, "reasoning": f"[Convergence check error: {e}]"}

    converged = _first_keyword(response, _CONVERGENCE_KEYWORD_RE) == "CONVERGED"
    return {"converged": converged, "reasoning": response}


async def judge_final_verdict(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> AsyncGenerator[str, None]:
    """Stream the judge's final verdict/synthesis as text chunks.

    Exceptions from the underlying streamer propagate to the caller.
    """
    history_text = _format_history(debate_history)

    prompt = (
        f"You are the judge of a multi-model debate. Below is the full debate transcript.\n\n"
        f'Question: "{user_query}"\n\n'
        f"{history_text}\n\n"
        f"As the judge, provide:\n"
        f"1. A summary of the key arguments from each participant\n"
        f"2. An evaluation of the strengths and weaknesses of each position\n"
        f"3. Your final verdict: the best, most accurate, and most comprehensive answer "
        f"to the original question, synthesizing the best insights from the debate."
    )

    async for chunk in llm_streamer(
        Context(messages=[]), prompt, judge_config,
        openrouter_api_key=openrouter_api_key,
    ):
        yield chunk


async def debate_event_stream(
    user_prompt: str,
    context: Context,
    member_configs: List[LLMConfig],
    judge_config: Optional[LLMConfig],
    judge_mode: DebateJudgeMode,
    debate_format: DebateFormat,
    max_rounds: int = 5,
    custom_format_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
    """Master orchestrator yielding SSE JSON events through the debate process.

    Event ``type`` sequence: ``debate_start``, then per round ``round_start``,
    one ``round_model_complete`` per model, and ``round_complete``. These are
    optionally followed by ``judge_decision`` (EXTERNAL_JUDGE) or
    ``convergence_check`` (SELF_CONVERGENCE); neither runs after the last
    round. After the rounds come the final-answer events (``final_start`` /
    ``final_chunk`` / ``final_complete`` for the judge, or a single
    ``final_complete`` for self-convergence) and then ``debate_complete``.

    If every model fails in a round, an ``error`` event is emitted and the
    stream ends without ``debate_complete``. If the judge's final verdict
    fails, an ``error`` event replaces ``final_complete`` and the stream still
    finishes with ``debate_complete``.
    """
    model_names = [c.model_name for c in member_configs]

    yield _sse_event({
        "type": "debate_start",
        "data": {
            "max_rounds": max_rounds,
            "format": debate_format.value,
            "judge_mode": judge_mode.value,
            "models": model_names,
        },
    })

    debate_history: List[Dict[str, Any]] = []

    for round_num in range(1, max_rounds + 1):
        yield _sse_event({"type": "round_start", "data": {"round": round_num}})

        round_responses: List[Dict[str, Any]] = []
        async for result in debate_round(
            member_configs, context, user_prompt,
            debate_history, round_num, debate_format, custom_format_prompt,
            attachments_per_model=attachments_per_model,
            tools_per_model=tools_per_model,
            openrouter_api_key=openrouter_api_key,
            images=images,
        ):
            round_responses.append(result)
            yield _sse_event({
                "type": "round_model_complete",
                "data": {"round": round_num, "model": result["model"], "response": result["response"]},
            })

        debate_history.append({"round": round_num, "responses": round_responses})

        yield _sse_event({
            "type": "round_complete",
            "data": {"round": round_num, "responses": round_responses},
        })

        # Failed calls are returned as flagged entries rather than dropped, so
        # "all failed" means every entry is flagged (or there are no entries).
        if all(r.get("error") for r in round_responses):
            yield _sse_event({
                "type": "error",
                "data": {"message": "All debate models failed to respond."},
            })
            return

        # Check stop condition (skip on last round)
        if round_num < max_rounds:
            if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
                decision = await judge_evaluate_round(
                    judge_config, debate_history, user_prompt,
                    openrouter_api_key=openrouter_api_key,
                )
                yield _sse_event({
                    "type": "judge_decision",
                    "data": {"round": round_num, **decision},
                })
                if not decision["continue"]:
                    break

            elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
                convergence = await check_self_convergence(
                    member_configs, round_responses,
                    openrouter_api_key=openrouter_api_key,
                )
                yield _sse_event({
                    "type": "convergence_check",
                    "data": {"round": round_num, **convergence},
                })
                if convergence["converged"]:
                    break
            # DISPLAY_ONLY: just continue to next round

    # Final synthesis
    if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
        yield _sse_event({
            "type": "final_start",
            "data": {"model": judge_config.model_name},
        })

        verdict_parts: List[str] = []
        try:
            async for chunk in judge_final_verdict(
                judge_config, debate_history, user_prompt,
                openrouter_api_key=openrouter_api_key,
            ):
                verdict_parts.append(chunk)
                yield _sse_event({"type": "final_chunk", "data": {"chunk": chunk}})
        except Exception as e:
            # Report the failure in-band instead of tearing down the SSE stream.
            logger.error("Judge final verdict failed: %s", e)
            yield _sse_event({
                "type": "error",
                "data": {"message": f"Judge final verdict failed: {e}"},
            })
        else:
            yield _sse_event({
                "type": "final_complete",
                "data": {"model": judge_config.model_name, "response": "".join(verdict_parts)},
            })

    elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
        # Use the last round's successful responses as final-answer candidates;
        # an error message must never be presented as the answer.
        last_responses = debate_history[-1]["responses"] if debate_history else []
        candidates = [r for r in last_responses if not r.get("error")]
        if candidates:
            # Pick the longest response as the "best" convergent answer
            best = max(candidates, key=lambda r: len(r.get("response") or ""))
            yield _sse_event({
                "type": "final_complete",
                "data": {"model": best["model"], "response": best["response"]},
            })

    yield _sse_event({"type": "debate_complete"})