diff options
Diffstat (limited to 'src/personalization/evaluation/pipeline')
| -rw-r--r-- | src/personalization/evaluation/pipeline/__init__.py | 6 | ||||
| -rw-r--r-- | src/personalization/evaluation/pipeline/evaluator.py | 353 | ||||
| -rw-r--r-- | src/personalization/evaluation/pipeline/runner.py | 333 |
3 files changed, 692 insertions, 0 deletions
diff --git a/src/personalization/evaluation/pipeline/__init__.py b/src/personalization/evaluation/pipeline/__init__.py new file mode 100644 index 0000000..183d0c5 --- /dev/null +++ b/src/personalization/evaluation/pipeline/__init__.py @@ -0,0 +1,6 @@ +from .evaluator import Evaluator, SessionResult, EvaluationMetrics +from .runner import ExperimentRunner + +__all__ = ["Evaluator", "SessionResult", "EvaluationMetrics", "ExperimentRunner"] + + diff --git a/src/personalization/evaluation/pipeline/evaluator.py b/src/personalization/evaluation/pipeline/evaluator.py new file mode 100644 index 0000000..7304400 --- /dev/null +++ b/src/personalization/evaluation/pipeline/evaluator.py @@ -0,0 +1,353 @@ +""" +Evaluation Pipeline + +Runs evaluation sessions between user simulator and agents. +Computes metrics: Task Success (TS), User Effort (UE), Efficiency (Eff). +""" + +import json +import os +from dataclasses import dataclass, field, asdict +from typing import List, Dict, Any, Optional +from datetime import datetime + +from ..profiles.generator import UserProfile +from ..preference_bank.schemas import PreferenceBank +from ..baselines.base import BaselineAgent +from ..user_simulator.simulator import UserSimulator, UserSimulatorResponse + + +@dataclass +class Task: + """A problem/task for evaluation.""" + task_id: str + dataset: str + problem: str + solution: str + task_description: str = "Work with the assistant to solve this problem:" + + +@dataclass +class SessionResult: + """Result of a single evaluation session.""" + user_id: str + task_id: str + dataset: str + agent_name: str + + # Metrics + task_success: bool # TS: Was the task solved correctly? + user_effort: int # UE: Number of preference enforcements + efficiency: int # Eff: Total number of messages + + # Details + conversation: List[Dict[str, str]] + preference_violations: List[Dict[str, Any]] + final_draft_answer: str + + # Debug + debug_info: Dict[str, Any] = field(default_factory=dict) + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +@dataclass +class EvaluationMetrics: + """Aggregated evaluation metrics.""" + agent_name: str + num_sessions: int + + # Average metrics + avg_task_success: float # Average TS + avg_user_effort: float # Average UE + avg_efficiency: float # Average Eff + + # Breakdowns + task_success_by_dataset: Dict[str, float] = field(default_factory=dict) + user_effort_by_dataset: Dict[str, float] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +class JudgeModel: + """ + LLM judge for evaluating task success. + Uses the same approach as collaborativeagents. + """ + + def __init__( + self, + model_name: str = "Llama-3.3-70B-Instruct", + api_base: Optional[str] = None, + api_key: Optional[str] = None, + ): + self.model_name = model_name + self.api_base = api_base or os.getenv("JUDGE_API_BASE", "http://localhost:8004/v1") + self.api_key = api_key or os.getenv("JUDGE_API_KEY", "EMPTY") + + self._init_client() + + def _init_client(self): + try: + import openai + self.client = openai.OpenAI( + base_url=self.api_base, + api_key=self.api_key, + ) + except Exception as e: + print(f"Warning: Could not initialize judge client: {e}") + self.client = None + + def evaluate_answer( + self, + problem: str, + correct_answer: str, + user_draft_answer: str, + ) -> bool: + """ + Evaluate if the user's draft answer is correct. + + Returns: + True if answer is correct, False otherwise + """ + prompt = f"""You are an expert evaluator. Determine if the user's answer is correct. + +# Problem +{problem} + +# Correct Answer +{correct_answer} + +# User's Answer +{user_draft_answer} + +# Instructions +Determine if the user's answer is accurate and consistent with the correct answer. +Minor formatting differences are acceptable. +The core answer/solution must match. + +Output JSON: +{{ + "reasoning": "Brief explanation", + "is_correct": true or false +}} + +Output only valid JSON.""" + + if self.client is None: + # Fallback - simple string matching + return correct_answer.lower().strip() in user_draft_answer.lower() + + try: + response = self.client.chat.completions.create( + model=self.model_name, + messages=[{"role": "user", "content": prompt}], + temperature=0.0, + max_tokens=256, + ) + + text = response.choices[0].message.content.strip() + + # Parse JSON + if "```" in text: + text = text.split("```")[1] + if text.startswith("json"): + text = text[4:] + + data = json.loads(text) + return data.get("is_correct", False) + + except Exception as e: + print(f"Error in judge evaluation: {e}") + # Fallback + return correct_answer.lower().strip() in user_draft_answer.lower() + + +class Evaluator: + """ + Main evaluator that runs sessions and computes metrics. + """ + + def __init__( + self, + user_simulator: Optional[UserSimulator] = None, + judge: Optional[JudgeModel] = None, + ): + self.user_sim = user_simulator or UserSimulator() + self.judge = judge or JudgeModel() + + def run_session( + self, + agent: BaselineAgent, + user_profile: UserProfile, + task: Task, + max_turns: int = 30, + ) -> SessionResult: + """ + Run a single evaluation session. + + Args: + agent: The agent being evaluated + user_profile: User with preferences + task: Task to solve + max_turns: Maximum conversation turns + + Returns: + SessionResult with metrics and conversation + """ + # Setup user simulator + self.user_sim.setup( + profile=user_profile, + task_description=task.task_description, + problem=task.problem, + solution=task.solution, + ) + + conversation: List[Dict[str, str]] = [] + preference_violations: List[Dict[str, Any]] = [] + user_effort = 0 + final_draft_answer = "I don't know" + + # Agent opens the conversation + conversation.append({ + "role": "assistant", + "content": "How can I help you today?" + }) + + for turn in range(max_turns): + # User responds + user_response = self.user_sim.respond(conversation) + + conversation.append({ + "role": "user", + "content": user_response.response, + }) + + # Track preference violations and enforcement + violations_this_turn = [ + { + "turn": turn, + "preference_id": check.preference_id, + "topic": check.topic, + "violation_detail": check.violation_detail, + } + for check in user_response.preference_checks + if check.relevant and check.satisfied == False + ] + + if violations_this_turn: + preference_violations.extend(violations_this_turn) + + if user_response.enforcement_needed: + user_effort += 1 + + final_draft_answer = user_response.draft_answer + + # Check termination + if user_response.should_terminate or "TERMINATE" in user_response.response: + break + + # Agent responds + agent_response = agent.respond( + user_id=user_profile.user_id, + query=user_response.response, + conversation_history=conversation, + ) + + conversation.append({ + "role": "assistant", + "content": agent_response.answer, + }) + + # End session for agent (update memory, etc.) + agent.end_session(user_profile.user_id, conversation) + + # Evaluate task success + task_success = self.judge.evaluate_answer( + problem=task.problem, + correct_answer=task.solution, + user_draft_answer=final_draft_answer, + ) + + return SessionResult( + user_id=user_profile.user_id, + task_id=task.task_id, + dataset=task.dataset, + agent_name=agent.get_name(), + task_success=task_success, + user_effort=user_effort, + efficiency=len(conversation), + conversation=conversation, + preference_violations=preference_violations, + final_draft_answer=final_draft_answer, + debug_info={ + "num_turns": len(conversation) // 2, + "num_violations": len(preference_violations), + }, + ) + + def aggregate_metrics( + self, + results: List[SessionResult], + agent_name: str, + ) -> EvaluationMetrics: + """ + Aggregate metrics from multiple sessions. + """ + if not results: + return EvaluationMetrics( + agent_name=agent_name, + num_sessions=0, + avg_task_success=0.0, + avg_user_effort=0.0, + avg_efficiency=0.0, + ) + + # Overall averages + avg_ts = sum(r.task_success for r in results) / len(results) + avg_ue = sum(r.user_effort for r in results) / len(results) + avg_eff = sum(r.efficiency for r in results) / len(results) + + # By dataset + datasets = set(r.dataset for r in results) + ts_by_ds = {} + ue_by_ds = {} + + for ds in datasets: + ds_results = [r for r in results if r.dataset == ds] + if ds_results: + ts_by_ds[ds] = sum(r.task_success for r in ds_results) / len(ds_results) + ue_by_ds[ds] = sum(r.user_effort for r in ds_results) / len(ds_results) + + return EvaluationMetrics( + agent_name=agent_name, + num_sessions=len(results), + avg_task_success=avg_ts, + avg_user_effort=avg_ue, + avg_efficiency=avg_eff, + task_success_by_dataset=ts_by_ds, + user_effort_by_dataset=ue_by_ds, + ) + + def save_results(self, results: List[SessionResult], path: str): + """Save results to JSONL file.""" + with open(path, "w", encoding="utf-8") as f: + for result in results: + f.write(json.dumps(result.to_dict(), ensure_ascii=False) + "\n") + + @staticmethod + def load_results(path: str) -> List[SessionResult]: + """Load results from JSONL file.""" + results = [] + with open(path, "r", encoding="utf-8") as f: + for line in f: + if line.strip(): + data = json.loads(line) + # Reconstruct SessionResult + results.append(SessionResult(**data)) + return results + + diff --git a/src/personalization/evaluation/pipeline/runner.py b/src/personalization/evaluation/pipeline/runner.py new file mode 100644 index 0000000..9971c7b --- /dev/null +++ b/src/personalization/evaluation/pipeline/runner.py @@ -0,0 +1,333 @@ +""" +Experiment Runner + +Orchestrates the full evaluation experiment: +1. Generate/load preference bank and user profiles +2. Load datasets +3. Run sessions for all users × tasks × agents +4. Aggregate and report metrics +""" + +import json +import os +from dataclasses import dataclass +from typing import List, Dict, Any, Optional +from datetime import datetime +from tqdm import tqdm + +from ..preference_bank.schemas import PreferenceBank +from ..preference_bank.generator import generate_demo_bank +from ..profiles.generator import UserProfile, UserProfileGenerator, generate_demo_profiles +from ..baselines.base import BaselineAgent +from ..baselines.no_memory import NoMemoryAgent +from ..baselines.rag_memory import RAGMemoryAgent +from ..user_simulator.simulator import UserSimulator +from .evaluator import Evaluator, Task, SessionResult, EvaluationMetrics + + +# Demo dataset: Simple math problems +DEMO_TASKS = [ + Task( + task_id="math_001", + dataset="math-demo", + problem="What is the derivative of f(x) = x^3 + 2x^2 - 5x + 3?", + solution="f'(x) = 3x^2 + 4x - 5", + task_description="Work with the assistant to solve this calculus problem:", + ), + Task( + task_id="math_002", + dataset="math-demo", + problem="Solve for x: 2x + 5 = 3x - 7", + solution="x = 12", + task_description="Work with the assistant to solve this algebra problem:", + ), + Task( + task_id="math_003", + dataset="math-demo", + problem="Find the area of a circle with radius 5.", + solution="A = 25π ≈ 78.54 square units", + task_description="Work with the assistant to solve this geometry problem:", + ), + Task( + task_id="code_001", + dataset="code-demo", + problem="Write a Python function that checks if a string is a palindrome.", + solution="def is_palindrome(s): return s == s[::-1]", + task_description="Work with the assistant to write this Python function:", + ), + Task( + task_id="code_002", + dataset="code-demo", + problem="Write a function to find the nth Fibonacci number.", + solution="def fib(n): return n if n <= 1 else fib(n-1) + fib(n-2)", + task_description="Work with the assistant to implement this algorithm:", + ), +] + + +@dataclass +class ExperimentConfig: + """Configuration for an experiment run.""" + name: str + output_dir: str + + # Scale + num_users: int = 2 + prefs_per_user: int = 10 + tasks_per_user: int = 3 + max_turns: int = 25 + + # Baselines to run + run_no_memory: bool = True + run_rag: bool = True + run_rag_uv: bool = False # User vector mode + + # Model configs + agent_model: str = "llama-8b" + user_sim_model: str = "Llama-3.3-70B-Instruct" + judge_model: str = "Llama-3.3-70B-Instruct" + + # API endpoints + agent_api_base: str = "http://localhost:8003/v1" + user_sim_api_base: str = "http://localhost:8004/v1" + + seed: int = 42 + + +class ExperimentRunner: + """ + Runs a complete evaluation experiment. + """ + + def __init__(self, config: ExperimentConfig): + self.config = config + + # Create output directory + os.makedirs(config.output_dir, exist_ok=True) + + # Will be initialized lazily + self._bank: Optional[PreferenceBank] = None + self._profiles: Optional[List[UserProfile]] = None + self._tasks: Optional[List[Task]] = None + self._evaluator: Optional[Evaluator] = None + + def setup(self): + """Initialize all components.""" + print("=" * 60) + print(f"Setting up experiment: {self.config.name}") + print("=" * 60) + + # 1. Generate/load preference bank + bank_path = os.path.join(self.config.output_dir, "preference_bank.json") + if os.path.exists(bank_path): + print(f"Loading existing preference bank from {bank_path}") + self._bank = PreferenceBank.load(bank_path) + else: + print("Generating new preference bank...") + self._bank = generate_demo_bank(output_path=bank_path, use_llm=False) + + print(f" Bank stats: {self._bank.stats()}") + + # 2. Generate/load user profiles + profiles_path = os.path.join(self.config.output_dir, "user_profiles.json") + if os.path.exists(profiles_path): + print(f"Loading existing profiles from {profiles_path}") + self._profiles = UserProfileGenerator.load_profiles(profiles_path) + else: + print(f"Generating {self.config.num_users} user profiles...") + self._profiles = generate_demo_profiles( + bank=self._bank, + num_users=self.config.num_users, + prefs_per_user=self.config.prefs_per_user, + output_path=profiles_path, + seed=self.config.seed, + ) + + # 3. Load tasks + self._tasks = DEMO_TASKS[:self.config.tasks_per_user * 2] # Use demo tasks + print(f" Loaded {len(self._tasks)} tasks") + + # 4. Initialize evaluator + user_sim = UserSimulator( + model_name=self.config.user_sim_model, + api_base=self.config.user_sim_api_base, + ) + self._evaluator = Evaluator(user_simulator=user_sim) + + print("Setup complete!\n") + + def _create_agents(self) -> Dict[str, BaselineAgent]: + """Create agent instances based on config.""" + agents = {} + + if self.config.run_no_memory: + agents["T1_NoMemory"] = NoMemoryAgent( + model_name=self.config.agent_model, + api_base=self.config.agent_api_base, + ) + + if self.config.run_rag: + # Create directories for RAG memory + memory_dir = os.path.join(self.config.output_dir, "rag_memory") + os.makedirs(memory_dir, exist_ok=True) + + agents["Y3_RAG"] = RAGMemoryAgent( + model_name=self.config.agent_model, + mode="nopersonal", + memory_cards_path=os.path.join(memory_dir, "memory_cards.jsonl"), + memory_embeddings_path=os.path.join(memory_dir, "embeddings.npy"), + ) + + if self.config.run_rag_uv: + memory_dir = os.path.join(self.config.output_dir, "rag_uv_memory") + os.makedirs(memory_dir, exist_ok=True) + + agents["Y4_RAG_UV"] = RAGMemoryAgent( + model_name=self.config.agent_model, + mode="full", + memory_cards_path=os.path.join(memory_dir, "memory_cards.jsonl"), + memory_embeddings_path=os.path.join(memory_dir, "embeddings.npy"), + enable_rl_updates=True, + ) + + return agents + + def run(self) -> Dict[str, EvaluationMetrics]: + """ + Run the full experiment. + + Returns: + Dict mapping agent name to aggregated metrics + """ + if self._evaluator is None: + self.setup() + + agents = self._create_agents() + all_results: Dict[str, List[SessionResult]] = {name: [] for name in agents} + + print("=" * 60) + print("Running experiment") + print("=" * 60) + + # Run for each agent + for agent_name, agent in agents.items(): + print(f"\n>>> Agent: {agent_name}") + + # Run for each user + for profile in tqdm(self._profiles, desc=f"Users ({agent_name})"): + # Reset user state + agent.reset_user(profile.user_id) + + # Get tasks for this user + # In demo, just cycle through available tasks + user_tasks = self._tasks[:self.config.tasks_per_user] + + # Run sessions + for task in user_tasks: + result = self._evaluator.run_session( + agent=agent, + user_profile=profile, + task=task, + max_turns=self.config.max_turns, + ) + + all_results[agent_name].append(result) + + # Print progress + status = "✓" if result.task_success else "✗" + print(f" {profile.user_id} | {task.task_id} | " + f"TS={status} | UE={result.user_effort} | Eff={result.efficiency}") + + # Save raw results + for agent_name, results in all_results.items(): + results_path = os.path.join( + self.config.output_dir, + f"results_{agent_name}.jsonl" + ) + self._evaluator.save_results(results, results_path) + + # Aggregate metrics + metrics = {} + for agent_name, results in all_results.items(): + metrics[agent_name] = self._evaluator.aggregate_metrics(results, agent_name) + + # Save and print summary + self._save_summary(metrics) + self._print_summary(metrics) + + return metrics + + def _save_summary(self, metrics: Dict[str, EvaluationMetrics]): + """Save experiment summary.""" + summary = { + "experiment_name": self.config.name, + "timestamp": datetime.now().isoformat(), + "config": { + "num_users": self.config.num_users, + "prefs_per_user": self.config.prefs_per_user, + "tasks_per_user": self.config.tasks_per_user, + "max_turns": self.config.max_turns, + }, + "metrics": {name: m.to_dict() for name, m in metrics.items()}, + } + + summary_path = os.path.join(self.config.output_dir, "summary.json") + with open(summary_path, "w", encoding="utf-8") as f: + json.dump(summary, f, indent=2, ensure_ascii=False) + + print(f"\nSummary saved to {summary_path}") + + def _print_summary(self, metrics: Dict[str, EvaluationMetrics]): + """Print experiment summary.""" + print("\n" + "=" * 60) + print("EXPERIMENT SUMMARY") + print("=" * 60) + + # Header + print(f"\n{'Agent':<20} {'TS ↑':>10} {'UE ↓':>10} {'Eff ↓':>10} {'Sessions':>10}") + print("-" * 60) + + for agent_name, m in metrics.items(): + print(f"{agent_name:<20} {m.avg_task_success:>10.2%} " + f"{m.avg_user_effort:>10.2f} {m.avg_efficiency:>10.1f} " + f"{m.num_sessions:>10}") + + print("\n" + "=" * 60) + + +def run_demo_experiment(output_dir: str = "data/eval/demo_experiment"): + """ + Run a minimal demo experiment. + + This is a quick sanity check with: + - 2 users + - 10 preferences per user + - 3 tasks per user + - T1 (NoMemory) vs Y3 (RAG) comparison + """ + config = ExperimentConfig( + name="demo_experiment", + output_dir=output_dir, + num_users=2, + prefs_per_user=10, + tasks_per_user=3, + max_turns=15, + run_no_memory=True, + run_rag=True, + run_rag_uv=False, + ) + + runner = ExperimentRunner(config) + runner.setup() + metrics = runner.run() + + return metrics + + +if __name__ == "__main__": + import sys + + output_dir = sys.argv[1] if len(sys.argv) > 1 else "data/eval/demo_experiment" + run_demo_experiment(output_dir) + + |
