summaryrefslogtreecommitdiff
path: root/collaborativeagents/adapters/personalized_llm_adapter.py
diff options
context:
space:
mode:
authorYurenHao0426 <blackhao0426@gmail.com>2026-02-10 20:16:36 +0000
committerYurenHao0426 <blackhao0426@gmail.com>2026-02-10 20:16:36 +0000
commit5626080ca4c4219aec4888d6b9406d0d3349fb55 (patch)
tree86287d9fd5833e11ccd78566992540f2664fd195 /collaborativeagents/adapters/personalized_llm_adapter.py
parenta2036838807428424bbbaff507a6563749a83145 (diff)
Add RAG rewrite, 60-session experiment scripts, and analysis tools
- RAG rewrite adapter and vector preference pipeline in personalized_llm
- 60-session experiment queue scripts (reflection, rag, rag_vector, rag_rewrite)
- Vector-preference correlation analysis and visualization scripts
- Local reward model batch processing improvements
- Updated CLAUDE.md with full experiment documentation and notes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'collaborativeagents/adapters/personalized_llm_adapter.py')
-rw-r--r--collaborativeagents/adapters/personalized_llm_adapter.py267
1 files changed, 254 insertions, 13 deletions
diff --git a/collaborativeagents/adapters/personalized_llm_adapter.py b/collaborativeagents/adapters/personalized_llm_adapter.py
index b476272..488b241 100644
--- a/collaborativeagents/adapters/personalized_llm_adapter.py
+++ b/collaborativeagents/adapters/personalized_llm_adapter.py
@@ -36,12 +36,24 @@ class AdapterConfig:
enable_rl_updates: bool = True
use_user_vector: bool = True # Whether to use user vector in policy scoring
- # Paths (absolute paths to actual file locations in the repo)
+ # Paths - computed relative to project root
# Note: Using empty_store to start fresh - RAG will accumulate memories during evaluation
- user_store_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/users/collab_eval_store.npz"
- memory_cards_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_cards.jsonl"
- memory_embeddings_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/empty_store/memory_embeddings.npy"
- item_projection_path: str = "/projects/bfqt/users/yurenh2/ml-projects/personalization-user-model/data/corpora/item_projection.npz"
+ _project_root: str = field(default_factory=lambda: str(Path(__file__).parent.parent.parent))
+ user_store_path: str = ""
+ memory_cards_path: str = ""
+ memory_embeddings_path: str = ""
+ item_projection_path: str = ""
+
+ def __post_init__(self):
+ root = Path(self._project_root)
+ if not self.user_store_path:
+ self.user_store_path = str(root / "data/users/collab_eval_store.npz")
+ if not self.memory_cards_path:
+ self.memory_cards_path = str(root / "data/corpora/empty_store/memory_cards.jsonl")
+ if not self.memory_embeddings_path:
+ self.memory_embeddings_path = str(root / "data/corpora/empty_store/memory_embeddings.npy")
+ if not self.item_projection_path:
+ self.item_projection_path = str(root / "data/corpora/item_projection.npz")
# Multi-GPU assignment
device_assignment: Optional[Dict[str, str]] = None
@@ -64,10 +76,30 @@ class AdapterConfig:
# vLLM URL for local reward model (only used when reward_mode="llm_local")
reward_vllm_url: str = "http://localhost:8005/v1"
+ # Retrieval optimizations
+ enable_query_transform: bool = False # Transform queries for better retrieval matching
+ enable_global_preferences: bool = False # Separate global prefs that bypass retrieval
+ enable_preference_rewrite: bool = False # Use LLM to rewrite/merge retrieved preferences
+
+ # Dynamic topk settings
+ dynamic_topk: bool = False # Use dynamic selection based on rerank score distribution
+ dynamic_min_k: int = 3 # Minimum preferences to select
+ dynamic_max_k: int = 8 # Maximum preferences to select
+ dynamic_score_ratio: float = 0.5 # Threshold = top_score * ratio
+
+ # RL learning rate overrides
+ eta_long: float = None # Override RL learning rate for z_long (default 0.01)
+ eta_short: float = None # Override RL learning rate for z_short (default 0.05)
+
+ # Session-level preference consolidation
+ enable_preference_consolidation: bool = False # Consolidate preferences at session end
+ consolidation_threshold: int = 5 # Min preferences before consolidation kicks in
+
# Reward mapping for user behavior
preference_enforcement_reward: float = -0.8 # Negative reward when user enforces
disappointment_expression_reward: float = -0.4 # Milder negative for disappointment
positive_feedback_reward: float = 0.5 # When user expresses satisfaction
+ no_enforcement_reward: float = 0.1 # Small positive when user doesn't enforce (good turn)
task_completion_reward: float = 1.0 # When task is solved correctly
@@ -120,6 +152,17 @@ class PersonalizedLLMAdapter:
best_of_n=self.config.best_of_n,
reward_mode=self.config.reward_mode,
reward_vllm_url=self.config.reward_vllm_url,
+ enable_query_transform=self.config.enable_query_transform,
+ enable_global_preferences=self.config.enable_global_preferences,
+ enable_preference_rewrite=self.config.enable_preference_rewrite,
+ dynamic_topk=self.config.dynamic_topk,
+ dynamic_min_k=self.config.dynamic_min_k,
+ dynamic_max_k=self.config.dynamic_max_k,
+ dynamic_score_ratio=self.config.dynamic_score_ratio,
+ eta_long=self.config.eta_long,
+ eta_short=self.config.eta_short,
+ enable_preference_consolidation=self.config.enable_preference_consolidation,
+ consolidation_threshold=self.config.consolidation_threshold,
)
self._initialized = True
print("[Adapter] Initialization complete.")
@@ -221,7 +264,9 @@ class PersonalizedLLMAdapter:
self.initialize()
# Use chat_prepare from PersonalizedLLM
- result = self._llm.chat_prepare(self._current_user_id, query)
+ # skip_extraction=False to enable preference extraction from user messages
+ # skip_auto_reward=True because batch framework handles rewards via process_user_turn
+ result = self._llm.chat_prepare(self._current_user_id, query, skip_extraction=False, skip_auto_reward=True)
return result["messages"], result["context"]
def process_response(
@@ -294,24 +339,38 @@ class PersonalizedLLMAdapter:
This is called AFTER generate_response and BEFORE the next turn.
"""
# Derive reward from user behavior
- reward = 0.0
- gating = 1.0 # Always apply (could be conditional)
+ # Key insight: ALWAYS give a reward signal, not just for enforcement
+ # - Enforcement: negative reward (user had to correct agent)
+ # - No enforcement: small positive reward (agent did well)
+ # - Satisfaction/progress: larger positive reward
+ gating = 1.0 # Always apply
if enforce_preferences:
- reward = self.config.preference_enforcement_reward
+ reward = self.config.preference_enforcement_reward # -0.8
self._session_metrics["enforcements"] += 1
self._total_enforcements += 1
elif express_disappointment:
- reward = self.config.disappointment_expression_reward
+ reward = self.config.disappointment_expression_reward # -0.4
self._session_metrics["disappointments"] += 1
self._total_disappointments += 1
elif express_satisfaction or draft_answer_updated:
- reward = self.config.positive_feedback_reward
+ reward = self.config.positive_feedback_reward # +0.5
- # Apply feedback to PersonalizedLLM
- if self.config.enable_rl_updates and reward != 0.0:
+ else:
+ # No enforcement = good turn, give small positive reward
+ reward = self.config.no_enforcement_reward # +0.1
+
+ # Apply feedback to PersonalizedLLM (always, not just when reward != 0)
+ if self.config.enable_rl_updates:
+ # Debug: check if pending_rl_update exists
+ ctx = self._llm._sessions.get(self._current_user_id)
+ has_pending = ctx is not None and ctx.pending_rl_update is not None
+ has_chosen = (has_pending and
+ len(ctx.pending_rl_update.get("last_chosen_indices", [])) > 0) if has_pending else False
+ print(f"[DEBUG-RL] User={self._current_user_id} reward={reward:.2f} "
+ f"has_pending={has_pending} has_chosen={has_chosen}")
feedback = Feedback(
user_id=self._current_user_id,
turn_id=self._turn_counter - 1,
@@ -380,6 +439,66 @@ class PersonalizedLLMAdapter:
if self._initialized:
self._llm.persist()
+ def export_all_user_vectors(self) -> Dict[str, Dict[str, Any]]:
+ """
+ Export all user vectors with full state for analysis.
+
+ Returns:
+ Dict mapping user_id to dict containing:
+ - z_long: np.ndarray (long-term user vector)
+ - z_short: np.ndarray (short-term user vector)
+ - z_long_norm: float
+ - z_short_norm: float
+ - reward_ma: float (reward moving average)
+ """
+ if not self._initialized:
+ return {}
+
+ result = {}
+ for user_id, state in self._llm._user_store._states.items():
+ result[user_id] = {
+ "z_long": state.z_long.tolist(),
+ "z_short": state.z_short.tolist(),
+ "z_long_norm": float(np.linalg.norm(state.z_long)),
+ "z_short_norm": float(np.linalg.norm(state.z_short)),
+ "reward_ma": float(state.reward_ma),
+ }
+ return result
+
+ def export_user_vectors_npz(self, output_path: str) -> None:
+ """
+ Export all user vectors to a numpy .npz file for efficient storage and analysis.
+
+ Args:
+ output_path: Path to save the .npz file
+
+ The saved file contains:
+ - user_ids: array of user IDs
+ - z_long: [n_users, k] array of long-term vectors
+ - z_short: [n_users, k] array of short-term vectors
+ - reward_ma: [n_users] array of reward moving averages
+ """
+ if not self._initialized:
+ return
+
+ states = self._llm._user_store._states
+ if not states:
+ return
+
+ user_ids = list(states.keys())
+ z_long = np.stack([states[uid].z_long for uid in user_ids])
+ z_short = np.stack([states[uid].z_short for uid in user_ids])
+ reward_ma = np.array([states[uid].reward_ma for uid in user_ids])
+
+ np.savez(
+ output_path,
+ user_ids=np.array(user_ids),
+ z_long=z_long,
+ z_short=z_short,
+ reward_ma=reward_ma,
+ )
+ print(f"[Adapter] Exported {len(user_ids)} user vectors to {output_path}")
+
# =========================================================================
# CollaborativeAgents Interface Methods
# =========================================================================
@@ -491,6 +610,45 @@ def create_baseline_adapter(
use_user_vector=False, # No user vector in policy
llm_name=llm_name,
use_shared_models=use_shared_models,
+ enable_query_transform=True,
+ enable_global_preferences=True,
+ device_assignment={
+ "embed": "cuda:2",
+ "reranker": "cuda:3",
+ "extractor": "cuda:2",
+ },
+ ),
+ # Baseline 6b: RAG with dynamic topk (min=3, max=8, ratio=0.5)
+ "rag_dynamic": AdapterConfig(
+ mode="nopersonal",
+ enable_preference_extraction=True,
+ enable_rl_updates=False,
+ use_user_vector=False,
+ llm_name=llm_name,
+ use_shared_models=use_shared_models,
+ enable_query_transform=True,
+ enable_global_preferences=True,
+ dynamic_topk=True,
+ dynamic_min_k=3,
+ dynamic_max_k=8,
+ dynamic_score_ratio=0.5,
+ device_assignment={
+ "embed": "cuda:2",
+ "reranker": "cuda:3",
+ "extractor": "cuda:2",
+ },
+ ),
+ # Baseline 6c: RAG with preference rewrite (LLM merges preferences)
+ "rag_rewrite": AdapterConfig(
+ mode="nopersonal",
+ enable_preference_extraction=True,
+ enable_rl_updates=False,
+ use_user_vector=False,
+ llm_name=llm_name,
+ use_shared_models=use_shared_models,
+ enable_query_transform=True,
+ enable_global_preferences=True,
+ enable_preference_rewrite=True, # NEW: Use LLM to merge preferences
device_assignment={
"embed": "cuda:2",
"reranker": "cuda:3",
@@ -506,6 +664,89 @@ def create_baseline_adapter(
use_user_vector=True,
llm_name=llm_name,
use_shared_models=use_shared_models,
+ enable_query_transform=True,
+ enable_global_preferences=True,
+ device_assignment={
+ "embed": "cuda:2",
+ "reranker": "cuda:3",
+ "extractor": "cuda:2",
+ },
+ ),
+ # Baseline 7a: RAG + Vector + Preference Rewrite (combines best of both)
+ "rag_rewrite_vector": AdapterConfig(
+ mode="full",
+ enable_preference_extraction=True,
+ enable_rl_updates=True,
+ use_user_vector=True,
+ llm_name=llm_name,
+ use_shared_models=use_shared_models,
+ enable_query_transform=True,
+ enable_global_preferences=True,
+ enable_preference_rewrite=True, # LLM merges preferences
+ device_assignment={
+ "embed": "cuda:2",
+ "reranker": "cuda:3",
+ "extractor": "cuda:2",
+ },
+ ),
+ # Baseline 7b: RAG + Vector with higher learning rate (10x)
+ "rag_vector_fast": AdapterConfig(
+ mode="full",
+ enable_preference_extraction=True,
+ enable_rl_updates=True,
+ use_user_vector=True,
+ llm_name=llm_name,
+ use_shared_models=use_shared_models,
+ enable_query_transform=True,
+ enable_global_preferences=True,
+ eta_long=0.1, # 10x default (0.01)
+ eta_short=0.5, # 10x default (0.05)
+ device_assignment={
+ "embed": "cuda:2",
+ "reranker": "cuda:3",
+ "extractor": "cuda:2",
+ },
+ ),
+ # Baseline 7c: RAG + Vector with session-level preference consolidation
+ "rag_vector_consolidate": AdapterConfig(
+ mode="full",
+ enable_preference_extraction=True,
+ enable_rl_updates=True,
+ use_user_vector=True,
+ llm_name=llm_name,
+ use_shared_models=use_shared_models,
+ enable_query_transform=True,
+ enable_global_preferences=True,
+ enable_preference_consolidation=True,
+ consolidation_threshold=5,
+ device_assignment={
+ "embed": "cuda:2",
+ "reranker": "cuda:3",
+ "extractor": "cuda:2",
+ },
+ ),
+ # Baseline 7d: RAG + Vector with balanced rewards (10x LR + no_enforcement_reward)
+ # Key improvements:
+ # - 10x learning rate for faster adaptation
+ # - Small positive reward for turns without enforcement (+0.1)
+ # - Disappointment detection enabled
+ # - Balanced reward signal for proper REINFORCE learning
+ "rag_vector_balanced": AdapterConfig(
+ mode="full",
+ enable_preference_extraction=True,
+ enable_rl_updates=True,
+ use_user_vector=True,
+ llm_name=llm_name,
+ use_shared_models=use_shared_models,
+ enable_query_transform=True,
+ enable_global_preferences=True,
+ eta_long=0.1, # 10x default
+ eta_short=0.5, # 10x default
+ # Balanced reward structure
+ preference_enforcement_reward=-0.8,
+ disappointment_expression_reward=-0.4,
+ positive_feedback_reward=0.5,
+ no_enforcement_reward=0.1, # Key: positive signal for good turns
device_assignment={
"embed": "cuda:2",
"reranker": "cuda:3",