summaryrefslogtreecommitdiff
path: root/collaborativeagents/adapters/personalized_llm_adapter.py
diff options
context:
space:
mode:
authorYurenHao0426 <blackhao0426@gmail.com>2026-01-27 09:57:37 -0600
committerYurenHao0426 <blackhao0426@gmail.com>2026-01-27 09:57:37 -0600
commitdc801c07cf38b0c495686463e6ca6f871a64440e (patch)
tree599f03114775921dbc472403c701f4a3a8ea188a /collaborativeagents/adapters/personalized_llm_adapter.py
parente43b3f8aa36c198b95c1e46bea2eaf3893b13dc3 (diff)
Add collaborativeagents module and update gitignore
- Add collaborativeagents subproject with adapters, agents, and evaluation modules - Update .gitignore to exclude large binary files (.whl, .tar), wandb logs, and results Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'collaborativeagents/adapters/personalized_llm_adapter.py')
-rw-r--r--collaborativeagents/adapters/personalized_llm_adapter.py731
1 files changed, 731 insertions, 0 deletions
diff --git a/collaborativeagents/adapters/personalized_llm_adapter.py b/collaborativeagents/adapters/personalized_llm_adapter.py
new file mode 100644
index 0000000..c2d4727
--- /dev/null
+++ b/collaborativeagents/adapters/personalized_llm_adapter.py
@@ -0,0 +1,731 @@
+"""
+Adapter to integrate PersonalizedLLM with CollaborativeAgents benchmark.
+
+This adapter wraps PersonalizedLLM to work as a CollaboratorAgent in the
+MULTISESSIONCOLLAB framework while maintaining all personalization features.
+"""
+
+import sys
+import os
+from pathlib import Path
+from typing import Optional, List, Dict, Any
+from dataclasses import dataclass, field
+import json
+import numpy as np
+
+# Add paths
+_project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(_project_root / "src"))
+
+# Import from your personalization system
+from personalization.serving.personalized_llm import (
+ PersonalizedLLM,
+ AssistantResponse,
+ Feedback,
+ create_personalized_llm
+)
+
+
@dataclass
class AdapterConfig:
    """Configuration for the PersonalizedLLM adapter.

    Groups the PersonalizedLLM construction options, on-disk state paths,
    backend/device selection, and the reward shaping used when simulated-user
    feedback is converted into REINFORCE updates.
    """
    # PersonalizedLLM config
    mode: str = "full"  # one of "full", "nopersonal", "vanilla"
    eval_mode: bool = True  # run the wrapped PersonalizedLLM in evaluation mode
    enable_preference_extraction: bool = True  # extract preference memories from user turns
    enable_rl_updates: bool = True  # apply reward feedback (REINFORCE) to the user model
    use_user_vector: bool = True  # Whether to use user vector in policy scoring

    # Paths (absolute paths to actual file locations in the repo)
    # Note: Using empty_store to start fresh - RAG will accumulate memories during evaluation
    # NOTE(review): these defaults are machine-specific cluster paths — override
    # them when running anywhere other than the original environment.
    user_store_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/users/collab_eval_store.npz"
    memory_cards_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_cards.jsonl"
    memory_embeddings_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_embeddings.npy"
    item_projection_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/item_projection.npz"

    # Multi-GPU assignment, e.g. {"embed": "cuda:2", "reranker": "cuda:3"}; None = defaults
    device_assignment: Optional[Dict[str, str]] = None

    # LLM backend selection
    llm_name: str = "qwen_1_5b"  # Use "llama_8b_vllm" for vLLM backend

    # Shared model mode for multi-threaded efficiency
    use_shared_models: bool = False  # If True, share embedding/reranker across parallel workers

    # Reranker selection: "qwen3" (8B) or "bge" (278M)
    reranker_type: str = "qwen3"

    # Best-of-N sampling: generate N responses and pick best (for RAG methods)
    best_of_n: int = 1

    # Reward mode: "keyword" (legacy heuristic) or "llm" (GPT-5-nano judge)
    reward_mode: str = "keyword"

    # Reward mapping for user behavior (values fed to Feedback.reward)
    preference_enforcement_reward: float = -0.8  # Negative reward when user enforces
    disappointment_expression_reward: float = -0.4  # Milder negative for disappointment
    positive_feedback_reward: float = 0.5  # When user expresses satisfaction
    task_completion_reward: float = 1.0  # When task is solved correctly
+
+
class PersonalizedLLMAdapter:
    """
    Adapter that wraps PersonalizedLLM for use in CollaborativeAgents.

    This adapter:
    1. Translates CollaborativeAgents conversation format to PersonalizedLLM
    2. Converts user simulator signals to reward/gating for REINFORCE
    3. Tracks metrics for evaluation
    4. Supports all baseline modes
    """

    def __init__(self, config: "AdapterConfig" = None):
        """Create the adapter.

        Args:
            config: Adapter configuration; a default AdapterConfig is built
                when None is given.
        """
        self.config = config or AdapterConfig()
        # The wrapped PersonalizedLLM is constructed lazily in initialize()
        # because building it loads models and is expensive.
        self._llm: Optional["PersonalizedLLM"] = None
        self._initialized = False

        # Per-session tracking (reset by start_session)
        self._current_user_id: Optional[str] = None
        self._turn_counter: int = 0
        self._session_metrics: Dict[str, Any] = {}

        # Metrics accumulated across all sessions handled by this adapter
        self._total_enforcements: int = 0
        self._total_disappointments: int = 0
        self._total_turns: int = 0

    def initialize(self):
        """Initialize the PersonalizedLLM instance (idempotent)."""
        if self._initialized:
            return

        shared_mode_str = " (shared models)" if self.config.use_shared_models else ""
        print(f"[Adapter] Initializing PersonalizedLLM with LLM: {self.config.llm_name}{shared_mode_str}...")
        self._llm = PersonalizedLLM(
            mode=self.config.mode,
            eval_mode=self.config.eval_mode,
            enable_preference_extraction=self.config.enable_preference_extraction,
            enable_rl_updates=self.config.enable_rl_updates,
            user_store_path=self.config.user_store_path,
            memory_cards_path=self.config.memory_cards_path,
            memory_embeddings_path=self.config.memory_embeddings_path,
            item_projection_path=self.config.item_projection_path,
            device_assignment=self.config.device_assignment,
            llm_name=self.config.llm_name,
            use_shared_models=self.config.use_shared_models,
            reranker_type=self.config.reranker_type,
            best_of_n=self.config.best_of_n,
            reward_mode=self.config.reward_mode,
        )
        self._initialized = True
        print("[Adapter] Initialization complete.")

    def start_session(self, user_id: str, user_profile: dict = None):
        """
        Start a new session for a user.

        Args:
            user_id: Unique user identifier
            user_profile: Optional user profile with preferences (for ground truth)
        """
        if not self._initialized:
            self.initialize()

        self._current_user_id = user_id
        self._turn_counter = 0
        self._session_metrics = {
            "user_id": user_id,
            "enforcements": 0,
            "disappointments": 0,
            "turns": 0,
            "rewards_applied": [],
        }

        # Reset session (keeps z_long, clears z_short and history)
        self._llm.reset_session(user_id)

    def _format_result(self, result: Optional["AssistantResponse"]) -> Dict[str, Any]:
        """Convert an AssistantResponse into the CollaborativeAgents reply dict.

        Handles a None result (or missing answer/debug/usage fields)
        defensively. Shared by generate_response() and process_response().

        Returns:
            Dict with 'response', 'reasoning', and 'debug' keys.
        """
        if result is None:
            return {"response": "[Error: LLM returned None]", "reasoning": "", "debug": {}}

        answer = result.answer if result.answer else "[No answer generated]"
        debug_info = result.debug if result.debug else None
        usage_info = result.usage if result.usage else None

        return {
            "response": answer,
            "reasoning": f"Retrieved {len(debug_info.selected_memory_notes) if debug_info else 0} memories",
            "debug": {
                "selected_memories": debug_info.selected_memory_notes if debug_info else [],
                "memory_scores": debug_info.selected_memory_scores if debug_info else [],
                "extracted_preferences": debug_info.extracted_preferences if debug_info else [],
                "user_vector_norm": debug_info.extra.get("z_long_norm", 0) if debug_info and debug_info.extra else 0,
                "usage": {
                    "prompt_tokens": usage_info.prompt_tokens if usage_info else 0,
                    "completion_tokens": usage_info.completion_tokens if usage_info else 0,
                    "total_tokens": usage_info.total_tokens if usage_info else 0,
                } if usage_info else {}
            }
        }

    def generate_response(
        self,
        query: str,
        conversation_history: List[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """
        Generate a response using PersonalizedLLM.

        Args:
            query: Current user query
            conversation_history: Previous conversation (accepted for interface
                compatibility; PersonalizedLLM tracks its own history)

        Returns:
            Dict with 'response', 'reasoning', and debug info
        """
        if not self._initialized:
            self.initialize()

        # Call PersonalizedLLM; only the current query is forwarded.
        result: AssistantResponse = self._llm.chat(self._current_user_id, query)

        self._turn_counter += 1
        self._session_metrics["turns"] = self._turn_counter

        return self._format_result(result)

    def prepare_prompt(
        self,
        query: str,
        conversation_history: List[Dict[str, str]] = None
    ) -> tuple:
        """
        Prepare prompt for batch processing without calling LLM.

        This method does all preparation (embedding, memory retrieval) and
        returns messages for batched vLLM call.

        Args:
            query: Current user query
            conversation_history: Previous conversation (unused; kept for
                interface compatibility)

        Returns:
            Tuple of (messages, context) where messages is ready for vLLM batch
            and context is needed for process_response().
        """
        if not self._initialized:
            self.initialize()

        # Use chat_prepare from PersonalizedLLM
        result = self._llm.chat_prepare(self._current_user_id, query)
        return result["messages"], result["context"]

    def process_response(
        self,
        response: str,
        context: dict
    ) -> Dict[str, Any]:
        """
        Process LLM response after batch call.

        This method takes the LLM response and context from prepare_prompt(),
        does post-processing, and returns the formatted result.

        Args:
            response: LLM response text from batched vLLM call
            context: Context dict from prepare_prompt()

        Returns:
            Dict with 'response', 'reasoning', and debug info
        """
        # Use chat_complete from PersonalizedLLM
        result: AssistantResponse = self._llm.chat_complete(response, context)

        self._turn_counter += 1
        self._session_metrics["turns"] = self._turn_counter

        return self._format_result(result)

    def process_user_turn(
        self,
        user_response: str,
        enforce_preferences: bool = False,
        express_disappointment: bool = False,
        express_satisfaction: bool = False,
        draft_answer_updated: bool = False
    ):
        """
        Process user turn and derive reward signal for REINFORCE.

        Args:
            user_response: The user's response text
            enforce_preferences: Whether user explicitly enforced preferences
            express_disappointment: Whether user expressed disappointment
            express_satisfaction: Whether user expressed satisfaction
            draft_answer_updated: Whether user updated their draft answer

        This is called AFTER generate_response and BEFORE the next turn.
        """
        # Derive reward from user behavior; enforcement takes priority over
        # disappointment, which takes priority over the positive signals.
        reward = 0.0
        gating = 1.0  # Always apply (could be conditional)

        if enforce_preferences:
            reward = self.config.preference_enforcement_reward
            self._session_metrics["enforcements"] += 1
            self._total_enforcements += 1

        elif express_disappointment:
            reward = self.config.disappointment_expression_reward
            self._session_metrics["disappointments"] += 1
            self._total_disappointments += 1

        elif express_satisfaction or draft_answer_updated:
            reward = self.config.positive_feedback_reward

        # Apply feedback to PersonalizedLLM (only when RL updates are enabled
        # and there is a non-zero reward to apply).
        if self.config.enable_rl_updates and reward != 0.0:
            feedback = Feedback(
                user_id=self._current_user_id,
                turn_id=self._turn_counter - 1,
                reward=reward,
                gating=gating,
                meta={
                    "enforce": enforce_preferences,
                    "disappointment": express_disappointment,
                    "satisfaction": express_satisfaction,
                }
            )
            self._llm.apply_feedback(feedback)
            self._session_metrics["rewards_applied"].append(reward)

    def end_session(self, task_success: bool = False) -> Dict[str, Any]:
        """
        End the current session and return metrics.

        Args:
            task_success: Whether the task was solved correctly

        Returns:
            Session metrics dictionary (a copy; safe for the caller to keep)
        """
        # Apply final reward for task completion
        if task_success and self.config.enable_rl_updates:
            feedback = Feedback(
                user_id=self._current_user_id,
                turn_id=self._turn_counter,
                reward=self.config.task_completion_reward,
                gating=1.0,
                meta={"task_success": True}
            )
            self._llm.apply_feedback(feedback)
            self._session_metrics["rewards_applied"].append(
                self.config.task_completion_reward
            )

        self._session_metrics["task_success"] = task_success
        self._total_turns += self._turn_counter

        return self._session_metrics.copy()

    def reset_user(self, user_id: str):
        """Completely reset a user (new experiment)."""
        if self._initialized:
            self._llm.reset_user(user_id)

    def get_user_vector(self, user_id: str) -> Optional[np.ndarray]:
        """Get a copy of the user's z_long vector for analysis (None if uninitialized)."""
        if not self._initialized:
            return None

        state = self._llm._user_store.get_state(user_id)
        return state.z_long.copy()

    def get_user_state_summary(self, user_id: str) -> Dict[str, Any]:
        """Get summary of user state for analysis ({} if uninitialized)."""
        if not self._initialized:
            return {}

        return self._llm.get_user_state_summary(user_id)

    def persist(self):
        """Save all state to disk (no-op if uninitialized)."""
        if self._initialized:
            self._llm.persist()

    # =========================================================================
    # CollaborativeAgents Interface Methods
    # =========================================================================

    def __call__(
        self,
        messages: List[Dict[str, str]],
        user_profile: dict = None,
        **kwargs
    ) -> str:
        """
        Callable interface for CollaborativeAgents ConversationGenerator.

        Args:
            messages: Conversation history in [{"role": "user/assistant", "content": "..."}]
            user_profile: Optional user profile

        Returns:
            Response string (a generic greeting when no user message exists)
        """
        if not messages:
            return "How can I help you?"

        # Get the last user message
        last_user_msg = None
        for msg in reversed(messages):
            if msg["role"] == "user":
                last_user_msg = msg["content"]
                break

        if last_user_msg is None:
            return "How can I help you?"

        result = self.generate_response(last_user_msg, messages)
        return result["response"]
+
+
+# =============================================================================
+# Baseline Adapter Factory
+# =============================================================================
+
def create_baseline_adapter(
    baseline_name: str,
    device_assignment: dict = None,
    use_vllm: bool = False,
    use_shared_models: bool = False,
    reward_mode: str = "keyword",
) -> PersonalizedLLMAdapter:
    """
    Create an adapter configured for a specific baseline.

    Args:
        baseline_name: One of:
            - "vanilla": No memory or personalization
            - "contextual": Full history in context (truncate if overflow)
            - "reflection": CollaborativeAgents' agent_notes approach
            - "reflection_grpo": Reflection + GRPO training
            - "all_memory": All extracted memories in context (no retrieval)
            - "rag": Extractor + RAG (no user vector)
            - "rag_vector": Full personalization (Extractor + RAG + User Vector)
            - "rag_bge": "rag" with the smaller BGE reranker (278M vs 8B)
            - "rag_vector_bge": "rag_vector" with the BGE reranker
            - "rag_vector_best3": "rag_vector" with best-of-3 sampling
            - "nopersonal" / "full": legacy aliases for RAG-only / full modes
        device_assignment: GPU assignment dict (overrides per-baseline defaults)
        use_vllm: If True, use vLLM HTTP API for LLM inference (much faster)
        use_shared_models: If True, share embedding/reranker models across parallel
            workers. ESSENTIAL for parallel profile processing to avoid OOM.
        reward_mode: Global reward mode ("keyword" or "llm") applied to all methods

    Returns:
        Configured adapter (PersonalizedLLMAdapter or baseline-specific adapter)

    Raises:
        ValueError: If baseline_name is not a recognized baseline.
    """
    # Select LLM backend
    llm_name = "llama_8b_vllm" if use_vllm else "llama_8b"
    configs = {
        # Baseline 1: Vanilla - no memory at all
        "vanilla": AdapterConfig(
            mode="vanilla",
            enable_preference_extraction=False,
            enable_rl_updates=False,
            use_user_vector=False,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
        ),
        # Baseline 2: Contextual - full history in context
        # This needs a separate adapter (ContextualAdapter)
        "contextual": None,  # Handled separately
        # Baseline 3: Reflection - agent_notes mechanism
        # This needs a separate adapter (ReflectionAdapter)
        "reflection": None,  # Handled separately
        # Baseline 4: Reflection + GRPO
        # This needs a separate adapter (ReflectionGRPOAdapter)
        "reflection_grpo": None,  # Handled separately
        # Baseline 5: All memory in context (no retrieval)
        "all_memory": AdapterConfig(
            mode="nopersonal",  # Uses all memories, no policy selection
            enable_preference_extraction=True,
            enable_rl_updates=False,
            use_user_vector=False,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
        ),
        # Baseline 6: Extractor + RAG (no user vector)
        # Use "nopersonal" mode for pure dense+rerank retrieval without user vector influence
        # Device assignment: GPUs 2,3 for HF models (8B vLLM uses 40% memory, leaving room)
        "rag": AdapterConfig(
            mode="nopersonal",
            enable_preference_extraction=True,
            enable_rl_updates=False,  # No RL updates
            use_user_vector=False,  # No user vector in policy
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Baseline 7: Full - Extractor + RAG + User Vector (proposed method)
        # Device assignment: GPUs 2,3 for HF models (8B vLLM uses 40% memory, leaving room)
        "rag_vector": AdapterConfig(
            mode="full",
            enable_preference_extraction=True,
            enable_rl_updates=True,
            use_user_vector=True,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Baseline 8: RAG with BGE reranker (278M instead of 8B)
        "rag_bge": AdapterConfig(
            mode="nopersonal",
            enable_preference_extraction=True,
            enable_rl_updates=False,
            use_user_vector=False,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            reranker_type="bge",
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Baseline 9: RAG + Vector with BGE reranker (278M instead of 8B)
        "rag_vector_bge": AdapterConfig(
            mode="full",
            enable_preference_extraction=True,
            enable_rl_updates=True,
            use_user_vector=True,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            reranker_type="bge",
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Baseline 10: RAG + Vector with best-of-3 sampling
        "rag_vector_best3": AdapterConfig(
            mode="full",
            enable_preference_extraction=True,
            enable_rl_updates=True,
            use_user_vector=True,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            best_of_n=3,
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Legacy aliases
        "nopersonal": AdapterConfig(
            mode="nopersonal",
            enable_preference_extraction=True,
            enable_rl_updates=False,
            use_user_vector=False,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
        ),
        "full": AdapterConfig(
            mode="full",
            enable_preference_extraction=True,
            enable_rl_updates=True,
            use_user_vector=True,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
        ),
    }

    if baseline_name not in configs:
        raise ValueError(f"Unknown baseline: {baseline_name}. Choose from {list(configs.keys())}")

    config = configs[baseline_name]

    # Handle baselines that need separate adapters (config is the None sentinel).
    # Imports are deferred so these modules are only loaded when actually used.
    if config is None:
        if baseline_name == "contextual":
            from .contextual_adapter import ContextualAdapter
            return ContextualAdapter(device_assignment=device_assignment)
        elif baseline_name == "reflection":
            from .reflection_adapter import ReflectionAdapter
            return ReflectionAdapter(device_assignment=device_assignment)
        elif baseline_name == "reflection_grpo":
            from .reflection_grpo_adapter import ReflectionGRPOAdapter
            return ReflectionGRPOAdapter(device_assignment=device_assignment)
        else:
            raise ValueError(f"Baseline {baseline_name} not implemented yet")

    # Caller-supplied GPU assignment overrides the per-baseline defaults above.
    if device_assignment:
        config.device_assignment = device_assignment

    # Apply global reward_mode to all methods (overrides per-method defaults)
    config.reward_mode = reward_mode

    return PersonalizedLLMAdapter(config)
+
+
+# =============================================================================
+# Integration with CollaborativeAgents ConversationGenerator
+# =============================================================================
+
class PersonalizedCollaborator:
    """
    Drop-in replacement for CollaboratorAgent that uses PersonalizedLLM.

    Compatible with ConversationGenerator.generate_conversation()
    """

    # Heuristic phrase lists used to classify the user's latest message.
    # Matching is case-insensitive substring containment (see _contains_any).
    _ENFORCEMENT_PHRASES = (
        "please use", "i asked for", "i prefer", "can you",
        "instead of", "not what i wanted", "i said", "remember that",
        "you should", "don't", "avoid", "stop",
    )
    _DISAPPOINTMENT_PHRASES = (
        "not quite", "that's not", "hmm", "not really",
        "i was hoping", "could be better", "not exactly",
    )
    _SATISFACTION_PHRASES = (
        "thanks", "perfect", "great", "exactly", "that's what i",
        "helpful", "makes sense", "got it", "understand now",
    )

    def __init__(
        self,
        adapter: "PersonalizedLLMAdapter",
        user_id: str,
        user_profile: dict = None,
        max_new_tokens: int = 1024
    ):
        """Wrap an adapter for one user and immediately start their session.

        Args:
            adapter: The PersonalizedLLMAdapter to delegate generation to.
            user_id: Unique user identifier.
            user_profile: Optional user profile (passed through to the session).
            max_new_tokens: Generation budget (stored; consumed by callers).
        """
        self.adapter = adapter
        self.user_id = user_id
        self.user_profile = user_profile
        self.max_new_tokens = max_new_tokens

        # Start session
        self.adapter.start_session(user_id, user_profile)

    def generate(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """
        Generate response in CollaborativeAgents format.

        Returns dict with 'reasoning', 'response', and 'debug' keys.
        """
        # Extract the most recent user message (empty string if none found).
        last_user_msg = ""
        for msg in reversed(messages):
            if msg["role"] == "user":
                last_user_msg = msg["content"]
                break

        # Classify the user's message with keyword heuristics.
        enforce_detected = self._detect_enforcement(last_user_msg)
        disappointment_detected = self._detect_disappointment(last_user_msg)
        satisfaction_detected = self._detect_satisfaction(last_user_msg)

        # After the first exchange, convert the user's reaction to the previous
        # assistant turn into reward feedback before generating the next reply.
        if len(messages) > 2:  # Not the first turn
            self.adapter.process_user_turn(
                last_user_msg,
                enforce_preferences=enforce_detected,
                express_disappointment=disappointment_detected,
                express_satisfaction=satisfaction_detected,
            )

        # Generate response
        result = self.adapter.generate_response(last_user_msg, messages)

        return {
            "reasoning": result["reasoning"],
            "response": result["response"],
            "debug": result.get("debug", {}),
        }

    @staticmethod
    def _contains_any(text: str, phrases) -> bool:
        """Return True if any phrase occurs in text (case-insensitive)."""
        lowered = text.lower()
        return any(phrase in lowered for phrase in phrases)

    def _detect_enforcement(self, text: str) -> bool:
        """Detect if user is enforcing preferences."""
        return self._contains_any(text, self._ENFORCEMENT_PHRASES)

    def _detect_disappointment(self, text: str) -> bool:
        """Detect expressions of disappointment."""
        return self._contains_any(text, self._DISAPPOINTMENT_PHRASES)

    def _detect_satisfaction(self, text: str) -> bool:
        """Detect expressions of satisfaction."""
        return self._contains_any(text, self._SATISFACTION_PHRASES)

    def end_session(self, task_success: bool) -> Dict[str, Any]:
        """End session and get metrics."""
        return self.adapter.end_session(task_success)
+
+
+# =============================================================================
+# Usage Example
+# =============================================================================
+
if __name__ == "__main__":
    # Example usage / smoke test of the adapter lifecycle:
    # create -> initialize -> session -> feedback -> end -> persist.
    # NOTE: this loads real models and writes state to disk; it is not a unit test.
    adapter = create_baseline_adapter("full")
    adapter.initialize()

    # Simulate a session
    user_id = "test_user_001"
    adapter.start_session(user_id)

    # First turn
    response = adapter.generate_response("How do I implement quicksort?")
    print(f"Response: {response['response'][:200]}...")

    # User provides feedback (simulating enforcement)
    adapter.process_user_turn(
        "Can you use bullet points instead?",
        enforce_preferences=True
    )

    # Second turn
    response = adapter.generate_response("Can you use bullet points instead?")
    print(f"Response: {response['response'][:200]}...")

    # End session (task_success applies the task-completion reward)
    metrics = adapter.end_session(task_success=True)
    print(f"Session metrics: {metrics}")

    # Get user vector for analysis
    z_long = adapter.get_user_vector(user_id)
    print(f"User vector norm: {np.linalg.norm(z_long):.4f}")

    # Save all accumulated state to disk
    adapter.persist()