summary refs log tree commit diff
path: root/backend/app/services/council.py
diff options
context:
space:
mode:
Diffstat (limited to 'backend/app/services/council.py')
-rw-r--r-- backend/app/services/council.py 322
1 files changed, 322 insertions, 0 deletions
diff --git a/backend/app/services/council.py b/backend/app/services/council.py
new file mode 100644
index 0000000..d177f44
--- /dev/null
+++ b/backend/app/services/council.py
@@ -0,0 +1,322 @@
+"""3-stage LLM Council orchestration for ContextFlow."""
+
+import asyncio
+import json
+import logging
+import re
+from collections import defaultdict
+from typing import AsyncGenerator, Dict, List, Any, Optional, Tuple
+
+from app.schemas import Context, LLMConfig
+from app.services.llm import query_model_full, llm_streamer
+
+# Module-level logger; stage 1 and stage 2 report per-model failures through it.
+logger = logging.getLogger("contextflow.council")
+
+
+async def stage1_collect_responses(
+    user_prompt: str,
+    context: Context,
+    configs: List[LLMConfig],
+    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
+    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
+    openrouter_api_key: Optional[str] = None,
+    images: Optional[List[Dict[str, Any]]] = None,
+) -> AsyncGenerator[Dict[str, Any], None]:
+    """
+    Stage 1: Query all council member models in parallel.
+
+    Yields one ``{"model": ..., "response": ...}`` dict per entry in
+    ``configs``, in completion order (not config order). A member whose query
+    raises is still yielded, with its response set to ``"[Error: <exc>]"``.
+    Nothing is yielded after the last member; the caller accumulates the
+    individual results itself.
+
+    ``attachments_per_model`` and ``tools_per_model`` are indexed in parallel
+    with ``configs``.
+    """
+    async def _query_one(idx: int, config: LLMConfig) -> Dict[str, Any]:
+        try:
+            # Index inside the try: a per-model list shorter than ``configs``
+            # becomes an error entry for that member instead of raising out
+            # of the generator and ending the whole stage.
+            atts = attachments_per_model[idx] if attachments_per_model else None
+            tls = tools_per_model[idx] if tools_per_model else None
+            response = await query_model_full(
+                context, user_prompt, config,
+                attachments=atts, tools=tls,
+                openrouter_api_key=openrouter_api_key, images=images,
+            )
+            return {"model": config.model_name, "response": response}
+        except Exception as e:
+            logger.error("Council stage1 failed for %s: %s", config.model_name, e)
+            return {"model": config.model_name, "response": f"[Error: {e}]"}
+
+    # Launch all queries concurrently, yield as each completes
+    tasks = [
+        asyncio.ensure_future(_query_one(i, cfg))
+        for i, cfg in enumerate(configs)
+    ]
+    try:
+        for next_done in asyncio.as_completed(tasks):
+            yield await next_done  # caller sends stage1_model_complete event
+    finally:
+        # If the consumer stops iterating early (generator closed or an
+        # exception thrown in), do not leave model requests running.
+        for task in tasks:
+            if not task.done():
+                task.cancel()
+
+
+def _build_ranking_prompt(user_query: str, stage1_results: List[Dict[str, Any]]) -> str:
+    """
+    Compose the Stage 2 prompt that asks a model to rank the Stage 1 answers.
+
+    Answers appear in list order under the anonymous headings "Response A",
+    "Response B", ... so the prompt never contains a model name. The prompt
+    instructs the evaluator to end with a "FINAL RANKING:" numbered list.
+    """
+    # Letters are assigned by position: index 0 -> "A", 1 -> "B", ...
+    anonymized_blocks = []
+    for position, entry in enumerate(stage1_results):
+        letter = chr(ord("A") + position)
+        anonymized_blocks.append(f"Response {letter}:\n{entry['response']}")
+    responses_text = "\n\n".join(anonymized_blocks)
+
+    prompt = f"""You are evaluating different responses to the following question:
+
+Question: {user_query}
+
+Here are the responses from different models (anonymized):
+
+{responses_text}
+
+Your task:
+1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
+2. Then, at the very end of your response, provide a final ranking.
+
+IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
+- Start with the line "FINAL RANKING:" (all caps, with colon)
+- Then list the responses from best to worst as a numbered list
+- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
+- Do not add any other text or explanations in the ranking section
+
+Example of the correct format for your ENTIRE response:
+
+Response A provides good detail on X but misses Y...
+Response B is accurate but lacks depth on Z...
+Response C offers the most comprehensive answer...
+
+FINAL RANKING:
+1. Response C
+2. Response A
+3. Response B
+
+Now provide your evaluation and ranking:"""
+    return prompt
+
+
+def parse_ranking_from_text(ranking_text: str) -> List[str]:
+    """
+    Extract the ordered response labels from a model's ranking reply.
+
+    Lookup order:
+    1. If the text contains "FINAL RANKING:", only the text after the LAST
+       occurrence is considered. Numbered entries ("1. Response C") are
+       preferred; if there are none, any "Response X" label in that section
+       is used.
+    2. Without the marker, every "Response X" label in the whole text is used.
+
+    Returns labels such as "Response A" best-first, each at most once (first
+    mention wins). Returns an empty list when no label is found.
+    """
+    label_pattern = r'Response [A-Z]'
+
+    # rpartition picks the last marker: a model may mention "FINAL RANKING:"
+    # while discussing the required format before it writes the real section.
+    _, marker, final_section = ranking_text.rpartition("FINAL RANKING:")
+    if marker:
+        found = re.findall(r'\d+\.\s*(Response [A-Z])', final_section)
+        if not found:
+            found = re.findall(label_pattern, final_section)
+    else:
+        found = re.findall(label_pattern, ranking_text)
+
+    # A label listed twice would be counted at two positions downstream;
+    # dict.fromkeys drops repeats while preserving first-seen order.
+    return list(dict.fromkeys(found))
+
+
+async def stage2_collect_rankings(
+    user_query: str,
+    stage1_results: List[Dict[str, Any]],
+    configs: List[LLMConfig],
+    openrouter_api_key: Optional[str] = None,
+) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
+    """
+    Stage 2: ask every council member to rank the anonymized Stage 1 answers.
+
+    Each member receives the same text-only ranking prompt with an empty
+    context (no attachments or tools are passed). Returns a pair:
+    the per-member results (``model``, raw ``ranking`` text, ``parsed_ranking``
+    labels) in ``configs`` order, and the mapping from "Response X" label to
+    the model that wrote that answer. A member whose query or parsing raises
+    gets an "[Error: ...]" ranking and an empty ``parsed_ranking``.
+    """
+    # Label letters follow the order of stage1_results: 0 -> A, 1 -> B, ...
+    label_to_model: Dict[str, str] = {}
+    for position, entry in enumerate(stage1_results):
+        label_to_model[f"Response {chr(65 + position)}"] = entry['model']
+
+    ranking_prompt = _build_ranking_prompt(user_query, stage1_results)
+    empty_context = Context(messages=[])
+
+    async def _rank_one(config: LLMConfig) -> Dict[str, Any]:
+        member = config.model_name
+        try:
+            raw_reply = await query_model_full(
+                empty_context, ranking_prompt, config,
+                openrouter_api_key=openrouter_api_key,
+            )
+            labels_in_order = parse_ranking_from_text(raw_reply)
+        except Exception as e:
+            logger.error("Council stage2 failed for %s: %s", config.model_name, e)
+            return {
+                "model": member,
+                "ranking": f"[Error: {e}]",
+                "parsed_ranking": [],
+            }
+        return {
+            "model": member,
+            "ranking": raw_reply,
+            "parsed_ranking": labels_in_order,
+        }
+
+    # gather preserves the order of configs in its result list.
+    rankings = await asyncio.gather(*map(_rank_one, configs))
+    return list(rankings), label_to_model
+
+
+def calculate_aggregate_rankings(
+    stage2_results: List[Dict[str, Any]],
+    label_to_model: Dict[str, str],
+) -> List[Dict[str, Any]]:
+    """
+    Average each model's rank position across all Stage 2 rankings.
+
+    Positions are 1-based within each member's parsed ranking. If a result
+    has no parsed ranking, its raw ``ranking`` text is parsed instead. Labels
+    missing from ``label_to_model`` are ignored. Returns one entry per ranked
+    model (``model``, ``average_rank`` rounded to 2 decimals,
+    ``rankings_count``), sorted by ascending ``average_rank``.
+    """
+    positions_by_model: Dict[str, List[int]] = defaultdict(list)
+
+    for member_result in stage2_results:
+        ordered_labels = (
+            member_result.get("parsed_ranking", [])
+            or parse_ranking_from_text(member_result.get("ranking", ""))
+        )
+        place = 0
+        for label in ordered_labels:
+            place += 1
+            if label not in label_to_model:
+                continue
+            positions_by_model[label_to_model[label]].append(place)
+
+    summary = [
+        {
+            "model": name,
+            "average_rank": round(sum(places) / len(places), 2),
+            "rankings_count": len(places),
+        }
+        for name, places in positions_by_model.items()
+        if places
+    ]
+    # sorted() is stable, so ties keep first-seen order.
+    return sorted(summary, key=lambda row: row["average_rank"])
+
+
+def _build_chairman_prompt(
+    user_query: str,
+    stage1_results: List[Dict[str, Any]],
+    stage2_results: List[Dict[str, Any]],
+) -> str:
+    """
+    Compose the Stage 3 prompt given to the chairman model.
+
+    The prompt contains the original question, every Stage 1 answer and every
+    Stage 2 ranking, each attributed to its model by name (not anonymized),
+    followed by the instruction to synthesize one final answer.
+    """
+    answer_sections = []
+    for answer in stage1_results:
+        answer_sections.append(
+            f"Model: {answer['model']}\nResponse: {answer['response']}"
+        )
+    stage1_text = "\n\n".join(answer_sections)
+
+    ranking_sections = []
+    for review in stage2_results:
+        ranking_sections.append(
+            f"Model: {review['model']}\nRanking: {review['ranking']}"
+        )
+    stage2_text = "\n\n".join(ranking_sections)
+
+    prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.
+
+Original Question: {user_query}
+
+STAGE 1 - Individual Responses:
+{stage1_text}
+
+STAGE 2 - Peer Rankings:
+{stage2_text}
+
+Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
+- The individual responses and their insights
+- The peer rankings and what they reveal about response quality
+- Any patterns of agreement or disagreement
+
+Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""
+    return prompt
+
+
+async def stage3_stream_synthesis(
+    user_query: str,
+    stage1_results: List[Dict[str, Any]],
+    stage2_results: List[Dict[str, Any]],
+    chairman_config: LLMConfig,
+    openrouter_api_key: Optional[str] = None,
+) -> AsyncGenerator[str, None]:
+    """
+    Stage 3: stream the chairman model's synthesized final answer.
+
+    Builds the chairman prompt from the question and both earlier stages,
+    sends it with an empty context, and re-yields each chunk produced by
+    ``llm_streamer`` unchanged.
+    """
+    synthesis_prompt = _build_chairman_prompt(user_query, stage1_results, stage2_results)
+    chunk_stream = llm_streamer(
+        Context(messages=[]), synthesis_prompt, chairman_config,
+        openrouter_api_key=openrouter_api_key,
+    )
+    async for piece in chunk_stream:
+        yield piece
+
+
+def _sse_event(data: dict) -> str:
+    """Serialize ``data`` to JSON and wrap it as one SSE ``data:`` message."""
+    payload = json.dumps(data)
+    # The trailing blank line terminates the event.
+    return "data: " + payload + "\n\n"
+
+
+async def council_event_stream(
+    user_prompt: str,
+    context: Context,
+    member_configs: List[LLMConfig],
+    chairman_config: LLMConfig,
+    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
+    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
+    openrouter_api_key: Optional[str] = None,
+    images: Optional[List[Dict[str, Any]]] = None,
+) -> AsyncGenerator[str, None]:
+    """
+    Master orchestrator yielding SSE JSON events through the 3-stage council process.
+
+    Event order: ``stage1_start``, one ``stage1_model_complete`` per member,
+    ``stage1_complete``, ``stage2_start``, ``stage2_complete``,
+    ``stage3_start``, zero or more ``stage3_chunk``, ``stage3_complete``,
+    ``complete``. If no member produced a usable answer, an ``error`` event
+    follows ``stage1_complete`` and the stream ends there.
+
+    ``stage1_complete`` always carries every member's result, including
+    failures; only usable answers are ranked in Stage 2 and synthesized in
+    Stage 3.
+    """
+    # === Stage 1 ===
+    yield _sse_event({"type": "stage1_start"})
+
+    stage1_results: List[Dict[str, Any]] = []
+    async for result in stage1_collect_responses(
+        user_prompt, context, member_configs,
+        attachments_per_model=attachments_per_model,
+        tools_per_model=tools_per_model,
+        openrouter_api_key=openrouter_api_key,
+        images=images,
+    ):
+        stage1_results.append(result)
+        yield _sse_event({
+            "type": "stage1_model_complete",
+            "data": {"model": result["model"], "response": result["response"]},
+        })
+
+    yield _sse_event({"type": "stage1_complete", "data": stage1_results})
+
+    # Stage 1 reports a failed member as an "[Error: ...]" placeholder rather
+    # than omitting it, so an emptiness check on stage1_results alone never
+    # detects failure. Drop the placeholders so they are neither ranked nor
+    # synthesized as if they were answers.
+    # NOTE(review): this keys off the placeholder text stage 1 produces; a
+    # genuine answer starting with "[Error: " would also be dropped.
+    usable_results = [
+        r for r in stage1_results
+        if not str(r["response"]).startswith("[Error: ")
+    ]
+
+    if not usable_results:
+        yield _sse_event({
+            "type": "error",
+            "data": {"message": "All council models failed to respond."},
+        })
+        return
+
+    # === Stage 2 ===
+    yield _sse_event({"type": "stage2_start"})
+
+    stage2_results, label_to_model = await stage2_collect_rankings(
+        user_prompt, usable_results, member_configs,
+        openrouter_api_key=openrouter_api_key,
+    )
+    aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)
+
+    yield _sse_event({
+        "type": "stage2_complete",
+        "data": {
+            "rankings": stage2_results,
+            "label_to_model": label_to_model,
+            "aggregate_rankings": aggregate_rankings,
+        },
+    })
+
+    # === Stage 3 (streamed) ===
+    yield _sse_event({"type": "stage3_start"})
+
+    # Collect chunks in a list and join once instead of repeated string +=.
+    chunks: List[str] = []
+    async for chunk in stage3_stream_synthesis(
+        user_prompt, usable_results, stage2_results, chairman_config,
+        openrouter_api_key=openrouter_api_key,
+    ):
+        chunks.append(chunk)
+        yield _sse_event({"type": "stage3_chunk", "data": {"chunk": chunk}})
+
+    yield _sse_event({
+        "type": "stage3_complete",
+        "data": {"model": chairman_config.model_name, "response": "".join(chunks)},
+    })
+
+    yield _sse_event({"type": "complete"})