path: root/run_experiments.py
author     YurenHao0426 <blackhao0426@gmail.com>  2026-02-04 18:59:35 -0600
committer  YurenHao0426 <blackhao0426@gmail.com>  2026-02-04 18:59:35 -0600
commit     f1c2cc22d46a6976df3555391e667c7e61592fad (patch)
tree       0b37b52c8ff91042a742d3b3ec54542cb6d6e2f6 /run_experiments.py
Initial commit: RL floating-point noise project (HEAD, main)
Diffstat (limited to 'run_experiments.py')
-rw-r--r--  run_experiments.py  601
1 file changed, 601 insertions, 0 deletions
diff --git a/run_experiments.py b/run_experiments.py
new file mode 100644
index 0000000..0cbcd67
--- /dev/null
+++ b/run_experiments.py
@@ -0,0 +1,601 @@
+#!/usr/bin/env python3
+# run_experiments.py
+"""
+Experiment Runner for RLVR Floating-Point Precision Study.
+
+This script orchestrates the full experimental pipeline:
+1. Training models with FP32 and bf16 precision across multiple seeds
+2. Evaluating trained models on on-task and off-task benchmarks
+3. Computing KL divergence and bf16 sparsity metrics
+
+Usage:
+ # Run full experiment
+ python run_experiments.py --mode full
+
+ # Run training only
+ python run_experiments.py --mode train --precision_mode bf16 --seed 1
+
+ # Run evaluation only
+ python run_experiments.py --mode eval
+
+    # Run bf16 sparsity analysis only
+    python run_experiments.py --mode sparsity
+
+    # Run analysis only
+    python run_experiments.py --mode analyze
+"""
+
+import argparse
+import json
+import os
+import subprocess
+import sys
+import logging
+from typing import Dict, Any, List, Optional
+from dataclasses import asdict
+from concurrent.futures import ProcessPoolExecutor
+import time
+
+from config import (
+ ExperimentConfig,
+ make_training_config,
+ make_precision_config,
+ get_run_output_dir,
+ get_checkpoint_path,
+)
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+
+# ============================================================================
+# Training Functions
+# ============================================================================
+
+def run_single_training(
+ precision_mode: str,
+ seed: int,
+ config: ExperimentConfig,
+ train_dataset_path: str,
+ dry_run: bool = False
+) -> Dict[str, Any]:
+ """
+ Run a single training job.
+
+ Args:
+ precision_mode: "fp32" or "bf16"
+ seed: Random seed
+ config: Experiment configuration
+ train_dataset_path: Path to training data
+ dry_run: If True, only print command without running
+
+ Returns:
+ Dictionary with job status
+ """
+ output_dir = get_run_output_dir(config.train_logs_dir, precision_mode, seed)
+
+ cmd = [
+ sys.executable,
+ "train_rlvr.py",
+ "--precision_mode", precision_mode,
+ "--seed", str(seed),
+ "--output_dir", output_dir,
+ "--train_dataset_path", train_dataset_path,
+ "--model_name", config.base_model_path,
+ ]
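+
+    # Example rendered command with the default CLI arguments (<run_output_dir>
+    # comes from get_run_output_dir):
+    #   python train_rlvr.py --precision_mode bf16 --seed 1 \
+    #       --output_dir <run_output_dir> --train_dataset_path ./data/dm_train.json \
+    #       --model_name Qwen/Qwen2.5-Math-7B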
+
+ logger.info(f"Running training: {precision_mode} seed={seed}")
+ logger.info(f"Command: {' '.join(cmd)}")
+
+ if dry_run:
+ return {
+ "status": "dry_run",
+ "precision_mode": precision_mode,
+ "seed": seed,
+ "output_dir": output_dir,
+ "command": " ".join(cmd),
+ }
+
+ # Create output directory
+ os.makedirs(output_dir, exist_ok=True)
+
+ # Run training
+ start_time = time.time()
+ try:
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ check=True
+ )
+ duration = time.time() - start_time
+
+ return {
+ "status": "success",
+ "precision_mode": precision_mode,
+ "seed": seed,
+ "output_dir": output_dir,
+ "duration_seconds": duration,
+ "stdout": result.stdout[-1000:], # Last 1000 chars
+ }
+ except subprocess.CalledProcessError as e:
+ return {
+ "status": "failed",
+ "precision_mode": precision_mode,
+ "seed": seed,
+ "output_dir": output_dir,
+ "error": str(e),
+ "stderr": e.stderr[-1000:] if e.stderr else None,
+ }
+
+
+def run_all_training(
+ config: ExperimentConfig,
+ train_dataset_path: str,
+ dry_run: bool = False,
+ parallel: bool = False
+) -> List[Dict[str, Any]]:
+ """
+ Run training for all precision modes and seeds.
+ """
+ jobs = []
+ for precision_mode in config.precision_modes:
+ for seed in config.seeds:
+ jobs.append((precision_mode, seed))
+
+ results = []
+
+ if parallel and not dry_run:
+ # Run in parallel (one job per GPU assumed)
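+        # NOTE: nothing here pins a job to a particular device; train_rlvr.py
+        # is assumed to pick its own GPU. A minimal sketch, if per-job pinning
+        # were needed (hypothetical, not wired in):
+        #   env = {**os.environ, "CUDA_VISIBLE_DEVICES": str(job_idx % num_gpus)}
+        #   subprocess.run(cmd, env=env, ...)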
+ with ProcessPoolExecutor(max_workers=len(jobs)) as executor:
+ futures = [
+ executor.submit(
+ run_single_training,
+ pm, s, config, train_dataset_path, dry_run
+ )
+ for pm, s in jobs
+ ]
+ results = [f.result() for f in futures]
+ else:
+ # Run sequentially
+ for precision_mode, seed in jobs:
+ result = run_single_training(
+ precision_mode, seed, config, train_dataset_path, dry_run
+ )
+ results.append(result)
+
+ return results
+
+
+# ============================================================================
+# Evaluation Functions
+# ============================================================================
+
+def run_single_evaluation(
+ precision_mode: str,
+ seed: int,
+ config: ExperimentConfig,
+ eval_base: bool = True,
+ dry_run: bool = False
+) -> Dict[str, Any]:
+ """
+ Run evaluation for a single trained model.
+ """
+ run_dir = get_run_output_dir(config.train_logs_dir, precision_mode, seed)
+ ft_ckpt = get_checkpoint_path(run_dir)
+ output_path = os.path.join(
+ config.eval_metrics_dir,
+ f"{precision_mode}_seed{seed}.json"
+ )
+
+ cmd = [
+ sys.executable,
+ "eval_policy.py",
+ "--base_ckpt", config.base_model_path,
+ "--ft_ckpt", ft_ckpt,
+ "--eval_tasks_config", config.eval_tasks_config_path,
+ "--output_path", output_path,
+ ]
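+
+    # Example rendered command with the default CLI arguments (<ft_ckpt> comes
+    # from get_checkpoint_path; --eval_base is appended below when requested):
+    #   python eval_policy.py --base_ckpt Qwen/Qwen2.5-Math-7B --ft_ckpt <ft_ckpt> \
+    #       --eval_tasks_config ./configs/eval_tasks_config.json \
+    #       --output_path ./results/eval_metrics/bf16_seed1.json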
+
+ if eval_base:
+ cmd.append("--eval_base")
+
+ logger.info(f"Running evaluation: {precision_mode} seed={seed}")
+ logger.info(f"Command: {' '.join(cmd)}")
+
+ if dry_run:
+ return {
+ "status": "dry_run",
+ "precision_mode": precision_mode,
+ "seed": seed,
+ "output_path": output_path,
+ "command": " ".join(cmd),
+ }
+
+ # Create output directory
+ os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+ # Check if checkpoint exists
+ if not os.path.exists(ft_ckpt):
+ return {
+ "status": "skipped",
+ "precision_mode": precision_mode,
+ "seed": seed,
+ "reason": f"Checkpoint not found: {ft_ckpt}",
+ }
+
+ # Run evaluation
+ start_time = time.time()
+ try:
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ check=True
+ )
+ duration = time.time() - start_time
+
+ return {
+ "status": "success",
+ "precision_mode": precision_mode,
+ "seed": seed,
+ "output_path": output_path,
+ "duration_seconds": duration,
+ }
+ except subprocess.CalledProcessError as e:
+ return {
+ "status": "failed",
+ "precision_mode": precision_mode,
+ "seed": seed,
+ "error": str(e),
+ "stderr": e.stderr[-1000:] if e.stderr else None,
+ }
+
+
+def run_all_evaluations(
+ config: ExperimentConfig,
+ eval_base: bool = True,
+ dry_run: bool = False
+) -> List[Dict[str, Any]]:
+ """
+ Run evaluation for all trained models.
+ """
+ results = []
+
+ for precision_mode in config.precision_modes:
+ for seed in config.seeds:
+ result = run_single_evaluation(
+ precision_mode, seed, config, eval_base, dry_run
+ )
+ results.append(result)
+
+ return results
+
+
+# ============================================================================
+# bf16 Sparsity Analysis
+# ============================================================================
+
+def run_sparsity_analysis(
+ config: ExperimentConfig,
+ dry_run: bool = False
+) -> List[Dict[str, Any]]:
+ """
+ Compute bf16 sparsity for all bf16 runs.
+ """
+ import torch
+ from transformers import AutoModelForCausalLM
+ from utils_bf16_sparsity import compute_bf16_sparsity, analyze_update_magnitudes
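+
+    # "bf16 sparsity" here is, roughly, the fraction of parameter updates that
+    # are small enough to be lost when the weights are rounded to bf16, with
+    # eta scaling the threshold; the exact definition lives in
+    # utils_bf16_sparsity.compute_bf16_sparsity.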
+
+ results = []
+
+ # Only analyze bf16 runs
+ if "bf16" not in config.precision_modes:
+ logger.info("No bf16 runs to analyze for sparsity")
+ return results
+
+ if dry_run:
+ for seed in config.seeds:
+ results.append({
+ "status": "dry_run",
+ "precision_mode": "bf16",
+ "seed": seed,
+ })
+ return results
+
+ # Load base model once
+ logger.info(f"Loading base model: {config.base_model_path}")
+ base_model = AutoModelForCausalLM.from_pretrained(
+ config.base_model_path,
+ torch_dtype=torch.float32,
+ device_map="cpu"
+ )
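+    # Loading in fp32 on CPU keeps the base-vs-finetuned weight comparison
+    # exact, but it is memory-hungry: two 7B-parameter models in fp32 need
+    # roughly 56 GB of host RAM.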
+
+ for seed in config.seeds:
+ run_dir = get_run_output_dir(config.train_logs_dir, "bf16", seed)
+ ft_ckpt = get_checkpoint_path(run_dir)
+
+ if not os.path.exists(ft_ckpt):
+ results.append({
+ "status": "skipped",
+ "precision_mode": "bf16",
+ "seed": seed,
+ "reason": f"Checkpoint not found: {ft_ckpt}",
+ })
+ continue
+
+ logger.info(f"Computing sparsity for bf16 seed={seed}")
+
+ # Load finetuned model
+ ft_model = AutoModelForCausalLM.from_pretrained(
+ ft_ckpt,
+ torch_dtype=torch.float32,
+ device_map="cpu"
+ )
+
+ # Compute sparsity
+ sparsity_result = compute_bf16_sparsity(
+ base_model=base_model,
+ finetuned_model=ft_model,
+ eta=config.bf16_sparsity_eta,
+ include_layer_stats=True
+ )
+
+ # Analyze update magnitudes
+ magnitude_result = analyze_update_magnitudes(
+ base_model=base_model,
+ finetuned_model=ft_model
+ )
+
+ # Save results
+ output_path = os.path.join(
+ config.eval_metrics_dir,
+ f"bf16_seed{seed}_sparsity.json"
+ )
+ os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+ full_result = {
+ "precision_mode": "bf16",
+ "seed": seed,
+ "sparsity": sparsity_result,
+ "magnitudes": magnitude_result,
+ }
+
+        with open(output_path, "w") as f:
+            # Strip the "shape" entries from layer_stats before dumping, without
+            # mutating sparsity_result (dict(full_result) is only a shallow copy).
+            serializable = dict(full_result)
+            if "layer_stats" in serializable.get("sparsity", {}):
+                serializable["sparsity"] = {
+                    **serializable["sparsity"],
+                    "layer_stats": {
+                        k: {kk: vv for kk, vv in v.items() if kk != "shape"}
+                        for k, v in serializable["sparsity"]["layer_stats"].items()
+                    },
+                }
+            json.dump(serializable, f, indent=2, default=str)
+
+ results.append({
+ "status": "success",
+ "precision_mode": "bf16",
+ "seed": seed,
+ "sparsity_percent": sparsity_result["sparsity_percent"],
+ "output_path": output_path,
+ })
+
+ # Free memory
+ del ft_model
+
+ return results
+
+
+# ============================================================================
+# Main Entry Point
+# ============================================================================
+
+def parse_args() -> argparse.Namespace:
+ """Parse command line arguments."""
+ parser = argparse.ArgumentParser(
+ description="Run RLVR floating-point precision experiments"
+ )
+
+ parser.add_argument(
+ "--mode",
+ type=str,
+ default="full",
+ choices=["full", "train", "eval", "analyze", "sparsity"],
+ help="Execution mode"
+ )
+
+ # For single job mode
+ parser.add_argument(
+ "--precision_mode",
+ type=str,
+ choices=["fp32", "bf16"],
+ help="Precision mode (for train mode)"
+ )
+ parser.add_argument(
+ "--seed",
+ type=int,
+ help="Random seed (for train mode)"
+ )
+
+ # Paths
+ parser.add_argument(
+ "--train_dataset_path",
+ type=str,
+ default="./data/dm_train.json",
+ help="Path to training dataset"
+ )
+ parser.add_argument(
+ "--base_output_dir",
+ type=str,
+ default="./results",
+ help="Base output directory"
+ )
+ parser.add_argument(
+ "--eval_tasks_config",
+ type=str,
+ default="./configs/eval_tasks_config.json",
+ help="Path to evaluation tasks config"
+ )
+ parser.add_argument(
+ "--base_model",
+ type=str,
+ default="Qwen/Qwen2.5-Math-7B",
+ help="Base model path or HuggingFace ID"
+ )
+
+ # Execution options
+ parser.add_argument(
+ "--dry_run",
+ action="store_true",
+ help="Print commands without executing"
+ )
+ parser.add_argument(
+ "--parallel",
+ action="store_true",
+ help="Run training jobs in parallel"
+ )
+ parser.add_argument(
+ "--seeds",
+ type=int,
+ nargs="+",
+ default=[1, 2, 3, 4, 5],
+ help="Seeds to use"
+ )
+
+ return parser.parse_args()
+
+
+def main() -> None:
+ """Main entry point."""
+ args = parse_args()
+
+ # Create experiment configuration
+ config = ExperimentConfig(
+ experiment_name="fp_precision_rlvr",
+ seeds=args.seeds,
+ precision_modes=["fp32", "bf16"],
+ base_model_path=args.base_model,
+ base_output_dir=args.base_output_dir,
+ train_logs_dir=os.path.join(args.base_output_dir, "train_logs"),
+ checkpoints_dir=os.path.join(args.base_output_dir, "checkpoints"),
+ eval_metrics_dir=os.path.join(args.base_output_dir, "eval_metrics"),
+ eval_tasks_config_path=args.eval_tasks_config,
+ )
+
+ # Create directories
+ os.makedirs(config.train_logs_dir, exist_ok=True)
+ os.makedirs(config.checkpoints_dir, exist_ok=True)
+ os.makedirs(config.eval_metrics_dir, exist_ok=True)
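+    # Expected layout under --base_output_dir (./results by default), inferred
+    # from the helpers in config.py:
+    #   train_logs/    one run directory per (precision_mode, seed)
+    #   eval_metrics/  per-run evaluation and sparsity JSON files
+    #   analysis/      output of analyze_results.py (analyze / full modes)
+    #   checkpoints/   created here but not written to directly by this script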
+
+ # Save experiment config
+ config_path = os.path.join(args.base_output_dir, "experiment_config.json")
+ with open(config_path, "w") as f:
+ json.dump(asdict(config), f, indent=2)
+ logger.info(f"Saved experiment config to {config_path}")
+
+ # Execute based on mode
+ if args.mode == "train":
+        if args.precision_mode and args.seed is not None:  # seed 0 is a valid value
+ # Single training job
+ result = run_single_training(
+ args.precision_mode,
+ args.seed,
+ config,
+ args.train_dataset_path,
+ args.dry_run
+ )
+ print(json.dumps(result, indent=2))
+ else:
+ # All training jobs
+ results = run_all_training(
+ config,
+ args.train_dataset_path,
+ args.dry_run,
+ args.parallel
+ )
+ print(json.dumps(results, indent=2))
+
+ elif args.mode == "eval":
+ results = run_all_evaluations(config, eval_base=True, dry_run=args.dry_run)
+ print(json.dumps(results, indent=2))
+
+ elif args.mode == "sparsity":
+ results = run_sparsity_analysis(config, dry_run=args.dry_run)
+ print(json.dumps(results, indent=2))
+
+ elif args.mode == "analyze":
+ # Run analysis script
+ analyze_cmd = [
+ sys.executable,
+ "analyze_results.py",
+ "--results_dir", config.eval_metrics_dir,
+ "--output_dir", os.path.join(args.base_output_dir, "analysis"),
+ ]
+ if args.dry_run:
+ print(f"Would run: {' '.join(analyze_cmd)}")
+ else:
+ subprocess.run(analyze_cmd, check=True)
+
+ elif args.mode == "full":
+ logger.info("="*60)
+ logger.info("RUNNING FULL EXPERIMENT PIPELINE")
+ logger.info("="*60)
+
+ # Step 1: Training
+ logger.info("\n" + "="*60)
+ logger.info("STEP 1: Training")
+ logger.info("="*60)
+ train_results = run_all_training(
+ config,
+ args.train_dataset_path,
+ args.dry_run,
+ args.parallel
+ )
+
+ # Step 2: Evaluation
+ logger.info("\n" + "="*60)
+ logger.info("STEP 2: Evaluation")
+ logger.info("="*60)
+ eval_results = run_all_evaluations(config, dry_run=args.dry_run)
+
+ # Step 3: Sparsity Analysis
+ logger.info("\n" + "="*60)
+ logger.info("STEP 3: Sparsity Analysis")
+ logger.info("="*60)
+ sparsity_results = run_sparsity_analysis(config, dry_run=args.dry_run)
+
+ # Step 4: Results Analysis
+ logger.info("\n" + "="*60)
+ logger.info("STEP 4: Results Analysis")
+ logger.info("="*60)
+ if not args.dry_run:
+ analyze_cmd = [
+ sys.executable,
+ "analyze_results.py",
+ "--results_dir", config.eval_metrics_dir,
+ "--output_dir", os.path.join(args.base_output_dir, "analysis"),
+ ]
+ subprocess.run(analyze_cmd, check=True)
+
+ # Summary
+ logger.info("\n" + "="*60)
+ logger.info("EXPERIMENT COMPLETE")
+ logger.info("="*60)
+
+ all_results = {
+ "training": train_results,
+ "evaluation": eval_results,
+ "sparsity": sparsity_results,
+ }
+
+ summary_path = os.path.join(args.base_output_dir, "experiment_summary.json")
+ with open(summary_path, "w") as f:
+ json.dump(all_results, f, indent=2)
+ logger.info(f"Saved summary to {summary_path}")
+
+
+if __name__ == "__main__":
+ main()
+