"""Abstract interface implemented by every baseline agent.

Concrete agents provide three hooks:
- respond():     answer the user's current query
- end_session(): react to the end of a session (e.g. memory updates)
- reset_user():  drop all per-user state
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class AgentResponse:
    """An agent's answer plus optional diagnostic details."""

    # Text shown to the user.
    answer: str
    # Free-form diagnostics (token counts, retrieved memories, errors, ...).
    # default_factory gives each response its own independent dict.
    debug_info: Dict[str, Any] = field(default_factory=dict)


class BaselineAgent(ABC):
    """Abstract base class for all baseline agents."""

    def __init__(self, model_name: str, **kwargs):
        """Record the LLM identifier and any extra configuration.

        Args:
            model_name: Name/path of the LLM to use.
            **kwargs: Additional configuration, kept verbatim in ``self.config``.
        """
        self.model_name = model_name
        self.config = kwargs

    @abstractmethod
    def respond(
        self,
        user_id: str,
        query: str,
        conversation_history: List[Dict[str, str]],
        **kwargs,
    ) -> AgentResponse:
        """Generate a response to the user's query.

        Args:
            user_id: Unique identifier for the user.
            query: Current user message.
            conversation_history: Prior messages as
                ``[{"role": "user"|"assistant", "content": "..."}]``.
            **kwargs: Additional context (e.g. task info).

        Returns:
            AgentResponse with the answer and debug info.
        """

    @abstractmethod
    def end_session(self, user_id: str, conversation: List[Dict[str, str]]):
        """Hook invoked when a session (one task) ends.

        Implementations use this to update memory, notes, etc.

        Args:
            user_id: User identifier.
            conversation: Complete conversation from this session.
        """

    @abstractmethod
    def reset_user(self, user_id: str):
        """Completely reset all state for a user.

        Called at the start of a new experiment.

        Args:
            user_id: User identifier.
        """

    def get_name(self) -> str:
        """Return a descriptive name for this agent."""
        return type(self).__name__
"""
No Memory Baseline (T1)

A simple agent that has no memory of previous sessions.
Only sees the current conversation history within a session.
"""

from typing import List, Dict, Any, Optional
import os

from .base import BaselineAgent, AgentResponse


# System prompt prepended to every conversation sent to the LLM.
AGENT_SYSTEM_PROMPT = """You are a helpful AI assistant helping users solve problems.

Guidelines:
- If the user's request is unclear, ask for clarification
- Provide clear, well-structured answers
- Adapt to user feedback and preferences expressed in the conversation
- Be helpful and do your best to solve the user's problem

Your output should be a direct response to the user."""


class NoMemoryAgent(BaselineAgent):
    """
    T1: Base model with no memory.

    This agent:
    - Has no memory across sessions
    - Only uses current conversation context
    - Represents the baseline "no personalization" case
    """

    def __init__(
        self,
        model_name: str = "llama-8b",
        api_base: Optional[str] = None,
        api_key: Optional[str] = None,
        max_new_tokens: int = 512,
        temperature: float = 0.7,
        **kwargs
    ):
        """
        Args:
            model_name: Model identifier passed to the OpenAI-compatible server.
            api_base: Server base URL; falls back to $OPENAI_API_BASE, then a
                local vLLM-style default.
            api_key: API key; falls back to $OPENAI_API_KEY, then "EMPTY".
            max_new_tokens: Generation cap per response.
            temperature: Sampling temperature.
        """
        super().__init__(model_name, **kwargs)

        self.api_base = api_base or os.getenv("OPENAI_API_BASE", "http://localhost:8003/v1")
        self.api_key = api_key or os.getenv("OPENAI_API_KEY", "EMPTY")
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature

        # Initialize client (may leave self.client = None; see _init_client).
        self._init_client()

    def _init_client(self):
        """Initialize the LLM client.

        On any failure the client is set to None so respond() can fall back to
        a stub answer — useful when testing without a running server.
        """
        try:
            import openai
            self.client = openai.OpenAI(
                base_url=self.api_base,
                api_key=self.api_key,
            )
        except Exception as e:
            print(f"Warning: Could not initialize OpenAI client: {e}")
            self.client = None

    def _build_messages(
        self,
        conversation_history: List[Dict[str, str]],
        query: str,
    ) -> List[Dict[str, str]]:
        """Build messages for the LLM: system prompt + history + current query.

        The query is appended unless the history already ends with the
        identical *user* message (i.e. the caller included it in the history).
        """
        messages = [{"role": "system", "content": AGENT_SYSTEM_PROMPT}]

        # Add conversation history, copying only the fields the API expects.
        for msg in conversation_history:
            messages.append({
                "role": msg["role"],
                "content": msg["content"],
            })

        # Bug fix: the previous check compared only content, so an *assistant*
        # turn that happened to match the query verbatim would wrongly
        # suppress appending the current user query.
        last = conversation_history[-1] if conversation_history else None
        if last is None or last.get("role") != "user" or last.get("content") != query:
            messages.append({"role": "user", "content": query})

        return messages

    def respond(
        self,
        user_id: str,
        query: str,
        conversation_history: List[Dict[str, str]],
        **kwargs
    ) -> AgentResponse:
        """Generate response using only current conversation context.

        Args:
            user_id: Unused here — this agent keeps no per-user state.
            query: Current user message.
            conversation_history: Prior messages in this session.

        Returns:
            AgentResponse; on client/API failure a stub or apology answer is
            returned instead of raising, with details in debug_info.
        """

        messages = self._build_messages(conversation_history, query)

        if self.client is None:
            # Fallback for testing without LLM
            return AgentResponse(
                answer=f"[NoMemoryAgent] Response to: {query[:50]}...",
                debug_info={"mode": "fallback", "num_messages": len(messages)},
            )

        try:
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=self.max_new_tokens,
                temperature=self.temperature,
            )

            # message.content may be None (e.g. refusals/tool-only replies);
            # normalize so AgentResponse.answer is always a str.
            answer = response.choices[0].message.content or ""

            return AgentResponse(
                answer=answer,
                debug_info={
                    "num_messages": len(messages),
                    "prompt_tokens": response.usage.prompt_tokens if response.usage else 0,
                    "completion_tokens": response.usage.completion_tokens if response.usage else 0,
                },
            )

        except Exception as e:
            print(f"Error calling LLM: {e}")
            return AgentResponse(
                answer=f"I apologize, but I encountered an error. Let me try again: {query[:100]}",
                debug_info={"error": str(e)},
            )

    def end_session(self, user_id: str, conversation: List[Dict[str, str]]):
        """No-op for no-memory agent."""
        pass

    def reset_user(self, user_id: str):
        """No-op for no-memory agent."""
        pass

    def get_name(self) -> str:
        """Descriptive name including the underlying model."""
        return f"NoMemory({self.model_name})"
"""
RAG Memory Baseline (Y3/Y4)

Wraps the PersonalizedLLM for use in the evaluation framework.
Y3: Extractor + RAG (mode="nopersonal")
Y4: Extractor + RAG + User Vector (mode="full")
"""

from typing import List, Dict, Any, Optional
import os
import sys

from .base import BaselineAgent, AgentResponse

# Add src to path so `personalization.*` imports resolve when this module is
# used without an installed package (four levels up from this file).
_src_path = os.path.join(os.path.dirname(__file__), "../../../..")
if _src_path not in sys.path:
    sys.path.insert(0, _src_path)


class RAGMemoryAgent(BaselineAgent):
    """
    Y3/Y4: RAG-based memory with optional user vector.

    This agent:
    - Extracts preferences from conversations using the extractor
    - Stores preferences as memory cards
    - Retrieves relevant memories using RAG for each query
    - (Y4 only) Uses user vector to personalize retrieval
    """

    def __init__(
        self,
        model_name: str = "llama-8b",
        mode: str = "nopersonal",  # "nopersonal" for Y3, "full" for Y4
        memory_cards_path: str = None,
        memory_embeddings_path: str = None,
        enable_preference_extraction: bool = True,
        enable_rl_updates: bool = False,
        only_own_memories: bool = True,
        **kwargs
    ):
        """
        Args:
            model_name: LLM model to use
            mode: "nopersonal" (Y3) or "full" (Y4)
            memory_cards_path: Path to memory cards file
            memory_embeddings_path: Path to embeddings file
            enable_preference_extraction: Whether to extract preferences
            enable_rl_updates: Whether to update user vectors (Y4 only)
            only_own_memories: Only retrieve user's own memories
        """
        super().__init__(model_name, **kwargs)

        self.mode = mode
        # User-vector (RL) updates are only meaningful in "full" (Y4) mode;
        # force them off for Y3 regardless of the flag.
        self.enable_rl_updates = enable_rl_updates and (mode == "full")

        # Default artifact paths, resolved relative to this file
        # (five levels up — presumably the repo root; verify layout).
        base_dir = os.path.join(os.path.dirname(__file__), "../../../../..")
        self.memory_cards_path = memory_cards_path or os.path.join(
            base_dir, "data/eval/memory_cards.jsonl"
        )
        self.memory_embeddings_path = memory_embeddings_path or os.path.join(
            base_dir, "data/eval/memory_embeddings.npy"
        )

        self.enable_preference_extraction = enable_preference_extraction
        self.only_own_memories = only_own_memories

        # Lazy initialization: the heavyweight PersonalizedLLM is constructed
        # on first use so building this wrapper stays cheap.
        self._llm = None
        self._initialized = False

    def _ensure_initialized(self):
        """Lazy initialization of PersonalizedLLM.

        On failure, _initialized is still set to True (with _llm = None) so we
        don't retry on every call; respond() then runs in fallback mode.
        """
        if self._initialized:
            return

        try:
            from personalization.serving.personalized_llm import PersonalizedLLM

            self._llm = PersonalizedLLM(
                mode=self.mode,
                enable_preference_extraction=self.enable_preference_extraction,
                enable_rl_updates=self.enable_rl_updates,
                only_own_memories=self.only_own_memories,
                memory_cards_path=self.memory_cards_path,
                memory_embeddings_path=self.memory_embeddings_path,
                eval_mode=True,  # Deterministic selection
            )
            self._initialized = True

        except Exception as e:
            print(f"Warning: Could not initialize PersonalizedLLM: {e}")
            print("Falling back to simple response mode.")
            self._llm = None
            self._initialized = True

    def respond(
        self,
        user_id: str,
        query: str,
        conversation_history: List[Dict[str, str]],
        **kwargs
    ) -> AgentResponse:
        """Generate response using RAG memory.

        Note: conversation_history is not forwarded — PersonalizedLLM.chat()
        presumably tracks the session internally; confirm against its API.
        Errors are caught and turned into an apology AgentResponse.
        """

        self._ensure_initialized()

        if self._llm is None:
            # Fallback mode (PersonalizedLLM unavailable).
            return AgentResponse(
                answer=f"[RAGMemoryAgent-{self.mode}] Response to: {query[:50]}...",
                debug_info={"mode": "fallback"},
            )

        try:
            # Use PersonalizedLLM's chat interface; the returned object is
            # expected to expose .answer and an optional .debug with
            # selected_memory_ids/notes, extracted_preferences and extra.
            response = self._llm.chat(user_id, query)

            debug_info = {
                "mode": self.mode,
                "num_memories_retrieved": len(response.debug.selected_memory_ids) if response.debug else 0,
                "selected_memories": response.debug.selected_memory_notes if response.debug else [],
                "extracted_preferences": response.debug.extracted_preferences if response.debug else [],
            }

            if response.debug and response.debug.extra:
                debug_info.update(response.debug.extra)

            return AgentResponse(
                answer=response.answer,
                debug_info=debug_info,
            )

        except Exception as e:
            print(f"Error in RAGMemoryAgent.respond: {e}")
            return AgentResponse(
                answer=f"I apologize for the error. Regarding: {query[:100]}",
                debug_info={"error": str(e)},
            )

    def end_session(self, user_id: str, conversation: List[Dict[str, str]]):
        """
        Called at end of session.
        PersonalizedLLM already extracts preferences during chat(),
        so we just reset the session state.

        Note: the `conversation` argument is intentionally unused here.
        """
        self._ensure_initialized()

        if self._llm is not None:
            self._llm.reset_session(user_id)

    def reset_user(self, user_id: str):
        """Reset all state for a user (delegates to PersonalizedLLM)."""
        self._ensure_initialized()

        if self._llm is not None:
            self._llm.reset_user(user_id)

    def apply_feedback(self, user_id: str, reward: float, gating: float = 1.0):
        """
        Apply feedback for user vector updates (Y4 only).

        NOTE(review): unlike respond(), this does not call
        _ensure_initialized(), so feedback arriving before the first respond()
        is silently dropped — confirm this is intended.

        Args:
            user_id: User identifier
            reward: Reward signal (e.g., from preference satisfaction)
            gating: Gating signal (1.0 = use this feedback, 0.0 = skip)
        """
        if not self.enable_rl_updates or self._llm is None:
            return

        try:
            from personalization.serving.personalized_llm import Feedback

            feedback = Feedback(
                user_id=user_id,
                turn_id=0,  # Not used in current implementation
                reward=reward,
                gating=gating,
            )
            self._llm.apply_feedback(feedback)

        except Exception as e:
            print(f"Error applying feedback: {e}")

    def get_user_state(self, user_id: str) -> Dict[str, Any]:
        """Get user state summary (for Y4 analysis); {} in fallback mode."""
        self._ensure_initialized()

        if self._llm is not None:
            return self._llm.get_user_state_summary(user_id)
        return {}

    def persist(self):
        """Save all PersonalizedLLM state to disk (no-op in fallback mode)."""
        if self._llm is not None:
            self._llm.persist()

    def get_name(self) -> str:
        """Descriptive name: RAG (Y3) or RAG+UV (Y4), plus the model."""
        mode_name = "RAG" if self.mode == "nopersonal" else "RAG+UV"
        return f"{mode_name}({self.model_name})"