summaryrefslogtreecommitdiff
path: root/src/personalization/evaluation/pipeline/runner.py
diff options
context:
space:
mode:
authorYurenHao0426 <blackhao0426@gmail.com>2026-01-27 09:57:37 -0600
committerYurenHao0426 <blackhao0426@gmail.com>2026-01-27 09:57:37 -0600
commitdc801c07cf38b0c495686463e6ca6f871a64440e (patch)
tree599f03114775921dbc472403c701f4a3a8ea188a /src/personalization/evaluation/pipeline/runner.py
parente43b3f8aa36c198b95c1e46bea2eaf3893b13dc3 (diff)
Add collaborativeagents module and update gitignore
- Add collaborativeagents subproject with adapters, agents, and evaluation modules - Update .gitignore to exclude large binary files (.whl, .tar), wandb logs, and results Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'src/personalization/evaluation/pipeline/runner.py')
-rw-r--r--src/personalization/evaluation/pipeline/runner.py333
1 files changed, 333 insertions, 0 deletions
diff --git a/src/personalization/evaluation/pipeline/runner.py b/src/personalization/evaluation/pipeline/runner.py
new file mode 100644
index 0000000..9971c7b
--- /dev/null
+++ b/src/personalization/evaluation/pipeline/runner.py
@@ -0,0 +1,333 @@
+"""
+Experiment Runner
+
+Orchestrates the full evaluation experiment:
+1. Generate/load preference bank and user profiles
+2. Load datasets
+3. Run sessions for all users × tasks × agents
+4. Aggregate and report metrics
+"""
+
+import json
+import os
+from dataclasses import dataclass
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+from tqdm import tqdm
+
+from ..preference_bank.schemas import PreferenceBank
+from ..preference_bank.generator import generate_demo_bank
+from ..profiles.generator import UserProfile, UserProfileGenerator, generate_demo_profiles
+from ..baselines.base import BaselineAgent
+from ..baselines.no_memory import NoMemoryAgent
+from ..baselines.rag_memory import RAGMemoryAgent
+from ..user_simulator.simulator import UserSimulator
+from .evaluator import Evaluator, Task, SessionResult, EvaluationMetrics
+
+
# Demo dataset: Simple math problems
# Five hand-written Task fixtures (3 math, 2 code) used when no real dataset
# is wired in. Each Task pairs a problem with a reference solution and a
# task_description framing line — presumably consumed by the user simulator /
# judge inside Evaluator.run_session; verify against that implementation.
DEMO_TASKS = [
    Task(
        task_id="math_001",
        dataset="math-demo",
        problem="What is the derivative of f(x) = x^3 + 2x^2 - 5x + 3?",
        solution="f'(x) = 3x^2 + 4x - 5",
        task_description="Work with the assistant to solve this calculus problem:",
    ),
    Task(
        task_id="math_002",
        dataset="math-demo",
        problem="Solve for x: 2x + 5 = 3x - 7",
        solution="x = 12",
        task_description="Work with the assistant to solve this algebra problem:",
    ),
    Task(
        task_id="math_003",
        dataset="math-demo",
        problem="Find the area of a circle with radius 5.",
        solution="A = 25π ≈ 78.54 square units",
        task_description="Work with the assistant to solve this geometry problem:",
    ),
    Task(
        task_id="code_001",
        dataset="code-demo",
        problem="Write a Python function that checks if a string is a palindrome.",
        solution="def is_palindrome(s): return s == s[::-1]",
        task_description="Work with the assistant to write this Python function:",
    ),
    Task(
        task_id="code_002",
        dataset="code-demo",
        problem="Write a function to find the nth Fibonacci number.",
        solution="def fib(n): return n if n <= 1 else fib(n-1) + fib(n-2)",
        task_description="Work with the assistant to implement this algorithm:",
    ),
]
+
+
@dataclass
class ExperimentConfig:
    """Configuration for an experiment run.

    Field order is significant: this is a plain dataclass, so callers may
    construct it positionally. Do not reorder fields.
    """
    name: str        # experiment identifier, recorded in summary.json
    output_dir: str  # directory for all artifacts; created if missing

    # Scale
    num_users: int = 2        # number of simulated user profiles to generate
    prefs_per_user: int = 10  # preferences sampled from the bank per user
    tasks_per_user: int = 3   # sessions run per user per agent
    max_turns: int = 25       # per-session turn cap passed to Evaluator.run_session

    # Baselines to run (each flag enables one agent in _create_agents)
    run_no_memory: bool = True   # T1: NoMemoryAgent
    run_rag: bool = True         # Y3: RAGMemoryAgent, mode="nopersonal"
    run_rag_uv: bool = False     # Y4: User vector mode (RAGMemoryAgent, mode="full")

    # Model configs
    agent_model: str = "llama-8b"
    user_sim_model: str = "Llama-3.3-70B-Instruct"
    judge_model: str = "Llama-3.3-70B-Instruct"  # NOTE(review): not read anywhere in this file — confirm the judge model is configured elsewhere

    # API endpoints (OpenAI-compatible /v1 servers)
    agent_api_base: str = "http://localhost:8003/v1"
    user_sim_api_base: str = "http://localhost:8004/v1"

    seed: int = 42  # forwarded to profile generation for reproducibility
+
+
class ExperimentRunner:
    """
    Runs a complete evaluation experiment.

    Lifecycle:
        runner = ExperimentRunner(config)
        runner.setup()          # optional — run() calls it lazily
        metrics = runner.run()

    Artifacts written under ``config.output_dir``:
        preference_bank.json, user_profiles.json,
        results_<agent>.jsonl (one per agent), summary.json,
        and per-agent RAG memory subdirectories.
    """

    def __init__(self, config: ExperimentConfig):
        self.config = config

        # Create output directory
        os.makedirs(config.output_dir, exist_ok=True)

        # Will be initialized lazily by setup()
        self._bank: Optional[PreferenceBank] = None
        self._profiles: Optional[List[UserProfile]] = None
        self._tasks: Optional[List[Task]] = None
        self._evaluator: Optional[Evaluator] = None

    def setup(self):
        """Initialize all components.

        Idempotent with respect to on-disk artifacts: existing
        preference_bank.json / user_profiles.json in the output dir are
        reloaded instead of regenerated, so re-runs reuse prior state.
        """
        print("=" * 60)
        print(f"Setting up experiment: {self.config.name}")
        print("=" * 60)

        # 1. Generate/load preference bank
        bank_path = os.path.join(self.config.output_dir, "preference_bank.json")
        if os.path.exists(bank_path):
            print(f"Loading existing preference bank from {bank_path}")
            self._bank = PreferenceBank.load(bank_path)
        else:
            print("Generating new preference bank...")
            # use_llm=False keeps demo setup offline/deterministic
            self._bank = generate_demo_bank(output_path=bank_path, use_llm=False)

        print(f"  Bank stats: {self._bank.stats()}")

        # 2. Generate/load user profiles
        profiles_path = os.path.join(self.config.output_dir, "user_profiles.json")
        if os.path.exists(profiles_path):
            print(f"Loading existing profiles from {profiles_path}")
            self._profiles = UserProfileGenerator.load_profiles(profiles_path)
        else:
            print(f"Generating {self.config.num_users} user profiles...")
            self._profiles = generate_demo_profiles(
                bank=self._bank,
                num_users=self.config.num_users,
                prefs_per_user=self.config.prefs_per_user,
                output_path=profiles_path,
                seed=self.config.seed,
            )

        # 3. Load tasks
        # NOTE(review): with the default tasks_per_user=3 this slice asks for 6
        # tasks, but DEMO_TASKS only holds 5 — Python silently truncates.
        # Harmless here since run() re-slices to tasks_per_user, but confirm
        # the intended pool size when real datasets replace the demo tasks.
        self._tasks = DEMO_TASKS[:self.config.tasks_per_user * 2]  # Use demo tasks
        print(f"  Loaded {len(self._tasks)} tasks")

        # 4. Initialize evaluator
        user_sim = UserSimulator(
            model_name=self.config.user_sim_model,
            api_base=self.config.user_sim_api_base,
        )
        self._evaluator = Evaluator(user_simulator=user_sim)

        print("Setup complete!\n")

    def _create_agents(self) -> Dict[str, BaselineAgent]:
        """Create agent instances based on config.

        Returns:
            Dict mapping a display name (e.g. "T1_NoMemory") to its agent.
            Only agents whose config flag is enabled are included.
        """
        agents = {}

        if self.config.run_no_memory:
            agents["T1_NoMemory"] = NoMemoryAgent(
                model_name=self.config.agent_model,
                api_base=self.config.agent_api_base,
            )

        if self.config.run_rag:
            # Create directories for RAG memory
            memory_dir = os.path.join(self.config.output_dir, "rag_memory")
            os.makedirs(memory_dir, exist_ok=True)

            # NOTE(review): unlike NoMemoryAgent above, no api_base is passed —
            # confirm RAGMemoryAgent's default endpoint matches agent_api_base.
            agents["Y3_RAG"] = RAGMemoryAgent(
                model_name=self.config.agent_model,
                mode="nopersonal",
                memory_cards_path=os.path.join(memory_dir, "memory_cards.jsonl"),
                memory_embeddings_path=os.path.join(memory_dir, "embeddings.npy"),
            )

        if self.config.run_rag_uv:
            # Separate memory dir so the UV variant never shares state with Y3.
            memory_dir = os.path.join(self.config.output_dir, "rag_uv_memory")
            os.makedirs(memory_dir, exist_ok=True)

            agents["Y4_RAG_UV"] = RAGMemoryAgent(
                model_name=self.config.agent_model,
                mode="full",
                memory_cards_path=os.path.join(memory_dir, "memory_cards.jsonl"),
                memory_embeddings_path=os.path.join(memory_dir, "embeddings.npy"),
                enable_rl_updates=True,
            )

        return agents

    def run(self) -> Dict[str, EvaluationMetrics]:
        """
        Run the full experiment.

        Calls setup() lazily if it has not run yet, then executes
        agents × users × tasks sessions, persists raw per-session results
        as JSONL, and writes/prints an aggregated summary.

        Returns:
            Dict mapping agent name to aggregated metrics
        """
        # _evaluator is the last thing setup() assigns, so it doubles as the
        # "setup finished" sentinel.
        if self._evaluator is None:
            self.setup()

        agents = self._create_agents()
        all_results: Dict[str, List[SessionResult]] = {name: [] for name in agents}

        print("=" * 60)
        print("Running experiment")
        print("=" * 60)

        # Run for each agent
        for agent_name, agent in agents.items():
            print(f"\n>>> Agent: {agent_name}")

            # Run for each user
            for profile in tqdm(self._profiles, desc=f"Users ({agent_name})"):
                # Reset user state (per-user memory isolation between users)
                agent.reset_user(profile.user_id)

                # Get tasks for this user
                # In demo, just cycle through available tasks
                # (every user currently gets the same first N tasks)
                user_tasks = self._tasks[:self.config.tasks_per_user]

                # Run sessions
                for task in user_tasks:
                    result = self._evaluator.run_session(
                        agent=agent,
                        user_profile=profile,
                        task=task,
                        max_turns=self.config.max_turns,
                    )

                    all_results[agent_name].append(result)

                    # Print progress (TS=task success, UE=user effort, Eff=efficiency)
                    status = "✓" if result.task_success else "✗"
                    print(f"  {profile.user_id} | {task.task_id} | "
                          f"TS={status} | UE={result.user_effort} | Eff={result.efficiency}")

        # Save raw results (one JSONL file per agent)
        for agent_name, results in all_results.items():
            results_path = os.path.join(
                self.config.output_dir,
                f"results_{agent_name}.jsonl"
            )
            self._evaluator.save_results(results, results_path)

        # Aggregate metrics
        metrics = {}
        for agent_name, results in all_results.items():
            metrics[agent_name] = self._evaluator.aggregate_metrics(results, agent_name)

        # Save and print summary
        self._save_summary(metrics)
        self._print_summary(metrics)

        return metrics

    def _save_summary(self, metrics: Dict[str, EvaluationMetrics]):
        """Save experiment summary to summary.json in the output dir.

        Args:
            metrics: Aggregated per-agent metrics from run().
        """
        summary = {
            "experiment_name": self.config.name,
            "timestamp": datetime.now().isoformat(),
            "config": {
                "num_users": self.config.num_users,
                "prefs_per_user": self.config.prefs_per_user,
                "tasks_per_user": self.config.tasks_per_user,
                "max_turns": self.config.max_turns,
            },
            "metrics": {name: m.to_dict() for name, m in metrics.items()},
        }

        summary_path = os.path.join(self.config.output_dir, "summary.json")
        # ensure_ascii=False keeps non-ASCII (e.g. π in demo solutions) readable
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        print(f"\nSummary saved to {summary_path}")

    def _print_summary(self, metrics: Dict[str, EvaluationMetrics]):
        """Print a per-agent comparison table to stdout.

        Columns: TS (task success rate), UE (user effort), Eff (efficiency),
        Sessions. The arrows in the header mark the preferred direction.
        """
        print("\n" + "=" * 60)
        print("EXPERIMENT SUMMARY")
        print("=" * 60)

        # Header
        print(f"\n{'Agent':<20} {'TS ↑':>10} {'UE ↓':>10} {'Eff ↓':>10} {'Sessions':>10}")
        print("-" * 60)

        for agent_name, m in metrics.items():
            print(f"{agent_name:<20} {m.avg_task_success:>10.2%} "
                  f"{m.avg_user_effort:>10.2f} {m.avg_efficiency:>10.1f} "
                  f"{m.num_sessions:>10}")

        print("\n" + "=" * 60)
+
+
def run_demo_experiment(output_dir: str = "data/eval/demo_experiment"):
    """
    Run a minimal demo experiment.

    A quick sanity check comparing T1 (NoMemory) against Y3 (RAG):
    2 users, 10 preferences per user, 3 tasks per user, 15-turn sessions.

    Args:
        output_dir: Directory where all experiment artifacts are written.

    Returns:
        Dict mapping agent name to aggregated EvaluationMetrics.
    """
    demo_config = ExperimentConfig(
        name="demo_experiment",
        output_dir=output_dir,
        num_users=2,
        prefs_per_user=10,
        tasks_per_user=3,
        max_turns=15,
        run_no_memory=True,
        run_rag=True,
        run_rag_uv=False,
    )

    experiment = ExperimentRunner(demo_config)
    experiment.setup()
    return experiment.run()
+
+
if __name__ == "__main__":
    import sys

    # First CLI argument overrides the default output directory.
    cli_args = sys.argv[1:]
    run_demo_experiment(cli_args[0] if cli_args else "data/eval/demo_experiment")
+
+