summaryrefslogtreecommitdiff
path: root/putnam-bench-anon/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'putnam-bench-anon/scripts')
-rw-r--r--putnam-bench-anon/scripts/__init__.py1
-rw-r--r--putnam-bench-anon/scripts/batch_evaluate.py1211
-rw-r--r--putnam-bench-anon/scripts/benchmark.py481
-rw-r--r--putnam-bench-anon/scripts/compare_original_vs_kernel_test.py630
-rw-r--r--putnam-bench-anon/scripts/health_check.py376
-rw-r--r--putnam-bench-anon/scripts/regrade.py284
6 files changed, 2983 insertions, 0 deletions
diff --git a/putnam-bench-anon/scripts/__init__.py b/putnam-bench-anon/scripts/__init__.py
new file mode 100644
index 0000000..389f811
--- /dev/null
+++ b/putnam-bench-anon/scripts/__init__.py
@@ -0,0 +1 @@
+"""Scripts package for Putnam mathematical problem solver.""" \ No newline at end of file
diff --git a/putnam-bench-anon/scripts/batch_evaluate.py b/putnam-bench-anon/scripts/batch_evaluate.py
new file mode 100644
index 0000000..6fde90b
--- /dev/null
+++ b/putnam-bench-anon/scripts/batch_evaluate.py
@@ -0,0 +1,1211 @@
+#!/usr/bin/env python3
+"""
+Batch evaluation script for processing entire datasets with multiple providers.
+
+This script efficiently processes all JSON files in the dataset directory,
+supports multiple AI providers, and generates comprehensive evaluation reports.
+
+Features:
+- Incremental saving: Results are saved after each problem completes
+- Simple resume support: Skip already completed problems based on checkpoint
+- Multi-provider support
+- Comprehensive evaluation reports
+
+Usage:
+ python batch_evaluate.py --provider openai --output results/openai_results.json
+ python batch_evaluate.py --provider anthropic --variant kernel_variant --max-concurrent 5
+
+Resume usage (simplified):
+ # Resume with same configuration
+ python batch_evaluate.py --provider openai --dataset dataset/ --resume checkpoint_file.json
+
+ # Resume with different settings (checkpoint only provides skip list)
+    python batch_evaluate.py --provider openai --dataset dataset/ --max-concurrent 10 --resume checkpoint_file.json
+"""
+
+import asyncio
+import json
+import sys
+import time
+from pathlib import Path
+import argparse
+from typing import List, Dict, Any
+import logging
+from datetime import datetime
+import shutil
+
+try:
+ from tqdm import tqdm
+ HAS_TQDM = True
+except ImportError:
+ HAS_TQDM = False
+ # Fallback progress bar
+ class tqdm:
+ def __init__(self, total=None, desc=None, **kwargs):
+ self.total = total
+ self.n = 0
+ self.desc = desc
+ print(f"{desc}: Starting...")
+
+ def update(self, n=1):
+ self.n += n
+ if self.total:
+ percent = (self.n / self.total) * 100
+ print(f"{self.desc}: {self.n}/{self.total} ({percent:.1f}%)", end='\r')
+
+ def set_postfix(self, postfix_dict):
+ pass
+
+ def close(self):
+ print() # New line after progress
+
+# Add the loader module to the path
+sys.path.append(str(Path(__file__).parent))
+
+from loader import create_loader, get_supported_providers
+
+
+def setup_logging(output_dir: Path):
+ """Setup logging configuration."""
+ log_file = output_dir / f"evaluation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
+
+ logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ handlers=[
+ logging.FileHandler(log_file),
+ logging.StreamHandler(sys.stdout)
+ ]
+ )
+
+ return logging.getLogger(__name__)
+
+
+async def load_dataset(dataset_path: Path, max_files: int = None) -> List[Dict[str, Any]]:
+ """Load all JSON files from the dataset directory."""
+ json_files = list(dataset_path.glob("*.json"))
+
+ if max_files:
+ json_files = json_files[:max_files]
+
+ problems = []
+ for json_file in json_files:
+ try:
+ with open(json_file, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+ data['_source_file'] = str(json_file.name)
+ problems.append(data)
+ except Exception as e:
+ logging.warning(f"Failed to load {json_file}: {str(e)}")
+
+ return problems
+
+
+async def process_single_problem(loader, problem_data: Dict[str, Any],
+ variant_type: str, solver_model: str = None,
+ grader_model: str = None) -> Dict[str, Any]:
+ """Process a single problem and return results with metadata."""
+ start_time = time.time()
+
+ try:
+ result = await loader.test_single_problem(
+ problem_data,
+ variant_type=variant_type,
+ solver_model=solver_model,
+ grader_model=grader_model
+ )
+
+ # Add metadata
+ result['_metadata'] = {
+ 'source_file': problem_data.get('_source_file', 'unknown'),
+ 'variant_type': variant_type,
+ 'processing_time': time.time() - start_time,
+ 'timestamp': datetime.now().isoformat(),
+ 'models_used': {
+ 'solver': solver_model or loader.solver_model,
+ 'grader': grader_model or loader.grader_model
+ }
+ }
+
+ return result
+
+ except Exception as e:
+ # Return error information
+ return {
+ 'error': str(e),
+ 'final_grade': 0,
+ '_metadata': {
+ 'source_file': problem_data.get('_source_file', 'unknown'),
+ 'variant_type': variant_type,
+ 'processing_time': time.time() - start_time,
+ 'timestamp': datetime.now().isoformat(),
+ 'error': True
+ }
+ }
+
+
+async def batch_evaluate(dataset_path: Path = None, provider: str = None, variant_type: str = "original",
+                        max_concurrent: int = 3, max_files: int = None,
+                        solver_model: str = None, grader_model: str = None,
+                        output_file: Path = None, resume_checkpoint: Path = None,
+                        **loader_kwargs) -> Dict[str, Any]:
+    """
+    Batch evaluate problems using specified provider with resume support.
+
+    Args:
+        dataset_path: Path to dataset directory (required for new runs or old checkpoint format)
+        provider: AI provider name (required for new runs or old checkpoint format)
+        variant_type: Problem variant to use
+        max_concurrent: Maximum concurrent evaluations
+        max_files: Maximum number of files to process (None for all)
+        solver_model: Override solver model
+        grader_model: Override grader model
+        output_file: Output file path
+        resume_checkpoint: Path to checkpoint file to resume from
+        **loader_kwargs: Additional arguments for loader
+
+    Returns:
+        Dictionary with evaluation results and statistics
+    """
+    logger = logging.getLogger(__name__)
+
+    # Check if resuming from checkpoint
+    if resume_checkpoint and resume_checkpoint.exists():
+        logger.info(f"Resuming from checkpoint: {resume_checkpoint}")
+        with open(resume_checkpoint, 'r', encoding='utf-8') as f:
+            checkpoint_data = json.load(f)
+
+        # Simple resume: just restore completed indices and results
+        completed_indices = set(checkpoint_data.get('completed_indices', []))
+        results = checkpoint_data.get('results', [])
+        failed_indices = checkpoint_data.get('failed_indices', [])
+        successful_indices = checkpoint_data.get('successful_indices', [])
+        correct_indices = checkpoint_data.get('correct_indices', [])
+
+        # Always require dataset_path and provider from command line
+        if not dataset_path:
+            raise ValueError("dataset_path is required when resuming")
+        if not provider:
+            raise ValueError("provider is required when resuming")
+
+        # Load dataset
+        logger.info(f"Loading dataset from {dataset_path}")
+        problems = await load_dataset(dataset_path, max_files)
+        logger.info(f"Loaded {len(problems)} problems")
+
+        if not problems:
+            raise ValueError("No problems found in dataset")
+
+        checkpoint_file = resume_checkpoint  # Continue using the same checkpoint file
+        logger.info(f"Resuming with {len(completed_indices)} completed problems out of {len(problems)}")
+    else:
+        # New evaluation - validate required parameters
+        if not dataset_path:
+            raise ValueError("dataset_path is required for new evaluation")
+        if not provider:
+            raise ValueError("provider is required for new evaluation")
+
+        # Load dataset
+        logger.info(f"Loading dataset from {dataset_path}")
+        problems = await load_dataset(dataset_path, max_files)
+        logger.info(f"Loaded {len(problems)} problems")
+
+        if not problems:
+            raise ValueError("No problems found in dataset")
+
+        # Initialize state for new run
+        completed_indices = set()
+        results = []
+        failed_indices = []
+        successful_indices = []
+        correct_indices = []
+
+        # Create checkpoint file name
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        if output_file:
+            checkpoint_file = output_file.parent / f"checkpoint_{output_file.stem}_{timestamp}.json"
+        else:
+            checkpoint_file = Path(f"checkpoint_{provider}_{variant_type}_{timestamp}.json")
+
+    # Create loader
+    logger.info(f"Creating {provider} loader")
+
+    # Include solver_model and grader_model in loader_kwargs if specified
+    if solver_model:
+        loader_kwargs['solver_model'] = solver_model
+    if grader_model:
+        loader_kwargs['grader_model'] = grader_model
+
+    loader = create_loader(provider, **loader_kwargs)
+
+    # Health check
+    logger.info("Performing health check...")
+    if not await loader.health_check():
+        raise RuntimeError(f"Health check failed for {provider}")
+
+    # Cost estimation
+    logger.info("Estimating costs...")
+    cost_info = await loader.estimate_cost(len(problems))
+    logger.info(f"Estimated cost: ${cost_info.get('total_cost', 0):.2f}")
+
+    # Progress tracking
+    # NOTE(review): problems missing an 'index' key all fall back to 'unknown';
+    # once one such problem completes, the others would be skipped on resume —
+    # confirm every dataset problem carries a unique 'index'.
+    remaining_problems = [p for p in problems if p.get('index', 'unknown') not in completed_indices]
+    progress_bar = tqdm(total=len(problems), desc=f"Evaluating with {provider}", initial=len(completed_indices))
+
+    # Semaphore for concurrency control
+    semaphore = asyncio.Semaphore(max_concurrent)
+
+    def save_checkpoint():
+        """Save current state to checkpoint file - simplified version"""
+        checkpoint_data = {
+            'timestamp': datetime.now().isoformat(),
+            # Only save essential state information
+            'completed_indices': list(completed_indices),
+            'successful_indices': successful_indices,
+            'failed_indices': failed_indices,
+            'correct_indices': correct_indices,
+            'results': results,
+            # Save minimal config for reference (not for resume)
+            'dataset_path': str(dataset_path),  # For convenience
+            'total_problems': len(problems),
+            'current_config': {
+                'provider': provider,
+                'variant_type': variant_type,
+                'solver_model': loader.solver_model,
+                'grader_model': loader.grader_model
+            }
+        }
+
+        # Write to temporary file first, then move (atomic operation)
+        temp_file = checkpoint_file.with_suffix('.tmp')
+        with open(temp_file, 'w', encoding='utf-8') as f:
+            json.dump(checkpoint_data, f, indent=2, ensure_ascii=False)
+
+        # Atomic rename
+        temp_file.replace(checkpoint_file)
+
+    async def evaluate_problem(problem_data: Dict[str, Any]) -> Dict[str, Any]:
+        """Evaluate a single problem with concurrency control."""
+        problem_index = problem_data.get('index', 'unknown')
+
+        # Skip if already completed
+        if problem_index in completed_indices:
+            return None
+
+        async with semaphore:
+            try:
+                result = await loader.test_single_problem(
+                    problem_data,
+                    variant_type=variant_type
+                )
+
+                # Track success/failure based on technical completion, not correctness
+                if result.get('status') == 'completed':
+                    successful_indices.append(result['index'])  # Successfully processed
+                    if result.get('correct'):
+                        correct_indices.append(result['index'])  # Also correct
+                else:
+                    failed_indices.append(result['index'])  # Technical failure
+
+                # Add to results and mark as completed
+                results.append(result)
+                completed_indices.add(problem_index)
+
+                # Save checkpoint immediately after each problem
+                save_checkpoint()
+
+                progress_bar.update(1)
+                progress_bar.set_postfix({
+                    'success': len(successful_indices),
+                    'failed': len(failed_indices),
+                    'saved': len(completed_indices)
+                })
+
+                return result
+
+            except Exception as e:
+                logger.error(f"Error evaluating problem {problem_index}: {e}")
+                result = {
+                    'index': problem_index,
+                    'status': 'error',
+                    'error': str(e),
+                    'error_type': type(e).__name__
+                }
+
+                # Add to results and mark as completed (even if failed)
+                results.append(result)
+                failed_indices.append(problem_index)
+                completed_indices.add(problem_index)
+
+                # Save checkpoint
+                save_checkpoint()
+
+                progress_bar.update(1)
+                progress_bar.set_postfix({
+                    'success': len(successful_indices),
+                    'failed': len(failed_indices),
+                    'saved': len(completed_indices)
+                })
+
+                return result
+
+    # Run evaluations
+    start_time = time.time()
+
+    try:
+        # Create tasks only for remaining problems
+        tasks = [evaluate_problem(problem) for problem in remaining_problems]
+
+        if tasks:
+            # Execute all tasks concurrently (limited by semaphore)
+            await asyncio.gather(*tasks)
+        else:
+            logger.info("All problems already completed!")
+
+    except KeyboardInterrupt:
+        logger.info("Evaluation interrupted by user. Progress saved to checkpoint.")
+        logger.info(f"To resume, use: --resume {checkpoint_file}")
+        raise
+
+    finally:
+        progress_bar.close()
+
+    # Calculate statistics
+    # NOTE(review): total_time covers only this invocation; on resume it does
+    # not include time spent before the interruption.
+    total_time = time.time() - start_time
+    completed_results = [r for r in results if r.get('status') == 'completed']
+    grades = [r['grade']['grade'] for r in completed_results
+              if r.get('grade', {}).get('status') == 'success' and 'grade' in r.get('grade', {})]
+
+    # Calculate numeric grades (CORRECT=5, INCORRECT=2.5)
+    numeric_grades = [5.0 if g == 'CORRECT' else 2.5 for g in grades]
+    average_grade = sum(numeric_grades) / len(numeric_grades) if numeric_grades else 0.0
+
+    summary = {
+        'total_problems': len(problems),
+        'completed': len(completed_results),
+        'successful': len(successful_indices),  # Technical success (completed processing)
+        'failed': len(failed_indices),  # Technical failures
+        'correct_answers': len(correct_indices),  # Mathematically correct answers
+        'incorrect_answers': len(successful_indices) - len(correct_indices),  # Wrong but processed
+        'success_rate': (len(successful_indices) / len(problems) * 100) if problems else 0,  # Technical success rate
+        'accuracy_rate': (len(correct_indices) / len(successful_indices) * 100) if successful_indices else 0,  # Correctness rate
+        'average_grade': average_grade,
+        'total_time_seconds': total_time,
+        'problems_per_second': len(problems) / total_time if total_time > 0 else 0,
+        'provider': provider,
+        'variant_type': variant_type,
+        'solver_model': loader.solver_model,
+        'grader_model': loader.grader_model,
+        'max_concurrent': max_concurrent,
+        'estimated_cost': cost_info,
+        'checkpoint_file': str(checkpoint_file)
+    }
+
+    # Create full results
+    full_results = {
+        'summary': summary,
+        'problems': results,
+        'successful_indices': successful_indices,  # Technical successes
+        'failed_indices': failed_indices,  # Technical failures
+        'correct_indices': correct_indices,  # Correct answers
+        'timestamp': datetime.now().isoformat()
+    }
+
+    # Save final results
+    if output_file:
+        logger.info(f"Saving final results to {output_file}")
+        with open(output_file, 'w', encoding='utf-8') as f:
+            json.dump(full_results, f, indent=2, ensure_ascii=False)
+
+        # Clean up checkpoint file after successful completion
+        if checkpoint_file.exists():
+            logger.info(f"Removing checkpoint file: {checkpoint_file}")
+            checkpoint_file.unlink()
+
+    # Print summary
+    logger.info(f"\n{'='*60}")
+    logger.info("EVALUATION SUMMARY")
+    logger.info(f"{'='*60}")
+    logger.info(f"Provider: {provider}")
+    logger.info(f"Variant: {variant_type}")
+    logger.info(f"Total problems: {summary['total_problems']}")
+    logger.info(f"✅ Successfully processed: {summary['successful']} ({summary['success_rate']:.1f}%)")
+    logger.info(f"💥 Technical failures: {summary['failed']}")
+    logger.info(f"🎯 Correct answers: {summary['correct_answers']} ({summary['accuracy_rate']:.1f}% of processed)")
+    logger.info(f"❌ Wrong answers: {summary['incorrect_answers']}")
+    logger.info(f"Average grade: {summary['average_grade']:.2f}")
+    logger.info(f"Total time: {summary['total_time_seconds']:.1f}s")
+    logger.info(f"Speed: {summary['problems_per_second']:.2f} problems/second")
+
+    # Cleanup
+    # NOTE(review): __aexit__ is invoked without a matching __aenter__ —
+    # confirm the loaders tolerate this teardown-only usage.
+    if hasattr(loader, '__aexit__'):
+        await loader.__aexit__(None, None, None)
+
+    return full_results
+
+
+async def batch_evaluate_cross(dataset_path: Path = None,
+                              solver_provider: str = None,
+                              grader_provider: str = None,
+                              variant_type: str = "original",
+                              max_concurrent: int = 3,
+                              max_files: int = None,
+                              solver_model: str = None,
+                              grader_model: str = None,
+                              output_file: Path = None,
+                              resume_checkpoint: Path = None,
+                              vllm_url: str = None,
+                              device: str = None,
+                              quick: bool = False) -> Dict[str, Any]:
+    """
+    Batch evaluate problems using different providers for solving and grading with resume support.
+
+    Args:
+        dataset_path: Path to dataset directory (required for new runs, ignored for resume)
+        solver_provider: Provider for solving problems (required for new runs, ignored for resume)
+        grader_provider: Provider for grading (if None, uses solver_provider)
+        variant_type: Problem variant to use
+        max_concurrent: Maximum concurrent evaluations
+        max_files: Maximum number of files to process (None for all)
+        solver_model: Override solver model
+        grader_model: Override grader model
+        output_file: Output file path
+        resume_checkpoint: Path to checkpoint file to resume from
+        vllm_url: VLLM server URL if using VLLM
+        device: Device for HuggingFace models
+
+    Returns:
+        Dictionary with evaluation results and statistics
+    """
+    logger = logging.getLogger(__name__)
+
+    # Check if resuming from checkpoint
+    if resume_checkpoint and resume_checkpoint.exists():
+        logger.info(f"Resuming from checkpoint: {resume_checkpoint}")
+        with open(resume_checkpoint, 'r', encoding='utf-8') as f:
+            checkpoint_data = json.load(f)
+
+        # Simple resume: just restore completed indices and results
+        completed_indices = set(checkpoint_data.get('completed_indices', []))
+        results = checkpoint_data.get('results', [])
+        failed_indices = checkpoint_data.get('failed_indices', [])
+        successful_indices = checkpoint_data.get('successful_indices', [])
+        correct_indices = checkpoint_data.get('correct_indices', [])
+
+        # Always require providers and dataset_path from command line
+        if not dataset_path:
+            raise ValueError("dataset_path is required when resuming")
+        if not solver_provider:
+            raise ValueError("solver_provider is required when resuming")
+
+        # Load dataset
+        logger.info(f"Loading dataset from {dataset_path}")
+        problems = await load_dataset(dataset_path, max_files)
+        logger.info(f"Loaded {len(problems)} problems")
+
+        if not problems:
+            raise ValueError("No problems found in dataset")
+
+        checkpoint_file = resume_checkpoint  # Continue using the same checkpoint file
+        logger.info(f"Resuming with {len(completed_indices)} completed problems out of {len(problems)}")
+    else:
+        # New evaluation - validate required parameters
+        if not dataset_path:
+            raise ValueError("dataset_path is required for new evaluation")
+        if not solver_provider:
+            raise ValueError("solver_provider is required for new evaluation")
+
+        # Load dataset
+        logger.info(f"Loading dataset from {dataset_path}")
+        problems = await load_dataset(dataset_path, max_files)
+        logger.info(f"Loaded {len(problems)} problems")
+
+        if not problems:
+            raise ValueError("No problems found in dataset")
+
+        # Initialize state for new run
+        completed_indices = set()
+        results = []
+        failed_indices = []
+        successful_indices = []
+        correct_indices = []
+
+        # Create checkpoint file name
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        if output_file:
+            checkpoint_file = output_file.parent / f"checkpoint_{output_file.stem}_{timestamp}.json"
+        else:
+            checkpoint_file = Path(f"checkpoint_cross_{solver_provider}_{grader_provider or solver_provider}_{variant_type}_{timestamp}.json")
+
+    # Create cross-provider loader
+    logger.info(f"Creating cross-provider loader: solver={solver_provider}, grader={grader_provider or solver_provider}")
+
+    from loader import create_cross_provider_loader
+
+    # Prepare kwargs for each provider
+    loader_kwargs = {}
+
+    # VLLM settings
+    # NOTE(review): if one side is 'vllm' and the other 'huggingface', the
+    # second assignment below overwrites solver_kwargs/grader_kwargs rather
+    # than merging — confirm these provider combinations never co-occur.
+    if vllm_url:
+        if solver_provider == 'vllm':
+            loader_kwargs['solver_kwargs'] = {'base_url': vllm_url}
+        if grader_provider == 'vllm':
+            loader_kwargs['grader_kwargs'] = {'base_url': vllm_url}
+
+    # HuggingFace settings
+    if device:
+        if solver_provider == 'huggingface':
+            loader_kwargs['solver_kwargs'] = {'device': device}
+        if grader_provider == 'huggingface':
+            loader_kwargs['grader_kwargs'] = {'device': device}
+
+    # Add quick mode if specified
+    if quick:
+        loader_kwargs['quick'] = True
+
+    loader = create_cross_provider_loader(
+        solver_provider=solver_provider,
+        grader_provider=grader_provider,
+        solver_model=solver_model,
+        grader_model=grader_model,
+        **loader_kwargs
+    )
+
+    # Health check
+    logger.info("Performing health check...")
+    if not await loader.health_check():
+        raise RuntimeError(f"Health check failed")
+
+    # Cost estimation
+    logger.info("Estimating costs...")
+    cost_info = await loader.estimate_cost(len(problems))
+    logger.info(f"Estimated cost: ${cost_info.get('total_cost', 0):.2f}")
+
+    # Progress tracking
+    # NOTE(review): problems missing an 'index' key all fall back to 'unknown',
+    # which collides on resume — confirm every problem carries a unique 'index'.
+    remaining_problems = [p for p in problems if p.get('index', 'unknown') not in completed_indices]
+    progress_bar = tqdm(total=len(problems), desc=f"Evaluating (solver={solver_provider}, grader={grader_provider or solver_provider})", initial=len(completed_indices))
+
+    # Semaphore for concurrency control
+    semaphore = asyncio.Semaphore(max_concurrent)
+
+    def save_checkpoint():
+        """Save current state to checkpoint file - simplified version"""
+        checkpoint_data = {
+            'timestamp': datetime.now().isoformat(),
+            # Only save essential state information
+            'completed_indices': list(completed_indices),
+            'successful_indices': successful_indices,
+            'failed_indices': failed_indices,
+            'correct_indices': correct_indices,
+            'results': results,
+            # Save minimal config for reference (not for resume)
+            'dataset_path': str(dataset_path),  # For convenience
+            'total_problems': len(problems),
+            'current_config': {
+                'solver_provider': solver_provider,
+                'grader_provider': grader_provider or solver_provider,
+                'variant_type': variant_type,
+                'solver_model': loader.solver_model,
+                'grader_model': loader.grader_model
+            }
+        }
+
+        # Write to temporary file first, then move (atomic operation)
+        temp_file = checkpoint_file.with_suffix('.tmp')
+        with open(temp_file, 'w', encoding='utf-8') as f:
+            json.dump(checkpoint_data, f, indent=2, ensure_ascii=False)
+
+        # Atomic rename
+        temp_file.replace(checkpoint_file)
+
+    async def evaluate_problem(problem_data: Dict[str, Any]) -> Dict[str, Any]:
+        """Evaluate a single problem with concurrency control."""
+        problem_index = problem_data.get('index', 'unknown')
+
+        # Skip if already completed
+        if problem_index in completed_indices:
+            return None
+
+        async with semaphore:
+            try:
+                result = await loader.test_single_problem(
+                    problem_data,
+                    variant_type=variant_type
+                )
+
+                # Track success/failure based on technical completion, not correctness
+                if result.get('status') == 'completed':
+                    successful_indices.append(result['index'])  # Successfully processed
+                    if result.get('correct'):
+                        correct_indices.append(result['index'])  # Also correct
+                else:
+                    failed_indices.append(result['index'])  # Technical failure
+
+                # Add to results and mark as completed
+                results.append(result)
+                completed_indices.add(problem_index)
+
+                # Save checkpoint immediately after each problem
+                save_checkpoint()
+
+                progress_bar.update(1)
+                progress_bar.set_postfix({
+                    'success': len(successful_indices),
+                    'failed': len(failed_indices),
+                    'saved': len(completed_indices)
+                })
+
+                return result
+
+            except Exception as e:
+                import traceback
+
+                # Capture full error details
+                error_details = {
+                    'error_message': str(e),
+                    'error_type': type(e).__name__,
+                    'traceback': traceback.format_exc(),
+                    'timestamp': datetime.now().isoformat(),
+                    'problem_index': problem_index,
+                    'problem_title': problem_data.get('title', 'unknown')
+                }
+
+                # Try to capture HTTP-specific details if available
+                # NOTE(review): the bare `except:` clauses below swallow every
+                # failure (including BaseException) while harvesting optional
+                # HTTP details — narrowing to `except Exception:` would be safer.
+                if hasattr(e, 'response'):
+                    try:
+                        error_details['http_status'] = e.response.status_code
+                        error_details['http_headers'] = dict(e.response.headers)
+                        error_details['http_response_text'] = e.response.text
+                    except:
+                        pass
+
+                # Try to capture request details if available
+                if hasattr(e, 'request'):
+                    try:
+                        error_details['request_method'] = e.request.method
+                        error_details['request_url'] = e.request.url
+                        error_details['request_headers'] = dict(e.request.headers)
+                        # Don't log request body as it might contain sensitive info
+                    except:
+                        pass
+
+                # Log detailed error
+                logger.error(f"DETAILED ERROR for problem {problem_index}:")
+                logger.error(f"  Error Type: {error_details['error_type']}")
+                logger.error(f"  Error Message: {error_details['error_message']}")
+                logger.error(f"  Problem Title: {error_details['problem_title']}")
+
+                if 'http_status' in error_details:
+                    logger.error(f"  HTTP Status: {error_details['http_status']}")
+                    logger.error(f"  HTTP Response: {error_details['http_response_text'][:500]}...")
+
+                logger.error(f"  Full Traceback:\n{error_details['traceback']}")
+
+                # Save to detailed error log
+                # NOTE(review): the filename embeds a fresh timestamp at each
+                # failure, so errors from one run may scatter across several
+                # files — confirm that is acceptable.
+                error_log_file = output_file.parent / f"detailed_errors_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" if output_file else Path(f"detailed_errors_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
+
+                try:
+                    # Load existing errors if file exists
+                    if error_log_file.exists():
+                        with open(error_log_file, 'r') as f:
+                            existing_errors = json.load(f)
+                    else:
+                        existing_errors = []
+
+                    # Add new error
+                    existing_errors.append(error_details)
+
+                    # Save updated errors
+                    with open(error_log_file, 'w') as f:
+                        json.dump(existing_errors, f, indent=2, ensure_ascii=False)
+
+                    logger.info(f"Detailed error saved to {error_log_file}")
+
+                except Exception as save_error:
+                    logger.error(f"Failed to save detailed error log: {save_error}")
+
+                result = {
+                    'index': problem_index,
+                    'status': 'error',
+                    'error': str(e),
+                    'error_type': type(e).__name__,
+                    'error_details': error_details
+                }
+
+                # Add to results and mark as completed (even if failed)
+                results.append(result)
+                failed_indices.append(problem_index)
+                completed_indices.add(problem_index)
+
+                # Save checkpoint
+                save_checkpoint()
+
+                progress_bar.update(1)
+                progress_bar.set_postfix({
+                    'success': len(successful_indices),
+                    'failed': len(failed_indices),
+                    'saved': len(completed_indices)
+                })
+
+                return result
+
+    # Run evaluations
+    start_time = time.time()
+
+    try:
+        # Create tasks only for remaining problems
+        tasks = [evaluate_problem(problem) for problem in remaining_problems]
+
+        if tasks:
+            # Execute all tasks concurrently (limited by semaphore)
+            await asyncio.gather(*tasks)
+        else:
+            logger.info("All problems already completed!")
+
+    except KeyboardInterrupt:
+        logger.info("Evaluation interrupted by user. Progress saved to checkpoint.")
+        logger.info(f"To resume, use: --resume {checkpoint_file}")
+        raise
+
+    finally:
+        progress_bar.close()
+
+    # Calculate statistics
+    total_time = time.time() - start_time
+    completed_results = [r for r in results if r.get('status') == 'completed']
+    grades = [r['grade']['grade'] for r in completed_results
+              if r.get('grade', {}).get('status') == 'success' and 'grade' in r.get('grade', {})]
+
+    # Calculate numeric grades (CORRECT=5, INCORRECT=2.5)
+    numeric_grades = [5.0 if g == 'CORRECT' else 2.5 for g in grades]
+    average_grade = sum(numeric_grades) / len(numeric_grades) if numeric_grades else 0.0
+
+    model_info = loader.get_model_info()
+
+    summary = {
+        'total_problems': len(problems),
+        'completed': len(completed_results),
+        'successful': len(successful_indices),  # Technical success (completed processing)
+        'failed': len(failed_indices),  # Technical failures
+        'correct_answers': len(correct_indices),  # Mathematically correct answers
+        'incorrect_answers': len(successful_indices) - len(correct_indices),  # Wrong but processed
+        'success_rate': (len(successful_indices) / len(problems) * 100) if problems else 0,  # Technical success rate
+        'accuracy_rate': (len(correct_indices) / len(successful_indices) * 100) if successful_indices else 0,  # Correctness rate
+        'average_grade': average_grade,
+        'total_time_seconds': total_time,
+        'problems_per_second': len(problems) / total_time if total_time > 0 else 0,
+        'solver_provider': model_info.get('solver_provider', solver_provider),
+        'grader_provider': model_info.get('grader_provider', grader_provider or solver_provider),
+        'variant_type': variant_type,
+        'solver_model': loader.solver_model,
+        'grader_model': loader.grader_model,
+        'max_concurrent': max_concurrent,
+        'estimated_cost': cost_info,
+        'is_cross_provider': model_info.get('is_cross_provider', False)
+    }
+
+    # Create full results
+    full_results = {
+        'summary': summary,
+        'problems': results,
+        'successful_indices': successful_indices,  # Technical successes
+        'failed_indices': failed_indices,  # Technical failures
+        'correct_indices': correct_indices,  # Correct answers
+        'timestamp': datetime.now().isoformat()
+    }
+
+    # Save if requested
+    # NOTE(review): unlike batch_evaluate, the checkpoint file is NOT removed
+    # after the final results are written — confirm whether that is intended.
+    if output_file:
+        logger.info(f"Saving results to {output_file}")
+        with open(output_file, 'w', encoding='utf-8') as f:
+            json.dump(full_results, f, indent=2, ensure_ascii=False)
+
+    # Print summary
+    logger.info(f"\n{'='*60}")
+    logger.info("CROSS-PROVIDER EVALUATION SUMMARY")
+    logger.info(f"{'='*60}")
+    logger.info(f"Solver Provider: {summary['solver_provider']} ({loader.solver_model})")
+    logger.info(f"Grader Provider: {summary['grader_provider']} ({loader.grader_model})")
+    logger.info(f"Variant: {variant_type}")
+    logger.info(f"Total problems: {summary['total_problems']}")
+    logger.info(f"✅ Successfully processed: {summary['successful']} ({summary['success_rate']:.1f}%)")
+    logger.info(f"💥 Technical failures: {summary['failed']}")
+    logger.info(f"🎯 Correct answers: {summary['correct_answers']} ({summary['accuracy_rate']:.1f}% of processed)")
+    logger.info(f"❌ Wrong answers: {summary['incorrect_answers']}")
+    logger.info(f"Average grade: {summary['average_grade']:.2f}")
+    logger.info(f"Total time: {summary['total_time_seconds']:.1f}s")
+    logger.info(f"Speed: {summary['problems_per_second']:.2f} problems/second")
+
+    # Cleanup
+    # NOTE(review): __aexit__ is invoked without a matching __aenter__ —
+    # confirm the loaders tolerate this teardown-only usage.
+    if hasattr(loader, '__aexit__'):
+        await loader.__aexit__(None, None, None)
+
+    return full_results
+
+
+async def batch_evaluate_all_variants(dataset_path: Path, provider: str,
+ variants: List[str] = None,
+ max_concurrent: int = 3, max_files: int = None,
+ solver_model: str = None, grader_model: str = None,
+ output_dir: Path = None,
+ base_url: str = None, device: str = None) -> Dict[str, Any]:
+ """
+ Batch evaluate problems across all variants using specified provider.
+
+ Args:
+ dataset_path: Path to dataset directory
+ provider: AI provider name
+ variants: List of variants to test (None for all)
+ max_concurrent: Maximum concurrent evaluations
+ max_files: Maximum number of files to process per variant (None for all)
+ solver_model: Override solver model
+ grader_model: Override grader model
+ output_dir: Output directory path
+ **loader_kwargs: Additional arguments for loader
+
+ Returns:
+ Dictionary with all variant results and comparative analysis
+ """
+ if variants is None:
+ variants = ["original", "descriptive_long", "descriptive_long_confusing",
+ "descriptive_long_misleading", "garbled_string", "kernel_variant"]
+
+ if output_dir is None:
+ output_dir = Path("results")
+
+ logger = logging.getLogger(__name__)
+
+ timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+ config_name = f"{provider}"
+ if solver_model:
+ config_name += f"_{solver_model.replace('/', '_').replace('-', '_')}"
+
+ # Create configuration-specific output directory
+ config_output_dir = output_dir / f"{config_name}_{timestamp}"
+ config_output_dir.mkdir(parents=True, exist_ok=True)
+
+ # Prepare loader kwargs based on provider
+ loader_kwargs = {}
+ if provider == 'vllm' and base_url:
+ loader_kwargs['base_url'] = base_url
+ elif provider == 'huggingface' and device:
+ loader_kwargs['device'] = device
+
+ logger.info(f"🚀 Starting multi-variant test for {config_name}")
+ logger.info(f"📊 Testing {len(variants)} variants with up to {max_files or 'ALL'} files each")
+
+ overall_start_time = time.time()
+ variant_results = {}
+
+ # Create overall progress bar for variants if tqdm is available
+ if HAS_TQDM:
+ variant_progress = tqdm.tqdm(total=len(variants), desc="Variants",
+ unit="variant", position=1, leave=True)
+
+ for i, variant in enumerate(variants):
+ logger.info(f"\n📝 [{i+1}/{len(variants)}] Testing variant: {variant}")
+ variant_start_time = time.time()
+
+ # Output file for this variant
+ variant_output_file = config_output_dir / f"{variant}_{timestamp}.json"
+
+ try:
+ # Run batch evaluation for this variant
+ result = await batch_evaluate(
+ dataset_path=dataset_path,
+ provider=provider,
+ variant_type=variant,
+ max_concurrent=max_concurrent,
+ max_files=max_files,
+ solver_model=solver_model,
+ grader_model=grader_model,
+ output_file=variant_output_file,
+ **loader_kwargs
+ )
+
+ variant_time = time.time() - variant_start_time
+
+ # Extract key metrics
+ summary = result.get('summary', {})
+ variant_results[variant] = {
+ 'status': 'success',
+ 'output_file': str(variant_output_file),
+ 'total_problems': summary.get('total_problems', 0),
+ 'successful_evaluations': summary.get('successful', 0),
+ 'correct_evaluations': summary.get('correct_answers', 0),
+ 'incorrect_evaluations': summary.get('incorrect_answers', 0),
+ 'failed_evaluations': summary.get('failed', 0),
+ 'success_rate': summary.get('success_rate', 0),
+ 'average_grade': summary.get('average_grade', 0),
+ 'total_processing_time': summary.get('total_time_seconds', 0),
+ 'avg_time_per_problem': summary.get('problems_per_second', 0),
+ 'variant_test_time': variant_time,
+ 'grade_distribution': result.get('problems', []) # Assuming 'problems' contains all results
+ }
+
+ logger.info(f"✅ {variant}: "
+ f"Grade {summary.get('average_grade', 0):.2f}, "
+ f"Success {summary.get('success_rate', 0):.1f}%, "
+ f"Time {variant_time/60:.1f}min")
+
+ except Exception as e:
+ variant_time = time.time() - variant_start_time
+ error_msg = str(e)
+
+ variant_results[variant] = {
+ 'status': 'failed',
+ 'error': error_msg,
+ 'variant_test_time': variant_time
+ }
+
+ logger.error(f"❌ {variant} failed: {error_msg}")
+
+ # Update variant progress bar
+ if HAS_TQDM and 'variant_progress' in locals():
+ variant_progress.update(1)
+ successful_variants_count = len([v for v, r in variant_results.items() if r.get('status') == 'success'])
+ variant_progress.set_postfix({
+ 'Success': successful_variants_count,
+ 'Failed': len(variant_results) - successful_variants_count
+ })
+
+ # Close variant progress bar
+ if HAS_TQDM and 'variant_progress' in locals():
+ variant_progress.close()
+
+ overall_time = time.time() - overall_start_time
+
+ # Generate comprehensive summary
+ successful_variants = [v for v, r in variant_results.items() if r.get('status') == 'success']
+ failed_variants = [v for v, r in variant_results.items() if r.get('status') == 'failed']
+
+ # Calculate aggregate statistics
+ if successful_variants:
+ total_problems = sum(variant_results[v].get('total_problems', 0) for v in successful_variants)
+ total_successful = sum(variant_results[v].get('successful_evaluations', 0) for v in successful_variants)
+ total_correct = sum(variant_results[v].get('correct_evaluations', 0) for v in successful_variants)
+ total_incorrect = sum(variant_results[v].get('incorrect_evaluations', 0) for v in successful_variants)
+ total_failed = sum(variant_results[v].get('failed_evaluations', 0) for v in successful_variants)
+
+ grades = [variant_results[v].get('average_grade', 0) for v in successful_variants]
+ success_rates = [variant_results[v].get('success_rate', 0) for v in successful_variants]
+ times = [variant_results[v].get('avg_time_per_problem', 0) for v in successful_variants]
+
+ overall_avg_grade = sum(grades) / len(grades) if grades else 0
+ overall_success_rate = sum(success_rates) / len(success_rates) if success_rates else 0
+ overall_avg_time = sum(times) / len(times) if times else 0
+
+ # Find best and worst performing variants
+ best_variant = max(successful_variants, key=lambda v: variant_results[v].get('average_grade', 0))
+ worst_variant = min(successful_variants, key=lambda v: variant_results[v].get('average_grade', 0))
+
+ fastest_variant = min(successful_variants, key=lambda v: variant_results[v].get('avg_time_per_problem', float('inf')))
+ slowest_variant = max(successful_variants, key=lambda v: variant_results[v].get('avg_time_per_problem', 0))
+ else:
+ total_problems = total_successful = total_correct = total_incorrect = total_failed = 0
+ overall_avg_grade = overall_success_rate = overall_avg_time = 0
+ best_variant = worst_variant = fastest_variant = slowest_variant = None
+
+ summary_result = {
+ 'configuration': {
+ 'provider': provider,
+ 'solver_model': solver_model,
+ 'grader_model': grader_model,
+ 'base_url': base_url,
+ 'device': device,
+ 'timestamp': timestamp
+ },
+ 'test_overview': {
+ 'total_variants_tested': len(variant_results),
+ 'successful_variants': len(successful_variants),
+ 'failed_variants': len(failed_variants),
+ 'total_test_time_minutes': overall_time / 60,
+ 'variants_list': list(variant_results.keys())
+ },
+ 'aggregate_metrics': {
+ 'total_problems_across_variants': total_problems,
+ 'total_successful_evaluations': total_successful,
+ 'total_correct_evaluations': total_correct,
+ 'total_incorrect_evaluations': total_incorrect,
+ 'total_technical_failures': total_failed,
+ 'overall_average_grade': overall_avg_grade,
+ 'overall_success_rate': overall_success_rate,
+ 'overall_avg_time_per_problem': overall_avg_time
+ },
+ 'variant_comparison': {
+ 'best_performing_variant': {
+ 'variant': best_variant,
+ 'grade': variant_results.get(best_variant, {}).get('average_grade', 0) if best_variant else 0
+ },
+ 'worst_performing_variant': {
+ 'variant': worst_variant,
+ 'grade': variant_results.get(worst_variant, {}).get('average_grade', 0) if worst_variant else 0
+ },
+ 'fastest_variant': {
+ 'variant': fastest_variant,
+ 'time_per_problem': variant_results.get(fastest_variant, {}).get('avg_time_per_problem', 0) if fastest_variant else 0
+ },
+ 'slowest_variant': {
+ 'variant': slowest_variant,
+ 'time_per_problem': variant_results.get(slowest_variant, {}).get('avg_time_per_problem', 0) if slowest_variant else 0
+ }
+ },
+ 'detailed_variant_results': variant_results
+ }
+
+ # Save configuration summary
+ summary_file = config_output_dir / f"SUMMARY_{config_name}_{timestamp}.json"
+ with open(summary_file, 'w', encoding='utf-8') as f:
+ json.dump(summary_result, f, indent=2, ensure_ascii=False)
+
+ # Print summary to console
+ logger.info("\n" + "="*80)
+ logger.info("📊 MULTI-VARIANT TEST SUMMARY REPORT")
+ logger.info("="*80)
+
+ logger.info(f"🤖 Provider: {provider}")
+ if solver_model:
+ logger.info(f"🧠 Solver Model: {solver_model}")
+ if grader_model:
+ logger.info(f"📝 Grader Model: {grader_model}")
+
+ logger.info(f"\n📋 Test Overview:")
+ logger.info(f" Total variants tested: {len(variant_results)}")
+ logger.info(f" Successful variants: {len(successful_variants)}")
+ logger.info(f" Failed variants: {len(failed_variants)}")
+ logger.info(f" Total test time: {overall_time/60:.1f} minutes")
+
+ if total_problems > 0:
+ logger.info(f"\n📈 Aggregate Performance:")
+ logger.info(f" Total problems: {total_problems}")
+ logger.info(f" Overall average grade: {overall_avg_grade:.2f}")
+ logger.info(f" Overall success rate: {overall_success_rate:.1f}%")
+ logger.info(f" Average time per problem: {overall_avg_time:.2f}s")
+
+ if best_variant:
+ logger.info(f"\n🏆 Variant Performance:")
+ logger.info(f" Best performing: {best_variant} (Grade: {variant_results[best_variant]['average_grade']:.2f})")
+ logger.info(f" Worst performing: {worst_variant} (Grade: {variant_results[worst_variant]['average_grade']:.2f})")
+ logger.info(f" Fastest: {fastest_variant} ({variant_results[fastest_variant]['avg_time_per_problem']:.2f}s/problem)")
+ logger.info(f" Slowest: {slowest_variant} ({variant_results[slowest_variant]['avg_time_per_problem']:.2f}s/problem)")
+
+ logger.info("="*80)
+ logger.info(f"💾 Configuration summary saved to {summary_file}")
+
+ return summary_result
+
+
async def main():
    """CLI entry point for batch evaluation.

    Parses command-line arguments, prepares the output directory and logging,
    then dispatches to either a multi-variant run (``--all-variants`` /
    ``--variants``) or a single-variant run (default).

    Returns:
        Process exit code: 0 on success or user interrupt, 1 on failure.
    """
    parser = argparse.ArgumentParser(description="Batch evaluate mathematical problems")

    # Required arguments
    parser.add_argument("--provider", required=True, choices=get_supported_providers(),
                        help="AI provider to use")

    # Dataset options
    parser.add_argument("--dataset", default="dataset",
                        help="Dataset directory path (default: dataset)")
    parser.add_argument("--variant", default="original",
                        choices=["original", "descriptive_long", "descriptive_long_confusing",
                                 "descriptive_long_misleading", "garbled_string", "kernel_variant"],
                        help="Problem variant to use (default: original)")
    parser.add_argument("--all-variants", action="store_true",
                        help="Test all 6 problem variants instead of just one")
    parser.add_argument("--variants", nargs="+",
                        choices=["original", "descriptive_long", "descriptive_long_confusing",
                                 "descriptive_long_misleading", "garbled_string", "kernel_variant"],
                        help="Specific variants to test (use with --all-variants)")
    parser.add_argument("--max-files", type=int,
                        help="Maximum number of files to process per variant (default: all)")

    # Processing options
    parser.add_argument("--max-concurrent", type=int, default=3,
                        help="Maximum concurrent evaluations (default: 3)")
    parser.add_argument("--solver-model",
                        help="Override solver model")
    parser.add_argument("--grader-model",
                        help="Override grader model")

    # Output options
    parser.add_argument("--output", type=Path,
                        help="Output file path (default: results/[provider]_[timestamp].json)")
    parser.add_argument("--output-dir", type=Path, default="results",
                        help="Output directory (default: results)")
    parser.add_argument("--resume", type=Path,
                        help="Path to checkpoint file to resume from")

    # Provider-specific options
    parser.add_argument("--base-url",
                        help="Base URL for VLLM provider")
    parser.add_argument("--device", default="auto",
                        help="Device for HuggingFace provider (auto/cuda/cpu)")

    args = parser.parse_args()

    # Setup output directory and logging
    args.output_dir.mkdir(parents=True, exist_ok=True)
    logger = setup_logging(args.output_dir)

    # Default output file if not specified (single-variant mode uses this path;
    # multi-variant mode creates its own per-config directory instead)
    if not args.output:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        args.output = args.output_dir / f"{args.provider}_{args.variant}_{timestamp}.json"

    # Prepare loader kwargs based on provider
    loader_kwargs = {}
    if args.provider == 'vllm' and args.base_url:
        loader_kwargs['base_url'] = args.base_url
    elif args.provider == 'huggingface' and args.device:
        loader_kwargs['device'] = args.device

    try:
        if args.all_variants or args.variants:
            # Multi-variant evaluation. base_url/device are passed straight
            # through; batch_evaluate_all_variants builds its own loader kwargs.
            variants_to_test = args.variants if args.variants else None
            results = await batch_evaluate_all_variants(
                dataset_path=Path(args.dataset),
                provider=args.provider,
                variants=variants_to_test,
                max_concurrent=args.max_concurrent,
                max_files=args.max_files,
                solver_model=args.solver_model,
                grader_model=args.grader_model,
                output_dir=args.output_dir,
                base_url=args.base_url,
                device=args.device
            )

            logger.info("Multi-variant evaluation completed successfully!")
            logger.info(f"Overall average grade: {results['aggregate_metrics']['overall_average_grade']:.2f}")
            logger.info(f"Overall success rate: {results['aggregate_metrics']['overall_success_rate']:.1f}%")
        else:
            # Single variant evaluation
            results = await batch_evaluate(
                dataset_path=Path(args.dataset),
                provider=args.provider,
                variant_type=args.variant,
                max_concurrent=args.max_concurrent,
                max_files=args.max_files,
                solver_model=args.solver_model,
                grader_model=args.grader_model,
                output_file=args.output,
                resume_checkpoint=args.resume,
                **loader_kwargs
            )

            logger.info("Batch evaluation completed successfully!")
            logger.info(f"Average grade: {results['summary']['average_grade']:.2f}")
            logger.info(f"Success rate: {results['summary']['success_rate']:.1f}%")

    except KeyboardInterrupt:
        # Deliberately a clean exit: partial results were saved incrementally.
        logger.info("Evaluation interrupted by user")
    except Exception as e:
        logger.error(f"Evaluation failed: {str(e)}")
        return 1

    return 0
+
+
if __name__ == "__main__":
    # Raise SystemExit directly instead of calling the site-injected exit()
    # helper, which is absent when Python runs with -S / without the site module.
    raise SystemExit(asyncio.run(main()))
diff --git a/putnam-bench-anon/scripts/benchmark.py b/putnam-bench-anon/scripts/benchmark.py
new file mode 100644
index 0000000..2fed228
--- /dev/null
+++ b/putnam-bench-anon/scripts/benchmark.py
@@ -0,0 +1,481 @@
+#!/usr/bin/env python3
+"""
+Benchmark script for comparing AI providers and models on mathematical problems.
+
+This script runs comparative evaluations across multiple providers, models, and
+problem variants to assess performance, accuracy, cost, and speed trade-offs.
+
+Usage:
+ python benchmark.py --config benchmark_config.json
+ python benchmark.py --quick-test # Quick 3-problem test across all providers
+ python benchmark.py --providers openai anthropic --models gpt-4o-mini claude-3-5-haiku
+"""
+
+import asyncio
+import json
+import sys
+import time
+from pathlib import Path
+import argparse
+from typing import List, Dict, Any, Tuple
+import logging
+from datetime import datetime
+import itertools
+import statistics
+
+# Add the loader module to the path
+sys.path.append(str(Path(__file__).parent))
+
+from loader import create_loader, get_supported_providers, get_default_models
+
+
class BenchmarkRunner:
    """Benchmark runner for AI providers.

    Owns an output directory for result artifacts, configures combined
    file + stdout logging, and drives single- and multi-configuration
    benchmark runs over a set of problem dicts.
    """

    def __init__(self, output_dir: Path = Path("benchmark_results")):
        # Path objects are immutable, so sharing the default instance across
        # calls is safe (unlike mutable default arguments).
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Setup logging: one timestamped log file per runner, echoed to stdout.
        # NOTE(review): basicConfig configures the root logger globally and is
        # a no-op if logging was already configured earlier in the process.
        log_file = self.output_dir / f"benchmark_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler(sys.stdout)
            ]
        )
        self.logger = logging.getLogger(__name__)

    async def load_test_problems(self, dataset_path: Path, max_problems: int = 10) -> List[Dict[str, Any]]:
        """Load test problems from dataset.

        Reads up to ``max_problems`` top-level ``*.json`` files from
        ``dataset_path``; each parsed dict is tagged with its source filename
        under the '_source_file' key. Files that fail to load are logged and
        skipped rather than aborting the whole run.
        """
        json_files = list(dataset_path.glob("*.json"))[:max_problems]

        problems = []
        for json_file in json_files:
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                data['_source_file'] = str(json_file.name)
                problems.append(data)
            except Exception as e:
                self.logger.warning(f"Failed to load {json_file}: {str(e)}")

        return problems

    async def run_single_configuration(self,
                                       provider: str,
                                       solver_model: str,
                                       grader_model: str,
                                       problems: List[Dict[str, Any]],
                                       variant_type: str = "original",
                                       **loader_kwargs) -> Dict[str, Any]:
        """Run benchmark for a single provider/model configuration.

        Args:
            provider: Provider name understood by ``create_loader``.
            solver_model: Model used to produce solutions.
            grader_model: Model used to grade solutions.
            problems: Problem dicts (as returned by ``load_test_problems``).
            variant_type: Problem variant to evaluate.
            **loader_kwargs: Extra keyword arguments forwarded to
                ``create_loader`` (e.g. base_url, device).

        Returns:
            Dict with 'configuration', aggregate 'metrics', per-problem
            'problems' records, and 'errors'. A fatal setup failure is
            recorded under metrics['fatal_error'] instead of raising.
        """
        config_name = f"{provider}_{solver_model}_{grader_model}".replace("/", "_").replace("-", "_")
        self.logger.info(f"🚀 Testing configuration: {config_name}")

        result = {
            'configuration': {
                'provider': provider,
                'solver_model': solver_model,
                'grader_model': grader_model,
                'variant_type': variant_type,
                'loader_kwargs': loader_kwargs
            },
            'metrics': {},
            'problems': [],
            'errors': []
        }

        try:
            # Create loader
            loader = create_loader(
                provider,
                solver_model=solver_model,
                grader_model=grader_model,
                **loader_kwargs
            )

            # Health check — fail fast before spending time on problems.
            if not await loader.health_check():
                raise RuntimeError(f"Health check failed for {provider}")

            # Cost estimation (shape of cost_info is provider-defined;
            # generate_comparison_report expects a 'total_cost' key).
            cost_info = await loader.estimate_cost(len(problems))
            result['metrics']['estimated_cost'] = cost_info

            # Process each problem sequentially; per-problem failures are
            # collected in result['errors'] and do not stop the loop.
            start_time = time.time()
            grades = []
            processing_times = []

            for i, problem in enumerate(problems):
                problem_start = time.time()

                try:
                    problem_result = await loader.test_single_problem(
                        problem,
                        variant_type=variant_type
                    )

                    processing_time = time.time() - problem_start
                    # Convert boolean 'correct' to numeric grade (10 for correct, 0 for incorrect)
                    grade = 10 if problem_result.get('correct', False) else 0

                    grades.append(grade)
                    processing_times.append(processing_time)

                    result['problems'].append({
                        'source_file': problem.get('_source_file', f'problem_{i}'),
                        'grade': grade,
                        'processing_time': processing_time,
                        'solution_length': len(problem_result.get('solution', '')),
                        'grading_feedback_length': len(str(problem_result.get('grading_result', {}).get('feedback', '')))
                    })

                    self.logger.info(f"  Problem {i+1}/{len(problems)}: Grade {grade} ({processing_time:.2f}s)")

                except Exception as e:
                    error_info = {
                        'problem_index': i,
                        'source_file': problem.get('_source_file', f'problem_{i}'),
                        'error': str(e),
                        'processing_time': time.time() - problem_start
                    }
                    result['errors'].append(error_info)
                    self.logger.error(f"  Problem {i+1}/{len(problems)} failed: {str(e)}")

            total_time = time.time() - start_time

            # Calculate metrics; the empty-grades branch still records
            # 'success_rate', which the summary log line below relies on.
            if grades:
                result['metrics'].update({
                    'total_problems': len(problems),
                    'successful_problems': len(grades),
                    'failed_problems': len(result['errors']),
                    'success_rate': len(grades) / len(problems) * 100,
                    'average_grade': statistics.mean(grades),
                    'median_grade': statistics.median(grades),
                    'grade_std': statistics.stdev(grades) if len(grades) > 1 else 0,
                    'max_grade': max(grades),
                    'min_grade': min(grades),
                    'total_time': total_time,
                    'average_time_per_problem': statistics.mean(processing_times),
                    'median_time_per_problem': statistics.median(processing_times),
                    'total_time_successful': sum(processing_times),
                    'throughput_problems_per_minute': len(grades) / (total_time / 60) if total_time > 0 else 0
                })
            else:
                result['metrics'].update({
                    'total_problems': len(problems),
                    'successful_problems': 0,
                    'failed_problems': len(result['errors']),
                    'success_rate': 0,
                    'total_time': total_time,
                    'error_rate': 100
                })

            self.logger.info(f"✅ Configuration completed: {result['metrics']['success_rate']:.1f}% success, "
                             f"avg grade: {result['metrics'].get('average_grade', 0):.2f}")

        except Exception as e:
            # Setup/loader failure: keep the partial result and mark it fatal.
            result['metrics']['fatal_error'] = str(e)
            self.logger.error(f"❌ Configuration failed: {str(e)}")

        return result

    async def run_comparative_benchmark(self,
                                        configurations: List[Dict[str, Any]],
                                        problems: List[Dict[str, Any]],
                                        variant_type: str = "original") -> Dict[str, Any]:
        """Run comparative benchmark across multiple configurations.

        Each configuration dict provides 'provider' and optionally
        'solver_model', 'grader_model', and 'loader_kwargs'; missing models
        fall back to the provider defaults. Writes a timestamped detailed
        JSON results file and returns the comparison report.
        """
        self.logger.info(f"🏁 Starting comparative benchmark with {len(configurations)} configurations")
        self.logger.info(f"📊 Testing {len(problems)} problems with variant: {variant_type}")

        benchmark_start = time.time()
        results = []

        for i, config in enumerate(configurations):
            self.logger.info(f"\n📋 Configuration {i+1}/{len(configurations)}")

            provider = config['provider']
            solver_model = config.get('solver_model')
            grader_model = config.get('grader_model')
            loader_kwargs = config.get('loader_kwargs', {})

            # Use defaults if not specified
            if not solver_model or not grader_model:
                defaults = get_default_models(provider)
                solver_model = solver_model or defaults['solver_model']
                grader_model = grader_model or defaults['grader_model']

            config_result = await self.run_single_configuration(
                provider=provider,
                solver_model=solver_model,
                grader_model=grader_model,
                problems=problems,
                variant_type=variant_type,
                **loader_kwargs
            )

            results.append(config_result)

        total_benchmark_time = time.time() - benchmark_start

        # Generate comparison report
        report = self.generate_comparison_report(results, total_benchmark_time)

        # Save detailed results
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        detailed_file = self.output_dir / f"benchmark_detailed_{timestamp}.json"
        with open(detailed_file, 'w', encoding='utf-8') as f:
            json.dump({
                'benchmark_info': {
                    'timestamp': datetime.now().isoformat(),
                    'total_configurations': len(configurations),
                    'total_problems': len(problems),
                    'variant_type': variant_type,
                    'total_time': total_benchmark_time
                },
                'configurations': configurations,
                'results': results,
                'comparison_report': report
            }, f, indent=2, ensure_ascii=False)

        self.logger.info(f"💾 Detailed results saved to {detailed_file}")

        return report

    def generate_comparison_report(self, results: List[Dict[str, Any]], total_time: float) -> Dict[str, Any]:
        """Generate comparison report from benchmark results.

        Logs rankings (accuracy, speed, throughput, success rate), cost
        efficiency, and overall statistics, then returns a structured report
        dict. Configurations with a 0% success rate are excluded from rankings.
        """
        self.logger.info("\n" + "="*60)
        self.logger.info("📊 BENCHMARK COMPARISON REPORT")
        self.logger.info("="*60)

        # Filter successful results — these are guaranteed to have the full
        # metrics set (average_grade etc.) populated by run_single_configuration.
        successful_results = [r for r in results if r['metrics'].get('success_rate', 0) > 0]

        if not successful_results:
            self.logger.warning("⚠️ No successful configurations found!")
            return {'error': 'No successful configurations'}

        # Ranking by different metrics
        rankings = {
            'accuracy': sorted(successful_results, key=lambda x: x['metrics']['average_grade'], reverse=True),
            'speed': sorted(successful_results, key=lambda x: x['metrics']['average_time_per_problem']),
            'throughput': sorted(successful_results, key=lambda x: x['metrics']['throughput_problems_per_minute'], reverse=True),
            'success_rate': sorted(successful_results, key=lambda x: x['metrics']['success_rate'], reverse=True)
        }

        # Print rankings (the elif chain covers every key in `rankings`,
        # so `value` is always assigned)
        for metric, ranked_results in rankings.items():
            self.logger.info(f"\n🏆 Top 3 by {metric.upper()}:")
            for i, result in enumerate(ranked_results[:3]):
                config = result['configuration']
                metrics = result['metrics']
                provider = config['provider']
                solver = config['solver_model']

                if metric == 'accuracy':
                    value = f"{metrics['average_grade']:.2f}"
                elif metric == 'speed':
                    value = f"{metrics['average_time_per_problem']:.2f}s"
                elif metric == 'throughput':
                    value = f"{metrics['throughput_problems_per_minute']:.1f} prob/min"
                elif metric == 'success_rate':
                    value = f"{metrics['success_rate']:.1f}%"

                self.logger.info(f"  {i+1}. {provider}/{solver}: {value}")

        # Calculate cost efficiency (grade per unit cost); configurations with
        # zero cost or zero grade are skipped to avoid division noise.
        cost_efficiency = []
        for result in successful_results:
            metrics = result['metrics']
            cost_info = metrics.get('estimated_cost', {})
            total_cost = cost_info.get('total_cost', 0)
            avg_grade = metrics.get('average_grade', 0)

            if total_cost > 0 and avg_grade > 0:
                efficiency = avg_grade / total_cost  # Grade per unit cost
                cost_efficiency.append({
                    'result': result,
                    'efficiency': efficiency,
                    'cost': total_cost,
                    'grade': avg_grade
                })

        if cost_efficiency:
            cost_efficiency.sort(key=lambda x: x['efficiency'], reverse=True)
            self.logger.info(f"\n💰 Top 3 by COST EFFICIENCY (Grade/Cost):")
            for i, item in enumerate(cost_efficiency[:3]):
                config = item['result']['configuration']
                provider = config['provider']
                solver = config['solver_model']
                self.logger.info(f"  {i+1}. {provider}/{solver}: {item['efficiency']:.2f} "
                                 f"(Grade: {item['grade']:.2f}, Cost: {item['cost']:.4f})")

        # Overall statistics
        all_grades = []
        all_times = []
        all_success_rates = []

        for result in successful_results:
            metrics = result['metrics']
            all_grades.append(metrics['average_grade'])
            all_times.append(metrics['average_time_per_problem'])
            all_success_rates.append(metrics['success_rate'])

        self.logger.info(f"\n📈 OVERALL STATISTICS:")
        self.logger.info(f"  Configurations tested: {len(results)}")
        self.logger.info(f"  Successful configurations: {len(successful_results)}")
        self.logger.info(f"  Average grade across all: {statistics.mean(all_grades):.2f}")
        self.logger.info(f"  Average time per problem: {statistics.mean(all_times):.2f}s")
        self.logger.info(f"  Average success rate: {statistics.mean(all_success_rates):.1f}%")
        self.logger.info(f"  Total benchmark time: {total_time/60:.2f} minutes")

        # Generate final report
        report = {
            'summary': {
                'total_configurations': len(results),
                'successful_configurations': len(successful_results),
                'overall_avg_grade': statistics.mean(all_grades) if all_grades else 0,
                'overall_avg_time': statistics.mean(all_times) if all_times else 0,
                'overall_avg_success_rate': statistics.mean(all_success_rates) if all_success_rates else 0,
                'total_benchmark_time': total_time
            },
            'rankings': {
                # The single-element `for metric_key in [...]` clause is a
                # comprehension trick that binds the metrics-dict key name
                # corresponding to each ranking metric.
                metric: [
                    {
                        'provider': r['configuration']['provider'],
                        'solver_model': r['configuration']['solver_model'],
                        'grader_model': r['configuration']['grader_model'],
                        'score': r['metrics'][metric_key]
                    }
                    for r in ranked[:5]  # Top 5
                ] for metric, ranked in rankings.items()
                for metric_key in [{'accuracy': 'average_grade', 'speed': 'average_time_per_problem',
                                    'throughput': 'throughput_problems_per_minute', 'success_rate': 'success_rate'}[metric]]
            },
            'cost_efficiency': [
                {
                    'provider': item['result']['configuration']['provider'],
                    'solver_model': item['result']['configuration']['solver_model'],
                    'efficiency': item['efficiency'],
                    'grade': item['grade'],
                    'cost': item['cost']
                }
                for item in cost_efficiency[:5]
            ] if cost_efficiency else []
        }

        return report
+
+
async def run_quick_test():
    """Run a quick test across all providers with 3 problems.

    Loads up to three problems from the default ``dataset`` directory and
    benchmarks every supported provider with its default models (except
    huggingface, which is pinned to a tiny CPU model for speed).
    """
    runner = BenchmarkRunner()

    # Load 3 test problems
    problems = await runner.load_test_problems(Path("dataset"), max_problems=3)
    if not problems:
        print("❌ No test problems found in dataset directory")
        return

    # Default configurations for all providers
    configurations = []
    for provider in get_supported_providers():
        config = {'provider': provider}

        # Provider-specific settings
        if provider == 'vllm':
            config['loader_kwargs'] = {'base_url': 'http://localhost:8000/v1'}
        elif provider == 'huggingface':
            # BUGFIX: keep model overrides at the top level of the config.
            # run_comparative_benchmark() passes solver_model/grader_model as
            # explicit keywords AND forwards loader_kwargs via **kwargs, so
            # placing the models inside loader_kwargs raised
            # "got multiple values for keyword argument 'solver_model'".
            config['solver_model'] = 'microsoft/DialoGPT-small'
            config['grader_model'] = 'microsoft/DialoGPT-small'
            config['loader_kwargs'] = {'device': 'cpu'}

        configurations.append(config)

    # Run benchmark
    await runner.run_comparative_benchmark(configurations, problems)
+
+
async def run_custom_benchmark(config_file: Path):
    """Run a benchmark described by a JSON configuration file."""
    # Parse the benchmark description.
    with open(config_file, 'r', encoding='utf-8') as fh:
        settings = json.load(fh)

    runner = BenchmarkRunner(Path(settings.get('output_dir', 'benchmark_results')))

    # Where the problems live and how the run is shaped.
    dataset_dir = Path(settings.get('dataset_path', 'dataset'))
    problem_cap = settings.get('max_problems', 10)
    variant = settings.get('variant_type', 'original')

    # Bail out early if there is nothing to test.
    problems = await runner.load_test_problems(dataset_dir, problem_cap)
    if not problems:
        print(f"❌ No problems found in {dataset_dir}")
        return

    configurations = settings.get('configurations', [])
    if not configurations:
        print("❌ No configurations specified in config file")
        return

    await runner.run_comparative_benchmark(configurations, problems, variant)
+
+
async def main():
    """CLI entry point for the benchmark script.

    Returns:
        Process exit code: 0 on success, 1 on interruption or failure.
    """
    parser = argparse.ArgumentParser(description="Benchmark AI providers on mathematical problems")

    # Benchmark modes — the group is required, so exactly one of
    # --config / --quick-test is always set after parsing.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--config", type=Path, help="Configuration file path")
    group.add_argument("--quick-test", action="store_true",
                       help="Quick test with 3 problems across all providers")

    # Custom benchmark options — currently parsed but unused placeholders for
    # a future custom mode; kept so existing invocations remain valid.
    parser.add_argument("--providers", nargs="+", choices=get_supported_providers(),
                        help="Providers to test (for custom benchmark)")
    parser.add_argument("--models", nargs="+",
                        help="Models to test (for custom benchmark)")
    parser.add_argument("--dataset", type=Path, default="dataset",
                        help="Dataset path (default: dataset)")
    parser.add_argument("--max-problems", type=int, default=10,
                        help="Maximum problems to test (default: 10)")
    parser.add_argument("--variant", default="original",
                        choices=["original", "descriptive_long", "kernel_variant"],
                        help="Problem variant (default: original)")
    parser.add_argument("--output-dir", type=Path, default="benchmark_results",
                        help="Output directory (default: benchmark_results)")

    args = parser.parse_args()

    try:
        if args.quick_test:
            await run_quick_test()
        else:
            # --config is guaranteed to be set here because the mutually
            # exclusive group is required; the old fallthrough branch that
            # reported "not yet implemented" was unreachable and is removed.
            await run_custom_benchmark(args.config)

        return 0

    except KeyboardInterrupt:
        print("\n⏸️ Benchmark interrupted by user")
        return 1
    except Exception as e:
        print(f"\n❌ Benchmark failed: {str(e)}")
        return 1
+
+
if __name__ == "__main__":
    # Raise SystemExit directly instead of calling the site-injected exit()
    # helper, which is absent when Python runs with -S / without the site module.
    raise SystemExit(asyncio.run(main()))
diff --git a/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py b/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py
new file mode 100644
index 0000000..76952bd
--- /dev/null
+++ b/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py
@@ -0,0 +1,630 @@
+#!/usr/bin/env python3
+"""
+原题 vs Kernel Variant 数学能力对比测试
+使用4o-mini解题,o3严格评分,比较两种题目的正确率差异
+"""
+
+import os
+import json
+import asyncio
+import pathlib
+import time
+import re
+import random
+from typing import Dict, List, Tuple, Optional
+import click
+import tqdm
+from openai import AsyncOpenAI, RateLimitError, APIError, APIConnectionError
+
# Configuration
SOLVER_MODEL = "gpt-4o-mini"  # model used to solve the problems
GRADER_MODEL = "o3"  # model used to grade the solutions
SRC_DIR = pathlib.Path("raw/json")                     # input dataset directory
RESULTS_DIR = pathlib.Path("results/comparison_test")  # output directory
RESULTS_DIR.mkdir(parents=True, exist_ok=True)  # created eagerly at import time

RETRIES = 4         # max API attempts per call
TIMEOUT_BASE = 600  # seconds; doubled on each retry attempt
RESP_FMT = {"type": "json_object"}  # request strict JSON output from the API

# Solver system prompt (used with SOLVER_MODEL, i.e. gpt-4o-mini)
SOLVER_SYSTEM_PROMPT = """You are an expert mathematician solving competition-level problems.
Provide detailed, step-by-step solutions with clear mathematical reasoning.

Requirements:
- Show all your work and intermediate steps
- Justify each major step of your reasoning
- Use proper mathematical notation
- Be thorough but concise
- State your final answer clearly

Solve the problem completely and rigorously."""

SOLVER_USER_TEMPLATE = """Please solve this mathematical problem:

{problem_statement}

Provide a complete solution with detailed reasoning. Return your response in JSON format:
{{"solution": "your complete step-by-step solution with mathematical reasoning",
  "final_answer": "your final answer in a clear, concise form"}}"""

# Strict grading system prompt for PROOF problems (used with GRADER_MODEL, i.e. o3)
PROOF_GRADER_SYSTEM_PROMPT = """You are an extremely strict mathematical grader evaluating competition-level PROOF problems.

GRADING STANDARDS (BE VERY STRICT):
- Mathematical rigor: Every step must be mathematically sound and justified
- Logical flow: The reasoning must be clear, complete, and logically connected
- Correctness: All calculations, algebraic manipulations, and conclusions must be correct
- Completeness: The solution must address all parts of the problem fully
- Precision: Mathematical statements must be precise and unambiguous

FAILING CRITERIA (Mark as INCORRECT if ANY of these apply):
- Any unjustified logical leap or gap in reasoning
- Any computational error, no matter how small
- Missing steps in critical parts of the argument
- Imprecise or ambiguous mathematical statements
- Incorrect final answer, even if approach is partially correct
- Circular reasoning or logical fallacies
- Misuse of mathematical theorems or definitions

BE EXTREMELY STRICT. Competition mathematics proofs require perfect precision."""

# Relatively lenient grading system prompt for CALCULATION problems (o3)
CALCULATION_GRADER_SYSTEM_PROMPT = """You are a mathematical grader evaluating competition-level CALCULATION problems.

GRADING STANDARDS FOR CALCULATION PROBLEMS:
- Primary focus: Is the final answer correct?
- Secondary focus: Is the overall approach reasonable and mathematically sound?
- Computation: Allow minor computational slips if the method is correct and final answer is right

GRADING CRITERIA:
- CORRECT: Final answer is correct AND approach is fundamentally sound
- INCORRECT: Final answer is wrong OR approach is fundamentally flawed

For calculation problems, the final numerical answer is the most important criterion.
Minor intermediate errors are acceptable if they don't affect the final result."""

PROOF_GRADER_USER_TEMPLATE = """Grade this PROOF solution with extreme strictness.

PROBLEM:
{problem_statement}

STUDENT SOLUTION:
{solution}

CORRECT REFERENCE SOLUTION:
{reference_solution}

Evaluate with maximum strictness. Every logical step must be perfect. Return JSON with:
{{"grade": "CORRECT" or "INCORRECT",
  "detailed_feedback": "specific detailed analysis of what is right/wrong",
  "major_issues": "list of significant mathematical errors or gaps",
  "final_answer_correct": true or false,
  "reasoning_rigor_score": 0-10 integer (10=perfect rigor, 0=severely flawed),
  "overall_assessment": "comprehensive evaluation summary"}}"""

CALCULATION_GRADER_USER_TEMPLATE = """Grade this CALCULATION solution with focus on final answer correctness.

PROBLEM:
{problem_statement}

STUDENT SOLUTION:
{solution}

CORRECT REFERENCE SOLUTION:
{reference_solution}

Focus primarily on whether the final answer is correct. Return JSON with:
{{"grade": "CORRECT" or "INCORRECT",
  "detailed_feedback": "specific detailed analysis of what is right/wrong",
  "major_issues": "list of significant mathematical errors or gaps",
  "final_answer_correct": true or false,
  "reasoning_rigor_score": 0-10 integer (10=perfect rigor, 0=severely flawed),
  "overall_assessment": "comprehensive evaluation summary"}}"""

# Greedy match from the first '{' to the last '}' — used to dig a JSON
# object out of a response that wraps it in prose or markdown fences.
JSON_RE = re.compile(r"\{[\s\S]*\}")
+
def parse_json_response(raw: str) -> Optional[Dict]:
    """Parse a JSON object out of an LLM response, with fallbacks.

    Strategies, in order:
      1. Parse the whole response as-is.
      2. Extract the outermost ``{...}`` span (models often wrap JSON in
         prose or markdown fences) and parse that.
      3. Undo common over-escaping (``\\"`` -> ``"``, ``\\\\`` -> ``\\``)
         and parse again.

    Returns the parsed dict, or None if every strategy fails.

    Note: the original bare ``except:`` clauses also swallowed
    SystemExit/KeyboardInterrupt; we now catch only ValueError
    (``json.JSONDecodeError`` is a subclass of it).
    """
    if not raw:
        return None

    # Strategy 1: the response is already clean JSON.
    try:
        return json.loads(raw)
    except ValueError:
        pass

    # Strategy 2: greedy first-'{' to last-'}' span. The pattern is the
    # same as the module-level JSON_RE; re.search caches compiled
    # patterns, so inlining it keeps this function self-contained at no
    # extra cost.
    match = re.search(r"\{[\s\S]*\}", raw)
    if match:
        try:
            return json.loads(match.group(0))
        except ValueError:
            pass

    # Strategy 3: the model double-escaped quotes/backslashes.
    try:
        fixed = raw.replace('\\"', '"').replace('\\\\', '\\')
        return json.loads(fixed)
    except ValueError:
        return None
+
def to_str(x) -> str:
    """Coerce *x* to a string.

    Strings pass through unchanged, None becomes "", lists and tuples are
    joined element-wise with newlines, and anything else goes through str().
    """
    if isinstance(x, str):
        return x
    if x is None:
        return ""
    if isinstance(x, (list, tuple)):
        return "\n".join(str(item) for item in x)
    return str(x)
+
async def call_api_with_retry(cli: AsyncOpenAI, model: str, messages: List[Dict]) -> Tuple[Optional[Dict], str]:
    """Make OpenAI API call with retry logic.

    Retries up to RETRIES times. The per-call timeout doubles each attempt
    (TIMEOUT_BASE * 2**(attempt-1)), and retries back off exponentially
    with jitter. A response whose body cannot be parsed as JSON is treated
    as a retryable failure.

    Returns:
        (parsed_json_dict, raw_response_text) on success, or
        (None, last_raw_response_text) when all attempts are exhausted.
    """
    raw_response = ""

    for attempt in range(1, RETRIES + 1):
        # Timeout grows each attempt so slow-but-working calls can finish.
        timeout = TIMEOUT_BASE * (2 ** (attempt - 1))
        try:
            # Set temperature based on model
            # o3, o3-mini, and o4-mini require temperature 1.0
            # NOTE(review): this is a substring match, so 'o3' already
            # covers 'o3-mini', and any model name containing "o3"/"o4-mini"
            # would match — confirm that is intended.
            if any(model_name in model.lower() for model_name in ['o3', 'o3-mini', 'o4-mini']):
                temperature = 1.0
            else:
                # Use temperature 0.0 for deterministic solving with other models
                temperature = 0.0

            response = await asyncio.wait_for(
                cli.chat.completions.create(
                    model=model,
                    messages=messages,
                    temperature=temperature,
                    response_format=RESP_FMT,
                ),
                timeout=timeout,
            )
            raw_response = response.choices[0].message.content or ""
            parsed = parse_json_response(raw_response)
            if parsed:
                return parsed, raw_response
            # Unparseable body: raise so the generic handler below retries.
            raise ValueError("Failed to parse JSON response")

        except RateLimitError as e:
            print(f"🚫 RateLimitError (attempt {attempt}/{RETRIES}): {str(e)}")
            if "insufficient_quota" in str(e):
                print("⏳ Detected quota exhaustion - sleeping 15 minutes")
                await asyncio.sleep(900)
            else:
                # Exponential backoff with jitter to avoid thundering herd.
                sleep_time = 2 ** attempt + random.random()
                print(f"   ⏰ Rate limited, sleeping {sleep_time:.1f}s")
                await asyncio.sleep(sleep_time)

        except (APIError, APIConnectionError, asyncio.TimeoutError, ValueError) as e:
            print(f"❌ {type(e).__name__} (attempt {attempt}/{RETRIES}): {str(e)}")
            if attempt == RETRIES:
                return None, raw_response
            sleep_time = 2 ** attempt + random.random()
            print(f"   ⏰ Retrying in {sleep_time:.1f}s")
            await asyncio.sleep(sleep_time)

    # Reached when the final attempt ended in a rate-limit sleep.
    return None, raw_response
+
async def solve_problem(cli: AsyncOpenAI, problem_statement: str) -> Tuple[Optional[Dict], str]:
    """Ask the solver model for a step-by-step solution to *problem_statement*.

    Returns (parsed JSON dict or None, raw response text), exactly as
    produced by call_api_with_retry.
    """
    user_prompt = SOLVER_USER_TEMPLATE.format(problem_statement=problem_statement)
    messages = [
        {"role": "system", "content": SOLVER_SYSTEM_PROMPT},
        {"role": "user", "content": user_prompt},
    ]
    return await call_api_with_retry(cli, SOLVER_MODEL, messages)
+
async def grade_solution(cli: AsyncOpenAI, problem_statement: str, solution: str,
                         reference_solution: str, problem_type: str = "proof") -> Tuple[Optional[Dict], str]:
    """Grade a candidate solution with the grader model.

    Calculation problems use the lenient, answer-focused prompts; every
    other problem type falls back to the strict proof-grading prompts.
    Returns (parsed JSON dict or None, raw response text).
    """
    lenient = problem_type == "calculation"
    system_prompt = CALCULATION_GRADER_SYSTEM_PROMPT if lenient else PROOF_GRADER_SYSTEM_PROMPT
    user_template = CALCULATION_GRADER_USER_TEMPLATE if lenient else PROOF_GRADER_USER_TEMPLATE

    user_prompt = user_template.format(
        problem_statement=problem_statement,
        solution=solution,
        reference_solution=reference_solution,
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    return await call_api_with_retry(cli, GRADER_MODEL, messages)
+
async def test_single_file(file_path: pathlib.Path, cli: AsyncOpenAI) -> Dict:
    """Solve and grade both the original problem and its kernel variant.

    Pipeline for one dataset file:
      1. solver model answers the original problem,
      2. solver model answers the kernel variant,
      3. grader model grades the original answer,
      4. grader model grades the kernel-variant answer,
      5. the two grades are compared head-to-head.

    Returns a result dict whose "status" is "completed", "skipped"
    (missing variant or fields), "failed" (a solve step failed) or
    "error" (unexpected exception).
    """
    try:
        # Load the problem data
        data = json.loads(file_path.read_text(encoding='utf-8'))
        index = data.get("index", file_path.stem)

        # Check that the required fields are present and non-empty
        original_question = to_str(data.get("question", "")).strip()
        original_solution = to_str(data.get("solution", "")).strip()
        problem_type = data.get("problem_type", "proof")  # default to proof => strict grading

        kv = data.get("variants", {}).get("kernel_variant")
        if not kv:
            return {
                "index": index,
                "status": "skipped",
                "reason": "no_kernel_variant"
            }

        kernel_question = to_str(kv.get("question", "")).strip()
        kernel_solution = to_str(kv.get("solution", "")).strip()

        if not all([original_question, original_solution, kernel_question, kernel_solution]):
            return {
                "index": index,
                "status": "skipped",
                "reason": "missing_fields"
            }

        print(f"🧮 Testing {index} (Type: {problem_type.upper()})")
        start_time = time.time()

        result = {
            "index": index,
            "status": "completed",
            "timestamp": time.time(),
            "problem_type": problem_type,
            "original": {},
            "kernel_variant": {},
            "comparison": {}
        }

        # 1. Have the solver model attempt the original problem
        print(f"   📝 Solving original problem...")
        orig_solve_result, orig_solve_raw = await solve_problem(cli, original_question)

        if not orig_solve_result:
            result["original"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result

        orig_student_solution = to_str(orig_solve_result.get("solution", "")).strip()
        orig_final_answer = to_str(orig_solve_result.get("final_answer", "")).strip()

        result["original"]["student_solution"] = orig_student_solution
        result["original"]["student_final_answer"] = orig_final_answer
        result["original"]["solve_status"] = "success"

        # 2. Have the solver model attempt the kernel variant
        print(f"   📝 Solving kernel variant...")
        kv_solve_result, kv_solve_raw = await solve_problem(cli, kernel_question)

        if not kv_solve_result:
            result["kernel_variant"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result

        kv_student_solution = to_str(kv_solve_result.get("solution", "")).strip()
        kv_final_answer = to_str(kv_solve_result.get("final_answer", "")).strip()

        result["kernel_variant"]["student_solution"] = kv_student_solution
        result["kernel_variant"]["student_final_answer"] = kv_final_answer
        result["kernel_variant"]["solve_status"] = "success"

        # 3. Grade the original solution (prompt style depends on problem type)
        grading_style = "STRICT" if problem_type == "proof" else "LENIENT"
        print(f"   🔍 Grading original solution ({grading_style})...")
        orig_grade_result, orig_grade_raw = await grade_solution(
            cli, original_question, orig_student_solution, original_solution, problem_type
        )

        # A failed grading does not fail the whole file — it just leaves
        # this side out of the comparison step below.
        if not orig_grade_result:
            result["original"]["grade_status"] = "failed"
        else:
            result["original"]["grade_status"] = "success"
            result["original"]["grade"] = orig_grade_result.get("grade", "UNKNOWN")
            result["original"]["detailed_feedback"] = orig_grade_result.get("detailed_feedback", "")
            result["original"]["major_issues"] = orig_grade_result.get("major_issues", "")
            result["original"]["final_answer_correct"] = orig_grade_result.get("final_answer_correct", False)
            result["original"]["reasoning_rigor_score"] = orig_grade_result.get("reasoning_rigor_score", 0)
            result["original"]["overall_assessment"] = orig_grade_result.get("overall_assessment", "")

        # 4. Grade the kernel-variant solution the same way
        print(f"   🔍 Grading kernel variant solution ({grading_style})...")
        kv_grade_result, kv_grade_raw = await grade_solution(
            cli, kernel_question, kv_student_solution, kernel_solution, problem_type
        )

        if not kv_grade_result:
            result["kernel_variant"]["grade_status"] = "failed"
        else:
            result["kernel_variant"]["grade_status"] = "success"
            result["kernel_variant"]["grade"] = kv_grade_result.get("grade", "UNKNOWN")
            result["kernel_variant"]["detailed_feedback"] = kv_grade_result.get("detailed_feedback", "")
            result["kernel_variant"]["major_issues"] = kv_grade_result.get("major_issues", "")
            result["kernel_variant"]["final_answer_correct"] = kv_grade_result.get("final_answer_correct", False)
            result["kernel_variant"]["reasoning_rigor_score"] = kv_grade_result.get("reasoning_rigor_score", 0)
            result["kernel_variant"]["overall_assessment"] = kv_grade_result.get("overall_assessment", "")

        # 5. Head-to-head comparison (only when both sides were graded)
        if (result["original"]["grade_status"] == "success" and
            result["kernel_variant"]["grade_status"] == "success"):

            orig_correct = result["original"]["grade"] == "CORRECT"
            kv_correct = result["kernel_variant"]["grade"] == "CORRECT"

            result["comparison"]["original_correct"] = orig_correct
            result["comparison"]["kernel_variant_correct"] = kv_correct
            result["comparison"]["both_correct"] = orig_correct and kv_correct
            result["comparison"]["both_incorrect"] = not orig_correct and not kv_correct
            result["comparison"]["original_harder"] = not orig_correct and kv_correct  # original was harder
            result["comparison"]["kernel_variant_harder"] = orig_correct and not kv_correct  # kernel variant was harder

            orig_rigor = result["original"]["reasoning_rigor_score"]
            kv_rigor = result["kernel_variant"]["reasoning_rigor_score"]
            result["comparison"]["rigor_difference"] = orig_rigor - kv_rigor  # positive => original reasoning more rigorous

        total_time = time.time() - start_time
        result["processing_time"] = total_time

        print(f"   ✅ Completed {index} in {total_time:.1f}s")
        if result["comparison"]:
            orig_status = "✅" if result["comparison"]["original_correct"] else "❌"
            kv_status = "✅" if result["comparison"]["kernel_variant_correct"] else "❌"
            print(f"      Original: {orig_status}, Kernel Variant: {kv_status}")

        return result

    except Exception as e:
        # `index` is only bound once the file parsed; fall back to the stem.
        return {
            "index": index if 'index' in locals() else file_path.stem,
            "status": "error",
            "error": str(e),
            "error_type": type(e).__name__,
            "timestamp": time.time()
        }
+
async def save_detailed_results(results: List[Dict], output_file: str):
    """Write the per-problem results list to RESULTS_DIR as pretty JSON."""
    target = RESULTS_DIR / f"{output_file}_detailed.json"
    try:
        payload = json.dumps(results, ensure_ascii=False, indent=2)
        target.write_text(payload, encoding='utf-8')
        print(f"💾 Detailed results saved to {target}")
    except Exception as e:
        # Best-effort persistence: report the failure and carry on.
        print(f"❌ Failed to save detailed results: {e}")
+
def generate_summary_report(results: List[Dict]) -> Dict:
    """Aggregate per-problem results into a summary report dict.

    Tallies completed/skipped/failed counts, per-problem-type correctness,
    graded accuracy for originals and kernel variants, head-to-head
    comparison buckets, and average reasoning-rigor scores.
    """
    # Skeleton with the exact key layout the rest of the pipeline expects.
    summary = {
        "total_files": len(results),
        "completed": 0,
        "failed": 0,
        "skipped": 0,
        "by_problem_type": {
            "proof": {"count": 0, "original_correct": 0, "kv_correct": 0},
            "calculation": {"count": 0, "original_correct": 0, "kv_correct": 0}
        },
        "original_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "kernel_variant_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "comparison_stats": {
            "both_correct": 0,
            "both_incorrect": 0,
            "original_harder": 0,
            "kernel_variant_harder": 0,
            "total_compared": 0
        },
        "rigor_analysis": {
            "original_avg_rigor": 0,
            "kernel_variant_avg_rigor": 0,
            "rigor_difference_avg": 0
        }
    }
    by_type = summary["by_problem_type"]
    orig_stats = summary["original_stats"]
    kv_stats = summary["kernel_variant_stats"]
    comp_stats = summary["comparison_stats"]
    rigor = summary["rigor_analysis"]

    orig_rigor_scores = []
    kv_rigor_scores = []
    rigor_differences = []

    for entry in results:
        status = entry["status"]
        if status == "skipped":
            summary["skipped"] += 1
            continue
        if status != "completed":
            summary["failed"] += 1
            continue

        summary["completed"] += 1

        # Per-problem-type tallies (unknown types are ignored).
        ptype = entry.get("problem_type", "proof")
        if ptype in by_type:
            bucket = by_type[ptype]
            bucket["count"] += 1
            if entry["original"].get("grade") == "CORRECT":
                bucket["original_correct"] += 1
            if entry["kernel_variant"].get("grade") == "CORRECT":
                bucket["kv_correct"] += 1

        # Original-problem grading stats.
        orig = entry["original"]
        if orig.get("grade_status") == "success":
            orig_stats["total_graded"] += 1
            orig_stats["correct" if orig["grade"] == "CORRECT" else "incorrect"] += 1
            orig_rigor_scores.append(orig["reasoning_rigor_score"])

        # Kernel-variant grading stats.
        kv = entry["kernel_variant"]
        if kv.get("grade_status") == "success":
            kv_stats["total_graded"] += 1
            kv_stats["correct" if kv["grade"] == "CORRECT" else "incorrect"] += 1
            kv_rigor_scores.append(kv["reasoning_rigor_score"])

        # Head-to-head comparison: the four buckets are mutually exclusive,
        # so only the first truthy one is counted.
        comp = entry.get("comparison")
        if comp:
            comp_stats["total_compared"] += 1
            for bucket_name in ("both_correct", "both_incorrect",
                                "original_harder", "kernel_variant_harder"):
                if comp[bucket_name]:
                    comp_stats[bucket_name] += 1
                    break
            rigor_differences.append(comp["rigor_difference"])

    # Averages (left at 0 when nothing was graded/compared).
    if orig_rigor_scores:
        rigor["original_avg_rigor"] = sum(orig_rigor_scores) / len(orig_rigor_scores)
    if kv_rigor_scores:
        rigor["kernel_variant_avg_rigor"] = sum(kv_rigor_scores) / len(kv_rigor_scores)
    if rigor_differences:
        rigor["rigor_difference_avg"] = sum(rigor_differences) / len(rigor_differences)

    # Accuracy is only defined when at least one solution was graded.
    for stats in (orig_stats, kv_stats):
        if stats["total_graded"] > 0:
            stats["accuracy"] = stats["correct"] / stats["total_graded"]

    return summary
+
def print_summary_report(summary: Dict):
    """Pretty-print the summary produced by generate_summary_report.

    Output is console-only; all figures are read from the summary dict.
    """
    print("\n" + "="*80)
    print("📊 ORIGINAL vs KERNEL VARIANT COMPARISON REPORT")
    print("="*80)

    print(f"📁 Total files: {summary['total_files']}")
    print(f"✅ Completed: {summary['completed']}")
    print(f"⏭️ Skipped: {summary['skipped']}")
    print(f"❌ Failed: {summary['failed']}")

    # Overall accuracy, as percentages (accuracy key is absent when
    # nothing was graded, hence the .get default).
    print(f"\n📈 ACCURACY COMPARISON:")
    orig_acc = summary["original_stats"].get("accuracy", 0) * 100
    kv_acc = summary["kernel_variant_stats"].get("accuracy", 0) * 100
    print(f"Original Problems: {orig_acc:.1f}% ({summary['original_stats']['correct']}/{summary['original_stats']['total_graded']})")
    print(f"Kernel Variants:   {kv_acc:.1f}% ({summary['kernel_variant_stats']['correct']}/{summary['kernel_variant_stats']['total_graded']})")

    # Interpret the gap; differences within ±5% are called "similar".
    if orig_acc > 0 and kv_acc > 0:
        diff = orig_acc - kv_acc
        if diff > 5:
            print(f"📉 Kernel variants are {diff:.1f}% harder (as expected)")
        elif diff < -5:
            print(f"📈 Original problems are {-diff:.1f}% harder (unexpected)")
        else:
            print(f"📊 Similar difficulty (difference: {diff:.1f}%)")

    # Per-type breakdown; proofs are graded strictly, calculations leniently.
    print(f"\n🎯 BY PROBLEM TYPE:")
    for ptype, stats in summary["by_problem_type"].items():
        if stats["count"] > 0:
            orig_acc_type = (stats["original_correct"] / stats["count"]) * 100
            kv_acc_type = (stats["kv_correct"] / stats["count"]) * 100
            grading_note = " (STRICT grading)" if ptype == "proof" else " (LENIENT grading)"
            print(f"{ptype.upper()} Problems{grading_note}:")
            print(f"  Original: {orig_acc_type:.1f}% ({stats['original_correct']}/{stats['count']})")
            print(f"  Kernel Variant: {kv_acc_type:.1f}% ({stats['kv_correct']}/{stats['count']})")
            if stats["count"] >= 3:  # Only show difference if we have enough samples
                type_diff = orig_acc_type - kv_acc_type
                print(f"  Difference: {type_diff:+.1f}%")

    # Head-to-head bucket percentages, relative to pairs actually compared.
    print(f"\n🔍 DETAILED COMPARISON:")
    comp = summary["comparison_stats"]
    total = comp["total_compared"]
    if total > 0:
        print(f"Both correct:          {comp['both_correct']:3d} ({comp['both_correct']/total*100:.1f}%)")
        print(f"Both incorrect:        {comp['both_incorrect']:3d} ({comp['both_incorrect']/total*100:.1f}%)")
        print(f"Original harder:       {comp['original_harder']:3d} ({comp['original_harder']/total*100:.1f}%)")
        print(f"Kernel variant harder: {comp['kernel_variant_harder']:3d} ({comp['kernel_variant_harder']/total*100:.1f}%)")

    # Average grader-assigned rigor scores (0-10 scale).
    print(f"\n📏 REASONING RIGOR ANALYSIS:")
    rigor = summary["rigor_analysis"]
    print(f"Original avg rigor:   {rigor['original_avg_rigor']:.2f}/10")
    print(f"Kernel variant rigor: {rigor['kernel_variant_avg_rigor']:.2f}/10")
    print(f"Difference:           {rigor['rigor_difference_avg']:.2f} (positive = original more rigorous)")

    print("="*80)
+
@click.command()
@click.option("-c", "--concurrency", default=16, show_default=True,
              help="Maximum concurrent processing tasks")
@click.option("--max-files", default=50, show_default=True,
              help="Maximum number of files to test (for quick testing)")
@click.option("--file-pattern", default="*.json", show_default=True,
              help="File pattern to process")
@click.option("--output-prefix", default="comparison_test", show_default=True,
              help="Prefix for output files")
@click.option("--debug", is_flag=True, help="Enable debug output")
def main(concurrency: int, max_files: int, file_pattern: str, output_prefix: str, debug: bool):
    """Original vs. kernel-variant mathematical ability comparison test."""
    # NOTE(review): the --debug flag is accepted but never used below.
    print(f"🧪 Starting Original vs Kernel Variant Comparison Test")
    print(f"   Solver Model: {SOLVER_MODEL}")
    print(f"   Grader Model: {GRADER_MODEL}")
    print(f"   Max files: {max_files}")
    print(f"   Concurrency: {concurrency}")

    if not os.getenv("OPENAI_API_KEY"):
        print("❌ OPENAI_API_KEY environment variable not set!")
        return

    # Collect the files to test (sorted for deterministic ordering);
    # max_files <= 0 means "no limit".
    all_files = sorted(SRC_DIR.glob(file_pattern))
    if max_files > 0:
        all_files = all_files[:max_files]

    print(f"📁 Testing {len(all_files)} files")

    if not all_files:
        print("❌ No files found to test!")
        return

    async def run_test():
        # One shared client; the semaphore caps in-flight problem tests.
        cli = AsyncOpenAI()
        sem = asyncio.Semaphore(concurrency)

        async def worker(file_path: pathlib.Path):
            async with sem:
                return await test_single_file(file_path, cli)

        # Run every file, updating the progress bar as each completes.
        results = []
        progress_bar = tqdm.tqdm(total=len(all_files), desc="Testing", unit="file")

        tasks = [worker(f) for f in all_files]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)
            progress_bar.update(1)

        progress_bar.close()
        return results

    # Run the whole test suite
    results = asyncio.run(run_test())

    # Save per-problem details
    timestamp = int(time.time())
    output_name = f"{output_prefix}_{timestamp}"
    asyncio.run(save_detailed_results(results, output_name))

    # Generate and display the summary report
    summary = generate_summary_report(results)
    print_summary_report(summary)

    # Persist the summary report alongside the detailed results
    summary_path = RESULTS_DIR / f"{output_name}_summary.json"
    try:
        summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding='utf-8')
        print(f"💾 Summary report saved to {summary_path}")
    except Exception as e:
        print(f"❌ Failed to save summary: {e}")
+
if __name__ == "__main__":
    # Click parses CLI arguments and manages the process exit itself.
    main()
+ \ No newline at end of file
diff --git a/putnam-bench-anon/scripts/health_check.py b/putnam-bench-anon/scripts/health_check.py
new file mode 100644
index 0000000..65c7855
--- /dev/null
+++ b/putnam-bench-anon/scripts/health_check.py
@@ -0,0 +1,376 @@
+#!/usr/bin/env python3
+"""
+Health check script for all AI providers.
+
+This script tests connectivity, API keys, and basic functionality for all
+supported AI providers. Useful for troubleshooting and verifying setup.
+
+Usage:
+ python health_check.py # Check all providers
+ python health_check.py --provider openai # Check specific provider
+ python health_check.py --detailed # Detailed diagnostics
+"""
+
+import asyncio
+import json
+import sys
+import os
+from pathlib import Path
+import argparse
+from typing import Dict, List, Any
+from datetime import datetime
+import platform
+
+# Add the loader module to the path
+sys.path.append(str(Path(__file__).parent))
+
+from loader import create_loader, get_supported_providers, get_default_models
+
+
class HealthChecker:
    """Health checker for AI providers.

    Verifies environment variables, Python dependencies, and live
    connectivity for each supported provider, printing a human-readable
    report and returning the same data as nested dicts.
    """

    def __init__(self, detailed: bool = False):
        # When True, system info is gathered and included in the report.
        self.detailed = detailed
        # NOTE(review): self.results is assigned but never read in this
        # class — confirm before removing.
        self.results = {}

    async def check_system_info(self) -> Dict[str, Any]:
        """Collect basic host information (CPU, memory, disk, Python)."""
        import psutil  # third-party; availability is reported by check_dependencies

        return {
            'python_version': platform.python_version(),
            'platform': platform.platform(),
            'cpu_count': psutil.cpu_count(),
            'memory_total_gb': round(psutil.virtual_memory().total / (1024**3), 2),
            'memory_available_gb': round(psutil.virtual_memory().available / (1024**3), 2),
            'disk_free_gb': round(psutil.disk_usage('.').free / (1024**3), 2),
            'timestamp': datetime.now().isoformat()
        }

    async def check_environment_variables(self) -> Dict[str, Any]:
        """Report which provider API-key variables are set.

        Only a short prefix of each key is included, so the report can be
        shared without leaking credentials.
        """
        env_vars = {
            'OPENAI_API_KEY': os.getenv('OPENAI_API_KEY'),
            'ANTHROPIC_API_KEY': os.getenv('ANTHROPIC_API_KEY'),
            'GOOGLE_API_KEY': os.getenv('GOOGLE_API_KEY'),
        }

        return {
            var: {
                'set': bool(value),
                'length': len(value) if value else 0,
                'preview': value[:8] + '...' if value and len(value) > 8 else value
            }
            for var, value in env_vars.items()
        }

    async def check_dependencies(self) -> Dict[str, Any]:
        """Probe each required package via import and report its version."""
        dependencies = {
            'openai': 'OpenAI API client',
            'anthropic': 'Anthropic API client',
            'google-generativeai': 'Google Gemini API client',
            'transformers': 'HuggingFace transformers',
            'torch': 'PyTorch for local models',
            'vllm': 'VLLM for local serving',
            'psutil': 'System monitoring'
        }

        results = {}
        for package, description in dependencies.items():
            try:
                # The PyPI name 'google-generativeai' differs from its
                # import path, so it needs a special case.
                if package == 'google-generativeai':
                    import google.generativeai
                    version = getattr(google.generativeai, '__version__', 'unknown')
                else:
                    module = __import__(package)
                    version = getattr(module, '__version__', 'unknown')

                results[package] = {
                    'installed': True,
                    'version': version,
                    'description': description
                }
            except ImportError:
                results[package] = {
                    'installed': False,
                    'version': None,
                    'description': description
                }

        return results

    async def check_provider(self, provider: str) -> Dict[str, Any]:
        """Create a loader for *provider* and run its health check.

        Never raises: failures are captured in the returned dict's
        'error' field, and timing information is recorded for
        loader creation and the health check itself.
        """
        print(f"🔍 Checking {provider}...")

        result = {
            'provider': provider,
            'available': False,
            'health_check_passed': False,
            'error': None,
            'response_time': None,
            'models': {},
            'cost_estimation': None
        }

        try:
            # Get default models
            default_models = get_default_models(provider)
            result['models']['defaults'] = default_models

            # Provider-specific configuration
            loader_kwargs = {}
            if provider == 'vllm':
                # Assumes a local VLLM server on the default port.
                loader_kwargs['base_url'] = 'http://localhost:8000/v1'
            elif provider == 'huggingface':
                loader_kwargs['device'] = 'cpu'  # Use CPU for testing
                # Use smaller models for testing
                loader_kwargs['solver_model'] = 'microsoft/DialoGPT-small'
                loader_kwargs['grader_model'] = 'microsoft/DialoGPT-small'

            # Create loader
            start_time = asyncio.get_event_loop().time()
            loader = create_loader(provider, **loader_kwargs)
            creation_time = asyncio.get_event_loop().time() - start_time

            result['available'] = True
            result['creation_time'] = creation_time

            # Get model info
            model_info = loader.get_model_info()
            result['models']['configured'] = model_info

            # Health check (bounded so a hung provider can't stall the run)
            health_start = asyncio.get_event_loop().time()
            health_passed = await asyncio.wait_for(loader.health_check(), timeout=60)
            health_time = asyncio.get_event_loop().time() - health_start

            result['health_check_passed'] = health_passed
            result['response_time'] = health_time

            if health_passed:
                # Cost estimation (best-effort; errors are recorded, not raised)
                try:
                    cost_info = await loader.estimate_cost(10)
                    result['cost_estimation'] = cost_info
                except Exception as e:
                    result['cost_estimation_error'] = str(e)

                # Try to list models if available
                if hasattr(loader, 'list_models'):
                    try:
                        available_models = await loader.list_models()
                        result['models']['available'] = available_models[:10]  # Limit output
                    except Exception as e:
                        result['models']['list_error'] = str(e)

        except asyncio.TimeoutError:
            result['error'] = 'Health check timed out'
        except Exception as e:
            result['error'] = str(e)

        return result

    async def check_all_providers(self, specific_provider: str = None) -> Dict[str, Any]:
        """Check all providers or a specific one.

        Prints a human-readable report while building it, and returns the
        same data as a nested dict (environment, dependencies, providers,
        plus system info when self.detailed is set).
        """
        providers = [specific_provider] if specific_provider else get_supported_providers()

        print("🏥 AI Provider Health Check")
        print("=" * 50)

        # System information (only gathered in detailed mode)
        if self.detailed:
            print("📊 System Information:")
            system_info = await self.check_system_info()
            for key, value in system_info.items():
                print(f"   {key}: {value}")
            print()

        # Environment variables
        print("🔧 Environment Variables:")
        env_info = await self.check_environment_variables()
        for var, info in env_info.items():
            status = "✅" if info['set'] else "❌"
            print(f"   {status} {var}: {'Set' if info['set'] else 'Not set'}")
        print()

        # Dependencies
        print("📦 Dependencies:")
        dep_info = await self.check_dependencies()
        for package, info in dep_info.items():
            status = "✅" if info['installed'] else "❌"
            version = f" (v{info['version']})" if info['installed'] and info['version'] != 'unknown' else ""
            print(f"   {status} {package}{version}")
        print()

        # Provider checks (sequential, one status line each)
        print("🤖 Provider Health Checks:")
        provider_results = {}

        for provider in providers:
            provider_result = await self.check_provider(provider)
            provider_results[provider] = provider_result

            # Print summary
            if provider_result['available']:
                if provider_result['health_check_passed']:
                    status = "✅"
                    details = f"({provider_result['response_time']:.2f}s)"
                else:
                    status = "⚠️"
                    details = "(Health check failed)"
            else:
                status = "❌"
                details = f"({provider_result['error']})"

            print(f"   {status} {provider.upper()}: {details}")

        print()

        # Summary
        total_providers = len(providers)
        healthy_providers = sum(1 for r in provider_results.values()
                                if r['available'] and r['health_check_passed'])

        print("📋 Summary:")
        print(f"   Total providers checked: {total_providers}")
        print(f"   Healthy providers: {healthy_providers}")
        print(f"   Success rate: {healthy_providers/total_providers*100:.1f}%")

        # Detailed results
        final_results = {
            'timestamp': datetime.now().isoformat(),
            'summary': {
                'total_providers': total_providers,
                'healthy_providers': healthy_providers,
                'success_rate': healthy_providers/total_providers*100
            },
            'environment': env_info,
            'dependencies': dep_info,
            'providers': provider_results
        }

        if self.detailed:
            final_results['system'] = system_info

        return final_results

    async def run_diagnostics(self, provider: str) -> Dict[str, Any]:
        """Run detailed diagnostics for a specific provider.

        Extends check_provider with an end-to-end smoke test: a trivial
        math problem is solved and graded, and the result/timing is
        recorded under 'test_problem'.
        """
        print(f"🔧 Running detailed diagnostics for {provider}...")

        result = await self.check_provider(provider)

        # Additional detailed checks, only when the basic check passed
        if result['available'] and result['health_check_passed']:
            print(f"✅ {provider} is healthy!")

            # Test with a simple problem
            print("🧪 Testing with a simple math problem...")
            try:
                # Same provider-specific kwargs as check_provider.
                loader_kwargs = {}
                if provider == 'vllm':
                    loader_kwargs['base_url'] = 'http://localhost:8000/v1'
                elif provider == 'huggingface':
                    loader_kwargs['device'] = 'cpu'
                    loader_kwargs['solver_model'] = 'microsoft/DialoGPT-small'
                    loader_kwargs['grader_model'] = 'microsoft/DialoGPT-small'

                loader = create_loader(provider, **loader_kwargs)

                # Simple test problem
                test_problem = {
                    'original': {
                        'problem_statement': 'What is 2 + 2?',
                        'solution': 'The answer is 4.',
                        'problem_type': 'calculation'
                    }
                }

                start_time = asyncio.get_event_loop().time()
                test_result = await asyncio.wait_for(
                    loader.test_single_problem(test_problem, variant_type='original'),
                    timeout=120
                )
                test_time = asyncio.get_event_loop().time() - start_time

                # Grade is collapsed to a 10/0 pass-fail score here.
                result['test_problem'] = {
                    'success': True,
                    'time': test_time,
                    'grade': 10 if test_result.get('correct', False) else 0,
                    'solution_length': len(test_result.get('solve', {}).get('solution', ''))
                }
                print(f"   ✅ Test completed in {test_time:.2f}s")
                print(f"   📊 Grade: {10 if test_result.get('correct', False) else 0} ({'CORRECT' if test_result.get('correct', False) else 'INCORRECT'})")

            except asyncio.TimeoutError:
                result['test_problem'] = {'success': False, 'error': 'Test timed out'}
                print("   ⚠️ Test problem timed out")
            except Exception as e:
                result['test_problem'] = {'success': False, 'error': str(e)}
                print(f"   ❌ Test problem failed: {str(e)}")

        return result
+
+
async def main():
    """Main function.

    Parses CLI arguments, runs either a full provider health check or
    single-provider diagnostics, optionally writes results to a JSON
    file, and returns the process exit code (0 success, 1 failure).
    """
    parser = argparse.ArgumentParser(description="Health check for AI providers")
    parser.add_argument("--provider", choices=get_supported_providers(),
                        help="Check specific provider only")
    parser.add_argument("--detailed", action="store_true",
                        help="Show detailed system information")
    parser.add_argument("--diagnostics", action="store_true",
                        help="Run detailed diagnostics (requires --provider)")
    parser.add_argument("--output", type=Path,
                        help="Save results to JSON file")
    parser.add_argument("--quiet", action="store_true",
                        help="Suppress output, save to file only")

    args = parser.parse_args()

    if args.diagnostics and not args.provider:
        print("❌ Error: --diagnostics requires --provider")
        return 1

    # In quiet mode, swallow progress output and emit only the final JSON.
    # Keep a handle on the real stdout so it can be restored on EVERY exit
    # path — the previous version leaked the redirect when an exception
    # occurred, which also hid the error message entirely.
    real_stdout = sys.stdout
    if args.quiet:
        import io
        sys.stdout = io.StringIO()

    checker = HealthChecker(detailed=args.detailed)

    try:
        if args.diagnostics:
            results = await checker.run_diagnostics(args.provider)
        else:
            results = await checker.check_all_providers(args.provider)

        # Save to file if requested
        if args.output:
            args.output.parent.mkdir(parents=True, exist_ok=True)
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(results, f, indent=2, ensure_ascii=False)

            if not args.quiet:
                print(f"\n💾 Results saved to {args.output}")

        # In quiet mode, the only stdout output is the final JSON blob.
        if args.quiet:
            sys.stdout = real_stdout
            print(json.dumps(results, indent=2))

        return 0

    except KeyboardInterrupt:
        sys.stdout = real_stdout  # make the message visible even in quiet mode
        print("\n⏸️ Health check interrupted by user")
        return 1
    except Exception as e:
        sys.stdout = real_stdout
        print(f"\n❌ Health check failed: {str(e)}")
        return 1
    finally:
        # Belt-and-braces: never leave stdout redirected.
        sys.stdout = real_stdout
+
+
if __name__ == "__main__":
    # sys.exit is the supported way to set the exit code from a script;
    # the site-provided exit() helper is intended for interactive use and
    # is not guaranteed to exist (e.g. under `python -S`). `sys` is
    # already imported at the top of this file.
    sys.exit(asyncio.run(main()))
diff --git a/putnam-bench-anon/scripts/regrade.py b/putnam-bench-anon/scripts/regrade.py
new file mode 100644
index 0000000..ffc177e
--- /dev/null
+++ b/putnam-bench-anon/scripts/regrade.py
@@ -0,0 +1,284 @@
+#!/usr/bin/env python3
+"""
+Re-grade an existing results JSON file using a (possibly different) grader model.
+
+The script loads a results file produced by `batch_evaluate.py` (or a compatible
+JSON list) and re-grades every problem using the specified grader. No solving
+is performed – instead we reuse the previously generated solutions stored in
+`solve.solution`.
+
+Example usage
+-------------
+python regrade.py \
+ --results-file results/o3/o3_original.json \
+ --dataset-dir dataset/ \
+ --provider openai \
+ --grader-model o3 \
+ --max-concurrent 5 \
+ --output results/regraded_o3_original.json
+
+"""
+
+import argparse
+import asyncio
+import json
+import sys
+import time
+from pathlib import Path
+from typing import Any, Dict, List
+from datetime import datetime
+import logging
+
+# Determine directories
+SCRIPT_DIR = Path(__file__).resolve().parent
+PROJECT_ROOT = SCRIPT_DIR.parent # one level up
+
+# Add both the script dir and project root to PYTHONPATH to locate 'loader'
+sys.path.append(str(SCRIPT_DIR))
+sys.path.append(str(PROJECT_ROOT))
+
+from loader import create_loader # type: ignore
+
try:
    from tqdm import tqdm  # type: ignore
    HAS_TQDM = True
except ImportError:  # pragma: no cover
    HAS_TQDM = False

    class tqdm:  # type: ignore
        """Bare-bones stand-in used when the real tqdm package is absent."""

        def __init__(self, total=None, desc=None, **kwargs):
            # Mirror the attributes the caller relies on: .total, .n, .desc.
            self.total, self.n = total, 0
            self.desc = desc if desc else ""
            print(f"{self.desc}: starting …")

        def update(self, n=1):
            self.n = self.n + n
            if self.total:
                print(
                    f"{self.desc}: {self.n}/{self.total} "
                    f"({self.n / self.total * 100:.1f}%)",
                    end="\r",
                )

        def set_postfix(self, _):
            # No-op: postfix stats are a cosmetic tqdm feature.
            pass

        def close(self):
            print()  # newline
+
+
+###############################################################################
+# Helper functions
+###############################################################################
+
+
def load_dataset(dataset_dir: Path) -> Dict[str, Dict[str, Any]]:
    """Read every JSON file in *dataset_dir* and return a mapping index → data.

    Files that fail to parse (or whose payload is not a dict) are skipped
    with a warning; entries lacking a truthy ``"index"`` field are silently
    ignored (best-effort ingest).
    """
    entries: Dict[str, Dict[str, Any]] = {}
    for path in dataset_dir.glob("*.json"):
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
            key = payload.get("index")
            if key:
                entries[key] = payload
        except Exception as exc:  # pragma: no cover – best-effort ingest
            logging.warning("Failed to load %s: %s", path, exc)
    return entries
+
+
async def regrade_problem(loader,  # type: ignore[valid-type]
                          problem_record: Dict[str, Any],
                          dataset_entry: Dict[str, Any],
                          variant_type: str) -> Dict[str, Any]:
    """Re-grade a single stored solution against its dataset entry.

    Returns a fresh result record carrying the previous solve output, the
    new ``grade``, and a ``correct`` convenience flag — or a skip marker
    when the dataset entry lacks a question or reference solution.
    """
    idx = problem_record.get("index", "unknown")
    problem_type = dataset_entry.get("problem_type", "proof")

    # Select the question/reference pair for the requested variant.
    if variant_type == "original":
        source = dataset_entry
    else:
        source = dataset_entry.get("variants", {}).get(variant_type, {})
    question = str(source.get("question", "")).strip()
    reference_solution = str(source.get("solution", "")).strip()

    if not (question and reference_solution):
        return {
            "index": idx,
            "status": "skipped",
            "reason": "missing_fields",
        }

    # Reuse the solution generated in the earlier run.
    solve_info = problem_record.get("solve", {})
    student_solution = str(solve_info.get("solution", "")).strip()
    final_answer = str(solve_info.get("final_answer", "")).strip()

    # Grade the solution (temperature hard-coded inside create_loader for o-series)
    grade_result, _raw = await loader.grade_solution(
        question,
        student_solution,
        reference_solution,
        problem_type,
    )

    # Assemble the new record (solve output + fresh grade; other original
    # fields are intentionally not carried over).
    record = {
        "index": idx,
        "variant_type": variant_type,
        "problem_type": problem_type,
        "solve": {
            "solution": student_solution,
            "final_answer": final_answer,
        },
        "grade": grade_result if grade_result else {"status": "failed"},
    }

    # Convenience shortcut for correctness
    record["correct"] = record["grade"].get("grade") == "CORRECT"
    return record
+
+
+###############################################################################
+# Main orchestration
+###############################################################################
+
+
async def main() -> None:  # noqa: C901 – single entry-point
    """CLI entry point: re-grade every problem in an existing results file.

    Loads the dataset directory and a previous results file, re-grades each
    stored solution concurrently with the configured grader model, then
    writes a ``{"summary": ..., "problems": [...]}`` JSON payload. Exits
    with a non-zero status on invalid inputs or a failed health check.
    """
    parser = argparse.ArgumentParser(description="Re-grade an existing results file")
    parser.add_argument("--results-file", required=True, type=Path, help="Path to existing results JSON")
    parser.add_argument("--dataset-dir", required=True, type=Path, help="Directory containing dataset JSON files")
    parser.add_argument("--provider", default="openai", help="Grader provider (default: openai)")
    parser.add_argument("--grader-model", default="o3", help="Grader model name (default: o3)")
    parser.add_argument("--max-concurrent", type=int, default=3, help="Max concurrent API calls")
    parser.add_argument("--variant-type", default="original", help="Problem variant used in results file")
    parser.add_argument("--output", type=Path, help="Where to write re-graded results (JSON)")
    parser.add_argument("--quick", action="store_true", help="Quick mode – single retry, shorter timeouts")
    parser.add_argument("--debug", action="store_true", help="Verbose JSON-parsing debug")

    args = parser.parse_args()

    # Configure logging early so the validation errors below are visible.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    if not args.results_file.exists():
        logging.error("Results file %s does not exist", args.results_file)
        sys.exit(1)

    if not args.dataset_dir.exists():
        logging.error("Dataset directory %s does not exist", args.dataset_dir)
        sys.exit(1)

    # Load dataset into memory once
    logging.info("Loading dataset from %s", args.dataset_dir)
    dataset_map = load_dataset(args.dataset_dir)
    logging.info("Loaded %d dataset entries", len(dataset_map))

    # Load results JSON (support two formats: {'problems':[...]} or simple list)
    with open(args.results_file, "r", encoding="utf-8") as fh:
        raw_data = json.load(fh)

    if isinstance(raw_data, dict) and "problems" in raw_data:
        original_problems: List[Dict[str, Any]] = raw_data["problems"]  # type: ignore[assignment]
    elif isinstance(raw_data, list):
        original_problems = raw_data  # type: ignore[assignment]
    else:
        logging.error("Unsupported results file structure – expected list or dict with key 'problems'.")
        sys.exit(1)

    if not original_problems:
        logging.warning("No problems found in results file – nothing to re-grade.")
        sys.exit(0)

    # Create loader – we only need grader, but solver_model must be provided; reuse grader_model
    loader = create_loader(
        args.provider,
        solver_model=args.grader_model,
        grader_model=args.grader_model,
        quick=args.quick,
        debug=args.debug,
    )

    try:
        if not await loader.health_check():
            logging.error("Health check failed for provider %s", args.provider)
            sys.exit(1)

        # Estimate costs (rough – assumes avg lengths; tweak as needed)
        cost_info = await loader.estimate_cost(len(original_problems))
        logging.info("Estimated grading cost: $%.2f", cost_info.get("total_cost", 0))

        # Concurrency control
        semaphore = asyncio.Semaphore(args.max_concurrent)

        async def wrapper(problem_record):
            # Skip records whose dataset entry is missing; otherwise grade
            # under the concurrency limit.
            idx = problem_record.get("index", "unknown")
            if idx not in dataset_map:
                logging.warning("Dataset entry for index %s not found – skipping", idx)
                return {"index": idx, "status": "skipped", "reason": "dataset_missing"}
            async with semaphore:
                return await regrade_problem(
                    loader,
                    problem_record,
                    dataset_map[idx],
                    args.variant_type,
                )

        pbar = tqdm(total=len(original_problems), desc="Re-grading")
        results: List[Dict[str, Any]] = []
        try:
            # Collect in completion order; the summary below is order-independent.
            for coro in asyncio.as_completed([wrapper(rec) for rec in original_problems]):
                results.append(await coro)
                pbar.update(1)
        finally:
            # BUG FIX: close the progress bar even when a grading task
            # raises, leaving the terminal in a clean state.
            pbar.close()

        # Build summary (skipped records have no "grade" key and are excluded)
        completed = [r for r in results if r.get("grade", {}).get("status") == "success"]
        grades = [r["grade"].get("grade") for r in completed]
        numeric = [5.0 if g == "CORRECT" else 2.5 for g in grades]

        summary = {
            "total_problems": len(results),
            "completed": len(completed),
            "correct": sum(1 for g in grades if g == "CORRECT"),
            "incorrect": sum(1 for g in grades if g == "INCORRECT"),
            "average_grade": sum(numeric) / len(numeric) if numeric else 0.0,
            "provider": args.provider,
            "grader_model": args.grader_model,
            "variant_type": args.variant_type,
            "estimated_cost": cost_info,
            "timestamp": datetime.now().isoformat(),
        }

        output_payload = {
            "summary": summary,
            "problems": results,
        }

        # Determine output path: explicit --output, or derive from the input name
        if args.output:
            out_path = args.output
        else:
            stem = args.results_file.stem + f"_regraded_{args.grader_model}"
            out_path = args.results_file.with_name(stem + args.results_file.suffix)

        with open(out_path, "w", encoding="utf-8") as fh:
            json.dump(output_payload, fh, indent=2, ensure_ascii=False)
        logging.info("Saved re-graded results to %s", out_path)
    finally:
        # BUG FIX: always release the loader's HTTP client, even when an
        # exception or sys.exit() aborts the run part-way through (previously
        # cleanup only ran on the happy path, leaking the client).
        if hasattr(loader, "__aexit__"):
            await loader.__aexit__(None, None, None)
+
+
if __name__ == "__main__":
    # Script entry point: drive the async workflow to completion.
    asyncio.run(main())