| author | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-04 18:59:35 -0600 |
|---|---|---|
| committer | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-04 18:59:35 -0600 |
| commit | f1c2cc22d46a6976df3555391e667c7e61592fad (patch) | |
| tree | 0b37b52c8ff91042a742d3b3ec54542cb6d6e2f6 /run_experiments.py | |
Diffstat (limited to 'run_experiments.py')
| -rw-r--r-- | run_experiments.py | 601 |
1 file changed, 601 insertions, 0 deletions
diff --git a/run_experiments.py b/run_experiments.py
new file mode 100644
index 0000000..0cbcd67
--- /dev/null
+++ b/run_experiments.py
@@ -0,0 +1,601 @@
+#!/usr/bin/env python3
+# run_experiments.py
+"""
+Experiment Runner for RLVR Floating-Point Precision Study.
+
+This script orchestrates the full experimental pipeline:
+1. Training models with FP32 and bf16 precision across multiple seeds
+2. Evaluating trained models on on-task and off-task benchmarks
+3. Computing KL divergence and bf16 sparsity metrics
+
+Usage:
+    # Run full experiment
+    python run_experiments.py --mode full
+
+    # Run training only
+    python run_experiments.py --mode train --precision_mode bf16 --seed 1
+
+    # Run evaluation only
+    python run_experiments.py --mode eval
+
+    # Run analysis only
+    python run_experiments.py --mode analyze
+"""
+
+import argparse
+import json
+import os
+import subprocess
+import sys
+import logging
+from typing import Dict, Any, List, Optional
+from dataclasses import asdict
+from concurrent.futures import ProcessPoolExecutor
+import time
+
+from config import (
+    ExperimentConfig,
+    make_training_config,
+    make_precision_config,
+    get_run_output_dir,
+    get_checkpoint_path,
+)
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+
+# ============================================================================
+# Training Functions
+# ============================================================================
+
+def run_single_training(
+    precision_mode: str,
+    seed: int,
+    config: ExperimentConfig,
+    train_dataset_path: str,
+    dry_run: bool = False
+) -> Dict[str, Any]:
+    """
+    Run a single training job.
+
+    Args:
+        precision_mode: "fp32" or "bf16"
+        seed: Random seed
+        config: Experiment configuration
+        train_dataset_path: Path to training data
+        dry_run: If True, only print command without running
+
+    Returns:
+        Dictionary with job status
+    """
+    output_dir = get_run_output_dir(config.train_logs_dir, precision_mode, seed)
+
+    cmd = [
+        sys.executable,
+        "train_rlvr.py",
+        "--precision_mode", precision_mode,
+        "--seed", str(seed),
+        "--output_dir", output_dir,
+        "--train_dataset_path", train_dataset_path,
+        "--model_name", config.base_model_path,
+    ]
+
+    logger.info(f"Running training: {precision_mode} seed={seed}")
+    logger.info(f"Command: {' '.join(cmd)}")
+
+    if dry_run:
+        return {
+            "status": "dry_run",
+            "precision_mode": precision_mode,
+            "seed": seed,
+            "output_dir": output_dir,
+            "command": " ".join(cmd),
+        }
+
+    # Create output directory
+    os.makedirs(output_dir, exist_ok=True)
+
+    # Run training
+    start_time = time.time()
+    try:
+        result = subprocess.run(
+            cmd,
+            capture_output=True,
+            text=True,
+            check=True
+        )
+        duration = time.time() - start_time
+
+        return {
+            "status": "success",
+            "precision_mode": precision_mode,
+            "seed": seed,
+            "output_dir": output_dir,
+            "duration_seconds": duration,
+            "stdout": result.stdout[-1000:],  # Last 1000 chars
+        }
+    except subprocess.CalledProcessError as e:
+        return {
+            "status": "failed",
+            "precision_mode": precision_mode,
+            "seed": seed,
+            "output_dir": output_dir,
+            "error": str(e),
+            "stderr": e.stderr[-1000:] if e.stderr else None,
+        }
+
+
+def run_all_training(
+    config: ExperimentConfig,
+    train_dataset_path: str,
+    dry_run: bool = False,
+    parallel: bool = False
+) -> List[Dict[str, Any]]:
+    """
+    Run training for all precision modes and seeds.
+    """
+    jobs = []
+    for precision_mode in config.precision_modes:
+        for seed in config.seeds:
+            jobs.append((precision_mode, seed))
+
+    results = []
+
+    if parallel and not dry_run:
+        # Run in parallel (one job per GPU assumed)
+        with ProcessPoolExecutor(max_workers=len(jobs)) as executor:
+            futures = [
+                executor.submit(
+                    run_single_training,
+                    pm, s, config, train_dataset_path, dry_run
+                )
+                for pm, s in jobs
+            ]
+            results = [f.result() for f in futures]
+    else:
+        # Run sequentially
+        for precision_mode, seed in jobs:
+            result = run_single_training(
+                precision_mode, seed, config, train_dataset_path, dry_run
+            )
+            results.append(result)
+
+    return results
+
+
+# ============================================================================
+# Evaluation Functions
+# ============================================================================
+
+def run_single_evaluation(
+    precision_mode: str,
+    seed: int,
+    config: ExperimentConfig,
+    eval_base: bool = True,
+    dry_run: bool = False
+) -> Dict[str, Any]:
+    """
+    Run evaluation for a single trained model.
+    """
+    run_dir = get_run_output_dir(config.train_logs_dir, precision_mode, seed)
+    ft_ckpt = get_checkpoint_path(run_dir)
+    output_path = os.path.join(
+        config.eval_metrics_dir,
+        f"{precision_mode}_seed{seed}.json"
+    )
+
+    cmd = [
+        sys.executable,
+        "eval_policy.py",
+        "--base_ckpt", config.base_model_path,
+        "--ft_ckpt", ft_ckpt,
+        "--eval_tasks_config", config.eval_tasks_config_path,
+        "--output_path", output_path,
+    ]
+
+    if eval_base:
+        cmd.append("--eval_base")
+
+    logger.info(f"Running evaluation: {precision_mode} seed={seed}")
+    logger.info(f"Command: {' '.join(cmd)}")
+
+    if dry_run:
+        return {
+            "status": "dry_run",
+            "precision_mode": precision_mode,
+            "seed": seed,
+            "output_path": output_path,
+            "command": " ".join(cmd),
+        }
+
+    # Create output directory
+    os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+    # Check if checkpoint exists
+    if not os.path.exists(ft_ckpt):
+        return {
+            "status": "skipped",
+            "precision_mode": precision_mode,
+            "seed": seed,
+            "reason": f"Checkpoint not found: {ft_ckpt}",
+        }
+
+    # Run evaluation
+    start_time = time.time()
+    try:
+        result = subprocess.run(
+            cmd,
+            capture_output=True,
+            text=True,
+            check=True
+        )
+        duration = time.time() - start_time
+
+        return {
+            "status": "success",
+            "precision_mode": precision_mode,
+            "seed": seed,
+            "output_path": output_path,
+            "duration_seconds": duration,
+        }
+    except subprocess.CalledProcessError as e:
+        return {
+            "status": "failed",
+            "precision_mode": precision_mode,
+            "seed": seed,
+            "error": str(e),
+            "stderr": e.stderr[-1000:] if e.stderr else None,
+        }
+
+
+def run_all_evaluations(
+    config: ExperimentConfig,
+    eval_base: bool = True,
+    dry_run: bool = False
+) -> List[Dict[str, Any]]:
+    """
+    Run evaluation for all trained models.
+    """
+    results = []
+
+    for precision_mode in config.precision_modes:
+        for seed in config.seeds:
+            result = run_single_evaluation(
+                precision_mode, seed, config, eval_base, dry_run
+            )
+            results.append(result)
+
+    return results
+
+
+# ============================================================================
+# bf16 Sparsity Analysis
+# ============================================================================
+
+def run_sparsity_analysis(
+    config: ExperimentConfig,
+    dry_run: bool = False
+) -> List[Dict[str, Any]]:
+    """
+    Compute bf16 sparsity for all bf16 runs.
+    """
+    import torch
+    from transformers import AutoModelForCausalLM
+    from utils_bf16_sparsity import compute_bf16_sparsity, analyze_update_magnitudes
+
+    results = []
+
+    # Only analyze bf16 runs
+    if "bf16" not in config.precision_modes:
+        logger.info("No bf16 runs to analyze for sparsity")
+        return results
+
+    if dry_run:
+        for seed in config.seeds:
+            results.append({
+                "status": "dry_run",
+                "precision_mode": "bf16",
+                "seed": seed,
+            })
+        return results
+
+    # Load base model once
+    logger.info(f"Loading base model: {config.base_model_path}")
+    base_model = AutoModelForCausalLM.from_pretrained(
+        config.base_model_path,
+        torch_dtype=torch.float32,
+        device_map="cpu"
+    )
+
+    for seed in config.seeds:
+        run_dir = get_run_output_dir(config.train_logs_dir, "bf16", seed)
+        ft_ckpt = get_checkpoint_path(run_dir)
+
+        if not os.path.exists(ft_ckpt):
+            results.append({
+                "status": "skipped",
+                "precision_mode": "bf16",
+                "seed": seed,
+                "reason": f"Checkpoint not found: {ft_ckpt}",
+            })
+            continue
+
+        logger.info(f"Computing sparsity for bf16 seed={seed}")
+
+        # Load finetuned model
+        ft_model = AutoModelForCausalLM.from_pretrained(
+            ft_ckpt,
+            torch_dtype=torch.float32,
+            device_map="cpu"
+        )
+
+        # Compute sparsity
+        sparsity_result = compute_bf16_sparsity(
+            base_model=base_model,
+            finetuned_model=ft_model,
+            eta=config.bf16_sparsity_eta,
+            include_layer_stats=True
+        )
+
+        # Analyze update magnitudes
+        magnitude_result = analyze_update_magnitudes(
+            base_model=base_model,
+            finetuned_model=ft_model
+        )
+
+        # Save results
+        output_path = os.path.join(
+            config.eval_metrics_dir,
+            f"bf16_seed{seed}_sparsity.json"
+        )
+        os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+        full_result = {
+            "precision_mode": "bf16",
+            "seed": seed,
+            "sparsity": sparsity_result,
+            "magnitudes": magnitude_result,
+        }
+
+        with open(output_path, "w") as f:
+            # Convert layer_stats to serializable format
+            serializable = {
+                k: v for k, v in full_result.items()
+            }
+            if "layer_stats" in serializable.get("sparsity", {}):
+                serializable["sparsity"]["layer_stats"] = {
+                    k: {kk: vv for kk, vv in v.items() if kk != "shape"}
+                    for k, v in serializable["sparsity"]["layer_stats"].items()
+                }
+            json.dump(serializable, f, indent=2, default=str)
+
+        results.append({
+            "status": "success",
+            "precision_mode": "bf16",
+            "seed": seed,
+            "sparsity_percent": sparsity_result["sparsity_percent"],
+            "output_path": output_path,
+        })
+
+        # Free memory
+        del ft_model
+
+    return results
+
+
+# ============================================================================
+# Main Entry Point
+# ============================================================================
+
+def parse_args() -> argparse.Namespace:
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(
+        description="Run RLVR floating-point precision experiments"
+    )
+
+    parser.add_argument(
+        "--mode",
+        type=str,
+        default="full",
+        choices=["full", "train", "eval", "analyze", "sparsity"],
+        help="Execution mode"
+    )
+
+    # For single job mode
+    parser.add_argument(
+        "--precision_mode",
+        type=str,
+        choices=["fp32", "bf16"],
+        help="Precision mode (for train mode)"
+    )
+    parser.add_argument(
+        "--seed",
+        type=int,
+        help="Random seed (for train mode)"
+    )
+
+    # Paths
+    parser.add_argument(
+        "--train_dataset_path",
+        type=str,
+        default="./data/dm_train.json",
+        help="Path to training dataset"
+    )
+    parser.add_argument(
+        "--base_output_dir",
+        type=str,
+        default="./results",
+        help="Base output directory"
+    )
+    parser.add_argument(
+        "--eval_tasks_config",
+        type=str,
+        default="./configs/eval_tasks_config.json",
+        help="Path to evaluation tasks config"
+    )
+    parser.add_argument(
+        "--base_model",
+        type=str,
+        default="Qwen/Qwen2.5-Math-7B",
+        help="Base model path or HuggingFace ID"
+    )
+
+    # Execution options
+    parser.add_argument(
+        "--dry_run",
+        action="store_true",
+        help="Print commands without executing"
+    )
+    parser.add_argument(
+        "--parallel",
+        action="store_true",
+        help="Run training jobs in parallel"
+    )
+    parser.add_argument(
+        "--seeds",
+        type=int,
+        nargs="+",
+        default=[1, 2, 3, 4, 5],
+        help="Seeds to use"
+    )
+
+    return parser.parse_args()
+
+
+def main() -> None:
+    """Main entry point."""
+    args = parse_args()
+
+    # Create experiment configuration
+    config = ExperimentConfig(
+        experiment_name="fp_precision_rlvr",
+        seeds=args.seeds,
+        precision_modes=["fp32", "bf16"],
+        base_model_path=args.base_model,
+        base_output_dir=args.base_output_dir,
+        train_logs_dir=os.path.join(args.base_output_dir, "train_logs"),
+        checkpoints_dir=os.path.join(args.base_output_dir, "checkpoints"),
+        eval_metrics_dir=os.path.join(args.base_output_dir, "eval_metrics"),
+        eval_tasks_config_path=args.eval_tasks_config,
+    )
+
+    # Create directories
+    os.makedirs(config.train_logs_dir, exist_ok=True)
+    os.makedirs(config.checkpoints_dir, exist_ok=True)
+    os.makedirs(config.eval_metrics_dir, exist_ok=True)
+
+    # Save experiment config
+    config_path = os.path.join(args.base_output_dir, "experiment_config.json")
+    with open(config_path, "w") as f:
+        json.dump(asdict(config), f, indent=2)
+    logger.info(f"Saved experiment config to {config_path}")
+
+    # Execute based on mode
+    if args.mode == "train":
+        if args.precision_mode and args.seed:
+            # Single training job
+            result = run_single_training(
+                args.precision_mode,
+                args.seed,
+                config,
+                args.train_dataset_path,
+                args.dry_run
+            )
+            print(json.dumps(result, indent=2))
+        else:
+            # All training jobs
+            results = run_all_training(
+                config,
+                args.train_dataset_path,
+                args.dry_run,
+                args.parallel
+            )
+            print(json.dumps(results, indent=2))
+
+    elif args.mode == "eval":
+        results = run_all_evaluations(config, eval_base=True, dry_run=args.dry_run)
+        print(json.dumps(results, indent=2))
+
+    elif args.mode == "sparsity":
+        results = run_sparsity_analysis(config, dry_run=args.dry_run)
+        print(json.dumps(results, indent=2))
+
+    elif args.mode == "analyze":
+        # Run analysis script
+        analyze_cmd = [
+            sys.executable,
+            "analyze_results.py",
+            "--results_dir", config.eval_metrics_dir,
+            "--output_dir", os.path.join(args.base_output_dir, "analysis"),
+        ]
+        if args.dry_run:
+            print(f"Would run: {' '.join(analyze_cmd)}")
+        else:
+            subprocess.run(analyze_cmd, check=True)
+
+    elif args.mode == "full":
+        logger.info("="*60)
+        logger.info("RUNNING FULL EXPERIMENT PIPELINE")
+        logger.info("="*60)
+
+        # Step 1: Training
+        logger.info("\n" + "="*60)
+        logger.info("STEP 1: Training")
+        logger.info("="*60)
+        train_results = run_all_training(
+            config,
+            args.train_dataset_path,
+            args.dry_run,
+            args.parallel
+        )
+
+        # Step 2: Evaluation
+        logger.info("\n" + "="*60)
+        logger.info("STEP 2: Evaluation")
+        logger.info("="*60)
+        eval_results = run_all_evaluations(config, dry_run=args.dry_run)
+
+        # Step 3: Sparsity Analysis
+        logger.info("\n" + "="*60)
+        logger.info("STEP 3: Sparsity Analysis")
+        logger.info("="*60)
+        sparsity_results = run_sparsity_analysis(config, dry_run=args.dry_run)
+
+        # Step 4: Results Analysis
+        logger.info("\n" + "="*60)
+        logger.info("STEP 4: Results Analysis")
+        logger.info("="*60)
+        if not args.dry_run:
+            analyze_cmd = [
+                sys.executable,
+                "analyze_results.py",
+                "--results_dir", config.eval_metrics_dir,
+                "--output_dir", os.path.join(args.base_output_dir, "analysis"),
+            ]
+            subprocess.run(analyze_cmd, check=True)
+
+        # Summary
+        logger.info("\n" + "="*60)
+        logger.info("EXPERIMENT COMPLETE")
+        logger.info("="*60)
+
+        all_results = {
+            "training": train_results,
+            "evaluation": eval_results,
+            "sparsity": sparsity_results,
+        }
+
+        summary_path = os.path.join(args.base_output_dir, "experiment_summary.json")
+        with open(summary_path, "w") as f:
+            json.dump(all_results, f, indent=2)
+        logger.info(f"Saved summary to {summary_path}")
+
+
+if __name__ == "__main__":
+    main()
