diff options
Diffstat (limited to 'src/personalization/evaluation/pipeline/runner.py')
| -rw-r--r-- | src/personalization/evaluation/pipeline/runner.py | 333 |
1 files changed, 333 insertions, 0 deletions
"""
Experiment Runner

Orchestrates the full evaluation experiment:
1. Generate/load preference bank and user profiles
2. Load datasets
3. Run sessions for all users × tasks × agents
4. Aggregate and report metrics
"""

import json
import os
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime
from tqdm import tqdm

from ..preference_bank.schemas import PreferenceBank
from ..preference_bank.generator import generate_demo_bank
from ..profiles.generator import UserProfile, UserProfileGenerator, generate_demo_profiles
from ..baselines.base import BaselineAgent
from ..baselines.no_memory import NoMemoryAgent
from ..baselines.rag_memory import RAGMemoryAgent
from ..user_simulator.simulator import UserSimulator
from .evaluator import Evaluator, Task, SessionResult, EvaluationMetrics


# Demo dataset: three simple math problems followed by two coding problems.
# Order matters: setup() and run() take prefixes of this list.
DEMO_TASKS = [
    Task(
        task_id="math_001",
        dataset="math-demo",
        problem="What is the derivative of f(x) = x^3 + 2x^2 - 5x + 3?",
        solution="f'(x) = 3x^2 + 4x - 5",
        task_description="Work with the assistant to solve this calculus problem:",
    ),
    Task(
        task_id="math_002",
        dataset="math-demo",
        problem="Solve for x: 2x + 5 = 3x - 7",
        solution="x = 12",
        task_description="Work with the assistant to solve this algebra problem:",
    ),
    Task(
        task_id="math_003",
        dataset="math-demo",
        problem="Find the area of a circle with radius 5.",
        solution="A = 25π ≈ 78.54 square units",
        task_description="Work with the assistant to solve this geometry problem:",
    ),
    Task(
        task_id="code_001",
        dataset="code-demo",
        problem="Write a Python function that checks if a string is a palindrome.",
        solution="def is_palindrome(s): return s == s[::-1]",
        task_description="Work with the assistant to write this Python function:",
    ),
    Task(
        task_id="code_002",
        dataset="code-demo",
        problem="Write a function to find the nth Fibonacci number.",
        solution="def fib(n): return n if n <= 1 else fib(n-1) + fib(n-2)",
        task_description="Work with the assistant to implement this algorithm:",
    ),
]


@dataclass
class ExperimentConfig:
    """Configuration for an experiment run."""
    name: str
    # Directory that receives the preference bank, profiles, agent memory,
    # per-agent results and the summary. Created by ExperimentRunner.__init__.
    output_dir: str

    # Scale
    # num_users / prefs_per_user are only consulted when profiles are
    # generated; an existing user_profiles.json in output_dir is loaded as-is.
    num_users: int = 2
    prefs_per_user: int = 10
    tasks_per_user: int = 3
    max_turns: int = 25

    # Baselines to run
    run_no_memory: bool = True
    run_rag: bool = True
    run_rag_uv: bool = False  # User vector mode

    # Model configs
    agent_model: str = "llama-8b"
    user_sim_model: str = "Llama-3.3-70B-Instruct"
    # NOTE(review): judge_model is not referenced anywhere else in this
    # module - confirm whether the evaluator is expected to receive it.
    judge_model: str = "Llama-3.3-70B-Instruct"

    # API endpoints
    # agent_api_base is passed to NoMemoryAgent only; the RAG agents are
    # constructed without an api_base in _create_agents().
    agent_api_base: str = "http://localhost:8003/v1"
    user_sim_api_base: str = "http://localhost:8004/v1"

    # Forwarded to generate_demo_profiles() when profiles are generated.
    seed: int = 42


class ExperimentRunner:
    """
    Runs a complete evaluation experiment.

    Typical use is ``ExperimentRunner(config).run()``; ``run()`` calls
    ``setup()`` itself when it has not been called yet.
    """

    def __init__(self, config: ExperimentConfig):
        """Store the config and make sure ``config.output_dir`` exists."""
        self.config = config

        # Create output directory
        os.makedirs(config.output_dir, exist_ok=True)

        # Populated by setup()
        self._bank: Optional[PreferenceBank] = None
        self._profiles: Optional[List[UserProfile]] = None
        self._tasks: Optional[List[Task]] = None
        self._evaluator: Optional[Evaluator] = None

    def setup(self):
        """
        Initialize all components.

        Loads the preference bank and user profiles from ``output_dir`` when
        the corresponding JSON files already exist, otherwise generates them
        (passing the output path to the generator). Then selects the demo
        tasks and builds the evaluator around a ``UserSimulator``.
        """
        print("=" * 60)
        print(f"Setting up experiment: {self.config.name}")
        print("=" * 60)

        # 1. Generate/load preference bank
        bank_path = os.path.join(self.config.output_dir, "preference_bank.json")
        if os.path.exists(bank_path):
            print(f"Loading existing preference bank from {bank_path}")
            self._bank = PreferenceBank.load(bank_path)
        else:
            print("Generating new preference bank...")
            self._bank = generate_demo_bank(output_path=bank_path, use_llm=False)

        print(f" Bank stats: {self._bank.stats()}")

        # 2. Generate/load user profiles
        profiles_path = os.path.join(self.config.output_dir, "user_profiles.json")
        if os.path.exists(profiles_path):
            # Existing profiles win over num_users / prefs_per_user / seed.
            print(f"Loading existing profiles from {profiles_path}")
            self._profiles = UserProfileGenerator.load_profiles(profiles_path)
        else:
            print(f"Generating {self.config.num_users} user profiles...")
            self._profiles = generate_demo_profiles(
                bank=self._bank,
                num_users=self.config.num_users,
                prefs_per_user=self.config.prefs_per_user,
                output_path=profiles_path,
                seed=self.config.seed,
            )

        # 3. Load tasks
        # Slicing never raises: if tasks_per_user * 2 exceeds the number of
        # demo tasks, all demo tasks are used.
        self._tasks = DEMO_TASKS[:self.config.tasks_per_user * 2]  # Use demo tasks
        print(f" Loaded {len(self._tasks)} tasks")

        # 4. Initialize evaluator
        user_sim = UserSimulator(
            model_name=self.config.user_sim_model,
            api_base=self.config.user_sim_api_base,
        )
        self._evaluator = Evaluator(user_simulator=user_sim)

        print("Setup complete!\n")

    def _create_agents(self) -> Dict[str, BaselineAgent]:
        """
        Create agent instances based on config.

        Returns:
            Dict mapping a display name to an agent, in the order
            NoMemory, RAG, RAG+UV, containing only the baselines enabled by
            the ``run_*`` flags. Empty when all flags are off.
        """
        agents: Dict[str, BaselineAgent] = {}

        if self.config.run_no_memory:
            agents["T1_NoMemory"] = NoMemoryAgent(
                model_name=self.config.agent_model,
                api_base=self.config.agent_api_base,
            )

        if self.config.run_rag:
            # Create directories for RAG memory
            memory_dir = os.path.join(self.config.output_dir, "rag_memory")
            os.makedirs(memory_dir, exist_ok=True)

            agents["Y3_RAG"] = RAGMemoryAgent(
                model_name=self.config.agent_model,
                mode="nopersonal",
                memory_cards_path=os.path.join(memory_dir, "memory_cards.jsonl"),
                memory_embeddings_path=os.path.join(memory_dir, "embeddings.npy"),
            )

        if self.config.run_rag_uv:
            # Separate directory so the two RAG variants never share memory files.
            memory_dir = os.path.join(self.config.output_dir, "rag_uv_memory")
            os.makedirs(memory_dir, exist_ok=True)

            agents["Y4_RAG_UV"] = RAGMemoryAgent(
                model_name=self.config.agent_model,
                mode="full",
                memory_cards_path=os.path.join(memory_dir, "memory_cards.jsonl"),
                memory_embeddings_path=os.path.join(memory_dir, "embeddings.npy"),
                enable_rl_updates=True,
            )

        return agents

    def run(self) -> Dict[str, EvaluationMetrics]:
        """
        Run the full experiment.

        For every enabled agent and every user profile, runs one session per
        task, then writes ``results_<agent>.jsonl`` for that agent. After all
        agents have run, aggregates metrics, writes ``summary.json`` and
        prints a summary table.

        Returns:
            Dict mapping agent name to aggregated metrics
        """
        if self._evaluator is None:
            self.setup()

        agents = self._create_agents()
        all_results: Dict[str, List[SessionResult]] = {name: [] for name in agents}

        print("=" * 60)
        print("Running experiment")
        print("=" * 60)

        # Every user gets the same first `tasks_per_user` tasks (no rotation),
        # so the slice is computed once instead of once per user.
        user_tasks = self._tasks[:self.config.tasks_per_user]

        # Run for each agent
        for agent_name, agent in agents.items():
            print(f"\n>>> Agent: {agent_name}")
            agent_results = all_results[agent_name]

            # Run for each user
            for profile in tqdm(self._profiles, desc=f"Users ({agent_name})"):
                # Reset user state
                agent.reset_user(profile.user_id)

                # Run sessions
                for task in user_tasks:
                    result = self._evaluator.run_session(
                        agent=agent,
                        user_profile=profile,
                        task=task,
                        max_turns=self.config.max_turns,
                    )

                    agent_results.append(result)

                    # Print progress via tqdm.write: a plain print() here
                    # would be interleaved with the active progress bar.
                    status = "✓" if result.task_success else "✗"
                    tqdm.write(
                        f" {profile.user_id} | {task.task_id} | "
                        f"TS={status} | UE={result.user_effort} | Eff={result.efficiency}"
                    )

            # Save raw results as soon as this agent is done, so a failure in
            # a later agent does not discard sessions that already completed.
            results_path = os.path.join(
                self.config.output_dir,
                f"results_{agent_name}.jsonl"
            )
            self._evaluator.save_results(agent_results, results_path)

        # Aggregate metrics
        metrics = {
            agent_name: self._evaluator.aggregate_metrics(results, agent_name)
            for agent_name, results in all_results.items()
        }

        # Save and print summary
        self._save_summary(metrics)
        self._print_summary(metrics)

        return metrics

    def _save_summary(self, metrics: Dict[str, EvaluationMetrics]):
        """
        Save experiment summary.

        Writes ``summary.json`` into ``output_dir`` containing the experiment
        name, a local-time ISO timestamp, the scale settings from the config
        and ``to_dict()`` of each agent's metrics.
        """
        summary = {
            "experiment_name": self.config.name,
            "timestamp": datetime.now().isoformat(),
            "config": {
                "num_users": self.config.num_users,
                "prefs_per_user": self.config.prefs_per_user,
                "tasks_per_user": self.config.tasks_per_user,
                "max_turns": self.config.max_turns,
            },
            "metrics": {name: m.to_dict() for name, m in metrics.items()},
        }

        summary_path = os.path.join(self.config.output_dir, "summary.json")
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        print(f"\nSummary saved to {summary_path}")

    def _print_summary(self, metrics: Dict[str, EvaluationMetrics]):
        """Print experiment summary as one table row per agent."""
        print("\n" + "=" * 60)
        print("EXPERIMENT SUMMARY")
        print("=" * 60)

        # Header
        print(f"\n{'Agent':<20} {'TS ↑':>10} {'UE ↓':>10} {'Eff ↓':>10} {'Sessions':>10}")
        print("-" * 60)

        for agent_name, m in metrics.items():
            print(f"{agent_name:<20} {m.avg_task_success:>10.2%} "
                  f"{m.avg_user_effort:>10.2f} {m.avg_efficiency:>10.1f} "
                  f"{m.num_sessions:>10}")

        print("\n" + "=" * 60)


def run_demo_experiment(output_dir: str = "data/eval/demo_experiment"):
    """
    Run a minimal demo experiment.

    This is a quick sanity check with:
    - 2 users
    - 10 preferences per user
    - 3 tasks per user
    - at most 15 turns per session
    - T1 (NoMemory) vs Y3 (RAG) comparison

    Args:
        output_dir: Directory for all generated artifacts and results.

    Returns:
        Dict mapping agent name to aggregated metrics, as returned by
        ``ExperimentRunner.run()``.
    """
    config = ExperimentConfig(
        name="demo_experiment",
        output_dir=output_dir,
        num_users=2,
        prefs_per_user=10,
        tasks_per_user=3,
        max_turns=15,
        run_no_memory=True,
        run_rag=True,
        run_rag_uv=False,
    )

    runner = ExperimentRunner(config)
    runner.setup()
    metrics = runner.run()

    return metrics


if __name__ == "__main__":
    import sys

    # Optional first CLI argument overrides the output directory.
    output_dir = sys.argv[1] if len(sys.argv) > 1 else "data/eval/demo_experiment"
    run_demo_experiment(output_dir)
