summaryrefslogtreecommitdiff
path: root/src/personalization/evaluation/pipeline/evaluator.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/personalization/evaluation/pipeline/evaluator.py')
-rw-r--r--src/personalization/evaluation/pipeline/evaluator.py353
1 files changed, 353 insertions, 0 deletions
diff --git a/src/personalization/evaluation/pipeline/evaluator.py b/src/personalization/evaluation/pipeline/evaluator.py
new file mode 100644
index 0000000..7304400
--- /dev/null
+++ b/src/personalization/evaluation/pipeline/evaluator.py
@@ -0,0 +1,353 @@
+"""
+Evaluation Pipeline
+
+Runs evaluation sessions between user simulator and agents.
+Computes metrics: Task Success (TS), User Effort (UE), Efficiency (Eff).
+"""
+
+import json
+import os
+from dataclasses import dataclass, field, asdict
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+
+from ..profiles.generator import UserProfile
+from ..preference_bank.schemas import PreferenceBank
+from ..baselines.base import BaselineAgent
+from ..user_simulator.simulator import UserSimulator, UserSimulatorResponse
+
+
+@dataclass
+class Task:
+ """A problem/task for evaluation."""
+ task_id: str
+ dataset: str
+ problem: str
+ solution: str
+ task_description: str = "Work with the assistant to solve this problem:"
+
+
+@dataclass
+class SessionResult:
+ """Result of a single evaluation session."""
+ user_id: str
+ task_id: str
+ dataset: str
+ agent_name: str
+
+ # Metrics
+ task_success: bool # TS: Was the task solved correctly?
+ user_effort: int # UE: Number of preference enforcements
+ efficiency: int # Eff: Total number of messages
+
+ # Details
+ conversation: List[Dict[str, str]]
+ preference_violations: List[Dict[str, Any]]
+ final_draft_answer: str
+
+ # Debug
+ debug_info: Dict[str, Any] = field(default_factory=dict)
+ timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
+
+ def to_dict(self) -> Dict[str, Any]:
+ return asdict(self)
+
+
+@dataclass
+class EvaluationMetrics:
+ """Aggregated evaluation metrics."""
+ agent_name: str
+ num_sessions: int
+
+ # Average metrics
+ avg_task_success: float # Average TS
+ avg_user_effort: float # Average UE
+ avg_efficiency: float # Average Eff
+
+ # Breakdowns
+ task_success_by_dataset: Dict[str, float] = field(default_factory=dict)
+ user_effort_by_dataset: Dict[str, float] = field(default_factory=dict)
+
+ def to_dict(self) -> Dict[str, Any]:
+ return asdict(self)
+
+
+class JudgeModel:
+ """
+ LLM judge for evaluating task success.
+ Uses the same approach as collaborativeagents.
+ """
+
+ def __init__(
+ self,
+ model_name: str = "Llama-3.3-70B-Instruct",
+ api_base: Optional[str] = None,
+ api_key: Optional[str] = None,
+ ):
+ self.model_name = model_name
+ self.api_base = api_base or os.getenv("JUDGE_API_BASE", "http://localhost:8004/v1")
+ self.api_key = api_key or os.getenv("JUDGE_API_KEY", "EMPTY")
+
+ self._init_client()
+
+ def _init_client(self):
+ try:
+ import openai
+ self.client = openai.OpenAI(
+ base_url=self.api_base,
+ api_key=self.api_key,
+ )
+ except Exception as e:
+ print(f"Warning: Could not initialize judge client: {e}")
+ self.client = None
+
+ def evaluate_answer(
+ self,
+ problem: str,
+ correct_answer: str,
+ user_draft_answer: str,
+ ) -> bool:
+ """
+ Evaluate if the user's draft answer is correct.
+
+ Returns:
+ True if answer is correct, False otherwise
+ """
+ prompt = f"""You are an expert evaluator. Determine if the user's answer is correct.
+
+# Problem
+{problem}
+
+# Correct Answer
+{correct_answer}
+
+# User's Answer
+{user_draft_answer}
+
+# Instructions
+Determine if the user's answer is accurate and consistent with the correct answer.
+Minor formatting differences are acceptable.
+The core answer/solution must match.
+
+Output JSON:
+{{
+ "reasoning": "Brief explanation",
+ "is_correct": true or false
+}}
+
+Output only valid JSON."""
+
+ if self.client is None:
+ # Fallback - simple string matching
+ return correct_answer.lower().strip() in user_draft_answer.lower()
+
+ try:
+ response = self.client.chat.completions.create(
+ model=self.model_name,
+ messages=[{"role": "user", "content": prompt}],
+ temperature=0.0,
+ max_tokens=256,
+ )
+
+ text = response.choices[0].message.content.strip()
+
+ # Parse JSON
+ if "```" in text:
+ text = text.split("```")[1]
+ if text.startswith("json"):
+ text = text[4:]
+
+ data = json.loads(text)
+ return data.get("is_correct", False)
+
+ except Exception as e:
+ print(f"Error in judge evaluation: {e}")
+ # Fallback
+ return correct_answer.lower().strip() in user_draft_answer.lower()
+
+
+class Evaluator:
+ """
+ Main evaluator that runs sessions and computes metrics.
+ """
+
+ def __init__(
+ self,
+ user_simulator: Optional[UserSimulator] = None,
+ judge: Optional[JudgeModel] = None,
+ ):
+ self.user_sim = user_simulator or UserSimulator()
+ self.judge = judge or JudgeModel()
+
+ def run_session(
+ self,
+ agent: BaselineAgent,
+ user_profile: UserProfile,
+ task: Task,
+ max_turns: int = 30,
+ ) -> SessionResult:
+ """
+ Run a single evaluation session.
+
+ Args:
+ agent: The agent being evaluated
+ user_profile: User with preferences
+ task: Task to solve
+ max_turns: Maximum conversation turns
+
+ Returns:
+ SessionResult with metrics and conversation
+ """
+ # Setup user simulator
+ self.user_sim.setup(
+ profile=user_profile,
+ task_description=task.task_description,
+ problem=task.problem,
+ solution=task.solution,
+ )
+
+ conversation: List[Dict[str, str]] = []
+ preference_violations: List[Dict[str, Any]] = []
+ user_effort = 0
+ final_draft_answer = "I don't know"
+
+ # Agent opens the conversation
+ conversation.append({
+ "role": "assistant",
+ "content": "How can I help you today?"
+ })
+
+ for turn in range(max_turns):
+ # User responds
+ user_response = self.user_sim.respond(conversation)
+
+ conversation.append({
+ "role": "user",
+ "content": user_response.response,
+ })
+
+ # Track preference violations and enforcement
+ violations_this_turn = [
+ {
+ "turn": turn,
+ "preference_id": check.preference_id,
+ "topic": check.topic,
+ "violation_detail": check.violation_detail,
+ }
+ for check in user_response.preference_checks
+ if check.relevant and check.satisfied == False
+ ]
+
+ if violations_this_turn:
+ preference_violations.extend(violations_this_turn)
+
+ if user_response.enforcement_needed:
+ user_effort += 1
+
+ final_draft_answer = user_response.draft_answer
+
+ # Check termination
+ if user_response.should_terminate or "TERMINATE" in user_response.response:
+ break
+
+ # Agent responds
+ agent_response = agent.respond(
+ user_id=user_profile.user_id,
+ query=user_response.response,
+ conversation_history=conversation,
+ )
+
+ conversation.append({
+ "role": "assistant",
+ "content": agent_response.answer,
+ })
+
+ # End session for agent (update memory, etc.)
+ agent.end_session(user_profile.user_id, conversation)
+
+ # Evaluate task success
+ task_success = self.judge.evaluate_answer(
+ problem=task.problem,
+ correct_answer=task.solution,
+ user_draft_answer=final_draft_answer,
+ )
+
+ return SessionResult(
+ user_id=user_profile.user_id,
+ task_id=task.task_id,
+ dataset=task.dataset,
+ agent_name=agent.get_name(),
+ task_success=task_success,
+ user_effort=user_effort,
+ efficiency=len(conversation),
+ conversation=conversation,
+ preference_violations=preference_violations,
+ final_draft_answer=final_draft_answer,
+ debug_info={
+ "num_turns": len(conversation) // 2,
+ "num_violations": len(preference_violations),
+ },
+ )
+
+ def aggregate_metrics(
+ self,
+ results: List[SessionResult],
+ agent_name: str,
+ ) -> EvaluationMetrics:
+ """
+ Aggregate metrics from multiple sessions.
+ """
+ if not results:
+ return EvaluationMetrics(
+ agent_name=agent_name,
+ num_sessions=0,
+ avg_task_success=0.0,
+ avg_user_effort=0.0,
+ avg_efficiency=0.0,
+ )
+
+ # Overall averages
+ avg_ts = sum(r.task_success for r in results) / len(results)
+ avg_ue = sum(r.user_effort for r in results) / len(results)
+ avg_eff = sum(r.efficiency for r in results) / len(results)
+
+ # By dataset
+ datasets = set(r.dataset for r in results)
+ ts_by_ds = {}
+ ue_by_ds = {}
+
+ for ds in datasets:
+ ds_results = [r for r in results if r.dataset == ds]
+ if ds_results:
+ ts_by_ds[ds] = sum(r.task_success for r in ds_results) / len(ds_results)
+ ue_by_ds[ds] = sum(r.user_effort for r in ds_results) / len(ds_results)
+
+ return EvaluationMetrics(
+ agent_name=agent_name,
+ num_sessions=len(results),
+ avg_task_success=avg_ts,
+ avg_user_effort=avg_ue,
+ avg_efficiency=avg_eff,
+ task_success_by_dataset=ts_by_ds,
+ user_effort_by_dataset=ue_by_ds,
+ )
+
+ def save_results(self, results: List[SessionResult], path: str):
+ """Save results to JSONL file."""
+ with open(path, "w", encoding="utf-8") as f:
+ for result in results:
+ f.write(json.dumps(result.to_dict(), ensure_ascii=False) + "\n")
+
+ @staticmethod
+ def load_results(path: str) -> List[SessionResult]:
+ """Load results from JSONL file."""
+ results = []
+ with open(path, "r", encoding="utf-8") as f:
+ for line in f:
+ if line.strip():
+ data = json.loads(line)
+ # Reconstruct SessionResult
+ results.append(SessionResult(**data))
+ return results
+
+