diff options
Diffstat (limited to 'backend/app/services/council.py')
| -rw-r--r-- | backend/app/services/council.py | 322 |
1 file changed, 322 insertions(+), 0 deletions(-)
"""3-stage LLM Council orchestration for ContextFlow.

Stage 1: every council member answers the user's prompt in parallel.
Stage 2: each member ranks the anonymized Stage 1 answers.
Stage 3: a "chairman" model synthesizes a final answer from both stages.

:func:`council_event_stream` drives all three stages and emits
SSE-formatted JSON events suitable for a streaming HTTP response.
"""

import asyncio
import json
import logging
import re
from collections import defaultdict
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

from app.schemas import Context, LLMConfig
from app.services.llm import llm_streamer, query_model_full

logger = logging.getLogger("contextflow.council")


async def stage1_collect_responses(
    user_prompt: str,
    context: Context,
    configs: List[LLMConfig],
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Stage 1: query all council member models in parallel.

    Yields one ``{"model": ..., "response": ...}`` dict per model, in
    completion order (not config order).  A model failure is converted to
    an ``"[Error: ...]"`` response string so one bad model cannot abort
    the whole council run.

    Args:
        user_prompt: The user's question.
        context: Conversation context shared by all members.
        configs: One :class:`LLMConfig` per council member.
        attachments_per_model: Optional per-member attachments, keyed by
            position in ``configs``.
        tools_per_model: Optional per-member tool specs, keyed by position.
        openrouter_api_key: Optional API key forwarded to the LLM layer.
        images: Optional images shared by all members.
    """

    async def _query_one(idx: int, config: LLMConfig) -> Dict[str, Any]:
        # Attachments/tools are positional, keyed by the config's index.
        atts = attachments_per_model[idx] if attachments_per_model else None
        tls = tools_per_model[idx] if tools_per_model else None
        try:
            response = await query_model_full(
                context, user_prompt, config,
                attachments=atts, tools=tls,
                openrouter_api_key=openrouter_api_key, images=images,
            )
            return {"model": config.model_name, "response": response}
        except Exception as e:
            logger.error("Council stage1 failed for %s: %s", config.model_name, e)
            return {"model": config.model_name, "response": f"[Error: {e}]"}

    # Launch all queries concurrently and yield each result the moment its
    # task finishes.  The result dict itself identifies the model, so no
    # index bookkeeping is needed here (the previous reverse lookup of the
    # finished task, and the `results` list it filled, were dead code).
    tasks = [asyncio.ensure_future(_query_one(i, cfg)) for i, cfg in enumerate(configs)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


def _build_ranking_prompt(user_query: str, stage1_results: List[Dict[str, Any]]) -> str:
    """Build the anonymized ranking prompt for Stage 2.

    Responses are labelled ``Response A``, ``Response B``, ... in the order
    they appear in ``stage1_results`` so rankers cannot identify models.
    """
    labels = [chr(65 + i) for i in range(len(stage1_results))]  # A, B, C, ...

    responses_text = "\n\n".join(
        f"Response {label}:\n{result['response']}"
        for label, result in zip(labels, stage1_results)
    )

    return f"""You are evaluating different responses to the following question:

Question: {user_query}

Here are the responses from different models (anonymized):

{responses_text}

Your task:
1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
2. Then, at the very end of your response, provide a final ranking.

IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
- Start with the line "FINAL RANKING:" (all caps, with colon)
- Then list the responses from best to worst as a numbered list
- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
- Do not add any other text or explanations in the ranking section

Example of the correct format for your ENTIRE response:

Response A provides good detail on X but misses Y...
Response B is accurate but lacks depth on Z...
Response C offers the most comprehensive answer...

FINAL RANKING:
1. Response C
2. Response A
3. Response B

Now provide your evaluation and ranking:"""


def parse_ranking_from_text(ranking_text: str) -> List[str]:
    """Parse the FINAL RANKING section from a Stage 2 response.

    Returns labels like ``"Response A"`` in ranked order, best first.
    Falls back to scanning the whole text when the model ignored the
    required "FINAL RANKING:" format.  Duplicate mentions of a label are
    dropped (first occurrence wins) so a single ranker cannot count one
    response twice in the aggregate.
    """

    def _unique(labels: List[str]) -> List[str]:
        seen: set = set()
        ordered: List[str] = []
        for label in labels:
            if label not in seen:
                seen.add(label)
                ordered.append(label)
        return ordered

    if "FINAL RANKING:" in ranking_text:
        # Only look at text after the (first) marker.
        ranking_section = ranking_text.split("FINAL RANKING:", 1)[1]
        # Prefer properly numbered entries ("1. Response A").
        numbered = re.findall(r'\d+\.\s*Response [A-Z]', ranking_section)
        if numbered:
            return _unique([re.search(r'Response [A-Z]', m).group() for m in numbered])
        # Marker present but no numbered list: take bare labels in order.
        return _unique(re.findall(r'Response [A-Z]', ranking_section))

    # No marker at all: scan the entire response as a last resort.
    return _unique(re.findall(r'Response [A-Z]', ranking_text))


async def stage2_collect_rankings(
    user_query: str,
    stage1_results: List[Dict[str, Any]],
    configs: List[LLMConfig],
    openrouter_api_key: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
    """Stage 2: each model ranks the anonymized Stage 1 responses.

    Text-only prompts against a fresh, empty context — no file
    attachments, tools, or conversation history.

    Returns:
        A tuple ``(rankings, label_to_model)`` where ``rankings`` holds one
        ``{"model", "ranking", "parsed_ranking"}`` dict per ranker and
        ``label_to_model`` maps ``"Response A"``-style labels back to the
        model names they anonymize.
    """
    labels = [chr(65 + i) for i in range(len(stage1_results))]
    label_to_model = {
        f"Response {label}": result['model']
        for label, result in zip(labels, stage1_results)
    }

    ranking_prompt = _build_ranking_prompt(user_query, stage1_results)
    empty_context = Context(messages=[])

    async def _rank_one(config: LLMConfig) -> Dict[str, Any]:
        try:
            response = await query_model_full(
                empty_context, ranking_prompt, config,
                openrouter_api_key=openrouter_api_key,
            )
            return {
                "model": config.model_name,
                "ranking": response,
                "parsed_ranking": parse_ranking_from_text(response),
            }
        except Exception as e:
            logger.error("Council stage2 failed for %s: %s", config.model_name, e)
            return {
                "model": config.model_name,
                "ranking": f"[Error: {e}]",
                "parsed_ranking": [],
            }

    results = await asyncio.gather(*[_rank_one(cfg) for cfg in configs])
    return list(results), label_to_model


def calculate_aggregate_rankings(
    stage2_results: List[Dict[str, Any]],
    label_to_model: Dict[str, str],
) -> List[Dict[str, Any]]:
    """Aggregate per-model rankings into an average-rank leaderboard.

    Args:
        stage2_results: Stage 2 output; each entry may carry a
            ``parsed_ranking`` (re-parsed from ``ranking`` text if absent
            or empty).
        label_to_model: Maps anonymized labels to model names; unknown
            labels are ignored.

    Returns:
        Entries of ``{"model", "average_rank", "rankings_count"}`` sorted
        best (lowest average rank) first.  Models never ranked by anyone
        are omitted.
    """
    model_positions: Dict[str, List[int]] = defaultdict(list)

    for ranking in stage2_results:
        parsed_ranking = ranking.get("parsed_ranking", [])
        if not parsed_ranking:
            # Entry predates parsing (or parsing was skipped) — re-parse.
            parsed_ranking = parse_ranking_from_text(ranking.get("ranking", ""))
        for position, label in enumerate(parsed_ranking, start=1):
            if label in label_to_model:
                model_positions[label_to_model[label]].append(position)

    aggregate = [
        {
            "model": model,
            "average_rank": round(sum(positions) / len(positions), 2),
            "rankings_count": len(positions),
        }
        for model, positions in model_positions.items()
        if positions
    ]

    aggregate.sort(key=lambda x: x["average_rank"])
    return aggregate


def _build_chairman_prompt(
    user_query: str,
    stage1_results: List[Dict[str, Any]],
    stage2_results: List[Dict[str, Any]],
) -> str:
    """Build the chairman synthesis prompt for Stage 3.

    Unlike Stage 2, model names are NOT anonymized here: the chairman sees
    which model produced each response and each ranking.
    """
    stage1_text = "\n\n".join(
        f"Model: {result['model']}\nResponse: {result['response']}"
        for result in stage1_results
    )

    stage2_text = "\n\n".join(
        f"Model: {result['model']}\nRanking: {result['ranking']}"
        for result in stage2_results
    )

    return f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.

Original Question: {user_query}

STAGE 1 - Individual Responses:
{stage1_text}

STAGE 2 - Peer Rankings:
{stage2_text}

Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
- The individual responses and their insights
- The peer rankings and what they reveal about response quality
- Any patterns of agreement or disagreement

Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""


async def stage3_stream_synthesis(
    user_query: str,
    stage1_results: List[Dict[str, Any]],
    stage2_results: List[Dict[str, Any]],
    chairman_config: LLMConfig,
    openrouter_api_key: Optional[str] = None,
) -> AsyncGenerator[str, None]:
    """Stage 3: the chairman synthesizes the final answer.

    Streams raw text chunks from the chairman model against a fresh,
    empty context.
    """
    chairman_prompt = _build_chairman_prompt(user_query, stage1_results, stage2_results)
    empty_context = Context(messages=[])

    async for chunk in llm_streamer(
        empty_context, chairman_prompt, chairman_config,
        openrouter_api_key=openrouter_api_key,
    ):
        yield chunk


def _sse_event(data: dict) -> str:
    """Serialize ``data`` as a single Server-Sent Events message."""
    return f"data: {json.dumps(data)}\n\n"


async def council_event_stream(
    user_prompt: str,
    context: Context,
    member_configs: List[LLMConfig],
    chairman_config: LLMConfig,
    attachments_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    tools_per_model: Optional[List[Optional[List[Dict[str, Any]]]]] = None,
    openrouter_api_key: Optional[str] = None,
    images: Optional[List[Dict[str, Any]]] = None,
) -> AsyncGenerator[str, None]:
    """Master orchestrator yielding SSE JSON events through the 3-stage council.

    Event sequence: ``stage1_start``, ``stage1_model_complete`` (per model,
    in completion order), ``stage1_complete``, ``stage2_start``,
    ``stage2_complete``, ``stage3_start``, ``stage3_chunk`` (per chunk),
    ``stage3_complete``, ``complete``.  Emits a terminal ``error`` event
    instead if Stage 1 produced no results at all.
    """
    # === Stage 1 ===
    yield _sse_event({"type": "stage1_start"})

    stage1_results: List[Dict[str, Any]] = []
    async for result in stage1_collect_responses(
        user_prompt, context, member_configs,
        attachments_per_model=attachments_per_model,
        tools_per_model=tools_per_model,
        openrouter_api_key=openrouter_api_key,
        images=images,
    ):
        stage1_results.append(result)
        yield _sse_event({
            "type": "stage1_model_complete",
            "data": {"model": result["model"], "response": result["response"]},
        })

    yield _sse_event({"type": "stage1_complete", "data": stage1_results})

    if not stage1_results:
        # No members configured (per-model failures still yield error
        # strings, so only an empty council reaches this branch).
        yield _sse_event({
            "type": "error",
            "data": {"message": "All council models failed to respond."},
        })
        return

    # === Stage 2 ===
    yield _sse_event({"type": "stage2_start"})

    stage2_results, label_to_model = await stage2_collect_rankings(
        user_prompt, stage1_results, member_configs,
        openrouter_api_key=openrouter_api_key,
    )
    aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)

    yield _sse_event({
        "type": "stage2_complete",
        "data": {
            "rankings": stage2_results,
            "label_to_model": label_to_model,
            "aggregate_rankings": aggregate_rankings,
        },
    })

    # === Stage 3 (streamed) ===
    yield _sse_event({"type": "stage3_start"})

    full_response = ""
    async for chunk in stage3_stream_synthesis(
        user_prompt, stage1_results, stage2_results, chairman_config,
        openrouter_api_key=openrouter_api_key,
    ):
        full_response += chunk
        yield _sse_event({"type": "stage3_chunk", "data": {"chunk": chunk}})

    yield _sse_event({
        "type": "stage3_complete",
        "data": {"model": chairman_config.model_name, "response": full_response},
    })

    yield _sse_event({"type": "complete"})
