#!/usr/bin/env python3
# run_experiments.py
"""
Experiment Runner for RLVR Floating-Point Precision Study.

This script orchestrates the full experimental pipeline:
1. Training models with FP32 and bf16 precision across multiple seeds
2. Evaluating trained models on on-task and off-task benchmarks
3. Computing KL divergence and bf16 sparsity metrics

Usage:
    # Run full experiment
    python run_experiments.py --mode full

    # Run training only
    python run_experiments.py --mode train --precision_mode bf16 --seed 1

    # Run evaluation only
    python run_experiments.py --mode eval

    # Run analysis only
    python run_experiments.py --mode analyze
"""

import argparse
import json
import os
import subprocess
import sys
import logging
from typing import Dict, Any, List, Optional
from dataclasses import asdict
from concurrent.futures import ProcessPoolExecutor
import time

from config import (
    ExperimentConfig,
    make_training_config,
    make_precision_config,
    get_run_output_dir,
    get_checkpoint_path,
)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


# ============================================================================
# Training Functions
# ============================================================================

def run_single_training(
    precision_mode: str,
    seed: int,
    config: ExperimentConfig,
    train_dataset_path: str,
    dry_run: bool = False
) -> Dict[str, Any]:
    """
    Run a single training job.

    Args:
        precision_mode: "fp32" or "bf16"
        seed: Random seed
        config: Experiment configuration
        train_dataset_path: Path to training data
        dry_run: If True, only print command without running

    Returns:
        Dictionary with job status
    """
    output_dir = get_run_output_dir(config.train_logs_dir, precision_mode, seed)

    cmd = [
        sys.executable, "train_rlvr.py",
        "--precision_mode", precision_mode,
        "--seed", str(seed),
        "--output_dir", output_dir,
        "--train_dataset_path", train_dataset_path,
        "--model_name", config.base_model_path,
    ]

    logger.info(f"Running training: {precision_mode} seed={seed}")
    logger.info(f"Command: {' '.join(cmd)}")

    if dry_run:
        return {
            "status": "dry_run",
            "precision_mode": precision_mode,
            "seed": seed,
            "output_dir": output_dir,
            "command": " ".join(cmd),
        }

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Run training
    start_time = time.time()
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True
        )
        duration = time.time() - start_time
        return {
            "status": "success",
            "precision_mode": precision_mode,
            "seed": seed,
            "output_dir": output_dir,
            "duration_seconds": duration,
            "stdout": result.stdout[-1000:],  # Last 1000 chars
        }
    except subprocess.CalledProcessError as e:
        return {
            "status": "failed",
            "precision_mode": precision_mode,
            "seed": seed,
            "output_dir": output_dir,
            "error": str(e),
            "stderr": e.stderr[-1000:] if e.stderr else None,
        }


def run_all_training(
    config: ExperimentConfig,
    train_dataset_path: str,
    dry_run: bool = False,
    parallel: bool = False
) -> List[Dict[str, Any]]:
    """
    Run training for all precision modes and seeds.
    """
    jobs = []
    for precision_mode in config.precision_modes:
        for seed in config.seeds:
            jobs.append((precision_mode, seed))

    results = []
    if parallel and not dry_run:
        # Run in parallel (one job per GPU assumed)
        with ProcessPoolExecutor(max_workers=len(jobs)) as executor:
            futures = [
                executor.submit(
                    run_single_training, pm, s, config, train_dataset_path, dry_run
                )
                for pm, s in jobs
            ]
            results = [f.result() for f in futures]
    else:
        # Run sequentially
        for precision_mode, seed in jobs:
            result = run_single_training(
                precision_mode, seed, config, train_dataset_path, dry_run
            )
            results.append(result)

    return results


# ============================================================================
# Evaluation Functions
# ============================================================================

def run_single_evaluation(
    precision_mode: str,
    seed: int,
    config: ExperimentConfig,
    eval_base: bool = True,
    dry_run: bool = False
) -> Dict[str, Any]:
    """
    Run evaluation for a single trained model.
    """
    run_dir = get_run_output_dir(config.train_logs_dir, precision_mode, seed)
    ft_ckpt = get_checkpoint_path(run_dir)
    output_path = os.path.join(
        config.eval_metrics_dir,
        f"{precision_mode}_seed{seed}.json"
    )

    cmd = [
        sys.executable, "eval_policy.py",
        "--base_ckpt", config.base_model_path,
        "--ft_ckpt", ft_ckpt,
        "--eval_tasks_config", config.eval_tasks_config_path,
        "--output_path", output_path,
    ]
    if eval_base:
        cmd.append("--eval_base")

    logger.info(f"Running evaluation: {precision_mode} seed={seed}")
    logger.info(f"Command: {' '.join(cmd)}")

    if dry_run:
        return {
            "status": "dry_run",
            "precision_mode": precision_mode,
            "seed": seed,
            "output_path": output_path,
            "command": " ".join(cmd),
        }

    # Create output directory
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Check if checkpoint exists
    if not os.path.exists(ft_ckpt):
        return {
            "status": "skipped",
            "precision_mode": precision_mode,
            "seed": seed,
            "reason": f"Checkpoint not found: {ft_ckpt}",
        }

    # Run evaluation
    start_time = time.time()
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True
        )
        duration = time.time() - start_time
        return {
            "status": "success",
            "precision_mode": precision_mode,
            "seed": seed,
            "output_path": output_path,
            "duration_seconds": duration,
        }
    except subprocess.CalledProcessError as e:
        return {
            "status": "failed",
            "precision_mode": precision_mode,
            "seed": seed,
            "error": str(e),
            "stderr": e.stderr[-1000:] if e.stderr else None,
        }


def run_all_evaluations(
    config: ExperimentConfig,
    eval_base: bool = True,
    dry_run: bool = False
) -> List[Dict[str, Any]]:
    """
    Run evaluation for all trained models.
    """
    results = []
    for precision_mode in config.precision_modes:
        for seed in config.seeds:
            result = run_single_evaluation(
                precision_mode, seed, config, eval_base, dry_run
            )
            results.append(result)
    return results


# ============================================================================
# bf16 Sparsity Analysis
# ============================================================================

def run_sparsity_analysis(
    config: ExperimentConfig,
    dry_run: bool = False
) -> List[Dict[str, Any]]:
    """
    Compute bf16 sparsity for all bf16 runs.
    """
    import torch
    from transformers import AutoModelForCausalLM
    from utils_bf16_sparsity import compute_bf16_sparsity, analyze_update_magnitudes

    results = []

    # Only analyze bf16 runs
    if "bf16" not in config.precision_modes:
        logger.info("No bf16 runs to analyze for sparsity")
        return results

    if dry_run:
        for seed in config.seeds:
            results.append({
                "status": "dry_run",
                "precision_mode": "bf16",
                "seed": seed,
            })
        return results

    # Load base model once
    logger.info(f"Loading base model: {config.base_model_path}")
    base_model = AutoModelForCausalLM.from_pretrained(
        config.base_model_path,
        torch_dtype=torch.float32,
        device_map="cpu"
    )

    for seed in config.seeds:
        run_dir = get_run_output_dir(config.train_logs_dir, "bf16", seed)
        ft_ckpt = get_checkpoint_path(run_dir)

        if not os.path.exists(ft_ckpt):
            results.append({
                "status": "skipped",
                "precision_mode": "bf16",
                "seed": seed,
                "reason": f"Checkpoint not found: {ft_ckpt}",
            })
            continue

        logger.info(f"Computing sparsity for bf16 seed={seed}")

        # Load finetuned model
        ft_model = AutoModelForCausalLM.from_pretrained(
            ft_ckpt,
            torch_dtype=torch.float32,
            device_map="cpu"
        )

        # Compute sparsity
        sparsity_result = compute_bf16_sparsity(
            base_model=base_model,
            finetuned_model=ft_model,
            eta=config.bf16_sparsity_eta,
            include_layer_stats=True
        )

        # Analyze update magnitudes
        magnitude_result = analyze_update_magnitudes(
            base_model=base_model,
            finetuned_model=ft_model
        )

        # Save results
        output_path = os.path.join(
            config.eval_metrics_dir,
            f"bf16_seed{seed}_sparsity.json"
        )
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        full_result = {
            "precision_mode": "bf16",
            "seed": seed,
            "sparsity": sparsity_result,
            "magnitudes": magnitude_result,
        }

        with open(output_path, "w") as f:
            # Convert layer_stats to serializable format
            serializable = {
                k: v for k, v in full_result.items()
            }
            if "layer_stats" in serializable.get("sparsity", {}):
                serializable["sparsity"]["layer_stats"] = {
                    k: {kk: vv for kk, vv in v.items() if kk != "shape"}
                    for k, v in serializable["sparsity"]["layer_stats"].items()
                }
            json.dump(serializable, f, indent=2, default=str)

        results.append({
            "status": "success",
            "precision_mode": "bf16",
            "seed": seed,
            "sparsity_percent": sparsity_result["sparsity_percent"],
            "output_path": output_path,
        })

        # Free memory
        del ft_model

    return results

# ============================================================================
# Main Entry Point
# ============================================================================

def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Run RLVR floating-point precision experiments"
    )
    parser.add_argument(
        "--mode", type=str, default="full",
        choices=["full", "train", "eval", "analyze", "sparsity"],
        help="Execution mode"
    )

    # For single job mode
    parser.add_argument(
        "--precision_mode", type=str, choices=["fp32", "bf16"],
        help="Precision mode (for train mode)"
    )
    parser.add_argument(
        "--seed", type=int,
        help="Random seed (for train mode)"
    )

    # Paths
    parser.add_argument(
        "--train_dataset_path", type=str, default="./data/dm_train.json",
        help="Path to training dataset"
    )
    parser.add_argument(
        "--base_output_dir", type=str, default="./results",
        help="Base output directory"
    )
    parser.add_argument(
        "--eval_tasks_config", type=str, default="./configs/eval_tasks_config.json",
        help="Path to evaluation tasks config"
    )
    parser.add_argument(
        "--base_model", type=str, default="Qwen/Qwen2.5-Math-7B",
        help="Base model path or HuggingFace ID"
    )

    # Execution options
    parser.add_argument(
        "--dry_run", action="store_true",
        help="Print commands without executing"
    )
    parser.add_argument(
        "--parallel", action="store_true",
        help="Run training jobs in parallel"
    )
    parser.add_argument(
        "--seeds", type=int, nargs="+", default=[1, 2, 3, 4, 5],
        help="Seeds to use"
    )

    return parser.parse_args()


def main() -> None:
    """Main entry point."""
    args = parse_args()

    # Create experiment configuration
    config = ExperimentConfig(
        experiment_name="fp_precision_rlvr",
        seeds=args.seeds,
        precision_modes=["fp32", "bf16"],
        base_model_path=args.base_model,
        base_output_dir=args.base_output_dir,
        train_logs_dir=os.path.join(args.base_output_dir, "train_logs"),
        checkpoints_dir=os.path.join(args.base_output_dir, "checkpoints"),
        eval_metrics_dir=os.path.join(args.base_output_dir, "eval_metrics"),
        eval_tasks_config_path=args.eval_tasks_config,
    )

    # Create directories
    os.makedirs(config.train_logs_dir, exist_ok=True)
    os.makedirs(config.checkpoints_dir, exist_ok=True)
    os.makedirs(config.eval_metrics_dir, exist_ok=True)

    # Save experiment config
    config_path = os.path.join(args.base_output_dir, "experiment_config.json")
    with open(config_path, "w") as f:
        json.dump(asdict(config), f, indent=2)
    logger.info(f"Saved experiment config to {config_path}")

    # Execute based on mode
    if args.mode == "train":
        # Explicit None checks so that seed=0 still selects a single job
        if args.precision_mode is not None and args.seed is not None:
            # Single training job
            result = run_single_training(
                args.precision_mode, args.seed, config,
                args.train_dataset_path, args.dry_run
            )
            print(json.dumps(result, indent=2))
        else:
            # All training jobs
            results = run_all_training(
                config, args.train_dataset_path, args.dry_run, args.parallel
            )
            print(json.dumps(results, indent=2))

    elif args.mode == "eval":
        results = run_all_evaluations(config, eval_base=True, dry_run=args.dry_run)
        print(json.dumps(results, indent=2))

    elif args.mode == "sparsity":
        results = run_sparsity_analysis(config, dry_run=args.dry_run)
        print(json.dumps(results, indent=2))

    elif args.mode == "analyze":
        # Run analysis script
        analyze_cmd = [
            sys.executable, "analyze_results.py",
            "--results_dir", config.eval_metrics_dir,
            "--output_dir", os.path.join(args.base_output_dir, "analysis"),
        ]
        if args.dry_run:
            print(f"Would run: {' '.join(analyze_cmd)}")
        else:
            subprocess.run(analyze_cmd, check=True)

    elif args.mode == "full":
        logger.info("="*60)
        logger.info("RUNNING FULL EXPERIMENT PIPELINE")
        logger.info("="*60)

        # Step 1: Training
        logger.info("\n" + "="*60)
        logger.info("STEP 1: Training")
        logger.info("="*60)
        train_results = run_all_training(
            config, args.train_dataset_path, args.dry_run, args.parallel
        )

        # Step 2: Evaluation
        logger.info("\n" + "="*60)
        logger.info("STEP 2: Evaluation")
        logger.info("="*60)
        eval_results = run_all_evaluations(config, dry_run=args.dry_run)

        # Step 3: Sparsity Analysis
        logger.info("\n" + "="*60)
        logger.info("STEP 3: Sparsity Analysis")
        logger.info("="*60)
        sparsity_results = run_sparsity_analysis(config, dry_run=args.dry_run)

        # Step 4: Results Analysis
        logger.info("\n" + "="*60)
        logger.info("STEP 4: Results Analysis")
        logger.info("="*60)
        if not args.dry_run:
            analyze_cmd = [
                sys.executable, "analyze_results.py",
                "--results_dir", config.eval_metrics_dir,
                "--output_dir", os.path.join(args.base_output_dir, "analysis"),
            ]
            subprocess.run(analyze_cmd, check=True)

        # Summary
        logger.info("\n" + "="*60)
        logger.info("EXPERIMENT COMPLETE")
        logger.info("="*60)

        all_results = {
            "training": train_results,
            "evaluation": eval_results,
            "sparsity": sparsity_results,
        }
        summary_path = os.path.join(args.base_output_dir, "experiment_summary.json")
        with open(summary_path, "w") as f:
            json.dump(all_results, f, indent=2)
        logger.info(f"Saved summary to {summary_path}")

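# ----------------------------------------------------------------------------
# Illustrative helper (a sketch added for convenience; not used by the pipeline
# above). It relies only on the "status" field that run_single_training and
# run_single_evaluation set ("success", "failed", "skipped", or "dry_run") and
# gives a quick summary of a batch of job results.
# ----------------------------------------------------------------------------
def summarize_job_results(results: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count job outcomes by their "status" field."""
    counts: Dict[str, int] = {}
    for job in results:
        status = job.get("status", "unknown")
        counts[status] = counts.get(status, 0) + 1
    return counts


# Minimal usage sketch (assumes an ExperimentConfig instance named `cfg` and the
# default dataset path; with dry_run=True nothing is launched, the planned
# subprocess commands are only logged and returned):
#
#   train_plan = run_all_training(cfg, "./data/dm_train.json", dry_run=True)
#   eval_plan = run_all_evaluations(cfg, dry_run=True)
#   print(summarize_job_results(train_plan + eval_plan))  # e.g. {"dry_run": 20}
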
if __name__ == "__main__":
    main()
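
# ----------------------------------------------------------------------------
# Additional usage notes (editorial sketches, hedged; paths and the seed value
# are examples that follow the defaults defined above, not fixed requirements).
#
# Inspecting a saved sparsity report written by run_sparsity_analysis
# (f"bf16_seed{seed}_sparsity.json" under config.eval_metrics_dir, which
# defaults to ./results/eval_metrics):
#
#   with open("./results/eval_metrics/bf16_seed1_sparsity.json") as f:
#       report = json.load(f)
#   print(report["sparsity"]["sparsity_percent"])
#
# Example invocations using the flags defined in parse_args:
#
#   python run_experiments.py --mode full --dry_run
#   python run_experiments.py --mode train --precision_mode bf16 --seed 1
#   python run_experiments.py --mode sparsity --base_output_dir ./results
# ----------------------------------------------------------------------------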