summaryrefslogtreecommitdiff
path: root/src/personalization/retrieval
diff options
context:
space:
mode:
authorYurenHao0426 <blackhao0426@gmail.com>2026-02-10 20:16:36 +0000
committerYurenHao0426 <blackhao0426@gmail.com>2026-02-10 20:16:36 +0000
commit5626080ca4c4219aec4888d6b9406d0d3349fb55 (patch)
tree86287d9fd5833e11ccd78566992540f2664fd195 /src/personalization/retrieval
parenta2036838807428424bbbaff507a6563749a83145 (diff)
Add RAG rewrite, 60-session experiment scripts, and analysis tools
- RAG rewrite adapter and vector preference pipeline in personalized_llm
- 60-session experiment queue scripts (reflection, rag, rag_vector, rag_rewrite)
- Vector-preference correlation analysis and visualization scripts
- Local reward model batch processing improvements
- Updated CLAUDE.md with full experiment documentation and notes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'src/personalization/retrieval')
-rw-r--r--src/personalization/retrieval/pipeline.py184
-rw-r--r--src/personalization/retrieval/preference_store/schemas.py1
2 files changed, 157 insertions, 28 deletions
diff --git a/src/personalization/retrieval/pipeline.py b/src/personalization/retrieval/pipeline.py
index e83940d..6cc7f3e 100644
--- a/src/personalization/retrieval/pipeline.py
+++ b/src/personalization/retrieval/pipeline.py
@@ -12,6 +12,51 @@ def cosine_similarity_matrix(E: np.ndarray, e_q: np.ndarray) -> np.ndarray:
# E: [M, d], e_q: [d]
return np.dot(E, e_q)
+
def dynamic_topk_selection(
    scores: np.ndarray,
    min_k: int = 3,
    max_k: int = 8,
    score_ratio: float = 0.5,
) -> List[int]:
    """
    Select a variable number of top-scoring indices.

    The cutoff adapts to the score distribution: every index whose score
    strictly exceeds ``top_score * score_ratio`` counts as "above threshold",
    and the final selection size is that count clamped into ``[min_k, max_k]``
    (never exceeding the number of available scores).

    Args:
        scores: 1-D array of scores; larger means better.
        min_k: Lower bound on how many indices to return.
        max_k: Upper bound on how many indices to return.
        score_ratio: Fraction of the best score used as the cutoff.

    Returns:
        Selected indices, ordered from highest to lowest score.
    """
    if len(scores) == 0:
        return []

    # Rank best-first.
    order = np.argsort(scores)[::-1]
    ranked = scores[order]

    # Count how many strictly exceed the adaptive cutoff.
    cutoff = ranked[0] * score_ratio
    passing = int(np.sum(ranked > cutoff))

    # Clamp into [min_k, max_k], then cap at the number of items available.
    count = max(min_k, min(max_k, passing))
    count = min(count, len(scores))

    return order[:count].tolist()
+
def dense_topk_indices(
query: str,
embed_model: EmbeddingModel,
@@ -58,6 +103,49 @@ def dense_topk_indices(
idx = np.argsort(sims)[-k:][::-1]
return idx.tolist()
def dense_topk_indices_multi_query(
    queries: List[str],
    embed_model: EmbeddingModel,
    memory_embeddings: np.ndarray,
    valid_indices: List[int] = None,  # NOTE(review): semantically Optional[List[int]]
    topk: int = 64
) -> List[int]:
    """
    Multi-query dense retrieval.

    Embeds every query, scores each memory by its maximum similarity across
    all queries (a soft union of the per-query result sets), and returns the
    top-k memory indices by that max similarity.

    Args:
        queries: Query strings to embed. An empty list yields [] (previously
            this crashed: max over a zero-size axis raises ValueError).
        embed_model: Encoder exposing .encode(texts, normalize=, return_tensor=).
        memory_embeddings: [M, d] matrix of memory embeddings. Similarity is a
            plain dot product, so rows are presumably L2-normalized — same
            assumption as dense_topk_indices.
        valid_indices: Optional subset of rows to search. When given, the
            returned indices are still into the full memory_embeddings.
        topk: Maximum number of indices to return; <= 0 yields [] (previously
            a negative value produced a nonsense slice).

    Returns:
        Up to topk global memory indices, best first.
    """
    if len(memory_embeddings) == 0 or not queries or topk <= 0:
        return []

    # Restrict the search space before doing any work if a subset was requested.
    if valid_indices is not None:
        if len(valid_indices) == 0:
            return []
        E = memory_embeddings[valid_indices]
    else:
        E = memory_embeddings

    # Embed all queries in one batch.
    e_qs = np.array(
        embed_model.encode(queries, normalize=True, return_tensor=False),
        dtype=np.float32,
    )  # [Q, d]

    sims = np.dot(e_qs, E.T)     # [Q, M']
    max_sims = sims.max(axis=0)  # [M'] best similarity per memory across queries

    k = min(topk, len(max_sims))
    top_local = np.argsort(max_sims)[-k:][::-1]

    # Map subset-local positions back to global memory indices when needed.
    if valid_indices is not None:
        return [valid_indices[i] for i in top_local]
    return top_local.tolist()
+
+
def retrieve_with_policy(
user_id: str,
query: str,
@@ -74,6 +162,7 @@ def retrieve_with_policy(
tau: float = 1.0,
only_own_memories: bool = False,
sample: bool = False,
+ queries: List[str] = None,
) -> Tuple[List[MemoryCard], np.ndarray, np.ndarray, List[int], np.ndarray]:
"""
Returns extended info for policy update:
@@ -90,28 +179,37 @@ def retrieve_with_policy(
if not valid_indices:
return [], np.array([]), np.array([]), [], np.array([])
- # 1. Dense retrieval
- dense_idx = dense_topk_indices(
- query,
- embed_model,
- memory_embeddings,
- valid_indices=valid_indices,
- topk=topk_dense
- )
+ # 1. Dense retrieval (multi-query if available)
+ if queries and len(queries) > 1:
+ dense_idx = dense_topk_indices_multi_query(
+ queries,
+ embed_model,
+ memory_embeddings,
+ valid_indices=valid_indices,
+ topk=topk_dense
+ )
+ else:
+ dense_idx = dense_topk_indices(
+ query,
+ embed_model,
+ memory_embeddings,
+ valid_indices=valid_indices,
+ topk=topk_dense
+ )
# DEBUG: Check for duplicates or out of bounds
if len(dense_idx) > 0:
import os
if os.getenv("RETRIEVAL_DEBUG") == "1":
print(f" [Pipeline] Dense Indices (Top {len(dense_idx)}): {dense_idx[:10]}...")
print(f" [Pipeline] Max Index: {max(dense_idx)} | Memory Size: {len(memory_cards)}")
-
+
if not dense_idx:
return [], np.array([]), np.array([]), [], np.array([])
candidates = [memory_cards[i] for i in dense_idx]
candidate_docs = [c.note_text for c in candidates]
- # 2. Rerank base score (P(yes|q,m))
+ # 2. Rerank base score (P(yes|q,m)) - always use original query for reranking
# Skip reranking if we have fewer candidates than topk_rerank (saves GPU memory)
if len(candidates) <= topk_rerank:
base_scores = np.ones(len(candidates)) # Uniform scores
@@ -165,14 +263,25 @@ def retrieve_no_policy(
topk_dense: int = 64,
topk_rerank: int = 8,
only_own_memories: bool = False,
+ queries: List[str] = None,
+ dynamic_topk: bool = False,
+ dynamic_min_k: int = 3,
+ dynamic_max_k: int = 8,
+ dynamic_score_ratio: float = 0.5,
) -> Tuple[List[MemoryCard], np.ndarray, np.ndarray, List[int], np.ndarray]:
"""
Deterministic retrieval baseline (NoPersonal mode):
- Dense retrieval -> Rerank -> Top-K (no policy sampling, no user vector influence)
-
+
+ Args:
+ dynamic_topk: If True, use dynamic selection based on score distribution
+ dynamic_min_k: Minimum items to select (when dynamic_topk=True)
+ dynamic_max_k: Maximum items to select (when dynamic_topk=True)
+ dynamic_score_ratio: Threshold = top_score * ratio (when dynamic_topk=True)
+
Returns same structure as retrieve_with_policy for compatibility:
(candidates, candidate_item_vectors, base_scores, chosen_indices, rerank_scores_for_chosen)
-
+
Note: candidate_item_vectors is empty array (not used in NoPersonal mode)
The last return value is rerank scores instead of policy probs
"""
@@ -183,14 +292,23 @@ def retrieve_no_policy(
if not valid_indices:
return [], np.array([]), np.array([]), [], np.array([])
- # 1. Dense retrieval
- dense_idx = dense_topk_indices(
- query,
- embed_model,
- memory_embeddings,
- valid_indices=valid_indices,
- topk=topk_dense
- )
+ # 1. Dense retrieval (multi-query if available)
+ if queries and len(queries) > 1:
+ dense_idx = dense_topk_indices_multi_query(
+ queries,
+ embed_model,
+ memory_embeddings,
+ valid_indices=valid_indices,
+ topk=topk_dense
+ )
+ else:
+ dense_idx = dense_topk_indices(
+ query,
+ embed_model,
+ memory_embeddings,
+ valid_indices=valid_indices,
+ topk=topk_dense
+ )
if not dense_idx:
return [], np.array([]), np.array([]), [], np.array([])
@@ -198,23 +316,33 @@ def retrieve_no_policy(
candidates = [memory_cards[i] for i in dense_idx]
candidate_docs = [c.note_text for c in candidates]
- # 2. Rerank base score (P(yes|q,m))
- # Skip reranking if we have fewer candidates than topk_rerank (saves GPU memory)
- if len(candidates) <= topk_rerank:
+ # 2. Rerank base score (P(yes|q,m)) - always use original query for reranking
+ max_k = dynamic_max_k if dynamic_topk else topk_rerank
+
+ # Skip reranking if we have fewer candidates than needed
+ if len(candidates) <= max_k:
# Just return all candidates without reranking
base_scores = np.ones(len(candidates)) # Uniform scores
chosen_indices = list(range(len(candidates)))
else:
base_scores = np.array(reranker.score(query, candidate_docs))
- # 3. Deterministic Top-K selection based on rerank scores ONLY (no policy)
- k = min(topk_rerank, len(base_scores))
- top_indices_local = base_scores.argsort()[-k:][::-1]
- chosen_indices = top_indices_local.tolist()
+ # 3. Selection: dynamic or fixed top-K
+ if dynamic_topk:
+ chosen_indices = dynamic_topk_selection(
+ base_scores,
+ min_k=dynamic_min_k,
+ max_k=dynamic_max_k,
+ score_ratio=dynamic_score_ratio,
+ )
+ else:
+ k = min(topk_rerank, len(base_scores))
+ top_indices_local = base_scores.argsort()[-k:][::-1]
+ chosen_indices = top_indices_local.tolist()
# Get scores for chosen items (for logging compatibility)
chosen_scores = base_scores[chosen_indices]
-
+
# Return empty item vectors (not used in NoPersonal mode)
# Return rerank scores as the "probs" field for logging compatibility
return candidates, np.array([]), base_scores, chosen_indices, chosen_scores
diff --git a/src/personalization/retrieval/preference_store/schemas.py b/src/personalization/retrieval/preference_store/schemas.py
index eb82558..5245025 100644
--- a/src/personalization/retrieval/preference_store/schemas.py
+++ b/src/personalization/retrieval/preference_store/schemas.py
@@ -45,3 +45,4 @@ class MemoryCard(BaseModel):
note_text: str # Summarized "condition: action" text
embedding_e: List[float] # The embedding vector
kind: Literal["pref", "fact"] = "pref"
+ is_global: bool = False # True = always include in prompt, bypass retrieval