diff options
Diffstat (limited to 'backend/app/services')
| -rw-r--r-- | backend/app/services/debate.py | 371 |
1 file changed, 371 insertions, 0 deletions
"""Multi-round LLM Debate orchestration for ContextFlow."""

import asyncio
import json
import logging
from typing import AsyncGenerator, Dict, List, Any, Optional

from app.schemas import Context, LLMConfig, DebateFormat, DebateJudgeMode
from app.services.llm import query_model_full, llm_streamer

logger = logging.getLogger("contextflow.debate")


def _sse_event(data: dict) -> str:
    """Format a dict as a Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(data)}\n\n"


def _format_history(debate_history: List[Dict[str, Any]]) -> str:
    """Render accumulated rounds as a plain-text transcript.

    Each entry of *debate_history* is ``{"round": int, "responses":
    [{"model": str, "response": str}, ...]}`` (as built by
    ``debate_event_stream``). Returns "" for an empty history.
    """
    parts: List[str] = []
    for past_round in debate_history:
        parts.append(f"\n--- Round {past_round['round']} ---\n")
        for resp in past_round["responses"]:
            parts.append(f"\n[{resp['model']}]:\n{resp['response']}\n")
    return "".join(parts)


def build_debate_prompt(
    user_query: str,
    debate_history: List[Dict[str, Any]],
    model_name: str,
    round_num: int,
    debate_format: DebateFormat,
    custom_prompt: Optional[str] = None,
    model_index: int = 0,
    total_models: int = 2,
) -> str:
    """Build the prompt for one debater for one round.

    Args:
        user_query: The original question under debate.
        debate_history: Prior rounds (see ``_format_history``); empty in round 1.
        model_name: Display name of the model being prompted (only substituted
            into CUSTOM-format templates).
        round_num: 1-based round number; round 1 gets an "opening" prompt.
        debate_format: One of the ``DebateFormat`` variants; unknown values
            fall back to a minimal free-discussion prompt.
        custom_prompt: Template for ``DebateFormat.CUSTOM``; supports the
            placeholders ``{history}``, ``{round}``, ``{model_name}``,
            ``{question}``.
        model_index: Position of this model among the debaters; selects the
            role in STRUCTURED_OPPOSITION (cycled through six roles).
        total_models: Number of debaters (kept for interface compatibility;
            not used by the current formats).
    """
    history_text = _format_history(debate_history)

    if debate_format == DebateFormat.FREE_DISCUSSION:
        if round_num == 1:
            return (
                f"You are participating in a roundtable discussion about the following question:\n\n"
                f'"{user_query}"\n\n'
                f"Provide your perspective and answer to this question."
            )
        return (
            f"You are participating in a roundtable discussion about the following question:\n\n"
            f'"{user_query}"\n\n'
            f"Here is the discussion so far:\n{history_text}\n\n"
            f"This is round {round_num}. Consider what others have said, respond to their points, "
            f"and refine or defend your position."
        )

    if debate_format == DebateFormat.STRUCTURED_OPPOSITION:
        roles = ["FOR", "AGAINST", "DEVIL'S ADVOCATE", "MEDIATOR", "CRITIC", "SYNTHESIZER"]
        role = roles[model_index % len(roles)]
        if round_num == 1:
            return (
                f"You are arguing {role} the following position in a structured debate:\n\n"
                f'"{user_query}"\n\n'
                f"Present your strongest arguments from the {role} perspective."
            )
        return (
            f"You are arguing {role} the following position in a structured debate:\n\n"
            f'"{user_query}"\n\n'
            f"Debate history:\n{history_text}\n\n"
            f"This is round {round_num}. Respond to the other participants' arguments "
            f"while maintaining your {role} position. Address their strongest points."
        )

    if debate_format == DebateFormat.ITERATIVE_IMPROVEMENT:
        if round_num == 1:
            return (
                f"You are participating in an iterative improvement exercise on the following question:\n\n"
                f'"{user_query}"\n\n'
                f"Provide your best answer."
            )
        return (
            f"You are participating in an iterative improvement exercise on the following question:\n\n"
            f'"{user_query}"\n\n'
            f"Here are the previous answers from all participants:\n{history_text}\n\n"
            f"This is round {round_num}. Critique the other participants' answers, identify flaws or gaps, "
            f"and provide an improved answer that incorporates the best insights from everyone."
        )

    if debate_format == DebateFormat.CUSTOM and custom_prompt:
        prompt = custom_prompt
        prompt = prompt.replace("{history}", history_text or "(No history yet)")
        prompt = prompt.replace("{round}", str(round_num))
        prompt = prompt.replace("{model_name}", model_name)
        prompt = prompt.replace("{question}", user_query)
        return prompt

    # Fallback to free discussion
    if round_num == 1:
        return f'Provide your answer to the following question:\n\n"{user_query}"'
    return (
        f'Question: "{user_query}"\n\n'
        f"Previous discussion:\n{history_text}\n\n"
        f"Round {round_num}: Provide your updated response."
    )


async def debate_round(
    configs: List[LLMConfig],
    context: Context,
    user_prompt: str,
    debate_history: List[Dict[str, Any]],
    round_num: int,
    debate_format: DebateFormat,
    custom_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Query all debate models in parallel for one round, yielding as each completes.

    Yields ``{"model": name, "response": text}`` dicts; a model failure is
    converted into a ``"[Error: ...]"`` response rather than raised, so one
    bad model cannot abort the round.
    """

    async def _query_one(idx: int, config: LLMConfig) -> Dict[str, Any]:
        prompt = build_debate_prompt(
            user_prompt, debate_history, config.model_name,
            round_num, debate_format, custom_prompt,
            model_index=idx, total_models=len(configs),
        )
        atts = attachments_per_model[idx] if attachments_per_model else None
        tls = tools_per_model[idx] if tools_per_model else None
        try:
            response = await query_model_full(
                context, prompt, config,
                attachments=atts, tools=tls,
                openrouter_api_key=openrouter_api_key,
                images=images if round_num == 1 else None,  # Only send images in round 1
            )
            return {"model": config.model_name, "response": response}
        except Exception as e:
            logger.error("Debate round %d failed for %s: %s", round_num, config.model_name, e)
            return {"model": config.model_name, "response": f"[Error: {e}]"}

    futures = [
        asyncio.ensure_future(_query_one(i, cfg))
        for i, cfg in enumerate(configs)
    ]
    try:
        for fut in asyncio.as_completed(futures):
            yield await fut
    finally:
        # If the consumer stops iterating early (e.g. the SSE client
        # disconnects), cancel in-flight queries instead of leaking them.
        for fut in futures:
            if not fut.done():
                fut.cancel()


async def judge_evaluate_round(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Ask the judge model whether the debate should continue after a round.

    Returns ``{"continue": bool, "reasoning": str}``. On any judge failure the
    debate is stopped (``continue=False``) with the error in ``reasoning``.
    """
    last_round = len(debate_history)
    history_text = _format_history(debate_history)

    prompt = (
        f"You are the judge of a multi-model debate on the following question:\n"
        f'"{user_query}"\n\n'
        f"Debate history (Round 1 to {last_round}):\n{history_text}\n\n"
        f"Evaluate whether the debate has reached a satisfactory conclusion.\n"
        f"Consider: Have the key points been thoroughly explored? Is there consensus?\n"
        f"Are there unresolved disagreements worth continuing?\n\n"
        f"Respond with exactly one of:\n"
        f"CONTINUE - if the debate should go on (explain why briefly)\n"
        f"STOP - if a clear conclusion has been reached (explain why briefly)\n"
    )

    empty_context = Context(messages=[])
    try:
        response = await query_model_full(
            empty_context, prompt, judge_config,
            openrouter_api_key=openrouter_api_key,
        )
        first_line = response.upper().split("\n")[0]
        # STOP takes precedence: a verdict like "STOP - discontinue further
        # rounds" would otherwise match the CONTINUE substring of
        # "DISCONTINUE" and wrongly keep the debate going.
        should_continue = "STOP" not in first_line and "CONTINUE" in first_line
        return {"continue": should_continue, "reasoning": response}
    except Exception as e:
        logger.error("Judge evaluation failed: %s", e)
        return {"continue": False, "reasoning": f"[Judge error: {e}]"}


async def check_self_convergence(
    configs: List[LLMConfig],
    round_responses: List[Dict[str, Any]],
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Check if debate responses have converged, using the first model as checker.

    Returns ``{"converged": bool, "reasoning": str}``; any failure is treated
    as "not converged" so the debate keeps running rather than ending early.
    """
    responses_text = "\n\n".join(
        f"[{r['model']}]:\n{r['response']}" for r in round_responses
    )
    prompt = (
        f"Below are the responses from the latest round of a debate:\n\n"
        f"{responses_text}\n\n"
        f"Do all participants essentially agree on the answer? Respond ONLY with:\n"
        f"CONVERGED - if there is clear consensus\n"
        f"DIVERGENT - if there are still significant disagreements"
    )

    empty_context = Context(messages=[])
    # Use the first config as the convergence checker
    check_config = configs[0]
    try:
        response = await query_model_full(
            empty_context, prompt, check_config,
            openrouter_api_key=openrouter_api_key,
        )
        first_line = response.upper().split("\n")[0]
        # DIVERGENT takes precedence so a hedged first line mentioning both
        # keywords (or "NOT CONVERGED ... DIVERGENT") is not read as consensus.
        converged = "DIVERGENT" not in first_line and "CONVERGED" in first_line
        return {"converged": converged, "reasoning": response}
    except Exception as e:
        logger.error("Convergence check failed: %s", e)
        return {"converged": False, "reasoning": f"[Convergence check error: {e}]"}


async def judge_final_verdict(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> AsyncGenerator[str, None]:
    """Stream the judge's final verdict/synthesis chunk by chunk."""
    history_text = _format_history(debate_history)

    prompt = (
        f"You are the judge of a multi-model debate. Below is the full debate transcript.\n\n"
        f'Question: "{user_query}"\n\n'
        f"{history_text}\n\n"
        f"As the judge, provide:\n"
        f"1. A summary of the key arguments from each participant\n"
        f"2. An evaluation of the strengths and weaknesses of each position\n"
        f"3. Your final verdict: the best, most accurate, and most comprehensive answer "
        f"to the original question, synthesizing the best insights from the debate."
    )

    empty_context = Context(messages=[])
    async for chunk in llm_streamer(
        empty_context, prompt, judge_config,
        openrouter_api_key=openrouter_api_key,
    ):
        yield chunk


async def debate_event_stream(
    user_prompt: str,
    context: Context,
    member_configs: List[LLMConfig],
    judge_config: Optional[LLMConfig],
    judge_mode: DebateJudgeMode,
    debate_format: DebateFormat,
    max_rounds: int = 5,
    custom_format_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
    """Master orchestrator yielding SSE JSON events through the debate process.

    Event sequence: ``debate_start`` → per round (``round_start``,
    ``round_model_complete``*, ``round_complete``, optional
    ``judge_decision``/``convergence_check``) → final synthesis
    (``final_start``/``final_chunk``*/``final_complete`` for an external
    judge, or ``final_complete`` with the longest last-round answer for
    self-convergence) → ``debate_complete``. Emits ``error`` and returns if
    no model produced a response in a round.
    """

    model_names = [c.model_name for c in member_configs]
    yield _sse_event({
        "type": "debate_start",
        "data": {
            "max_rounds": max_rounds,
            "format": debate_format.value,
            "judge_mode": judge_mode.value,
            "models": model_names,
        },
    })

    debate_history: List[Dict[str, Any]] = []

    for round_num in range(1, max_rounds + 1):
        yield _sse_event({"type": "round_start", "data": {"round": round_num}})

        round_responses: List[Dict[str, Any]] = []
        async for result in debate_round(
            member_configs, context, user_prompt,
            debate_history, round_num, debate_format, custom_format_prompt,
            attachments_per_model=attachments_per_model,
            tools_per_model=tools_per_model,
            openrouter_api_key=openrouter_api_key,
            images=images,
        ):
            round_responses.append(result)
            yield _sse_event({
                "type": "round_model_complete",
                "data": {"round": round_num, "model": result["model"], "response": result["response"]},
            })

        debate_history.append({"round": round_num, "responses": round_responses})

        yield _sse_event({
            "type": "round_complete",
            "data": {"round": round_num, "responses": round_responses},
        })

        if not round_responses:
            yield _sse_event({
                "type": "error",
                "data": {"message": "All debate models failed to respond."},
            })
            return

        # Check stop condition (skip on last round)
        if round_num < max_rounds:
            if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
                decision = await judge_evaluate_round(
                    judge_config, debate_history, user_prompt,
                    openrouter_api_key=openrouter_api_key,
                )
                yield _sse_event({
                    "type": "judge_decision",
                    "data": {"round": round_num, **decision},
                })
                if not decision["continue"]:
                    break

            elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
                convergence = await check_self_convergence(
                    member_configs, round_responses,
                    openrouter_api_key=openrouter_api_key,
                )
                yield _sse_event({
                    "type": "convergence_check",
                    "data": {"round": round_num, **convergence},
                })
                if convergence["converged"]:
                    break
            # DISPLAY_ONLY: just continue to next round

    # Final synthesis
    if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
        yield _sse_event({
            "type": "final_start",
            "data": {"model": judge_config.model_name},
        })

        full_verdict = ""
        async for chunk in judge_final_verdict(
            judge_config, debate_history, user_prompt,
            openrouter_api_key=openrouter_api_key,
        ):
            full_verdict += chunk
            yield _sse_event({"type": "final_chunk", "data": {"chunk": chunk}})

        yield _sse_event({
            "type": "final_complete",
            "data": {"model": judge_config.model_name, "response": full_verdict},
        })

    elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
        # Use the last round's responses as the final answer
        last_responses = debate_history[-1]["responses"] if debate_history else []
        # Pick the longest response as the "best" convergent answer
        if last_responses:
            best = max(last_responses, key=lambda r: len(r.get("response", "")))
            yield _sse_event({
                "type": "final_complete",
                "data": {"model": best["model"], "response": best["response"]},
            })

    yield _sse_event({"type": "debate_complete"})
