From 5626080ca4c4219aec4888d6b9406d0d3349fb55 Mon Sep 17 00:00:00 2001 From: YurenHao0426 Date: Tue, 10 Feb 2026 20:16:36 +0000 Subject: Add RAG rewrite, 60-session experiment scripts, and analysis tools - RAG rewrite adapter and vector preference pipeline in personalized_llm - 60-session experiment queue scripts (reflection, rag, rag_vector, rag_rewrite) - Vector-preference correlation analysis and visualization scripts - Local reward model batch processing improvements - Updated CLAUDE.md with full experiment documentation and notes Co-Authored-By: Claude Opus 4.6 --- src/personalization/retrieval/pipeline.py | 184 +++++++++++++++++++++++++----- 1 file changed, 156 insertions(+), 28 deletions(-) (limited to 'src/personalization/retrieval/pipeline.py') diff --git a/src/personalization/retrieval/pipeline.py b/src/personalization/retrieval/pipeline.py index e83940d..6cc7f3e 100644 --- a/src/personalization/retrieval/pipeline.py +++ b/src/personalization/retrieval/pipeline.py @@ -12,6 +12,51 @@ def cosine_similarity_matrix(E: np.ndarray, e_q: np.ndarray) -> np.ndarray: # E: [M, d], e_q: [d] return np.dot(E, e_q) + +def dynamic_topk_selection( + scores: np.ndarray, + min_k: int = 3, + max_k: int = 8, + score_ratio: float = 0.5, +) -> List[int]: + """ + Dynamically select top-k indices based on score distribution. + + Strategy: + 1. Sort by score descending + 2. Compute threshold = top_score * score_ratio + 3. Select all indices with score > threshold + 4. Clamp to [min_k, max_k] range + + Args: + scores: Array of scores (higher = better) + min_k: Minimum number of items to select + max_k: Maximum number of items to select + score_ratio: Threshold ratio relative to top score + + Returns: + List of selected indices (in descending score order) + """ + if len(scores) == 0: + return [] + + # Sort indices by score descending + sorted_indices = np.argsort(scores)[::-1] + sorted_scores = scores[sorted_indices] + + # Compute threshold + top_score = sorted_scores[0] + threshold = top_score * score_ratio + + # Find how many pass threshold + n_above_threshold = np.sum(sorted_scores > threshold) + + # Clamp to [min_k, max_k] + n_select = max(min_k, min(max_k, n_above_threshold)) + n_select = min(n_select, len(scores)) # Don't exceed available + + return sorted_indices[:n_select].tolist() + def dense_topk_indices( query: str, embed_model: EmbeddingModel, @@ -58,6 +103,49 @@ def dense_topk_indices( idx = np.argsort(sims)[-k:][::-1] return idx.tolist() +def dense_topk_indices_multi_query( + queries: List[str], + embed_model: EmbeddingModel, + memory_embeddings: np.ndarray, + valid_indices: List[int] = None, + topk: int = 64 +) -> List[int]: + """ + Multi-query dense retrieval: embed all queries, take max similarity per memory, + return top-k by max similarity (union effect). + """ + if len(memory_embeddings) == 0: + return [] + + # Embed all queries at once + e_qs = embed_model.encode(queries, normalize=True, return_tensor=False) + e_qs = np.array(e_qs, dtype=np.float32) # [Q, d] + + if valid_indices is not None: + if len(valid_indices) == 0: + return [] + E_sub = memory_embeddings[valid_indices] + # sims: [Q, M_sub] + sims = np.dot(e_qs, E_sub.T) + # max across queries per memory + max_sims = sims.max(axis=0) # [M_sub] + k = min(topk, len(max_sims)) + if k == 0: + return [] + idx_sub = np.argsort(max_sims)[-k:][::-1] + return [valid_indices[i] for i in idx_sub] + + # Global search + # sims: [Q, M] + sims = np.dot(e_qs, memory_embeddings.T) + max_sims = sims.max(axis=0) # [M] + k = min(topk, len(max_sims)) + if k == 0: + return [] + idx = np.argsort(max_sims)[-k:][::-1] + return idx.tolist() + + def retrieve_with_policy( user_id: str, query: str, @@ -74,6 +162,7 @@ def retrieve_with_policy( tau: float = 1.0, only_own_memories: bool = False, sample: bool = False, + queries: List[str] = None, ) -> Tuple[List[MemoryCard], np.ndarray, np.ndarray, List[int], np.ndarray]: """ Returns extended info for policy update: @@ -90,28 +179,37 @@ def retrieve_with_policy( if not valid_indices: return [], np.array([]), np.array([]), [], np.array([]) - # 1. Dense retrieval - dense_idx = dense_topk_indices( - query, - embed_model, - memory_embeddings, - valid_indices=valid_indices, - topk=topk_dense - ) + # 1. Dense retrieval (multi-query if available) + if queries and len(queries) > 1: + dense_idx = dense_topk_indices_multi_query( + queries, + embed_model, + memory_embeddings, + valid_indices=valid_indices, + topk=topk_dense + ) + else: + dense_idx = dense_topk_indices( + query, + embed_model, + memory_embeddings, + valid_indices=valid_indices, + topk=topk_dense + ) # DEBUG: Check for duplicates or out of bounds if len(dense_idx) > 0: import os if os.getenv("RETRIEVAL_DEBUG") == "1": print(f" [Pipeline] Dense Indices (Top {len(dense_idx)}): {dense_idx[:10]}...") print(f" [Pipeline] Max Index: {max(dense_idx)} | Memory Size: {len(memory_cards)}") - + if not dense_idx: return [], np.array([]), np.array([]), [], np.array([]) candidates = [memory_cards[i] for i in dense_idx] candidate_docs = [c.note_text for c in candidates] - # 2. Rerank base score (P(yes|q,m)) + # 2. Rerank base score (P(yes|q,m)) - always use original query for reranking # Skip reranking if we have fewer candidates than topk_rerank (saves GPU memory) if len(candidates) <= topk_rerank: base_scores = np.ones(len(candidates)) # Uniform scores @@ -165,14 +263,25 @@ def retrieve_no_policy( topk_dense: int = 64, topk_rerank: int = 8, only_own_memories: bool = False, + queries: List[str] = None, + dynamic_topk: bool = False, + dynamic_min_k: int = 3, + dynamic_max_k: int = 8, + dynamic_score_ratio: float = 0.5, ) -> Tuple[List[MemoryCard], np.ndarray, np.ndarray, List[int], np.ndarray]: """ Deterministic retrieval baseline (NoPersonal mode): - Dense retrieval -> Rerank -> Top-K (no policy sampling, no user vector influence) - + + Args: + dynamic_topk: If True, use dynamic selection based on score distribution + dynamic_min_k: Minimum items to select (when dynamic_topk=True) + dynamic_max_k: Maximum items to select (when dynamic_topk=True) + dynamic_score_ratio: Threshold = top_score * ratio (when dynamic_topk=True) + Returns same structure as retrieve_with_policy for compatibility: (candidates, candidate_item_vectors, base_scores, chosen_indices, rerank_scores_for_chosen) - + Note: candidate_item_vectors is empty array (not used in NoPersonal mode) The last return value is rerank scores instead of policy probs """ @@ -183,14 +292,23 @@ def retrieve_no_policy( if not valid_indices: return [], np.array([]), np.array([]), [], np.array([]) - # 1. Dense retrieval - dense_idx = dense_topk_indices( - query, - embed_model, - memory_embeddings, - valid_indices=valid_indices, - topk=topk_dense - ) + # 1. Dense retrieval (multi-query if available) + if queries and len(queries) > 1: + dense_idx = dense_topk_indices_multi_query( + queries, + embed_model, + memory_embeddings, + valid_indices=valid_indices, + topk=topk_dense + ) + else: + dense_idx = dense_topk_indices( + query, + embed_model, + memory_embeddings, + valid_indices=valid_indices, + topk=topk_dense + ) if not dense_idx: return [], np.array([]), np.array([]), [], np.array([]) @@ -198,23 +316,33 @@ def retrieve_no_policy( candidates = [memory_cards[i] for i in dense_idx] candidate_docs = [c.note_text for c in candidates] - # 2. Rerank base score (P(yes|q,m)) - # Skip reranking if we have fewer candidates than topk_rerank (saves GPU memory) - if len(candidates) <= topk_rerank: + # 2. Rerank base score (P(yes|q,m)) - always use original query for reranking + max_k = dynamic_max_k if dynamic_topk else topk_rerank + + # Skip reranking if we have fewer candidates than needed + if len(candidates) <= max_k: # Just return all candidates without reranking base_scores = np.ones(len(candidates)) # Uniform scores chosen_indices = list(range(len(candidates))) else: base_scores = np.array(reranker.score(query, candidate_docs)) - # 3. Deterministic Top-K selection based on rerank scores ONLY (no policy) - k = min(topk_rerank, len(base_scores)) - top_indices_local = base_scores.argsort()[-k:][::-1] - chosen_indices = top_indices_local.tolist() + # 3. Selection: dynamic or fixed top-K + if dynamic_topk: + chosen_indices = dynamic_topk_selection( + base_scores, + min_k=dynamic_min_k, + max_k=dynamic_max_k, + score_ratio=dynamic_score_ratio, + ) + else: + k = min(topk_rerank, len(base_scores)) + top_indices_local = base_scores.argsort()[-k:][::-1] + chosen_indices = top_indices_local.tolist() # Get scores for chosen items (for logging compatibility) chosen_scores = base_scores[chosen_indices] - + # Return empty item vectors (not used in NoPersonal mode) # Return rerank scores as the "probs" field for logging compatibility return candidates, np.array([]), base_scores, chosen_indices, chosen_scores -- cgit v1.2.3