diff options
| author | YurenHao0426 <blackhao0426@gmail.com> | 2026-01-27 09:57:37 -0600 |
|---|---|---|
| committer | YurenHao0426 <blackhao0426@gmail.com> | 2026-01-27 09:57:37 -0600 |
| commit | dc801c07cf38b0c495686463e6ca6f871a64440e (patch) | |
| tree | 599f03114775921dbc472403c701f4a3a8ea188a /src/personalization/evaluation/pipeline/evaluator.py | |
| parent | e43b3f8aa36c198b95c1e46bea2eaf3893b13dc3 (diff) | |
Add collaborativeagents module and update gitignore
- Add collaborativeagents subproject with adapters, agents, and evaluation modules
- Update .gitignore to exclude large binary files (.whl, .tar), wandb logs, and results
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'src/personalization/evaluation/pipeline/evaluator.py')
| -rw-r--r-- | src/personalization/evaluation/pipeline/evaluator.py | 353 |
1 file changed, 353 insertions, 0 deletions
"""
Evaluation Pipeline

Runs evaluation sessions between user simulator and agents.
Computes metrics: Task Success (TS), User Effort (UE), Efficiency (Eff).
"""

import json
import os
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Any, Optional
from datetime import datetime

from ..profiles.generator import UserProfile
from ..preference_bank.schemas import PreferenceBank
from ..baselines.base import BaselineAgent
from ..user_simulator.simulator import UserSimulator, UserSimulatorResponse


@dataclass
class Task:
    """A problem/task for evaluation."""
    task_id: str
    dataset: str
    problem: str
    solution: str
    task_description: str = "Work with the assistant to solve this problem:"


@dataclass
class SessionResult:
    """Result of a single evaluation session."""
    user_id: str
    task_id: str
    dataset: str
    agent_name: str

    # Metrics
    task_success: bool   # TS: Was the task solved correctly?
    user_effort: int     # UE: Number of preference enforcements
    efficiency: int      # Eff: Total number of messages

    # Details
    conversation: List[Dict[str, str]]
    preference_violations: List[Dict[str, Any]]
    final_draft_answer: str

    # Debug
    debug_info: Dict[str, Any] = field(default_factory=dict)
    # NOTE(review): naive local time; switch to datetime.now(tz=...) only if
    # downstream consumers of saved results can handle offset-aware ISO strings.
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of this result."""
        return asdict(self)


@dataclass
class EvaluationMetrics:
    """Aggregated evaluation metrics."""
    agent_name: str
    num_sessions: int

    # Average metrics
    avg_task_success: float  # Average TS
    avg_user_effort: float   # Average UE
    avg_efficiency: float    # Average Eff

    # Breakdowns
    task_success_by_dataset: Dict[str, float] = field(default_factory=dict)
    user_effort_by_dataset: Dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of these metrics."""
        return asdict(self)


class JudgeModel:
    """
    LLM judge for evaluating task success.
    Uses the same approach as collaborativeagents.
    """

    def __init__(
        self,
        model_name: str = "Llama-3.3-70B-Instruct",
        api_base: Optional[str] = None,
        api_key: Optional[str] = None,
    ):
        """
        Args:
            model_name: Chat model served at the OpenAI-compatible endpoint.
            api_base: Endpoint URL; falls back to JUDGE_API_BASE env var,
                then a local vLLM-style default.
            api_key: API key; falls back to JUDGE_API_KEY env var, then "EMPTY".
        """
        self.model_name = model_name
        self.api_base = api_base or os.getenv("JUDGE_API_BASE", "http://localhost:8004/v1")
        self.api_key = api_key or os.getenv("JUDGE_API_KEY", "EMPTY")

        self._init_client()

    def _init_client(self):
        """Create the OpenAI client; degrade to string-match fallback on failure."""
        try:
            import openai
            self.client = openai.OpenAI(
                base_url=self.api_base,
                api_key=self.api_key,
            )
        except Exception as e:
            # Best-effort: evaluate_answer() falls back to substring matching
            # when no client is available, so we warn instead of raising.
            print(f"Warning: Could not initialize judge client: {e}")
            self.client = None

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Strip a surrounding markdown code fence (``` or ```json) from *text*."""
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        return text

    @staticmethod
    def _fallback_match(correct_answer: str, user_draft_answer: str) -> bool:
        """Heuristic fallback: case-insensitive substring containment."""
        return correct_answer.lower().strip() in user_draft_answer.lower()

    def evaluate_answer(
        self,
        problem: str,
        correct_answer: str,
        user_draft_answer: str,
    ) -> bool:
        """
        Evaluate if the user's draft answer is correct.

        Args:
            problem: The task statement shown to the user.
            correct_answer: Reference solution.
            user_draft_answer: The answer the simulated user drafted.

        Returns:
            True if answer is correct, False otherwise
        """
        prompt = f"""You are an expert evaluator. Determine if the user's answer is correct.

# Problem
{problem}

# Correct Answer
{correct_answer}

# User's Answer
{user_draft_answer}

# Instructions
Determine if the user's answer is accurate and consistent with the correct answer.
Minor formatting differences are acceptable.
The core answer/solution must match.

Output JSON:
{{
    "reasoning": "Brief explanation",
    "is_correct": true or false
}}

Output only valid JSON."""

        if self.client is None:
            # Fallback - simple string matching
            return self._fallback_match(correct_answer, user_draft_answer)

        try:
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,
                max_tokens=256,
            )

            text = response.choices[0].message.content.strip()
            data = json.loads(self._strip_code_fence(text))
            # Coerce to bool: a model emitting e.g. "true" (a string) must not
            # leak a non-bool into SessionResult.task_success, which is summed
            # numerically in Evaluator.aggregate_metrics.
            return bool(data.get("is_correct", False))

        except Exception as e:
            print(f"Error in judge evaluation: {e}")
            # Fallback
            return self._fallback_match(correct_answer, user_draft_answer)


class Evaluator:
    """
    Main evaluator that runs sessions and computes metrics.
    """

    def __init__(
        self,
        user_simulator: Optional[UserSimulator] = None,
        judge: Optional[JudgeModel] = None,
    ):
        """
        Args:
            user_simulator: Simulator producing user turns; default-constructed
                if omitted.
            judge: Judge for scoring task success; default-constructed if omitted.
        """
        self.user_sim = user_simulator or UserSimulator()
        self.judge = judge or JudgeModel()

    def run_session(
        self,
        agent: BaselineAgent,
        user_profile: UserProfile,
        task: Task,
        max_turns: int = 30,
    ) -> SessionResult:
        """
        Run a single evaluation session.

        The agent opens the conversation; then user and agent alternate until
        the simulator signals termination (or emits "TERMINATE") or max_turns
        user turns elapse. Preference violations and enforcement events are
        tallied along the way, and the user's final draft answer is judged.

        Args:
            agent: The agent being evaluated
            user_profile: User with preferences
            task: Task to solve
            max_turns: Maximum conversation turns

        Returns:
            SessionResult with metrics and conversation
        """
        # Setup user simulator
        self.user_sim.setup(
            profile=user_profile,
            task_description=task.task_description,
            problem=task.problem,
            solution=task.solution,
        )

        conversation: List[Dict[str, str]] = []
        preference_violations: List[Dict[str, Any]] = []
        user_effort = 0
        final_draft_answer = "I don't know"

        # Agent opens the conversation
        conversation.append({
            "role": "assistant",
            "content": "How can I help you today?"
        })

        for turn in range(max_turns):
            # User responds
            user_response = self.user_sim.respond(conversation)

            conversation.append({
                "role": "user",
                "content": user_response.response,
            })

            # Track preference violations and enforcement.
            # `satisfied is False` (not `== False`): only an explicit False
            # counts as a violation; None/unknown does not.
            violations_this_turn = [
                {
                    "turn": turn,
                    "preference_id": check.preference_id,
                    "topic": check.topic,
                    "violation_detail": check.violation_detail,
                }
                for check in user_response.preference_checks
                if check.relevant and check.satisfied is False
            ]

            if violations_this_turn:
                preference_violations.extend(violations_this_turn)

            if user_response.enforcement_needed:
                # UE metric: user had to push back to enforce a preference.
                user_effort += 1

            final_draft_answer = user_response.draft_answer

            # Check termination
            if user_response.should_terminate or "TERMINATE" in user_response.response:
                break

            # Agent responds
            agent_response = agent.respond(
                user_id=user_profile.user_id,
                query=user_response.response,
                conversation_history=conversation,
            )

            conversation.append({
                "role": "assistant",
                "content": agent_response.answer,
            })

        # End session for agent (update memory, etc.)
        agent.end_session(user_profile.user_id, conversation)

        # Evaluate task success
        task_success = self.judge.evaluate_answer(
            problem=task.problem,
            correct_answer=task.solution,
            user_draft_answer=final_draft_answer,
        )

        return SessionResult(
            user_id=user_profile.user_id,
            task_id=task.task_id,
            dataset=task.dataset,
            agent_name=agent.get_name(),
            task_success=task_success,
            user_effort=user_effort,
            efficiency=len(conversation),
            conversation=conversation,
            preference_violations=preference_violations,
            final_draft_answer=final_draft_answer,
            debug_info={
                "num_turns": len(conversation) // 2,
                "num_violations": len(preference_violations),
            },
        )

    def aggregate_metrics(
        self,
        results: List[SessionResult],
        agent_name: str,
    ) -> EvaluationMetrics:
        """
        Aggregate metrics from multiple sessions.

        Args:
            results: Per-session results (may span multiple datasets).
            agent_name: Label for the aggregated metrics.

        Returns:
            EvaluationMetrics with overall and per-dataset averages; all-zero
            averages when `results` is empty.
        """
        if not results:
            return EvaluationMetrics(
                agent_name=agent_name,
                num_sessions=0,
                avg_task_success=0.0,
                avg_user_effort=0.0,
                avg_efficiency=0.0,
            )

        # Overall averages (bool task_success sums as 0/1)
        avg_ts = sum(r.task_success for r in results) / len(results)
        avg_ue = sum(r.user_effort for r in results) / len(results)
        avg_eff = sum(r.efficiency for r in results) / len(results)

        # By dataset
        datasets = set(r.dataset for r in results)
        ts_by_ds = {}
        ue_by_ds = {}

        for ds in datasets:
            ds_results = [r for r in results if r.dataset == ds]
            if ds_results:
                ts_by_ds[ds] = sum(r.task_success for r in ds_results) / len(ds_results)
                ue_by_ds[ds] = sum(r.user_effort for r in ds_results) / len(ds_results)

        return EvaluationMetrics(
            agent_name=agent_name,
            num_sessions=len(results),
            avg_task_success=avg_ts,
            avg_user_effort=avg_ue,
            avg_efficiency=avg_eff,
            task_success_by_dataset=ts_by_ds,
            user_effort_by_dataset=ue_by_ds,
        )

    def save_results(self, results: List[SessionResult], path: str):
        """Save results to JSONL file (one JSON object per line)."""
        with open(path, "w", encoding="utf-8") as f:
            for result in results:
                f.write(json.dumps(result.to_dict(), ensure_ascii=False) + "\n")

    @staticmethod
    def load_results(path: str) -> List[SessionResult]:
        """Load results from JSONL file."""
        results = []
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    data = json.loads(line)
                    # Reconstruct SessionResult
                    results.append(SessionResult(**data))
        return results
