"""
Evaluation Pipeline

Runs evaluation sessions between user simulator and agents.
Computes metrics: Task Success (TS), User Effort (UE), Efficiency (Eff).
"""

import json
import os
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Any, Optional
from datetime import datetime

from ..profiles.generator import UserProfile
from ..preference_bank.schemas import PreferenceBank
from ..baselines.base import BaselineAgent
from ..user_simulator.simulator import UserSimulator, UserSimulatorResponse


@dataclass
class Task:
    """A problem/task for evaluation."""

    task_id: str
    dataset: str
    problem: str
    # Reference answer: given to the user simulator and used by the judge.
    solution: str
    task_description: str = "Work with the assistant to solve this problem:"


@dataclass
class SessionResult:
    """Result of a single evaluation session (one user, one task, one agent)."""

    user_id: str
    task_id: str
    dataset: str
    agent_name: str

    # Metrics
    task_success: bool  # TS: Was the task solved correctly?
    user_effort: int  # UE: Number of preference enforcements
    efficiency: int  # Eff: Total number of messages

    # Details
    conversation: List[Dict[str, str]]
    preference_violations: List[Dict[str, Any]]
    final_draft_answer: str

    # Debug
    debug_info: Dict[str, Any] = field(default_factory=dict)
    # Local (naive) ISO-8601 creation time.
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict (via ``dataclasses.asdict``)."""
        return asdict(self)


@dataclass
class EvaluationMetrics:
    """Aggregated evaluation metrics for one agent over many sessions."""

    agent_name: str
    num_sessions: int

    # Average metrics
    avg_task_success: float  # Average TS
    avg_user_effort: float  # Average UE
    avg_efficiency: float  # Average Eff

    # Breakdowns, keyed by dataset name
    task_success_by_dataset: Dict[str, float] = field(default_factory=dict)
    user_effort_by_dataset: Dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict (via ``dataclasses.asdict``)."""
        return asdict(self)


class JudgeModel:
    """
    LLM judge for evaluating task success.
    Uses the same approach as collaborativeagents.

    Talks to an OpenAI-compatible chat-completions endpoint. If the client
    cannot be created, or a judge call or its parsing fails, the judge falls
    back to case-insensitive substring matching of the reference answer.
    """

    def __init__(
        self,
        model_name: str = "Llama-3.3-70B-Instruct",
        api_base: Optional[str] = None,
        api_key: Optional[str] = None,
    ):
        """
        Args:
            model_name: Model identifier sent to the endpoint.
            api_base: Endpoint URL; defaults to $JUDGE_API_BASE, then
                "http://localhost:8004/v1".
            api_key: API key; defaults to $JUDGE_API_KEY, then "EMPTY".
        """
        self.model_name = model_name
        self.api_base = api_base or os.getenv("JUDGE_API_BASE", "http://localhost:8004/v1")
        self.api_key = api_key or os.getenv("JUDGE_API_KEY", "EMPTY")
        self._init_client()

    def _init_client(self):
        """Create ``self.client``; on any failure warn and set it to None (fallback mode)."""
        try:
            import openai
            self.client = openai.OpenAI(
                base_url=self.api_base,
                api_key=self.api_key,
            )
        except Exception as e:
            print(f"Warning: Could not initialize judge client: {e}")
            self.client = None

    @staticmethod
    def _fallback_match(correct_answer: str, user_draft_answer: str) -> bool:
        """Case-insensitive check that the reference answer occurs in the draft.

        An empty reference answer never matches: the empty string is a
        substring of everything, which would score every session as a success.
        """
        target = (correct_answer or "").lower().strip()
        if not target:
            return False
        return target in (user_draft_answer or "").lower()

    @staticmethod
    def _parse_verdict(text: str) -> bool:
        """Extract the ``is_correct`` verdict from the judge's reply.

        Accepts a bare JSON object, one wrapped in ``` / ```json fences, or one
        surrounded by extra prose. A missing ``is_correct`` key yields False.

        Raises:
            ValueError: if no JSON object can be decoded (``json.JSONDecodeError``
                is a ``ValueError`` subclass).
        """
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        # Tolerate prose around the object by keeping only the outermost braces.
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            text = text[start:end + 1]
        data = json.loads(text)
        if not isinstance(data, dict):
            raise ValueError(f"Judge returned non-object JSON: {data!r}")
        verdict = data.get("is_correct", False)
        # LLMs sometimes quote booleans; the string "false" must not be truthy.
        if isinstance(verdict, str):
            return verdict.strip().lower() == "true"
        return bool(verdict)

    def evaluate_answer(
        self,
        problem: str,
        correct_answer: str,
        user_draft_answer: str,
    ) -> bool:
        """
        Evaluate if the user's draft answer is correct.

        Args:
            problem: The task statement shown to the judge.
            correct_answer: Reference solution.
            user_draft_answer: The user's final draft answer.

        Returns:
            True if answer is correct, False otherwise
        """
        if self.client is None:
            # Fallback - simple string matching
            return self._fallback_match(correct_answer, user_draft_answer)

        prompt = f"""You are an expert evaluator. Determine if the user's answer is correct.

# Problem
{problem}

# Correct Answer
{correct_answer}

# User's Answer
{user_draft_answer}

# Instructions
Determine if the user's answer is accurate and consistent with the correct answer.
Minor formatting differences are acceptable.
The core answer/solution must match.

Output JSON:
{{
    "reasoning": "Brief explanation",
    "is_correct": true or false
}}

Output only valid JSON."""

        try:
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
                max_tokens=256,
            )
            text = response.choices[0].message.content.strip()
            return self._parse_verdict(text)
        except Exception as e:
            # Best-effort: any API or parsing error degrades to string matching.
            print(f"Error in judge evaluation: {e}")
            return self._fallback_match(correct_answer, user_draft_answer)


class Evaluator:
    """
    Main evaluator that runs sessions and computes metrics.
    """

    def __init__(
        self,
        user_simulator: Optional[UserSimulator] = None,
        judge: Optional[JudgeModel] = None,
    ):
        """
        Args:
            user_simulator: Simulator playing the user; a default UserSimulator
                is constructed when omitted.
            judge: Task-success judge; a default JudgeModel is constructed
                when omitted.
        """
        self.user_sim = user_simulator or UserSimulator()
        self.judge = judge or JudgeModel()

    @staticmethod
    def _turn_violations(
        turn: int,
        user_response: UserSimulatorResponse,
    ) -> List[Dict[str, Any]]:
        """Collect the relevant, explicitly unsatisfied preference checks of one user turn."""
        return [
            {
                "turn": turn,
                "preference_id": check.preference_id,
                "topic": check.topic,
                "violation_detail": check.violation_detail,
            }
            for check in user_response.preference_checks
            # Deliberately `== False`: only an explicit negative verdict counts;
            # a None `satisfied` value is not recorded as a violation.
            if check.relevant and check.satisfied == False  # noqa: E712
        ]

    def run_session(
        self,
        agent: BaselineAgent,
        user_profile: UserProfile,
        task: Task,
        max_turns: int = 30,
    ) -> SessionResult:
        """
        Run a single evaluation session.

        The agent opens with a fixed greeting; the simulated user and the agent
        then alternate until the user terminates or ``max_turns`` user turns
        have been played. ``agent.end_session`` is called afterwards, and the
        user's last draft answer is scored by the judge.

        Args:
            agent: The agent being evaluated
            user_profile: User with preferences
            task: Task to solve
            max_turns: Maximum conversation turns

        Returns:
            SessionResult with metrics and conversation
        """
        # Setup user simulator
        self.user_sim.setup(
            profile=user_profile,
            task_description=task.task_description,
            problem=task.problem,
            solution=task.solution,
        )

        conversation: List[Dict[str, str]] = []
        preference_violations: List[Dict[str, Any]] = []
        user_effort = 0
        final_draft_answer = "I don't know"

        # Agent opens the conversation
        conversation.append({
            "role": "assistant",
            "content": "How can I help you today?"
        })

        for turn in range(max_turns):
            # User responds
            user_response = self.user_sim.respond(conversation)
            conversation.append({
                "role": "user",
                "content": user_response.response,
            })

            # Track preference violations and enforcement
            preference_violations.extend(self._turn_violations(turn, user_response))
            if user_response.enforcement_needed:
                user_effort += 1

            final_draft_answer = user_response.draft_answer

            # Check termination
            if user_response.should_terminate or "TERMINATE" in user_response.response:
                break

            # Agent responds
            agent_response = agent.respond(
                user_id=user_profile.user_id,
                query=user_response.response,
                conversation_history=conversation,
            )
            conversation.append({
                "role": "assistant",
                "content": agent_response.answer,
            })

        # End session for agent (update memory, etc.)
        agent.end_session(user_profile.user_id, conversation)

        # Evaluate task success
        task_success = self.judge.evaluate_answer(
            problem=task.problem,
            correct_answer=task.solution,
            user_draft_answer=final_draft_answer,
        )

        return SessionResult(
            user_id=user_profile.user_id,
            task_id=task.task_id,
            dataset=task.dataset,
            agent_name=agent.get_name(),
            task_success=task_success,
            user_effort=user_effort,
            efficiency=len(conversation),
            conversation=conversation,
            preference_violations=preference_violations,
            final_draft_answer=final_draft_answer,
            debug_info={
                "num_turns": len(conversation) // 2,
                "num_violations": len(preference_violations),
            },
        )

    def aggregate_metrics(
        self,
        results: List[SessionResult],
        agent_name: str,
    ) -> EvaluationMetrics:
        """
        Aggregate metrics from multiple sessions.

        Per-dataset breakdowns are keyed in the order datasets first appear in
        ``results`` so that serialized output is reproducible across runs.
        An empty ``results`` list yields zeroed metrics.
        """
        if not results:
            return EvaluationMetrics(
                agent_name=agent_name,
                num_sessions=0,
                avg_task_success=0.0,
                avg_user_effort=0.0,
                avg_efficiency=0.0,
            )

        n = len(results)
        # Overall averages (bools sum as 0/1)
        avg_ts = sum(r.task_success for r in results) / n
        avg_ue = sum(r.user_effort for r in results) / n
        avg_eff = sum(r.efficiency for r in results) / n

        # By dataset: single pass, first-seen order (each group is non-empty).
        groups: Dict[str, List[SessionResult]] = {}
        for r in results:
            groups.setdefault(r.dataset, []).append(r)
        ts_by_ds = {
            ds: sum(r.task_success for r in rs) / len(rs) for ds, rs in groups.items()
        }
        ue_by_ds = {
            ds: sum(r.user_effort for r in rs) / len(rs) for ds, rs in groups.items()
        }

        return EvaluationMetrics(
            agent_name=agent_name,
            num_sessions=n,
            avg_task_success=avg_ts,
            avg_user_effort=avg_ue,
            avg_efficiency=avg_eff,
            task_success_by_dataset=ts_by_ds,
            user_effort_by_dataset=ue_by_ds,
        )

    def save_results(self, results: List[SessionResult], path: str):
        """Save results to JSONL file (one ``SessionResult.to_dict()`` per line; overwrites)."""
        with open(path, "w", encoding="utf-8") as f:
            for result in results:
                f.write(json.dumps(result.to_dict(), ensure_ascii=False) + "\n")

    @staticmethod
    def load_results(path: str) -> List[SessionResult]:
        """Load results from JSONL file written by ``save_results``; blank lines are skipped."""
        results = []
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    data = json.loads(line)
                    # Reconstruct SessionResult
                    results.append(SessionResult(**data))
        return results