summaryrefslogtreecommitdiff
path: root/collaborativeagents/scripts/test_70b_pilot.py
diff options
context:
space:
mode:
Diffstat (limited to 'collaborativeagents/scripts/test_70b_pilot.py')
-rw-r--r--collaborativeagents/scripts/test_70b_pilot.py281
1 files changed, 281 insertions, 0 deletions
diff --git a/collaborativeagents/scripts/test_70b_pilot.py b/collaborativeagents/scripts/test_70b_pilot.py
new file mode 100644
index 0000000..4bb27a3
--- /dev/null
+++ b/collaborativeagents/scripts/test_70b_pilot.py
@@ -0,0 +1,281 @@
+#!/usr/bin/env python3
+"""
+Pilot test for 70B AWQ user model.
+
+Tests:
+1. 70B AWQ model loads without OOM
+2. User simulation works correctly
+3. Multi-turn conversation completes
+4. Memory usage is acceptable
+
+Run with 4xA100 GPUs.
+"""
+
+import sys
+import json
+import torch
+from pathlib import Path
+
+# Add paths
+sys.path.insert(0, str(Path(__file__).parent.parent))
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+
def print_gpu_memory():
    """Print allocated/reserved/total memory (GB) for every visible GPU."""
    print("\n=== GPU Memory Usage ===")
    for gpu_idx in range(torch.cuda.device_count()):
        total_gb = torch.cuda.get_device_properties(gpu_idx).total_memory / 1e9
        allocated_gb = torch.cuda.memory_allocated(gpu_idx) / 1e9
        reserved_gb = torch.cuda.memory_reserved(gpu_idx) / 1e9
        print(
            f"  GPU {gpu_idx}: {allocated_gb:.1f}GB allocated, "
            f"{reserved_gb:.1f}GB reserved, {total_gb:.1f}GB total"
        )
    print()
+
+
def test_70b_user_agent():
    """Smoke-test the standalone 70B AWQ user agent.

    Loads the agent, asks it for a single simulated user reply, and reports
    GPU memory before and after generation. Returns True on a non-empty
    response, False otherwise.
    """
    print("=" * 60)
    print("TEST 1: 70B AWQ User Agent Loading")
    print("=" * 60)

    # Imported lazily so the (large) model machinery is only pulled in
    # when this test actually runs.
    from agents.local_user_agent import LocalUserAgent, DEFAULT_MODEL_PATH

    print(f"Default model path: {DEFAULT_MODEL_PATH}")
    print(f"Is AWQ model: {'awq' in DEFAULT_MODEL_PATH.lower()}")

    agent = LocalUserAgent(
        user_task_description="Help solve a math problem",
        problem="What is 2 + 2?",
        user_persona="A student learning math",
        user_preferences="- Show step by step solutions\n- Use simple language",
    )

    print("\nGenerating user response...")
    print_gpu_memory()

    # Seed the conversation with a single assistant greeting and ask the
    # simulated user to reply to it.
    history = [{"role": "assistant", "content": "How can I help you today?"}]
    reply = agent.generate_user_response(history)

    print_gpu_memory()

    if not reply:
        print("FAILED! User agent returned None")
        return False

    print(f"SUCCESS! User response: {reply.get('response', 'N/A')[:200]}...")
    print(f"Should terminate: {reply.get('should_terminate', 'N/A')}")
    return True
+
+
def test_multiturn_with_70b():
    """Test multi-turn conversation with 70B user model.

    Drives up to ``max_turns`` user/agent exchanges between the shared 70B
    user simulator and a vanilla baseline adapter, then reports turn counts
    and GPU memory. Returns True when more than two turns were recorded
    (i.e. the conversation got past a single exchange).
    """
    print("\n" + "=" * 60)
    print("TEST 2: Multi-turn Conversation with 70B User Model")
    print("=" * 60)

    # Lazy imports: only load the heavy model machinery when this test runs.
    from agents.local_user_agent import SharedLocalUserAgent, TERMINATION_SIGNAL
    from adapters.personalized_llm_adapter import create_baseline_adapter

    # Create vanilla adapter (uses Qwen 1.5B for agent)
    print("\nCreating vanilla adapter...")
    adapter = create_baseline_adapter("vanilla")
    adapter.initialize()

    print_gpu_memory()

    # Load a test profile (first line of the JSONL file only).
    profile_path = Path(__file__).parent.parent / "data/complex_profiles_v2/profiles_100.jsonl"
    with open(profile_path) as f:
        profile = json.loads(f.readline())

    print(f"Loaded profile: {profile.get('user_id', 'unknown')}")

    # Create user agent with 70B model; cap at the first 3 preferences to
    # keep the simulator prompt short.
    problem = "What is 15% of 80?"
    user_prefs = profile.get("preferences", [])[:3]
    pref_str = "\n".join([f"- {p}" for p in user_prefs])

    print(f"\nUser preferences:\n{pref_str}")

    user_agent = SharedLocalUserAgent(
        user_task_description="Solve the math problem",
        problem=problem,
        user_persona=profile.get("persona", "A user"),
        user_preferences=pref_str,
    )

    print_gpu_memory()

    # Start session
    adapter.start_session(user_id=profile.get("user_id", "test"))

    # Run multi-turn conversation. `conversation` is the full transcript fed
    # to both models; `turns` records only the turns generated in this loop
    # (excludes the seeded greeting) and is what the result check counts.
    conversation = [{"role": "assistant", "content": "How can I help you today?"}]
    turns = []
    max_turns = 5

    print(f"\nStarting {max_turns}-turn conversation...")

    for turn_num in range(max_turns):
        print(f"\n--- Turn {turn_num + 1} ---")

        # User turn: simulator replies to the transcript so far. None means
        # the simulator itself failed, so abort the conversation.
        user_response = user_agent.generate_user_response(conversation)
        if user_response is None:
            print("User agent failed!")
            break

        user_msg = user_response.get("response", "")
        print(f"USER: {user_msg[:150]}...")

        conversation.append({"role": "user", "content": user_msg})
        turns.append({"role": "user", "content": user_msg})

        # Check termination AFTER recording the message, so the user's final
        # turn is still counted. Either the structured flag or an inline
        # termination marker in the text ends the conversation.
        if user_response.get("should_terminate", False) or TERMINATION_SIGNAL in user_msg:
            print("\n[User terminated conversation]")
            break

        # Agent turn. `conversation[:-1]` passes the history *excluding* the
        # just-appended user message, which is given separately as the query.
        response = adapter.generate_response(user_msg, conversation[:-1])
        # Adapters may return either a dict with a "response" key or a bare
        # string; normalize to a string either way.
        agent_msg = response.get("response", str(response)) if isinstance(response, dict) else str(response)
        print(f"AGENT: {agent_msg[:150]}...")

        conversation.append({"role": "assistant", "content": agent_msg})
        turns.append({"role": "assistant", "content": agent_msg})

    # End session
    adapter.end_session()

    print(f"\n--- Results ---")
    print(f"Total turns: {len(turns)}")
    print(f"User turns: {len([t for t in turns if t['role'] == 'user'])}")
    print(f"Agent turns: {len([t for t in turns if t['role'] == 'assistant'])}")

    print_gpu_memory()

    return len(turns) > 2  # Success if more than single turn
+
+
def test_memory_after_multiple_sessions():
    """Test memory doesn't grow unboundedly after multiple sessions.

    Runs several short back-to-back sessions, printing GPU memory and forcing
    a GC + CUDA cache flush after each one so any growth across sessions is
    visible in the printed numbers. Returns True when all sessions complete.
    """
    import gc  # hoisted: was previously re-imported on every session iteration

    print("\n" + "=" * 60)
    print("TEST 3: Memory Stability Across Sessions")
    print("=" * 60)

    # Lazy imports so heavy model machinery only loads when this test runs.
    # (Dropped the unused TERMINATION_SIGNAL import.)
    from agents.local_user_agent import SharedLocalUserAgent
    from adapters.personalized_llm_adapter import create_baseline_adapter

    adapter = create_baseline_adapter("vanilla")
    adapter.initialize()

    # Use the first profile from the JSONL file for every session.
    profile_path = Path(__file__).parent.parent / "data/complex_profiles_v2/profiles_100.jsonl"
    with open(profile_path) as f:
        profile = json.loads(f.readline())

    n_sessions = 3
    print(f"\nRunning {n_sessions} sessions to check memory stability...")

    for session_idx in range(n_sessions):
        print(f"\n--- Session {session_idx + 1}/{n_sessions} ---")

        # Fresh user agent per session with a slightly different problem,
        # so each session exercises a new generation.
        user_agent = SharedLocalUserAgent(
            user_task_description="Solve math",
            problem=f"What is {session_idx + 1} + {session_idx + 2}?",
            user_persona="A student",
            user_preferences="- Be concise",
        )

        adapter.start_session(user_id=profile.get("user_id", "test"))

        conversation = [{"role": "assistant", "content": "How can I help?"}]
        for turn in range(3):
            user_response = user_agent.generate_user_response(conversation)
            # Stop on simulator failure or an explicit termination flag.
            if user_response is None or user_response.get("should_terminate"):
                break
            user_msg = user_response.get("response", "")
            conversation.append({"role": "user", "content": user_msg})

            # Pass history excluding the just-appended user message; it is
            # supplied separately as the query.
            response = adapter.generate_response(user_msg, conversation[:-1])
            # Fix: normalize the adapter's return value the same way TEST 2
            # does. The old code called .get() unconditionally and would
            # crash if the adapter returned a bare string.
            agent_msg = (
                response.get("response", str(response))
                if isinstance(response, dict)
                else str(response)
            )
            conversation.append({"role": "assistant", "content": agent_msg})

        adapter.end_session()
        print_gpu_memory()

        # Force garbage collection so leaked tensors show up as real growth
        # in the next session's memory report.
        gc.collect()
        torch.cuda.empty_cache()

    print("\nMemory stability test completed.")
    return True
+
+
if __name__ == "__main__":
    import os
    import traceback

    # Point Hugging Face caches at the project scratch area (cluster-specific).
    os.environ["HF_HOME"] = "/projects/bfqt/users/yurenh2/hf_cache/huggingface"

    def _run_test(label, fn):
        """Run one test callable, printing a traceback on failure.

        Returns the test's boolean result, or False if it raised. This
        replaces three copies of identical try/except/traceback boilerplate
        (each of which re-imported `traceback` locally).
        """
        try:
            return fn()
        except Exception as e:
            print(f"{label} FAILED: {e}")
            traceback.print_exc()
            return False

    print("\n" + "=" * 60)
    print("70B AWQ USER MODEL PILOT TEST")
    print("=" * 60)
    print(f"PyTorch version: {torch.__version__}")
    print(f"CUDA available: {torch.cuda.is_available()}")
    print(f"GPU count: {torch.cuda.device_count()}")

    for i in range(torch.cuda.device_count()):
        print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")

    print_gpu_memory()

    results = {}

    # Tests are chained: each later test only runs if the previous passed,
    # since a failed load makes the downstream tests meaningless.
    results["70b_load"] = _run_test("TEST 1", test_70b_user_agent)

    if results["70b_load"]:
        results["multiturn"] = _run_test("TEST 2", test_multiturn_with_70b)
    else:
        print("\nSkipping TEST 2 (TEST 1 failed)")
        results["multiturn"] = False

    if results["multiturn"]:
        results["memory_stable"] = _run_test("TEST 3", test_memory_after_multiple_sessions)
    else:
        print("\nSkipping TEST 3 (TEST 2 failed)")
        results["memory_stable"] = False

    # Summary
    print("\n" + "=" * 60)
    print("TEST SUMMARY")
    print("=" * 60)
    for test_name, passed in results.items():
        status = "PASS" if passed else "FAIL"
        print(f"  {test_name}: {status}")

    all_passed = all(results.values())
    print(f"\nOverall: {'ALL TESTS PASSED - Ready for full experiment!' if all_passed else 'SOME TESTS FAILED'}")

    print_gpu_memory()

    # Exit status mirrors the overall result for CI / job schedulers.
    sys.exit(0 if all_passed else 1)