| author    | karpathy <andrej.karpathy@gmail.com> | 2025-11-22 14:27:53 -0800 |
| committer | karpathy <andrej.karpathy@gmail.com> | 2025-11-22 14:27:53 -0800 |
| commit    | eb0eb26f4cefa4880c895ff017f312e8674f9b73 (patch) |
| tree      | ea20b736519a5b4149b0356fec93447eef950e6b /backend |
v0
Diffstat (limited to 'backend')
| -rw-r--r-- | backend/__init__.py   |   1 |
| -rw-r--r-- | backend/config.py     |  26 |
| -rw-r--r-- | backend/council.py    | 297 |
| -rw-r--r-- | backend/main.py       | 115 |
| -rw-r--r-- | backend/openrouter.py |  79 |
| -rw-r--r-- | backend/storage.py    | 154 |
6 files changed, 672 insertions, 0 deletions
diff --git a/backend/__init__.py b/backend/__init__.py
new file mode 100644
index 0000000..659fe16
--- /dev/null
+++ b/backend/__init__.py
@@ -0,0 +1 @@
+"""LLM Council backend package."""
diff --git a/backend/config.py b/backend/config.py
new file mode 100644
index 0000000..a9cf7c4
--- /dev/null
+++ b/backend/config.py
@@ -0,0 +1,26 @@
+"""Configuration for the LLM Council."""
+
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# OpenRouter API key
+OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
+
+# Council members - list of OpenRouter model identifiers
+COUNCIL_MODELS = [
+    "openai/gpt-5.1",
+    "google/gemini-3-pro-preview",
+    "anthropic/claude-sonnet-4.5",
+    "x-ai/grok-4",
+]
+
+# Chairman model - synthesizes final response
+CHAIRMAN_MODEL = "google/gemini-3-pro-preview"
+
+# OpenRouter API endpoint
+OPENROUTER_API_URL = "https://openrouter.ai/api/v1/chat/completions"
+
+# Data directory for conversation storage
+DATA_DIR = "data/conversations"
diff --git a/backend/council.py b/backend/council.py
new file mode 100644
index 0000000..b7f8839
--- /dev/null
+++ b/backend/council.py
@@ -0,0 +1,297 @@
+"""3-stage LLM Council orchestration."""
+
+from typing import List, Dict, Any, Tuple
+from .openrouter import query_models_parallel, query_model
+from .config import COUNCIL_MODELS, CHAIRMAN_MODEL
+
+
+async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]:
+    """
+    Stage 1: Collect individual responses from all council models.
+
+    Args:
+        user_query: The user's question
+
+    Returns:
+        List of dicts with 'model' and 'response' keys
+    """
+    messages = [{"role": "user", "content": user_query}]
+
+    # Query all models in parallel
+    responses = await query_models_parallel(COUNCIL_MODELS, messages)
+
+    # Format results
+    stage1_results = []
+    for model, response in responses.items():
+        if response is not None:  # Only include successful responses
+            stage1_results.append({
+                "model": model,
+                "response": response.get('content', '')
+            })
+
+    return stage1_results
+
+
+async def stage2_collect_rankings(
+    user_query: str,
+    stage1_results: List[Dict[str, Any]]
+) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
+    """
+    Stage 2: Each model ranks the anonymized responses.
+
+    Args:
+        user_query: The original user query
+        stage1_results: Results from Stage 1
+
+    Returns:
+        Tuple of (rankings list, label_to_model mapping)
+    """
+    # Create anonymized labels for responses (Response A, Response B, etc.)
+    labels = [chr(65 + i) for i in range(len(stage1_results))]  # A, B, C, ...
+
+    # Create mapping from label to model name
+    label_to_model = {
+        f"Response {label}": result['model']
+        for label, result in zip(labels, stage1_results)
+    }
+
+    # Build the ranking prompt
+    responses_text = "\n\n".join([
+        f"Response {label}:\n{result['response']}"
+        for label, result in zip(labels, stage1_results)
+    ])
+
+    ranking_prompt = f"""You are evaluating different responses to the following question:
+
+Question: {user_query}
+
+Here are the responses from different models (anonymized):
+
+{responses_text}
+
+Your task:
+1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
+2. Then, at the very end of your response, provide a final ranking.
+
+IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
+- Start with the line "FINAL RANKING:" (all caps, with colon)
+- Then list the responses from best to worst as a numbered list
+- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
+- Do not add any other text or explanations in the ranking section
+
+Example of the correct format for your ENTIRE response:
+
+Response A provides good detail on X but misses Y...
+Response B is accurate but lacks depth on Z...
+Response C offers the most comprehensive answer...
+
+FINAL RANKING:
+1. Response C
+2. Response A
+3. Response B
+
+Now provide your evaluation and ranking:"""
+
+    messages = [{"role": "user", "content": ranking_prompt}]
+
+    # Get rankings from all council models in parallel
+    responses = await query_models_parallel(COUNCIL_MODELS, messages)
+
+    # Format results
+    stage2_results = []
+    for model, response in responses.items():
+        if response is not None:
+            full_text = response.get('content', '')
+            parsed = parse_ranking_from_text(full_text)
+            stage2_results.append({
+                "model": model,
+                "ranking": full_text,
+                "parsed_ranking": parsed
+            })
+
+    return stage2_results, label_to_model
+
+
+async def stage3_synthesize_final(
+    user_query: str,
+    stage1_results: List[Dict[str, Any]],
+    stage2_results: List[Dict[str, Any]]
+) -> Dict[str, Any]:
+    """
+    Stage 3: Chairman synthesizes final response.
+
+    Args:
+        user_query: The original user query
+        stage1_results: Individual model responses from Stage 1
+        stage2_results: Rankings from Stage 2
+
+    Returns:
+        Dict with 'model' and 'response' keys
+    """
+    # Build comprehensive context for chairman
+    stage1_text = "\n\n".join([
+        f"Model: {result['model']}\nResponse: {result['response']}"
+        for result in stage1_results
+    ])
+
+    stage2_text = "\n\n".join([
+        f"Model: {result['model']}\nRanking: {result['ranking']}"
+        for result in stage2_results
+    ])
+
+    chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.
+
+Original Question: {user_query}
+
+STAGE 1 - Individual Responses:
+{stage1_text}
+
+STAGE 2 - Peer Rankings:
+{stage2_text}
+
+Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
+- The individual responses and their insights
+- The peer rankings and what they reveal about response quality
+- Any patterns of agreement or disagreement
+
+Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""
+
+    messages = [{"role": "user", "content": chairman_prompt}]
+
+    # Query the chairman model
+    response = await query_model(CHAIRMAN_MODEL, messages)
+
+    if response is None:
+        # Fallback if chairman fails
+        return {
+            "model": CHAIRMAN_MODEL,
+            "response": "Error: Unable to generate final synthesis."
+        }
+
+    return {
+        "model": CHAIRMAN_MODEL,
+        "response": response.get('content', '')
+    }
+
+
+def parse_ranking_from_text(ranking_text: str) -> List[str]:
+    """
+    Parse the FINAL RANKING section from the model's response.
+
+    Args:
+        ranking_text: The full text response from the model
+
+    Returns:
+        List of response labels in ranked order
+    """
+    import re
+
+    # Look for "FINAL RANKING:" section
+    if "FINAL RANKING:" in ranking_text:
+        # Extract everything after "FINAL RANKING:"
+        parts = ranking_text.split("FINAL RANKING:")
+        if len(parts) >= 2:
+            ranking_section = parts[1]
+            # Try to extract numbered list format (e.g., "1. Response A")
+            # This pattern looks for: number, period, optional space, "Response X"
+            numbered_matches = re.findall(r'\d+\.\s*Response [A-Z]', ranking_section)
+            if numbered_matches:
+                # Extract just the "Response X" part
+                return [re.search(r'Response [A-Z]', m).group() for m in numbered_matches]
+
+            # Fallback: Extract all "Response X" patterns in order
+            matches = re.findall(r'Response [A-Z]', ranking_section)
+            return matches
+
+    # Fallback: try to find any "Response X" patterns in order
+    matches = re.findall(r'Response [A-Z]', ranking_text)
+    return matches
+
+
+def calculate_aggregate_rankings(
+    stage2_results: List[Dict[str, Any]],
+    label_to_model: Dict[str, str]
+) -> List[Dict[str, Any]]:
+    """
+    Calculate aggregate rankings across all models.
+
+    Args:
+        stage2_results: Rankings from each model
+        label_to_model: Mapping from anonymous labels to model names
+
+    Returns:
+        List of dicts with model name and average rank, sorted best to worst
+    """
+    from collections import defaultdict
+
+    # Track positions for each model
+    model_positions = defaultdict(list)
+
+    for ranking in stage2_results:
+        ranking_text = ranking['ranking']
+
+        # Parse the ranking from the structured format
+        parsed_ranking = parse_ranking_from_text(ranking_text)
+
+        for position, label in enumerate(parsed_ranking, start=1):
+            if label in label_to_model:
+                model_name = label_to_model[label]
+                model_positions[model_name].append(position)
+
+    # Calculate average position for each model
+    aggregate = []
+    for model, positions in model_positions.items():
+        if positions:
+            avg_rank = sum(positions) / len(positions)
+            aggregate.append({
+                "model": model,
+                "average_rank": round(avg_rank, 2),
+                "rankings_count": len(positions)
+            })
+
+    # Sort by average rank (lower is better)
+    aggregate.sort(key=lambda x: x['average_rank'])
+
+    return aggregate
+
+
+async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]:
+    """
+    Run the complete 3-stage council process.
+
+    Args:
+        user_query: The user's question
+
+    Returns:
+        Tuple of (stage1_results, stage2_results, stage3_result, metadata)
+    """
+    # Stage 1: Collect individual responses
+    stage1_results = await stage1_collect_responses(user_query)
+
+    # If no models responded successfully, return error
+    if not stage1_results:
+        return [], [], {
+            "model": "error",
+            "response": "All models failed to respond. Please try again."
+        }, {}
+
+    # Stage 2: Collect rankings
+    stage2_results, label_to_model = await stage2_collect_rankings(user_query, stage1_results)
+
+    # Calculate aggregate rankings
+    aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)
+
+    # Stage 3: Synthesize final answer
+    stage3_result = await stage3_synthesize_final(
+        user_query,
+        stage1_results,
+        stage2_results
+    )
+
+    # Prepare metadata
+    metadata = {
+        "label_to_model": label_to_model,
+        "aggregate_rankings": aggregate_rankings
+    }
+
+    return stage1_results, stage2_results, stage3_result, metadata
diff --git a/backend/main.py b/backend/main.py
new file mode 100644
index 0000000..cbb836f
--- /dev/null
+++ b/backend/main.py
@@ -0,0 +1,115 @@
+"""FastAPI backend for LLM Council."""
+
+from fastapi import FastAPI, HTTPException
+from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel
+from typing import List, Dict, Any
+import uuid
+
+from . import storage
+from .council import run_full_council
+
+app = FastAPI(title="LLM Council API")
+
+# Enable CORS for local development
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["http://localhost:5173", "http://localhost:3000"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+class CreateConversationRequest(BaseModel):
+    """Request to create a new conversation."""
+    pass
+
+
+class SendMessageRequest(BaseModel):
+    """Request to send a message in a conversation."""
+    content: str
+
+
+class ConversationMetadata(BaseModel):
+    """Conversation metadata for list view."""
+    id: str
+    created_at: str
+    message_count: int
+
+
+class Conversation(BaseModel):
+    """Full conversation with all messages."""
+    id: str
+    created_at: str
+    messages: List[Dict[str, Any]]
+
+
+@app.get("/")
+async def root():
+    """Health check endpoint."""
+    return {"status": "ok", "service": "LLM Council API"}
+
+
+@app.get("/api/conversations", response_model=List[ConversationMetadata])
+async def list_conversations():
+    """List all conversations (metadata only)."""
+    return storage.list_conversations()
+
+
+@app.post("/api/conversations", response_model=Conversation)
+async def create_conversation(request: CreateConversationRequest):
+    """Create a new conversation."""
+    conversation_id = str(uuid.uuid4())
+    conversation = storage.create_conversation(conversation_id)
+    return conversation
+
+
+@app.get("/api/conversations/{conversation_id}", response_model=Conversation)
+async def get_conversation(conversation_id: str):
+    """Get a specific conversation with all its messages."""
+    conversation = storage.get_conversation(conversation_id)
+    if conversation is None:
+        raise HTTPException(status_code=404, detail="Conversation not found")
+    return conversation
+
+
+@app.post("/api/conversations/{conversation_id}/message")
+async def send_message(conversation_id: str, request: SendMessageRequest):
+    """
+    Send a message and run the 3-stage council process.
+    Returns the complete response with all stages.
+    """
+    # Check if conversation exists
+    conversation = storage.get_conversation(conversation_id)
+    if conversation is None:
+        raise HTTPException(status_code=404, detail="Conversation not found")
+
+    # Add user message
+    storage.add_user_message(conversation_id, request.content)
+
+    # Run the 3-stage council process
+    stage1_results, stage2_results, stage3_result, metadata = await run_full_council(
+        request.content
+    )
+
+    # Add assistant message with all stages
+    storage.add_assistant_message(
+        conversation_id,
+        stage1_results,
+        stage2_results,
+        stage3_result
+    )
+
+    # Return the complete response with metadata
+    return {
+        "stage1": stage1_results,
+        "stage2": stage2_results,
+        "stage3": stage3_result,
+        "metadata": metadata
+    }
+
+
+if __name__ == "__main__":
+    import uvicorn
+    uvicorn.run(app, host="0.0.0.0", port=8001)
diff --git a/backend/openrouter.py b/backend/openrouter.py
new file mode 100644
index 0000000..118fb0b
--- /dev/null
+++ b/backend/openrouter.py
@@ -0,0 +1,79 @@
+"""OpenRouter API client for making LLM requests."""
+
+import httpx
+from typing import List, Dict, Any, Optional
+from .config import OPENROUTER_API_KEY, OPENROUTER_API_URL
+
+
+async def query_model(
+    model: str,
+    messages: List[Dict[str, str]],
+    timeout: float = 120.0
+) -> Optional[Dict[str, Any]]:
+    """
+    Query a single model via OpenRouter API.
+
+    Args:
+        model: OpenRouter model identifier (e.g., "openai/gpt-4o")
+        messages: List of message dicts with 'role' and 'content'
+        timeout: Request timeout in seconds
+
+    Returns:
+        Response dict with 'content' and optional 'reasoning_details', or None if failed
+    """
+    headers = {
+        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
+        "Content-Type": "application/json",
+    }
+
+    payload = {
+        "model": model,
+        "messages": messages,
+    }
+
+    try:
+        async with httpx.AsyncClient(timeout=timeout) as client:
+            response = await client.post(
+                OPENROUTER_API_URL,
+                headers=headers,
+                json=payload
+            )
+            response.raise_for_status()
+
+            data = response.json()
+            message = data['choices'][0]['message']
+
+            return {
+                'content': message.get('content'),
+                'reasoning_details': message.get('reasoning_details')
+            }
+
+    except Exception as e:
+        print(f"Error querying model {model}: {e}")
+        return None
+
+
+async def query_models_parallel(
+    models: List[str],
+    messages: List[Dict[str, str]]
+) -> Dict[str, Optional[Dict[str, Any]]]:
+    """
+    Query multiple models in parallel.
+
+    Args:
+        models: List of OpenRouter model identifiers
+        messages: List of message dicts to send to each model
+
+    Returns:
+        Dict mapping model identifier to response dict (or None if failed)
+    """
+    import asyncio
+
+    # Create tasks for all models
+    tasks = [query_model(model, messages) for model in models]
+
+    # Wait for all to complete
+    responses = await asyncio.gather(*tasks)
+
+    # Map models to their responses
+    return {model: response for model, response in zip(models, responses)}
diff --git a/backend/storage.py b/backend/storage.py
new file mode 100644
index 0000000..dd17a1a
--- /dev/null
+++ b/backend/storage.py
@@ -0,0 +1,154 @@
+"""JSON-based storage for conversations."""
+
+import json
+import os
+from datetime import datetime
+from typing import List, Dict, Any, Optional
+from pathlib import Path
+from .config import DATA_DIR
+
+
+def ensure_data_dir():
+    """Ensure the data directory exists."""
+    Path(DATA_DIR).mkdir(parents=True, exist_ok=True)
+
+
+def get_conversation_path(conversation_id: str) -> str:
+    """Get the file path for a conversation."""
+    return os.path.join(DATA_DIR, f"{conversation_id}.json")
+
+
+def create_conversation(conversation_id: str) -> Dict[str, Any]:
+    """
+    Create a new conversation.
+
+    Args:
+        conversation_id: Unique identifier for the conversation
+
+    Returns:
+        New conversation dict
+    """
+    ensure_data_dir()
+
+    conversation = {
+        "id": conversation_id,
+        "created_at": datetime.utcnow().isoformat(),
+        "messages": []
+    }
+
+    # Save to file
+    path = get_conversation_path(conversation_id)
+    with open(path, 'w') as f:
+        json.dump(conversation, f, indent=2)
+
+    return conversation
+
+
+def get_conversation(conversation_id: str) -> Optional[Dict[str, Any]]:
+    """
+    Load a conversation from storage.
+
+    Args:
+        conversation_id: Unique identifier for the conversation
+
+    Returns:
+        Conversation dict or None if not found
+    """
+    path = get_conversation_path(conversation_id)
+
+    if not os.path.exists(path):
+        return None
+
+    with open(path, 'r') as f:
+        return json.load(f)
+
+
+def save_conversation(conversation: Dict[str, Any]):
+    """
+    Save a conversation to storage.
+
+    Args:
+        conversation: Conversation dict to save
+    """
+    ensure_data_dir()
+
+    path = get_conversation_path(conversation['id'])
+    with open(path, 'w') as f:
+        json.dump(conversation, f, indent=2)
+
+
+def list_conversations() -> List[Dict[str, Any]]:
+    """
+    List all conversations (metadata only).
+
+    Returns:
+        List of conversation metadata dicts
+    """
+    ensure_data_dir()
+
+    conversations = []
+    for filename in os.listdir(DATA_DIR):
+        if filename.endswith('.json'):
+            path = os.path.join(DATA_DIR, filename)
+            with open(path, 'r') as f:
+                data = json.load(f)
+                # Return metadata only
+                conversations.append({
+                    "id": data["id"],
+                    "created_at": data["created_at"],
+                    "message_count": len(data["messages"])
+                })
+
+    # Sort by creation time, newest first
+    conversations.sort(key=lambda x: x["created_at"], reverse=True)
+
+    return conversations
+
+
+def add_user_message(conversation_id: str, content: str):
+    """
+    Add a user message to a conversation.
+
+    Args:
+        conversation_id: Conversation identifier
+        content: User message content
+    """
+    conversation = get_conversation(conversation_id)
+    if conversation is None:
+        raise ValueError(f"Conversation {conversation_id} not found")
+
+    conversation["messages"].append({
+        "role": "user",
+        "content": content
+    })
+
+    save_conversation(conversation)
+
+
+def add_assistant_message(
+    conversation_id: str,
+    stage1: List[Dict[str, Any]],
+    stage2: List[Dict[str, Any]],
+    stage3: Dict[str, Any]
+):
+    """
+    Add an assistant message with all 3 stages to a conversation.
+
+    Args:
+        conversation_id: Conversation identifier
+        stage1: List of individual model responses
+        stage2: List of model rankings
+        stage3: Final synthesized response
+    """
+    conversation = get_conversation(conversation_id)
+    if conversation is None:
+        raise ValueError(f"Conversation {conversation_id} not found")
+
+    conversation["messages"].append({
+        "role": "assistant",
+        "stage1": stage1,
+        "stage2": stage2,
+        "stage3": stage3
+    })
+
+    save_conversation(conversation)
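A minimal sketch (not part of the patch) of exercising the new endpoints locally, assuming the server was started with `python -m backend.main` (or `uvicorn backend.main:app --port 8001`, matching the `__main__` block in `backend/main.py`) and that `OPENROUTER_API_KEY` is set in `.env`. Endpoint paths and payload shapes mirror `backend/main.py`; the example question and the client timeout are illustrative.

```python
# Sketch: create a conversation, then run the 3-stage council on one question.
# Assumes the FastAPI app from backend/main.py is listening on localhost:8001.
import httpx

BASE = "http://localhost:8001"

with httpx.Client(timeout=600.0) as client:
    # POST /api/conversations -> new conversation with a server-generated UUID
    conv = client.post(f"{BASE}/api/conversations", json={}).json()

    # POST /api/conversations/{id}/message -> runs Stages 1-3 and returns all of them
    result = client.post(
        f"{BASE}/api/conversations/{conv['id']}/message",
        json={"content": "What is the best way to learn deep learning?"},  # illustrative query
    ).json()

    print([r["model"] for r in result["stage1"]])       # individual council responses
    print(result["metadata"]["aggregate_rankings"])     # averaged peer rankings
    print(result["stage3"]["response"])                  # chairman's synthesized answer
```

The generous client timeout reflects that a single message fans out to every council model twice (Stage 1 responses, Stage 2 rankings) and then makes one chairman call before anything is returned.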
