Diffstat (limited to 'backend/council.py')
-rw-r--r--  backend/council.py  297
1 file changed, 297 insertions, 0 deletions
diff --git a/backend/council.py b/backend/council.py
new file mode 100644
index 0000000..b7f8839
--- /dev/null
+++ b/backend/council.py
@@ -0,0 +1,297 @@
+"""3-stage LLM Council orchestration."""
+
+import re
+from collections import defaultdict
+from typing import List, Dict, Any, Tuple
+
+from .openrouter import query_models_parallel, query_model
+from .config import COUNCIL_MODELS, CHAIRMAN_MODEL
+
+
+async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]:
+ """
+ Stage 1: Collect individual responses from all council models.
+
+ Args:
+ user_query: The user's question
+
+ Returns:
+ List of dicts with 'model' and 'response' keys
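+
+        Illustrative shape (actual model identifiers come from COUNCIL_MODELS):
+        [{"model": "provider/model-a", "response": "..."}, ...]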
+ """
+ messages = [{"role": "user", "content": user_query}]
+
+ # Query all models in parallel
+ responses = await query_models_parallel(COUNCIL_MODELS, messages)
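+    # Expected shape, based on how the result is used below: a dict mapping
+    # model name -> message dict with a 'content' key, or None on failure.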
+
+ # Format results
+ stage1_results = []
+ for model, response in responses.items():
+ if response is not None: # Only include successful responses
+ stage1_results.append({
+ "model": model,
+ "response": response.get('content', '')
+ })
+
+ return stage1_results
+
+
+async def stage2_collect_rankings(
+ user_query: str,
+ stage1_results: List[Dict[str, Any]]
+) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
+ """
+ Stage 2: Each model ranks the anonymized responses.
+
+ Args:
+ user_query: The original user query
+ stage1_results: Results from Stage 1
+
+ Returns:
+ Tuple of (rankings list, label_to_model mapping)
+ """
+ # Create anonymized labels for responses (Response A, Response B, etc.)
+ labels = [chr(65 + i) for i in range(len(stage1_results))] # A, B, C, ...
+
+ # Create mapping from label to model name
+ label_to_model = {
+ f"Response {label}": result['model']
+ for label, result in zip(labels, stage1_results)
+ }
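+    # Illustrative mapping (model names depend on COUNCIL_MODELS):
+    # {"Response A": "provider/model-a", "Response B": "provider/model-b", ...}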
+
+ # Build the ranking prompt
+ responses_text = "\n\n".join([
+ f"Response {label}:\n{result['response']}"
+ for label, result in zip(labels, stage1_results)
+ ])
+
+ ranking_prompt = f"""You are evaluating different responses to the following question:
+
+Question: {user_query}
+
+Here are the responses from different models (anonymized):
+
+{responses_text}
+
+Your task:
+1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
+2. Then, at the very end of your response, provide a final ranking.
+
+IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
+- Start with the line "FINAL RANKING:" (all caps, with colon)
+- Then list the responses from best to worst as a numbered list
+- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
+- Do not add any other text or explanations in the ranking section
+
+Example of the correct format for your ENTIRE response:
+
+Response A provides good detail on X but misses Y...
+Response B is accurate but lacks depth on Z...
+Response C offers the most comprehensive answer...
+
+FINAL RANKING:
+1. Response C
+2. Response A
+3. Response B
+
+Now provide your evaluation and ranking:"""
+
+ messages = [{"role": "user", "content": ranking_prompt}]
+
+ # Get rankings from all council models in parallel
+ responses = await query_models_parallel(COUNCIL_MODELS, messages)
+
+ # Format results
+ stage2_results = []
+ for model, response in responses.items():
+ if response is not None:
+ full_text = response.get('content', '')
+ parsed = parse_ranking_from_text(full_text)
+ stage2_results.append({
+ "model": model,
+ "ranking": full_text,
+ "parsed_ranking": parsed
+ })
+
+ return stage2_results, label_to_model
+
+
+async def stage3_synthesize_final(
+ user_query: str,
+ stage1_results: List[Dict[str, Any]],
+ stage2_results: List[Dict[str, Any]]
+) -> Dict[str, Any]:
+ """
+ Stage 3: Chairman synthesizes final response.
+
+ Args:
+ user_query: The original user query
+ stage1_results: Individual model responses from Stage 1
+ stage2_results: Rankings from Stage 2
+
+ Returns:
+ Dict with 'model' and 'response' keys
+ """
+ # Build comprehensive context for chairman
+ stage1_text = "\n\n".join([
+ f"Model: {result['model']}\nResponse: {result['response']}"
+ for result in stage1_results
+ ])
+
+ stage2_text = "\n\n".join([
+ f"Model: {result['model']}\nRanking: {result['ranking']}"
+ for result in stage2_results
+ ])
+
+ chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.
+
+Original Question: {user_query}
+
+STAGE 1 - Individual Responses:
+{stage1_text}
+
+STAGE 2 - Peer Rankings:
+{stage2_text}
+
+Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
+- The individual responses and their insights
+- The peer rankings and what they reveal about response quality
+- Any patterns of agreement or disagreement
+
+Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""
+
+ messages = [{"role": "user", "content": chairman_prompt}]
+
+ # Query the chairman model
+ response = await query_model(CHAIRMAN_MODEL, messages)
+
+ if response is None:
+ # Fallback if chairman fails
+ return {
+ "model": CHAIRMAN_MODEL,
+ "response": "Error: Unable to generate final synthesis."
+ }
+
+ return {
+ "model": CHAIRMAN_MODEL,
+ "response": response.get('content', '')
+ }
+
+
+def parse_ranking_from_text(ranking_text: str) -> List[str]:
+ """
+ Parse the FINAL RANKING section from the model's response.
+
+ Args:
+ ranking_text: The full text response from the model
+
+ Returns:
+ List of response labels in ranked order
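+
+        Illustrative example: a response ending with
+
+            FINAL RANKING:
+            1. Response C
+            2. Response A
+
+        parses to ['Response C', 'Response A'].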
+ """
+ # Look for "FINAL RANKING:" section
+ if "FINAL RANKING:" in ranking_text:
+ # Extract everything after "FINAL RANKING:"
+ parts = ranking_text.split("FINAL RANKING:")
+ if len(parts) >= 2:
+ ranking_section = parts[1]
+            # Try the numbered list format first (e.g., "1. Response A"):
+            # number, period, optional space, then "Response X"
+            numbered_matches = re.findall(r'\d+\.\s*Response [A-Z]', ranking_section)
+            if numbered_matches:
+                # Keep only the "Response X" part of each match
+                return [re.search(r'Response [A-Z]', m).group() for m in numbered_matches]
+
+            # Fallback: every "Response X" mention in the ranking section, in order,
+            # de-duplicated in case a label is mentioned more than once
+            matches = re.findall(r'Response [A-Z]', ranking_section)
+            return list(dict.fromkeys(matches))
+
+    # Last resort: every "Response X" mention in the full text, in order, de-duplicated
+    # (the evaluation prose typically names each label several times)
+    matches = re.findall(r'Response [A-Z]', ranking_text)
+    return list(dict.fromkeys(matches))
+
+
+def calculate_aggregate_rankings(
+ stage2_results: List[Dict[str, Any]],
+ label_to_model: Dict[str, str]
+) -> List[Dict[str, Any]]:
+ """
+ Calculate aggregate rankings across all models.
+
+ Args:
+ stage2_results: Rankings from each model
+ label_to_model: Mapping from anonymous labels to model names
+
+ Returns:
+ List of dicts with model name and average rank, sorted best to worst
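+
+        Illustrative shape (model names depend on COUNCIL_MODELS):
+        [{"model": "provider/model-a", "average_rank": 1.33, "rankings_count": 3}, ...]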
+ """
+ # Track positions for each model
+ model_positions = defaultdict(list)
+
+ for ranking in stage2_results:
+ ranking_text = ranking['ranking']
+
+ # Parse the ranking from the structured format
+ parsed_ranking = parse_ranking_from_text(ranking_text)
+
+ for position, label in enumerate(parsed_ranking, start=1):
+ if label in label_to_model:
+ model_name = label_to_model[label]
+ model_positions[model_name].append(position)
+
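+    # At this point model_positions maps each model to the list of positions it received,
+    # e.g. (illustrative): {"provider/model-a": [1, 2, 1], "provider/model-b": [3, 2, 2]}
+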
+ # Calculate average position for each model
+ aggregate = []
+ for model, positions in model_positions.items():
+ if positions:
+ avg_rank = sum(positions) / len(positions)
+ aggregate.append({
+ "model": model,
+ "average_rank": round(avg_rank, 2),
+ "rankings_count": len(positions)
+ })
+
+ # Sort by average rank (lower is better)
+ aggregate.sort(key=lambda x: x['average_rank'])
+
+ return aggregate
+
+
+async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]:
+ """
+ Run the complete 3-stage council process.
+
+ Args:
+ user_query: The user's question
+
+ Returns:
+ Tuple of (stage1_results, stage2_results, stage3_result, metadata)
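+
+        Illustrative usage (from an async context):
+            stage1, stage2, final, meta = await run_full_council("What is transfer learning?")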
+ """
+ # Stage 1: Collect individual responses
+ stage1_results = await stage1_collect_responses(user_query)
+
+ # If no models responded successfully, return error
+ if not stage1_results:
+ return [], [], {
+ "model": "error",
+ "response": "All models failed to respond. Please try again."
+ }, {}
+
+ # Stage 2: Collect rankings
+ stage2_results, label_to_model = await stage2_collect_rankings(user_query, stage1_results)
+
+ # Calculate aggregate rankings
+ aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)
+
+ # Stage 3: Synthesize final answer
+ stage3_result = await stage3_synthesize_final(
+ user_query,
+ stage1_results,
+ stage2_results
+ )
+
+ # Prepare metadata
+ metadata = {
+ "label_to_model": label_to_model,
+ "aggregate_rankings": aggregate_rankings
+ }
+
+ return stage1_results, stage2_results, stage3_result, metadata