diff options
Diffstat (limited to 'putnam-bench-anon/scripts')
| -rw-r--r-- | putnam-bench-anon/scripts/__init__.py | 1 | ||||
| -rw-r--r-- | putnam-bench-anon/scripts/batch_evaluate.py | 1211 | ||||
| -rw-r--r-- | putnam-bench-anon/scripts/benchmark.py | 481 | ||||
| -rw-r--r-- | putnam-bench-anon/scripts/compare_original_vs_kernel_test.py | 630 | ||||
| -rw-r--r-- | putnam-bench-anon/scripts/health_check.py | 376 | ||||
| -rw-r--r-- | putnam-bench-anon/scripts/regrade.py | 284 |
6 files changed, 2983 insertions, 0 deletions
diff --git a/putnam-bench-anon/scripts/__init__.py b/putnam-bench-anon/scripts/__init__.py new file mode 100644 index 0000000..389f811 --- /dev/null +++ b/putnam-bench-anon/scripts/__init__.py @@ -0,0 +1 @@ +"""Scripts package for Putnam mathematical problem solver."""
\ No newline at end of file diff --git a/putnam-bench-anon/scripts/batch_evaluate.py b/putnam-bench-anon/scripts/batch_evaluate.py new file mode 100644 index 0000000..6fde90b --- /dev/null +++ b/putnam-bench-anon/scripts/batch_evaluate.py @@ -0,0 +1,1211 @@ +#!/usr/bin/env python3 +""" +Batch evaluation script for processing entire datasets with multiple providers. + +This script efficiently processes all JSON files in the dataset directory, +supports multiple AI providers, and generates comprehensive evaluation reports. + +Features: +- Incremental saving: Results are saved after each problem completes +- Simple resume support: Skip already completed problems based on checkpoint +- Multi-provider support +- Comprehensive evaluation reports + +Usage: + python batch_evaluate.py --provider openai --output results/openai_results.json + python batch_evaluate.py --provider anthropic --variant kernel_variant --max-concurrent 5 + +Resume usage (simplified): + # Resume with same configuration + python batch_evaluate.py --provider openai --dataset dataset/ --resume checkpoint_file.json + + # Resume with different settings (checkpoint only provides skip list) + python batch_evaluate.py --provider openai --dataset dataset/ --concurrent 10 --resume checkpoint_file.json +""" + +import asyncio +import json +import sys +import time +from pathlib import Path +import argparse +from typing import List, Dict, Any +import logging +from datetime import datetime +import shutil + +try: + from tqdm import tqdm + HAS_TQDM = True +except ImportError: + HAS_TQDM = False + # Fallback progress bar + class tqdm: + def __init__(self, total=None, desc=None, **kwargs): + self.total = total + self.n = 0 + self.desc = desc + print(f"{desc}: Starting...") + + def update(self, n=1): + self.n += n + if self.total: + percent = (self.n / self.total) * 100 + print(f"{self.desc}: {self.n}/{self.total} ({percent:.1f}%)", end='\r') + + def set_postfix(self, postfix_dict): + pass + + def close(self): + print() 
# New line after progress + +# Add the loader module to the path +sys.path.append(str(Path(__file__).parent)) + +from loader import create_loader, get_supported_providers + + +def setup_logging(output_dir: Path): + """Setup logging configuration.""" + log_file = output_dir / f"evaluation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" + + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler(sys.stdout) + ] + ) + + return logging.getLogger(__name__) + + +async def load_dataset(dataset_path: Path, max_files: int = None) -> List[Dict[str, Any]]: + """Load all JSON files from the dataset directory.""" + json_files = list(dataset_path.glob("*.json")) + + if max_files: + json_files = json_files[:max_files] + + problems = [] + for json_file in json_files: + try: + with open(json_file, 'r', encoding='utf-8') as f: + data = json.load(f) + data['_source_file'] = str(json_file.name) + problems.append(data) + except Exception as e: + logging.warning(f"Failed to load {json_file}: {str(e)}") + + return problems + + +async def process_single_problem(loader, problem_data: Dict[str, Any], + variant_type: str, solver_model: str = None, + grader_model: str = None) -> Dict[str, Any]: + """Process a single problem and return results with metadata.""" + start_time = time.time() + + try: + result = await loader.test_single_problem( + problem_data, + variant_type=variant_type, + solver_model=solver_model, + grader_model=grader_model + ) + + # Add metadata + result['_metadata'] = { + 'source_file': problem_data.get('_source_file', 'unknown'), + 'variant_type': variant_type, + 'processing_time': time.time() - start_time, + 'timestamp': datetime.now().isoformat(), + 'models_used': { + 'solver': solver_model or loader.solver_model, + 'grader': grader_model or loader.grader_model + } + } + + return result + + except Exception as e: + # Return error information + return { + 'error': 
str(e), + 'final_grade': 0, + '_metadata': { + 'source_file': problem_data.get('_source_file', 'unknown'), + 'variant_type': variant_type, + 'processing_time': time.time() - start_time, + 'timestamp': datetime.now().isoformat(), + 'error': True + } + } + + +async def batch_evaluate(dataset_path: Path = None, provider: str = None, variant_type: str = "original", + max_concurrent: int = 3, max_files: int = None, + solver_model: str = None, grader_model: str = None, + output_file: Path = None, resume_checkpoint: Path = None, + **loader_kwargs) -> Dict[str, Any]: + """ + Batch evaluate problems using specified provider with resume support. + + Args: + dataset_path: Path to dataset directory (required for new runs or old checkpoint format) + provider: AI provider name (required for new runs or old checkpoint format) + variant_type: Problem variant to use + max_concurrent: Maximum concurrent evaluations + max_files: Maximum number of files to process (None for all) + solver_model: Override solver model + grader_model: Override grader model + output_file: Output file path + resume_checkpoint: Path to checkpoint file to resume from + **loader_kwargs: Additional arguments for loader + + Returns: + Dictionary with evaluation results and statistics + """ + logger = logging.getLogger(__name__) + + # Check if resuming from checkpoint + if resume_checkpoint and resume_checkpoint.exists(): + logger.info(f"Resuming from checkpoint: {resume_checkpoint}") + with open(resume_checkpoint, 'r', encoding='utf-8') as f: + checkpoint_data = json.load(f) + + # Simple resume: just restore completed indices and results + completed_indices = set(checkpoint_data.get('completed_indices', [])) + results = checkpoint_data.get('results', []) + failed_indices = checkpoint_data.get('failed_indices', []) + successful_indices = checkpoint_data.get('successful_indices', []) + correct_indices = checkpoint_data.get('correct_indices', []) + + # Always require dataset_path and provider from command line + 
if not dataset_path: + raise ValueError("dataset_path is required when resuming") + if not provider: + raise ValueError("provider is required when resuming") + + # Load dataset + logger.info(f"Loading dataset from {dataset_path}") + problems = await load_dataset(dataset_path, max_files) + logger.info(f"Loaded {len(problems)} problems") + + if not problems: + raise ValueError("No problems found in dataset") + + checkpoint_file = resume_checkpoint # Continue using the same checkpoint file + logger.info(f"Resuming with {len(completed_indices)} completed problems out of {len(problems)}") + else: + # New evaluation - validate required parameters + if not dataset_path: + raise ValueError("dataset_path is required for new evaluation") + if not provider: + raise ValueError("provider is required for new evaluation") + + # Load dataset + logger.info(f"Loading dataset from {dataset_path}") + problems = await load_dataset(dataset_path, max_files) + logger.info(f"Loaded {len(problems)} problems") + + if not problems: + raise ValueError("No problems found in dataset") + + # Initialize state for new run + completed_indices = set() + results = [] + failed_indices = [] + successful_indices = [] + correct_indices = [] + + # Create checkpoint file name + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + if output_file: + checkpoint_file = output_file.parent / f"checkpoint_{output_file.stem}_{timestamp}.json" + else: + checkpoint_file = Path(f"checkpoint_{provider}_{variant_type}_{timestamp}.json") + + # Create loader + logger.info(f"Creating {provider} loader") + + # Include solver_model and grader_model in loader_kwargs if specified + if solver_model: + loader_kwargs['solver_model'] = solver_model + if grader_model: + loader_kwargs['grader_model'] = grader_model + + loader = create_loader(provider, **loader_kwargs) + + # Health check + logger.info("Performing health check...") + if not await loader.health_check(): + raise RuntimeError(f"Health check failed for {provider}") + + 
# Cost estimation + logger.info("Estimating costs...") + cost_info = await loader.estimate_cost(len(problems)) + logger.info(f"Estimated cost: ${cost_info.get('total_cost', 0):.2f}") + + # Progress tracking + remaining_problems = [p for p in problems if p.get('index', 'unknown') not in completed_indices] + progress_bar = tqdm(total=len(problems), desc=f"Evaluating with {provider}", initial=len(completed_indices)) + + # Semaphore for concurrency control + semaphore = asyncio.Semaphore(max_concurrent) + + def save_checkpoint(): + """Save current state to checkpoint file - simplified version""" + checkpoint_data = { + 'timestamp': datetime.now().isoformat(), + # Only save essential state information + 'completed_indices': list(completed_indices), + 'successful_indices': successful_indices, + 'failed_indices': failed_indices, + 'correct_indices': correct_indices, + 'results': results, + # Save minimal config for reference (not for resume) + 'dataset_path': str(dataset_path), # For convenience + 'total_problems': len(problems), + 'current_config': { + 'provider': provider, + 'variant_type': variant_type, + 'solver_model': loader.solver_model, + 'grader_model': loader.grader_model + } + } + + # Write to temporary file first, then move (atomic operation) + temp_file = checkpoint_file.with_suffix('.tmp') + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(checkpoint_data, f, indent=2, ensure_ascii=False) + + # Atomic rename + temp_file.replace(checkpoint_file) + + async def evaluate_problem(problem_data: Dict[str, Any]) -> Dict[str, Any]: + """Evaluate a single problem with concurrency control.""" + problem_index = problem_data.get('index', 'unknown') + + # Skip if already completed + if problem_index in completed_indices: + return None + + async with semaphore: + try: + result = await loader.test_single_problem( + problem_data, + variant_type=variant_type + ) + + # Track success/failure based on technical completion, not correctness + if result.get('status') 
== 'completed': + successful_indices.append(result['index']) # Successfully processed + if result.get('correct'): + correct_indices.append(result['index']) # Also correct + else: + failed_indices.append(result['index']) # Technical failure + + # Add to results and mark as completed + results.append(result) + completed_indices.add(problem_index) + + # Save checkpoint immediately after each problem + save_checkpoint() + + progress_bar.update(1) + progress_bar.set_postfix({ + 'success': len(successful_indices), + 'failed': len(failed_indices), + 'saved': len(completed_indices) + }) + + return result + + except Exception as e: + logger.error(f"Error evaluating problem {problem_index}: {e}") + result = { + 'index': problem_index, + 'status': 'error', + 'error': str(e), + 'error_type': type(e).__name__ + } + + # Add to results and mark as completed (even if failed) + results.append(result) + failed_indices.append(problem_index) + completed_indices.add(problem_index) + + # Save checkpoint + save_checkpoint() + + progress_bar.update(1) + progress_bar.set_postfix({ + 'success': len(successful_indices), + 'failed': len(failed_indices), + 'saved': len(completed_indices) + }) + + return result + + # Run evaluations + start_time = time.time() + + try: + # Create tasks only for remaining problems + tasks = [evaluate_problem(problem) for problem in remaining_problems] + + if tasks: + # Execute all tasks concurrently (limited by semaphore) + await asyncio.gather(*tasks) + else: + logger.info("All problems already completed!") + + except KeyboardInterrupt: + logger.info("Evaluation interrupted by user. 
Progress saved to checkpoint.") + logger.info(f"To resume, use: --resume {checkpoint_file}") + raise + + finally: + progress_bar.close() + + # Calculate statistics + total_time = time.time() - start_time + completed_results = [r for r in results if r.get('status') == 'completed'] + grades = [r['grade']['grade'] for r in completed_results + if r.get('grade', {}).get('status') == 'success' and 'grade' in r.get('grade', {})] + + # Calculate numeric grades (CORRECT=5, INCORRECT=2.5) + numeric_grades = [5.0 if g == 'CORRECT' else 2.5 for g in grades] + average_grade = sum(numeric_grades) / len(numeric_grades) if numeric_grades else 0.0 + + summary = { + 'total_problems': len(problems), + 'completed': len(completed_results), + 'successful': len(successful_indices), # Technical success (completed processing) + 'failed': len(failed_indices), # Technical failures + 'correct_answers': len(correct_indices), # Mathematically correct answers + 'incorrect_answers': len(successful_indices) - len(correct_indices), # Wrong but processed + 'success_rate': (len(successful_indices) / len(problems) * 100) if problems else 0, # Technical success rate + 'accuracy_rate': (len(correct_indices) / len(successful_indices) * 100) if successful_indices else 0, # Correctness rate + 'average_grade': average_grade, + 'total_time_seconds': total_time, + 'problems_per_second': len(problems) / total_time if total_time > 0 else 0, + 'provider': provider, + 'variant_type': variant_type, + 'solver_model': loader.solver_model, + 'grader_model': loader.grader_model, + 'max_concurrent': max_concurrent, + 'estimated_cost': cost_info, + 'checkpoint_file': str(checkpoint_file) + } + + # Create full results + full_results = { + 'summary': summary, + 'problems': results, + 'successful_indices': successful_indices, # Technical successes + 'failed_indices': failed_indices, # Technical failures + 'correct_indices': correct_indices, # Correct answers + 'timestamp': datetime.now().isoformat() + } + + # Save final 
results + if output_file: + logger.info(f"Saving final results to {output_file}") + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(full_results, f, indent=2, ensure_ascii=False) + + # Clean up checkpoint file after successful completion + if checkpoint_file.exists(): + logger.info(f"Removing checkpoint file: {checkpoint_file}") + checkpoint_file.unlink() + + # Print summary + logger.info(f"\n{'='*60}") + logger.info("EVALUATION SUMMARY") + logger.info(f"{'='*60}") + logger.info(f"Provider: {provider}") + logger.info(f"Variant: {variant_type}") + logger.info(f"Total problems: {summary['total_problems']}") + logger.info(f"✅ Successfully processed: {summary['successful']} ({summary['success_rate']:.1f}%)") + logger.info(f"💥 Technical failures: {summary['failed']}") + logger.info(f"🎯 Correct answers: {summary['correct_answers']} ({summary['accuracy_rate']:.1f}% of processed)") + logger.info(f"❌ Wrong answers: {summary['incorrect_answers']}") + logger.info(f"Average grade: {summary['average_grade']:.2f}") + logger.info(f"Total time: {summary['total_time_seconds']:.1f}s") + logger.info(f"Speed: {summary['problems_per_second']:.2f} problems/second") + + # Cleanup + if hasattr(loader, '__aexit__'): + await loader.__aexit__(None, None, None) + + return full_results + + +async def batch_evaluate_cross(dataset_path: Path = None, + solver_provider: str = None, + grader_provider: str = None, + variant_type: str = "original", + max_concurrent: int = 3, + max_files: int = None, + solver_model: str = None, + grader_model: str = None, + output_file: Path = None, + resume_checkpoint: Path = None, + vllm_url: str = None, + device: str = None, + quick: bool = False) -> Dict[str, Any]: + """ + Batch evaluate problems using different providers for solving and grading with resume support. 
+ + Args: + dataset_path: Path to dataset directory (required for new runs, ignored for resume) + solver_provider: Provider for solving problems (required for new runs, ignored for resume) + grader_provider: Provider for grading (if None, uses solver_provider) + variant_type: Problem variant to use + max_concurrent: Maximum concurrent evaluations + max_files: Maximum number of files to process (None for all) + solver_model: Override solver model + grader_model: Override grader model + output_file: Output file path + resume_checkpoint: Path to checkpoint file to resume from + vllm_url: VLLM server URL if using VLLM + device: Device for HuggingFace models + + Returns: + Dictionary with evaluation results and statistics + """ + logger = logging.getLogger(__name__) + + # Check if resuming from checkpoint + if resume_checkpoint and resume_checkpoint.exists(): + logger.info(f"Resuming from checkpoint: {resume_checkpoint}") + with open(resume_checkpoint, 'r', encoding='utf-8') as f: + checkpoint_data = json.load(f) + + # Simple resume: just restore completed indices and results + completed_indices = set(checkpoint_data.get('completed_indices', [])) + results = checkpoint_data.get('results', []) + failed_indices = checkpoint_data.get('failed_indices', []) + successful_indices = checkpoint_data.get('successful_indices', []) + correct_indices = checkpoint_data.get('correct_indices', []) + + # Always require providers and dataset_path from command line + if not dataset_path: + raise ValueError("dataset_path is required when resuming") + if not solver_provider: + raise ValueError("solver_provider is required when resuming") + + # Load dataset + logger.info(f"Loading dataset from {dataset_path}") + problems = await load_dataset(dataset_path, max_files) + logger.info(f"Loaded {len(problems)} problems") + + if not problems: + raise ValueError("No problems found in dataset") + + checkpoint_file = resume_checkpoint # Continue using the same checkpoint file + logger.info(f"Resuming 
with {len(completed_indices)} completed problems out of {len(problems)}") + else: + # New evaluation - validate required parameters + if not dataset_path: + raise ValueError("dataset_path is required for new evaluation") + if not solver_provider: + raise ValueError("solver_provider is required for new evaluation") + + # Load dataset + logger.info(f"Loading dataset from {dataset_path}") + problems = await load_dataset(dataset_path, max_files) + logger.info(f"Loaded {len(problems)} problems") + + if not problems: + raise ValueError("No problems found in dataset") + + # Initialize state for new run + completed_indices = set() + results = [] + failed_indices = [] + successful_indices = [] + correct_indices = [] + + # Create checkpoint file name + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + if output_file: + checkpoint_file = output_file.parent / f"checkpoint_{output_file.stem}_{timestamp}.json" + else: + checkpoint_file = Path(f"checkpoint_cross_{solver_provider}_{grader_provider or solver_provider}_{variant_type}_{timestamp}.json") + + # Create cross-provider loader + logger.info(f"Creating cross-provider loader: solver={solver_provider}, grader={grader_provider or solver_provider}") + + from loader import create_cross_provider_loader + + # Prepare kwargs for each provider + loader_kwargs = {} + + # VLLM settings + if vllm_url: + if solver_provider == 'vllm': + loader_kwargs['solver_kwargs'] = {'base_url': vllm_url} + if grader_provider == 'vllm': + loader_kwargs['grader_kwargs'] = {'base_url': vllm_url} + + # HuggingFace settings + if device: + if solver_provider == 'huggingface': + loader_kwargs['solver_kwargs'] = {'device': device} + if grader_provider == 'huggingface': + loader_kwargs['grader_kwargs'] = {'device': device} + + # Add quick mode if specified + if quick: + loader_kwargs['quick'] = True + + loader = create_cross_provider_loader( + solver_provider=solver_provider, + grader_provider=grader_provider, + solver_model=solver_model, + 
grader_model=grader_model, + **loader_kwargs + ) + + # Health check + logger.info("Performing health check...") + if not await loader.health_check(): + raise RuntimeError(f"Health check failed") + + # Cost estimation + logger.info("Estimating costs...") + cost_info = await loader.estimate_cost(len(problems)) + logger.info(f"Estimated cost: ${cost_info.get('total_cost', 0):.2f}") + + # Progress tracking + remaining_problems = [p for p in problems if p.get('index', 'unknown') not in completed_indices] + progress_bar = tqdm(total=len(problems), desc=f"Evaluating (solver={solver_provider}, grader={grader_provider or solver_provider})", initial=len(completed_indices)) + + # Semaphore for concurrency control + semaphore = asyncio.Semaphore(max_concurrent) + + def save_checkpoint(): + """Save current state to checkpoint file - simplified version""" + checkpoint_data = { + 'timestamp': datetime.now().isoformat(), + # Only save essential state information + 'completed_indices': list(completed_indices), + 'successful_indices': successful_indices, + 'failed_indices': failed_indices, + 'correct_indices': correct_indices, + 'results': results, + # Save minimal config for reference (not for resume) + 'dataset_path': str(dataset_path), # For convenience + 'total_problems': len(problems), + 'current_config': { + 'solver_provider': solver_provider, + 'grader_provider': grader_provider or solver_provider, + 'variant_type': variant_type, + 'solver_model': loader.solver_model, + 'grader_model': loader.grader_model + } + } + + # Write to temporary file first, then move (atomic operation) + temp_file = checkpoint_file.with_suffix('.tmp') + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(checkpoint_data, f, indent=2, ensure_ascii=False) + + # Atomic rename + temp_file.replace(checkpoint_file) + + async def evaluate_problem(problem_data: Dict[str, Any]) -> Dict[str, Any]: + """Evaluate a single problem with concurrency control.""" + problem_index = problem_data.get('index', 
'unknown') + + # Skip if already completed + if problem_index in completed_indices: + return None + + async with semaphore: + try: + result = await loader.test_single_problem( + problem_data, + variant_type=variant_type + ) + + # Track success/failure based on technical completion, not correctness + if result.get('status') == 'completed': + successful_indices.append(result['index']) # Successfully processed + if result.get('correct'): + correct_indices.append(result['index']) # Also correct + else: + failed_indices.append(result['index']) # Technical failure + + # Add to results and mark as completed + results.append(result) + completed_indices.add(problem_index) + + # Save checkpoint immediately after each problem + save_checkpoint() + + progress_bar.update(1) + progress_bar.set_postfix({ + 'success': len(successful_indices), + 'failed': len(failed_indices), + 'saved': len(completed_indices) + }) + + return result + + except Exception as e: + import traceback + + # Capture full error details + error_details = { + 'error_message': str(e), + 'error_type': type(e).__name__, + 'traceback': traceback.format_exc(), + 'timestamp': datetime.now().isoformat(), + 'problem_index': problem_index, + 'problem_title': problem_data.get('title', 'unknown') + } + + # Try to capture HTTP-specific details if available + if hasattr(e, 'response'): + try: + error_details['http_status'] = e.response.status_code + error_details['http_headers'] = dict(e.response.headers) + error_details['http_response_text'] = e.response.text + except: + pass + + # Try to capture request details if available + if hasattr(e, 'request'): + try: + error_details['request_method'] = e.request.method + error_details['request_url'] = e.request.url + error_details['request_headers'] = dict(e.request.headers) + # Don't log request body as it might contain sensitive info + except: + pass + + # Log detailed error + logger.error(f"DETAILED ERROR for problem {problem_index}:") + logger.error(f" Error Type: 
{error_details['error_type']}") + logger.error(f" Error Message: {error_details['error_message']}") + logger.error(f" Problem Title: {error_details['problem_title']}") + + if 'http_status' in error_details: + logger.error(f" HTTP Status: {error_details['http_status']}") + logger.error(f" HTTP Response: {error_details['http_response_text'][:500]}...") + + logger.error(f" Full Traceback:\n{error_details['traceback']}") + + # Save to detailed error log + error_log_file = output_file.parent / f"detailed_errors_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" if output_file else Path(f"detailed_errors_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json") + + try: + # Load existing errors if file exists + if error_log_file.exists(): + with open(error_log_file, 'r') as f: + existing_errors = json.load(f) + else: + existing_errors = [] + + # Add new error + existing_errors.append(error_details) + + # Save updated errors + with open(error_log_file, 'w') as f: + json.dump(existing_errors, f, indent=2, ensure_ascii=False) + + logger.info(f"Detailed error saved to {error_log_file}") + + except Exception as save_error: + logger.error(f"Failed to save detailed error log: {save_error}") + + result = { + 'index': problem_index, + 'status': 'error', + 'error': str(e), + 'error_type': type(e).__name__, + 'error_details': error_details + } + + # Add to results and mark as completed (even if failed) + results.append(result) + failed_indices.append(problem_index) + completed_indices.add(problem_index) + + # Save checkpoint + save_checkpoint() + + progress_bar.update(1) + progress_bar.set_postfix({ + 'success': len(successful_indices), + 'failed': len(failed_indices), + 'saved': len(completed_indices) + }) + + return result + + # Run evaluations + start_time = time.time() + + try: + # Create tasks only for remaining problems + tasks = [evaluate_problem(problem) for problem in remaining_problems] + + if tasks: + # Execute all tasks concurrently (limited by semaphore) + await 
asyncio.gather(*tasks) + else: + logger.info("All problems already completed!") + + except KeyboardInterrupt: + logger.info("Evaluation interrupted by user. Progress saved to checkpoint.") + logger.info(f"To resume, use: --resume {checkpoint_file}") + raise + + finally: + progress_bar.close() + + # Calculate statistics + total_time = time.time() - start_time + completed_results = [r for r in results if r.get('status') == 'completed'] + grades = [r['grade']['grade'] for r in completed_results + if r.get('grade', {}).get('status') == 'success' and 'grade' in r.get('grade', {})] + + # Calculate numeric grades (CORRECT=5, INCORRECT=2.5) + numeric_grades = [5.0 if g == 'CORRECT' else 2.5 for g in grades] + average_grade = sum(numeric_grades) / len(numeric_grades) if numeric_grades else 0.0 + + model_info = loader.get_model_info() + + summary = { + 'total_problems': len(problems), + 'completed': len(completed_results), + 'successful': len(successful_indices), # Technical success (completed processing) + 'failed': len(failed_indices), # Technical failures + 'correct_answers': len(correct_indices), # Mathematically correct answers + 'incorrect_answers': len(successful_indices) - len(correct_indices), # Wrong but processed + 'success_rate': (len(successful_indices) / len(problems) * 100) if problems else 0, # Technical success rate + 'accuracy_rate': (len(correct_indices) / len(successful_indices) * 100) if successful_indices else 0, # Correctness rate + 'average_grade': average_grade, + 'total_time_seconds': total_time, + 'problems_per_second': len(problems) / total_time if total_time > 0 else 0, + 'solver_provider': model_info.get('solver_provider', solver_provider), + 'grader_provider': model_info.get('grader_provider', grader_provider or solver_provider), + 'variant_type': variant_type, + 'solver_model': loader.solver_model, + 'grader_model': loader.grader_model, + 'max_concurrent': max_concurrent, + 'estimated_cost': cost_info, + 'is_cross_provider': 
model_info.get('is_cross_provider', False) + } + + # Create full results + full_results = { + 'summary': summary, + 'problems': results, + 'successful_indices': successful_indices, # Technical successes + 'failed_indices': failed_indices, # Technical failures + 'correct_indices': correct_indices, # Correct answers + 'timestamp': datetime.now().isoformat() + } + + # Save if requested + if output_file: + logger.info(f"Saving results to {output_file}") + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(full_results, f, indent=2, ensure_ascii=False) + + # Print summary + logger.info(f"\n{'='*60}") + logger.info("CROSS-PROVIDER EVALUATION SUMMARY") + logger.info(f"{'='*60}") + logger.info(f"Solver Provider: {summary['solver_provider']} ({loader.solver_model})") + logger.info(f"Grader Provider: {summary['grader_provider']} ({loader.grader_model})") + logger.info(f"Variant: {variant_type}") + logger.info(f"Total problems: {summary['total_problems']}") + logger.info(f"✅ Successfully processed: {summary['successful']} ({summary['success_rate']:.1f}%)") + logger.info(f"💥 Technical failures: {summary['failed']}") + logger.info(f"🎯 Correct answers: {summary['correct_answers']} ({summary['accuracy_rate']:.1f}% of processed)") + logger.info(f"❌ Wrong answers: {summary['incorrect_answers']}") + logger.info(f"Average grade: {summary['average_grade']:.2f}") + logger.info(f"Total time: {summary['total_time_seconds']:.1f}s") + logger.info(f"Speed: {summary['problems_per_second']:.2f} problems/second") + + # Cleanup + if hasattr(loader, '__aexit__'): + await loader.__aexit__(None, None, None) + + return full_results + + +async def batch_evaluate_all_variants(dataset_path: Path, provider: str, + variants: List[str] = None, + max_concurrent: int = 3, max_files: int = None, + solver_model: str = None, grader_model: str = None, + output_dir: Path = None, + base_url: str = None, device: str = None) -> Dict[str, Any]: + """ + Batch evaluate problems across all variants using 
specified provider. + + Args: + dataset_path: Path to dataset directory + provider: AI provider name + variants: List of variants to test (None for all) + max_concurrent: Maximum concurrent evaluations + max_files: Maximum number of files to process per variant (None for all) + solver_model: Override solver model + grader_model: Override grader model + output_dir: Output directory path + **loader_kwargs: Additional arguments for loader + + Returns: + Dictionary with all variant results and comparative analysis + """ + if variants is None: + variants = ["original", "descriptive_long", "descriptive_long_confusing", + "descriptive_long_misleading", "garbled_string", "kernel_variant"] + + if output_dir is None: + output_dir = Path("results") + + logger = logging.getLogger(__name__) + + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + config_name = f"{provider}" + if solver_model: + config_name += f"_{solver_model.replace('/', '_').replace('-', '_')}" + + # Create configuration-specific output directory + config_output_dir = output_dir / f"{config_name}_{timestamp}" + config_output_dir.mkdir(parents=True, exist_ok=True) + + # Prepare loader kwargs based on provider + loader_kwargs = {} + if provider == 'vllm' and base_url: + loader_kwargs['base_url'] = base_url + elif provider == 'huggingface' and device: + loader_kwargs['device'] = device + + logger.info(f"🚀 Starting multi-variant test for {config_name}") + logger.info(f"📊 Testing {len(variants)} variants with up to {max_files or 'ALL'} files each") + + overall_start_time = time.time() + variant_results = {} + + # Create overall progress bar for variants if tqdm is available + if HAS_TQDM: + variant_progress = tqdm.tqdm(total=len(variants), desc="Variants", + unit="variant", position=1, leave=True) + + for i, variant in enumerate(variants): + logger.info(f"\n📝 [{i+1}/{len(variants)}] Testing variant: {variant}") + variant_start_time = time.time() + + # Output file for this variant + variant_output_file = 
config_output_dir / f"{variant}_{timestamp}.json"
+
+        try:
+            # Run batch evaluation for this variant
+            result = await batch_evaluate(
+                dataset_path=dataset_path,
+                provider=provider,
+                variant_type=variant,
+                max_concurrent=max_concurrent,
+                max_files=max_files,
+                solver_model=solver_model,
+                grader_model=grader_model,
+                output_file=variant_output_file,
+                **loader_kwargs
+            )
+
+            variant_time = time.time() - variant_start_time
+
+            # Extract key metrics
+            summary = result.get('summary', {})
+            variant_results[variant] = {
+                'status': 'success',
+                'output_file': str(variant_output_file),
+                'total_problems': summary.get('total_problems', 0),
+                'successful_evaluations': summary.get('successful', 0),
+                'correct_evaluations': summary.get('correct_answers', 0),
+                'incorrect_evaluations': summary.get('incorrect_answers', 0),
+                'failed_evaluations': summary.get('failed', 0),
+                'success_rate': summary.get('success_rate', 0),
+                'average_grade': summary.get('average_grade', 0),
+                'total_processing_time': summary.get('total_time_seconds', 0),
+                'avg_time_per_problem': (summary.get('total_time_seconds', 0) / summary['total_problems']) if summary.get('total_problems') else 0,  # seconds per problem; summary only exposes the inverse rate
+                'variant_test_time': variant_time,
+                'grade_distribution': result.get('problems', [])  # Assuming 'problems' contains all results
+            }
+
+            logger.info(f"✅ {variant}: "
+                       f"Grade {summary.get('average_grade', 0):.2f}, "
+                       f"Success {summary.get('success_rate', 0):.1f}%, "
+                       f"Time {variant_time/60:.1f}min")
+
+        except Exception as e:
+            variant_time = time.time() - variant_start_time
+            error_msg = str(e)
+
+            variant_results[variant] = {
+                'status': 'failed',
+                'error': error_msg,
+                'variant_test_time': variant_time
+            }
+
+            logger.error(f"❌ {variant} failed: {error_msg}")
+
+        # Update variant progress bar
+        if HAS_TQDM and 'variant_progress' in locals():
+            variant_progress.update(1)
+            successful_variants_count = len([v for v, r in variant_results.items() if r.get('status') == 'success'])
+            variant_progress.set_postfix({
+                'Success': successful_variants_count,
+                'Failed': 
len(variant_results) - successful_variants_count + }) + + # Close variant progress bar + if HAS_TQDM and 'variant_progress' in locals(): + variant_progress.close() + + overall_time = time.time() - overall_start_time + + # Generate comprehensive summary + successful_variants = [v for v, r in variant_results.items() if r.get('status') == 'success'] + failed_variants = [v for v, r in variant_results.items() if r.get('status') == 'failed'] + + # Calculate aggregate statistics + if successful_variants: + total_problems = sum(variant_results[v].get('total_problems', 0) for v in successful_variants) + total_successful = sum(variant_results[v].get('successful_evaluations', 0) for v in successful_variants) + total_correct = sum(variant_results[v].get('correct_evaluations', 0) for v in successful_variants) + total_incorrect = sum(variant_results[v].get('incorrect_evaluations', 0) for v in successful_variants) + total_failed = sum(variant_results[v].get('failed_evaluations', 0) for v in successful_variants) + + grades = [variant_results[v].get('average_grade', 0) for v in successful_variants] + success_rates = [variant_results[v].get('success_rate', 0) for v in successful_variants] + times = [variant_results[v].get('avg_time_per_problem', 0) for v in successful_variants] + + overall_avg_grade = sum(grades) / len(grades) if grades else 0 + overall_success_rate = sum(success_rates) / len(success_rates) if success_rates else 0 + overall_avg_time = sum(times) / len(times) if times else 0 + + # Find best and worst performing variants + best_variant = max(successful_variants, key=lambda v: variant_results[v].get('average_grade', 0)) + worst_variant = min(successful_variants, key=lambda v: variant_results[v].get('average_grade', 0)) + + fastest_variant = min(successful_variants, key=lambda v: variant_results[v].get('avg_time_per_problem', float('inf'))) + slowest_variant = max(successful_variants, key=lambda v: variant_results[v].get('avg_time_per_problem', 0)) + else: + 
total_problems = total_successful = total_correct = total_incorrect = total_failed = 0 + overall_avg_grade = overall_success_rate = overall_avg_time = 0 + best_variant = worst_variant = fastest_variant = slowest_variant = None + + summary_result = { + 'configuration': { + 'provider': provider, + 'solver_model': solver_model, + 'grader_model': grader_model, + 'base_url': base_url, + 'device': device, + 'timestamp': timestamp + }, + 'test_overview': { + 'total_variants_tested': len(variant_results), + 'successful_variants': len(successful_variants), + 'failed_variants': len(failed_variants), + 'total_test_time_minutes': overall_time / 60, + 'variants_list': list(variant_results.keys()) + }, + 'aggregate_metrics': { + 'total_problems_across_variants': total_problems, + 'total_successful_evaluations': total_successful, + 'total_correct_evaluations': total_correct, + 'total_incorrect_evaluations': total_incorrect, + 'total_technical_failures': total_failed, + 'overall_average_grade': overall_avg_grade, + 'overall_success_rate': overall_success_rate, + 'overall_avg_time_per_problem': overall_avg_time + }, + 'variant_comparison': { + 'best_performing_variant': { + 'variant': best_variant, + 'grade': variant_results.get(best_variant, {}).get('average_grade', 0) if best_variant else 0 + }, + 'worst_performing_variant': { + 'variant': worst_variant, + 'grade': variant_results.get(worst_variant, {}).get('average_grade', 0) if worst_variant else 0 + }, + 'fastest_variant': { + 'variant': fastest_variant, + 'time_per_problem': variant_results.get(fastest_variant, {}).get('avg_time_per_problem', 0) if fastest_variant else 0 + }, + 'slowest_variant': { + 'variant': slowest_variant, + 'time_per_problem': variant_results.get(slowest_variant, {}).get('avg_time_per_problem', 0) if slowest_variant else 0 + } + }, + 'detailed_variant_results': variant_results + } + + # Save configuration summary + summary_file = config_output_dir / f"SUMMARY_{config_name}_{timestamp}.json" + with 
open(summary_file, 'w', encoding='utf-8') as f: + json.dump(summary_result, f, indent=2, ensure_ascii=False) + + # Print summary to console + logger.info("\n" + "="*80) + logger.info("📊 MULTI-VARIANT TEST SUMMARY REPORT") + logger.info("="*80) + + logger.info(f"🤖 Provider: {provider}") + if solver_model: + logger.info(f"🧠 Solver Model: {solver_model}") + if grader_model: + logger.info(f"📝 Grader Model: {grader_model}") + + logger.info(f"\n📋 Test Overview:") + logger.info(f" Total variants tested: {len(variant_results)}") + logger.info(f" Successful variants: {len(successful_variants)}") + logger.info(f" Failed variants: {len(failed_variants)}") + logger.info(f" Total test time: {overall_time/60:.1f} minutes") + + if total_problems > 0: + logger.info(f"\n📈 Aggregate Performance:") + logger.info(f" Total problems: {total_problems}") + logger.info(f" Overall average grade: {overall_avg_grade:.2f}") + logger.info(f" Overall success rate: {overall_success_rate:.1f}%") + logger.info(f" Average time per problem: {overall_avg_time:.2f}s") + + if best_variant: + logger.info(f"\n🏆 Variant Performance:") + logger.info(f" Best performing: {best_variant} (Grade: {variant_results[best_variant]['average_grade']:.2f})") + logger.info(f" Worst performing: {worst_variant} (Grade: {variant_results[worst_variant]['average_grade']:.2f})") + logger.info(f" Fastest: {fastest_variant} ({variant_results[fastest_variant]['avg_time_per_problem']:.2f}s/problem)") + logger.info(f" Slowest: {slowest_variant} ({variant_results[slowest_variant]['avg_time_per_problem']:.2f}s/problem)") + + logger.info("="*80) + logger.info(f"💾 Configuration summary saved to {summary_file}") + + return summary_result + + +async def main(): + """Main function.""" + parser = argparse.ArgumentParser(description="Batch evaluate mathematical problems") + + # Required arguments + parser.add_argument("--provider", required=True, choices=get_supported_providers(), + help="AI provider to use") + + # Dataset options + 
parser.add_argument("--dataset", default="dataset", + help="Dataset directory path (default: dataset)") + parser.add_argument("--variant", default="original", + choices=["original", "descriptive_long", "descriptive_long_confusing", + "descriptive_long_misleading", "garbled_string", "kernel_variant"], + help="Problem variant to use (default: original)") + parser.add_argument("--all-variants", action="store_true", + help="Test all 6 problem variants instead of just one") + parser.add_argument("--variants", nargs="+", + choices=["original", "descriptive_long", "descriptive_long_confusing", + "descriptive_long_misleading", "garbled_string", "kernel_variant"], + help="Specific variants to test (use with --all-variants)") + parser.add_argument("--max-files", type=int, + help="Maximum number of files to process per variant (default: all)") + + # Processing options + parser.add_argument("--max-concurrent", type=int, default=3, + help="Maximum concurrent evaluations (default: 3)") + parser.add_argument("--solver-model", + help="Override solver model") + parser.add_argument("--grader-model", + help="Override grader model") + + # Output options + parser.add_argument("--output", type=Path, + help="Output file path (default: results/[provider]_[timestamp].json)") + parser.add_argument("--output-dir", type=Path, default="results", + help="Output directory (default: results)") + parser.add_argument("--resume", type=Path, + help="Path to checkpoint file to resume from") + + # Provider-specific options + parser.add_argument("--base-url", + help="Base URL for VLLM provider") + parser.add_argument("--device", default="auto", + help="Device for HuggingFace provider (auto/cuda/cpu)") + + args = parser.parse_args() + + # Setup output directory and logging + args.output_dir.mkdir(parents=True, exist_ok=True) + logger = setup_logging(args.output_dir) + + # Default output file if not specified + if not args.output: + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + args.output = 
args.output_dir / f"{args.provider}_{args.variant}_{timestamp}.json" + + # Prepare loader kwargs based on provider + loader_kwargs = {} + if args.provider == 'vllm' and args.base_url: + loader_kwargs['base_url'] = args.base_url + elif args.provider == 'huggingface' and args.device: + loader_kwargs['device'] = args.device + + try: + if args.all_variants or args.variants: + # Multi-variant evaluation + variants_to_test = args.variants if args.variants else None + results = await batch_evaluate_all_variants( + dataset_path=Path(args.dataset), + provider=args.provider, + variants=variants_to_test, + max_concurrent=args.max_concurrent, + max_files=args.max_files, + solver_model=args.solver_model, + grader_model=args.grader_model, + output_dir=args.output_dir, + base_url=args.base_url, + device=args.device + ) + + logger.info(f"Multi-variant evaluation completed successfully!") + logger.info(f"Overall average grade: {results['aggregate_metrics']['overall_average_grade']:.2f}") + logger.info(f"Overall success rate: {results['aggregate_metrics']['overall_success_rate']:.1f}%") + else: + # Single variant evaluation + results = await batch_evaluate( + dataset_path=Path(args.dataset), + provider=args.provider, + variant_type=args.variant, + max_concurrent=args.max_concurrent, + max_files=args.max_files, + solver_model=args.solver_model, + grader_model=args.grader_model, + output_file=args.output, + resume_checkpoint=args.resume, + **loader_kwargs + ) + + logger.info(f"Batch evaluation completed successfully!") + logger.info(f"Average grade: {results['summary']['average_grade']:.2f}") + logger.info(f"Success rate: {results['summary']['success_rate']:.1f}%") + + except KeyboardInterrupt: + logger.info("Evaluation interrupted by user") + except Exception as e: + logger.error(f"Evaluation failed: {str(e)}") + return 1 + + return 0 + + +if __name__ == "__main__": + exit(asyncio.run(main()))
\ No newline at end of file diff --git a/putnam-bench-anon/scripts/benchmark.py b/putnam-bench-anon/scripts/benchmark.py new file mode 100644 index 0000000..2fed228 --- /dev/null +++ b/putnam-bench-anon/scripts/benchmark.py @@ -0,0 +1,481 @@ +#!/usr/bin/env python3 +""" +Benchmark script for comparing AI providers and models on mathematical problems. + +This script runs comparative evaluations across multiple providers, models, and +problem variants to assess performance, accuracy, cost, and speed trade-offs. + +Usage: + python benchmark.py --config benchmark_config.json + python benchmark.py --quick-test # Quick 3-problem test across all providers + python benchmark.py --providers openai anthropic --models gpt-4o-mini claude-3-5-haiku +""" + +import asyncio +import json +import sys +import time +from pathlib import Path +import argparse +from typing import List, Dict, Any, Tuple +import logging +from datetime import datetime +import itertools +import statistics + +# Add the loader module to the path +sys.path.append(str(Path(__file__).parent)) + +from loader import create_loader, get_supported_providers, get_default_models + + +class BenchmarkRunner: + """Benchmark runner for AI providers.""" + + def __init__(self, output_dir: Path = Path("benchmark_results")): + self.output_dir = output_dir + self.output_dir.mkdir(parents=True, exist_ok=True) + + # Setup logging + log_file = self.output_dir / f"benchmark_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler(sys.stdout) + ] + ) + self.logger = logging.getLogger(__name__) + + async def load_test_problems(self, dataset_path: Path, max_problems: int = 10) -> List[Dict[str, Any]]: + """Load test problems from dataset.""" + json_files = list(dataset_path.glob("*.json"))[:max_problems] + + problems = [] + for json_file in json_files: + try: + with 
open(json_file, 'r', encoding='utf-8') as f: + data = json.load(f) + data['_source_file'] = str(json_file.name) + problems.append(data) + except Exception as e: + self.logger.warning(f"Failed to load {json_file}: {str(e)}") + + return problems + + async def run_single_configuration(self, + provider: str, + solver_model: str, + grader_model: str, + problems: List[Dict[str, Any]], + variant_type: str = "original", + **loader_kwargs) -> Dict[str, Any]: + """Run benchmark for a single provider/model configuration.""" + config_name = f"{provider}_{solver_model}_{grader_model}".replace("/", "_").replace("-", "_") + self.logger.info(f"🚀 Testing configuration: {config_name}") + + result = { + 'configuration': { + 'provider': provider, + 'solver_model': solver_model, + 'grader_model': grader_model, + 'variant_type': variant_type, + 'loader_kwargs': loader_kwargs + }, + 'metrics': {}, + 'problems': [], + 'errors': [] + } + + try: + # Create loader + loader = create_loader( + provider, + solver_model=solver_model, + grader_model=grader_model, + **loader_kwargs + ) + + # Health check + if not await loader.health_check(): + raise RuntimeError(f"Health check failed for {provider}") + + # Cost estimation + cost_info = await loader.estimate_cost(len(problems)) + result['metrics']['estimated_cost'] = cost_info + + # Process each problem + start_time = time.time() + grades = [] + processing_times = [] + + for i, problem in enumerate(problems): + problem_start = time.time() + + try: + problem_result = await loader.test_single_problem( + problem, + variant_type=variant_type + ) + + processing_time = time.time() - problem_start + # Convert boolean 'correct' to numeric grade (10 for correct, 0 for incorrect) + grade = 10 if problem_result.get('correct', False) else 0 + + grades.append(grade) + processing_times.append(processing_time) + + result['problems'].append({ + 'source_file': problem.get('_source_file', f'problem_{i}'), + 'grade': grade, + 'processing_time': processing_time, + 
'solution_length': len(problem_result.get('solution', '')), + 'grading_feedback_length': len(str(problem_result.get('grading_result', {}).get('feedback', ''))) + }) + + self.logger.info(f" Problem {i+1}/{len(problems)}: Grade {grade} ({processing_time:.2f}s)") + + except Exception as e: + error_info = { + 'problem_index': i, + 'source_file': problem.get('_source_file', f'problem_{i}'), + 'error': str(e), + 'processing_time': time.time() - problem_start + } + result['errors'].append(error_info) + self.logger.error(f" Problem {i+1}/{len(problems)} failed: {str(e)}") + + total_time = time.time() - start_time + + # Calculate metrics + if grades: + result['metrics'].update({ + 'total_problems': len(problems), + 'successful_problems': len(grades), + 'failed_problems': len(result['errors']), + 'success_rate': len(grades) / len(problems) * 100, + 'average_grade': statistics.mean(grades), + 'median_grade': statistics.median(grades), + 'grade_std': statistics.stdev(grades) if len(grades) > 1 else 0, + 'max_grade': max(grades), + 'min_grade': min(grades), + 'total_time': total_time, + 'average_time_per_problem': statistics.mean(processing_times), + 'median_time_per_problem': statistics.median(processing_times), + 'total_time_successful': sum(processing_times), + 'throughput_problems_per_minute': len(grades) / (total_time / 60) if total_time > 0 else 0 + }) + else: + result['metrics'].update({ + 'total_problems': len(problems), + 'successful_problems': 0, + 'failed_problems': len(result['errors']), + 'success_rate': 0, + 'total_time': total_time, + 'error_rate': 100 + }) + + self.logger.info(f"✅ Configuration completed: {result['metrics']['success_rate']:.1f}% success, " + f"avg grade: {result['metrics'].get('average_grade', 0):.2f}") + + except Exception as e: + result['metrics']['fatal_error'] = str(e) + self.logger.error(f"❌ Configuration failed: {str(e)}") + + return result + + async def run_comparative_benchmark(self, + configurations: List[Dict[str, Any]], + problems: 
List[Dict[str, Any]], + variant_type: str = "original") -> Dict[str, Any]: + """Run comparative benchmark across multiple configurations.""" + self.logger.info(f"🏁 Starting comparative benchmark with {len(configurations)} configurations") + self.logger.info(f"📊 Testing {len(problems)} problems with variant: {variant_type}") + + benchmark_start = time.time() + results = [] + + for i, config in enumerate(configurations): + self.logger.info(f"\n📋 Configuration {i+1}/{len(configurations)}") + + provider = config['provider'] + solver_model = config.get('solver_model') + grader_model = config.get('grader_model') + loader_kwargs = config.get('loader_kwargs', {}) + + # Use defaults if not specified + if not solver_model or not grader_model: + defaults = get_default_models(provider) + solver_model = solver_model or defaults['solver_model'] + grader_model = grader_model or defaults['grader_model'] + + config_result = await self.run_single_configuration( + provider=provider, + solver_model=solver_model, + grader_model=grader_model, + problems=problems, + variant_type=variant_type, + **loader_kwargs + ) + + results.append(config_result) + + total_benchmark_time = time.time() - benchmark_start + + # Generate comparison report + report = self.generate_comparison_report(results, total_benchmark_time) + + # Save detailed results + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + detailed_file = self.output_dir / f"benchmark_detailed_{timestamp}.json" + with open(detailed_file, 'w', encoding='utf-8') as f: + json.dump({ + 'benchmark_info': { + 'timestamp': datetime.now().isoformat(), + 'total_configurations': len(configurations), + 'total_problems': len(problems), + 'variant_type': variant_type, + 'total_time': total_benchmark_time + }, + 'configurations': configurations, + 'results': results, + 'comparison_report': report + }, f, indent=2, ensure_ascii=False) + + self.logger.info(f"💾 Detailed results saved to {detailed_file}") + + return report + + def 
generate_comparison_report(self, results: List[Dict[str, Any]], total_time: float) -> Dict[str, Any]: + """Generate comparison report from benchmark results.""" + self.logger.info("\n" + "="*60) + self.logger.info("📊 BENCHMARK COMPARISON REPORT") + self.logger.info("="*60) + + # Filter successful results + successful_results = [r for r in results if r['metrics'].get('success_rate', 0) > 0] + + if not successful_results: + self.logger.warning("⚠️ No successful configurations found!") + return {'error': 'No successful configurations'} + + # Ranking by different metrics + rankings = { + 'accuracy': sorted(successful_results, key=lambda x: x['metrics']['average_grade'], reverse=True), + 'speed': sorted(successful_results, key=lambda x: x['metrics']['average_time_per_problem']), + 'throughput': sorted(successful_results, key=lambda x: x['metrics']['throughput_problems_per_minute'], reverse=True), + 'success_rate': sorted(successful_results, key=lambda x: x['metrics']['success_rate'], reverse=True) + } + + # Print rankings + for metric, ranked_results in rankings.items(): + self.logger.info(f"\n🏆 Top 3 by {metric.upper()}:") + for i, result in enumerate(ranked_results[:3]): + config = result['configuration'] + metrics = result['metrics'] + provider = config['provider'] + solver = config['solver_model'] + + if metric == 'accuracy': + value = f"{metrics['average_grade']:.2f}" + elif metric == 'speed': + value = f"{metrics['average_time_per_problem']:.2f}s" + elif metric == 'throughput': + value = f"{metrics['throughput_problems_per_minute']:.1f} prob/min" + elif metric == 'success_rate': + value = f"{metrics['success_rate']:.1f}%" + + self.logger.info(f" {i+1}. 
{provider}/{solver}: {value}") + + # Calculate cost efficiency + cost_efficiency = [] + for result in successful_results: + metrics = result['metrics'] + cost_info = metrics.get('estimated_cost', {}) + total_cost = cost_info.get('total_cost', 0) + avg_grade = metrics.get('average_grade', 0) + + if total_cost > 0 and avg_grade > 0: + efficiency = avg_grade / total_cost # Grade per unit cost + cost_efficiency.append({ + 'result': result, + 'efficiency': efficiency, + 'cost': total_cost, + 'grade': avg_grade + }) + + if cost_efficiency: + cost_efficiency.sort(key=lambda x: x['efficiency'], reverse=True) + self.logger.info(f"\n💰 Top 3 by COST EFFICIENCY (Grade/Cost):") + for i, item in enumerate(cost_efficiency[:3]): + config = item['result']['configuration'] + provider = config['provider'] + solver = config['solver_model'] + self.logger.info(f" {i+1}. {provider}/{solver}: {item['efficiency']:.2f} " + f"(Grade: {item['grade']:.2f}, Cost: {item['cost']:.4f})") + + # Overall statistics + all_grades = [] + all_times = [] + all_success_rates = [] + + for result in successful_results: + metrics = result['metrics'] + all_grades.append(metrics['average_grade']) + all_times.append(metrics['average_time_per_problem']) + all_success_rates.append(metrics['success_rate']) + + self.logger.info(f"\n📈 OVERALL STATISTICS:") + self.logger.info(f" Configurations tested: {len(results)}") + self.logger.info(f" Successful configurations: {len(successful_results)}") + self.logger.info(f" Average grade across all: {statistics.mean(all_grades):.2f}") + self.logger.info(f" Average time per problem: {statistics.mean(all_times):.2f}s") + self.logger.info(f" Average success rate: {statistics.mean(all_success_rates):.1f}%") + self.logger.info(f" Total benchmark time: {total_time/60:.2f} minutes") + + # Generate final report + report = { + 'summary': { + 'total_configurations': len(results), + 'successful_configurations': len(successful_results), + 'overall_avg_grade': statistics.mean(all_grades) 
if all_grades else 0, + 'overall_avg_time': statistics.mean(all_times) if all_times else 0, + 'overall_avg_success_rate': statistics.mean(all_success_rates) if all_success_rates else 0, + 'total_benchmark_time': total_time + }, + 'rankings': { + metric: [ + { + 'provider': r['configuration']['provider'], + 'solver_model': r['configuration']['solver_model'], + 'grader_model': r['configuration']['grader_model'], + 'score': r['metrics'][metric_key] + } + for r in ranked[:5] # Top 5 + ] for metric, ranked in rankings.items() + for metric_key in [{'accuracy': 'average_grade', 'speed': 'average_time_per_problem', + 'throughput': 'throughput_problems_per_minute', 'success_rate': 'success_rate'}[metric]] + }, + 'cost_efficiency': [ + { + 'provider': item['result']['configuration']['provider'], + 'solver_model': item['result']['configuration']['solver_model'], + 'efficiency': item['efficiency'], + 'grade': item['grade'], + 'cost': item['cost'] + } + for item in cost_efficiency[:5] + ] if cost_efficiency else [] + } + + return report + + +async def run_quick_test(): + """Run a quick test across all providers with 3 problems.""" + runner = BenchmarkRunner() + + # Load 3 test problems + problems = await runner.load_test_problems(Path("dataset"), max_problems=3) + if not problems: + print("❌ No test problems found in dataset directory") + return + + # Default configurations for all providers + configurations = [] + for provider in get_supported_providers(): + config = {'provider': provider} + + # Provider-specific settings + if provider == 'vllm': + config['loader_kwargs'] = {'base_url': 'http://localhost:8000/v1'} + elif provider == 'huggingface': + config['loader_kwargs'] = { + 'device': 'cpu', + 'solver_model': 'microsoft/DialoGPT-small', + 'grader_model': 'microsoft/DialoGPT-small' + } + + configurations.append(config) + + # Run benchmark + await runner.run_comparative_benchmark(configurations, problems) + + +async def run_custom_benchmark(config_file: Path): + """Run 
benchmark from configuration file.""" + with open(config_file, 'r', encoding='utf-8') as f: + config = json.load(f) + + runner = BenchmarkRunner(Path(config.get('output_dir', 'benchmark_results'))) + + # Load problems + dataset_path = Path(config.get('dataset_path', 'dataset')) + max_problems = config.get('max_problems', 10) + variant_type = config.get('variant_type', 'original') + + problems = await runner.load_test_problems(dataset_path, max_problems) + if not problems: + print(f"❌ No problems found in {dataset_path}") + return + + # Load configurations + configurations = config.get('configurations', []) + if not configurations: + print("❌ No configurations specified in config file") + return + + # Run benchmark + await runner.run_comparative_benchmark(configurations, problems, variant_type) + + +async def main(): + """Main function.""" + parser = argparse.ArgumentParser(description="Benchmark AI providers on mathematical problems") + + # Benchmark modes + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--config", type=Path, help="Configuration file path") + group.add_argument("--quick-test", action="store_true", + help="Quick test with 3 problems across all providers") + + # Custom benchmark options + parser.add_argument("--providers", nargs="+", choices=get_supported_providers(), + help="Providers to test (for custom benchmark)") + parser.add_argument("--models", nargs="+", + help="Models to test (for custom benchmark)") + parser.add_argument("--dataset", type=Path, default="dataset", + help="Dataset path (default: dataset)") + parser.add_argument("--max-problems", type=int, default=10, + help="Maximum problems to test (default: 10)") + parser.add_argument("--variant", default="original", + choices=["original", "descriptive_long", "kernel_variant"], + help="Problem variant (default: original)") + parser.add_argument("--output-dir", type=Path, default="benchmark_results", + help="Output directory (default: benchmark_results)") + 
+ args = parser.parse_args() + + try: + if args.quick_test: + await run_quick_test() + elif args.config: + await run_custom_benchmark(args.config) + else: + # Custom benchmark mode (placeholder for future implementation) + print("Custom benchmark mode not yet implemented. Use --config or --quick-test.") + return 1 + + return 0 + + except KeyboardInterrupt: + print("\n⏸️ Benchmark interrupted by user") + return 1 + except Exception as e: + print(f"\n❌ Benchmark failed: {str(e)}") + return 1 + + +if __name__ == "__main__": + exit(asyncio.run(main()))
\ No newline at end of file diff --git a/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py b/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py new file mode 100644 index 0000000..76952bd --- /dev/null +++ b/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py @@ -0,0 +1,630 @@ +#!/usr/bin/env python3 +""" +原题 vs Kernel Variant 数学能力对比测试 +使用4o-mini解题,o3严格评分,比较两种题目的正确率差异 +""" + +import os +import json +import asyncio +import pathlib +import time +import re +import random +from typing import Dict, List, Tuple, Optional +import click +import tqdm +from openai import AsyncOpenAI, RateLimitError, APIError, APIConnectionError + +# Configuration +SOLVER_MODEL = "gpt-4o-mini" # 用于解题的模型 +GRADER_MODEL = "o3" # 用于评分的模型 +SRC_DIR = pathlib.Path("raw/json") +RESULTS_DIR = pathlib.Path("results/comparison_test") +RESULTS_DIR.mkdir(parents=True, exist_ok=True) + +RETRIES = 4 +TIMEOUT_BASE = 600 +RESP_FMT = {"type": "json_object"} + +# 解题系统prompt - 4o-mini +SOLVER_SYSTEM_PROMPT = """You are an expert mathematician solving competition-level problems. +Provide detailed, step-by-step solutions with clear mathematical reasoning. + +Requirements: +- Show all your work and intermediate steps +- Justify each major step of your reasoning +- Use proper mathematical notation +- Be thorough but concise +- State your final answer clearly + +Solve the problem completely and rigorously.""" + +SOLVER_USER_TEMPLATE = """Please solve this mathematical problem: + +{problem_statement} + +Provide a complete solution with detailed reasoning. Return your response in JSON format: +{{"solution": "your complete step-by-step solution with mathematical reasoning", + "final_answer": "your final answer in a clear, concise form"}}""" + +# 证明题严格评分系统prompt - o3 +PROOF_GRADER_SYSTEM_PROMPT = """You are an extremely strict mathematical grader evaluating competition-level PROOF problems. 
+ +GRADING STANDARDS (BE VERY STRICT): +- Mathematical rigor: Every step must be mathematically sound and justified +- Logical flow: The reasoning must be clear, complete, and logically connected +- Correctness: All calculations, algebraic manipulations, and conclusions must be correct +- Completeness: The solution must address all parts of the problem fully +- Precision: Mathematical statements must be precise and unambiguous + +FAILING CRITERIA (Mark as INCORRECT if ANY of these apply): +- Any unjustified logical leap or gap in reasoning +- Any computational error, no matter how small +- Missing steps in critical parts of the argument +- Imprecise or ambiguous mathematical statements +- Incorrect final answer, even if approach is partially correct +- Circular reasoning or logical fallacies +- Misuse of mathematical theorems or definitions + +BE EXTREMELY STRICT. Competition mathematics proofs require perfect precision.""" + +# 计算题相对宽松评分系统prompt - o3 +CALCULATION_GRADER_SYSTEM_PROMPT = """You are a mathematical grader evaluating competition-level CALCULATION problems. + +GRADING STANDARDS FOR CALCULATION PROBLEMS: +- Primary focus: Is the final answer correct? +- Secondary focus: Is the overall approach reasonable and mathematically sound? +- Computation: Allow minor computational slips if the method is correct and final answer is right + +GRADING CRITERIA: +- CORRECT: Final answer is correct AND approach is fundamentally sound +- INCORRECT: Final answer is wrong OR approach is fundamentally flawed + +For calculation problems, the final numerical answer is the most important criterion. +Minor intermediate errors are acceptable if they don't affect the final result.""" + +PROOF_GRADER_USER_TEMPLATE = """Grade this PROOF solution with extreme strictness. + +PROBLEM: +{problem_statement} + +STUDENT SOLUTION: +{solution} + +CORRECT REFERENCE SOLUTION: +{reference_solution} + +Evaluate with maximum strictness. Every logical step must be perfect. 
CALCULATION_GRADER_USER_TEMPLATE = """Grade this CALCULATION solution with focus on final answer correctness.

PROBLEM:
{problem_statement}

STUDENT SOLUTION:
{solution}

CORRECT REFERENCE SOLUTION:
{reference_solution}

Focus primarily on whether the final answer is correct. Return JSON with:
{{"grade": "CORRECT" or "INCORRECT",
 "detailed_feedback": "specific detailed analysis of what is right/wrong",
 "major_issues": "list of significant mathematical errors or gaps",
 "final_answer_correct": true or false,
 "reasoning_rigor_score": 0-10 integer (10=perfect rigor, 0=severely flawed),
 "overall_assessment": "comprehensive evaluation summary"}}"""

# Greedy match from the first '{' to the last '}' — used to dig a JSON
# object out of an LLM reply that wraps it in prose.
JSON_RE = re.compile(r"\{[\s\S]*\}")

def parse_json_response(raw: str) -> Optional[Dict]:
    """Parse a JSON object from an LLM response.

    Tries three strategies in order:
      1. Parse the whole string as-is.
      2. Parse the substring between the outermost braces (JSON_RE).
      3. Undo common over-escaping (\\" and \\\\) and retry.

    Returns the parsed dict, or None when every strategy fails.
    Only JSON decoding errors are swallowed (the original bare ``except:``
    clauses also hid KeyboardInterrupt/SystemExit).
    """
    if not raw:
        return None

    try:
        return json.loads(raw)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        pass

    match = JSON_RE.search(raw)
    if match:
        try:
            return json.loads(match.group(0))
        except ValueError:
            pass

    # Last resort: some models double-escape quotes and backslashes.
    try:
        fixed = raw.replace('\\"', '"').replace('\\\\', '\\')
        return json.loads(fixed)
    except ValueError:
        return None

def to_str(x) -> str:
    """Coerce *x* to a string: None -> "", list/tuple -> newline-joined items."""
    if x is None:
        return ""
    if isinstance(x, str):
        return x
    if isinstance(x, (list, tuple)):
        return "\n".join(map(str, x))
    return str(x)

async def call_api_with_retry(cli: AsyncOpenAI, model: str, messages: List[Dict]) -> Tuple[Optional[Dict], str]:
    """Call the chat-completions API with exponential-backoff retries.

    Returns ``(parsed_json, raw_text)``.  ``parsed_json`` is None when all
    RETRIES attempts fail; ``raw_text`` is the last raw response (may be "").
    """
    raw_response = ""

    for attempt in range(1, RETRIES + 1):
        # Per-attempt timeout doubles: TIMEOUT_BASE, 2x, 4x, ...
        timeout = TIMEOUT_BASE * (2 ** (attempt - 1))
        try:
            # Reasoning models (o3 / o3-mini / o4-mini) only accept
            # temperature 1.0; all other models run deterministically.
            if any(name in model.lower() for name in ['o3', 'o3-mini', 'o4-mini']):
                temperature = 1.0
            else:
                temperature = 0.0

            response = await asyncio.wait_for(
                cli.chat.completions.create(
                    model=model,
                    messages=messages,
                    temperature=temperature,
                    response_format=RESP_FMT,
                ),
                timeout=timeout,
            )
            raw_response = response.choices[0].message.content or ""
            parsed = parse_json_response(raw_response)
            if parsed:
                return parsed, raw_response
            # Treat an unparseable body like any other transient failure
            # so it goes through the retry path below.
            raise ValueError("Failed to parse JSON response")

        except RateLimitError as e:
            print(f"🚫 RateLimitError (attempt {attempt}/{RETRIES}): {str(e)}")
            if "insufficient_quota" in str(e):
                print("⏳ Detected quota exhaustion - sleeping 15 minutes")
                await asyncio.sleep(900)
            else:
                sleep_time = 2 ** attempt + random.random()
                print(f" ⏰ Rate limited, sleeping {sleep_time:.1f}s")
                await asyncio.sleep(sleep_time)

        except (APIError, APIConnectionError, asyncio.TimeoutError, ValueError) as e:
            print(f"❌ {type(e).__name__} (attempt {attempt}/{RETRIES}): {str(e)}")
            if attempt == RETRIES:
                return None, raw_response
            sleep_time = 2 ** attempt + random.random()
            print(f" ⏰ Retrying in {sleep_time:.1f}s")
            await asyncio.sleep(sleep_time)

    return None, raw_response

async def solve_problem(cli: AsyncOpenAI, problem_statement: str) -> Tuple[Optional[Dict], str]:
    """Ask the solver model to solve *problem_statement*; returns (parsed, raw)."""
    messages = [
        {"role": "system", "content": SOLVER_SYSTEM_PROMPT},
        {"role": "user", "content": SOLVER_USER_TEMPLATE.format(
            problem_statement=problem_statement
        )}
    ]
    return await call_api_with_retry(cli, SOLVER_MODEL, messages)

async def grade_solution(cli: AsyncOpenAI, problem_statement: str, solution: str,
                         reference_solution: str, problem_type: str = "proof") -> Tuple[Optional[Dict], str]:
    """Grade *solution* with the grader model, by problem type.

    Proof problems are graded strictly; calculation problems leniently
    (the final answer is what matters).  Returns (parsed, raw).
    """
    if problem_type == "calculation":
        system_prompt = CALCULATION_GRADER_SYSTEM_PROMPT
        user_template = CALCULATION_GRADER_USER_TEMPLATE
    else:  # Default to proof (strict grading)
        system_prompt = PROOF_GRADER_SYSTEM_PROMPT
        user_template = PROOF_GRADER_USER_TEMPLATE

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_template.format(
            problem_statement=problem_statement,
            solution=solution,
            reference_solution=reference_solution
        )}
    ]
    return await call_api_with_retry(cli, GRADER_MODEL, messages)
def _record_grade(section: Dict, grade_result: Optional[Dict]) -> None:
    """Copy the grader's output fields into *section* (shared by both variants)."""
    if not grade_result:
        section["grade_status"] = "failed"
        return
    section["grade_status"] = "success"
    section["grade"] = grade_result.get("grade", "UNKNOWN")
    section["detailed_feedback"] = grade_result.get("detailed_feedback", "")
    section["major_issues"] = grade_result.get("major_issues", "")
    section["final_answer_correct"] = grade_result.get("final_answer_correct", False)
    section["reasoning_rigor_score"] = grade_result.get("reasoning_rigor_score", 0)
    section["overall_assessment"] = grade_result.get("overall_assessment", "")

async def test_single_file(file_path: pathlib.Path, cli: AsyncOpenAI) -> Dict:
    """Solve and grade both the original problem and its kernel variant.

    Returns a result dict with ``status`` in {"completed", "skipped",
    "failed", "error"}, per-variant solve/grade sections and a
    ``comparison`` section when both gradings succeed.
    """
    # Bind up front so the except-handler always has an index to report
    # (the original used a fragile `'index' in locals()` check).
    index = file_path.stem
    try:
        data = json.loads(file_path.read_text(encoding='utf-8'))
        index = data.get("index", file_path.stem)

        original_question = to_str(data.get("question", "")).strip()
        original_solution = to_str(data.get("solution", "")).strip()
        problem_type = data.get("problem_type", "proof")  # default: strict proof grading

        kv = data.get("variants", {}).get("kernel_variant")
        if not kv:
            return {"index": index, "status": "skipped", "reason": "no_kernel_variant"}

        kernel_question = to_str(kv.get("question", "")).strip()
        kernel_solution = to_str(kv.get("solution", "")).strip()

        if not all([original_question, original_solution, kernel_question, kernel_solution]):
            return {"index": index, "status": "skipped", "reason": "missing_fields"}

        print(f"🧮 Testing {index} (Type: {problem_type.upper()})")
        start_time = time.time()

        result = {
            "index": index,
            "status": "completed",
            "timestamp": time.time(),
            "problem_type": problem_type,
            "original": {},
            "kernel_variant": {},
            "comparison": {}
        }

        # 1. Solve the original problem.
        print(f" 📝 Solving original problem...")
        orig_solve_result, _ = await solve_problem(cli, original_question)
        if not orig_solve_result:
            result["original"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result
        orig_student_solution = to_str(orig_solve_result.get("solution", "")).strip()
        result["original"]["student_solution"] = orig_student_solution
        result["original"]["student_final_answer"] = to_str(orig_solve_result.get("final_answer", "")).strip()
        result["original"]["solve_status"] = "success"

        # 2. Solve the kernel variant.
        print(f" 📝 Solving kernel variant...")
        kv_solve_result, _ = await solve_problem(cli, kernel_question)
        if not kv_solve_result:
            result["kernel_variant"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result
        kv_student_solution = to_str(kv_solve_result.get("solution", "")).strip()
        result["kernel_variant"]["student_solution"] = kv_student_solution
        result["kernel_variant"]["student_final_answer"] = to_str(kv_solve_result.get("final_answer", "")).strip()
        result["kernel_variant"]["solve_status"] = "success"

        # 3./4. Grade both solutions (strict for proofs, lenient for calculations).
        grading_style = "STRICT" if problem_type == "proof" else "LENIENT"
        print(f" 🔍 Grading original solution ({grading_style})...")
        orig_grade_result, _ = await grade_solution(
            cli, original_question, orig_student_solution, original_solution, problem_type
        )
        _record_grade(result["original"], orig_grade_result)

        print(f" 🔍 Grading kernel variant solution ({grading_style})...")
        kv_grade_result, _ = await grade_solution(
            cli, kernel_question, kv_student_solution, kernel_solution, problem_type
        )
        _record_grade(result["kernel_variant"], kv_grade_result)

        # 5. Comparison — only meaningful when both gradings succeeded.
        if (result["original"]["grade_status"] == "success" and
                result["kernel_variant"]["grade_status"] == "success"):
            orig_correct = result["original"]["grade"] == "CORRECT"
            kv_correct = result["kernel_variant"]["grade"] == "CORRECT"
            result["comparison"]["original_correct"] = orig_correct
            result["comparison"]["kernel_variant_correct"] = kv_correct
            result["comparison"]["both_correct"] = orig_correct and kv_correct
            result["comparison"]["both_incorrect"] = not orig_correct and not kv_correct
            result["comparison"]["original_harder"] = not orig_correct and kv_correct
            result["comparison"]["kernel_variant_harder"] = orig_correct and not kv_correct
            # Positive = the original solution was reasoned more rigorously.
            result["comparison"]["rigor_difference"] = (
                result["original"]["reasoning_rigor_score"]
                - result["kernel_variant"]["reasoning_rigor_score"]
            )

        total_time = time.time() - start_time
        result["processing_time"] = total_time

        print(f" ✅ Completed {index} in {total_time:.1f}s")
        if result["comparison"]:
            orig_status = "✅" if result["comparison"]["original_correct"] else "❌"
            kv_status = "✅" if result["comparison"]["kernel_variant_correct"] else "❌"
            print(f" Original: {orig_status}, Kernel Variant: {kv_status}")

        return result

    except Exception as e:
        return {
            "index": index,
            "status": "error",
            "error": str(e),
            "error_type": type(e).__name__,
            "timestamp": time.time()
        }

async def save_detailed_results(results: List[Dict], output_file: str):
    """Write the per-file results list to RESULTS_DIR as pretty-printed JSON."""
    output_path = RESULTS_DIR / f"{output_file}_detailed.json"
    try:
        output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2), encoding='utf-8')
        print(f"💾 Detailed results saved to {output_path}")
    except Exception as e:
        print(f"❌ Failed to save detailed results: {e}")
def generate_summary_report(results: List[Dict]) -> Dict:
    """Aggregate per-file comparison results into a single summary dict."""
    summary = {
        "total_files": len(results),
        "completed": 0,
        "failed": 0,
        "skipped": 0,
        "by_problem_type": {
            "proof": {"count": 0, "original_correct": 0, "kv_correct": 0},
            "calculation": {"count": 0, "original_correct": 0, "kv_correct": 0}
        },
        "original_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "kernel_variant_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "comparison_stats": {
            "both_correct": 0,
            "both_incorrect": 0,
            "original_harder": 0,
            "kernel_variant_harder": 0,
            "total_compared": 0
        },
        "rigor_analysis": {
            "original_avg_rigor": 0,
            "kernel_variant_avg_rigor": 0,
            "rigor_difference_avg": 0
        }
    }

    orig_scores = []
    kv_scores = []
    rigor_deltas = []

    for entry in results:
        status = entry["status"]
        if status == "skipped":
            summary["skipped"] += 1
            continue
        if status != "completed":
            summary["failed"] += 1
            continue

        summary["completed"] += 1

        # Per-problem-type tallies (unknown types are ignored, as before).
        bucket = summary["by_problem_type"].get(entry.get("problem_type", "proof"))
        if bucket is not None:
            bucket["count"] += 1
            if entry["original"].get("grade") == "CORRECT":
                bucket["original_correct"] += 1
            if entry["kernel_variant"].get("grade") == "CORRECT":
                bucket["kv_correct"] += 1

        # Grading tallies for both the original and the kernel variant.
        for section_key, stats_key, scores in (
                ("original", "original_stats", orig_scores),
                ("kernel_variant", "kernel_variant_stats", kv_scores)):
            section = entry[section_key]
            if section.get("grade_status") != "success":
                continue
            stats = summary[stats_key]
            stats["total_graded"] += 1
            if section["grade"] == "CORRECT":
                stats["correct"] += 1
            else:
                stats["incorrect"] += 1
            scores.append(section["reasoning_rigor_score"])

        # Head-to-head outcome tallies (categories are mutually exclusive).
        comp = entry.get("comparison")
        if comp:
            cstats = summary["comparison_stats"]
            cstats["total_compared"] += 1
            for outcome in ("both_correct", "both_incorrect",
                            "original_harder", "kernel_variant_harder"):
                if comp[outcome]:
                    cstats[outcome] += 1
                    break
            rigor_deltas.append(comp["rigor_difference"])

    rigor = summary["rigor_analysis"]
    if orig_scores:
        rigor["original_avg_rigor"] = sum(orig_scores) / len(orig_scores)
    if kv_scores:
        rigor["kernel_variant_avg_rigor"] = sum(kv_scores) / len(kv_scores)
    if rigor_deltas:
        rigor["rigor_difference_avg"] = sum(rigor_deltas) / len(rigor_deltas)

    # Accuracy keys are only added when at least one solution was graded.
    for stats in (summary["original_stats"], summary["kernel_variant_stats"]):
        if stats["total_graded"] > 0:
            stats["accuracy"] = stats["correct"] / stats["total_graded"]

    return summary

def print_summary_report(summary: Dict):
    """Pretty-print the summary produced by generate_summary_report()."""
    bar = "=" * 80
    print("\n" + bar)
    print("📊 ORIGINAL vs KERNEL VARIANT COMPARISON REPORT")
    print(bar)

    for label, key in (("📁 Total files", "total_files"),
                       ("✅ Completed", "completed"),
                       ("⏭️ Skipped", "skipped"),
                       ("❌ Failed", "failed")):
        print(f"{label}: {summary[key]}")

    print(f"\n📈 ACCURACY COMPARISON:")
    o_stats = summary["original_stats"]
    k_stats = summary["kernel_variant_stats"]
    orig_acc = o_stats.get("accuracy", 0) * 100
    kv_acc = k_stats.get("accuracy", 0) * 100
    print(f"Original Problems: {orig_acc:.1f}% ({o_stats['correct']}/{o_stats['total_graded']})")
    print(f"Kernel Variants: {kv_acc:.1f}% ({k_stats['correct']}/{k_stats['total_graded']})")

    if orig_acc > 0 and kv_acc > 0:
        gap = orig_acc - kv_acc
        if gap > 5:
            print(f"📉 Kernel variants are {gap:.1f}% harder (as expected)")
        elif gap < -5:
            print(f"📈 Original problems are {-gap:.1f}% harder (unexpected)")
        else:
            print(f"📊 Similar difficulty (difference: {gap:.1f}%)")

    print(f"\n🎯 BY PROBLEM TYPE:")
    for ptype, stats in summary["by_problem_type"].items():
        if not stats["count"]:
            continue
        n = stats["count"]
        o_pct = stats["original_correct"] / n * 100
        k_pct = stats["kv_correct"] / n * 100
        note = " (STRICT grading)" if ptype == "proof" else " (LENIENT grading)"
        print(f"{ptype.upper()} Problems{note}:")
        print(f" Original: {o_pct:.1f}% ({stats['original_correct']}/{n})")
        print(f" Kernel Variant: {k_pct:.1f}% ({stats['kv_correct']}/{n})")
        if n >= 3:  # Only show difference if we have enough samples
            print(f" Difference: {o_pct - k_pct:+.1f}%")

    print(f"\n🔍 DETAILED COMPARISON:")
    comp = summary["comparison_stats"]
    total = comp["total_compared"]
    if total > 0:
        for label, key in (("Both correct", "both_correct"),
                           ("Both incorrect", "both_incorrect"),
                           ("Original harder", "original_harder"),
                           ("Kernel variant harder", "kernel_variant_harder")):
            print(f"{label}: {comp[key]:3d} ({comp[key]/total*100:.1f}%)")

    print(f"\n📏 REASONING RIGOR ANALYSIS:")
    rigor = summary["rigor_analysis"]
    print(f"Original avg rigor: {rigor['original_avg_rigor']:.2f}/10")
    print(f"Kernel variant rigor: {rigor['kernel_variant_avg_rigor']:.2f}/10")
    print(f"Difference: {rigor['rigor_difference_avg']:.2f} (positive = original more rigorous)")

    print(bar)
print("="*80) + +@click.command() +@click.option("-c", "--concurrency", default=16, show_default=True, + help="Maximum concurrent processing tasks") +@click.option("--max-files", default=50, show_default=True, + help="Maximum number of files to test (for quick testing)") +@click.option("--file-pattern", default="*.json", show_default=True, + help="File pattern to process") +@click.option("--output-prefix", default="comparison_test", show_default=True, + help="Prefix for output files") +@click.option("--debug", is_flag=True, help="Enable debug output") +def main(concurrency: int, max_files: int, file_pattern: str, output_prefix: str, debug: bool): + """原题 vs Kernel Variant 数学能力对比测试""" + print(f"🧪 Starting Original vs Kernel Variant Comparison Test") + print(f" Solver Model: {SOLVER_MODEL}") + print(f" Grader Model: {GRADER_MODEL}") + print(f" Max files: {max_files}") + print(f" Concurrency: {concurrency}") + + if not os.getenv("OPENAI_API_KEY"): + print("❌ OPENAI_API_KEY environment variable not set!") + return + + # 找到测试文件 + all_files = sorted(SRC_DIR.glob(file_pattern)) + if max_files > 0: + all_files = all_files[:max_files] + + print(f"📁 Testing {len(all_files)} files") + + if not all_files: + print("❌ No files found to test!") + return + + async def run_test(): + cli = AsyncOpenAI() + sem = asyncio.Semaphore(concurrency) + + async def worker(file_path: pathlib.Path): + async with sem: + return await test_single_file(file_path, cli) + + # 执行测试 + results = [] + progress_bar = tqdm.tqdm(total=len(all_files), desc="Testing", unit="file") + + tasks = [worker(f) for f in all_files] + for coro in asyncio.as_completed(tasks): + result = await coro + results.append(result) + progress_bar.update(1) + + progress_bar.close() + return results + + # 运行测试 + results = asyncio.run(run_test()) + + # 保存详细结果 + timestamp = int(time.time()) + output_name = f"{output_prefix}_{timestamp}" + asyncio.run(save_detailed_results(results, output_name)) + + # 生成并显示汇总报告 + summary = 
generate_summary_report(results) + print_summary_report(summary) + + # 保存汇总报告 + summary_path = RESULTS_DIR / f"{output_name}_summary.json" + try: + summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding='utf-8') + print(f"💾 Summary report saved to {summary_path}") + except Exception as e: + print(f"❌ Failed to save summary: {e}") + +if __name__ == "__main__": + main() +
#!/usr/bin/env python3
"""
Health check script for all AI providers.

This script tests connectivity, API keys, and basic functionality for all
supported AI providers. Useful for troubleshooting and verifying setup.

Usage:
    python health_check.py                      # Check all providers
    python health_check.py --provider openai    # Check specific provider
    python health_check.py --detailed           # Detailed diagnostics
"""

import asyncio
import json
import sys
import os
from pathlib import Path
import argparse
from typing import Dict, List, Any
from datetime import datetime
import platform

# Add the loader module to the path
sys.path.append(str(Path(__file__).parent))

from loader import create_loader, get_supported_providers, get_default_models


class HealthChecker:
    """Health checker for AI providers."""

    def __init__(self, detailed: bool = False):
        # detailed: also collect and report system information
        self.detailed = detailed
        self.results = {}

    async def check_system_info(self) -> Dict[str, Any]:
        """Collect basic host statistics.

        psutil is an *optional* dependency (check_dependencies lists it as
        such), so the psutil-backed fields degrade to None instead of
        crashing the detailed report when it is not installed.
        """
        try:
            import psutil
        except ImportError:
            psutil = None

        info = {
            'python_version': platform.python_version(),
            'platform': platform.platform(),
            'cpu_count': None,
            'memory_total_gb': None,
            'memory_available_gb': None,
            'disk_free_gb': None,
            'timestamp': datetime.now().isoformat()
        }
        if psutil is not None:
            info['cpu_count'] = psutil.cpu_count()
            info['memory_total_gb'] = round(psutil.virtual_memory().total / (1024**3), 2)
            info['memory_available_gb'] = round(psutil.virtual_memory().available / (1024**3), 2)
            info['disk_free_gb'] = round(psutil.disk_usage('.').free / (1024**3), 2)
        return info

    async def check_environment_variables(self) -> Dict[str, Any]:
        """Report which provider API keys are set (only a short preview is exposed)."""
        env_vars = {
            'OPENAI_API_KEY': os.getenv('OPENAI_API_KEY'),
            'ANTHROPIC_API_KEY': os.getenv('ANTHROPIC_API_KEY'),
            'GOOGLE_API_KEY': os.getenv('GOOGLE_API_KEY'),
        }

        return {
            var: {
                'set': bool(value),
                'length': len(value) if value else 0,
                'preview': value[:8] + '...' if value and len(value) > 8 else value
            }
            for var, value in env_vars.items()
        }

    async def check_dependencies(self) -> Dict[str, Any]:
        """Probe each optional Python package and report installed/version."""
        dependencies = {
            'openai': 'OpenAI API client',
            'anthropic': 'Anthropic API client',
            'google-generativeai': 'Google Gemini API client',
            'transformers': 'HuggingFace transformers',
            'torch': 'PyTorch for local models',
            'vllm': 'VLLM for local serving',
            'psutil': 'System monitoring'
        }

        results = {}
        for package, description in dependencies.items():
            try:
                # google-generativeai's import name differs from its package name.
                if package == 'google-generativeai':
                    import google.generativeai
                    version = getattr(google.generativeai, '__version__', 'unknown')
                else:
                    module = __import__(package)
                    version = getattr(module, '__version__', 'unknown')

                results[package] = {
                    'installed': True,
                    'version': version,
                    'description': description
                }
            except ImportError:
                results[package] = {
                    'installed': False,
                    'version': None,
                    'description': description
                }

        return results

    async def check_provider(self, provider: str) -> Dict[str, Any]:
        """Create a loader for *provider*, run its health check, and gather metadata."""
        print(f"🔍 Checking {provider}...")

        result = {
            'provider': provider,
            'available': False,
            'health_check_passed': False,
            'error': None,
            'response_time': None,
            'models': {},
            'cost_estimation': None
        }

        try:
            result['models']['defaults'] = get_default_models(provider)

            # Provider-specific configuration: vllm needs a local endpoint;
            # huggingface is forced onto CPU with tiny models for testing.
            loader_kwargs = {}
            if provider == 'vllm':
                loader_kwargs['base_url'] = 'http://localhost:8000/v1'
            elif provider == 'huggingface':
                loader_kwargs['device'] = 'cpu'
                loader_kwargs['solver_model'] = 'microsoft/DialoGPT-small'
                loader_kwargs['grader_model'] = 'microsoft/DialoGPT-small'

            start_time = asyncio.get_event_loop().time()
            loader = create_loader(provider, **loader_kwargs)
            result['creation_time'] = asyncio.get_event_loop().time() - start_time
            result['available'] = True

            result['models']['configured'] = loader.get_model_info()

            health_start = asyncio.get_event_loop().time()
            health_passed = await asyncio.wait_for(loader.health_check(), timeout=60)
            result['health_check_passed'] = health_passed
            result['response_time'] = asyncio.get_event_loop().time() - health_start

            if health_passed:
                # Best-effort extras: cost estimate and model listing.
                try:
                    result['cost_estimation'] = await loader.estimate_cost(10)
                except Exception as e:
                    result['cost_estimation_error'] = str(e)

                if hasattr(loader, 'list_models'):
                    try:
                        available_models = await loader.list_models()
                        result['models']['available'] = available_models[:10]  # Limit output
                    except Exception as e:
                        result['models']['list_error'] = str(e)

        except asyncio.TimeoutError:
            result['error'] = 'Health check timed out'
        except Exception as e:
            result['error'] = str(e)

        return result

    async def check_all_providers(self, specific_provider: str = None) -> Dict[str, Any]:
        """Check every supported provider (or just *specific_provider*) and summarize."""
        providers = [specific_provider] if specific_provider else get_supported_providers()

        print("🏥 AI Provider Health Check")
        print("=" * 50)

        if self.detailed:
            print("📊 System Information:")
            system_info = await self.check_system_info()
            for key, value in system_info.items():
                print(f" {key}: {value}")
            print()

        print("🔧 Environment Variables:")
        env_info = await self.check_environment_variables()
        for var, info in env_info.items():
            status = "✅" if info['set'] else "❌"
            print(f" {status} {var}: {'Set' if info['set'] else 'Not set'}")
        print()

        print("📦 Dependencies:")
        dep_info = await self.check_dependencies()
        for package, info in dep_info.items():
            status = "✅" if info['installed'] else "❌"
            version = f" (v{info['version']})" if info['installed'] and info['version'] != 'unknown' else ""
            print(f" {status} {package}{version}")
        print()

        print("🤖 Provider Health Checks:")
        provider_results = {}

        for provider in providers:
            provider_result = await self.check_provider(provider)
            provider_results[provider] = provider_result

            if provider_result['available']:
                if provider_result['health_check_passed']:
                    status = "✅"
                    details = f"({provider_result['response_time']:.2f}s)"
                else:
                    status = "⚠️"
                    details = "(Health check failed)"
            else:
                status = "❌"
                details = f"({provider_result['error']})"

            print(f" {status} {provider.upper()}: {details}")

        print()

        total_providers = len(providers)
        healthy_providers = sum(1 for r in provider_results.values()
                                if r['available'] and r['health_check_passed'])

        print("📋 Summary:")
        print(f" Total providers checked: {total_providers}")
        print(f" Healthy providers: {healthy_providers}")
        print(f" Success rate: {healthy_providers/total_providers*100:.1f}%")

        final_results = {
            'timestamp': datetime.now().isoformat(),
            'summary': {
                'total_providers': total_providers,
                'healthy_providers': healthy_providers,
                'success_rate': healthy_providers/total_providers*100
            },
            'environment': env_info,
            'dependencies': dep_info,
            'providers': provider_results
        }

        if self.detailed:
            final_results['system'] = system_info

        return final_results

    async def run_diagnostics(self, provider: str) -> Dict[str, Any]:
        """Run check_provider plus a live end-to-end solve/grade smoke test."""
        print(f"🔧 Running detailed diagnostics for {provider}...")

        result = await self.check_provider(provider)

        if result['available'] and result['health_check_passed']:
            print(f"✅ {provider} is healthy!")

            print("🧪 Testing with a simple math problem...")
            try:
                loader_kwargs = {}
                if provider == 'vllm':
                    loader_kwargs['base_url'] = 'http://localhost:8000/v1'
                elif provider == 'huggingface':
                    loader_kwargs['device'] = 'cpu'
                    loader_kwargs['solver_model'] = 'microsoft/DialoGPT-small'
                    loader_kwargs['grader_model'] = 'microsoft/DialoGPT-small'

                loader = create_loader(provider, **loader_kwargs)

                test_problem = {
                    'original': {
                        'problem_statement': 'What is 2 + 2?',
                        'solution': 'The answer is 4.',
                        'problem_type': 'calculation'
                    }
                }

                start_time = asyncio.get_event_loop().time()
                test_result = await asyncio.wait_for(
                    loader.test_single_problem(test_problem, variant_type='original'),
                    timeout=120
                )
                test_time = asyncio.get_event_loop().time() - start_time

                passed = test_result.get('correct', False)
                result['test_problem'] = {
                    'success': True,
                    'time': test_time,
                    'grade': 10 if passed else 0,
                    'solution_length': len(test_result.get('solve', {}).get('solution', ''))
                }
                print(f" ✅ Test completed in {test_time:.2f}s")
                print(f" 📊 Grade: {10 if passed else 0} ({'CORRECT' if passed else 'INCORRECT'})")

            except asyncio.TimeoutError:
                result['test_problem'] = {'success': False, 'error': 'Test timed out'}
                print(" ⚠️ Test problem timed out")
            except Exception as e:
                result['test_problem'] = {'success': False, 'error': str(e)}
                print(f" ❌ Test problem failed: {str(e)}")

        return result


async def main():
    """Main function."""
    parser = argparse.ArgumentParser(description="Health check for AI providers")
    parser.add_argument("--provider", choices=get_supported_providers(),
                        help="Check specific provider only")
    parser.add_argument("--detailed", action="store_true",
                        help="Show detailed system information")
    parser.add_argument("--diagnostics", action="store_true",
                        help="Run detailed diagnostics (requires --provider)")
    parser.add_argument("--output", type=Path,
                        help="Save results to JSON file")
    parser.add_argument("--quiet", action="store_true",
                        help="Suppress output, save to file only")

    args = parser.parse_args()

    if args.diagnostics and not args.provider:
        print("❌ Error: --diagnostics requires --provider")
        return 1

    # In quiet mode buffer all progress output.  The real stdout is restored
    # in the finally-block below, so error messages are never silently
    # swallowed (the original code leaked the redirect on exception paths).
    original_stdout = sys.stdout
    if args.quiet:
        import io
        sys.stdout = io.StringIO()

    checker = HealthChecker(detailed=args.detailed)

    try:
        try:
            if args.diagnostics:
                results = await checker.run_diagnostics(args.provider)
            else:
                results = await checker.check_all_providers(args.provider)

            # Save to file if requested
            if args.output:
                args.output.parent.mkdir(parents=True, exist_ok=True)
                with open(args.output, 'w', encoding='utf-8') as f:
                    json.dump(results, f, indent=2, ensure_ascii=False)

                if not args.quiet:
                    print(f"\n💾 Results saved to {args.output}")

            # Quiet mode prints only the final JSON, on the real stdout.
            if args.quiet:
                sys.stdout = original_stdout
                print(json.dumps(results, indent=2))

            return 0
        finally:
            # Restore stdout before any error reporting below runs.
            sys.stdout = original_stdout
    except KeyboardInterrupt:
        print("\n⏸️ Health check interrupted by user")
        return 1
    except Exception as e:
        print(f"\n❌ Health check failed: {str(e)}")
        return 1


if __name__ == "__main__":
    exit(asyncio.run(main()))
#!/usr/bin/env python3
"""
Re-grade an existing results JSON file using a (possibly different) grader model.

The script loads a results file produced by `batch_evaluate.py` (or a compatible
JSON list) and re-grades every problem using the specified grader. No solving
is performed – instead we reuse the previously generated solutions stored in
`solve.solution`.

Example usage
-------------
python regrade.py \
    --results-file results/o3/o3_original.json \
    --dataset-dir dataset/ \
    --provider openai \
    --grader-model o3 \
    --max-concurrent 5 \
    --output results/regraded_o3_original.json

"""

import argparse
import asyncio
import json
import sys
import time
from pathlib import Path
from typing import Any, Dict, List
from datetime import datetime
import logging

# Determine directories
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parent  # one level up

# Add both the script dir and project root to PYTHONPATH to locate 'loader'
sys.path.append(str(SCRIPT_DIR))
sys.path.append(str(PROJECT_ROOT))

from loader import create_loader  # type: ignore

try:
    from tqdm import tqdm  # type: ignore
    HAS_TQDM = True
except ImportError:  # pragma: no cover
    HAS_TQDM = False

    class tqdm:  # type: ignore
        """Minimal fallback if tqdm is not available."""

        def __init__(self, total=None, desc=None, **kwargs):
            self.total = total
            self.n = 0
            self.desc = desc or ""
            print(f"{self.desc}: starting …")

        def update(self, n=1):
            self.n += n
            if self.total:
                pct = self.n / self.total * 100
                print(f"{self.desc}: {self.n}/{self.total} ({pct:.1f}%)", end="\r")

        def set_postfix(self, _):
            pass

        def close(self):
            print()  # newline


###############################################################################
# Helper functions
###############################################################################


def load_dataset(dataset_dir: Path) -> Dict[str, Dict[str, Any]]:
    """Read every JSON file in *dataset_dir* and return a mapping index → data.

    Unparseable files are logged and skipped (best-effort ingest).
    """
    dataset: Dict[str, Dict[str, Any]] = {}
    for json_file in dataset_dir.glob("*.json"):
        try:
            with open(json_file, "r", encoding="utf-8") as fh:
                data = json.load(fh)
            idx = data.get("index")
            # `is not None` rather than truthiness: a falsy index such as 0
            # or "" would otherwise be silently dropped from the mapping.
            if idx is not None:
                dataset[idx] = data
        except Exception as exc:  # pragma: no cover – best-effort ingest
            logging.warning("Failed to load %s: %s", json_file, exc)
    return dataset


async def regrade_problem(loader,  # type: ignore[valid-type]
                          problem_record: Dict[str, Any],
                          dataset_entry: Dict[str, Any],
                          variant_type: str) -> Dict[str, Any]:
    """Re-grade one problem and return a new result dict.

    No solving is performed: the previously generated solution stored in
    ``problem_record["solve"]`` is re-submitted to the grader.
    """

    idx = problem_record.get("index", "unknown")
    problem_type = dataset_entry.get("problem_type", "proof")

    # Extract question & reference solution according to variant
    if variant_type == "original":
        question = str(dataset_entry.get("question", "")).strip()
        reference_solution = str(dataset_entry.get("solution", "")).strip()
    else:
        variant = dataset_entry.get("variants", {}).get(variant_type, {})
        question = str(variant.get("question", "")).strip()
        reference_solution = str(variant.get("solution", "")).strip()

    if not question or not reference_solution:
        return {
            "index": idx,
            "status": "skipped",
            "reason": "missing_fields",
        }

    # Previously generated solution
    student_solution = str(problem_record.get("solve", {}).get("solution", "")).strip()
    final_answer = str(problem_record.get("solve", {}).get("final_answer", "")).strip()

    # Grade the solution (temperature hard-coded inside create_loader for o-series)
    grade_result, _raw = await loader.grade_solution(
        question,
        student_solution,
        reference_solution,
        problem_type,
    )

    # Build merged record retaining original fields + new grade
    new_record = {
        "index": idx,
        "variant_type": variant_type,
        "problem_type": problem_type,
        "solve": {
            "solution": student_solution,
            "final_answer": final_answer,
        },
        "grade": grade_result or {"status": "failed"},
    }

    # Convenience shortcut for correctness
    new_record["correct"] = new_record["grade"].get("grade") == "CORRECT"
    return new_record


###############################################################################
# Main orchestration
###############################################################################
args.results_file) + sys.exit(1) + + if not args.dataset_dir.exists(): + logging.error("Dataset directory %s does not exist", args.dataset_dir) + sys.exit(1) + + # Load dataset into memory once + logging.info("Loading dataset from %s", args.dataset_dir) + dataset_map = load_dataset(args.dataset_dir) + logging.info("Loaded %d dataset entries", len(dataset_map)) + + # Load results JSON (support two formats: {'problems':[...]} or simple list) + with open(args.results_file, "r", encoding="utf-8") as fh: + raw_data = json.load(fh) + + if isinstance(raw_data, dict) and "problems" in raw_data: + original_problems: List[Dict[str, Any]] = raw_data["problems"] # type: ignore[assignment] + elif isinstance(raw_data, list): + original_problems = raw_data # type: ignore[assignment] + else: + logging.error("Unsupported results file structure – expected list or dict with key 'problems'.") + sys.exit(1) + + if not original_problems: + logging.warning("No problems found in results file – nothing to re-grade.") + sys.exit(0) + + # Create loader – we only need grader, but solver_model must be provided; reuse grader_model + loader = create_loader( + args.provider, + solver_model=args.grader_model, + grader_model=args.grader_model, + quick=args.quick, + debug=args.debug, + ) + + if not await loader.health_check(): + logging.error("Health check failed for provider %s", args.provider) + sys.exit(1) + + # Estimate costs (rough – assumes avg lengths; tweak as needed) + cost_info = await loader.estimate_cost(len(original_problems)) + logging.info("Estimated grading cost: $%.2f", cost_info.get("total_cost", 0)) + + # Concurrency control + semaphore = asyncio.Semaphore(args.max_concurrent) + + async def wrapper(problem_record): + idx = problem_record.get("index", "unknown") + if idx not in dataset_map: + logging.warning("Dataset entry for index %s not found – skipping", idx) + return {"index": idx, "status": "skipped", "reason": "dataset_missing"} + async with semaphore: + return await 
regrade_problem( + loader, + problem_record, + dataset_map[idx], + args.variant_type, + ) + + # Progress bar setup + pbar = tqdm(total=len(original_problems), desc="Re-grading") + results: List[Dict[str, Any]] = [] + + async def gather_tasks(): + for coro in asyncio.as_completed([wrapper(rec) for rec in original_problems]): + res = await coro + results.append(res) + pbar.update(1) + await gather_tasks() + pbar.close() + + # Build summary + completed = [r for r in results if r.get("grade", {}).get("status") == "success"] + grades = [r["grade"].get("grade") for r in completed] + numeric = [5.0 if g == "CORRECT" else 2.5 for g in grades] + + summary = { + "total_problems": len(results), + "completed": len(completed), + "correct": sum(1 for g in grades if g == "CORRECT"), + "incorrect": sum(1 for g in grades if g == "INCORRECT"), + "average_grade": sum(numeric) / len(numeric) if numeric else 0.0, + "provider": args.provider, + "grader_model": args.grader_model, + "variant_type": args.variant_type, + "estimated_cost": cost_info, + "timestamp": datetime.now().isoformat(), + } + + output_payload = { + "summary": summary, + "problems": results, + } + + # Determine output path + if args.output: + out_path = args.output + else: + stem = args.results_file.stem + f"_regraded_{args.grader_model}" + out_path = args.results_file.with_name(stem + args.results_file.suffix) + + with open(out_path, "w", encoding="utf-8") as fh: + json.dump(output_payload, fh, indent=2, ensure_ascii=False) + logging.info("Saved re-graded results to %s", out_path) + + # Clean up HTTP client if applicable + if hasattr(loader, "__aexit__"): + await loader.__aexit__(None, None, None) + + +if __name__ == "__main__": + asyncio.run(main())
\ No newline at end of file |
