summaryrefslogtreecommitdiff
path: root/src/personalization/evaluation/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'src/personalization/evaluation/pipeline')
-rw-r--r--src/personalization/evaluation/pipeline/__init__.py6
-rw-r--r--src/personalization/evaluation/pipeline/evaluator.py353
-rw-r--r--src/personalization/evaluation/pipeline/runner.py333
3 files changed, 692 insertions, 0 deletions
diff --git a/src/personalization/evaluation/pipeline/__init__.py b/src/personalization/evaluation/pipeline/__init__.py
new file mode 100644
index 0000000..183d0c5
--- /dev/null
+++ b/src/personalization/evaluation/pipeline/__init__.py
@@ -0,0 +1,6 @@
+from .evaluator import Evaluator, SessionResult, EvaluationMetrics
+from .runner import ExperimentRunner
+
+__all__ = ["Evaluator", "SessionResult", "EvaluationMetrics", "ExperimentRunner"]
+
+
diff --git a/src/personalization/evaluation/pipeline/evaluator.py b/src/personalization/evaluation/pipeline/evaluator.py
new file mode 100644
index 0000000..7304400
--- /dev/null
+++ b/src/personalization/evaluation/pipeline/evaluator.py
@@ -0,0 +1,353 @@
+"""
+Evaluation Pipeline
+
+Runs evaluation sessions between user simulator and agents.
+Computes metrics: Task Success (TS), User Effort (UE), Efficiency (Eff).
+"""
+
+import json
+import os
+from dataclasses import dataclass, field, asdict
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+
+from ..profiles.generator import UserProfile
+from ..preference_bank.schemas import PreferenceBank
+from ..baselines.base import BaselineAgent
+from ..user_simulator.simulator import UserSimulator, UserSimulatorResponse
+
+
@dataclass
class Task:
    """A problem/task for evaluation."""
    task_id: str  # unique id within the dataset, e.g. "math_001"
    dataset: str  # source dataset name, e.g. "math-demo"
    problem: str  # problem statement handed to the user simulator
    solution: str  # reference (ground-truth) answer used by the judge
    # Instruction prefix framing the collaborative session for the user simulator.
    task_description: str = "Work with the assistant to solve this problem:"
+
+
@dataclass
class SessionResult:
    """Result of a single evaluation session (one user x one task x one agent)."""
    # Session identity
    user_id: str
    task_id: str
    dataset: str
    agent_name: str

    # Metrics
    task_success: bool  # TS: Was the task solved correctly?
    user_effort: int  # UE: Number of preference enforcements
    efficiency: int  # Eff: Total number of messages

    # Details
    conversation: List[Dict[str, str]]  # full transcript of {"role", "content"} messages
    preference_violations: List[Dict[str, Any]]  # one entry per violated preference check
    final_draft_answer: str  # user's last draft answer; judged for task success

    # Debug
    debug_info: Dict[str, Any] = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all fields (via dataclasses.asdict)."""
        return asdict(self)
+
+
@dataclass
class EvaluationMetrics:
    """Aggregated evaluation metrics for one agent across many sessions."""
    agent_name: str
    num_sessions: int

    # Average metrics (TS is a success rate in [0, 1]; UE/Eff are mean counts)
    avg_task_success: float  # Average TS
    avg_user_effort: float  # Average UE
    avg_efficiency: float  # Average Eff

    # Breakdowns keyed by dataset name
    task_success_by_dataset: Dict[str, float] = field(default_factory=dict)
    user_effort_by_dataset: Dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all fields (via dataclasses.asdict)."""
        return asdict(self)
+
+
class JudgeModel:
    """
    LLM judge for evaluating task success.
    Uses the same approach as collaborativeagents.

    If an OpenAI-compatible client cannot be created, or a judge call fails,
    falls back to a case-insensitive substring match against the reference
    answer.
    """

    def __init__(
        self,
        model_name: str = "Llama-3.3-70B-Instruct",
        api_base: Optional[str] = None,
        api_key: Optional[str] = None,
    ):
        """
        Args:
            model_name: Chat model used for judging.
            api_base: OpenAI-compatible endpoint; defaults to $JUDGE_API_BASE.
            api_key: API key; defaults to $JUDGE_API_KEY ("EMPTY" for local vLLM).
        """
        self.model_name = model_name
        self.api_base = api_base or os.getenv("JUDGE_API_BASE", "http://localhost:8004/v1")
        self.api_key = api_key or os.getenv("JUDGE_API_KEY", "EMPTY")

        self._init_client()

    def _init_client(self):
        """Create the OpenAI client; degrade to None (fallback mode) on failure."""
        try:
            import openai
            self.client = openai.OpenAI(
                base_url=self.api_base,
                api_key=self.api_key,
            )
        except Exception as e:
            print(f"Warning: Could not initialize judge client: {e}")
            self.client = None

    @staticmethod
    def _fallback_match(correct_answer: str, user_draft_answer: str) -> bool:
        """Heuristic fallback: case-insensitive substring containment.

        Extracted so the no-client path and the exception path share one
        implementation (was duplicated inline).
        """
        return correct_answer.lower().strip() in user_draft_answer.lower()

    @staticmethod
    def _extract_json(text: str) -> Dict[str, Any]:
        """Parse a JSON object from possibly fenced / prose-wrapped model output.

        Strips ```json fences, then trims to the outermost {...} span so that
        leading or trailing commentary around the object does not break
        json.loads (the previous fence-only handling raised on trailing prose).

        Raises:
            json.JSONDecodeError / ValueError if no valid object is found.
        """
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            text = text[start:end + 1]
        return json.loads(text)

    def evaluate_answer(
        self,
        problem: str,
        correct_answer: str,
        user_draft_answer: str,
    ) -> bool:
        """
        Evaluate if the user's draft answer is correct.

        Args:
            problem: Problem statement given to the user.
            correct_answer: Reference answer from the dataset.
            user_draft_answer: The user's final draft answer.

        Returns:
            True if answer is correct, False otherwise
        """
        prompt = f"""You are an expert evaluator. Determine if the user's answer is correct.

# Problem
{problem}

# Correct Answer
{correct_answer}

# User's Answer
{user_draft_answer}

# Instructions
Determine if the user's answer is accurate and consistent with the correct answer.
Minor formatting differences are acceptable.
The core answer/solution must match.

Output JSON:
{{
  "reasoning": "Brief explanation",
  "is_correct": true or false
}}

Output only valid JSON."""

        if self.client is None:
            # Fallback - simple string matching
            return self._fallback_match(correct_answer, user_draft_answer)

        try:
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.0,  # deterministic judging
                max_tokens=256,
            )

            text = response.choices[0].message.content.strip()

            data = self._extract_json(text)
            # bool() normalizes the return type; a missing key means "incorrect".
            return bool(data.get("is_correct", False))

        except Exception as e:
            print(f"Error in judge evaluation: {e}")
            # Fallback
            return self._fallback_match(correct_answer, user_draft_answer)
+
+
class Evaluator:
    """
    Main evaluator that runs sessions and computes metrics.

    A session is a conversation between the user simulator (driven by a
    UserProfile) and a baseline agent on a single Task. Per-session metrics:
      - Task Success (TS): judge-verified correctness of the final draft answer.
      - User Effort (UE): number of turns where the user enforced a preference.
      - Efficiency (Eff): total number of messages exchanged.
    """

    def __init__(
        self,
        user_simulator: Optional[UserSimulator] = None,
        judge: Optional[JudgeModel] = None,
    ):
        """
        Args:
            user_simulator: Simulated user; a default instance is created if omitted.
            judge: Task-success judge; a default instance is created if omitted.
        """
        self.user_sim = user_simulator or UserSimulator()
        self.judge = judge or JudgeModel()

    def run_session(
        self,
        agent: BaselineAgent,
        user_profile: UserProfile,
        task: Task,
        max_turns: int = 30,
    ) -> SessionResult:
        """
        Run a single evaluation session.

        Args:
            agent: The agent being evaluated
            user_profile: User with preferences
            task: Task to solve
            max_turns: Maximum conversation turns

        Returns:
            SessionResult with metrics and conversation
        """
        # Setup user simulator
        self.user_sim.setup(
            profile=user_profile,
            task_description=task.task_description,
            problem=task.problem,
            solution=task.solution,
        )

        conversation: List[Dict[str, str]] = []
        preference_violations: List[Dict[str, Any]] = []
        user_effort = 0
        final_draft_answer = "I don't know"

        # Agent opens the conversation
        conversation.append({
            "role": "assistant",
            "content": "How can I help you today?"
        })

        for turn in range(max_turns):
            # User responds
            user_response = self.user_sim.respond(conversation)

            conversation.append({
                "role": "user",
                "content": user_response.response,
            })

            # Track preference violations and enforcement.
            # NOTE: `is False` (rather than `not check.satisfied`) preserves the
            # original `== False` semantics — a None/unknown satisfaction state
            # is NOT counted as a violation. (Fixes PEP 8 E712.)
            violations_this_turn = [
                {
                    "turn": turn,
                    "preference_id": check.preference_id,
                    "topic": check.topic,
                    "violation_detail": check.violation_detail,
                }
                for check in user_response.preference_checks
                if check.relevant and check.satisfied is False
            ]

            if violations_this_turn:
                preference_violations.extend(violations_this_turn)

            if user_response.enforcement_needed:
                user_effort += 1

            # Keep the latest draft; it becomes the judged answer at session end.
            final_draft_answer = user_response.draft_answer

            # Check termination (explicit flag or TERMINATE token in the text)
            if user_response.should_terminate or "TERMINATE" in user_response.response:
                break

            # Agent responds
            agent_response = agent.respond(
                user_id=user_profile.user_id,
                query=user_response.response,
                conversation_history=conversation,
            )

            conversation.append({
                "role": "assistant",
                "content": agent_response.answer,
            })

        # End session for agent (update memory, etc.)
        agent.end_session(user_profile.user_id, conversation)

        # Evaluate task success
        task_success = self.judge.evaluate_answer(
            problem=task.problem,
            correct_answer=task.solution,
            user_draft_answer=final_draft_answer,
        )

        return SessionResult(
            user_id=user_profile.user_id,
            task_id=task.task_id,
            dataset=task.dataset,
            agent_name=agent.get_name(),
            task_success=task_success,
            user_effort=user_effort,
            efficiency=len(conversation),
            conversation=conversation,
            preference_violations=preference_violations,
            final_draft_answer=final_draft_answer,
            debug_info={
                "num_turns": len(conversation) // 2,
                "num_violations": len(preference_violations),
            },
        )

    def aggregate_metrics(
        self,
        results: List[SessionResult],
        agent_name: str,
    ) -> EvaluationMetrics:
        """
        Aggregate metrics from multiple sessions.

        Args:
            results: Per-session results (may be empty).
            agent_name: Name to attach to the aggregate.

        Returns:
            EvaluationMetrics with overall and per-dataset averages; zeroed
            metrics when `results` is empty.
        """
        if not results:
            return EvaluationMetrics(
                agent_name=agent_name,
                num_sessions=0,
                avg_task_success=0.0,
                avg_user_effort=0.0,
                avg_efficiency=0.0,
            )

        # Overall averages (bools sum as 0/1, so avg TS is a success rate)
        avg_ts = sum(r.task_success for r in results) / len(results)
        avg_ue = sum(r.user_effort for r in results) / len(results)
        avg_eff = sum(r.efficiency for r in results) / len(results)

        # By dataset
        datasets = set(r.dataset for r in results)
        ts_by_ds = {}
        ue_by_ds = {}

        for ds in datasets:
            ds_results = [r for r in results if r.dataset == ds]
            if ds_results:
                ts_by_ds[ds] = sum(r.task_success for r in ds_results) / len(ds_results)
                ue_by_ds[ds] = sum(r.user_effort for r in ds_results) / len(ds_results)

        return EvaluationMetrics(
            agent_name=agent_name,
            num_sessions=len(results),
            avg_task_success=avg_ts,
            avg_user_effort=avg_ue,
            avg_efficiency=avg_eff,
            task_success_by_dataset=ts_by_ds,
            user_effort_by_dataset=ue_by_ds,
        )

    def save_results(self, results: List[SessionResult], path: str):
        """Save results to JSONL file (one JSON object per line)."""
        with open(path, "w", encoding="utf-8") as f:
            for result in results:
                f.write(json.dumps(result.to_dict(), ensure_ascii=False) + "\n")

    @staticmethod
    def load_results(path: str) -> List[SessionResult]:
        """Load results from a JSONL file written by save_results."""
        results = []
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    data = json.loads(line)
                    # Reconstruct SessionResult
                    results.append(SessionResult(**data))
        return results
+
+
diff --git a/src/personalization/evaluation/pipeline/runner.py b/src/personalization/evaluation/pipeline/runner.py
new file mode 100644
index 0000000..9971c7b
--- /dev/null
+++ b/src/personalization/evaluation/pipeline/runner.py
@@ -0,0 +1,333 @@
+"""
+Experiment Runner
+
+Orchestrates the full evaluation experiment:
+1. Generate/load preference bank and user profiles
+2. Load datasets
+3. Run sessions for all users × tasks × agents
+4. Aggregate and report metrics
+"""
+
+import json
+import os
+from dataclasses import dataclass
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+from tqdm import tqdm
+
+from ..preference_bank.schemas import PreferenceBank
+from ..preference_bank.generator import generate_demo_bank
+from ..profiles.generator import UserProfile, UserProfileGenerator, generate_demo_profiles
+from ..baselines.base import BaselineAgent
+from ..baselines.no_memory import NoMemoryAgent
+from ..baselines.rag_memory import RAGMemoryAgent
+from ..user_simulator.simulator import UserSimulator
+from .evaluator import Evaluator, Task, SessionResult, EvaluationMetrics
+
+
# Demo dataset: Simple math problems
# Five hand-written tasks across two demo "datasets" (math-demo, code-demo).
# `solution` is the reference answer the judge compares the user's final draft against.
DEMO_TASKS = [
    Task(
        task_id="math_001",
        dataset="math-demo",
        problem="What is the derivative of f(x) = x^3 + 2x^2 - 5x + 3?",
        solution="f'(x) = 3x^2 + 4x - 5",
        task_description="Work with the assistant to solve this calculus problem:",
    ),
    Task(
        task_id="math_002",
        dataset="math-demo",
        problem="Solve for x: 2x + 5 = 3x - 7",
        solution="x = 12",
        task_description="Work with the assistant to solve this algebra problem:",
    ),
    Task(
        task_id="math_003",
        dataset="math-demo",
        problem="Find the area of a circle with radius 5.",
        solution="A = 25π ≈ 78.54 square units",
        task_description="Work with the assistant to solve this geometry problem:",
    ),
    Task(
        task_id="code_001",
        dataset="code-demo",
        problem="Write a Python function that checks if a string is a palindrome.",
        solution="def is_palindrome(s): return s == s[::-1]",
        task_description="Work with the assistant to write this Python function:",
    ),
    Task(
        task_id="code_002",
        dataset="code-demo",
        problem="Write a function to find the nth Fibonacci number.",
        solution="def fib(n): return n if n <= 1 else fib(n-1) + fib(n-2)",
        task_description="Work with the assistant to implement this algorithm:",
    ),
]
+
+
@dataclass
class ExperimentConfig:
    """Configuration for an experiment run."""
    name: str  # experiment name, recorded in summary.json
    output_dir: str  # directory for all artifacts (bank, profiles, results, summary)

    # Scale
    num_users: int = 2  # number of simulated user profiles
    prefs_per_user: int = 10  # preferences sampled per user
    tasks_per_user: int = 3  # sessions per user per agent
    max_turns: int = 25  # cap on conversation turns per session

    # Baselines to run
    run_no_memory: bool = True  # T1: agent without memory
    run_rag: bool = True  # Y3: RAG memory, "nopersonal" mode
    run_rag_uv: bool = False  # User vector mode (Y4: RAG "full" + RL updates)

    # Model configs
    agent_model: str = "llama-8b"
    user_sim_model: str = "Llama-3.3-70B-Instruct"
    # NOTE(review): judge_model is not passed to JudgeModel anywhere in this
    # file (Evaluator is built with defaults) — confirm whether it should be wired in.
    judge_model: str = "Llama-3.3-70B-Instruct"

    # API endpoints
    agent_api_base: str = "http://localhost:8003/v1"
    user_sim_api_base: str = "http://localhost:8004/v1"

    seed: int = 42  # RNG seed for profile generation (reproducibility)
+
+
class ExperimentRunner:
    """
    Runs a complete evaluation experiment.

    Lifecycle: construct with an ExperimentConfig, call setup() to build or
    load the preference bank / user profiles / tasks / evaluator, then run()
    to execute every agent x user x task session, save raw per-agent JSONL
    results, aggregate metrics, and write summary.json to the output dir.
    """

    def __init__(self, config: ExperimentConfig):
        """Store the config and ensure the output directory exists."""
        self.config = config

        # Create output directory
        os.makedirs(config.output_dir, exist_ok=True)

        # Will be initialized lazily (by setup(), called explicitly or via run())
        self._bank: Optional[PreferenceBank] = None
        self._profiles: Optional[List[UserProfile]] = None
        self._tasks: Optional[List[Task]] = None
        self._evaluator: Optional[Evaluator] = None

    def setup(self):
        """Initialize all components (bank, profiles, tasks, evaluator).

        Bank and profiles are cached on disk under output_dir and reused on
        subsequent runs with the same directory.
        """
        print("=" * 60)
        print(f"Setting up experiment: {self.config.name}")
        print("=" * 60)

        # 1. Generate/load preference bank
        bank_path = os.path.join(self.config.output_dir, "preference_bank.json")
        if os.path.exists(bank_path):
            print(f"Loading existing preference bank from {bank_path}")
            self._bank = PreferenceBank.load(bank_path)
        else:
            print("Generating new preference bank...")
            self._bank = generate_demo_bank(output_path=bank_path, use_llm=False)

        print(f" Bank stats: {self._bank.stats()}")

        # 2. Generate/load user profiles
        profiles_path = os.path.join(self.config.output_dir, "user_profiles.json")
        if os.path.exists(profiles_path):
            print(f"Loading existing profiles from {profiles_path}")
            self._profiles = UserProfileGenerator.load_profiles(profiles_path)
        else:
            print(f"Generating {self.config.num_users} user profiles...")
            self._profiles = generate_demo_profiles(
                bank=self._bank,
                num_users=self.config.num_users,
                prefs_per_user=self.config.prefs_per_user,
                output_path=profiles_path,
                seed=self.config.seed,
            )

        # 3. Load tasks
        # NOTE(review): loads 2x tasks_per_user from the demo pool, but run()
        # only uses the first tasks_per_user for every user — confirm intended.
        self._tasks = DEMO_TASKS[:self.config.tasks_per_user * 2]  # Use demo tasks
        print(f" Loaded {len(self._tasks)} tasks")

        # 4. Initialize evaluator
        # NOTE(review): the judge uses JudgeModel defaults here, not
        # config.judge_model — verify whether it should be passed through.
        user_sim = UserSimulator(
            model_name=self.config.user_sim_model,
            api_base=self.config.user_sim_api_base,
        )
        self._evaluator = Evaluator(user_simulator=user_sim)

        print("Setup complete!\n")

    def _create_agents(self) -> Dict[str, BaselineAgent]:
        """Create agent instances based on config.

        Returns:
            Dict mapping display name (e.g. "T1_NoMemory") to agent instance.
        """
        agents = {}

        if self.config.run_no_memory:
            agents["T1_NoMemory"] = NoMemoryAgent(
                model_name=self.config.agent_model,
                api_base=self.config.agent_api_base,
            )

        if self.config.run_rag:
            # Create directories for RAG memory
            memory_dir = os.path.join(self.config.output_dir, "rag_memory")
            os.makedirs(memory_dir, exist_ok=True)

            # NOTE(review): unlike NoMemoryAgent above, the RAG agents are not
            # passed agent_api_base — confirm RAGMemoryAgent's default endpoint
            # matches the intended deployment.
            agents["Y3_RAG"] = RAGMemoryAgent(
                model_name=self.config.agent_model,
                mode="nopersonal",
                memory_cards_path=os.path.join(memory_dir, "memory_cards.jsonl"),
                memory_embeddings_path=os.path.join(memory_dir, "embeddings.npy"),
            )

        if self.config.run_rag_uv:
            memory_dir = os.path.join(self.config.output_dir, "rag_uv_memory")
            os.makedirs(memory_dir, exist_ok=True)

            agents["Y4_RAG_UV"] = RAGMemoryAgent(
                model_name=self.config.agent_model,
                mode="full",
                memory_cards_path=os.path.join(memory_dir, "memory_cards.jsonl"),
                memory_embeddings_path=os.path.join(memory_dir, "embeddings.npy"),
                enable_rl_updates=True,
            )

        return agents

    def run(self) -> Dict[str, EvaluationMetrics]:
        """
        Run the full experiment.

        Returns:
            Dict mapping agent name to aggregated metrics
        """
        # Lazy setup: allows run() to be called without an explicit setup().
        if self._evaluator is None:
            self.setup()

        agents = self._create_agents()
        all_results: Dict[str, List[SessionResult]] = {name: [] for name in agents}

        print("=" * 60)
        print("Running experiment")
        print("=" * 60)

        # Run for each agent
        for agent_name, agent in agents.items():
            print(f"\n>>> Agent: {agent_name}")

            # Run for each user
            for profile in tqdm(self._profiles, desc=f"Users ({agent_name})"):
                # Reset user state
                agent.reset_user(profile.user_id)

                # Get tasks for this user
                # In demo, just cycle through available tasks
                user_tasks = self._tasks[:self.config.tasks_per_user]

                # Run sessions
                for task in user_tasks:
                    result = self._evaluator.run_session(
                        agent=agent,
                        user_profile=profile,
                        task=task,
                        max_turns=self.config.max_turns,
                    )

                    all_results[agent_name].append(result)

                    # Print progress
                    # NOTE(review): print() inside a tqdm loop can garble the
                    # progress bar; tqdm.write() would avoid that.
                    status = "✓" if result.task_success else "✗"
                    print(f" {profile.user_id} | {task.task_id} | "
                          f"TS={status} | UE={result.user_effort} | Eff={result.efficiency}")

        # Save raw results (one JSONL file per agent)
        for agent_name, results in all_results.items():
            results_path = os.path.join(
                self.config.output_dir,
                f"results_{agent_name}.jsonl"
            )
            self._evaluator.save_results(results, results_path)

        # Aggregate metrics
        metrics = {}
        for agent_name, results in all_results.items():
            metrics[agent_name] = self._evaluator.aggregate_metrics(results, agent_name)

        # Save and print summary
        self._save_summary(metrics)
        self._print_summary(metrics)

        return metrics

    def _save_summary(self, metrics: Dict[str, EvaluationMetrics]):
        """Save experiment summary (config snapshot + per-agent metrics) as JSON."""
        summary = {
            "experiment_name": self.config.name,
            "timestamp": datetime.now().isoformat(),
            "config": {
                "num_users": self.config.num_users,
                "prefs_per_user": self.config.prefs_per_user,
                "tasks_per_user": self.config.tasks_per_user,
                "max_turns": self.config.max_turns,
            },
            "metrics": {name: m.to_dict() for name, m in metrics.items()},
        }

        summary_path = os.path.join(self.config.output_dir, "summary.json")
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        print(f"\nSummary saved to {summary_path}")

    def _print_summary(self, metrics: Dict[str, EvaluationMetrics]):
        """Print experiment summary table (arrows mark direction: TS up is better, UE/Eff down)."""
        print("\n" + "=" * 60)
        print("EXPERIMENT SUMMARY")
        print("=" * 60)

        # Header
        print(f"\n{'Agent':<20} {'TS ↑':>10} {'UE ↓':>10} {'Eff ↓':>10} {'Sessions':>10}")
        print("-" * 60)

        for agent_name, m in metrics.items():
            print(f"{agent_name:<20} {m.avg_task_success:>10.2%} "
                  f"{m.avg_user_effort:>10.2f} {m.avg_efficiency:>10.1f} "
                  f"{m.num_sessions:>10}")

        print("\n" + "=" * 60)
+
+
def run_demo_experiment(output_dir: str = "data/eval/demo_experiment"):
    """
    Run a minimal demo experiment.

    A quick sanity check comparing T1 (NoMemory) against Y3 (RAG) using
    2 users, 10 preferences per user, 3 tasks per user, and at most
    15 turns per session.

    Args:
        output_dir: Directory where all artifacts are written.

    Returns:
        Dict mapping agent name to its aggregated EvaluationMetrics.
    """
    demo_config = ExperimentConfig(
        name="demo_experiment",
        output_dir=output_dir,
        num_users=2,
        prefs_per_user=10,
        tasks_per_user=3,
        max_turns=15,
        run_no_memory=True,
        run_rag=True,
        run_rag_uv=False,
    )

    runner = ExperimentRunner(demo_config)
    runner.setup()
    return runner.run()
+
+
if __name__ == "__main__":
    import sys

    # Optional first CLI argument overrides the default output directory.
    if len(sys.argv) > 1:
        target_dir = sys.argv[1]
    else:
        target_dir = "data/eval/demo_experiment"
    run_demo_experiment(target_dir)
+
+