1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
|
"""Multi-round LLM Debate orchestration for ContextFlow."""
import asyncio
import json
import logging
import re
from typing import AsyncGenerator, Dict, List, Any, Optional

from app.schemas import Context, LLMConfig, DebateFormat, DebateJudgeMode
from app.services.llm import query_model_full, llm_streamer
logger = logging.getLogger("contextflow.debate")
def _sse_event(data: dict) -> str:
"""Format a dict as an SSE data line."""
return f"data: {json.dumps(data)}\n\n"
def build_debate_prompt(
    user_query: str,
    debate_history: List[Dict[str, Any]],
    model_name: str,
    round_num: int,
    debate_format: DebateFormat,
    custom_prompt: Optional[str] = None,
    model_index: int = 0,
    total_models: int = 2,
) -> str:
    """Build the prompt for one debater from the debate format and history.

    Args:
        user_query: The question under debate; quoted verbatim in the prompt.
        debate_history: Earlier rounds, each a dict with a ``"round"`` number
            and a ``"responses"`` list of ``{"model", "response"}`` dicts.
        model_name: Substituted for ``{model_name}`` in a custom prompt.
        round_num: Round number. The built-in templates leave the history
            out when this is 1.
        debate_format: Selects which prompt template is used.
        custom_prompt: Template used for ``DebateFormat.CUSTOM``. Supports the
            placeholders ``{history}``, ``{round}``, ``{model_name}`` and
            ``{question}``.
        model_index: Position of the model among the debaters; selects the
            role in ``STRUCTURED_OPPOSITION`` (roles wrap around).
        total_models: Accepted for interface compatibility; currently unused.

    Returns:
        The prompt text. Formats without a matching template (including
        ``CUSTOM`` with no ``custom_prompt``) fall back to a generic prompt.
    """
    # Render the transcript once; collecting parts avoids repeated string
    # concatenation as the debate grows.
    history_parts: List[str] = []
    for past_round in debate_history or []:
        history_parts.append(f"\n--- Round {past_round['round']} ---\n")
        history_parts.extend(
            f"\n[{resp['model']}]:\n{resp['response']}\n"
            for resp in past_round["responses"]
        )
    history_text = "".join(history_parts)

    if debate_format == DebateFormat.FREE_DISCUSSION:
        if round_num == 1:
            return (
                f"You are participating in a roundtable discussion about the following question:\n\n"
                f'"{user_query}"\n\n'
                f"Provide your perspective and answer to this question."
            )
        return (
            f"You are participating in a roundtable discussion about the following question:\n\n"
            f'"{user_query}"\n\n'
            f"Here is the discussion so far:\n{history_text}\n\n"
            f"This is round {round_num}. Consider what others have said, respond to their points, "
            f"and refine or defend your position."
        )

    if debate_format == DebateFormat.STRUCTURED_OPPOSITION:
        roles = ["FOR", "AGAINST", "DEVIL'S ADVOCATE", "MEDIATOR", "CRITIC", "SYNTHESIZER"]
        # More debaters than roles: roles are reused cyclically.
        role = roles[model_index % len(roles)]
        if round_num == 1:
            return (
                f"You are arguing {role} the following position in a structured debate:\n\n"
                f'"{user_query}"\n\n'
                f"Present your strongest arguments from the {role} perspective."
            )
        return (
            f"You are arguing {role} the following position in a structured debate:\n\n"
            f'"{user_query}"\n\n'
            f"Debate history:\n{history_text}\n\n"
            f"This is round {round_num}. Respond to the other participants' arguments "
            f"while maintaining your {role} position. Address their strongest points."
        )

    if debate_format == DebateFormat.ITERATIVE_IMPROVEMENT:
        if round_num == 1:
            return (
                f"You are participating in an iterative improvement exercise on the following question:\n\n"
                f'"{user_query}"\n\n'
                f"Provide your best answer."
            )
        return (
            f"You are participating in an iterative improvement exercise on the following question:\n\n"
            f'"{user_query}"\n\n'
            f"Here are the previous answers from all participants:\n{history_text}\n\n"
            f"This is round {round_num}. Critique the other participants' answers, identify flaws or gaps, "
            f"and provide an improved answer that incorporates the best insights from everyone."
        )

    if debate_format == DebateFormat.CUSTOM and custom_prompt:
        substitutions = {
            "history": history_text or "(No history yet)",
            "round": str(round_num),
            "model_name": model_name,
            "question": user_query,
        }
        # Substitute in a single pass: text inserted for one placeholder is
        # never re-scanned, so a model response that happens to contain
        # "{question}" or "{round}" is passed through untouched. A callable
        # replacement also keeps backslashes in the inserted text literal.
        return re.sub(
            r"\{(history|round|model_name|question)\}",
            lambda match: substitutions[match.group(1)],
            custom_prompt,
        )

    # Fallback: generic prompt for any format not handled above.
    if round_num == 1:
        return f'Provide your answer to the following question:\n\n"{user_query}"'
    return (
        f'Question: "{user_query}"\n\n'
        f"Previous discussion:\n{history_text}\n\n"
        f"Round {round_num}: Provide your updated response."
    )
async def debate_round(
    configs: List[LLMConfig],
    context: Context,
    user_prompt: str,
    debate_history: List[Dict[str, Any]],
    round_num: int,
    debate_format: DebateFormat,
    custom_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Query all debate models concurrently for one round.

    Results are yielded as each model finishes, so the order follows
    completion time rather than the order of ``configs``.

    Args:
        configs: One config per debating model.
        context: Conversation context passed to every model query.
        user_prompt: The question under debate.
        debate_history: Previous rounds, passed to the prompt builder.
        round_num: Number of the round being run.
        debate_format: Prompt format passed to the prompt builder.
        custom_prompt: Optional template passed to the prompt builder.
        attachments_per_model: Optional per-model attachments, indexed by the
            model's position in ``configs``.
        tools_per_model: Optional per-model tools, indexed the same way.
        openrouter_api_key: Forwarded to the model query.
        images: Forwarded to the model query in round 1 only.

    Yields:
        ``{"model": <model name>, "response": <text>}`` for each model. A
        query that raises is logged and reported as ``"[Error: ...]"`` in the
        ``response`` field instead of aborting the round.
    """

    async def _query_one(idx: int, config: LLMConfig) -> Dict[str, Any]:
        prompt = build_debate_prompt(
            user_prompt, debate_history, config.model_name,
            round_num, debate_format, custom_prompt,
            model_index=idx, total_models=len(configs),
        )
        atts = attachments_per_model[idx] if attachments_per_model else None
        tls = tools_per_model[idx] if tools_per_model else None
        try:
            response = await query_model_full(
                context, prompt, config,
                attachments=atts, tools=tls,
                openrouter_api_key=openrouter_api_key,
                images=images if round_num == 1 else None,  # Only send images in round 1
            )
            return {"model": config.model_name, "response": response}
        except Exception as e:
            logger.error("Debate round %d failed for %s: %s", round_num, config.model_name, e)
            return {"model": config.model_name, "response": f"[Error: {e}]"}

    tasks = [
        asyncio.ensure_future(_query_one(i, cfg))
        for i, cfg in enumerate(configs)
    ]
    try:
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        # If the consumer stops iterating early (e.g. the stream is closed or
        # cancelled) or one task raises, make sure the remaining model queries
        # do not keep running in the background. Cancelling a finished task
        # is a no-op.
        for task in tasks:
            task.cancel()
async def judge_evaluate_round(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Ask the judge model whether the debate should continue after a round.

    Args:
        judge_config: Config of the model acting as judge.
        debate_history: All rounds so far, each a dict with ``"round"`` and
            ``"responses"`` (a list of ``{"model", "response"}`` dicts).
        user_query: The question under debate.
        openrouter_api_key: Forwarded to the model query.

    Returns:
        ``{"continue": bool, "reasoning": str}``. ``reasoning`` is the judge's
        full reply. If the query or the parsing fails, the error is logged and
        ``continue`` is ``False`` with the error text as ``reasoning``.
    """
    last_round = len(debate_history)

    history_parts: List[str] = []
    for past_round in debate_history:
        history_parts.append(f"\n--- Round {past_round['round']} ---\n")
        history_parts.extend(
            f"\n[{resp['model']}]:\n{resp['response']}\n"
            for resp in past_round["responses"]
        )
    history_text = "".join(history_parts)

    prompt = (
        f"You are the judge of a multi-model debate on the following question:\n"
        f'"{user_query}"\n\n'
        f"Debate history (Round 1 to {last_round}):\n{history_text}\n\n"
        f"Evaluate whether the debate has reached a satisfactory conclusion.\n"
        f"Consider: Have the key points been thoroughly explored? Is there consensus?\n"
        f"Are there unresolved disagreements worth continuing?\n\n"
        f"Respond with exactly one of:\n"
        f"CONTINUE - if the debate should go on (explain why briefly)\n"
        f"STOP - if a clear conclusion has been reached (explain why briefly)"
    )
    empty_context = Context(messages=[])
    try:
        response = await query_model_full(
            empty_context, prompt, judge_config,
            openrouter_api_key=openrouter_api_key,
        )
        # Read the verdict from the first non-empty line so a leading blank
        # line does not hide it.
        verdict_line = next(
            (line for line in response.upper().splitlines() if line.strip()),
            "",
        )
        # The keyword that appears first wins: the explanation after the
        # verdict may mention the other word ("STOP - no need to continue").
        continue_at = verdict_line.find("CONTINUE")
        stop_at = verdict_line.find("STOP")
        should_continue = continue_at != -1 and (stop_at == -1 or continue_at < stop_at)
        return {"continue": should_continue, "reasoning": response}
    except Exception as e:
        logger.error("Judge evaluation failed: %s", e)
        return {"continue": False, "reasoning": f"[Judge error: {e}]"}
async def check_self_convergence(
    configs: List[LLMConfig],
    round_responses: List[Dict[str, Any]],
    openrouter_api_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Ask the first configured model whether the latest responses agree.

    Args:
        configs: Debate model configs; the first one runs the check.
        round_responses: The latest round's ``{"model", "response"}`` dicts.
        openrouter_api_key: Forwarded to the model query.

    Returns:
        ``{"converged": bool, "reasoning": str}``. ``reasoning`` is the
        checker's full reply. If no model is configured, or the query or the
        parsing fails, the problem is logged and ``converged`` is ``False``
        with the error text as ``reasoning``.
    """
    if not configs:
        # Without a model there is nothing to ask; report it the same way as
        # any other failed check instead of raising IndexError.
        logger.error("Convergence check failed: %s", "no models configured")
        return {
            "converged": False,
            "reasoning": "[Convergence check error: no models configured]",
        }

    responses_text = "\n\n".join(
        f"[{r['model']}]:\n{r['response']}" for r in round_responses
    )
    prompt = (
        f"Below are the responses from the latest round of a debate:\n\n"
        f"{responses_text}\n\n"
        f"Do all participants essentially agree on the answer? Respond ONLY with:\n"
        f"CONVERGED - if there is clear consensus\n"
        f"DIVERGENT - if there are still significant disagreements"
    )
    empty_context = Context(messages=[])
    # Use the first config as the convergence checker
    check_config = configs[0]
    try:
        response = await query_model_full(
            empty_context, prompt, check_config,
            openrouter_api_key=openrouter_api_key,
        )
        # Read the verdict from the first non-empty line so a leading blank
        # line does not hide it.
        verdict_line = next(
            (line for line in response.upper().splitlines() if line.strip()),
            "",
        )
        # The keyword that appears first wins: an explanation such as
        # "DIVERGENT - they have not converged" must not count as agreement.
        converged_at = verdict_line.find("CONVERGED")
        divergent_at = verdict_line.find("DIVERGENT")
        converged = converged_at != -1 and (
            divergent_at == -1 or converged_at < divergent_at
        )
        return {"converged": converged, "reasoning": response}
    except Exception as e:
        logger.error("Convergence check failed: %s", e)
        return {"converged": False, "reasoning": f"[Convergence check error: {e}]"}
async def judge_final_verdict(
    judge_config: LLMConfig,
    debate_history: List[Dict[str, Any]],
    user_query: str,
    openrouter_api_key: Optional[str] = None,
) -> AsyncGenerator[str, None]:
    """Stream the judge model's closing synthesis of the whole debate.

    The full transcript is rendered into a single prompt that asks the judge
    for a summary, an evaluation of each position and a final verdict. The
    chunks produced by ``llm_streamer`` are yielded unchanged.
    """
    transcript_parts = []
    for entry in debate_history:
        round_label = entry["round"]
        transcript_parts.append(f"\n--- Round {round_label} ---\n")
        for reply in entry["responses"]:
            transcript_parts.append(f"\n[{reply['model']}]:\n{reply['response']}\n")
    transcript = "".join(transcript_parts)

    verdict_prompt = (
        "You are the judge of a multi-model debate. Below is the full debate transcript.\n\n"
        f'Question: "{user_query}"\n\n'
        f"{transcript}\n\n"
        "As the judge, provide:\n"
        "1. A summary of the key arguments from each participant\n"
        "2. An evaluation of the strengths and weaknesses of each position\n"
        "3. Your final verdict: the best, most accurate, and most comprehensive answer "
        "to the original question, synthesizing the best insights from the debate."
    )

    # The judge sees only the transcript prompt, not the user's conversation.
    verdict_stream = llm_streamer(
        Context(messages=[]),
        verdict_prompt,
        judge_config,
        openrouter_api_key=openrouter_api_key,
    )
    async for piece in verdict_stream:
        yield piece
async def debate_event_stream(
    user_prompt: str,
    context: Context,
    member_configs: List[LLMConfig],
    judge_config: Optional[LLMConfig],
    judge_mode: DebateJudgeMode,
    debate_format: DebateFormat,
    max_rounds: int = 5,
    custom_format_prompt: Optional[str] = None,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
    """Run the whole debate and yield its progress as SSE-formatted events.

    Each yielded string is one SSE frame whose JSON payload has a ``"type"``
    field. Types emitted: ``debate_start``, ``round_start``,
    ``round_model_complete``, ``round_complete``, ``judge_decision``,
    ``convergence_check``, ``final_start``, ``final_chunk``,
    ``final_complete``, ``error`` and ``debate_complete``.

    Args:
        user_prompt: The question under debate.
        context: Conversation context passed to the debating models.
        member_configs: One config per debating model.
        judge_config: Judge model config; only used when ``judge_mode`` is
            ``EXTERNAL_JUDGE``.
        judge_mode: How the debate is stopped and concluded.
        debate_format: Prompt format for the debaters.
        max_rounds: Upper bound on the number of rounds.
        custom_format_prompt: Template for the custom debate format.
        attachments_per_model: Optional per-model attachments.
        tools_per_model: Optional per-model tools.
        openrouter_api_key: Forwarded to every model query.
        images: Forwarded to the debate rounds.

    Yields:
        SSE frames as described above. After an ``error`` event the stream
        ends without a ``debate_complete`` event.
    """
    model_names = [c.model_name for c in member_configs]
    yield _sse_event({
        "type": "debate_start",
        "data": {
            "max_rounds": max_rounds,
            "format": debate_format.value,
            "judge_mode": judge_mode.value,
            "models": model_names,
        },
    })

    debate_history: List[Dict[str, Any]] = []
    for round_num in range(1, max_rounds + 1):
        yield _sse_event({"type": "round_start", "data": {"round": round_num}})

        round_responses: List[Dict[str, Any]] = []
        async for result in debate_round(
            member_configs, context, user_prompt,
            debate_history, round_num, debate_format, custom_format_prompt,
            attachments_per_model=attachments_per_model,
            tools_per_model=tools_per_model,
            openrouter_api_key=openrouter_api_key,
            images=images,
        ):
            round_responses.append(result)
            yield _sse_event({
                "type": "round_model_complete",
                "data": {"round": round_num, "model": result["model"], "response": result["response"]},
            })

        debate_history.append({"round": round_num, "responses": round_responses})
        yield _sse_event({
            "type": "round_complete",
            "data": {"round": round_num, "responses": round_responses},
        })

        # debate_round reports a failed model as an "[Error: ...]" response,
        # so this is only reached when the round produced no results at all.
        if not round_responses:
            yield _sse_event({
                "type": "error",
                "data": {"message": "All debate models failed to respond."},
            })
            return

        # Check stop condition (skip on last round)
        if round_num < max_rounds:
            if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
                decision = await judge_evaluate_round(
                    judge_config, debate_history, user_prompt,
                    openrouter_api_key=openrouter_api_key,
                )
                yield _sse_event({
                    "type": "judge_decision",
                    "data": {"round": round_num, **decision},
                })
                if not decision["continue"]:
                    break
            elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
                convergence = await check_self_convergence(
                    member_configs, round_responses,
                    openrouter_api_key=openrouter_api_key,
                )
                yield _sse_event({
                    "type": "convergence_check",
                    "data": {"round": round_num, **convergence},
                })
                if convergence["converged"]:
                    break
            # DISPLAY_ONLY: just continue to next round

    # Final synthesis
    if judge_mode == DebateJudgeMode.EXTERNAL_JUDGE and judge_config:
        yield _sse_event({
            "type": "final_start",
            "data": {"model": judge_config.model_name},
        })
        verdict_chunks: List[str] = []
        try:
            async for chunk in judge_final_verdict(
                judge_config, debate_history, user_prompt,
                openrouter_api_key=openrouter_api_key,
            ):
                verdict_chunks.append(chunk)
                yield _sse_event({"type": "final_chunk", "data": {"chunk": chunk}})
        except Exception as e:
            # A failing judge stream must not end the SSE response silently:
            # report it to the client like the other failure path, then stop.
            logger.error("Judge final verdict failed: %s", e)
            yield _sse_event({
                "type": "error",
                "data": {"message": f"Judge final verdict failed: {e}"},
            })
            return
        yield _sse_event({
            "type": "final_complete",
            "data": {"model": judge_config.model_name, "response": "".join(verdict_chunks)},
        })
    elif judge_mode == DebateJudgeMode.SELF_CONVERGENCE:
        # Use the last round's responses as the final answer
        last_responses = debate_history[-1]["responses"] if debate_history else []
        # Pick the longest response as the "best" convergent answer
        if last_responses:
            best = max(last_responses, key=lambda r: len(r.get("response", "")))
            yield _sse_event({
                "type": "final_complete",
                "data": {"model": best["model"], "response": best["response"]},
            })

    yield _sse_event({"type": "debate_complete"})
|