Diffstat (limited to 'src/personalization/user_model')
-rw-r--r--  src/personalization/user_model/__init__.py            0
-rw-r--r--  src/personalization/user_model/features.py           49
-rw-r--r--  src/personalization/user_model/policy/__init__.py     0
-rw-r--r--  src/personalization/user_model/policy/optimizer.py    0
-rw-r--r--  src/personalization/user_model/policy/reinforce.py  104
-rw-r--r--  src/personalization/user_model/scoring.py            25
-rw-r--r--  src/personalization/user_model/session_state.py      19
-rw-r--r--  src/personalization/user_model/tensor_store.py       80
8 files changed, 277 insertions, 0 deletions
diff --git a/src/personalization/user_model/__init__.py b/src/personalization/user_model/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/personalization/user_model/__init__.py
diff --git a/src/personalization/user_model/features.py b/src/personalization/user_model/features.py
new file mode 100644
index 0000000..a4508b4
--- /dev/null
+++ b/src/personalization/user_model/features.py
@@ -0,0 +1,49 @@
+import numpy as np
+from dataclasses import dataclass
+from sklearn.decomposition import PCA
+
+@dataclass
+class ItemProjection:
+    P: np.ndarray     # [k, d]
+    mean: np.ndarray  # [d]
+
+    @classmethod
+    def from_pca(cls, embeddings: np.ndarray, k: int) -> "ItemProjection":
+        """
+        Fit a PCA projection on item embeddings of shape [M, d].
+        """
+        mean = embeddings.mean(axis=0)
+        centered = embeddings - mean
+
+        # k cannot exceed min(n_samples, n_features) for PCA.
+        n_samples, n_features = embeddings.shape
+        actual_k = min(k, n_samples, n_features)
+
+        pca = PCA(n_components=actual_k)
+        pca.fit(centered)
+
+        # pca.components_: [actual_k, d]; each row is a principal component.
+        P = pca.components_
+
+        # The rest of the pipeline expects a fixed k, but PCA returns
+        # at most min(n_samples, n_features) components when the data
+        # is small. Pad P with zero rows in that case so the projection
+        # always has shape [k, d].
+        if actual_k < k:
+            padding = np.zeros((k - actual_k, n_features), dtype=P.dtype)
+            P = np.vstack([P, padding])
+
+        return cls(P=P, mean=mean)
+
+    def transform_embeddings(self, E: np.ndarray) -> np.ndarray:
+        """
+        E: [N, d] -> [N, k]
+        """
+        return (E - self.mean) @ self.P.T
+
+    def transform_vector(self, e: np.ndarray) -> np.ndarray:
+        """
+        e: [d] -> [k]
+        """
+        return self.P @ (e - self.mean)
+
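A minimal usage sketch for ItemProjection (not part of the commit; the item count, embedding dimension, and k below are illustrative assumptions):

    import numpy as np
    from personalization.user_model.features import ItemProjection

    rng = np.random.default_rng(0)
    embeddings = rng.normal(size=(500, 384)).astype(np.float32)  # M=500 items, d=384 (assumed)

    proj = ItemProjection.from_pca(embeddings, k=32)
    item_vectors = proj.transform_embeddings(embeddings)  # [500, 32]
    single = proj.transform_vector(embeddings[0])         # [32]
    assert np.allclose(item_vectors[0], single, atol=1e-4)

The batched and single-vector transforms agree because both apply the same centered projection P.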
diff --git a/src/personalization/user_model/policy/__init__.py b/src/personalization/user_model/policy/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/personalization/user_model/policy/__init__.py
diff --git a/src/personalization/user_model/policy/optimizer.py b/src/personalization/user_model/policy/optimizer.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/personalization/user_model/policy/optimizer.py
diff --git a/src/personalization/user_model/policy/reinforce.py b/src/personalization/user_model/policy/reinforce.py
new file mode 100644
index 0000000..adfaef7
--- /dev/null
+++ b/src/personalization/user_model/policy/reinforce.py
@@ -0,0 +1,104 @@
+from typing import Sequence
+from dataclasses import dataclass
+import numpy as np
+
+from personalization.user_model.tensor_store import UserState
+
+@dataclass
+class PolicyScores:
+    scores: np.ndarray  # [K] s(q_t, m; u)
+    probs: np.ndarray   # [K] π_z(m|q_t)
+
+def compute_policy_scores(
+    base_scores: np.ndarray,   # [K], from reranker
+    user_state: UserState,
+    item_vectors: np.ndarray,  # [K, k], v_m for the K candidates
+    beta_long: float,
+    beta_short: float,
+    tau: float,
+) -> PolicyScores:
+    """
+    Compute personalized scores and softmax probabilities.
+
+    s(q_t, m; u) = s_0(q_t, m) + z_t^{(eff)}.T @ v_m
+    z_t^{(eff)}  = beta_long * z_long + beta_short * z_short
+    """
+    if len(item_vectors) == 0:
+        return PolicyScores(scores=np.array([]), probs=np.array([]))
+
+    z_eff = beta_long * user_state.z_long + beta_short * user_state.z_short
+
+    # Personalization term:
+    # item_vectors [K, k] @ z_eff [k] -> [K]
+    personalization_term = np.dot(item_vectors, z_eff)
+
+    # Total scores
+    scores = base_scores + personalization_term
+
+    # Temperature softmax over exp(score / tau);
+    # subtract the max for numerical stability.
+    scaled_scores = scores / tau
+    exp_scores = np.exp(scaled_scores - np.max(scaled_scores))
+    probs = exp_scores / np.sum(exp_scores)
+
+    return PolicyScores(scores=scores, probs=probs)
+
+def reinforce_update_user_state(
+    user_state: UserState,
+    item_vectors: np.ndarray,       # [K, k] for candidates
+    chosen_indices: Sequence[int],  # indices of A_t in 0..K-1
+    policy_probs: np.ndarray,       # [K] π_z(m|q_t)
+    reward_hat: float,              # \hat r_t
+    gating: float,                  # g_t
+    tau: float,
+    eta_long: float,
+    eta_short: float,
+    ema_alpha: float,
+    short_decay: float,
+) -> bool:
+    """
+    In-place update of user_state.z_long / z_short / reward_ma via REINFORCE.
+    Returns True if an update occurred, False otherwise.
+    """
+    if len(chosen_indices) == 0:
+        return False
+
+    # 1. Baseline advantage: g_t * (r_hat - running baseline)
+    advantage = gating * (reward_hat - user_state.reward_ma)
+
+    # Skip negligible updates.
+    if abs(advantage) < 1e-6:
+        return False
+
+    # 2. Average vector of the chosen items, v_{chosen,t}
+    chosen_mask = np.zeros(len(item_vectors), dtype=np.float32)
+    for idx in chosen_indices:
+        idx_int = int(idx)
+        if 0 <= idx_int < len(item_vectors):
+            chosen_mask[idx_int] = 1.0
+
+    if chosen_mask.sum() == 0:
+        return False
+
+    chosen_mask /= chosen_mask.sum()  # normalize to an average
+    v_chosen = np.dot(chosen_mask, item_vectors)  # [k]
+
+    # 3. Expected vector under the policy, mu_t(z):
+    #    policy_probs [K] @ item_vectors [K, k] -> [k]
+    v_expect = np.dot(policy_probs, item_vectors)
+
+    # 4. Gradient direction
+    grad = (advantage / tau) * (v_chosen - v_expect)
+
+    # 5. Update the long- and short-term vectors
+    user_state.z_long += eta_long * grad
+    user_state.z_short = (1.0 - short_decay) * user_state.z_short + eta_short * grad
+
+    # 6. Update the reward baseline (EMA)
+    user_state.reward_ma = (1.0 - ema_alpha) * user_state.reward_ma + ema_alpha * reward_hat
+
+    return True
+
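A sketch of one full policy step tying the two functions together (not part of the commit; every hyperparameter value here is an illustrative assumption):

    import numpy as np
    from personalization.user_model.tensor_store import UserState
    from personalization.user_model.policy.reinforce import (
        compute_policy_scores,
        reinforce_update_user_state,
    )

    k, K = 32, 5  # latent dim, number of candidates (assumed)
    state = UserState("u1", np.zeros(k, np.float32), np.zeros(k, np.float32), 0.0)
    item_vectors = np.random.default_rng(1).normal(size=(K, k)).astype(np.float32)
    base_scores = np.linspace(1.0, 0.2, K)  # stand-in reranker scores

    out = compute_policy_scores(base_scores, state, item_vectors,
                                beta_long=0.5, beta_short=0.5, tau=1.0)
    # With z_long == z_short == 0 the scores equal base_scores exactly.

    # Suppose the user engaged with candidates 0 and 2 and reward_hat = 1.0:
    updated = reinforce_update_user_state(
        state, item_vectors, chosen_indices=[0, 2], policy_probs=out.probs,
        reward_hat=1.0, gating=1.0, tau=1.0,
        eta_long=0.05, eta_short=0.2, ema_alpha=0.1, short_decay=0.1,
    )
    assert updated  # advantage = 1.0 - 0.0, so both z vectors moved

The update pushes z toward the chosen items' vectors and away from the policy's expected vector, scaled by the advantage over the running reward baseline.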
diff --git a/src/personalization/user_model/scoring.py b/src/personalization/user_model/scoring.py
new file mode 100644
index 0000000..75ffc84
--- /dev/null
+++ b/src/personalization/user_model/scoring.py
@@ -0,0 +1,25 @@
+import numpy as np
+from .tensor_store import UserState
+
+def score_with_user(
+    base_score: float,
+    user_state: UserState,
+    v_m: np.ndarray,  # [k]
+    beta_long: float,
+    beta_short: float,
+) -> float:
+    """
+    Personalized scoring:
+    s = base_score + (beta_long * z_long + beta_short * z_short) . v_m
+    Day 2: beta_long = beta_short = 0 -> s == base_score
+    """
+    z_eff = beta_long * user_state.z_long + beta_short * user_state.z_short
+    # Guard against a dimension mismatch between z_eff and v_m:
+    # fall back to the unpersonalized base score instead of raising.
+    if v_m.shape != z_eff.shape:
+        return float(base_score)
+
+    term = np.dot(z_eff, v_m)
+    return float(base_score + term)
+
diff --git a/src/personalization/user_model/session_state.py b/src/personalization/user_model/session_state.py
new file mode 100644
index 0000000..5cd2243
--- /dev/null
+++ b/src/personalization/user_model/session_state.py
@@ -0,0 +1,19 @@
+from dataclasses import dataclass, field
+from typing import List, Optional
+import numpy as np
+
+from personalization.retrieval.preference_store.schemas import ChatTurn, MemoryCard
+
+@dataclass
+class OnlineSessionState:
+    user_id: str
+    history: List[ChatTurn] = field(default_factory=list)
+    last_query: Optional[str] = None
+    last_answer: Optional[str] = None
+    last_memories: List[MemoryCard] = field(default_factory=list)
+    last_query_embedding: Optional[np.ndarray] = None
+    last_candidate_item_vectors: Optional[np.ndarray] = None  # [K, k]
+    last_policy_probs: Optional[np.ndarray] = None            # [K]
+    last_chosen_indices: List[int] = field(default_factory=list)
+
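A quick check of score_with_user's Day-2 passthrough and the personalized case (not part of the commit; all values illustrative):

    import numpy as np
    from personalization.user_model.scoring import score_with_user
    from personalization.user_model.tensor_store import UserState

    state = UserState("u1", np.ones(4, np.float32), np.ones(4, np.float32), 0.0)
    v_m = np.array([1.0, 0.0, 1.0, 0.0], dtype=np.float32)

    # Day 2: both betas are zero, so the base score passes through unchanged.
    assert score_with_user(0.8, state, v_m, beta_long=0.0, beta_short=0.0) == 0.8

    # With nonzero betas: z_eff = 0.5*1 + 0.5*1 = 1 per dim, z_eff . v_m = 2,
    # so s = 0.8 + 2.0 = 2.8.
    assert abs(score_with_user(0.8, state, v_m, 0.5, 0.5) - 2.8) < 1e-5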
diff --git a/src/personalization/user_model/tensor_store.py b/src/personalization/user_model/tensor_store.py
new file mode 100644
index 0000000..42dbf4e
--- /dev/null
+++ b/src/personalization/user_model/tensor_store.py
@@ -0,0 +1,80 @@
+import numpy as np
+from dataclasses import dataclass
+from typing import Dict
+import os
+
+@dataclass
+class UserState:
+    user_id: str
+    z_long: np.ndarray   # [k]
+    z_short: np.ndarray  # [k]
+    reward_ma: float     # reward baseline, init 0.0
+
+class UserTensorStore:
+    def __init__(self, k: int, path: str):
+        # NOTE: np.savez appends ".npz" when the path lacks it, so `path`
+        # should end with ".npz" for _load and _save to agree.
+        self.k = k
+        self.path = path
+        self._states: Dict[str, UserState] = {}
+        self._load()
+
+        # Global mean of existing long-term vectors, used to initialize
+        # new users; zeros when the store is empty.
+        if self._states:
+            z_all = np.stack([st.z_long for st in self._states.values()])
+            self.global_init_z = np.mean(z_all, axis=0)
+        else:
+            self.global_init_z = np.zeros(self.k, dtype=np.float32)
+
+    def _load(self):
+        if os.path.exists(self.path):
+            try:
+                data = np.load(self.path, allow_pickle=True)
+                # Simple npz schema, one triple of keys per user:
+                # "{uid}_long", "{uid}_short", "{uid}_meta" (meta=[reward_ma])
+                for key in data.files:
+                    if key.endswith("_long"):
+                        uid = key[: -len("_long")]
+                        z_long = data[key]
+                        z_short = data.get(f"{uid}_short", np.zeros(self.k, dtype=np.float32))
+                        meta = data.get(f"{uid}_meta", np.array([0.0]))
+                        self._states[uid] = UserState(uid, z_long, z_short, float(meta[0]))
+            except Exception as e:
+                print(f"Warning: Failed to load UserTensorStore from {self.path}: {e}")
+
+    def _save(self):
+        # Flatten all user states into one npz snapshot.
+        save_dict = {}
+        for uid, state in self._states.items():
+            save_dict[f"{uid}_long"] = state.z_long
+            save_dict[f"{uid}_short"] = state.z_short
+            save_dict[f"{uid}_meta"] = np.array([state.reward_ma])
+        np.savez(self.path, **save_dict)
+
+    def get_state(self, user_id: str) -> UserState:
+        if user_id not in self._states:
+            # Lazy init: new users start at the global mean of z_long
+            # with a zero short-term vector.
+            state = UserState(
+                user_id=user_id,
+                z_long=self.global_init_z.copy(),
+                z_short=np.zeros(self.k, dtype=np.float32),
+                reward_ma=0.0,
+            )
+            self._states[user_id] = state
+        return self._states[user_id]
+
+    def save_state(self, state: UserState) -> None:
+        self._states[state.user_id] = state
+
+    def persist(self):
+        """Force a save to disk."""
+        self._save()
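A minimal persistence round trip for UserTensorStore (not part of the commit; the tmp path is an illustrative assumption, with the ".npz" suffix that _save relies on):

    import numpy as np
    from personalization.user_model.tensor_store import UserTensorStore

    store = UserTensorStore(k=32, path="/tmp/user_tensors.npz")
    state = store.get_state("u1")   # lazily initialized from the global mean
    state.z_long += 0.1             # e.g. after a REINFORCE update
    store.save_state(state)
    store.persist()                 # snapshot to the npz file

    reloaded = UserTensorStore(k=32, path="/tmp/user_tensors.npz")
    assert np.allclose(reloaded.get_state("u1").z_long, state.z_long)

Because get_state hands back the stored object itself, mutating it in place and calling persist() is enough; save_state only matters for states constructed elsewhere.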
