#!/usr/bin/env python3
"""
Pilot test for 70B AWQ user model.

Tests:
1. 70B AWQ model loads without OOM
2. User simulation works correctly
3. Multi-turn conversation completes
4. Memory usage is acceptable

Run with 4xA100 GPUs.
"""

import gc
import json
import os
import sys
import traceback
from pathlib import Path

import torch

# Add paths
sys.path.insert(0, str(Path(__file__).parent.parent))
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

# JSONL file of user profiles; only its first line is read by the tests below.
PROFILE_PATH = Path(__file__).parent.parent / "data/complex_profiles_v2/profiles_100.jsonl"


def print_gpu_memory() -> None:
    """Print allocated, reserved and total memory for every visible CUDA device.

    The figures come from ``torch.cuda`` and are printed in units of 1e9
    bytes. When no CUDA device is visible only the header is printed.
    """
    print("\n=== GPU Memory Usage ===")
    for i in range(torch.cuda.device_count()):
        total = torch.cuda.get_device_properties(i).total_memory / 1e9
        allocated = torch.cuda.memory_allocated(i) / 1e9
        reserved = torch.cuda.memory_reserved(i) / 1e9
        print(f"  GPU {i}: {allocated:.1f}GB allocated, {reserved:.1f}GB reserved, {total:.1f}GB total")
    print()


def _load_first_profile(profile_path: Path = PROFILE_PATH) -> dict:
    """Return the profile stored on the first line of a JSONL file.

    Raises:
        ValueError: if the first line is empty, or is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        OSError: if the file cannot be opened.
    """
    with open(profile_path, encoding="utf-8") as f:
        first_line = f.readline()
    if not first_line.strip():
        raise ValueError(f"Profile file is empty: {profile_path}")
    return json.loads(first_line)


def _agent_message(response) -> str:
    """Return the reply text from an adapter response.

    A dict response yields its ``"response"`` entry (falling back to the
    stringified dict); any other object is stringified as-is.
    """
    if isinstance(response, dict):
        return response.get("response", str(response))
    return str(response)


def test_70b_user_agent() -> bool:
    """Test 70B user agent standalone.

    Builds a ``LocalUserAgent`` for a toy math task, asks it for one user
    turn, and prints GPU memory before and after generation.

    Returns:
        True if the agent returned a truthy response, False otherwise.
    """
    print("=" * 60)
    print("TEST 1: 70B AWQ User Agent Loading")
    print("=" * 60)

    # Imported lazily so that merely importing this module stays cheap.
    from agents.local_user_agent import LocalUserAgent, DEFAULT_MODEL_PATH

    print(f"Default model path: {DEFAULT_MODEL_PATH}")
    # str() keeps the check working if the path is a Path object.
    print(f"Is AWQ model: {'awq' in str(DEFAULT_MODEL_PATH).lower()}")

    # Create user agent
    user_agent = LocalUserAgent(
        user_task_description="Help solve a math problem",
        problem="What is 2 + 2?",
        user_persona="A student learning math",
        user_preferences="- Show step by step solutions\n- Use simple language",
    )

    print("\nGenerating user response...")
    print_gpu_memory()

    # Simulate a conversation
    conversation = [{"role": "assistant", "content": "How can I help you today?"}]
    response = user_agent.generate_user_response(conversation)

    print_gpu_memory()

    if not response:
        print("FAILED! User agent returned None")
        return False

    print(f"SUCCESS! User response: {response.get('response', 'N/A')[:200]}...")
    print(f"Should terminate: {response.get('should_terminate', 'N/A')}")
    return True


def test_multiturn_with_70b() -> bool:
    """Test multi-turn conversation with 70B user model.

    Alternates user-agent and adapter turns for at most five rounds, stopping
    early when the user agent fails or signals termination. The adapter
    session is always ended, even when a turn raises.

    Returns:
        True if more than two messages were exchanged, False otherwise.
    """
    print("\n" + "=" * 60)
    print("TEST 2: Multi-turn Conversation with 70B User Model")
    print("=" * 60)

    from agents.local_user_agent import SharedLocalUserAgent, TERMINATION_SIGNAL
    from adapters.personalized_llm_adapter import create_baseline_adapter

    # Create vanilla adapter (uses Qwen 1.5B for agent)
    print("\nCreating vanilla adapter...")
    adapter = create_baseline_adapter("vanilla")
    adapter.initialize()

    print_gpu_memory()

    # Load a test profile
    profile = _load_first_profile()
    print(f"Loaded profile: {profile.get('user_id', 'unknown')}")

    # Create user agent with 70B model
    problem = "What is 15% of 80?"
    user_prefs = profile.get("preferences", [])[:3]
    pref_str = "\n".join(f"- {p}" for p in user_prefs)

    print(f"\nUser preferences:\n{pref_str}")

    user_agent = SharedLocalUserAgent(
        user_task_description="Solve the math problem",
        problem=problem,
        user_persona=profile.get("persona", "A user"),
        user_preferences=pref_str,
    )

    print_gpu_memory()

    # Run multi-turn conversation
    conversation = [{"role": "assistant", "content": "How can I help you today?"}]
    turns = []
    max_turns = 5

    # Start session
    adapter.start_session(user_id=profile.get("user_id", "test"))
    try:
        print(f"\nStarting {max_turns}-turn conversation...")

        for turn_num in range(max_turns):
            print(f"\n--- Turn {turn_num + 1} ---")

            # User turn
            user_response = user_agent.generate_user_response(conversation)
            if user_response is None:
                print("User agent failed!")
                break

            user_msg = user_response.get("response", "")
            print(f"USER: {user_msg[:150]}...")

            conversation.append({"role": "user", "content": user_msg})
            turns.append({"role": "user", "content": user_msg})

            # Check termination
            if user_response.get("should_terminate", False) or TERMINATION_SIGNAL in user_msg:
                print("\n[User terminated conversation]")
                break

            # Agent turn: the history passed excludes the user message just
            # appended, since that message is supplied as the first argument.
            response = adapter.generate_response(user_msg, conversation[:-1])
            agent_msg = _agent_message(response)
            print(f"AGENT: {agent_msg[:150]}...")

            conversation.append({"role": "assistant", "content": agent_msg})
            turns.append({"role": "assistant", "content": agent_msg})
    finally:
        # End session even if a turn raised, so the adapter is not left
        # with a dangling session.
        adapter.end_session()

    user_turns = sum(1 for t in turns if t["role"] == "user")
    agent_turns = sum(1 for t in turns if t["role"] == "assistant")

    print("\n--- Results ---")
    print(f"Total turns: {len(turns)}")
    print(f"User turns: {user_turns}")
    print(f"Agent turns: {agent_turns}")

    print_gpu_memory()

    return len(turns) > 2  # Success if more than single turn


def test_memory_after_multiple_sessions() -> bool:
    """Run several short sessions and print GPU memory after each one.

    NOTE: memory growth is not asserted. The function returns True whenever
    all sessions finish without raising; stability has to be judged from the
    printed per-session memory figures.

    Returns:
        True if every session completed.
    """
    print("\n" + "=" * 60)
    print("TEST 3: Memory Stability Across Sessions")
    print("=" * 60)

    from agents.local_user_agent import SharedLocalUserAgent, TERMINATION_SIGNAL
    from adapters.personalized_llm_adapter import create_baseline_adapter

    adapter = create_baseline_adapter("vanilla")
    adapter.initialize()

    profile = _load_first_profile()

    n_sessions = 3
    print(f"\nRunning {n_sessions} sessions to check memory stability...")

    for session_idx in range(n_sessions):
        print(f"\n--- Session {session_idx + 1}/{n_sessions} ---")

        user_agent = SharedLocalUserAgent(
            user_task_description="Solve math",
            problem=f"What is {session_idx + 1} + {session_idx + 2}?",
            user_persona="A student",
            user_preferences="- Be concise",
        )

        adapter.start_session(user_id=profile.get("user_id", "test"))
        try:
            conversation = [{"role": "assistant", "content": "How can I help?"}]

            for _ in range(3):
                user_response = user_agent.generate_user_response(conversation)
                if user_response is None or user_response.get("should_terminate"):
                    break

                user_msg = user_response.get("response", "")
                # Same termination rule as the multi-turn test.
                if TERMINATION_SIGNAL in user_msg:
                    break

                conversation.append({"role": "user", "content": user_msg})

                response = adapter.generate_response(user_msg, conversation[:-1])
                # Adapter replies are normalised the same way as in the
                # multi-turn test; a non-dict reply must not crash here.
                conversation.append({"role": "assistant", "content": _agent_message(response)})
        finally:
            adapter.end_session()

        print_gpu_memory()

        # Force garbage collection
        gc.collect()
        torch.cuda.empty_cache()

    print("\nMemory stability test completed.")
    return True


def _run_guarded(label: str, test_fn):
    """Run one test stage, turning any exception into a printed failure.

    Args:
        label: Stage label used in the failure message (e.g. ``"TEST 1"``).
        test_fn: Zero-argument callable returning the stage result.

    Returns:
        Whatever ``test_fn`` returns, or False if it raised.
    """
    try:
        return test_fn()
    except Exception as e:  # top-level boundary: report and carry on
        print(f"{label} FAILED: {e}")
        traceback.print_exc()
        return False


def main() -> int:
    """Run the pilot stages in order and return the process exit code.

    Each stage runs only if the previous one passed; a skipped stage is
    recorded as failed.

    Returns:
        0 if every stage passed, 1 otherwise.
    """
    os.environ["HF_HOME"] = "/projects/bfqt/users/yurenh2/hf_cache/huggingface"

    print("\n" + "=" * 60)
    print("70B AWQ USER MODEL PILOT TEST")
    print("=" * 60)
    print(f"PyTorch version: {torch.__version__}")
    print(f"CUDA available: {torch.cuda.is_available()}")
    print(f"GPU count: {torch.cuda.device_count()}")

    for i in range(torch.cuda.device_count()):
        print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")

    print_gpu_memory()

    # (result key, label used in messages, stage callable), in run order.
    stages = [
        ("70b_load", "TEST 1", test_70b_user_agent),
        ("multiturn", "TEST 2", test_multiturn_with_70b),
        ("memory_stable", "TEST 3", test_memory_after_multiple_sessions),
    ]

    results = {}
    previous_label = None
    previous_passed = True
    for key, label, test_fn in stages:
        if previous_passed:
            results[key] = _run_guarded(label, test_fn)
        else:
            print(f"\nSkipping {label} ({previous_label} failed)")
            results[key] = False
        previous_label, previous_passed = label, results[key]

    # Summary
    print("\n" + "=" * 60)
    print("TEST SUMMARY")
    print("=" * 60)
    for test_name, passed in results.items():
        status = "PASS" if passed else "FAIL"
        print(f"  {test_name}: {status}")

    all_passed = all(results.values())
    print(f"\nOverall: {'ALL TESTS PASSED - Ready for full experiment!' if all_passed else 'SOME TESTS FAILED'}")

    print_gpu_memory()

    return 0 if all_passed else 1


if __name__ == "__main__":
    sys.exit(main())