author     karpathy <andrej.karpathy@gmail.com>  2025-11-22 14:27:53 -0800
committer  karpathy <andrej.karpathy@gmail.com>  2025-11-22 14:27:53 -0800
commit     eb0eb26f4cefa4880c895ff017f312e8674f9b73 (patch)
tree       ea20b736519a5b4149b0356fec93447eef950e6b /backend
v0
Diffstat (limited to 'backend')
-rw-r--r--  backend/__init__.py     1
-rw-r--r--  backend/config.py      26
-rw-r--r--  backend/council.py    297
-rw-r--r--  backend/main.py       115
-rw-r--r--  backend/openrouter.py  79
-rw-r--r--  backend/storage.py    154
6 files changed, 672 insertions(+), 0 deletions(-)
diff --git a/backend/__init__.py b/backend/__init__.py
new file mode 100644
index 0000000..659fe16
--- /dev/null
+++ b/backend/__init__.py
@@ -0,0 +1 @@
+"""LLM Council backend package."""
diff --git a/backend/config.py b/backend/config.py
new file mode 100644
index 0000000..a9cf7c4
--- /dev/null
+++ b/backend/config.py
@@ -0,0 +1,26 @@
+"""Configuration for the LLM Council."""
+
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# OpenRouter API key
+OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
+
+# Council members - list of OpenRouter model identifiers
+COUNCIL_MODELS = [
+ "openai/gpt-5.1",
+ "google/gemini-3-pro-preview",
+ "anthropic/claude-sonnet-4.5",
+ "x-ai/grok-4",
+]
+
+# Chairman model - synthesizes final response
+CHAIRMAN_MODEL = "google/gemini-3-pro-preview"
+
+# OpenRouter API endpoint
+OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions"
+
+# Data directory for conversation storage
+DATA_DIR = "data/conversations"
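[Editor's note: quick sanity check, not part of the commit. It assumes a local .env file (or the environment) provides the key that load_dotenv() reads above.]

    # Illustrative only: confirm the key loaded and list the configured council.
    from backend.config import OPENROUTER_API_KEY, COUNCIL_MODELS, CHAIRMAN_MODEL

    assert OPENROUTER_API_KEY, "set OPENROUTER_API_KEY in .env or the environment"
    print(f"{len(COUNCIL_MODELS)} council members, chaired by {CHAIRMAN_MODEL}")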
diff --git a/backend/council.py b/backend/council.py
new file mode 100644
index 0000000..b7f8839
--- /dev/null
+++ b/backend/council.py
@@ -0,0 +1,297 @@
+"""3-stage LLM Council orchestration."""
+
+from typing import List, Dict, Any, Tuple
+from .openrouter import query_models_parallel, query_model
+from .config import COUNCIL_MODELS, CHAIRMAN_MODEL
+
+
+async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]:
+ """
+ Stage 1: Collect individual responses from all council models.
+
+ Args:
+ user_query: The user's question
+
+ Returns:
+ List of dicts with 'model' and 'response' keys
+ """
+ messages = [{"role": "user", "content": user_query}]
+
+ # Query all models in parallel
+ responses = await query_models_parallel(COUNCIL_MODELS, messages)
+
+ # Format results
+ stage1_results = []
+ for model, response in responses.items():
+ if response is not None: # Only include successful responses
+ stage1_results.append({
+ "model": model,
+ "response": response.get('content', '')
+ })
+
+ return stage1_results
+
+
+async def stage2_collect_rankings(
+ user_query: str,
+ stage1_results: List[Dict[str, Any]]
+) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
+ """
+ Stage 2: Each model ranks the anonymized responses.
+
+ Args:
+ user_query: The original user query
+ stage1_results: Results from Stage 1
+
+ Returns:
+ Tuple of (rankings list, label_to_model mapping)
+ """
+ # Create anonymized labels for responses (Response A, Response B, etc.)
+ labels = [chr(65 + i) for i in range(len(stage1_results))] # A, B, C, ...
+
+ # Create mapping from label to model name
+ label_to_model = {
+ f"Response {label}": result['model']
+ for label, result in zip(labels, stage1_results)
+ }
+
+ # Build the ranking prompt
+ responses_text = "\n\n".join([
+ f"Response {label}:\n{result['response']}"
+ for label, result in zip(labels, stage1_results)
+ ])
+
+ ranking_prompt = f"""You are evaluating different responses to the following question:
+
+Question: {user_query}
+
+Here are the responses from different models (anonymized):
+
+{responses_text}
+
+Your task:
+1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
+2. Then, at the very end of your response, provide a final ranking.
+
+IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
+- Start with the line "FINAL RANKING:" (all caps, with colon)
+- Then list the responses from best to worst as a numbered list
+- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
+- Do not add any other text or explanations in the ranking section
+
+Example of the correct format for your ENTIRE response:
+
+Response A provides good detail on X but misses Y...
+Response B is accurate but lacks depth on Z...
+Response C offers the most comprehensive answer...
+
+FINAL RANKING:
+1. Response C
+2. Response A
+3. Response B
+
+Now provide your evaluation and ranking:"""
+
+ messages = [{"role": "user", "content": ranking_prompt}]
+
+ # Get rankings from all council models in parallel
+ responses = await query_models_parallel(COUNCIL_MODELS, messages)
+
+ # Format results
+ stage2_results = []
+ for model, response in responses.items():
+ if response is not None:
+ full_text = response.get('content', '')
+ parsed = parse_ranking_from_text(full_text)
+ stage2_results.append({
+ "model": model,
+ "ranking": full_text,
+ "parsed_ranking": parsed
+ })
+
+ return stage2_results, label_to_model
+
+
+async def stage3_synthesize_final(
+ user_query: str,
+ stage1_results: List[Dict[str, Any]],
+ stage2_results: List[Dict[str, Any]]
+) -> Dict[str, Any]:
+ """
+ Stage 3: Chairman synthesizes final response.
+
+ Args:
+ user_query: The original user query
+ stage1_results: Individual model responses from Stage 1
+ stage2_results: Rankings from Stage 2
+
+ Returns:
+ Dict with 'model' and 'response' keys
+ """
+ # Build comprehensive context for chairman
+ stage1_text = "\n\n".join([
+ f"Model: {result['model']}\nResponse: {result['response']}"
+ for result in stage1_results
+ ])
+
+ stage2_text = "\n\n".join([
+ f"Model: {result['model']}\nRanking: {result['ranking']}"
+ for result in stage2_results
+ ])
+
+ chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.
+
+Original Question: {user_query}
+
+STAGE 1 - Individual Responses:
+{stage1_text}
+
+STAGE 2 - Peer Rankings:
+{stage2_text}
+
+Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
+- The individual responses and their insights
+- The peer rankings and what they reveal about response quality
+- Any patterns of agreement or disagreement
+
+Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""
+
+ messages = [{"role": "user", "content": chairman_prompt}]
+
+ # Query the chairman model
+ response = await query_model(CHAIRMAN_MODEL, messages)
+
+ if response is None:
+ # Fallback if chairman fails
+ return {
+ "model": CHAIRMAN_MODEL,
+ "response": "Error: Unable to generate final synthesis."
+ }
+
+ return {
+ "model": CHAIRMAN_MODEL,
+ "response": response.get('content', '')
+ }
+
+
+def parse_ranking_from_text(ranking_text: str) -> List[str]:
+ """
+ Parse the FINAL RANKING section from the model's response.
+
+ Args:
+ ranking_text: The full text response from the model
+
+ Returns:
+ List of response labels in ranked order
+ """
+ import re
+
+ # Look for "FINAL RANKING:" section
+ if "FINAL RANKING:" in ranking_text:
+ # Extract everything after "FINAL RANKING:"
+ parts = ranking_text.split("FINAL RANKING:")
+ if len(parts) >= 2:
+ ranking_section = parts[1]
+ # Try to extract numbered list format (e.g., "1. Response A")
+ # This pattern looks for: number, period, optional space, "Response X"
+ numbered_matches = re.findall(r'\d+\.\s*Response [A-Z]', ranking_section)
+ if numbered_matches:
+ # Extract just the "Response X" part
+ return [re.search(r'Response [A-Z]', m).group() for m in numbered_matches]
+
+ # Fallback: Extract all "Response X" patterns in order
+ matches = re.findall(r'Response [A-Z]', ranking_section)
+ return matches
+
+ # Fallback: try to find any "Response X" patterns in order
+ matches = re.findall(r'Response [A-Z]', ranking_text)
+ return matches
+
+
+def calculate_aggregate_rankings(
+ stage2_results: List[Dict[str, Any]],
+ label_to_model: Dict[str, str]
+) -> List[Dict[str, Any]]:
+ """
+ Calculate aggregate rankings across all models.
+
+ Args:
+ stage2_results: Rankings from each model
+ label_to_model: Mapping from anonymous labels to model names
+
+ Returns:
+ List of dicts with model name and average rank, sorted best to worst
+ """
+ from collections import defaultdict
+
+ # Track positions for each model
+ model_positions = defaultdict(list)
+
+ for ranking in stage2_results:
+ ranking_text = ranking['ranking']
+
+ # Parse the ranking from the structured format
+ parsed_ranking = parse_ranking_from_text(ranking_text)
+
+ for position, label in enumerate(parsed_ranking, start=1):
+ if label in label_to_model:
+ model_name = label_to_model[label]
+ model_positions[model_name].append(position)
+
+ # Calculate average position for each model
+ aggregate = []
+ for model, positions in model_positions.items():
+ if positions:
+ avg_rank = sum(positions) / len(positions)
+ aggregate.append({
+ "model": model,
+ "average_rank": round(avg_rank, 2),
+ "rankings_count": len(positions)
+ })
+
+ # Sort by average rank (lower is better)
+ aggregate.sort(key=lambda x: x['average_rank'])
+
+ return aggregate
+
+
+async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]:
+ """
+ Run the complete 3-stage council process.
+
+ Args:
+ user_query: The user's question
+
+ Returns:
+ Tuple of (stage1_results, stage2_results, stage3_result, metadata)
+ """
+ # Stage 1: Collect individual responses
+ stage1_results = await stage1_collect_responses(user_query)
+
+ # If no models responded successfully, return error
+ if not stage1_results:
+ return [], [], {
+ "model": "error",
+ "response": "All models failed to respond. Please try again."
+ }, {}
+
+ # Stage 2: Collect rankings
+ stage2_results, label_to_model = await stage2_collect_rankings(user_query, stage1_results)
+
+ # Calculate aggregate rankings
+ aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)
+
+ # Stage 3: Synthesize final answer
+ stage3_result = await stage3_synthesize_final(
+ user_query,
+ stage1_results,
+ stage2_results
+ )
+
+ # Prepare metadata
+ metadata = {
+ "label_to_model": label_to_model,
+ "aggregate_rankings": aggregate_rankings
+ }
+
+ return stage1_results, stage2_results, stage3_result, metadata
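[Editor's note: a minimal sketch, not in the commit, of driving the 3-stage pipeline directly from a script, assuming OPENROUTER_API_KEY is configured; the names and return shape mirror run_full_council above.]

    import asyncio
    from backend.council import run_full_council

    async def demo():
        # Stage 1 responses, Stage 2 rankings, Stage 3 synthesis, plus label/ranking metadata.
        stage1, stage2, final, meta = await run_full_council("Why is the sky blue?")
        for r in stage1:
            print(r["model"], "->", len(r["response"]), "chars")
        print("aggregate rankings:", meta["aggregate_rankings"])
        print("chairman:", final["response"][:200])

    asyncio.run(demo())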
diff --git a/backend/main.py b/backend/main.py
new file mode 100644
index 0000000..cbb836f
--- /dev/null
+++ b/backend/main.py
@@ -0,0 +1,115 @@
+"""FastAPI backend for LLM Council."""
+
+from fastapi import FastAPI, HTTPException
+from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel
+from typing import List, Dict, Any
+import uuid
+
+from . import storage
+from .council import run_full_council
+
+app = FastAPI(title="LLM Council API")
+
+# Enable CORS for local development
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["http://localhost:5173", "http://localhost:3000"],
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
+
+
+class CreateConversationRequest(BaseModel):
+ """Request to create a new conversation."""
+ pass
+
+
+class SendMessageRequest(BaseModel):
+ """Request to send a message in a conversation."""
+ content: str
+
+
+class ConversationMetadata(BaseModel):
+ """Conversation metadata for list view."""
+ id: str
+ created_at: str
+ message_count: int
+
+
+class Conversation(BaseModel):
+ """Full conversation with all messages."""
+ id: str
+ created_at: str
+ messages: List[Dict[str, Any]]
+
+
+@app.get("/")
+async def root():
+ """Health check endpoint."""
+ return {"status": "ok", "service": "LLM Council API"}
+
+
+@app.get("/api/conversations", response_model=List[ConversationMetadata])
+async def list_conversations():
+ """List all conversations (metadata only)."""
+ return storage.list_conversations()
+
+
+@app.post("/api/conversations", response_model=Conversation)
+async def create_conversation(request: CreateConversationRequest):
+ """Create a new conversation."""
+ conversation_id = str(uuid.uuid4())
+ conversation = storage.create_conversation(conversation_id)
+ return conversation
+
+
+@app.get("/api/conversations/{conversation_id}", response_model=Conversation)
+async def get_conversation(conversation_id: str):
+ """Get a specific conversation with all its messages."""
+ conversation = storage.get_conversation(conversation_id)
+ if conversation is None:
+ raise HTTPException(status_code=404, detail="Conversation not found")
+ return conversation
+
+
+@app.post("/api/conversations/{conversation_id}/message")
+async def send_message(conversation_id: str, request: SendMessageRequest):
+ """
+ Send a message and run the 3-stage council process.
+ Returns the complete response with all stages.
+ """
+ # Check if conversation exists
+ conversation = storage.get_conversation(conversation_id)
+ if conversation is None:
+ raise HTTPException(status_code=404, detail="Conversation not found")
+
+ # Add user message
+ storage.add_user_message(conversation_id, request.content)
+
+ # Run the 3-stage council process
+ stage1_results, stage2_results, stage3_result, metadata = await run_full_council(
+ request.content
+ )
+
+ # Add assistant message with all stages
+ storage.add_assistant_message(
+ conversation_id,
+ stage1_results,
+ stage2_results,
+ stage3_result
+ )
+
+ # Return the complete response with metadata
+ return {
+ "stage1": stage1_results,
+ "stage2": stage2_results,
+ "stage3": stage3_result,
+ "metadata": metadata
+ }
+
+
+if __name__ == "__main__":
+ import uvicorn
+ uvicorn.run(app, host="0.0.0.0", port=8001)
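[Editor's note: a hedged usage sketch, not part of the commit, exercising the endpoints above with httpx against the default port 8001. The paths and response keys come from the handlers; the client itself is illustrative.]

    import httpx

    base = "http://localhost:8001"
    # Create a conversation, then send one message through the full council flow.
    conv = httpx.post(f"{base}/api/conversations", json={}).json()
    reply = httpx.post(
        f"{base}/api/conversations/{conv['id']}/message",
        json={"content": "Compare TCP and UDP in one paragraph."},
        timeout=600.0,  # the 3-stage process can take a while
    ).json()
    print(reply["stage3"]["response"])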
diff --git a/backend/openrouter.py b/backend/openrouter.py
new file mode 100644
index 0000000..118fb0b
--- /dev/null
+++ b/backend/openrouter.py
@@ -0,0 +1,79 @@
+"""OpenRouter API client for making LLM requests."""
+
+import httpx
+from typing import List, Dict, Any, Optional
+from .config import OPENROUTER_API_KEY, OPENROUTER_API_URL
+
+
+async def query_model(
+ model: str,
+ messages: List[Dict[str, str]],
+ timeout: float = 120.0
+) -> Optional[Dict[str, Any]]:
+ """
+ Query a single model via OpenRouter API.
+
+ Args:
+ model: OpenRouter model identifier (e.g., "openai/gpt-4o")
+ messages: List of message dicts with 'role' and 'content'
+ timeout: Request timeout in seconds
+
+ Returns:
+ Response dict with 'content' and optional 'reasoning_details', or None if failed
+ """
+ headers = {
+ "Authorization": f"Bearer {OPENROUTER_API_KEY}",
+ "Content-Type": "application/json",
+ }
+
+ payload = {
+ "model": model,
+ "messages": messages,
+ }
+
+ try:
+ async with httpx.AsyncClient(timeout=timeout) as client:
+ response = await client.post(
+ OPENROUTER_API_URL,
+ headers=headers,
+ json=payload
+ )
+ response.raise_for_status()
+
+ data = response.json()
+ message = data['choices'][0]['message']
+
+ return {
+ 'content': message.get('content'),
+ 'reasoning_details': message.get('reasoning_details')
+ }
+
+ except Exception as e:
+ print(f"Error querying model {model}: {e}")
+ return None
+
+
+async def query_models_parallel(
+ models: List[str],
+ messages: List[Dict[str, str]]
+) -> Dict[str, Optional[Dict[str, Any]]]:
+ """
+ Query multiple models in parallel.
+
+ Args:
+ models: List of OpenRouter model identifiers
+ messages: List of message dicts to send to each model
+
+ Returns:
+ Dict mapping model identifier to response dict (or None if failed)
+ """
+ import asyncio
+
+ # Create tasks for all models
+ tasks = [query_model(model, messages) for model in models]
+
+ # Wait for all to complete
+ responses = await asyncio.gather(*tasks)
+
+ # Map models to their responses
+ return {model: response for model, response in zip(models, responses)}
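[Editor's note: illustrative call, not in the commit, showing the client in isolation: two council models queried in parallel, with failed models mapping to None as documented above.]

    import asyncio
    from backend.openrouter import query_models_parallel

    msgs = [{"role": "user", "content": "One sentence on Rayleigh scattering."}]
    results = asyncio.run(query_models_parallel(["openai/gpt-5.1", "x-ai/grok-4"], msgs))
    for model, resp in results.items():
        print(model, "->", resp["content"] if resp else "request failed")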
diff --git a/backend/storage.py b/backend/storage.py
new file mode 100644
index 0000000..dd17a1a
--- /dev/null
+++ b/backend/storage.py
@@ -0,0 +1,154 @@
+"""JSON-based storage for conversations."""
+
+import json
+import os
+from datetime import datetime
+from typing import List, Dict, Any, Optional
+from pathlib import Path
+from .config import DATA_DIR
+
+
+def ensure_data_dir():
+ """Ensure the data directory exists."""
+ Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
+
+
+def get_conversation_path(conversation_id: str) -> str:
+ """Get the file path for a conversation."""
+ return os.path.join(DATA_DIR, f"{conversation_id}.json")
+
+
+def create_conversation(conversation_id: str) -> Dict[str, Any]:
+ """
+ Create a new conversation.
+
+ Args:
+ conversation_id: Unique identifier for the conversation
+
+ Returns:
+ New conversation dict
+ """
+ ensure_data_dir()
+
+ conversation = {
+ "id": conversation_id,
+ "created_at": datetime.utcnow().isoformat(),
+ "messages": []
+ }
+
+ # Save to file
+ path = get_conversation_path(conversation_id)
+ with open(path, 'w') as f:
+ json.dump(conversation, f, indent=2)
+
+ return conversation
+
+
+def get_conversation(conversation_id: str) -> Optional[Dict[str, Any]]:
+ """
+ Load a conversation from storage.
+
+ Args:
+ conversation_id: Unique identifier for the conversation
+
+ Returns:
+ Conversation dict or None if not found
+ """
+ path = get_conversation_path(conversation_id)
+
+ if not os.path.exists(path):
+ return None
+
+ with open(path, 'r') as f:
+ return json.load(f)
+
+
+def save_conversation(conversation: Dict[str, Any]):
+ """
+ Save a conversation to storage.
+
+ Args:
+ conversation: Conversation dict to save
+ """
+ ensure_data_dir()
+
+ path = get_conversation_path(conversation['id'])
+ with open(path, 'w') as f:
+ json.dump(conversation, f, indent=2)
+
+
+def list_conversations() -> List[Dict[str, Any]]:
+ """
+ List all conversations (metadata only).
+
+ Returns:
+ List of conversation metadata dicts
+ """
+ ensure_data_dir()
+
+ conversations = []
+ for filename in os.listdir(DATA_DIR):
+ if filename.endswith('.json'):
+ path = os.path.join(DATA_DIR, filename)
+ with open(path, 'r') as f:
+ data = json.load(f)
+ # Return metadata only
+ conversations.append({
+ "id": data["id"],
+ "created_at": data["created_at"],
+ "message_count": len(data["messages"])
+ })
+
+ # Sort by creation time, newest first
+ conversations.sort(key=lambda x: x["created_at"], reverse=True)
+
+ return conversations
+
+
+def add_user_message(conversation_id: str, content: str):
+ """
+ Add a user message to a conversation.
+
+ Args:
+ conversation_id: Conversation identifier
+ content: User message content
+ """
+ conversation = get_conversation(conversation_id)
+ if conversation is None:
+ raise ValueError(f"Conversation {conversation_id} not found")
+
+ conversation["messages"].append({
+ "role": "user",
+ "content": content
+ })
+
+ save_conversation(conversation)
+
+
+def add_assistant_message(
+ conversation_id: str,
+ stage1: List[Dict[str, Any]],
+ stage2: List[Dict[str, Any]],
+ stage3: Dict[str, Any]
+):
+ """
+ Add an assistant message with all 3 stages to a conversation.
+
+ Args:
+ conversation_id: Conversation identifier
+ stage1: List of individual model responses
+ stage2: List of model rankings
+ stage3: Final synthesized response
+ """
+ conversation = get_conversation(conversation_id)
+ if conversation is None:
+ raise ValueError(f"Conversation {conversation_id} not found")
+
+ conversation["messages"].append({
+ "role": "assistant",
+ "stage1": stage1,
+ "stage2": stage2,
+ "stage3": stage3
+ })
+
+ save_conversation(conversation)
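[Editor's note: a minimal sketch, not part of the commit, of the storage helpers in isolation; it writes data/conversations/demo.json with the same message shape the API relies on.]

    from backend import storage

    conv = storage.create_conversation("demo")
    storage.add_user_message("demo", "hello council")
    storage.add_assistant_message(
        "demo",
        stage1=[{"model": "stub", "response": "stub answer"}],
        stage2=[],
        stage3={"model": "stub", "response": "stub synthesis"},
    )
    print(len(storage.get_conversation("demo")["messages"]))  # -> 2 (user + assistant)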