#!/usr/bin/env python3
# Origin: collaborativeagents/scripts/test_70b_pilot.py, added in commit
# dc801c07cf38b0c495686463e6ca6f871a64440e
# ("Add collaborativeagents module and update gitignore").
"""
Pilot test for 70B AWQ user model.

Tests:
1. 70B AWQ model loads without OOM
2. User simulation works correctly
3. Multi-turn conversation completes
4. Memory usage is acceptable

Run with 4xA100 GPUs.
"""

import gc
import json
import os
import sys
import traceback
from pathlib import Path
from typing import Any, Callable, Dict

import torch

# Add paths so the project-local ``agents`` / ``adapters`` packages resolve
# when this file is executed directly from the scripts directory.
sys.path.insert(0, str(Path(__file__).parent.parent))
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

# Profile file shared by the multi-turn and memory-stability tests.
PROFILE_PATH = Path(__file__).parent.parent / "data/complex_profiles_v2/profiles_100.jsonl"

# Star-imports expose only the helpers. The pilot tests below need GPUs and
# project modules, and their ``test_`` prefix would otherwise make pytest
# collect them from any module that star-imports this file.
__all__ = [
    "print_gpu_memory",
    "extract_agent_message",
    "load_first_profile",
    "run_guarded",
    "main",
]


def print_gpu_memory():
    """Print allocated, reserved and total memory (in GB) for every CUDA device.

    Prints only the header and a blank line when no CUDA device is visible.
    """
    print("\n=== GPU Memory Usage ===")
    for i in range(torch.cuda.device_count()):
        total = torch.cuda.get_device_properties(i).total_memory / 1e9
        allocated = torch.cuda.memory_allocated(i) / 1e9
        reserved = torch.cuda.memory_reserved(i) / 1e9
        print(f"  GPU {i}: {allocated:.1f}GB allocated, {reserved:.1f}GB reserved, {total:.1f}GB total")
    print()


def extract_agent_message(response: Any) -> str:
    """Return the text of an adapter reply.

    A dict reply yields its ``"response"`` entry (falling back to the dict's
    string form when the key is absent); any other reply is stringified.
    Both conversation tests go through this helper so that a non-dict reply
    is handled the same way everywhere.
    """
    if isinstance(response, dict):
        return response.get("response", str(response))
    return str(response)


def load_first_profile(profile_path) -> Dict[str, Any]:
    """Load the first non-blank JSON record from a JSONL profile file.

    Args:
        profile_path: Path to a JSON-lines file with one profile per line.

    Returns:
        The decoded first record.

    Raises:
        ValueError: If the file contains no non-blank line, or the first
            record is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
        OSError: If the file cannot be opened.
    """
    with open(profile_path, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                return json.loads(line)
    raise ValueError(f"No profile records found in {profile_path}")


def run_guarded(label: str, test_fn: Callable[[], Any]) -> bool:
    """Run one pilot test, turning any exception into a reported failure.

    Args:
        label: Human-readable test label used in the failure message,
            e.g. ``"TEST 1"``.
        test_fn: Zero-argument callable returning a truthy value on success.

    Returns:
        ``True`` when ``test_fn`` returned a truthy value, ``False`` when it
        returned a falsy value or raised.
    """
    try:
        return bool(test_fn())
    except Exception as e:  # top-level boundary: report and keep going
        print(f"{label} FAILED: {e}")
        traceback.print_exc()
        return False


def test_70b_user_agent():
    """Test 70B user agent standalone.

    Builds a ``LocalUserAgent`` for a trivial problem, asks it for one user
    turn and prints GPU memory before and after generation.

    Returns:
        ``True`` if the agent produced a truthy response, ``False`` otherwise.
    """
    print("=" * 60)
    print("TEST 1: 70B AWQ User Agent Loading")
    print("=" * 60)

    from agents.local_user_agent import LocalUserAgent, DEFAULT_MODEL_PATH

    print(f"Default model path: {DEFAULT_MODEL_PATH}")
    print(f"Is AWQ model: {'awq' in DEFAULT_MODEL_PATH.lower()}")

    # Create user agent
    user_agent = LocalUserAgent(
        user_task_description="Help solve a math problem",
        problem="What is 2 + 2?",
        user_persona="A student learning math",
        user_preferences="- Show step by step solutions\n- Use simple language",
    )

    print("\nGenerating user response...")
    print_gpu_memory()

    # Simulate a conversation
    conversation = [{"role": "assistant", "content": "How can I help you today?"}]
    response = user_agent.generate_user_response(conversation)

    print_gpu_memory()

    if not response:
        print("FAILED! User agent returned None")
        return False

    print(f"SUCCESS! User response: {response.get('response', 'N/A')[:200]}...")
    print(f"Should terminate: {response.get('should_terminate', 'N/A')}")
    return True


def test_multiturn_with_70b():
    """Test multi-turn conversation with 70B user model.

    Alternates user-agent and adapter turns for up to five rounds, stopping
    early when the user agent fails or signals termination.

    Returns:
        ``True`` if more than two messages were exchanged.
    """
    print("\n" + "=" * 60)
    print("TEST 2: Multi-turn Conversation with 70B User Model")
    print("=" * 60)

    from agents.local_user_agent import SharedLocalUserAgent, TERMINATION_SIGNAL
    from adapters.personalized_llm_adapter import create_baseline_adapter

    # Create vanilla adapter (uses Qwen 1.5B for agent)
    print("\nCreating vanilla adapter...")
    adapter = create_baseline_adapter("vanilla")
    adapter.initialize()

    print_gpu_memory()

    # Load a test profile
    profile = load_first_profile(PROFILE_PATH)

    print(f"Loaded profile: {profile.get('user_id', 'unknown')}")

    # Create user agent with 70B model
    problem = "What is 15% of 80?"
    user_prefs = profile.get("preferences", [])[:3]
    pref_str = "\n".join(f"- {p}" for p in user_prefs)

    print(f"\nUser preferences:\n{pref_str}")

    user_agent = SharedLocalUserAgent(
        user_task_description="Solve the math problem",
        problem=problem,
        user_persona=profile.get("persona", "A user"),
        user_preferences=pref_str,
    )

    print_gpu_memory()

    # Run multi-turn conversation
    conversation = [{"role": "assistant", "content": "How can I help you today?"}]
    turns = []
    max_turns = 5

    # Start session; the ``finally`` below closes it even if a turn raises.
    adapter.start_session(user_id=profile.get("user_id", "test"))
    try:
        print(f"\nStarting {max_turns}-turn conversation...")

        for turn_num in range(max_turns):
            print(f"\n--- Turn {turn_num + 1} ---")

            # User turn
            user_response = user_agent.generate_user_response(conversation)
            if user_response is None:
                print("User agent failed!")
                break

            user_msg = user_response.get("response", "")
            print(f"USER: {user_msg[:150]}...")

            conversation.append({"role": "user", "content": user_msg})
            turns.append({"role": "user", "content": user_msg})

            # Check termination
            if user_response.get("should_terminate", False) or TERMINATION_SIGNAL in user_msg:
                print("\n[User terminated conversation]")
                break

            # Agent turn: history excludes the user message just appended,
            # which is passed separately as the current query.
            response = adapter.generate_response(user_msg, conversation[:-1])
            agent_msg = extract_agent_message(response)
            print(f"AGENT: {agent_msg[:150]}...")

            conversation.append({"role": "assistant", "content": agent_msg})
            turns.append({"role": "assistant", "content": agent_msg})
    finally:
        # End session
        adapter.end_session()

    print("\n--- Results ---")
    print(f"Total turns: {len(turns)}")
    print(f"User turns: {sum(t['role'] == 'user' for t in turns)}")
    print(f"Agent turns: {sum(t['role'] == 'assistant' for t in turns)}")

    print_gpu_memory()

    return len(turns) > 2  # Success if more than single turn


def test_memory_after_multiple_sessions():
    """Run several short sessions and report GPU memory after each one.

    The check is observational: memory figures are printed for manual
    inspection and the function returns ``True`` once every session has
    completed without raising.
    """
    print("\n" + "=" * 60)
    print("TEST 3: Memory Stability Across Sessions")
    print("=" * 60)

    from agents.local_user_agent import SharedLocalUserAgent, TERMINATION_SIGNAL
    from adapters.personalized_llm_adapter import create_baseline_adapter

    adapter = create_baseline_adapter("vanilla")
    adapter.initialize()

    profile = load_first_profile(PROFILE_PATH)

    n_sessions = 3
    print(f"\nRunning {n_sessions} sessions to check memory stability...")

    for session_idx in range(n_sessions):
        print(f"\n--- Session {session_idx + 1}/{n_sessions} ---")

        user_agent = SharedLocalUserAgent(
            user_task_description="Solve math",
            problem=f"What is {session_idx + 1} + {session_idx + 2}?",
            user_persona="A student",
            user_preferences="- Be concise",
        )

        adapter.start_session(user_id=profile.get("user_id", "test"))
        try:
            conversation = [{"role": "assistant", "content": "How can I help?"}]
            for _ in range(3):
                user_response = user_agent.generate_user_response(conversation)
                if user_response is None or user_response.get("should_terminate"):
                    break

                user_msg = user_response.get("response", "")
                # Same termination rule as the multi-turn test: do not feed
                # a terminating message to the agent.
                if TERMINATION_SIGNAL in user_msg:
                    break
                conversation.append({"role": "user", "content": user_msg})

                response = adapter.generate_response(user_msg, conversation[:-1])
                conversation.append(
                    {"role": "assistant", "content": extract_agent_message(response)}
                )
        finally:
            # Close the session even if a turn raised.
            adapter.end_session()

        print_gpu_memory()

        # Force garbage collection
        gc.collect()
        torch.cuda.empty_cache()

    print("\nMemory stability test completed.")
    return True


def main() -> int:
    """Run the pilot tests in order and return a process exit code.

    Each test only runs if the previous one passed; skipped tests are
    recorded as failures.

    Returns:
        ``0`` when every test passed, ``1`` otherwise.
    """
    os.environ["HF_HOME"] = "/projects/bfqt/users/yurenh2/hf_cache/huggingface"

    print("\n" + "=" * 60)
    print("70B AWQ USER MODEL PILOT TEST")
    print("=" * 60)
    print(f"PyTorch version: {torch.__version__}")
    print(f"CUDA available: {torch.cuda.is_available()}")
    print(f"GPU count: {torch.cuda.device_count()}")

    for i in range(torch.cuda.device_count()):
        print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")

    print_gpu_memory()

    results = {}

    # Test 1: User agent loading
    results["70b_load"] = run_guarded("TEST 1", test_70b_user_agent)

    # Test 2: Multi-turn conversation (only if test 1 passed)
    if results["70b_load"]:
        results["multiturn"] = run_guarded("TEST 2", test_multiturn_with_70b)
    else:
        print("\nSkipping TEST 2 (TEST 1 failed)")
        results["multiturn"] = False

    # Test 3: Memory stability (only if test 2 passed)
    if results["multiturn"]:
        results["memory_stable"] = run_guarded("TEST 3", test_memory_after_multiple_sessions)
    else:
        print("\nSkipping TEST 3 (TEST 2 failed)")
        results["memory_stable"] = False

    # Summary
    print("\n" + "=" * 60)
    print("TEST SUMMARY")
    print("=" * 60)
    for test_name, passed in results.items():
        status = "PASS" if passed else "FAIL"
        print(f"  {test_name}: {status}")

    all_passed = all(results.values())
    print(f"\nOverall: {'ALL TESTS PASSED - Ready for full experiment!' if all_passed else 'SOME TESTS FAILED'}")

    print_gpu_memory()

    return 0 if all_passed else 1


if __name__ == "__main__":
    sys.exit(main())