Diffstat (limited to 'collaborativeagents/adapters/personalized_llm_adapter.py')
| -rw-r--r-- | collaborativeagents/adapters/personalized_llm_adapter.py | 267 |
1 file changed, 254 insertions, 13 deletions
diff --git a/collaborativeagents/adapters/personalized_llm_adapter.py b/collaborativeagents/adapters/personalized_llm_adapter.py
index b476272..488b241 100644
--- a/collaborativeagents/adapters/personalized_llm_adapter.py
+++ b/collaborativeagents/adapters/personalized_llm_adapter.py
@@ -36,12 +36,24 @@ class AdapterConfig:
     enable_rl_updates: bool = True
     use_user_vector: bool = True  # Whether to use user vector in policy scoring
 
-    # Paths (absolute paths to actual file locations in the repo)
+    # Paths - computed relative to project root
     # Note: Using empty_store to start fresh - RAG will accumulate memories during evaluation
-    user_store_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/users/collab_eval_store.npz"
-    memory_cards_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_cards.jsonl"
-    memory_embeddings_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_embeddings.npy"
-    item_projection_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/item_projection.npz"
+    _project_root: str = field(default_factory=lambda: str(Path(__file__).parent.parent.parent))
+    user_store_path: str = ""
+    memory_cards_path: str = ""
+    memory_embeddings_path: str = ""
+    item_projection_path: str = ""
+
+    def __post_init__(self):
+        root = Path(self._project_root)
+        if not self.user_store_path:
+            self.user_store_path = str(root / "data/users/collab_eval_store.npz")
+        if not self.memory_cards_path:
+            self.memory_cards_path = str(root / "data/corpora/empty_store/memory_cards.jsonl")
+        if not self.memory_embeddings_path:
+            self.memory_embeddings_path = str(root / "data/corpora/empty_store/memory_embeddings.npy")
+        if not self.item_projection_path:
+            self.item_projection_path = str(root / "data/corpora/item_projection.npz")
 
     # Multi-GPU assignment
     device_assignment: Optional[Dict[str, str]] = None
@@ -64,10 +76,30 @@ class AdapterConfig:
     # vLLM URL for local reward model (only used when reward_mode="llm_local")
     reward_vllm_url: str = "http://localhost:8005/v1"
 
+    # Retrieval optimizations
+    enable_query_transform: bool = False  # Transform queries for better retrieval matching
+    enable_global_preferences: bool = False  # Separate global prefs that bypass retrieval
+    enable_preference_rewrite: bool = False  # Use LLM to rewrite/merge retrieved preferences
+
+    # Dynamic topk settings
+    dynamic_topk: bool = False  # Use dynamic selection based on rerank score distribution
+    dynamic_min_k: int = 3  # Minimum preferences to select
+    dynamic_max_k: int = 8  # Maximum preferences to select
+    dynamic_score_ratio: float = 0.5  # Threshold = top_score * ratio
+
+    # RL learning rate overrides
+    eta_long: float = None  # Override RL learning rate for z_long (default 0.01)
+    eta_short: float = None  # Override RL learning rate for z_short (default 0.05)
+
+    # Session-level preference consolidation
+    enable_preference_consolidation: bool = False  # Consolidate preferences at session end
+    consolidation_threshold: int = 5  # Min preferences before consolidation kicks in
+
     # Reward mapping for user behavior
     preference_enforcement_reward: float = -0.8  # Negative reward when user enforces
     disappointment_expression_reward: float = -0.4  # Milder negative for disappointment
     positive_feedback_reward: float = 0.5  # When user expresses satisfaction
+    no_enforcement_reward: float = 0.1  # Small positive when user doesn't enforce (good turn)
     task_completion_reward: float = 1.0  # When task is solved correctly
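For orientation, a minimal usage sketch of the path handling introduced above; it assumes AdapterConfig is importable from this module and that the fields not shown here keep usable defaults:

# Sketch (not part of the diff): how the new path defaults resolve.
from collaborativeagents.adapters.personalized_llm_adapter import AdapterConfig

cfg = AdapterConfig()
# __post_init__ fills empty paths relative to the project root, e.g.
# <repo>/data/users/collab_eval_store.npz and <repo>/data/corpora/empty_store/...
print(cfg.user_store_path)

# An explicitly supplied path is left untouched:
cfg_custom = AdapterConfig(user_store_path="/tmp/collab_eval_store.npz")
print(cfg_custom.user_store_path)  # -> /tmp/collab_eval_store.npz
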
@@ -120,6 +152,17 @@ class PersonalizedLLMAdapter:
             best_of_n=self.config.best_of_n,
             reward_mode=self.config.reward_mode,
             reward_vllm_url=self.config.reward_vllm_url,
+            enable_query_transform=self.config.enable_query_transform,
+            enable_global_preferences=self.config.enable_global_preferences,
+            enable_preference_rewrite=self.config.enable_preference_rewrite,
+            dynamic_topk=self.config.dynamic_topk,
+            dynamic_min_k=self.config.dynamic_min_k,
+            dynamic_max_k=self.config.dynamic_max_k,
+            dynamic_score_ratio=self.config.dynamic_score_ratio,
+            eta_long=self.config.eta_long,
+            eta_short=self.config.eta_short,
+            enable_preference_consolidation=self.config.enable_preference_consolidation,
+            consolidation_threshold=self.config.consolidation_threshold,
         )
         self._initialized = True
         print("[Adapter] Initialization complete.")
@@ -221,7 +264,9 @@ class PersonalizedLLMAdapter:
             self.initialize()
 
         # Use chat_prepare from PersonalizedLLM
-        result = self._llm.chat_prepare(self._current_user_id, query)
+        # skip_extraction=False to enable preference extraction from user messages
+        # skip_auto_reward=True because batch framework handles rewards via process_user_turn
+        result = self._llm.chat_prepare(self._current_user_id, query, skip_extraction=False, skip_auto_reward=True)
         return result["messages"], result["context"]
 
     def process_response(
         self,
@@ -294,24 +339,38 @@ class PersonalizedLLMAdapter:
         This is called AFTER generate_response and BEFORE the next turn.
         """
         # Derive reward from user behavior
-        reward = 0.0
-        gating = 1.0  # Always apply (could be conditional)
+        # Key insight: ALWAYS give a reward signal, not just for enforcement
+        # - Enforcement: negative reward (user had to correct agent)
+        # - No enforcement: small positive reward (agent did well)
+        # - Satisfaction/progress: larger positive reward
+        gating = 1.0  # Always apply
 
         if enforce_preferences:
-            reward = self.config.preference_enforcement_reward
+            reward = self.config.preference_enforcement_reward  # -0.8
             self._session_metrics["enforcements"] += 1
            self._total_enforcements += 1
         elif express_disappointment:
-            reward = self.config.disappointment_expression_reward
+            reward = self.config.disappointment_expression_reward  # -0.4
             self._session_metrics["disappointments"] += 1
             self._total_disappointments += 1
         elif express_satisfaction or draft_answer_updated:
-            reward = self.config.positive_feedback_reward
+            reward = self.config.positive_feedback_reward  # +0.5
 
-        # Apply feedback to PersonalizedLLM
-        if self.config.enable_rl_updates and reward != 0.0:
+        else:
+            # No enforcement = good turn, give small positive reward
+            reward = self.config.no_enforcement_reward  # +0.1
+
+        # Apply feedback to PersonalizedLLM (always, not just when reward != 0)
+        if self.config.enable_rl_updates:
+            # Debug: check if pending_rl_update exists
+            ctx = self._llm._sessions.get(self._current_user_id)
+            has_pending = ctx is not None and ctx.pending_rl_update is not None
+            has_chosen = (has_pending and
+                          len(ctx.pending_rl_update.get("last_chosen_indices", [])) > 0) if has_pending else False
+            print(f"[DEBUG-RL] User={self._current_user_id} reward={reward:.2f} "
+                  f"has_pending={has_pending} has_chosen={has_chosen}")
             feedback = Feedback(
                 user_id=self._current_user_id,
                 turn_id=self._turn_counter - 1,
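The reward mapping in process_user_turn can be summarized with a standalone sketch; the helper name and signature below are hypothetical, and the constants mirror the AdapterConfig defaults from the first hunk:

# Sketch of the turn-level reward rule used in process_user_turn above.
def turn_reward(enforce: bool, disappointed: bool, satisfied: bool, draft_updated: bool) -> float:
    if enforce:
        return -0.8   # preference_enforcement_reward: user had to correct the agent
    if disappointed:
        return -0.4   # disappointment_expression_reward
    if satisfied or draft_updated:
        return 0.5    # positive_feedback_reward
    return 0.1        # no_enforcement_reward: a quiet turn counts as a good turn

assert turn_reward(False, False, False, False) == 0.1
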
@@ -380,6 +439,66 @@ class PersonalizedLLMAdapter:
         if self._initialized:
             self._llm.persist()
 
+    def export_all_user_vectors(self) -> Dict[str, Dict[str, Any]]:
+        """
+        Export all user vectors with full state for analysis.
+
+        Returns:
+            Dict mapping user_id to dict containing:
+            - z_long: np.ndarray (long-term user vector)
+            - z_short: np.ndarray (short-term user vector)
+            - z_long_norm: float
+            - z_short_norm: float
+            - reward_ma: float (reward moving average)
+        """
+        if not self._initialized:
+            return {}
+
+        result = {}
+        for user_id, state in self._llm._user_store._states.items():
+            result[user_id] = {
+                "z_long": state.z_long.tolist(),
+                "z_short": state.z_short.tolist(),
+                "z_long_norm": float(np.linalg.norm(state.z_long)),
+                "z_short_norm": float(np.linalg.norm(state.z_short)),
+                "reward_ma": float(state.reward_ma),
+            }
+        return result
+
+    def export_user_vectors_npz(self, output_path: str) -> None:
+        """
+        Export all user vectors to a numpy .npz file for efficient storage and analysis.
+
+        Args:
+            output_path: Path to save the .npz file
+
+        The saved file contains:
+        - user_ids: array of user IDs
+        - z_long: [n_users, k] array of long-term vectors
+        - z_short: [n_users, k] array of short-term vectors
+        - reward_ma: [n_users] array of reward moving averages
+        """
+        if not self._initialized:
+            return
+
+        states = self._llm._user_store._states
+        if not states:
+            return
+
+        user_ids = list(states.keys())
+        z_long = np.stack([states[uid].z_long for uid in user_ids])
+        z_short = np.stack([states[uid].z_short for uid in user_ids])
+        reward_ma = np.array([states[uid].reward_ma for uid in user_ids])
+
+        np.savez(
+            output_path,
+            user_ids=np.array(user_ids),
+            z_long=z_long,
+            z_short=z_short,
+            reward_ma=reward_ma,
+        )
+        print(f"[Adapter] Exported {len(user_ids)} user vectors to {output_path}")
+
     # =========================================================================
     # CollaborativeAgents Interface Methods
     # =========================================================================
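A reading-side sketch for the .npz written by export_user_vectors_npz; the array names come from the docstring above, and the filename is only an example:

# Sketch: inspecting the file produced by export_user_vectors_npz.
import numpy as np

data = np.load("user_vectors.npz")                  # example filename
user_ids = data["user_ids"]                         # [n_users] array of IDs
z_long, z_short = data["z_long"], data["z_short"]   # [n_users, k] each
reward_ma = data["reward_ma"]                       # [n_users]

for uid, zl in zip(user_ids, z_long):
    print(uid, round(float(np.linalg.norm(zl)), 4))
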
@@ -491,6 +610,45 @@ def create_baseline_adapter(
             use_user_vector=False,  # No user vector in policy
             llm_name=llm_name,
             use_shared_models=use_shared_models,
+            enable_query_transform=True,
+            enable_global_preferences=True,
+            device_assignment={
+                "embed": "cuda:2",
+                "reranker": "cuda:3",
+                "extractor": "cuda:2",
+            },
+        ),
+        # Baseline 6b: RAG with dynamic topk (min=3, max=8, ratio=0.5)
+        "rag_dynamic": AdapterConfig(
+            mode="nopersonal",
+            enable_preference_extraction=True,
+            enable_rl_updates=False,
+            use_user_vector=False,
+            llm_name=llm_name,
+            use_shared_models=use_shared_models,
+            enable_query_transform=True,
+            enable_global_preferences=True,
+            dynamic_topk=True,
+            dynamic_min_k=3,
+            dynamic_max_k=8,
+            dynamic_score_ratio=0.5,
+            device_assignment={
+                "embed": "cuda:2",
+                "reranker": "cuda:3",
+                "extractor": "cuda:2",
+            },
+        ),
+        # Baseline 6c: RAG with preference rewrite (LLM merges preferences)
+        "rag_rewrite": AdapterConfig(
+            mode="nopersonal",
+            enable_preference_extraction=True,
+            enable_rl_updates=False,
+            use_user_vector=False,
+            llm_name=llm_name,
+            use_shared_models=use_shared_models,
+            enable_query_transform=True,
+            enable_global_preferences=True,
+            enable_preference_rewrite=True,  # NEW: Use LLM to merge preferences
             device_assignment={
                 "embed": "cuda:2",
                 "reranker": "cuda:3",
                 "extractor": "cuda:2",
             },
@@ -506,6 +664,89 @@ def create_baseline_adapter(
             use_user_vector=True,
             llm_name=llm_name,
             use_shared_models=use_shared_models,
+            enable_query_transform=True,
+            enable_global_preferences=True,
+            device_assignment={
+                "embed": "cuda:2",
+                "reranker": "cuda:3",
+                "extractor": "cuda:2",
+            },
+        ),
+        # Baseline 7a: RAG + Vector + Preference Rewrite (combines best of both)
+        "rag_rewrite_vector": AdapterConfig(
+            mode="full",
+            enable_preference_extraction=True,
+            enable_rl_updates=True,
+            use_user_vector=True,
+            llm_name=llm_name,
+            use_shared_models=use_shared_models,
+            enable_query_transform=True,
+            enable_global_preferences=True,
+            enable_preference_rewrite=True,  # LLM merges preferences
+            device_assignment={
+                "embed": "cuda:2",
+                "reranker": "cuda:3",
+                "extractor": "cuda:2",
+            },
+        ),
+        # Baseline 7b: RAG + Vector with higher learning rate (10x)
+        "rag_vector_fast": AdapterConfig(
+            mode="full",
+            enable_preference_extraction=True,
+            enable_rl_updates=True,
+            use_user_vector=True,
+            llm_name=llm_name,
+            use_shared_models=use_shared_models,
+            enable_query_transform=True,
+            enable_global_preferences=True,
+            eta_long=0.1,  # 10x default (0.01)
+            eta_short=0.5,  # 10x default (0.05)
+            device_assignment={
+                "embed": "cuda:2",
+                "reranker": "cuda:3",
+                "extractor": "cuda:2",
+            },
+        ),
+        # Baseline 7c: RAG + Vector with session-level preference consolidation
+        "rag_vector_consolidate": AdapterConfig(
+            mode="full",
+            enable_preference_extraction=True,
+            enable_rl_updates=True,
+            use_user_vector=True,
+            llm_name=llm_name,
+            use_shared_models=use_shared_models,
+            enable_query_transform=True,
+            enable_global_preferences=True,
+            enable_preference_consolidation=True,
+            consolidation_threshold=5,
+            device_assignment={
+                "embed": "cuda:2",
+                "reranker": "cuda:3",
+                "extractor": "cuda:2",
+            },
+        ),
+        # Baseline 7d: RAG + Vector with balanced rewards (10x LR + no_enforcement_reward)
+        # Key improvements:
+        # - 10x learning rate for faster adaptation
+        # - Small positive reward for turns without enforcement (+0.1)
+        # - Disappointment detection enabled
+        # - Balanced reward signal for proper REINFORCE learning
+        "rag_vector_balanced": AdapterConfig(
+            mode="full",
+            enable_preference_extraction=True,
+            enable_rl_updates=True,
+            use_user_vector=True,
+            llm_name=llm_name,
+            use_shared_models=use_shared_models,
+            enable_query_transform=True,
+            enable_global_preferences=True,
+            eta_long=0.1,  # 10x default
+            eta_short=0.5,  # 10x default
+            # Balanced reward structure
+            preference_enforcement_reward=-0.8,
+            disappointment_expression_reward=-0.4,
+            positive_feedback_reward=0.5,
+            no_enforcement_reward=0.1,  # Key: positive signal for good turns
             device_assignment={
                 "embed": "cuda:2",
                 "reranker": "cuda:3",
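The dynamic_topk settings used by the rag_dynamic baseline are only configured here; the selection itself happens inside PersonalizedLLM and is not part of this diff. A plausible interpretation of the config comments (threshold = top_score * ratio, clamped to [min_k, max_k]) looks like this sketch:

# Sketch only: one way the dynamic_* settings could be interpreted.
def select_dynamic_topk(scores, min_k=3, max_k=8, score_ratio=0.5):
    """Keep rerank hits scoring at least top_score * score_ratio, clamped to [min_k, max_k]."""
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    if not ranked:
        return []
    threshold = scores[ranked[0]] * score_ratio
    keep = [i for i in ranked if scores[i] >= threshold]
    k = max(min_k, min(max_k, len(keep)))
    return ranked[:k]

print(select_dynamic_topk([0.9, 0.8, 0.2, 0.15, 0.1]))  # -> top 3 under the defaults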
