"""
Adapter to integrate PersonalizedLLM with CollaborativeAgents benchmark.

This adapter wraps PersonalizedLLM to work as a CollaboratorAgent in the
MULTISESSIONCOLLAB framework while maintaining all personalization features.
"""

import sys
import os
from pathlib import Path
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
import json
import numpy as np

# Add paths so the personalization package (living under <repo>/src) is importable.
_project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(_project_root / "src"))

# Import from your personalization system
from personalization.serving.personalized_llm import (
    PersonalizedLLM,
    AssistantResponse,
    Feedback,
    create_personalized_llm
)


@dataclass
class AdapterConfig:
    """Configuration for the PersonalizedLLM adapter."""
    # PersonalizedLLM config
    mode: str = "full"  # "full", "nopersonal", "vanilla"
    eval_mode: bool = True
    enable_preference_extraction: bool = True
    enable_rl_updates: bool = True
    use_user_vector: bool = True  # Whether to use user vector in policy scoring

    # Paths (absolute paths to actual file locations in the repo)
    # NOTE(review): machine-specific absolute paths — consider deriving from an
    # env var or _project_root so the adapter is portable. Kept as-is for now.
    # Using empty_store to start fresh - RAG will accumulate memories during evaluation
    user_store_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/users/collab_eval_store.npz"
    memory_cards_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_cards.jsonl"
    memory_embeddings_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_embeddings.npy"
    item_projection_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/item_projection.npz"

    # Multi-GPU assignment, e.g. {"embed": "cuda:2", "reranker": "cuda:3"}
    device_assignment: Optional[Dict[str, str]] = None

    # LLM backend selection
    llm_name: str = "qwen_1_5b"  # Use "llama_8b_vllm" for vLLM backend

    # Shared model mode for multi-threaded efficiency
    use_shared_models: bool = False  # If True, share embedding/reranker across parallel workers

    # Reranker selection: "qwen3" (8B) or "bge" (278M)
    reranker_type: str = "qwen3"

    # Best-of-N sampling: generate N responses and pick best (for RAG methods)
    best_of_n: int = 1

    # Reward mode: "keyword" (legacy heuristic) or "llm" (GPT-5-nano judge)
    reward_mode: str = "keyword"

    # Reward mapping for user behavior
    preference_enforcement_reward: float = -0.8  # Negative reward when user enforces
    disappointment_expression_reward: float = -0.4  # Milder negative for disappointment
    positive_feedback_reward: float = 0.5  # When user expresses satisfaction
    task_completion_reward: float = 1.0  # When task is solved correctly


class PersonalizedLLMAdapter:
    """
    Adapter that wraps PersonalizedLLM for use in CollaborativeAgents.

    This adapter:
    1. Translates CollaborativeAgents conversation format to PersonalizedLLM
    2. Converts user simulator signals to reward/gating for REINFORCE
    3. Tracks metrics for evaluation
    4. Supports all baseline modes
    """

    def __init__(self, config: Optional[AdapterConfig] = None):
        """Create the adapter; the underlying LLM is built lazily in initialize()."""
        self.config = config or AdapterConfig()
        self._llm: Optional[PersonalizedLLM] = None
        self._initialized = False

        # Session tracking
        self._current_user_id: Optional[str] = None
        self._turn_counter: int = 0
        self._session_metrics: Dict[str, Any] = {}

        # Metrics accumulation across all sessions
        self._total_enforcements: int = 0
        self._total_disappointments: int = 0
        self._total_turns: int = 0

    def initialize(self):
        """Initialize the PersonalizedLLM instance (idempotent)."""
        if self._initialized:
            return

        shared_mode_str = " (shared models)" if self.config.use_shared_models else ""
        print(f"[Adapter] Initializing PersonalizedLLM with LLM: {self.config.llm_name}{shared_mode_str}...")
        self._llm = PersonalizedLLM(
            mode=self.config.mode,
            eval_mode=self.config.eval_mode,
            enable_preference_extraction=self.config.enable_preference_extraction,
            enable_rl_updates=self.config.enable_rl_updates,
            user_store_path=self.config.user_store_path,
            memory_cards_path=self.config.memory_cards_path,
            memory_embeddings_path=self.config.memory_embeddings_path,
            item_projection_path=self.config.item_projection_path,
            device_assignment=self.config.device_assignment,
            llm_name=self.config.llm_name,
            use_shared_models=self.config.use_shared_models,
            reranker_type=self.config.reranker_type,
            best_of_n=self.config.best_of_n,
            reward_mode=self.config.reward_mode,
        )
        self._initialized = True
        print("[Adapter] Initialization complete.")

    def start_session(self, user_id: str, user_profile: Optional[dict] = None):
        """
        Start a new session for a user.

        Args:
            user_id: Unique user identifier
            user_profile: Optional user profile with preferences (for ground truth)
        """
        if not self._initialized:
            self.initialize()

        self._current_user_id = user_id
        self._turn_counter = 0
        self._session_metrics = {
            "user_id": user_id,
            "enforcements": 0,
            "disappointments": 0,
            "turns": 0,
            "rewards_applied": [],
        }

        # Reset session (keeps z_long, clears z_short and history)
        self._llm.reset_session(user_id)

    def _format_result(self, result: Optional[AssistantResponse]) -> Dict[str, Any]:
        """Convert an AssistantResponse into the CollaborativeAgents response dict.

        Shared by generate_response() and process_response(); handles a None
        result defensively and tolerates missing debug/usage payloads.
        """
        if result is None:
            return {"response": "[Error: LLM returned None]", "reasoning": "", "debug": {}}

        answer = result.answer if result.answer else "[No answer generated]"
        debug_info = result.debug if result.debug else None
        usage_info = result.usage if result.usage else None

        return {
            "response": answer,
            "reasoning": f"Retrieved {len(debug_info.selected_memory_notes) if debug_info else 0} memories",
            "debug": {
                "selected_memories": debug_info.selected_memory_notes if debug_info else [],
                "memory_scores": debug_info.selected_memory_scores if debug_info else [],
                "extracted_preferences": debug_info.extracted_preferences if debug_info else [],
                "user_vector_norm": debug_info.extra.get("z_long_norm", 0) if debug_info and debug_info.extra else 0,
                "usage": {
                    "prompt_tokens": usage_info.prompt_tokens if usage_info else 0,
                    "completion_tokens": usage_info.completion_tokens if usage_info else 0,
                    "total_tokens": usage_info.total_tokens if usage_info else 0,
                } if usage_info else {}
            }
        }

    def generate_response(
        self,
        query: str,
        conversation_history: Optional[List[Dict[str, str]]] = None
    ) -> Dict[str, Any]:
        """
        Generate a response using PersonalizedLLM.

        Args:
            query: Current user query
            conversation_history: Previous conversation (for context, though
                PersonalizedLLM tracks its own history)

        Returns:
            Dict with 'response', 'reasoning', and debug info
        """
        if not self._initialized:
            self.initialize()

        # Call PersonalizedLLM
        result: AssistantResponse = self._llm.chat(self._current_user_id, query)

        self._turn_counter += 1
        self._session_metrics["turns"] = self._turn_counter

        return self._format_result(result)

    def prepare_prompt(
        self,
        query: str,
        conversation_history: Optional[List[Dict[str, str]]] = None
    ) -> tuple:
        """
        Prepare prompt for batch processing without calling LLM.

        This method does all preparation (embedding, memory retrieval) and
        returns messages for batched vLLM call.

        Args:
            query: Current user query
            conversation_history: Previous conversation

        Returns:
            Tuple of (messages, context) where messages is ready for vLLM batch
            and context is needed for process_response().
        """
        if not self._initialized:
            self.initialize()

        # Use chat_prepare from PersonalizedLLM
        result = self._llm.chat_prepare(self._current_user_id, query)
        return result["messages"], result["context"]

    def process_response(
        self,
        response: str,
        context: dict
    ) -> Dict[str, Any]:
        """
        Process LLM response after batch call.

        This method takes the LLM response and context from prepare_prompt(),
        does post-processing, and returns the formatted result.

        Args:
            response: LLM response text from batched vLLM call
            context: Context dict from prepare_prompt()

        Returns:
            Dict with 'response', 'reasoning', and debug info
        """
        # Use chat_complete from PersonalizedLLM
        result: AssistantResponse = self._llm.chat_complete(response, context)

        self._turn_counter += 1
        self._session_metrics["turns"] = self._turn_counter

        return self._format_result(result)

    def process_user_turn(
        self,
        user_response: str,
        enforce_preferences: bool = False,
        express_disappointment: bool = False,
        express_satisfaction: bool = False,
        draft_answer_updated: bool = False
    ):
        """
        Process user turn and derive reward signal for REINFORCE.

        Args:
            user_response: The user's response text
            enforce_preferences: Whether user explicitly enforced preferences
            express_disappointment: Whether user expressed disappointment
            express_satisfaction: Whether user expressed satisfaction
            draft_answer_updated: Whether user updated their draft answer

        This is called AFTER generate_response and BEFORE the next turn.
        """
        # Derive reward from user behavior; signals are checked in priority
        # order (enforcement > disappointment > satisfaction) so only one fires.
        reward = 0.0
        gating = 1.0  # Always apply (could be conditional)

        if enforce_preferences:
            reward = self.config.preference_enforcement_reward
            self._session_metrics["enforcements"] += 1
            self._total_enforcements += 1

        elif express_disappointment:
            reward = self.config.disappointment_expression_reward
            self._session_metrics["disappointments"] += 1
            self._total_disappointments += 1

        elif express_satisfaction or draft_answer_updated:
            reward = self.config.positive_feedback_reward

        # Apply feedback to PersonalizedLLM (zero reward is a no-op)
        if self.config.enable_rl_updates and reward != 0.0:
            feedback = Feedback(
                user_id=self._current_user_id,
                turn_id=self._turn_counter - 1,
                reward=reward,
                gating=gating,
                meta={
                    "enforce": enforce_preferences,
                    "disappointment": express_disappointment,
                    "satisfaction": express_satisfaction,
                }
            )
            self._llm.apply_feedback(feedback)
            self._session_metrics["rewards_applied"].append(reward)

    def end_session(self, task_success: bool = False) -> Dict[str, Any]:
        """
        End the current session and return metrics.

        Args:
            task_success: Whether the task was solved correctly

        Returns:
            Session metrics dictionary
        """
        # Apply final reward for task completion
        if task_success and self.config.enable_rl_updates:
            feedback = Feedback(
                user_id=self._current_user_id,
                turn_id=self._turn_counter,
                reward=self.config.task_completion_reward,
                gating=1.0,
                meta={"task_success": True}
            )
            self._llm.apply_feedback(feedback)
            self._session_metrics["rewards_applied"].append(
                self.config.task_completion_reward
            )

        self._session_metrics["task_success"] = task_success
        self._total_turns += self._turn_counter

        return self._session_metrics.copy()

    def reset_user(self, user_id: str):
        """Completely reset a user (new experiment)."""
        if self._initialized:
            self._llm.reset_user(user_id)

    def get_user_vector(self, user_id: str) -> Optional[np.ndarray]:
        """Get the user's z_long vector for analysis.

        NOTE(review): reaches into the private ``_user_store`` of
        PersonalizedLLM — consider exposing a public accessor there.
        """
        if not self._initialized:
            return None

        state = self._llm._user_store.get_state(user_id)
        return state.z_long.copy()

    def get_user_state_summary(self, user_id: str) -> Dict[str, Any]:
        """Get summary of user state for analysis."""
        if not self._initialized:
            return {}

        return self._llm.get_user_state_summary(user_id)

    def persist(self):
        """Save all state to disk."""
        if self._initialized:
            self._llm.persist()

    # =========================================================================
    # CollaborativeAgents Interface Methods
    # =========================================================================

    def __call__(
        self,
        messages: List[Dict[str, str]],
        user_profile: Optional[dict] = None,
        **kwargs
    ) -> str:
        """
        Callable interface for CollaborativeAgents ConversationGenerator.

        Args:
            messages: Conversation history in [{"role": "user/assistant", "content": "..."}]
            user_profile: Optional user profile

        Returns:
            Response string
        """
        if not messages:
            return "How can I help you?"

        # Get the last user message
        last_user_msg = None
        for msg in reversed(messages):
            if msg["role"] == "user":
                last_user_msg = msg["content"]
                break

        if last_user_msg is None:
            return "How can I help you?"

        result = self.generate_response(last_user_msg, messages)
        return result["response"]


# =============================================================================
# Baseline Adapter Factory
# =============================================================================

def create_baseline_adapter(
    baseline_name: str,
    device_assignment: Optional[dict] = None,
    use_vllm: bool = False,
    use_shared_models: bool = False,
    reward_mode: str = "keyword",
) -> PersonalizedLLMAdapter:
    """
    Create an adapter configured for a specific baseline.

    Args:
        baseline_name: One of:
            - "vanilla": No memory or personalization
            - "contextual": Full history in context (truncate if overflow)
            - "reflection": CollaborativeAgents' agent_notes approach
            - "reflection_grpo": Reflection + GRPO training
            - "all_memory": All extracted memories in context (no retrieval)
            - "rag": Extractor + RAG (no user vector)
            - "rag_vector": Full personalization (Extractor + RAG + User Vector)
        device_assignment: GPU assignment dict
        use_vllm: If True, use vLLM HTTP API for LLM inference (much faster)
        use_shared_models: If True, share embedding/reranker models across parallel
            workers. ESSENTIAL for parallel profile processing to avoid OOM.
        reward_mode: Global reward mode ("keyword" or "llm") applied to all methods

    Returns:
        Configured adapter (PersonalizedLLMAdapter or baseline-specific adapter)

    Raises:
        ValueError: If baseline_name is not a recognized baseline.
    """
    # Select LLM backend
    llm_name = "llama_8b_vllm" if use_vllm else "llama_8b"
    configs = {
        # Baseline 1: Vanilla - no memory at all
        "vanilla": AdapterConfig(
            mode="vanilla",
            enable_preference_extraction=False,
            enable_rl_updates=False,
            use_user_vector=False,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
        ),
        # Baseline 2: Contextual - full history in context
        # This needs a separate adapter (ContextualAdapter)
        "contextual": None,  # Handled separately
        # Baseline 3: Reflection - agent_notes mechanism
        # This needs a separate adapter (ReflectionAdapter)
        "reflection": None,  # Handled separately
        # Baseline 4: Reflection + GRPO
        # This needs a separate adapter (ReflectionGRPOAdapter)
        "reflection_grpo": None,  # Handled separately
        # Baseline 5: All memory in context (no retrieval)
        "all_memory": AdapterConfig(
            mode="nopersonal",  # Uses all memories, no policy selection
            enable_preference_extraction=True,
            enable_rl_updates=False,
            use_user_vector=False,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
        ),
        # Baseline 6: Extractor + RAG (no user vector)
        # Use "nopersonal" mode for pure dense+rerank retrieval without user vector influence
        # Device assignment: GPUs 2,3 for HF models (8B vLLM uses 40% memory, leaving room)
        "rag": AdapterConfig(
            mode="nopersonal",
            enable_preference_extraction=True,
            enable_rl_updates=False,  # No RL updates
            use_user_vector=False,  # No user vector in policy
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Baseline 7: Full - Extractor + RAG + User Vector (proposed method)
        # Device assignment: GPUs 2,3 for HF models (8B vLLM uses 40% memory, leaving room)
        "rag_vector": AdapterConfig(
            mode="full",
            enable_preference_extraction=True,
            enable_rl_updates=True,
            use_user_vector=True,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Baseline 8: RAG with BGE reranker (278M instead of 8B)
        "rag_bge": AdapterConfig(
            mode="nopersonal",
            enable_preference_extraction=True,
            enable_rl_updates=False,
            use_user_vector=False,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            reranker_type="bge",
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Baseline 9: RAG + Vector with BGE reranker (278M instead of 8B)
        "rag_vector_bge": AdapterConfig(
            mode="full",
            enable_preference_extraction=True,
            enable_rl_updates=True,
            use_user_vector=True,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            reranker_type="bge",
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Baseline 10: RAG + Vector with best-of-3 sampling
        "rag_vector_best3": AdapterConfig(
            mode="full",
            enable_preference_extraction=True,
            enable_rl_updates=True,
            use_user_vector=True,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
            best_of_n=3,
            device_assignment={
                "embed": "cuda:2",
                "reranker": "cuda:3",
                "extractor": "cuda:2",
            },
        ),
        # Legacy aliases
        "nopersonal": AdapterConfig(
            mode="nopersonal",
            enable_preference_extraction=True,
            enable_rl_updates=False,
            use_user_vector=False,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
        ),
        "full": AdapterConfig(
            mode="full",
            enable_preference_extraction=True,
            enable_rl_updates=True,
            use_user_vector=True,
            llm_name=llm_name,
            use_shared_models=use_shared_models,
        ),
    }

    if baseline_name not in configs:
        raise ValueError(f"Unknown baseline: {baseline_name}. Choose from {list(configs.keys())}")

    config = configs[baseline_name]

    # Handle baselines that need separate adapters (config is None above)
    if config is None:
        if baseline_name == "contextual":
            from .contextual_adapter import ContextualAdapter
            return ContextualAdapter(device_assignment=device_assignment)
        elif baseline_name == "reflection":
            from .reflection_adapter import ReflectionAdapter
            return ReflectionAdapter(device_assignment=device_assignment)
        elif baseline_name == "reflection_grpo":
            from .reflection_grpo_adapter import ReflectionGRPOAdapter
            return ReflectionGRPOAdapter(device_assignment=device_assignment)
        else:
            raise ValueError(f"Baseline {baseline_name} not implemented yet")

    if device_assignment:
        config.device_assignment = device_assignment

    # Apply global reward_mode to all methods (overrides per-method defaults)
    config.reward_mode = reward_mode

    return PersonalizedLLMAdapter(config)


# =============================================================================
# Integration with CollaborativeAgents ConversationGenerator
# =============================================================================

class PersonalizedCollaborator:
    """
    Drop-in replacement for CollaboratorAgent that uses PersonalizedLLM.

    Compatible with ConversationGenerator.generate_conversation()
    """

    def __init__(
        self,
        adapter: PersonalizedLLMAdapter,
        user_id: str,
        user_profile: Optional[dict] = None,
        max_new_tokens: int = 1024
    ):
        self.adapter = adapter
        self.user_id = user_id
        self.user_profile = user_profile
        self.max_new_tokens = max_new_tokens

        # Start session
        self.adapter.start_session(user_id, user_profile)

    def generate(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """
        Generate response in CollaborativeAgents format.

        Returns dict with 'reasoning' and 'response' keys.
        """
        # Extract last user message
        last_user_msg = ""
        for msg in reversed(messages):
            if msg["role"] == "user":
                last_user_msg = msg["content"]
                break

        # Check for preference enforcement in the user message
        enforce_detected = self._detect_enforcement(last_user_msg)
        disappointment_detected = self._detect_disappointment(last_user_msg)
        satisfaction_detected = self._detect_satisfaction(last_user_msg)

        # Process the previous turn's feedback (if any)
        if len(messages) > 2:  # Not the first turn
            self.adapter.process_user_turn(
                last_user_msg,
                enforce_preferences=enforce_detected,
                express_disappointment=disappointment_detected,
                express_satisfaction=satisfaction_detected,
            )

        # Generate response
        result = self.adapter.generate_response(last_user_msg, messages)

        return {
            "reasoning": result["reasoning"],
            "response": result["response"],
            "debug": result.get("debug", {})
        }

    def _detect_enforcement(self, text: str) -> bool:
        """Detect if user is enforcing preferences (keyword heuristic)."""
        enforcement_phrases = [
            "please use", "i asked for", "i prefer", "can you",
            "instead of", "not what i wanted", "i said", "remember that",
            "you should", "don't", "avoid", "stop"
        ]
        text_lower = text.lower()
        return any(phrase in text_lower for phrase in enforcement_phrases)

    def _detect_disappointment(self, text: str) -> bool:
        """Detect expressions of disappointment (keyword heuristic)."""
        disappointment_phrases = [
            "not quite", "that's not", "hmm", "not really",
            "i was hoping", "could be better", "not exactly"
        ]
        text_lower = text.lower()
        return any(phrase in text_lower for phrase in disappointment_phrases)

    def _detect_satisfaction(self, text: str) -> bool:
        """Detect expressions of satisfaction (keyword heuristic)."""
        satisfaction_phrases = [
            "thanks", "perfect", "great", "exactly", "that's what i",
            "helpful", "makes sense", "got it", "understand now"
        ]
        text_lower = text.lower()
        return any(phrase in text_lower for phrase in satisfaction_phrases)

    def end_session(self, task_success: bool) -> Dict[str, Any]:
        """End session and get metrics."""
        return self.adapter.end_session(task_success)


# =============================================================================
# Usage Example
# =============================================================================

if __name__ == "__main__":
    # Example usage
    adapter = create_baseline_adapter("full")
    adapter.initialize()

    # Simulate a session
    user_id = "test_user_001"
    adapter.start_session(user_id)

    # First turn
    response = adapter.generate_response("How do I implement quicksort?")
    print(f"Response: {response['response'][:200]}...")

    # User provides feedback (simulating enforcement)
    adapter.process_user_turn(
        "Can you use bullet points instead?",
        enforce_preferences=True
    )

    # Second turn
    response = adapter.generate_response("Can you use bullet points instead?")
    print(f"Response: {response['response'][:200]}...")

    # End session
    metrics = adapter.end_session(task_success=True)
    print(f"Session metrics: {metrics}")

    # Get user vector for analysis
    z_long = adapter.get_user_vector(user_id)
    print(f"User vector norm: {np.linalg.norm(z_long):.4f}")

    adapter.persist()