summaryrefslogtreecommitdiff
path: root/src/personalization/feedback
diff options
context:
space:
mode:
Diffstat (limited to 'src/personalization/feedback')
-rw-r--r--src/personalization/feedback/__init__.py0
-rw-r--r--src/personalization/feedback/gating.py72
-rw-r--r--src/personalization/feedback/handlers.py50
-rw-r--r--src/personalization/feedback/online_update.py0
-rw-r--r--src/personalization/feedback/reward_model.py64
-rw-r--r--src/personalization/feedback/sampler.py109
-rw-r--r--src/personalization/feedback/schemas.py23
7 files changed, 318 insertions, 0 deletions
diff --git a/src/personalization/feedback/__init__.py b/src/personalization/feedback/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/personalization/feedback/__init__.py
diff --git a/src/personalization/feedback/gating.py b/src/personalization/feedback/gating.py
new file mode 100644
index 0000000..d741874
--- /dev/null
+++ b/src/personalization/feedback/gating.py
@@ -0,0 +1,72 @@
+import numpy as np
+from personalization.feedback.schemas import TurnSample
+
+def cosine_sim_batch(matrix: np.ndarray, vector: np.ndarray) -> np.ndarray:
+ # matrix: [N, d], vector: [d]
+ # return: [N]
+ norm_m = np.linalg.norm(matrix, axis=1)
+ norm_v = np.linalg.norm(vector)
+
+ # Avoid div by zero
+ den = norm_m * norm_v
+ den[den == 0] = 1e-9
+
+ return np.dot(matrix, vector) / den
+
+def estimate_retrieval_gating(sample: TurnSample, reward_hat: float) -> float:
+ """
+ Return g_t in [0,1], representing how much the reward is due to retrieval.
+ """
+ e_q = sample.query_embedding_t
+ e_q1 = sample.query_embedding_t1
+
+ if e_q is None or e_q1 is None or not sample.memories:
+ return 0.5 # Neutral
+
+ # We need embeddings of the memories.
+ # In a real pipeline, we might pass them in sample.memory_embeddings.
+ # If missing, we can't compute sim.
+ if sample.memory_embeddings is None:
+ # Try to use embedding_e from memory cards if available
+ # But MemoryCard.embedding_e is List[float]
+ try:
+ mem_embs = np.array([m.embedding_e for m in sample.memories])
+ if mem_embs.shape[1] == 0: # Empty embeddings
+ return 0.5
+ except:
+ return 0.5
+ else:
+ mem_embs = sample.memory_embeddings
+
+ # Compute similarities
+ # shape: [K]
+ sims_q = cosine_sim_batch(mem_embs, e_q)
+ sims_q1 = cosine_sim_batch(mem_embs, e_q1)
+
+ s_q_max = sims_q.max() if len(sims_q) > 0 else 0
+ s_q1_max = sims_q1.max() if len(sims_q1) > 0 else 0
+
+ g = 0.5
+
+ # Heuristics
+
+ # Case A: Retrieval clearly irrelevant + bad reward
+ # q_t / q_{t+1} have low similarity to memories -> likely retrieval failure (or no relevant memories)
+ if reward_hat < -0.5 and s_q_max < 0.2 and s_q1_max < 0.2:
+ g = 0.9 # Blame retrieval (for failing to find anything, or nothing exists)
+
+ # Case B: Retrieval looks good but reward is bad
+ # Memories are relevant to query, but user still unhappy -> LLM didn't use them well?
+ elif reward_hat < -0.5 and s_q_max > 0.5:
+ g = 0.2 # Likely LLM fault
+
+ # Case C: Good reward
+ # If reward is high, we assume both did okay.
+ elif reward_hat > 0.5:
+ if s_q_max > 0.4:
+ g = 0.6 # Retrieval helped
+ else:
+ g = 0.3 # LLM handled it without strong retrieval help
+
+ return float(g)
+
diff --git a/src/personalization/feedback/handlers.py b/src/personalization/feedback/handlers.py
new file mode 100644
index 0000000..60a8d17
--- /dev/null
+++ b/src/personalization/feedback/handlers.py
@@ -0,0 +1,50 @@
+from typing import Tuple, List, Optional
+import numpy as np
+
+from personalization.retrieval.preference_store.schemas import MemoryCard
+from personalization.feedback.schemas import TurnSample
+from personalization.feedback.reward_model import estimate_reward
+from personalization.feedback.gating import estimate_retrieval_gating
+
+def eval_step(
+ q_t: str,
+ answer_t: str,
+ q_t1: str,
+ memories_t: List[MemoryCard],
+ query_embedding_t: Optional[np.ndarray] = None,
+ query_embedding_t1: Optional[np.ndarray] = None,
+) -> Tuple[float, float]:
+ """
+ Unified evaluation interface.
+ Given (q_t, a_t, q_{t+1}, memories), returns (reward_hat, gating_hat).
+ """
+
+ # Construct a lightweight TurnSample
+ # We might need embeddings for gating. If not provided, gating might return default.
+
+ # Ensure memories have embeddings for gating
+ mem_embs = None
+ if memories_t and memories_t[0].embedding_e:
+ try:
+ mem_embs = np.array([m.embedding_e for m in memories_t])
+ except:
+ pass
+
+ sample = TurnSample(
+ user_id="", # Not needed for simple eval
+ session_id="",
+ turn_id=0,
+ query_t=q_t,
+ answer_t=answer_t,
+ query_t1=q_t1,
+ memories=memories_t,
+ query_embedding_t=query_embedding_t,
+ query_embedding_t1=query_embedding_t1,
+ memory_embeddings=mem_embs
+ )
+
+ r_hat = estimate_reward(sample)
+ g_hat = estimate_retrieval_gating(sample, r_hat)
+
+ return r_hat, g_hat
+
diff --git a/src/personalization/feedback/online_update.py b/src/personalization/feedback/online_update.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/personalization/feedback/online_update.py
diff --git a/src/personalization/feedback/reward_model.py b/src/personalization/feedback/reward_model.py
new file mode 100644
index 0000000..3584b43
--- /dev/null
+++ b/src/personalization/feedback/reward_model.py
@@ -0,0 +1,64 @@
+import numpy as np
+from personalization.feedback.schemas import TurnSample
+
+def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
+ norm_a = np.linalg.norm(a)
+ norm_b = np.linalg.norm(b)
+ if norm_a == 0 or norm_b == 0:
+ return 0.0
+ return float(np.dot(a, b) / (norm_a * norm_b))
+
+def estimate_reward(sample: TurnSample) -> float:
+ """
+ Return a scalar reward_hat, indicating if the previous answer was helpful.
+ Range: [-1.0, 1.0] (approx)
+ """
+
+ # 1. Language/Topic Coherence
+ if sample.query_embedding_t is None or sample.query_embedding_t1 is None:
+ topic_sim = 0.5
+ else:
+ topic_sim = cosine_sim(sample.query_embedding_t, sample.query_embedding_t1)
+
+ # 2. Negative Keywords (Complaint/Correction)
+ negative_keywords = [
+ "you didn't", "that's not", "incorrect", "redo", "again", "explain more",
+ "doesn't help", "wrong", "no", "not what i asked",
+ "你没", "不是", "这不是", "重来", "重新", "不对", "错了", "没说清楚"
+ ]
+
+ # 3. Positive Keywords (Follow-up/Elaboration)
+ positive_keywords = [
+ "can you elaborate", "give an example", "continue", "what if", "based on that",
+ "thanks", "good", "great", "cool",
+ "能不能详细一点", "举个例子", "再继续", "那如果", "接下来", "在这个基础上", "谢谢", "不错"
+ ]
+
+ q1_lower = sample.query_t1.lower()
+
+ has_negative = any(kw in q1_lower for kw in negative_keywords)
+ has_positive = any(kw in q1_lower for kw in positive_keywords)
+
+ reward = 0.0
+
+ if has_negative:
+ reward -= 1.0
+
+ if has_positive:
+ # Only reward if topic similarity is decent, otherwise might be "thanks, bye" (end of session)
+ # But "thanks" is good.
+ reward += 0.5
+ if topic_sim > 0.3:
+ reward += 0.5
+
+ if topic_sim < 0.2:
+ # Topic shift -> previous interaction likely finished or failed.
+ # If no explicit positive/negative, assume neutral/slightly decayed.
+ # If user changes topic, it often means the previous task is done (neutral/positive)
+ # OR they gave up (negative). Hard to tell.
+ # Let's dampen the reward towards 0.
+ reward *= 0.5
+
+ # Clip
+ return max(-1.0, min(1.0, reward))
+
diff --git a/src/personalization/feedback/sampler.py b/src/personalization/feedback/sampler.py
new file mode 100644
index 0000000..9e26912
--- /dev/null
+++ b/src/personalization/feedback/sampler.py
@@ -0,0 +1,109 @@
+from typing import Iterable, List, Optional
+import numpy as np
+from tqdm import tqdm
+
+from personalization.retrieval.preference_store.schemas import ChatTurn, MemoryCard
+from personalization.feedback.schemas import TurnSample
+from personalization.retrieval.pipeline import retrieve_with_rerank
+from personalization.models.llm.qwen_instruct import QwenInstruct
+from personalization.models.embedding.base import EmbeddingModel
+from personalization.models.reranker.base import Reranker
+from personalization.user_model.tensor_store import UserTensorStore
+
+def build_turn_samples_from_sessions(
+ sessions: Iterable[List[ChatTurn]],
+ embed_model: EmbeddingModel,
+ llm: QwenInstruct,
+ reranker: Reranker,
+ memory_cards: List[MemoryCard],
+ memory_embeddings: np.ndarray,
+ user_store: UserTensorStore,
+ item_vectors: np.ndarray,
+ max_samples: Optional[int] = None,
+ topk_dense: int = 64,
+ topk_rerank: int = 3,
+) -> List[TurnSample]:
+ samples = []
+
+ for turns in tqdm(sessions, desc="Building TurnSamples"):
+ if max_samples and len(samples) >= max_samples:
+ break
+
+ # Ensure sorted by turn_id
+ sorted_turns = sorted(turns, key=lambda x: x.turn_id)
+
+ # Iterate to find (q_t, a_t, q_{t+1})
+ for i in range(len(sorted_turns)):
+ if max_samples and len(samples) >= max_samples:
+ break
+
+ q_t = sorted_turns[i]
+ if q_t.role != "user":
+ continue
+
+ # Find next user turn
+ # Also try to find assistant response in between
+ a_t_text = ""
+ q_t1 = None
+
+ # Look ahead
+ for j in range(i + 1, len(sorted_turns)):
+ next_turn = sorted_turns[j]
+ if next_turn.role == "assistant" and not a_t_text:
+ a_t_text = next_turn.text
+ elif next_turn.role == "user":
+ q_t1 = next_turn
+ break
+
+ if not q_t1:
+ # End of session or no subsequent user query
+ continue
+
+ # We have q_t, a_t (optional but preferred), q_t1
+ # If a_t is missing, we might skip or use empty string.
+ # For RL, we usually need the answer to evaluate quality.
+ # If dataset doesn't have assistant turns, we might need to generate one?
+ # For now, let's proceed even if a_t is empty, or maybe require it.
+ if not a_t_text:
+ # Try to use LLM to generate if needed, but for offline sampling
+ # from existing chats, we prefer existing answers.
+ # If using OASST1, it should have assistant turns.
+ pass
+
+ # 3. Retrieve memories for q_t
+ memories_t = retrieve_with_rerank(
+ user_id=q_t.user_id,
+ query=q_t.text,
+ embed_model=embed_model,
+ reranker=reranker,
+ memory_cards=memory_cards,
+ memory_embeddings=memory_embeddings,
+ user_store=user_store,
+ item_vectors=item_vectors,
+ topk_dense=topk_dense,
+ topk_rerank=topk_rerank,
+ beta_long=0.0,
+ beta_short=0.0,
+ only_own_memories=True # Assume we want user specific memories
+ )
+
+ # 4. Precompute embeddings
+ # We can do this efficiently later or batch, but here per sample
+ e_q_t = embed_model.encode([q_t.text], return_tensor=False)[0]
+ e_q_t1 = embed_model.encode([q_t1.text], return_tensor=False)[0]
+
+ sample = TurnSample(
+ user_id=q_t.user_id,
+ session_id=q_t.session_id,
+ turn_id=q_t.turn_id,
+ query_t=q_t.text,
+ answer_t=a_t_text,
+ query_t1=q_t1.text,
+ memories=memories_t,
+ query_embedding_t=np.array(e_q_t),
+ query_embedding_t1=np.array(e_q_t1)
+ )
+ samples.append(sample)
+
+ return samples
+
diff --git a/src/personalization/feedback/schemas.py b/src/personalization/feedback/schemas.py
new file mode 100644
index 0000000..b15db80
--- /dev/null
+++ b/src/personalization/feedback/schemas.py
@@ -0,0 +1,23 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import List, Optional, Any
+import numpy as np
+
+from personalization.retrieval.preference_store.schemas import MemoryCard
+
+@dataclass
+class TurnSample:
+ user_id: str
+ session_id: str
+ turn_id: int # index of q_t within the session
+ query_t: str # q_t
+ answer_t: str # a_t
+ query_t1: str # q_{t+1}
+ memories: List[MemoryCard] # A_t
+
+ # Optional pre-computed vectors and features
+ query_embedding_t: Optional[np.ndarray] = None
+ query_embedding_t1: Optional[np.ndarray] = None
+ memory_embeddings: Optional[np.ndarray] = None # corresponding e_m or v_m for memories
+