diff options
| author | YurenHao0426 <Blackhao0426@gmail.com> | 2026-03-18 18:25:09 -0500 |
|---|---|---|
| committer | YurenHao0426 <Blackhao0426@gmail.com> | 2026-03-18 18:25:09 -0500 |
| commit | b6c3e4e51eeab703b40284459c6e9fff2151216c (patch) | |
| tree | 221410886f23214575f93b9ef44fa8431c9a6dfc /src/personalization/retrieval | |
Initial release: VARS - personalized LLM with RAG and user vector learning
Diffstat (limited to 'src/personalization/retrieval')
12 files changed, 436 insertions, 0 deletions
diff --git a/src/personalization/retrieval/__init__.py b/src/personalization/retrieval/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/__init__.py diff --git a/src/personalization/retrieval/chunking/__init__.py b/src/personalization/retrieval/chunking/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/chunking/__init__.py diff --git a/src/personalization/retrieval/chunking/rules.py b/src/personalization/retrieval/chunking/rules.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/chunking/rules.py diff --git a/src/personalization/retrieval/pipeline.py b/src/personalization/retrieval/pipeline.py new file mode 100644 index 0000000..6cc7f3e --- /dev/null +++ b/src/personalization/retrieval/pipeline.py @@ -0,0 +1,388 @@ +from typing import List, Tuple +import numpy as np + +from personalization.models.embedding.base import EmbeddingModel +from personalization.models.reranker.base import Reranker +from personalization.retrieval.preference_store.schemas import MemoryCard +from personalization.user_model.tensor_store import UserTensorStore, UserState +from personalization.user_model.scoring import score_with_user +from personalization.user_model.policy.reinforce import compute_policy_scores + +def cosine_similarity_matrix(E: np.ndarray, e_q: np.ndarray) -> np.ndarray: + # E: [M, d], e_q: [d] + return np.dot(E, e_q) + + +def dynamic_topk_selection( + scores: np.ndarray, + min_k: int = 3, + max_k: int = 8, + score_ratio: float = 0.5, +) -> List[int]: + """ + Dynamically select top-k indices based on score distribution. + + Strategy: + 1. Sort by score descending + 2. Compute threshold = top_score * score_ratio + 3. Select all indices with score > threshold + 4. Clamp to [min_k, max_k] range + + Args: + scores: Array of scores (higher = better) + min_k: Minimum number of items to select + max_k: Maximum number of items to select + score_ratio: Threshold ratio relative to top score + + Returns: + List of selected indices (in descending score order) + """ + if len(scores) == 0: + return [] + + # Sort indices by score descending + sorted_indices = np.argsort(scores)[::-1] + sorted_scores = scores[sorted_indices] + + # Compute threshold + top_score = sorted_scores[0] + threshold = top_score * score_ratio + + # Find how many pass threshold + n_above_threshold = np.sum(sorted_scores > threshold) + + # Clamp to [min_k, max_k] + n_select = max(min_k, min(max_k, n_above_threshold)) + n_select = min(n_select, len(scores)) # Don't exceed available + + return sorted_indices[:n_select].tolist() + +def dense_topk_indices( + query: str, + embed_model: EmbeddingModel, + memory_embeddings: np.ndarray, + valid_indices: List[int] = None, + topk: int = 64 +) -> List[int]: + """ + Return indices of topk memories based on dense embedding similarity. + If valid_indices is provided, only search within that subset. + """ + if valid_indices is not None and len(valid_indices) == 0: + return [] + + e_q_list = embed_model.encode([query], normalize=True, return_tensor=False) + e_q = np.array(e_q_list[0], dtype=np.float32) + + # Select subset of embeddings if restricted + if valid_indices is not None: + # subset_embeddings = memory_embeddings[valid_indices] + # But valid_indices might be arbitrary. + # Efficient way: only dot product with subset + # E_sub: [M_sub, d] + E_sub = memory_embeddings[valid_indices] + sims_sub = np.dot(E_sub, e_q) + + # Topk within subset + k = min(topk, len(sims_sub)) + if k == 0: + return [] + + # argsort gives indices relative to E_sub (0..M_sub-1) + # We need to map back to original indices + idx_sub = np.argsort(sims_sub)[-k:][::-1] + + return [valid_indices[i] for i in idx_sub] + + # Global search + sims = np.dot(memory_embeddings, e_q) + k = min(topk, len(memory_embeddings)) + if k == 0: + return [] + + idx = np.argsort(sims)[-k:][::-1] + return idx.tolist() + +def dense_topk_indices_multi_query( + queries: List[str], + embed_model: EmbeddingModel, + memory_embeddings: np.ndarray, + valid_indices: List[int] = None, + topk: int = 64 +) -> List[int]: + """ + Multi-query dense retrieval: embed all queries, take max similarity per memory, + return top-k by max similarity (union effect). + """ + if len(memory_embeddings) == 0: + return [] + + # Embed all queries at once + e_qs = embed_model.encode(queries, normalize=True, return_tensor=False) + e_qs = np.array(e_qs, dtype=np.float32) # [Q, d] + + if valid_indices is not None: + if len(valid_indices) == 0: + return [] + E_sub = memory_embeddings[valid_indices] + # sims: [Q, M_sub] + sims = np.dot(e_qs, E_sub.T) + # max across queries per memory + max_sims = sims.max(axis=0) # [M_sub] + k = min(topk, len(max_sims)) + if k == 0: + return [] + idx_sub = np.argsort(max_sims)[-k:][::-1] + return [valid_indices[i] for i in idx_sub] + + # Global search + # sims: [Q, M] + sims = np.dot(e_qs, memory_embeddings.T) + max_sims = sims.max(axis=0) # [M] + k = min(topk, len(max_sims)) + if k == 0: + return [] + idx = np.argsort(max_sims)[-k:][::-1] + return idx.tolist() + + +def retrieve_with_policy( + user_id: str, + query: str, + embed_model: EmbeddingModel, + reranker: Reranker, + memory_cards: List[MemoryCard], + memory_embeddings: np.ndarray, # shape: [M, d] + user_store: UserTensorStore, + item_vectors: np.ndarray, # shape: [M, k], v_m + topk_dense: int = 64, + topk_rerank: int = 8, + beta_long: float = 0.0, + beta_short: float = 0.0, + tau: float = 1.0, + only_own_memories: bool = False, + sample: bool = False, + queries: List[str] = None, +) -> Tuple[List[MemoryCard], np.ndarray, np.ndarray, List[int], np.ndarray]: + """ + Returns extended info for policy update: + (candidates, candidate_item_vectors, base_scores, chosen_indices, policy_probs) + + Args: + sample: If True, use stochastic sampling from policy distribution (for training/exploration). + If False, use deterministic top-k by policy scores (for evaluation). + """ + # 0. Filter indices if needed + valid_indices = None + if only_own_memories: + valid_indices = [i for i, card in enumerate(memory_cards) if card.user_id == user_id] + if not valid_indices: + return [], np.array([]), np.array([]), [], np.array([]) + + # 1. Dense retrieval (multi-query if available) + if queries and len(queries) > 1: + dense_idx = dense_topk_indices_multi_query( + queries, + embed_model, + memory_embeddings, + valid_indices=valid_indices, + topk=topk_dense + ) + else: + dense_idx = dense_topk_indices( + query, + embed_model, + memory_embeddings, + valid_indices=valid_indices, + topk=topk_dense + ) + # DEBUG: Check for duplicates or out of bounds + if len(dense_idx) > 0: + import os + if os.getenv("RETRIEVAL_DEBUG") == "1": + print(f" [Pipeline] Dense Indices (Top {len(dense_idx)}): {dense_idx[:10]}...") + print(f" [Pipeline] Max Index: {max(dense_idx)} | Memory Size: {len(memory_cards)}") + + if not dense_idx: + return [], np.array([]), np.array([]), [], np.array([]) + + candidates = [memory_cards[i] for i in dense_idx] + candidate_docs = [c.note_text for c in candidates] + + # 2. Rerank base score (P(yes|q,m)) - always use original query for reranking + # Skip reranking if we have fewer candidates than topk_rerank (saves GPU memory) + if len(candidates) <= topk_rerank: + base_scores = np.ones(len(candidates)) # Uniform scores + else: + base_scores = np.array(reranker.score(query, candidate_docs)) + + # 3. Policy Scoring (Softmax) + user_state: UserState = user_store.get_state(user_id) + candidate_vectors = item_vectors[dense_idx] # [K, k] + + policy_out = compute_policy_scores( + base_scores=base_scores, + user_state=user_state, + item_vectors=candidate_vectors, + beta_long=beta_long, + beta_short=beta_short, + tau=tau + ) + + # 4. Selection: Greedy (eval) or Stochastic (training) + k = min(topk_rerank, len(policy_out.scores)) + + if sample: + # Stochastic sampling from policy distribution (for training/exploration) + # Sample k indices without replacement, weighted by policy probs + probs = policy_out.probs + # Normalize to ensure sum to 1 (handle numerical issues) + probs = probs / (probs.sum() + 1e-10) + # Sample without replacement + chosen_indices = np.random.choice( + len(probs), size=k, replace=False, p=probs + ).tolist() + else: + # Deterministic top-k by policy scores (for evaluation) + top_indices_local = policy_out.scores.argsort()[-k:][::-1] + chosen_indices = top_indices_local.tolist() + + import os + if os.getenv("RETRIEVAL_DEBUG") == "1": + print(f" [Pipeline] Candidates: {len(candidates)} | Chosen Indices: {chosen_indices} | Sample: {sample}") + + return candidates, candidate_vectors, base_scores, chosen_indices, policy_out.probs + +def retrieve_no_policy( + user_id: str, + query: str, + embed_model: EmbeddingModel, + reranker: Reranker, + memory_cards: List[MemoryCard], + memory_embeddings: np.ndarray, # shape: [M, d] + topk_dense: int = 64, + topk_rerank: int = 8, + only_own_memories: bool = False, + queries: List[str] = None, + dynamic_topk: bool = False, + dynamic_min_k: int = 3, + dynamic_max_k: int = 8, + dynamic_score_ratio: float = 0.5, +) -> Tuple[List[MemoryCard], np.ndarray, np.ndarray, List[int], np.ndarray]: + """ + Deterministic retrieval baseline (NoPersonal mode): + - Dense retrieval -> Rerank -> Top-K (no policy sampling, no user vector influence) + + Args: + dynamic_topk: If True, use dynamic selection based on score distribution + dynamic_min_k: Minimum items to select (when dynamic_topk=True) + dynamic_max_k: Maximum items to select (when dynamic_topk=True) + dynamic_score_ratio: Threshold = top_score * ratio (when dynamic_topk=True) + + Returns same structure as retrieve_with_policy for compatibility: + (candidates, candidate_item_vectors, base_scores, chosen_indices, rerank_scores_for_chosen) + + Note: candidate_item_vectors is empty array (not used in NoPersonal mode) + The last return value is rerank scores instead of policy probs + """ + # 0. Filter indices if needed + valid_indices = None + if only_own_memories: + valid_indices = [i for i, card in enumerate(memory_cards) if card.user_id == user_id] + if not valid_indices: + return [], np.array([]), np.array([]), [], np.array([]) + + # 1. Dense retrieval (multi-query if available) + if queries and len(queries) > 1: + dense_idx = dense_topk_indices_multi_query( + queries, + embed_model, + memory_embeddings, + valid_indices=valid_indices, + topk=topk_dense + ) + else: + dense_idx = dense_topk_indices( + query, + embed_model, + memory_embeddings, + valid_indices=valid_indices, + topk=topk_dense + ) + + if not dense_idx: + return [], np.array([]), np.array([]), [], np.array([]) + + candidates = [memory_cards[i] for i in dense_idx] + candidate_docs = [c.note_text for c in candidates] + + # 2. Rerank base score (P(yes|q,m)) - always use original query for reranking + max_k = dynamic_max_k if dynamic_topk else topk_rerank + + # Skip reranking if we have fewer candidates than needed + if len(candidates) <= max_k: + # Just return all candidates without reranking + base_scores = np.ones(len(candidates)) # Uniform scores + chosen_indices = list(range(len(candidates))) + else: + base_scores = np.array(reranker.score(query, candidate_docs)) + + # 3. Selection: dynamic or fixed top-K + if dynamic_topk: + chosen_indices = dynamic_topk_selection( + base_scores, + min_k=dynamic_min_k, + max_k=dynamic_max_k, + score_ratio=dynamic_score_ratio, + ) + else: + k = min(topk_rerank, len(base_scores)) + top_indices_local = base_scores.argsort()[-k:][::-1] + chosen_indices = top_indices_local.tolist() + + # Get scores for chosen items (for logging compatibility) + chosen_scores = base_scores[chosen_indices] + + # Return empty item vectors (not used in NoPersonal mode) + # Return rerank scores as the "probs" field for logging compatibility + return candidates, np.array([]), base_scores, chosen_indices, chosen_scores + + +def retrieve_with_rerank( + user_id: str, + query: str, + embed_model: EmbeddingModel, + reranker: Reranker, + memory_cards: List[MemoryCard], + memory_embeddings: np.ndarray, # shape: [M, d] + user_store: UserTensorStore, + item_vectors: np.ndarray, # shape: [M, k], v_m + topk_dense: int = 64, + topk_rerank: int = 8, + beta_long: float = 0.0, + beta_short: float = 0.0, + only_own_memories: bool = False, +) -> List[MemoryCard]: + """ + Wrapper around retrieve_with_policy for standard inference. + """ + candidates, _, _, chosen_indices, _ = retrieve_with_policy( + user_id=user_id, + query=query, + embed_model=embed_model, + reranker=reranker, + memory_cards=memory_cards, + memory_embeddings=memory_embeddings, + user_store=user_store, + item_vectors=item_vectors, + topk_dense=topk_dense, + topk_rerank=topk_rerank, + beta_long=beta_long, + beta_short=beta_short, + tau=1.0, # Default tau + only_own_memories=only_own_memories + ) + + return [candidates[i] for i in chosen_indices] + + diff --git a/src/personalization/retrieval/preference_store/__init__.py b/src/personalization/retrieval/preference_store/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/preference_store/__init__.py diff --git a/src/personalization/retrieval/preference_store/base.py b/src/personalization/retrieval/preference_store/base.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/preference_store/base.py diff --git a/src/personalization/retrieval/preference_store/schemas.py b/src/personalization/retrieval/preference_store/schemas.py new file mode 100644 index 0000000..5245025 --- /dev/null +++ b/src/personalization/retrieval/preference_store/schemas.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from typing import List, Literal, Optional, Dict, Any + +from pydantic import BaseModel, Field, confloat + + +class Preference(BaseModel): + condition: str = Field( + ..., min_length=1, max_length=128, description="When the rule applies" + ) + action: str = Field( + ..., min_length=1, max_length=256, description="What to do in that case" + ) + confidence: confloat(ge=0.0, le=1.0) = Field( + ..., description="Confidence the rule is correct" + ) + + +class PreferenceList(BaseModel): + preferences: List[Preference] = Field(default_factory=list) + + +def preference_list_json_schema() -> dict: + return PreferenceList.model_json_schema() + + +class ChatTurn(BaseModel): + user_id: str + session_id: str + turn_id: int + role: Literal["user", "assistant"] + text: str + timestamp: Optional[float] = None + meta: Dict[str, Any] = Field(default_factory=dict) + + +class MemoryCard(BaseModel): + card_id: str + user_id: str + source_session_id: str + source_turn_ids: List[int] + raw_queries: List[str] # The original user utterances + preference_list: PreferenceList + note_text: str # Summarized "condition: action" text + embedding_e: List[float] # The embedding vector + kind: Literal["pref", "fact"] = "pref" + is_global: bool = False # True = always include in prompt, bypass retrieval diff --git a/src/personalization/retrieval/preference_store/vector_kv.py b/src/personalization/retrieval/preference_store/vector_kv.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/preference_store/vector_kv.py diff --git a/src/personalization/retrieval/rerank.py b/src/personalization/retrieval/rerank.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/rerank.py diff --git a/src/personalization/retrieval/store/__init__.py b/src/personalization/retrieval/store/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/store/__init__.py diff --git a/src/personalization/retrieval/store/base.py b/src/personalization/retrieval/store/base.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/store/base.py diff --git a/src/personalization/retrieval/store/faiss_store.py b/src/personalization/retrieval/store/faiss_store.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/personalization/retrieval/store/faiss_store.py |
