summaryrefslogtreecommitdiff
path: root/putnam-bench-anon/scripts/batch_evaluate.py
diff options
context:
space:
mode:
Diffstat (limited to 'putnam-bench-anon/scripts/batch_evaluate.py')
-rw-r--r--putnam-bench-anon/scripts/batch_evaluate.py1211
1 files changed, 1211 insertions, 0 deletions
diff --git a/putnam-bench-anon/scripts/batch_evaluate.py b/putnam-bench-anon/scripts/batch_evaluate.py
new file mode 100644
index 0000000..6fde90b
--- /dev/null
+++ b/putnam-bench-anon/scripts/batch_evaluate.py
@@ -0,0 +1,1211 @@
+#!/usr/bin/env python3
+"""
+Batch evaluation script for processing entire datasets with multiple providers.
+
+This script efficiently processes all JSON files in the dataset directory,
+supports multiple AI providers, and generates comprehensive evaluation reports.
+
+Features:
+- Incremental saving: Results are saved after each problem completes
+- Simple resume support: Skip already completed problems based on checkpoint
+- Multi-provider support
+- Comprehensive evaluation reports
+
+Usage:
+ python batch_evaluate.py --provider openai --output results/openai_results.json
+ python batch_evaluate.py --provider anthropic --variant kernel_variant --max-concurrent 5
+
+Resume usage (simplified):
+ # Resume with same configuration
+ python batch_evaluate.py --provider openai --dataset dataset/ --resume checkpoint_file.json
+
+ # Resume with different settings (checkpoint only provides skip list)
+ python batch_evaluate.py --provider openai --dataset dataset/ --concurrent 10 --resume checkpoint_file.json
+"""
+
+import asyncio
+import json
+import sys
+import time
+from pathlib import Path
+import argparse
+from typing import List, Dict, Any
+import logging
+from datetime import datetime
+import shutil
+
try:
    from tqdm import tqdm
    HAS_TQDM = True
except ImportError:
    HAS_TQDM = False

    class tqdm:
        """Minimal console stand-in for ``tqdm.tqdm`` when tqdm is not installed.

        Only the calls this script makes are supported: construction with
        ``total``/``desc``/``initial``, ``update``, ``set_postfix`` and ``close``.
        Any other keyword argument is accepted and ignored.
        """

        def __init__(self, total=None, desc=None, initial=0, **kwargs):
            self.total = total
            # Honour ``initial`` so a resumed run starts counting from the
            # number of already-completed problems instead of from zero.
            self.n = initial
            self.desc = desc
            print(f"{desc}: Starting...")

        def update(self, n=1):
            """Advance the counter by *n* and reprint the progress line."""
            self.n += n
            if self.total:
                percent = (self.n / self.total) * 100
                print(f"{self.desc}: {self.n}/{self.total} ({percent:.1f}%)", end='\r')

        def set_postfix(self, postfix_dict=None, **kwargs):
            """Accept and discard postfix statistics (not shown by the fallback)."""
            pass

        def close(self):
            """Terminate the carriage-return progress line with a newline."""
            print()  # New line after progress
+
+# Add the loader module to the path
+sys.path.append(str(Path(__file__).parent))
+
+from loader import create_loader, get_supported_providers
+
+
def setup_logging(output_dir: Path):
    """Configure logging to a timestamped file in *output_dir* and to stdout.

    Args:
        output_dir: Directory that receives ``evaluation_<YYYYmmdd_HHMMSS>.log``.
            It is created (including parents) if it does not exist yet.

    Returns:
        The logger named after this module.
    """
    output_dir = Path(output_dir)
    # FileHandler opens its file immediately, so the directory must exist first.
    output_dir.mkdir(parents=True, exist_ok=True)
    log_file = output_dir / f"evaluation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            # Explicit UTF-8: this script logs non-ASCII symbols, which the
            # platform default encoding may not be able to represent.
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler(sys.stdout),
        ],
    )

    return logging.getLogger(__name__)
+
+
async def load_dataset(dataset_path: Path, max_files: int = None) -> List[Dict[str, Any]]:
    """Load the ``*.json`` files found directly inside *dataset_path*.

    Files are read in sorted path order so that ``max_files`` always selects the
    same subset and repeated (or resumed) runs see the problems in the same order.

    Args:
        dataset_path: Directory containing one JSON document per problem.
        max_files: If truthy, only the first ``max_files`` files (in sorted
            order) are read; ``None`` or ``0`` reads every file.

    Returns:
        The parsed documents, each with an added ``'_source_file'`` key holding
        the file name. Files that cannot be read or parsed are skipped with a
        warning instead of aborting the load.
    """
    # Path.glob yields entries in filesystem-dependent order; sort for determinism.
    json_files = sorted(dataset_path.glob("*.json"))

    if max_files:
        json_files = json_files[:max_files]

    problems = []
    for json_file in json_files:
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            data['_source_file'] = str(json_file.name)
            problems.append(data)
        except Exception as e:
            logging.warning(f"Failed to load {json_file}: {str(e)}")

    return problems
+
+
+async def process_single_problem(loader, problem_data: Dict[str, Any],
+ variant_type: str, solver_model: str = None,
+ grader_model: str = None) -> Dict[str, Any]:
+ """Process a single problem and return results with metadata."""
+ start_time = time.time()
+
+ try:
+ result = await loader.test_single_problem(
+ problem_data,
+ variant_type=variant_type,
+ solver_model=solver_model,
+ grader_model=grader_model
+ )
+
+ # Add metadata
+ result['_metadata'] = {
+ 'source_file': problem_data.get('_source_file', 'unknown'),
+ 'variant_type': variant_type,
+ 'processing_time': time.time() - start_time,
+ 'timestamp': datetime.now().isoformat(),
+ 'models_used': {
+ 'solver': solver_model or loader.solver_model,
+ 'grader': grader_model or loader.grader_model
+ }
+ }
+
+ return result
+
+ except Exception as e:
+ # Return error information
+ return {
+ 'error': str(e),
+ 'final_grade': 0,
+ '_metadata': {
+ 'source_file': problem_data.get('_source_file', 'unknown'),
+ 'variant_type': variant_type,
+ 'processing_time': time.time() - start_time,
+ 'timestamp': datetime.now().isoformat(),
+ 'error': True
+ }
+ }
+
+
async def batch_evaluate(dataset_path: Path = None, provider: str = None, variant_type: str = "original",
                         max_concurrent: int = 3, max_files: int = None,
                         solver_model: str = None, grader_model: str = None,
                         output_file: Path = None, resume_checkpoint: Path = None,
                         **loader_kwargs) -> Dict[str, Any]:
    """
    Batch evaluate problems using specified provider with resume support.

    The run state (completed indices and per-problem results) is written to a
    checkpoint file after every problem. When ``resume_checkpoint`` points at an
    existing file, that state is restored and already-completed problems are
    skipped; the checkpoint only supplies the skip list and earlier results, all
    other settings come from the arguments of this call.

    Args:
        dataset_path: Path to dataset directory (always required)
        provider: AI provider name (always required)
        variant_type: Problem variant to use
        max_concurrent: Maximum concurrent evaluations
        max_files: Maximum number of files to process (None for all)
        solver_model: Override solver model (forwarded to the loader factory)
        grader_model: Override grader model (forwarded to the loader factory)
        output_file: Output file path; when given, final results are written
            there and the checkpoint file is removed afterwards
        resume_checkpoint: Path to checkpoint file to resume from
        **loader_kwargs: Additional arguments for loader

    Returns:
        Dictionary with evaluation results and statistics

    Raises:
        ValueError: If ``dataset_path`` or ``provider`` is missing, or the
            dataset contains no loadable problems.
        RuntimeError: If the loader's health check fails.
    """
    logger = logging.getLogger(__name__)

    resuming = bool(resume_checkpoint and resume_checkpoint.exists())
    if resume_checkpoint and not resuming:
        # Make the fallback visible instead of silently redoing all work.
        logger.warning(f"Checkpoint file not found: {resume_checkpoint}. Starting a new evaluation.")

    if resuming:
        logger.info(f"Resuming from checkpoint: {resume_checkpoint}")
        with open(resume_checkpoint, 'r', encoding='utf-8') as f:
            checkpoint_data = json.load(f)

        # Simple resume: just restore completed indices and results
        completed_indices = set(checkpoint_data.get('completed_indices', []))
        results = checkpoint_data.get('results', [])
        failed_indices = checkpoint_data.get('failed_indices', [])
        successful_indices = checkpoint_data.get('successful_indices', [])
        correct_indices = checkpoint_data.get('correct_indices', [])
        phase = "when resuming"
    else:
        completed_indices = set()
        results = []
        failed_indices = []
        successful_indices = []
        correct_indices = []
        phase = "for new evaluation"

    # dataset_path and provider always come from the caller, never the checkpoint
    if not dataset_path:
        raise ValueError(f"dataset_path is required {phase}")
    if not provider:
        raise ValueError(f"provider is required {phase}")

    logger.info(f"Loading dataset from {dataset_path}")
    problems = await load_dataset(dataset_path, max_files)
    logger.info(f"Loaded {len(problems)} problems")

    if not problems:
        raise ValueError("No problems found in dataset")

    if resuming:
        checkpoint_file = resume_checkpoint  # Continue using the same checkpoint file
        logger.info(f"Resuming with {len(completed_indices)} completed problems out of {len(problems)}")
    else:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        if output_file:
            checkpoint_file = output_file.parent / f"checkpoint_{output_file.stem}_{timestamp}.json"
        else:
            checkpoint_file = Path(f"checkpoint_{provider}_{variant_type}_{timestamp}.json")

    # The first checkpoint write must not fail on a missing directory.
    checkpoint_file.parent.mkdir(parents=True, exist_ok=True)

    # Results restored from the checkpoint were not produced during this run.
    restored_result_count = len(results)

    # Create loader
    logger.info(f"Creating {provider} loader")

    # Include solver_model and grader_model in loader_kwargs if specified
    if solver_model:
        loader_kwargs['solver_model'] = solver_model
    if grader_model:
        loader_kwargs['grader_model'] = grader_model

    loader = create_loader(provider, **loader_kwargs)

    # Semaphore for concurrency control
    semaphore = asyncio.Semaphore(max_concurrent)

    def save_checkpoint():
        """Write the current run state to the checkpoint file."""
        checkpoint_data = {
            'timestamp': datetime.now().isoformat(),
            # Only save essential state information
            'completed_indices': list(completed_indices),
            'successful_indices': successful_indices,
            'failed_indices': failed_indices,
            'correct_indices': correct_indices,
            'results': results,
            # Save minimal config for reference (not for resume)
            'dataset_path': str(dataset_path),  # For convenience
            'total_problems': len(problems),
            'current_config': {
                'provider': provider,
                'variant_type': variant_type,
                'solver_model': loader.solver_model,
                'grader_model': loader.grader_model
            }
        }

        # Write to a temporary file first, then rename over the real one so a
        # crash mid-write never leaves a truncated checkpoint behind.
        temp_file = checkpoint_file.with_suffix('.tmp')
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(checkpoint_data, f, indent=2, ensure_ascii=False)
        temp_file.replace(checkpoint_file)

    async def evaluate_problem(problem_data: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate a single problem with concurrency control."""
        problem_index = problem_data.get('index', 'unknown')

        # Skip if already completed
        if problem_index in completed_indices:
            return None

        async with semaphore:
            # Only the evaluation itself is guarded: a bookkeeping or checkpoint
            # problem must not turn a finished problem into an "error" result.
            try:
                result = await loader.test_single_problem(
                    problem_data,
                    variant_type=variant_type
                )
                # Success means technical completion, not correctness
                technically_ok = result.get('status') == 'completed'
                reported_index = result['index']
            except Exception as e:
                logger.error(f"Error evaluating problem {problem_index}: {e}")
                result = {
                    'index': problem_index,
                    'status': 'error',
                    'error': str(e),
                    'error_type': type(e).__name__
                }
                technically_ok = False
                reported_index = problem_index

            if technically_ok:
                successful_indices.append(reported_index)  # Successfully processed
                if result.get('correct'):
                    correct_indices.append(reported_index)  # Also correct
            else:
                failed_indices.append(reported_index)  # Technical failure

            # Add to results and mark as completed (even if failed)
            results.append(result)
            completed_indices.add(problem_index)

            # Save checkpoint immediately after each problem. An I/O failure
            # here is logged and retried implicitly by the next save, since each
            # save writes the full state.
            try:
                save_checkpoint()
            except OSError as save_error:
                logger.error(f"Failed to write checkpoint {checkpoint_file}: {save_error}")

            progress_bar.update(1)
            progress_bar.set_postfix({
                'success': len(successful_indices),
                'failed': len(failed_indices),
                'saved': len(completed_indices)
            })

            return result

    try:
        # Health check
        logger.info("Performing health check...")
        if not await loader.health_check():
            raise RuntimeError(f"Health check failed for {provider}")

        # Cost estimation
        logger.info("Estimating costs...")
        cost_info = await loader.estimate_cost(len(problems))
        logger.info(f"Estimated cost: ${cost_info.get('total_cost', 0):.2f}")

        # Progress tracking
        remaining_problems = [p for p in problems if p.get('index', 'unknown') not in completed_indices]
        progress_bar = tqdm(total=len(problems), desc=f"Evaluating with {provider}",
                            initial=len(completed_indices))

        # Run evaluations
        start_time = time.time()

        try:
            # Create tasks only for remaining problems
            tasks = [evaluate_problem(problem) for problem in remaining_problems]

            if tasks:
                # Execute all tasks concurrently (limited by semaphore)
                await asyncio.gather(*tasks)
            else:
                logger.info("All problems already completed!")

        except (KeyboardInterrupt, asyncio.CancelledError):
            # Under asyncio.run on Python 3.11+, Ctrl-C reaches this coroutine as
            # a cancellation, not as KeyboardInterrupt.
            logger.info("Evaluation interrupted by user. Progress saved to checkpoint.")
            logger.info(f"To resume, use: --resume {checkpoint_file}")
            raise

        finally:
            progress_bar.close()

        # Calculate statistics
        total_time = time.time() - start_time
        processed_this_run = len(results) - restored_result_count
        completed_results = [r for r in results if r.get('status') == 'completed']
        grades = []
        for r in completed_results:
            grade_info = r.get('grade') or {}  # tolerate a missing or None grade
            if grade_info.get('status') == 'success' and 'grade' in grade_info:
                grades.append(grade_info['grade'])

        # Numeric grades: 'CORRECT' counts 5.0, any other grade value counts 2.5
        numeric_grades = [5.0 if g == 'CORRECT' else 2.5 for g in grades]
        average_grade = sum(numeric_grades) / len(numeric_grades) if numeric_grades else 0.0

        summary = {
            'total_problems': len(problems),
            'completed': len(completed_results),
            'successful': len(successful_indices),  # Technical success (completed processing)
            'failed': len(failed_indices),  # Technical failures
            'correct_answers': len(correct_indices),  # Mathematically correct answers
            'incorrect_answers': len(successful_indices) - len(correct_indices),  # Wrong but processed
            'success_rate': (len(successful_indices) / len(problems) * 100) if problems else 0,
            'accuracy_rate': (len(correct_indices) / len(successful_indices) * 100) if successful_indices else 0,
            'average_grade': average_grade,
            'total_time_seconds': total_time,
            # Throughput of this run only; restored results took no time here.
            'problems_per_second': processed_this_run / total_time if total_time > 0 else 0,
            'provider': provider,
            'variant_type': variant_type,
            'solver_model': loader.solver_model,
            'grader_model': loader.grader_model,
            'max_concurrent': max_concurrent,
            'estimated_cost': cost_info,
            'checkpoint_file': str(checkpoint_file)
        }

        # Create full results
        full_results = {
            'summary': summary,
            'problems': results,
            'successful_indices': successful_indices,  # Technical successes
            'failed_indices': failed_indices,  # Technical failures
            'correct_indices': correct_indices,  # Correct answers
            'timestamp': datetime.now().isoformat()
        }

        # Save final results
        if output_file:
            logger.info(f"Saving final results to {output_file}")
            output_file.parent.mkdir(parents=True, exist_ok=True)
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(full_results, f, indent=2, ensure_ascii=False)

            # Clean up checkpoint file after successful completion
            if checkpoint_file.exists():
                logger.info(f"Removing checkpoint file: {checkpoint_file}")
                checkpoint_file.unlink()

        # Print summary
        logger.info(f"\n{'='*60}")
        logger.info("EVALUATION SUMMARY")
        logger.info(f"{'='*60}")
        logger.info(f"Provider: {provider}")
        logger.info(f"Variant: {variant_type}")
        logger.info(f"Total problems: {summary['total_problems']}")
        logger.info(f"✅ Successfully processed: {summary['successful']} ({summary['success_rate']:.1f}%)")
        logger.info(f"💥 Technical failures: {summary['failed']}")
        logger.info(f"🎯 Correct answers: {summary['correct_answers']} ({summary['accuracy_rate']:.1f}% of processed)")
        logger.info(f"❌ Wrong answers: {summary['incorrect_answers']}")
        logger.info(f"Average grade: {summary['average_grade']:.2f}")
        logger.info(f"Total time: {summary['total_time_seconds']:.1f}s")
        logger.info(f"Speed: {summary['problems_per_second']:.2f} problems/second")

        return full_results

    finally:
        # Release loader resources on every exit path, not only on success.
        if hasattr(loader, '__aexit__'):
            await loader.__aexit__(None, None, None)
+
+
# Header names whose values must never be persisted to logs or checkpoints.
_SENSITIVE_HEADER_NAMES = frozenset({
    'authorization', 'proxy-authorization', 'x-api-key', 'api-key',
    'cookie', 'set-cookie',
})


def _json_safe_headers(headers) -> Dict[str, str]:
    """Return *headers* as a plain str->str dict with credential values redacted."""
    return {
        str(name): ('<redacted>' if str(name).lower() in _SENSITIVE_HEADER_NAMES else str(value))
        for name, value in dict(headers).items()
    }


def _collect_error_details(exc: BaseException, problem_index, problem_data: Dict[str, Any]) -> Dict[str, Any]:
    """Build a JSON-serializable description of *exc* for logs and checkpoints."""
    import traceback

    details = {
        'error_message': str(exc),
        'error_type': type(exc).__name__,
        'traceback': ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
        'timestamp': datetime.now().isoformat(),
        'problem_index': problem_index,
        'problem_title': problem_data.get('title', 'unknown')
    }

    # HTTP details are best-effort: not every exception carries them, and a
    # partially populated response object must not mask the original error.
    response = getattr(exc, 'response', None)
    if response is not None:
        try:
            details['http_status'] = response.status_code
            details['http_headers'] = _json_safe_headers(response.headers)
            details['http_response_text'] = str(response.text)
        except Exception:
            pass

    request = getattr(exc, 'request', None)
    if request is not None:
        try:
            details['request_method'] = str(request.method)
            # URL objects are not JSON-serializable; store their string form.
            details['request_url'] = str(request.url)
            details['request_headers'] = _json_safe_headers(request.headers)
            # Don't log request body as it might contain sensitive info
        except Exception:
            pass

    return details


async def batch_evaluate_cross(dataset_path: Path = None,
                               solver_provider: str = None,
                               grader_provider: str = None,
                               variant_type: str = "original",
                               max_concurrent: int = 3,
                               max_files: int = None,
                               solver_model: str = None,
                               grader_model: str = None,
                               output_file: Path = None,
                               resume_checkpoint: Path = None,
                               vllm_url: str = None,
                               device: str = None,
                               quick: bool = False) -> Dict[str, Any]:
    """
    Batch evaluate problems using different providers for solving and grading with resume support.

    The run state is written to a checkpoint file after every problem. When
    ``resume_checkpoint`` points at an existing file, completed problems are
    skipped. Unlike ``batch_evaluate``, the checkpoint file is kept after the
    final results are written.

    Args:
        dataset_path: Path to dataset directory (always required)
        solver_provider: Provider for solving problems (always required)
        grader_provider: Provider for grading (if None, uses solver_provider)
        variant_type: Problem variant to use
        max_concurrent: Maximum concurrent evaluations
        max_files: Maximum number of files to process (None for all)
        solver_model: Override solver model
        grader_model: Override grader model
        output_file: Output file path
        resume_checkpoint: Path to checkpoint file to resume from
        vllm_url: VLLM server URL if using VLLM
        device: Device for HuggingFace models
        quick: When true, ``quick=True`` is forwarded to the loader factory

    Returns:
        Dictionary with evaluation results and statistics

    Raises:
        ValueError: If ``dataset_path`` or ``solver_provider`` is missing, or
            the dataset contains no loadable problems.
        RuntimeError: If the loader's health check fails.
    """
    logger = logging.getLogger(__name__)
    run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    resuming = bool(resume_checkpoint and resume_checkpoint.exists())
    if resume_checkpoint and not resuming:
        # Make the fallback visible instead of silently redoing all work.
        logger.warning(f"Checkpoint file not found: {resume_checkpoint}. Starting a new evaluation.")

    if resuming:
        logger.info(f"Resuming from checkpoint: {resume_checkpoint}")
        with open(resume_checkpoint, 'r', encoding='utf-8') as f:
            checkpoint_data = json.load(f)

        # Simple resume: just restore completed indices and results
        completed_indices = set(checkpoint_data.get('completed_indices', []))
        results = checkpoint_data.get('results', [])
        failed_indices = checkpoint_data.get('failed_indices', [])
        successful_indices = checkpoint_data.get('successful_indices', [])
        correct_indices = checkpoint_data.get('correct_indices', [])
        phase = "when resuming"
    else:
        completed_indices = set()
        results = []
        failed_indices = []
        successful_indices = []
        correct_indices = []
        phase = "for new evaluation"

    # Providers and dataset_path always come from the caller, never the checkpoint
    if not dataset_path:
        raise ValueError(f"dataset_path is required {phase}")
    if not solver_provider:
        raise ValueError(f"solver_provider is required {phase}")

    logger.info(f"Loading dataset from {dataset_path}")
    problems = await load_dataset(dataset_path, max_files)
    logger.info(f"Loaded {len(problems)} problems")

    if not problems:
        raise ValueError("No problems found in dataset")

    if resuming:
        checkpoint_file = resume_checkpoint  # Continue using the same checkpoint file
        logger.info(f"Resuming with {len(completed_indices)} completed problems out of {len(problems)}")
    elif output_file:
        checkpoint_file = output_file.parent / f"checkpoint_{output_file.stem}_{run_stamp}.json"
    else:
        checkpoint_file = Path(
            f"checkpoint_cross_{solver_provider}_{grader_provider or solver_provider}_{variant_type}_{run_stamp}.json"
        )

    # The first checkpoint write must not fail on a missing directory.
    checkpoint_file.parent.mkdir(parents=True, exist_ok=True)

    # One detailed-error file per run, so errors accumulate in a single list
    # instead of being scattered over one file per failure.
    error_log_name = f"detailed_errors_{run_stamp}.json"
    error_log_file = output_file.parent / error_log_name if output_file else Path(error_log_name)

    # Results restored from the checkpoint were not produced during this run.
    restored_result_count = len(results)

    # Create cross-provider loader
    logger.info(f"Creating cross-provider loader: solver={solver_provider}, grader={grader_provider or solver_provider}")

    from loader import create_cross_provider_loader

    # Prepare kwargs for each provider
    # NOTE(review): when grader_provider is None the grader falls back to the
    # solver's provider, but no grader_kwargs are set here for that case;
    # confirm create_cross_provider_loader reuses solver_kwargs.
    loader_kwargs = {}

    # VLLM settings
    if vllm_url:
        if solver_provider == 'vllm':
            loader_kwargs['solver_kwargs'] = {'base_url': vllm_url}
        if grader_provider == 'vllm':
            loader_kwargs['grader_kwargs'] = {'base_url': vllm_url}

    # HuggingFace settings
    if device:
        if solver_provider == 'huggingface':
            loader_kwargs['solver_kwargs'] = {'device': device}
        if grader_provider == 'huggingface':
            loader_kwargs['grader_kwargs'] = {'device': device}

    # Add quick mode if specified
    if quick:
        loader_kwargs['quick'] = True

    loader = create_cross_provider_loader(
        solver_provider=solver_provider,
        grader_provider=grader_provider,
        solver_model=solver_model,
        grader_model=grader_model,
        **loader_kwargs
    )

    # Semaphore for concurrency control
    semaphore = asyncio.Semaphore(max_concurrent)

    def save_checkpoint():
        """Write the current run state to the checkpoint file."""
        checkpoint_data = {
            'timestamp': datetime.now().isoformat(),
            # Only save essential state information
            'completed_indices': list(completed_indices),
            'successful_indices': successful_indices,
            'failed_indices': failed_indices,
            'correct_indices': correct_indices,
            'results': results,
            # Save minimal config for reference (not for resume)
            'dataset_path': str(dataset_path),  # For convenience
            'total_problems': len(problems),
            'current_config': {
                'solver_provider': solver_provider,
                'grader_provider': grader_provider or solver_provider,
                'variant_type': variant_type,
                'solver_model': loader.solver_model,
                'grader_model': loader.grader_model
            }
        }

        # Write to a temporary file first, then rename over the real one so a
        # crash mid-write never leaves a truncated checkpoint behind.
        temp_file = checkpoint_file.with_suffix('.tmp')
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(checkpoint_data, f, indent=2, ensure_ascii=False)
        temp_file.replace(checkpoint_file)

    def append_error_log(error_details: Dict[str, Any]) -> None:
        """Append one error record to this run's detailed-error file (best effort)."""
        try:
            if error_log_file.exists():
                with open(error_log_file, 'r', encoding='utf-8') as f:
                    existing_errors = json.load(f)
            else:
                existing_errors = []

            existing_errors.append(error_details)

            error_log_file.parent.mkdir(parents=True, exist_ok=True)
            with open(error_log_file, 'w', encoding='utf-8') as f:
                json.dump(existing_errors, f, indent=2, ensure_ascii=False)

            logger.info(f"Detailed error saved to {error_log_file}")
        except Exception as save_error:
            logger.error(f"Failed to save detailed error log: {save_error}")

    async def evaluate_problem(problem_data: Dict[str, Any]) -> Dict[str, Any]:
        """Evaluate a single problem with concurrency control."""
        problem_index = problem_data.get('index', 'unknown')

        # Skip if already completed
        if problem_index in completed_indices:
            return None

        async with semaphore:
            # Only the evaluation itself is guarded: a bookkeeping or checkpoint
            # problem must not turn a finished problem into an "error" result.
            try:
                result = await loader.test_single_problem(
                    problem_data,
                    variant_type=variant_type
                )
                # Success means technical completion, not correctness
                technically_ok = result.get('status') == 'completed'
                reported_index = result['index']
            except Exception as e:
                error_details = _collect_error_details(e, problem_index, problem_data)

                # Log detailed error
                logger.error(f"DETAILED ERROR for problem {problem_index}:")
                logger.error(f" Error Type: {error_details['error_type']}")
                logger.error(f" Error Message: {error_details['error_message']}")
                logger.error(f" Problem Title: {error_details['problem_title']}")

                if 'http_status' in error_details:
                    logger.error(f" HTTP Status: {error_details['http_status']}")
                    # The body may be absent if response capture stopped early.
                    logger.error(f" HTTP Response: {error_details.get('http_response_text', '')[:500]}...")

                logger.error(f" Full Traceback:\n{error_details['traceback']}")

                append_error_log(error_details)

                result = {
                    'index': problem_index,
                    'status': 'error',
                    'error': str(e),
                    'error_type': type(e).__name__,
                    'error_details': error_details
                }
                technically_ok = False
                reported_index = problem_index

            if technically_ok:
                successful_indices.append(reported_index)  # Successfully processed
                if result.get('correct'):
                    correct_indices.append(reported_index)  # Also correct
            else:
                failed_indices.append(reported_index)  # Technical failure

            # Add to results and mark as completed (even if failed)
            results.append(result)
            completed_indices.add(problem_index)

            # Save checkpoint immediately after each problem. An I/O failure
            # here is logged and retried implicitly by the next save, since each
            # save writes the full state.
            try:
                save_checkpoint()
            except OSError as save_error:
                logger.error(f"Failed to write checkpoint {checkpoint_file}: {save_error}")

            progress_bar.update(1)
            progress_bar.set_postfix({
                'success': len(successful_indices),
                'failed': len(failed_indices),
                'saved': len(completed_indices)
            })

            return result

    try:
        # Health check
        logger.info("Performing health check...")
        if not await loader.health_check():
            raise RuntimeError("Health check failed")

        # Cost estimation
        logger.info("Estimating costs...")
        cost_info = await loader.estimate_cost(len(problems))
        logger.info(f"Estimated cost: ${cost_info.get('total_cost', 0):.2f}")

        # Progress tracking
        remaining_problems = [p for p in problems if p.get('index', 'unknown') not in completed_indices]
        progress_bar = tqdm(
            total=len(problems),
            desc=f"Evaluating (solver={solver_provider}, grader={grader_provider or solver_provider})",
            initial=len(completed_indices)
        )

        # Run evaluations
        start_time = time.time()

        try:
            # Create tasks only for remaining problems
            tasks = [evaluate_problem(problem) for problem in remaining_problems]

            if tasks:
                # Execute all tasks concurrently (limited by semaphore)
                await asyncio.gather(*tasks)
            else:
                logger.info("All problems already completed!")

        except (KeyboardInterrupt, asyncio.CancelledError):
            # Under asyncio.run on Python 3.11+, Ctrl-C reaches this coroutine as
            # a cancellation, not as KeyboardInterrupt.
            logger.info("Evaluation interrupted by user. Progress saved to checkpoint.")
            logger.info(f"To resume, use: --resume {checkpoint_file}")
            raise

        finally:
            progress_bar.close()

        # Calculate statistics
        total_time = time.time() - start_time
        processed_this_run = len(results) - restored_result_count
        completed_results = [r for r in results if r.get('status') == 'completed']
        grades = []
        for r in completed_results:
            grade_info = r.get('grade') or {}  # tolerate a missing or None grade
            if grade_info.get('status') == 'success' and 'grade' in grade_info:
                grades.append(grade_info['grade'])

        # Numeric grades: 'CORRECT' counts 5.0, any other grade value counts 2.5
        numeric_grades = [5.0 if g == 'CORRECT' else 2.5 for g in grades]
        average_grade = sum(numeric_grades) / len(numeric_grades) if numeric_grades else 0.0

        model_info = loader.get_model_info()

        summary = {
            'total_problems': len(problems),
            'completed': len(completed_results),
            'successful': len(successful_indices),  # Technical success (completed processing)
            'failed': len(failed_indices),  # Technical failures
            'correct_answers': len(correct_indices),  # Mathematically correct answers
            'incorrect_answers': len(successful_indices) - len(correct_indices),  # Wrong but processed
            'success_rate': (len(successful_indices) / len(problems) * 100) if problems else 0,
            'accuracy_rate': (len(correct_indices) / len(successful_indices) * 100) if successful_indices else 0,
            'average_grade': average_grade,
            'total_time_seconds': total_time,
            # Throughput of this run only; restored results took no time here.
            'problems_per_second': processed_this_run / total_time if total_time > 0 else 0,
            'solver_provider': model_info.get('solver_provider', solver_provider),
            'grader_provider': model_info.get('grader_provider', grader_provider or solver_provider),
            'variant_type': variant_type,
            'solver_model': loader.solver_model,
            'grader_model': loader.grader_model,
            'max_concurrent': max_concurrent,
            'estimated_cost': cost_info,
            'is_cross_provider': model_info.get('is_cross_provider', False),
            # The checkpoint is kept in cross-provider mode; record where it is.
            'checkpoint_file': str(checkpoint_file)
        }

        # Create full results
        full_results = {
            'summary': summary,
            'problems': results,
            'successful_indices': successful_indices,  # Technical successes
            'failed_indices': failed_indices,  # Technical failures
            'correct_indices': correct_indices,  # Correct answers
            'timestamp': datetime.now().isoformat()
        }

        # Save if requested
        if output_file:
            logger.info(f"Saving results to {output_file}")
            output_file.parent.mkdir(parents=True, exist_ok=True)
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(full_results, f, indent=2, ensure_ascii=False)

        # Print summary
        logger.info(f"\n{'='*60}")
        logger.info("CROSS-PROVIDER EVALUATION SUMMARY")
        logger.info(f"{'='*60}")
        logger.info(f"Solver Provider: {summary['solver_provider']} ({loader.solver_model})")
        logger.info(f"Grader Provider: {summary['grader_provider']} ({loader.grader_model})")
        logger.info(f"Variant: {variant_type}")
        logger.info(f"Total problems: {summary['total_problems']}")
        logger.info(f"✅ Successfully processed: {summary['successful']} ({summary['success_rate']:.1f}%)")
        logger.info(f"💥 Technical failures: {summary['failed']}")
        logger.info(f"🎯 Correct answers: {summary['correct_answers']} ({summary['accuracy_rate']:.1f}% of processed)")
        logger.info(f"❌ Wrong answers: {summary['incorrect_answers']}")
        logger.info(f"Average grade: {summary['average_grade']:.2f}")
        logger.info(f"Total time: {summary['total_time_seconds']:.1f}s")
        logger.info(f"Speed: {summary['problems_per_second']:.2f} problems/second")

        return full_results

    finally:
        # Release loader resources on every exit path, not only on success.
        if hasattr(loader, '__aexit__'):
            await loader.__aexit__(None, None, None)
+
+
+async def batch_evaluate_all_variants(dataset_path: Path, provider: str,
+ variants: List[str] = None,
+ max_concurrent: int = 3, max_files: int = None,
+ solver_model: str = None, grader_model: str = None,
+ output_dir: Path = None,
+ base_url: str = None, device: str = None) -> Dict[str, Any]:
+ """
+ Batch evaluate problems across all variants using specified provider.
+
+ Args:
+ dataset_path: Path to dataset directory
+ provider: AI provider name
+ variants: List of variants to test (None for all)
+ max_concurrent: Maximum concurrent evaluations
+ max_files: Maximum number of files to process per variant (None for all)
+ solver_model: Override solver model
+ grader_model: Override grader model
+ output_dir: Output directory path
+ **loader_kwargs: Additional arguments for loader
+
+ Returns:
+ Dictionary with all variant results and comparative analysis
+ """
+ if variants is None:
+ variants = ["original", "descriptive_long", "descriptive_long_confusing",
+ "descriptive_long_misleading", "garbled_string", "kernel_variant"]
+
+ if output_dir is None:
+ output_dir = Path("results")
+
+ logger = logging.getLogger(__name__)
+
+ timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+ config_name = f"{provider}"
+ if solver_model:
+ config_name += f"_{solver_model.replace('/', '_').replace('-', '_')}"
+
+ # Create configuration-specific output directory
+ config_output_dir = output_dir / f"{config_name}_{timestamp}"
+ config_output_dir.mkdir(parents=True, exist_ok=True)
+
+ # Prepare loader kwargs based on provider
+ loader_kwargs = {}
+ if provider == 'vllm' and base_url:
+ loader_kwargs['base_url'] = base_url
+ elif provider == 'huggingface' and device:
+ loader_kwargs['device'] = device
+
+ logger.info(f"šŸš€ Starting multi-variant test for {config_name}")
+ logger.info(f"šŸ“Š Testing {len(variants)} variants with up to {max_files or 'ALL'} files each")
+
+ overall_start_time = time.time()
+ variant_results = {}
+
+ # Create overall progress bar for variants if tqdm is available
+ if HAS_TQDM:
+ variant_progress = tqdm.tqdm(total=len(variants), desc="Variants",
+ unit="variant", position=1, leave=True)
+
+ for i, variant in enumerate(variants):
+ logger.info(f"\nšŸ“ [{i+1}/{len(variants)}] Testing variant: {variant}")
+ variant_start_time = time.time()
+
+ # Output file for this variant
+ variant_output_file = config_output_dir / f"{variant}_{timestamp}.json"
+
+ try:
+ # Run batch evaluation for this variant
+ result = await batch_evaluate(
+ dataset_path=dataset_path,
+ provider=provider,
+ variant_type=variant,
+ max_concurrent=max_concurrent,
+ max_files=max_files,
+ solver_model=solver_model,
+ grader_model=grader_model,
+ output_file=variant_output_file,
+ **loader_kwargs
+ )
+
+ variant_time = time.time() - variant_start_time
+
+ # Extract key metrics
+ summary = result.get('summary', {})
+ variant_results[variant] = {
+ 'status': 'success',
+ 'output_file': str(variant_output_file),
+ 'total_problems': summary.get('total_problems', 0),
+ 'successful_evaluations': summary.get('successful', 0),
+ 'correct_evaluations': summary.get('correct_answers', 0),
+ 'incorrect_evaluations': summary.get('incorrect_answers', 0),
+ 'failed_evaluations': summary.get('failed', 0),
+ 'success_rate': summary.get('success_rate', 0),
+ 'average_grade': summary.get('average_grade', 0),
+ 'total_processing_time': summary.get('total_time_seconds', 0),
+ 'avg_time_per_problem': summary.get('problems_per_second', 0),
+ 'variant_test_time': variant_time,
+ 'grade_distribution': result.get('problems', []) # Assuming 'problems' contains all results
+ }
+
+ logger.info(f"āœ… {variant}: "
+ f"Grade {summary.get('average_grade', 0):.2f}, "
+ f"Success {summary.get('success_rate', 0):.1f}%, "
+ f"Time {variant_time/60:.1f}min")
+
+ except Exception as e:
+ variant_time = time.time() - variant_start_time
+ error_msg = str(e)
+
+ variant_results[variant] = {
+ 'status': 'failed',
+ 'error': error_msg,
+ 'variant_test_time': variant_time
+ }
+
+ logger.error(f"āŒ {variant} failed: {error_msg}")
+
+ # Update variant progress bar
+ if HAS_TQDM and 'variant_progress' in locals():
+ variant_progress.update(1)
+ successful_variants_count = len([v for v, r in variant_results.items() if r.get('status') == 'success'])
+ variant_progress.set_postfix({
+ 'Success': successful_variants_count,
+ 'Failed': len(variant_results) - successful_variants_count
+ })
+
+ # Close variant progress bar
+ if HAS_TQDM and 'variant_progress' in locals():
+ variant_progress.close()
+
+ overall_time = time.time() - overall_start_time
+
+ # Generate comprehensive summary
+ successful_variants = [v for v, r in variant_results.items() if r.get('status') == 'success']
+ failed_variants = [v for v, r in variant_results.items() if r.get('status') == 'failed']
+
+ # Calculate aggregate statistics
+ if successful_variants:
+ total_problems = sum(variant_results[v].get('total_problems', 0) for v in successful_variants)
+ total_successful = sum(variant_results[v].get('successful_evaluations', 0) for v in successful_variants)
+ total_correct = sum(variant_results[v].get('correct_evaluations', 0) for v in successful_variants)
+ total_incorrect = sum(variant_results[v].get('incorrect_evaluations', 0) for v in successful_variants)
+ total_failed = sum(variant_results[v].get('failed_evaluations', 0) for v in successful_variants)
+
+ grades = [variant_results[v].get('average_grade', 0) for v in successful_variants]
+ success_rates = [variant_results[v].get('success_rate', 0) for v in successful_variants]
+ times = [variant_results[v].get('avg_time_per_problem', 0) for v in successful_variants]
+
+ overall_avg_grade = sum(grades) / len(grades) if grades else 0
+ overall_success_rate = sum(success_rates) / len(success_rates) if success_rates else 0
+ overall_avg_time = sum(times) / len(times) if times else 0
+
+ # Find best and worst performing variants
+ best_variant = max(successful_variants, key=lambda v: variant_results[v].get('average_grade', 0))
+ worst_variant = min(successful_variants, key=lambda v: variant_results[v].get('average_grade', 0))
+
+ fastest_variant = min(successful_variants, key=lambda v: variant_results[v].get('avg_time_per_problem', float('inf')))
+ slowest_variant = max(successful_variants, key=lambda v: variant_results[v].get('avg_time_per_problem', 0))
+ else:
+ total_problems = total_successful = total_correct = total_incorrect = total_failed = 0
+ overall_avg_grade = overall_success_rate = overall_avg_time = 0
+ best_variant = worst_variant = fastest_variant = slowest_variant = None
+
+ summary_result = {
+ 'configuration': {
+ 'provider': provider,
+ 'solver_model': solver_model,
+ 'grader_model': grader_model,
+ 'base_url': base_url,
+ 'device': device,
+ 'timestamp': timestamp
+ },
+ 'test_overview': {
+ 'total_variants_tested': len(variant_results),
+ 'successful_variants': len(successful_variants),
+ 'failed_variants': len(failed_variants),
+ 'total_test_time_minutes': overall_time / 60,
+ 'variants_list': list(variant_results.keys())
+ },
+ 'aggregate_metrics': {
+ 'total_problems_across_variants': total_problems,
+ 'total_successful_evaluations': total_successful,
+ 'total_correct_evaluations': total_correct,
+ 'total_incorrect_evaluations': total_incorrect,
+ 'total_technical_failures': total_failed,
+ 'overall_average_grade': overall_avg_grade,
+ 'overall_success_rate': overall_success_rate,
+ 'overall_avg_time_per_problem': overall_avg_time
+ },
+ 'variant_comparison': {
+ 'best_performing_variant': {
+ 'variant': best_variant,
+ 'grade': variant_results.get(best_variant, {}).get('average_grade', 0) if best_variant else 0
+ },
+ 'worst_performing_variant': {
+ 'variant': worst_variant,
+ 'grade': variant_results.get(worst_variant, {}).get('average_grade', 0) if worst_variant else 0
+ },
+ 'fastest_variant': {
+ 'variant': fastest_variant,
+ 'time_per_problem': variant_results.get(fastest_variant, {}).get('avg_time_per_problem', 0) if fastest_variant else 0
+ },
+ 'slowest_variant': {
+ 'variant': slowest_variant,
+ 'time_per_problem': variant_results.get(slowest_variant, {}).get('avg_time_per_problem', 0) if slowest_variant else 0
+ }
+ },
+ 'detailed_variant_results': variant_results
+ }
+
+ # Save configuration summary
+ summary_file = config_output_dir / f"SUMMARY_{config_name}_{timestamp}.json"
+ with open(summary_file, 'w', encoding='utf-8') as f:
+ json.dump(summary_result, f, indent=2, ensure_ascii=False)
+
+ # Print summary to console
+ logger.info("\n" + "="*80)
+ logger.info("šŸ“Š MULTI-VARIANT TEST SUMMARY REPORT")
+ logger.info("="*80)
+
+ logger.info(f"šŸ¤– Provider: {provider}")
+ if solver_model:
+ logger.info(f"🧠 Solver Model: {solver_model}")
+ if grader_model:
+ logger.info(f"šŸ“ Grader Model: {grader_model}")
+
+ logger.info(f"\nšŸ“‹ Test Overview:")
+ logger.info(f" Total variants tested: {len(variant_results)}")
+ logger.info(f" Successful variants: {len(successful_variants)}")
+ logger.info(f" Failed variants: {len(failed_variants)}")
+ logger.info(f" Total test time: {overall_time/60:.1f} minutes")
+
+ if total_problems > 0:
+ logger.info(f"\nšŸ“ˆ Aggregate Performance:")
+ logger.info(f" Total problems: {total_problems}")
+ logger.info(f" Overall average grade: {overall_avg_grade:.2f}")
+ logger.info(f" Overall success rate: {overall_success_rate:.1f}%")
+ logger.info(f" Average time per problem: {overall_avg_time:.2f}s")
+
+ if best_variant:
+ logger.info(f"\nšŸ† Variant Performance:")
+ logger.info(f" Best performing: {best_variant} (Grade: {variant_results[best_variant]['average_grade']:.2f})")
+ logger.info(f" Worst performing: {worst_variant} (Grade: {variant_results[worst_variant]['average_grade']:.2f})")
+ logger.info(f" Fastest: {fastest_variant} ({variant_results[fastest_variant]['avg_time_per_problem']:.2f}s/problem)")
+ logger.info(f" Slowest: {slowest_variant} ({variant_results[slowest_variant]['avg_time_per_problem']:.2f}s/problem)")
+
+ logger.info("="*80)
+ logger.info(f"šŸ’¾ Configuration summary saved to {summary_file}")
+
+ return summary_result
+
+
async def main():
    """Parse command-line arguments and run the requested evaluation.

    Two modes are supported:

    * Multi-variant mode (``--all-variants`` and/or ``--variants``) delegates
      to ``batch_evaluate_all_variants``, which writes its files under
      ``--output-dir``. ``--output`` and ``--resume`` are not forwarded in
      this mode; a warning is logged if either is supplied.
    * Single-variant mode (the default) delegates to ``batch_evaluate`` for
      the variant selected with ``--variant``.

    Returns:
        Process exit code: 0 on success, 1 if the evaluation raised an
        exception, 130 if it was interrupted with Ctrl-C.
    """
    # Single source of truth for the variant names accepted on the CLI.
    variant_choices = ["original", "descriptive_long", "descriptive_long_confusing",
                       "descriptive_long_misleading", "garbled_string", "kernel_variant"]

    parser = argparse.ArgumentParser(description="Batch evaluate mathematical problems")

    # Required arguments
    parser.add_argument("--provider", required=True, choices=get_supported_providers(),
                        help="AI provider to use")

    # Dataset options
    parser.add_argument("--dataset", default="dataset",
                        help="Dataset directory path (default: dataset)")
    parser.add_argument("--variant", default="original", choices=variant_choices,
                        help="Problem variant to use (default: original)")
    parser.add_argument("--all-variants", action="store_true",
                        help="Test all 6 problem variants instead of just one")
    parser.add_argument("--variants", nargs="+", choices=variant_choices,
                        help="Specific variants to test (use with --all-variants)")
    parser.add_argument("--max-files", type=int,
                        help="Maximum number of files to process per variant (default: all)")

    # Processing options
    # "--concurrent" is accepted as an alias because the module-level usage
    # examples document that spelling.
    parser.add_argument("--max-concurrent", "--concurrent", dest="max_concurrent",
                        type=int, default=3,
                        help="Maximum concurrent evaluations (default: 3)")
    parser.add_argument("--solver-model",
                        help="Override solver model")
    parser.add_argument("--grader-model",
                        help="Override grader model")

    # Output options
    parser.add_argument("--output", type=Path,
                        help="Output file path (default: results/[provider]_[timestamp].json)")
    parser.add_argument("--output-dir", type=Path, default=Path("results"),
                        help="Output directory (default: results)")
    parser.add_argument("--resume", type=Path,
                        help="Path to checkpoint file to resume from")

    # Provider-specific options
    parser.add_argument("--base-url",
                        help="Base URL for VLLM provider")
    parser.add_argument("--device", default="auto",
                        help="Device for HuggingFace provider (auto/cuda/cpu)")

    args = parser.parse_args()

    # Reject a non-positive concurrency limit up front instead of letting it
    # surface later as an obscure failure inside the evaluation.
    if args.max_concurrent < 1:
        parser.error("--max-concurrent must be at least 1")

    # Setup output directory and logging
    args.output_dir.mkdir(parents=True, exist_ok=True)
    logger = setup_logging(args.output_dir)

    multi_variant = bool(args.all_variants or args.variants)

    try:
        if multi_variant:
            # These options are not forwarded to the multi-variant runner;
            # say so rather than dropping them silently.
            if args.resume:
                logger.warning(f"--resume is ignored in multi-variant mode: {args.resume}")
            if args.output:
                logger.warning(f"--output is ignored in multi-variant mode: {args.output}")

            # None lets batch_evaluate_all_variants fall back to its full list.
            variants_to_test = args.variants if args.variants else None
            results = await batch_evaluate_all_variants(
                dataset_path=Path(args.dataset),
                provider=args.provider,
                variants=variants_to_test,
                max_concurrent=args.max_concurrent,
                max_files=args.max_files,
                solver_model=args.solver_model,
                grader_model=args.grader_model,
                output_dir=args.output_dir,
                base_url=args.base_url,
                device=args.device
            )

            aggregate = results.get('aggregate_metrics', {})
            logger.info("Multi-variant evaluation completed successfully!")
            logger.info(f"Overall average grade: {aggregate.get('overall_average_grade', 0):.2f}")
            logger.info(f"Overall success rate: {aggregate.get('overall_success_rate', 0):.1f}%")
        else:
            # Default output file if not specified
            if not args.output:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                args.output = args.output_dir / f"{args.provider}_{args.variant}_{timestamp}.json"

            # Prepare loader kwargs based on provider
            loader_kwargs = {}
            if args.provider == 'vllm' and args.base_url:
                loader_kwargs['base_url'] = args.base_url
            elif args.provider == 'huggingface' and args.device:
                loader_kwargs['device'] = args.device

            results = await batch_evaluate(
                dataset_path=Path(args.dataset),
                provider=args.provider,
                variant_type=args.variant,
                max_concurrent=args.max_concurrent,
                max_files=args.max_files,
                solver_model=args.solver_model,
                grader_model=args.grader_model,
                output_file=args.output,
                resume_checkpoint=args.resume,
                **loader_kwargs
            )

            # Use .get so a missing metric cannot turn a finished run into a
            # reported failure while merely printing the summary.
            summary = results.get('summary', {})
            logger.info("Batch evaluation completed successfully!")
            logger.info(f"Average grade: {summary.get('average_grade', 0):.2f}")
            logger.info(f"Success rate: {summary.get('success_rate', 0):.1f}%")

    except KeyboardInterrupt:
        logger.info("Evaluation interrupted by user")
        # 128 + SIGINT: an interrupted run must not look like a success.
        return 130
    except Exception as e:
        # Top-level boundary: log with traceback and report failure.
        logger.exception(f"Evaluation failed: {str(e)}")
        return 1

    return 0
+
+
if __name__ == "__main__":
    # Use sys.exit rather than the site-provided exit() helper, which is meant
    # for interactive sessions and is not defined when site is disabled.
    sys.exit(asyncio.run(main()))