summaryrefslogtreecommitdiff
path: root/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py
diff options
context:
space:
mode:
authorYuren Hao <yurenh2@illinois.edu>2026-04-08 22:06:05 -0500
committerYuren Hao <yurenh2@illinois.edu>2026-04-08 22:06:05 -0500
commit05704d0eb2fa59fe727652465b07db40bcb06c38 (patch)
tree8904aca836cf552fd1a5ae8c2174e9f91e70bbbc /putnam-bench-anon/scripts/compare_original_vs_kernel_test.py
Initial release: GAP framework
- Full pipeline: variant generation, multi-judge verification, evaluation - Loaders for OpenAI / Anthropic / Google / xAI / OpenRouter / vLLM - Framework-level mechanism analyses: paired structural overlap, repairability rescue, self-correction probe, cross-model agreement, topic x problem-type interaction - Unicode -> bare-LaTeX cleaner + audit + spot-check - Mirrors https://huggingface.co/datasets/blackhao0426/PutnamGAP
Diffstat (limited to 'putnam-bench-anon/scripts/compare_original_vs_kernel_test.py')
-rw-r--r--putnam-bench-anon/scripts/compare_original_vs_kernel_test.py630
1 files changed, 630 insertions, 0 deletions
diff --git a/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py b/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py
new file mode 100644
index 0000000..76952bd
--- /dev/null
+++ b/putnam-bench-anon/scripts/compare_original_vs_kernel_test.py
@@ -0,0 +1,630 @@
#!/usr/bin/env python3
"""
Original problem vs. kernel-variant mathematical-ability comparison test.

Solves each problem with gpt-4o-mini, grades the solutions strictly with
o3, and compares accuracy between the original problems and their kernel
variants.
"""
+
+import os
+import json
+import asyncio
+import pathlib
+import time
+import re
+import random
+from typing import Dict, List, Tuple, Optional
+import click
+import tqdm
+from openai import AsyncOpenAI, RateLimitError, APIError, APIConnectionError
+
# Configuration
SOLVER_MODEL = "gpt-4o-mini"  # model used to solve the problems
GRADER_MODEL = "o3"  # model used to grade the solutions
SRC_DIR = pathlib.Path("raw/json")
RESULTS_DIR = pathlib.Path("results/comparison_test")
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

RETRIES = 4  # max API attempts per call
TIMEOUT_BASE = 600  # seconds; per-attempt timeout doubles each retry
RESP_FMT = {"type": "json_object"}  # force JSON-mode responses
+
# Solver system prompt (gpt-4o-mini): asks for a rigorous step-by-step solution.
SOLVER_SYSTEM_PROMPT = """You are an expert mathematician solving competition-level problems.
Provide detailed, step-by-step solutions with clear mathematical reasoning.

Requirements:
- Show all your work and intermediate steps
- Justify each major step of your reasoning
- Use proper mathematical notation
- Be thorough but concise
- State your final answer clearly

Solve the problem completely and rigorously."""

SOLVER_USER_TEMPLATE = """Please solve this mathematical problem:

{problem_statement}

Provide a complete solution with detailed reasoning. Return your response in JSON format:
{{"solution": "your complete step-by-step solution with mathematical reasoning",
 "final_answer": "your final answer in a clear, concise form"}}"""

# Strict grading system prompt for PROOF problems (o3).
PROOF_GRADER_SYSTEM_PROMPT = """You are an extremely strict mathematical grader evaluating competition-level PROOF problems.

GRADING STANDARDS (BE VERY STRICT):
- Mathematical rigor: Every step must be mathematically sound and justified
- Logical flow: The reasoning must be clear, complete, and logically connected
- Correctness: All calculations, algebraic manipulations, and conclusions must be correct
- Completeness: The solution must address all parts of the problem fully
- Precision: Mathematical statements must be precise and unambiguous

FAILING CRITERIA (Mark as INCORRECT if ANY of these apply):
- Any unjustified logical leap or gap in reasoning
- Any computational error, no matter how small
- Missing steps in critical parts of the argument
- Imprecise or ambiguous mathematical statements
- Incorrect final answer, even if approach is partially correct
- Circular reasoning or logical fallacies
- Misuse of mathematical theorems or definitions

BE EXTREMELY STRICT. Competition mathematics proofs require perfect precision."""

# Comparatively lenient grading system prompt for CALCULATION problems (o3).
CALCULATION_GRADER_SYSTEM_PROMPT = """You are a mathematical grader evaluating competition-level CALCULATION problems.

GRADING STANDARDS FOR CALCULATION PROBLEMS:
- Primary focus: Is the final answer correct?
- Secondary focus: Is the overall approach reasonable and mathematically sound?
- Computation: Allow minor computational slips if the method is correct and final answer is right

GRADING CRITERIA:
- CORRECT: Final answer is correct AND approach is fundamentally sound
- INCORRECT: Final answer is wrong OR approach is fundamentally flawed

For calculation problems, the final numerical answer is the most important criterion.
Minor intermediate errors are acceptable if they don't affect the final result."""

PROOF_GRADER_USER_TEMPLATE = """Grade this PROOF solution with extreme strictness.

PROBLEM:
{problem_statement}

STUDENT SOLUTION:
{solution}

CORRECT REFERENCE SOLUTION:
{reference_solution}

Evaluate with maximum strictness. Every logical step must be perfect. Return JSON with:
{{"grade": "CORRECT" or "INCORRECT",
 "detailed_feedback": "specific detailed analysis of what is right/wrong",
 "major_issues": "list of significant mathematical errors or gaps",
 "final_answer_correct": true or false,
 "reasoning_rigor_score": 0-10 integer (10=perfect rigor, 0=severely flawed),
 "overall_assessment": "comprehensive evaluation summary"}}"""

CALCULATION_GRADER_USER_TEMPLATE = """Grade this CALCULATION solution with focus on final answer correctness.

PROBLEM:
{problem_statement}

STUDENT SOLUTION:
{solution}

CORRECT REFERENCE SOLUTION:
{reference_solution}

Focus primarily on whether the final answer is correct. Return JSON with:
{{"grade": "CORRECT" or "INCORRECT",
 "detailed_feedback": "specific detailed analysis of what is right/wrong",
 "major_issues": "list of significant mathematical errors or gaps",
 "final_answer_correct": true or false,
 "reasoning_rigor_score": 0-10 integer (10=perfect rigor, 0=severely flawed),
 "overall_assessment": "comprehensive evaluation summary"}}"""
+
# Greedy match for the outermost {...} span in a free-form LLM reply.
JSON_RE = re.compile(r"\{[\s\S]*\}")

def parse_json_response(raw: str) -> Optional[Dict]:
    """Parse a JSON object out of an LLM response, tolerating surrounding prose.

    Tries, in order:
      1. the raw text as-is;
      2. the outermost ``{...}`` span extracted via JSON_RE;
      3. the raw text with over-escaped quotes/backslashes undone.

    Args:
        raw: Raw model output; may be empty or contain non-JSON text.

    Returns:
        The parsed dict, or ``None`` if every strategy fails.

    Note:
        The original used bare ``except:`` clauses, which also swallow
        KeyboardInterrupt/SystemExit; they are narrowed to
        ``json.JSONDecodeError`` here.
    """
    if not raw:
        return None

    # Strategy 1: the whole response is already valid JSON.
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        pass

    # Strategy 2: extract the outermost brace-delimited span.
    match = JSON_RE.search(raw)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            pass

    # Strategy 3: undo common over-escaping before a final attempt.
    try:
        fixed = raw.replace('\\"', '"').replace('\\\\', '\\')
        return json.loads(fixed)
    except json.JSONDecodeError:
        pass

    return None
+
def to_str(x) -> str:
    """Coerce an arbitrary value to a string.

    ``None`` becomes the empty string, lists and tuples are joined with
    newlines, and anything else goes through ``str()``.
    """
    if x is None:
        return ""
    if isinstance(x, (list, tuple)):
        return "\n".join(str(item) for item in x)
    return x if isinstance(x, str) else str(x)
+
async def call_api_with_retry(cli: AsyncOpenAI, model: str, messages: List[Dict]) -> Tuple[Optional[Dict], str]:
    """Make OpenAI API call with retry logic.

    Retries up to RETRIES times. The per-attempt timeout doubles each try
    (TIMEOUT_BASE * 2 ** (attempt - 1)); transient failures back off
    exponentially with jitter, and detected quota exhaustion sleeps 15
    minutes before retrying.

    Args:
        cli: Shared AsyncOpenAI client.
        model: Chat-completions model name to call.
        messages: Chat messages payload (system/user dicts).

    Returns:
        Tuple of (parsed JSON dict, raw response text); the dict is None
        when all attempts fail.
    """
    raw_response = ""

    for attempt in range(1, RETRIES + 1):
        # Per-attempt timeout grows exponentially: TIMEOUT_BASE, 2x, 4x, ...
        timeout = TIMEOUT_BASE * (2 ** (attempt - 1))
        try:
            # Set temperature based on model
            # o3, o3-mini, and o4-mini require temperature 1.0
            if any(model_name in model.lower() for model_name in ['o3', 'o3-mini', 'o4-mini']):
                temperature = 1.0
            else:
                # Use temperature 0.0 for deterministic solving with other models
                temperature = 0.0

            response = await asyncio.wait_for(
                cli.chat.completions.create(
                    model=model,
                    messages=messages,
                    temperature=temperature,
                    response_format=RESP_FMT,
                ),
                timeout=timeout,
            )
            raw_response = response.choices[0].message.content or ""
            parsed = parse_json_response(raw_response)
            if parsed:
                return parsed, raw_response
            # An unparseable reply is treated like a transient error and retried.
            raise ValueError("Failed to parse JSON response")

        except RateLimitError as e:
            print(f"🚫 RateLimitError (attempt {attempt}/{RETRIES}): {str(e)}")
            if "insufficient_quota" in str(e):
                print("⏳ Detected quota exhaustion - sleeping 15 minutes")
                await asyncio.sleep(900)
            else:
                # Exponential backoff with jitter to spread concurrent retries.
                sleep_time = 2 ** attempt + random.random()
                print(f" ⏰ Rate limited, sleeping {sleep_time:.1f}s")
                await asyncio.sleep(sleep_time)

        except (APIError, APIConnectionError, asyncio.TimeoutError, ValueError) as e:
            print(f"❌ {type(e).__name__} (attempt {attempt}/{RETRIES}): {str(e)}")
            if attempt == RETRIES:
                return None, raw_response
            sleep_time = 2 ** attempt + random.random()
            print(f" ⏰ Retrying in {sleep_time:.1f}s")
            await asyncio.sleep(sleep_time)

    # All attempts exhausted (the rate-limit path falls through to here).
    return None, raw_response
+
async def solve_problem(cli: AsyncOpenAI, problem_statement: str) -> Tuple[Optional[Dict], str]:
    """Ask the solver model to produce a step-by-step solution for one problem."""
    user_content = SOLVER_USER_TEMPLATE.format(problem_statement=problem_statement)
    chat = [
        {"role": "system", "content": SOLVER_SYSTEM_PROMPT},
        {"role": "user", "content": user_content},
    ]
    return await call_api_with_retry(cli, SOLVER_MODEL, chat)
+
async def grade_solution(cli: AsyncOpenAI, problem_statement: str, solution: str,
                         reference_solution: str, problem_type: str = "proof") -> Tuple[Optional[Dict], str]:
    """Have the grader model score a solution.

    Proof problems get the strict grading prompts; calculation problems
    get the answer-focused (lenient) ones.
    """
    if problem_type == "calculation":
        system_prompt = CALCULATION_GRADER_SYSTEM_PROMPT
        user_template = CALCULATION_GRADER_USER_TEMPLATE
    else:
        # Anything not explicitly a calculation is graded as a proof (strict).
        system_prompt = PROOF_GRADER_SYSTEM_PROMPT
        user_template = PROOF_GRADER_USER_TEMPLATE

    user_content = user_template.format(
        problem_statement=problem_statement,
        solution=solution,
        reference_solution=reference_solution,
    )
    chat = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content},
    ]
    return await call_api_with_retry(cli, GRADER_MODEL, chat)
+
async def test_single_file(file_path: pathlib.Path, cli: AsyncOpenAI) -> Dict:
    """Run the original-vs-kernel-variant comparison for one JSON problem file.

    Pipeline: solve both versions with the solver model, grade both
    solutions with the grader model (strict for proofs, lenient for
    calculations), then record a head-to-head comparison.

    Args:
        file_path: Path to the problem JSON file.
        cli: Shared AsyncOpenAI client.

    Returns:
        A result dict whose "status" is one of "completed", "skipped",
        "failed", or "error", with per-side solve/grade details.
    """
    try:
        # Load the problem record from disk.
        data = json.loads(file_path.read_text(encoding='utf-8'))
        index = data.get("index", file_path.stem)

        # Extract the required fields; missing ones trigger a skip below.
        original_question = to_str(data.get("question", "")).strip()
        original_solution = to_str(data.get("solution", "")).strip()
        problem_type = data.get("problem_type", "proof")  # default: proof => strict grading

        kv = data.get("variants", {}).get("kernel_variant")
        if not kv:
            return {
                "index": index,
                "status": "skipped",
                "reason": "no_kernel_variant"
            }

        kernel_question = to_str(kv.get("question", "")).strip()
        kernel_solution = to_str(kv.get("solution", "")).strip()

        if not all([original_question, original_solution, kernel_question, kernel_solution]):
            return {
                "index": index,
                "status": "skipped",
                "reason": "missing_fields"
            }

        print(f"🧮 Testing {index} (Type: {problem_type.upper()})")
        start_time = time.time()

        result = {
            "index": index,
            "status": "completed",
            "timestamp": time.time(),
            "problem_type": problem_type,
            "original": {},
            "kernel_variant": {},
            "comparison": {}
        }

        # 1. Solve the original problem with the solver model.
        print(f" 📝 Solving original problem...")
        orig_solve_result, orig_solve_raw = await solve_problem(cli, original_question)

        if not orig_solve_result:
            result["original"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result

        orig_student_solution = to_str(orig_solve_result.get("solution", "")).strip()
        orig_final_answer = to_str(orig_solve_result.get("final_answer", "")).strip()

        result["original"]["student_solution"] = orig_student_solution
        result["original"]["student_final_answer"] = orig_final_answer
        result["original"]["solve_status"] = "success"

        # 2. Solve the kernel variant with the solver model.
        print(f" 📝 Solving kernel variant...")
        kv_solve_result, kv_solve_raw = await solve_problem(cli, kernel_question)

        if not kv_solve_result:
            result["kernel_variant"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result

        kv_student_solution = to_str(kv_solve_result.get("solution", "")).strip()
        kv_final_answer = to_str(kv_solve_result.get("final_answer", "")).strip()

        result["kernel_variant"]["student_solution"] = kv_student_solution
        result["kernel_variant"]["student_final_answer"] = kv_final_answer
        result["kernel_variant"]["solve_status"] = "success"

        # 3. Grade the original solution (strictness depends on problem type).
        grading_style = "STRICT" if problem_type == "proof" else "LENIENT"
        print(f" 🔍 Grading original solution ({grading_style})...")
        orig_grade_result, orig_grade_raw = await grade_solution(
            cli, original_question, orig_student_solution, original_solution, problem_type
        )

        if not orig_grade_result:
            result["original"]["grade_status"] = "failed"
        else:
            result["original"]["grade_status"] = "success"
            result["original"]["grade"] = orig_grade_result.get("grade", "UNKNOWN")
            result["original"]["detailed_feedback"] = orig_grade_result.get("detailed_feedback", "")
            result["original"]["major_issues"] = orig_grade_result.get("major_issues", "")
            result["original"]["final_answer_correct"] = orig_grade_result.get("final_answer_correct", False)
            result["original"]["reasoning_rigor_score"] = orig_grade_result.get("reasoning_rigor_score", 0)
            result["original"]["overall_assessment"] = orig_grade_result.get("overall_assessment", "")

        # 4. Grade the kernel-variant solution the same way.
        print(f" 🔍 Grading kernel variant solution ({grading_style})...")
        kv_grade_result, kv_grade_raw = await grade_solution(
            cli, kernel_question, kv_student_solution, kernel_solution, problem_type
        )

        if not kv_grade_result:
            result["kernel_variant"]["grade_status"] = "failed"
        else:
            result["kernel_variant"]["grade_status"] = "success"
            result["kernel_variant"]["grade"] = kv_grade_result.get("grade", "UNKNOWN")
            result["kernel_variant"]["detailed_feedback"] = kv_grade_result.get("detailed_feedback", "")
            result["kernel_variant"]["major_issues"] = kv_grade_result.get("major_issues", "")
            result["kernel_variant"]["final_answer_correct"] = kv_grade_result.get("final_answer_correct", False)
            result["kernel_variant"]["reasoning_rigor_score"] = kv_grade_result.get("reasoning_rigor_score", 0)
            result["kernel_variant"]["overall_assessment"] = kv_grade_result.get("overall_assessment", "")

        # 5. Head-to-head comparison (only when both gradings succeeded).
        if (result["original"]["grade_status"] == "success" and
            result["kernel_variant"]["grade_status"] == "success"):

            orig_correct = result["original"]["grade"] == "CORRECT"
            kv_correct = result["kernel_variant"]["grade"] == "CORRECT"

            result["comparison"]["original_correct"] = orig_correct
            result["comparison"]["kernel_variant_correct"] = kv_correct
            result["comparison"]["both_correct"] = orig_correct and kv_correct
            result["comparison"]["both_incorrect"] = not orig_correct and not kv_correct
            result["comparison"]["original_harder"] = not orig_correct and kv_correct  # original is harder
            result["comparison"]["kernel_variant_harder"] = orig_correct and not kv_correct  # kernel variant is harder

            orig_rigor = result["original"]["reasoning_rigor_score"]
            kv_rigor = result["kernel_variant"]["reasoning_rigor_score"]
            result["comparison"]["rigor_difference"] = orig_rigor - kv_rigor  # positive => original reasoning more rigorous

        total_time = time.time() - start_time
        result["processing_time"] = total_time

        print(f" ✅ Completed {index} in {total_time:.1f}s")
        if result["comparison"]:
            orig_status = "✅" if result["comparison"]["original_correct"] else "❌"
            kv_status = "✅" if result["comparison"]["kernel_variant_correct"] else "❌"
            print(f" Original: {orig_status}, Kernel Variant: {kv_status}")

        return result

    except Exception as e:
        # Catch-all so one bad file never aborts the whole batch run.
        return {
            "index": index if 'index' in locals() else file_path.stem,
            "status": "error",
            "error": str(e),
            "error_type": type(e).__name__,
            "timestamp": time.time()
        }
+
async def save_detailed_results(results: List[Dict], output_file: str):
    """Persist the per-file result list as pretty-printed UTF-8 JSON.

    The file lands in RESULTS_DIR as ``<output_file>_detailed.json``;
    failures are reported on stdout rather than raised.
    """
    output_path = RESULTS_DIR / f"{output_file}_detailed.json"
    try:
        serialized = json.dumps(results, ensure_ascii=False, indent=2)
        output_path.write_text(serialized, encoding='utf-8')
        print(f"💾 Detailed results saved to {output_path}")
    except Exception as e:
        print(f"❌ Failed to save detailed results: {e}")
+
def generate_summary_report(results: List[Dict]) -> Dict:
    """Aggregate per-file comparison results into a summary report.

    Tallies completion/skip/failure totals, per-problem-type accuracy,
    per-side (original vs. kernel variant) grading stats, head-to-head
    comparison buckets, and average reasoning-rigor scores.
    """
    report = {
        "total_files": len(results),
        "completed": 0,
        "failed": 0,
        "skipped": 0,
        "by_problem_type": {
            "proof": {"count": 0, "original_correct": 0, "kv_correct": 0},
            "calculation": {"count": 0, "original_correct": 0, "kv_correct": 0}
        },
        "original_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "kernel_variant_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "comparison_stats": {
            "both_correct": 0,
            "both_incorrect": 0,
            "original_harder": 0,
            "kernel_variant_harder": 0,
            "total_compared": 0
        },
        "rigor_analysis": {
            "original_avg_rigor": 0,
            "kernel_variant_avg_rigor": 0,
            "rigor_difference_avg": 0
        }
    }

    rigor_scores = {"original": [], "kernel_variant": []}
    rigor_diffs = []

    def tally_side(side: Dict, stats: Dict, scores: List) -> None:
        # Only successfully-graded solutions contribute to accuracy stats.
        if side.get("grade_status") != "success":
            return
        stats["total_graded"] += 1
        bucket = "correct" if side["grade"] == "CORRECT" else "incorrect"
        stats[bucket] += 1
        scores.append(side["reasoning_rigor_score"])

    for entry in results:
        status = entry["status"]
        if status == "skipped":
            report["skipped"] += 1
            continue
        if status != "completed":
            report["failed"] += 1
            continue

        report["completed"] += 1

        # Per-problem-type tallies (unknown types are simply ignored).
        ptype = entry.get("problem_type", "proof")
        type_stats = report["by_problem_type"].get(ptype)
        if type_stats is not None:
            type_stats["count"] += 1
            if entry["original"].get("grade") == "CORRECT":
                type_stats["original_correct"] += 1
            if entry["kernel_variant"].get("grade") == "CORRECT":
                type_stats["kv_correct"] += 1

        tally_side(entry["original"], report["original_stats"], rigor_scores["original"])
        tally_side(entry["kernel_variant"], report["kernel_variant_stats"], rigor_scores["kernel_variant"])

        # Head-to-head buckets: exactly one outcome flag counts per entry.
        comp = entry.get("comparison")
        if comp:
            report["comparison_stats"]["total_compared"] += 1
            for outcome in ("both_correct", "both_incorrect", "original_harder", "kernel_variant_harder"):
                if comp[outcome]:
                    report["comparison_stats"][outcome] += 1
                    break
            rigor_diffs.append(comp["rigor_difference"])

    def mean(values: List) -> float:
        return sum(values) / len(values)

    # Averages, guarded against empty sample sets.
    if rigor_scores["original"]:
        report["rigor_analysis"]["original_avg_rigor"] = mean(rigor_scores["original"])
    if rigor_scores["kernel_variant"]:
        report["rigor_analysis"]["kernel_variant_avg_rigor"] = mean(rigor_scores["kernel_variant"])
    if rigor_diffs:
        report["rigor_analysis"]["rigor_difference_avg"] = mean(rigor_diffs)

    # Accuracy is only defined when something was actually graded.
    for side_key in ("original_stats", "kernel_variant_stats"):
        stats = report[side_key]
        if stats["total_graded"] > 0:
            stats["accuracy"] = stats["correct"] / stats["total_graded"]

    return report
+
def print_summary_report(summary: Dict):
    """Pretty-print the summary produced by generate_summary_report.

    Args:
        summary: Aggregated stats dict (see generate_summary_report).
    """
    print("\n" + "="*80)
    print("📊 ORIGINAL vs KERNEL VARIANT COMPARISON REPORT")
    print("="*80)

    print(f"📁 Total files: {summary['total_files']}")
    print(f"✅ Completed: {summary['completed']}")
    print(f"⏭️ Skipped: {summary['skipped']}")
    print(f"❌ Failed: {summary['failed']}")

    print(f"\n📈 ACCURACY COMPARISON:")
    orig_acc = summary["original_stats"].get("accuracy", 0) * 100
    kv_acc = summary["kernel_variant_stats"].get("accuracy", 0) * 100
    print(f"Original Problems: {orig_acc:.1f}% ({summary['original_stats']['correct']}/{summary['original_stats']['total_graded']})")
    print(f"Kernel Variants: {kv_acc:.1f}% ({summary['kernel_variant_stats']['correct']}/{summary['kernel_variant_stats']['total_graded']})")

    # BUG FIX: previously gated on `orig_acc > 0 and kv_acc > 0`, which
    # silently skipped the difficulty comparison whenever either side scored
    # exactly 0% -- precisely the case where the comparison is most telling.
    # Gate on "was anything graded" instead.
    if (summary["original_stats"]["total_graded"] > 0
            and summary["kernel_variant_stats"]["total_graded"] > 0):
        diff = orig_acc - kv_acc
        if diff > 5:
            print(f"📉 Kernel variants are {diff:.1f}% harder (as expected)")
        elif diff < -5:
            print(f"📈 Original problems are {-diff:.1f}% harder (unexpected)")
        else:
            print(f"📊 Similar difficulty (difference: {diff:.1f}%)")

    print(f"\n🎯 BY PROBLEM TYPE:")
    for ptype, stats in summary["by_problem_type"].items():
        if stats["count"] > 0:
            orig_acc_type = (stats["original_correct"] / stats["count"]) * 100
            kv_acc_type = (stats["kv_correct"] / stats["count"]) * 100
            grading_note = " (STRICT grading)" if ptype == "proof" else " (LENIENT grading)"
            print(f"{ptype.upper()} Problems{grading_note}:")
            print(f" Original: {orig_acc_type:.1f}% ({stats['original_correct']}/{stats['count']})")
            print(f" Kernel Variant: {kv_acc_type:.1f}% ({stats['kv_correct']}/{stats['count']})")
            if stats["count"] >= 3:  # Only show difference if we have enough samples
                type_diff = orig_acc_type - kv_acc_type
                print(f" Difference: {type_diff:+.1f}%")

    print(f"\n🔍 DETAILED COMPARISON:")
    comp = summary["comparison_stats"]
    total = comp["total_compared"]
    if total > 0:
        print(f"Both correct: {comp['both_correct']:3d} ({comp['both_correct']/total*100:.1f}%)")
        print(f"Both incorrect: {comp['both_incorrect']:3d} ({comp['both_incorrect']/total*100:.1f}%)")
        print(f"Original harder: {comp['original_harder']:3d} ({comp['original_harder']/total*100:.1f}%)")
        print(f"Kernel variant harder: {comp['kernel_variant_harder']:3d} ({comp['kernel_variant_harder']/total*100:.1f}%)")

    print(f"\n📏 REASONING RIGOR ANALYSIS:")
    rigor = summary["rigor_analysis"]
    print(f"Original avg rigor: {rigor['original_avg_rigor']:.2f}/10")
    print(f"Kernel variant rigor: {rigor['kernel_variant_avg_rigor']:.2f}/10")
    print(f"Difference: {rigor['rigor_difference_avg']:.2f} (positive = original more rigorous)")

    print("="*80)
+
@click.command()
@click.option("-c", "--concurrency", default=16, show_default=True,
              help="Maximum concurrent processing tasks")
@click.option("--max-files", default=50, show_default=True,
              help="Maximum number of files to test (for quick testing)")
@click.option("--file-pattern", default="*.json", show_default=True,
              help="File pattern to process")
@click.option("--output-prefix", default="comparison_test", show_default=True,
              help="Prefix for output files")
@click.option("--debug", is_flag=True, help="Enable debug output")
def main(concurrency: int, max_files: int, file_pattern: str, output_prefix: str, debug: bool):
    """Original-problem vs kernel-variant mathematical-ability comparison test."""
    print(f"🧪 Starting Original vs Kernel Variant Comparison Test")
    print(f" Solver Model: {SOLVER_MODEL}")
    print(f" Grader Model: {GRADER_MODEL}")
    print(f" Max files: {max_files}")
    print(f" Concurrency: {concurrency}")

    if not os.getenv("OPENAI_API_KEY"):
        print("❌ OPENAI_API_KEY environment variable not set!")
        return

    # Collect the input files (optionally capped for quick test runs).
    all_files = sorted(SRC_DIR.glob(file_pattern))
    if max_files > 0:
        all_files = all_files[:max_files]

    print(f"📁 Testing {len(all_files)} files")

    if not all_files:
        print("❌ No files found to test!")
        return

    async def run_test():
        # One shared client; a semaphore caps concurrent in-flight files.
        cli = AsyncOpenAI()
        sem = asyncio.Semaphore(concurrency)

        async def worker(file_path: pathlib.Path):
            async with sem:
                return await test_single_file(file_path, cli)

        # Run all files, advancing the progress bar as each completes.
        results = []
        progress_bar = tqdm.tqdm(total=len(all_files), desc="Testing", unit="file")

        tasks = [worker(f) for f in all_files]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)
            progress_bar.update(1)

        progress_bar.close()
        return results

    # Execute the full comparison run.
    results = asyncio.run(run_test())

    # Persist the detailed per-file results.
    timestamp = int(time.time())
    output_name = f"{output_prefix}_{timestamp}"
    asyncio.run(save_detailed_results(results, output_name))

    # Build and display the summary report.
    summary = generate_summary_report(results)
    print_summary_report(summary)

    # Persist the summary report alongside the detailed results.
    summary_path = RESULTS_DIR / f"{output_name}_summary.json"
    try:
        summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding='utf-8')
        print(f"💾 Summary report saved to {summary_path}")
    except Exception as e:
        print(f"❌ Failed to save summary: {e}")
+
+if __name__ == "__main__":
+ main()
+ \ No newline at end of file