From 5626080ca4c4219aec4888d6b9406d0d3349fb55 Mon Sep 17 00:00:00 2001 From: YurenHao0426 Date: Tue, 10 Feb 2026 20:16:36 +0000 Subject: Add RAG rewrite, 60-session experiment scripts, and analysis tools - RAG rewrite adapter and vector preference pipeline in personalized_llm - 60-session experiment queue scripts (reflection, rag, rag_vector, rag_rewrite) - Vector-preference correlation analysis and visualization scripts - Local reward model batch processing improvements - Updated CLAUDE.md with full experiment documentation and notes Co-Authored-By: Claude Opus 4.6 --- .../adapters/personalized_llm_adapter.py | 267 ++++++++++++++++++++- collaborativeagents/adapters/reflection_adapter.py | 186 +++++++------- .../adapters/reflection_grpo_adapter.py | 9 +- 3 files changed, 363 insertions(+), 99 deletions(-) (limited to 'collaborativeagents/adapters') diff --git a/collaborativeagents/adapters/personalized_llm_adapter.py b/collaborativeagents/adapters/personalized_llm_adapter.py index b476272..488b241 100644 --- a/collaborativeagents/adapters/personalized_llm_adapter.py +++ b/collaborativeagents/adapters/personalized_llm_adapter.py @@ -36,12 +36,24 @@ class AdapterConfig: enable_rl_updates: bool = True use_user_vector: bool = True # Whether to use user vector in policy scoring - # Paths (absolute paths to actual file locations in the repo) + # Paths - computed relative to project root # Note: Using empty_store to start fresh - RAG will accumulate memories during evaluation - user_store_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/users/collab_eval_store.npz" - memory_cards_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_cards.jsonl" - memory_embeddings_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_embeddings.npy" - item_projection_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/item_projection.npz" + _project_root: str = field(default_factory=lambda: str(Path(__file__).parent.parent.parent)) + user_store_path: str = "" + memory_cards_path: str = "" + memory_embeddings_path: str = "" + item_projection_path: str = "" + + def __post_init__(self): + root = Path(self._project_root) + if not self.user_store_path: + self.user_store_path = str(root / "data/users/collab_eval_store.npz") + if not self.memory_cards_path: + self.memory_cards_path = str(root / "data/corpora/empty_store/memory_cards.jsonl") + if not self.memory_embeddings_path: + self.memory_embeddings_path = str(root / "data/corpora/empty_store/memory_embeddings.npy") + if not self.item_projection_path: + self.item_projection_path = str(root / "data/corpora/item_projection.npz") # Multi-GPU assignment device_assignment: Optional[Dict[str, str]] = None @@ -64,10 +76,30 @@ class AdapterConfig: # vLLM URL for local reward model (only used when reward_mode="llm_local") reward_vllm_url: str = "http://localhost:8005/v1" + # Retrieval optimizations + enable_query_transform: bool = False # Transform queries for better retrieval matching + enable_global_preferences: bool = False # Separate global prefs that bypass retrieval + enable_preference_rewrite: bool = False # Use LLM to rewrite/merge retrieved preferences + + # Dynamic topk settings + dynamic_topk: bool = False # Use dynamic selection based on rerank score distribution + dynamic_min_k: int = 3 # Minimum preferences to select + dynamic_max_k: int = 8 # Maximum preferences to select + dynamic_score_ratio: float = 0.5 # Threshold = top_score * ratio + + # RL learning rate overrides + eta_long: float = None # Override RL learning rate for z_long (default 0.01) + eta_short: float = None # Override RL learning rate for z_short (default 0.05) + + # Session-level preference consolidation + enable_preference_consolidation: bool = False # Consolidate preferences at session end + consolidation_threshold: int = 5 # Min preferences before consolidation kicks in + # Reward mapping for user behavior preference_enforcement_reward: float = -0.8 # Negative reward when user enforces disappointment_expression_reward: float = -0.4 # Milder negative for disappointment positive_feedback_reward: float = 0.5 # When user expresses satisfaction + no_enforcement_reward: float = 0.1 # Small positive when user doesn't enforce (good turn) task_completion_reward: float = 1.0 # When task is solved correctly @@ -120,6 +152,17 @@ class PersonalizedLLMAdapter: best_of_n=self.config.best_of_n, reward_mode=self.config.reward_mode, reward_vllm_url=self.config.reward_vllm_url, + enable_query_transform=self.config.enable_query_transform, + enable_global_preferences=self.config.enable_global_preferences, + enable_preference_rewrite=self.config.enable_preference_rewrite, + dynamic_topk=self.config.dynamic_topk, + dynamic_min_k=self.config.dynamic_min_k, + dynamic_max_k=self.config.dynamic_max_k, + dynamic_score_ratio=self.config.dynamic_score_ratio, + eta_long=self.config.eta_long, + eta_short=self.config.eta_short, + enable_preference_consolidation=self.config.enable_preference_consolidation, + consolidation_threshold=self.config.consolidation_threshold, ) self._initialized = True print("[Adapter] Initialization complete.") @@ -221,7 +264,9 @@ class PersonalizedLLMAdapter: self.initialize() # Use chat_prepare from PersonalizedLLM - result = self._llm.chat_prepare(self._current_user_id, query) + # skip_extraction=False to enable preference extraction from user messages + # skip_auto_reward=True because batch framework handles rewards via process_user_turn + result = self._llm.chat_prepare(self._current_user_id, query, skip_extraction=False, skip_auto_reward=True) return result["messages"], result["context"] def process_response( @@ -294,24 +339,38 @@ class PersonalizedLLMAdapter: This is called AFTER generate_response and BEFORE the next turn. """ # Derive reward from user behavior - reward = 0.0 - gating = 1.0 # Always apply (could be conditional) + # Key insight: ALWAYS give a reward signal, not just for enforcement + # - Enforcement: negative reward (user had to correct agent) + # - No enforcement: small positive reward (agent did well) + # - Satisfaction/progress: larger positive reward + gating = 1.0 # Always apply if enforce_preferences: - reward = self.config.preference_enforcement_reward + reward = self.config.preference_enforcement_reward # -0.8 self._session_metrics["enforcements"] += 1 self._total_enforcements += 1 elif express_disappointment: - reward = self.config.disappointment_expression_reward + reward = self.config.disappointment_expression_reward # -0.4 self._session_metrics["disappointments"] += 1 self._total_disappointments += 1 elif express_satisfaction or draft_answer_updated: - reward = self.config.positive_feedback_reward + reward = self.config.positive_feedback_reward # +0.5 - # Apply feedback to PersonalizedLLM - if self.config.enable_rl_updates and reward != 0.0: + else: + # No enforcement = good turn, give small positive reward + reward = self.config.no_enforcement_reward # +0.1 + + # Apply feedback to PersonalizedLLM (always, not just when reward != 0) + if self.config.enable_rl_updates: + # Debug: check if pending_rl_update exists + ctx = self._llm._sessions.get(self._current_user_id) + has_pending = ctx is not None and ctx.pending_rl_update is not None + has_chosen = (has_pending and + len(ctx.pending_rl_update.get("last_chosen_indices", [])) > 0) if has_pending else False + print(f"[DEBUG-RL] User={self._current_user_id} reward={reward:.2f} " + f"has_pending={has_pending} has_chosen={has_chosen}") feedback = Feedback( user_id=self._current_user_id, turn_id=self._turn_counter - 1, @@ -380,6 +439,66 @@ class PersonalizedLLMAdapter: if self._initialized: self._llm.persist() + def export_all_user_vectors(self) -> Dict[str, Dict[str, Any]]: + """ + Export all user vectors with full state for analysis. + + Returns: + Dict mapping user_id to dict containing: + - z_long: np.ndarray (long-term user vector) + - z_short: np.ndarray (short-term user vector) + - z_long_norm: float + - z_short_norm: float + - reward_ma: float (reward moving average) + """ + if not self._initialized: + return {} + + result = {} + for user_id, state in self._llm._user_store._states.items(): + result[user_id] = { + "z_long": state.z_long.tolist(), + "z_short": state.z_short.tolist(), + "z_long_norm": float(np.linalg.norm(state.z_long)), + "z_short_norm": float(np.linalg.norm(state.z_short)), + "reward_ma": float(state.reward_ma), + } + return result + + def export_user_vectors_npz(self, output_path: str) -> None: + """ + Export all user vectors to a numpy .npz file for efficient storage and analysis. + + Args: + output_path: Path to save the .npz file + + The saved file contains: + - user_ids: array of user IDs + - z_long: [n_users, k] array of long-term vectors + - z_short: [n_users, k] array of short-term vectors + - reward_ma: [n_users] array of reward moving averages + """ + if not self._initialized: + return + + states = self._llm._user_store._states + if not states: + return + + user_ids = list(states.keys()) + z_long = np.stack([states[uid].z_long for uid in user_ids]) + z_short = np.stack([states[uid].z_short for uid in user_ids]) + reward_ma = np.array([states[uid].reward_ma for uid in user_ids]) + + np.savez( + output_path, + user_ids=np.array(user_ids), + z_long=z_long, + z_short=z_short, + reward_ma=reward_ma, + ) + print(f"[Adapter] Exported {len(user_ids)} user vectors to {output_path}") + # ========================================================================= # CollaborativeAgents Interface Methods # ========================================================================= @@ -491,6 +610,45 @@ def create_baseline_adapter( use_user_vector=False, # No user vector in policy llm_name=llm_name, use_shared_models=use_shared_models, + enable_query_transform=True, + enable_global_preferences=True, + device_assignment={ + "embed": "cuda:2", + "reranker": "cuda:3", + "extractor": "cuda:2", + }, + ), + # Baseline 6b: RAG with dynamic topk (min=3, max=8, ratio=0.5) + "rag_dynamic": AdapterConfig( + mode="nopersonal", + enable_preference_extraction=True, + enable_rl_updates=False, + use_user_vector=False, + llm_name=llm_name, + use_shared_models=use_shared_models, + enable_query_transform=True, + enable_global_preferences=True, + dynamic_topk=True, + dynamic_min_k=3, + dynamic_max_k=8, + dynamic_score_ratio=0.5, + device_assignment={ + "embed": "cuda:2", + "reranker": "cuda:3", + "extractor": "cuda:2", + }, + ), + # Baseline 6c: RAG with preference rewrite (LLM merges preferences) + "rag_rewrite": AdapterConfig( + mode="nopersonal", + enable_preference_extraction=True, + enable_rl_updates=False, + use_user_vector=False, + llm_name=llm_name, + use_shared_models=use_shared_models, + enable_query_transform=True, + enable_global_preferences=True, + enable_preference_rewrite=True, # NEW: Use LLM to merge preferences device_assignment={ "embed": "cuda:2", "reranker": "cuda:3", @@ -506,6 +664,89 @@ def create_baseline_adapter( use_user_vector=True, llm_name=llm_name, use_shared_models=use_shared_models, + enable_query_transform=True, + enable_global_preferences=True, + device_assignment={ + "embed": "cuda:2", + "reranker": "cuda:3", + "extractor": "cuda:2", + }, + ), + # Baseline 7a: RAG + Vector + Preference Rewrite (combines best of both) + "rag_rewrite_vector": AdapterConfig( + mode="full", + enable_preference_extraction=True, + enable_rl_updates=True, + use_user_vector=True, + llm_name=llm_name, + use_shared_models=use_shared_models, + enable_query_transform=True, + enable_global_preferences=True, + enable_preference_rewrite=True, # LLM merges preferences + device_assignment={ + "embed": "cuda:2", + "reranker": "cuda:3", + "extractor": "cuda:2", + }, + ), + # Baseline 7b: RAG + Vector with higher learning rate (10x) + "rag_vector_fast": AdapterConfig( + mode="full", + enable_preference_extraction=True, + enable_rl_updates=True, + use_user_vector=True, + llm_name=llm_name, + use_shared_models=use_shared_models, + enable_query_transform=True, + enable_global_preferences=True, + eta_long=0.1, # 10x default (0.01) + eta_short=0.5, # 10x default (0.05) + device_assignment={ + "embed": "cuda:2", + "reranker": "cuda:3", + "extractor": "cuda:2", + }, + ), + # Baseline 7c: RAG + Vector with session-level preference consolidation + "rag_vector_consolidate": AdapterConfig( + mode="full", + enable_preference_extraction=True, + enable_rl_updates=True, + use_user_vector=True, + llm_name=llm_name, + use_shared_models=use_shared_models, + enable_query_transform=True, + enable_global_preferences=True, + enable_preference_consolidation=True, + consolidation_threshold=5, + device_assignment={ + "embed": "cuda:2", + "reranker": "cuda:3", + "extractor": "cuda:2", + }, + ), + # Baseline 7d: RAG + Vector with balanced rewards (10x LR + no_enforcement_reward) + # Key improvements: + # - 10x learning rate for faster adaptation + # - Small positive reward for turns without enforcement (+0.1) + # - Disappointment detection enabled + # - Balanced reward signal for proper REINFORCE learning + "rag_vector_balanced": AdapterConfig( + mode="full", + enable_preference_extraction=True, + enable_rl_updates=True, + use_user_vector=True, + llm_name=llm_name, + use_shared_models=use_shared_models, + enable_query_transform=True, + enable_global_preferences=True, + eta_long=0.1, # 10x default + eta_short=0.5, # 10x default + # Balanced reward structure + preference_enforcement_reward=-0.8, + disappointment_expression_reward=-0.4, + positive_feedback_reward=0.5, + no_enforcement_reward=0.1, # Key: positive signal for good turns device_assignment={ "embed": "cuda:2", "reranker": "cuda:3", diff --git a/collaborativeagents/adapters/reflection_adapter.py b/collaborativeagents/adapters/reflection_adapter.py index d535be2..451c694 100644 --- a/collaborativeagents/adapters/reflection_adapter.py +++ b/collaborativeagents/adapters/reflection_adapter.py @@ -111,6 +111,18 @@ class ReflectionAdapter: return result["content"] + def _prepend_notes_to_conversation(self, conversation, notes_text): + """Prepend notes to the first message of a conversation copy.""" + if conversation: + conversation = list(conversation) + conversation[0] = dict(conversation[0]) + conversation[0]["content"] = ( + f"Remember, you have been taking notes throughout past conversations " + f"about user preferences. Use whatever is relevant in these notes to " + f"guide your response:\n{notes_text}\n\n" + conversation[0]["content"] + ) + return conversation + def _add_scaffolding_to_conversation( self, conversation: List[Dict[str, str]], @@ -118,73 +130,73 @@ class ReflectionAdapter: ) -> List[Dict[str, str]]: """ Add scaffolding (memory notes) to conversation. - - This is the EXACT logic from CollaboratorAgent.add_scaffolding_to_conversation(). - - If with_proper_scaffolding=True: Use LLM to extract relevant notes - If with_proper_scaffolding=False: Prepend all notes to first message + Sequential version - for non-batch use only. """ if not self.with_proper_scaffolding: - # Simple mode: prepend all notes - if conversation: - conversation = list(conversation) # Copy to avoid mutation - conversation[0] = dict(conversation[0]) # Copy first message - conversation[0]["content"] = ( - f"Remember, you have been taking notes throughout past conversations " - f"about user preferences. Use whatever is relevant in these notes to " - f"guide your response:\n{agent_notes}\n\n" + conversation[0]["content"] - ) - return conversation + return self._prepend_notes_to_conversation(conversation, agent_notes) else: - # Proper scaffolding: use LLM to extract relevant notes - conversation_str = get_conversation_string(conversation) - formatted_prompt = proper_scaffolding_prompt.format( - conversation_history=conversation_str, - complete_agent_notes=agent_notes - ) - - # Call LLM to extract relevant notes (with retries) - for attempt in range(3): - try: - messages = [{"role": "user", "content": formatted_prompt}] - response = self._generate(messages, max_new_tokens=512) - - parsed = repair_json(response, return_objects=True) - missing_keys = [k for k in ["reasoning", "relevant_notes"] if k not in parsed] - - if missing_keys: - print(f"[ReflectionAdapter] Scaffolding missing keys: {missing_keys}") - continue - - scaffolded_notes = parsed["relevant_notes"] - - # Prepend extracted notes to first message - if conversation: - conversation = list(conversation) - conversation[0] = dict(conversation[0]) - conversation[0]["content"] = ( - f"Remember, you have been taking notes throughout past conversations " - f"about user preferences. Use these notes to guide your response:\n" - f"{scaffolded_notes}\n\n" + conversation[0]["content"] - ) - - return conversation + prompt = self.get_scaffolding_prompt(conversation, agent_notes) + if prompt is None: + return self._prepend_notes_to_conversation(conversation, agent_notes) + response = self._generate([{"role": "user", "content": prompt}], max_new_tokens=512) + return self.apply_scaffolding_response(conversation, agent_notes, response) + + def get_scaffolding_prompt(self, conversation, agent_notes): + """Build the scaffolding prompt for batch processing. Returns None if no scaffolding needed.""" + if not self.with_proper_scaffolding: + return None + conversation_str = get_conversation_string(conversation) + return proper_scaffolding_prompt.format( + conversation_history=conversation_str, + complete_agent_notes=agent_notes + ) - except Exception as e: - print(f"[ReflectionAdapter] Scaffolding attempt {attempt+1} failed: {e}") - continue + def apply_scaffolding_response(self, conversation, agent_notes, response): + """Apply a scaffolding LLM response to the conversation.""" + try: + parsed = repair_json(response, return_objects=True) + if "relevant_notes" in parsed: + notes_text = parsed["relevant_notes"] + if conversation: + conversation = list(conversation) + conversation[0] = dict(conversation[0]) + conversation[0]["content"] = ( + f"Remember, you have been taking notes throughout past conversations " + f"about user preferences. Use these notes to guide your response:\n" + f"{notes_text}\n\n" + conversation[0]["content"] + ) + return conversation + except Exception: + pass + # Fallback: use all notes + return self._prepend_notes_to_conversation(conversation, agent_notes) + + def get_note_update_prompt(self, user_id=None): + """Build the note-update prompt for batch processing. Returns (messages, user_id) or None.""" + uid = user_id or self._current_user_id + if not uid or not self._conversation_history: + return None + current_notes = self._user_notes.get(uid, "No notes yet.") + conversation_str = get_conversation_string(self._conversation_history) + prompt = update_agent_notes_prompt.format( + agent_notes=current_notes, + conversation_str=conversation_str + ) + return [{"role": "user", "content": prompt}] - # Fallback: use all notes if retrieval fails - print("[ReflectionAdapter] Scaffolding failed, using full notes") - if conversation: - conversation = list(conversation) - conversation[0] = dict(conversation[0]) - conversation[0]["content"] = ( - f"Remember, you have been taking notes throughout past conversations " - f"about user preferences. Use whatever is relevant in these notes to " - f"guide your response:\n{agent_notes}\n\n" + conversation[0]["content"] - ) - return conversation + def apply_note_update_response(self, response, user_id=None): + """Apply a note-update LLM response.""" + uid = user_id or self._current_user_id + if not uid: + return + try: + # 8B model: use raw response directly + updated_notes = response.strip() + if updated_notes: + old_len = len(self._user_notes.get(uid, "")) + self._user_notes[uid] = updated_notes + except Exception: + pass def start_session(self, user_id: str, user_profile: dict = None): """Start a new session for a user.""" @@ -266,11 +278,27 @@ class ReflectionAdapter: # Get current notes for this user agent_notes = self._user_notes.get(self._current_user_id, "No notes yet about this user.") - # Build conversation with scaffolding (may involve LLM call for proper_scaffolding) + # Store for batch scaffolding (prepare_prompt may be called after batch scaffolding) + self._pending_agent_notes = agent_notes + self._pending_scaffolded = False + + # Build conversation with scaffolding if self.with_scaffolding and agent_notes != "No notes yet about this user.": - conversation_with_notes = self._add_scaffolding_to_conversation( - self._conversation_history, agent_notes - ) + if hasattr(self, '_scaffolding_result') and self._scaffolding_result is not None: + # Use pre-computed batch scaffolding result + conversation_with_notes = self.apply_scaffolding_response( + list(self._conversation_history), agent_notes, self._scaffolding_result) + self._scaffolding_result = None + self._pending_scaffolded = True + elif not self.with_proper_scaffolding: + conversation_with_notes = self._prepend_notes_to_conversation( + self._conversation_history, agent_notes) + self._pending_scaffolded = True + else: + # Sequential fallback - should not happen in batch mode + conversation_with_notes = self._add_scaffolding_to_conversation( + self._conversation_history, agent_notes) + self._pending_scaffolded = True else: conversation_with_notes = self._conversation_history @@ -357,30 +385,24 @@ class ReflectionAdapter: return None # All retries failed, keep old notes - def end_session(self, task_success: bool = False) -> Dict[str, Any]: + def end_session(self, task_success: bool = False, skip_note_update: bool = False) -> Dict[str, Any]: """ End session and update agent notes via reflection. - Uses the ORIGINAL update_agent_notes logic from CollaborativeAgents. + Args: + task_success: Whether the task was completed successfully + skip_note_update: If True, skip note update (already done via batch) """ if not self._current_user_id: return {} - # Get current notes - current_notes = self._user_notes.get(self._current_user_id, "No notes yet.") - - # Update notes via session-level reflection - if len(self._conversation_history) > 0: - result = self._update_agent_notes(current_notes, self._conversation_history) - + # Update notes via session-level reflection (skip if batch already did it) + if not skip_note_update and len(self._conversation_history) > 0: + result = self._update_agent_notes( + self._user_notes.get(self._current_user_id, "No notes yet."), + self._conversation_history) if result is not None and "agent_notes" in result: - updated_notes = result["agent_notes"] - self._user_notes[self._current_user_id] = updated_notes - print(f"[ReflectionAdapter] Updated notes for {self._current_user_id} " - f"({len(current_notes)} -> {len(updated_notes)} chars)") - else: - print(f"[ReflectionAdapter] Keeping old notes for {self._current_user_id} " - f"(update failed)") + self._user_notes[self._current_user_id] = result["agent_notes"] return { "turns": len(self._conversation_history), diff --git a/collaborativeagents/adapters/reflection_grpo_adapter.py b/collaborativeagents/adapters/reflection_grpo_adapter.py index 09c5b26..3c10942 100644 --- a/collaborativeagents/adapters/reflection_grpo_adapter.py +++ b/collaborativeagents/adapters/reflection_grpo_adapter.py @@ -18,10 +18,11 @@ import torch from transformers import AutoModelForCausalLM, AutoTokenizer from json_repair import repair_json -# Model paths - Use GRPO-trained model if available, fallback to base -GRPO_MODEL_PATH = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/collaborativeagents/training/outputs/grpo_reflection/final" -SFT_MODEL_PATH = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/collaborativeagents/training/outputs/sft_reflection" -DEFAULT_MODEL_PATH = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/models/llama-3.1-8b-instruct" +# Model paths - computed relative to project root +_PROJECT_ROOT = Path(__file__).parent.parent.parent +GRPO_MODEL_PATH = str(_PROJECT_ROOT / "collaborativeagents/training/outputs/grpo_reflection/final") +SFT_MODEL_PATH = str(_PROJECT_ROOT / "collaborativeagents/training/outputs/sft_reflection") +DEFAULT_MODEL_PATH = str(_PROJECT_ROOT / "models/llama-3.1-8b-instruct") def get_best_available_model(): """Get the best available model path (GRPO > SFT > base).""" -- cgit v1.2.3