diff options
| author | Yuren Hao <yurenh2@illinois.edu> | 2026-04-08 22:06:05 -0500 |
|---|---|---|
| committer | Yuren Hao <yurenh2@illinois.edu> | 2026-04-08 22:06:05 -0500 |
| commit | 05704d0eb2fa59fe727652465b07db40bcb06c38 (patch) | |
| tree | 8904aca836cf552fd1a5ae8c2174e9f91e70bbbc /putnam-bench-anon/scripts/compare_original_vs_kernel_test.py | |
Initial release: GAP framework
- Full pipeline: variant generation, multi-judge verification, evaluation
- Loaders for OpenAI / Anthropic / Google / xAI / OpenRouter / vLLM
- Framework-level mechanism analyses: paired structural overlap, repairability rescue, self-correction probe, cross-model agreement, topic x problem-type interaction
- Unicode -> bare-LaTeX cleaner + audit + spot-check
- Mirrors https://huggingface.co/datasets/blackhao0426/PutnamGAP
Diffstat (limited to 'putnam-bench-anon/scripts/compare_original_vs_kernel_test.py')
| -rw-r--r-- | putnam-bench-anon/scripts/compare_original_vs_kernel_test.py | 630 |
1 file changed, 630 insertions, 0 deletions
#!/usr/bin/env python3
"""
Original vs. kernel-variant mathematical-ability comparison test.

gpt-4o-mini solves each problem, o3 grades the solutions (strictly for
proofs, answer-focused for calculations), and we compare the accuracy on
original problems against their kernel variants.
"""

import os
import json
import asyncio
import pathlib
import time
import re
import random
from typing import Dict, List, Tuple, Optional

import click
import tqdm
from openai import AsyncOpenAI, RateLimitError, APIError, APIConnectionError

# Configuration
SOLVER_MODEL = "gpt-4o-mini"  # model used to solve the problems
GRADER_MODEL = "o3"           # model used to grade the solutions
SRC_DIR = pathlib.Path("raw/json")
RESULTS_DIR = pathlib.Path("results/comparison_test")
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

RETRIES = 4
TIMEOUT_BASE = 600  # seconds; doubled on each retry attempt
RESP_FMT = {"type": "json_object"}

# Solver system prompt (gpt-4o-mini)
SOLVER_SYSTEM_PROMPT = """You are an expert mathematician solving competition-level problems.
Provide detailed, step-by-step solutions with clear mathematical reasoning.

Requirements:
- Show all your work and intermediate steps
- Justify each major step of your reasoning
- Use proper mathematical notation
- Be thorough but concise
- State your final answer clearly

Solve the problem completely and rigorously."""

SOLVER_USER_TEMPLATE = """Please solve this mathematical problem:

{problem_statement}

Provide a complete solution with detailed reasoning. Return your response in JSON format:
{{"solution": "your complete step-by-step solution with mathematical reasoning",
 "final_answer": "your final answer in a clear, concise form"}}"""

# Strict grading system prompt for PROOF problems (o3)
PROOF_GRADER_SYSTEM_PROMPT = """You are an extremely strict mathematical grader evaluating competition-level PROOF problems.

GRADING STANDARDS (BE VERY STRICT):
- Mathematical rigor: Every step must be mathematically sound and justified
- Logical flow: The reasoning must be clear, complete, and logically connected
- Correctness: All calculations, algebraic manipulations, and conclusions must be correct
- Completeness: The solution must address all parts of the problem fully
- Precision: Mathematical statements must be precise and unambiguous

FAILING CRITERIA (Mark as INCORRECT if ANY of these apply):
- Any unjustified logical leap or gap in reasoning
- Any computational error, no matter how small
- Missing steps in critical parts of the argument
- Imprecise or ambiguous mathematical statements
- Incorrect final answer, even if approach is partially correct
- Circular reasoning or logical fallacies
- Misuse of mathematical theorems or definitions

BE EXTREMELY STRICT. Competition mathematics proofs require perfect precision."""

# Relatively lenient grading system prompt for CALCULATION problems (o3)
CALCULATION_GRADER_SYSTEM_PROMPT = """You are a mathematical grader evaluating competition-level CALCULATION problems.

GRADING STANDARDS FOR CALCULATION PROBLEMS:
- Primary focus: Is the final answer correct?
- Secondary focus: Is the overall approach reasonable and mathematically sound?
- Computation: Allow minor computational slips if the method is correct and final answer is right

GRADING CRITERIA:
- CORRECT: Final answer is correct AND approach is fundamentally sound
- INCORRECT: Final answer is wrong OR approach is fundamentally flawed

For calculation problems, the final numerical answer is the most important criterion.
Minor intermediate errors are acceptable if they don't affect the final result."""

PROOF_GRADER_USER_TEMPLATE = """Grade this PROOF solution with extreme strictness.

PROBLEM:
{problem_statement}

STUDENT SOLUTION:
{solution}

CORRECT REFERENCE SOLUTION:
{reference_solution}

Evaluate with maximum strictness. Every logical step must be perfect.
Return JSON with:
{{"grade": "CORRECT" or "INCORRECT",
 "detailed_feedback": "specific detailed analysis of what is right/wrong",
 "major_issues": "list of significant mathematical errors or gaps",
 "final_answer_correct": true or false,
 "reasoning_rigor_score": 0-10 integer (10=perfect rigor, 0=severely flawed),
 "overall_assessment": "comprehensive evaluation summary"}}"""

CALCULATION_GRADER_USER_TEMPLATE = """Grade this CALCULATION solution with focus on final answer correctness.

PROBLEM:
{problem_statement}

STUDENT SOLUTION:
{solution}

CORRECT REFERENCE SOLUTION:
{reference_solution}

Focus primarily on whether the final answer is correct. Return JSON with:
{{"grade": "CORRECT" or "INCORRECT",
 "detailed_feedback": "specific detailed analysis of what is right/wrong",
 "major_issues": "list of significant mathematical errors or gaps",
 "final_answer_correct": true or false,
 "reasoning_rigor_score": 0-10 integer (10=perfect rigor, 0=severely flawed),
 "overall_assessment": "comprehensive evaluation summary"}}"""

# Greedy match from the first '{' to the last '}' — used to dig a JSON
# object out of responses that wrap it in prose.
JSON_RE = re.compile(r"\{[\s\S]*\}")


def parse_json_response(raw: str) -> Optional[Dict]:
    """Parse JSON from an LLM response, trying progressively looser strategies.

    Order: (1) parse the whole string, (2) parse the outermost {...} span,
    (3) parse after undoing common escape damage. Returns None on failure.
    """
    if not raw:
        return None

    # Strategy 1: the response is already clean JSON.
    try:
        return json.loads(raw)
    except (ValueError, TypeError):
        pass

    # Strategy 2: JSON object embedded in surrounding prose.
    match = JSON_RE.search(raw)
    if match:
        try:
            return json.loads(match.group(0))
        except (ValueError, TypeError):
            pass

    # Strategy 3: model double-escaped quotes/backslashes; undo and retry.
    try:
        fixed = raw.replace('\\"', '"').replace('\\\\', '\\')
        return json.loads(fixed)
    except (ValueError, TypeError):
        pass

    return None


def to_str(x) -> str:
    """Convert various types to string safely.

    None -> "", str passes through, list/tuple items are joined with
    newlines, anything else goes through str().
    """
    if x is None:
        return ""
    if isinstance(x, str):
        return x
    if isinstance(x, (list, tuple)):
        return "\n".join(map(str, x))
    return str(x)


async def call_api_with_retry(cli: AsyncOpenAI, model: str, messages: List[Dict]) -> Tuple[Optional[Dict], str]:
    """Make an OpenAI chat-completions call with retry and backoff.

    Retries up to RETRIES times with exponential timeout growth and jittered
    sleeps. Returns (parsed_json, raw_text); (None, raw_text) on failure.
    """
    raw_response = ""

    for attempt in range(1, RETRIES + 1):
        timeout = TIMEOUT_BASE * (2 ** (attempt - 1))
        try:
            # Set temperature based on model:
            # o3 / o3-mini / o4-mini only accept temperature 1.0.
            if any(model_name in model.lower() for model_name in ['o3', 'o3-mini', 'o4-mini']):
                temperature = 1.0
            else:
                # Temperature 0.0 for deterministic solving with other models.
                temperature = 0.0

            response = await asyncio.wait_for(
                cli.chat.completions.create(
                    model=model,
                    messages=messages,
                    temperature=temperature,
                    response_format=RESP_FMT,
                ),
                timeout=timeout,
            )
            raw_response = response.choices[0].message.content or ""
            parsed = parse_json_response(raw_response)
            if parsed:
                return parsed, raw_response
            # Treat an unparsable body as a retryable failure.
            raise ValueError("Failed to parse JSON response")

        except RateLimitError as e:
            print(f"🚫 RateLimitError (attempt {attempt}/{RETRIES}): {str(e)}")
            if "insufficient_quota" in str(e):
                print("⏳ Detected quota exhaustion - sleeping 15 minutes")
                await asyncio.sleep(900)
            else:
                sleep_time = 2 ** attempt + random.random()
                print(f"  ⏰ Rate limited, sleeping {sleep_time:.1f}s")
                await asyncio.sleep(sleep_time)

        except (APIError, APIConnectionError, asyncio.TimeoutError, ValueError) as e:
            print(f"❌ {type(e).__name__} (attempt {attempt}/{RETRIES}): {str(e)}")
            if attempt == RETRIES:
                return None, raw_response
            sleep_time = 2 ** attempt + random.random()
            print(f"  ⏰ Retrying in {sleep_time:.1f}s")
            await asyncio.sleep(sleep_time)

    return None, raw_response


async def solve_problem(cli: AsyncOpenAI, problem_statement: str) -> Tuple[Optional[Dict], str]:
    """Have the solver model (gpt-4o-mini) solve the problem."""
    messages = [
        {"role": "system", "content": SOLVER_SYSTEM_PROMPT},
        {"role": "user", "content": SOLVER_USER_TEMPLATE.format(
            problem_statement=problem_statement
        )}
    ]
    return await call_api_with_retry(cli, SOLVER_MODEL, messages)


async def grade_solution(cli: AsyncOpenAI, problem_statement: str, solution: str,
                         reference_solution: str, problem_type: str = "proof") -> Tuple[Optional[Dict], str]:
    """Have the grader model (o3) grade by problem type.

    Proof problems are graded strictly; calculation problems are graded with
    focus on the final answer.
    """
    if problem_type == "calculation":
        system_prompt = CALCULATION_GRADER_SYSTEM_PROMPT
        user_template = CALCULATION_GRADER_USER_TEMPLATE
    else:  # Default to proof (strict grading)
        system_prompt = PROOF_GRADER_SYSTEM_PROMPT
        user_template = PROOF_GRADER_USER_TEMPLATE

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_template.format(
            problem_statement=problem_statement,
            solution=solution,
            reference_solution=reference_solution
        )}
    ]
    return await call_api_with_retry(cli, GRADER_MODEL, messages)


async def test_single_file(file_path: pathlib.Path, cli: AsyncOpenAI) -> Dict:
    """Run the original-vs-kernel-variant comparison for one JSON file.

    Returns a result dict with status "completed", "skipped", "failed" or
    "error"; completed results carry per-problem grading and a comparison.
    """
    # Initialize up front so the except-handler always has an id to report
    # (the previous `'index' in locals()` check was fragile).
    index = file_path.stem
    try:
        # Load the problem data.
        data = json.loads(file_path.read_text(encoding='utf-8'))
        index = data.get("index", file_path.stem)

        # Required fields.
        original_question = to_str(data.get("question", "")).strip()
        original_solution = to_str(data.get("solution", "")).strip()
        problem_type = data.get("problem_type", "proof")  # default: proof, strict grading

        kv = data.get("variants", {}).get("kernel_variant")
        if not kv:
            return {
                "index": index,
                "status": "skipped",
                "reason": "no_kernel_variant"
            }

        kernel_question = to_str(kv.get("question", "")).strip()
        kernel_solution = to_str(kv.get("solution", "")).strip()

        if not all([original_question, original_solution, kernel_question, kernel_solution]):
            return {
                "index": index,
                "status": "skipped",
                "reason": "missing_fields"
            }

        print(f"🧮 Testing {index} (Type: {problem_type.upper()})")
        start_time = time.time()

        result = {
            "index": index,
            "status": "completed",
            "timestamp": time.time(),
            "problem_type": problem_type,
            "original": {},
            "kernel_variant": {},
            "comparison": {}
        }

        # 1. Solve the original problem.
        print(f"  📝 Solving original problem...")
        orig_solve_result, _ = await solve_problem(cli, original_question)

        if not orig_solve_result:
            result["original"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result

        orig_student_solution = to_str(orig_solve_result.get("solution", "")).strip()
        orig_final_answer = to_str(orig_solve_result.get("final_answer", "")).strip()

        result["original"]["student_solution"] = orig_student_solution
        result["original"]["student_final_answer"] = orig_final_answer
        result["original"]["solve_status"] = "success"

        # 2. Solve the kernel variant.
        print(f"  📝 Solving kernel variant...")
        kv_solve_result, _ = await solve_problem(cli, kernel_question)

        if not kv_solve_result:
            result["kernel_variant"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result

        kv_student_solution = to_str(kv_solve_result.get("solution", "")).strip()
        kv_final_answer = to_str(kv_solve_result.get("final_answer", "")).strip()

        result["kernel_variant"]["student_solution"] = kv_student_solution
        result["kernel_variant"]["student_final_answer"] = kv_final_answer
        result["kernel_variant"]["solve_status"] = "success"

        # 3. Grade the original solution (style depends on problem type).
        grading_style = "STRICT" if problem_type == "proof" else "LENIENT"
        print(f"  🔍 Grading original solution ({grading_style})...")
        orig_grade_result, _ = await grade_solution(
            cli, original_question, orig_student_solution, original_solution, problem_type
        )

        if not orig_grade_result:
            result["original"]["grade_status"] = "failed"
        else:
            result["original"]["grade_status"] = "success"
            result["original"]["grade"] = orig_grade_result.get("grade", "UNKNOWN")
            result["original"]["detailed_feedback"] = orig_grade_result.get("detailed_feedback", "")
            result["original"]["major_issues"] = orig_grade_result.get("major_issues", "")
            result["original"]["final_answer_correct"] = orig_grade_result.get("final_answer_correct", False)
            result["original"]["reasoning_rigor_score"] = orig_grade_result.get("reasoning_rigor_score", 0)
            result["original"]["overall_assessment"] = orig_grade_result.get("overall_assessment", "")

        # 4. Grade the kernel-variant solution the same way.
        print(f"  🔍 Grading kernel variant solution ({grading_style})...")
        kv_grade_result, _ = await grade_solution(
            cli, kernel_question, kv_student_solution, kernel_solution, problem_type
        )

        if not kv_grade_result:
            result["kernel_variant"]["grade_status"] = "failed"
        else:
            result["kernel_variant"]["grade_status"] = "success"
            result["kernel_variant"]["grade"] = kv_grade_result.get("grade", "UNKNOWN")
            result["kernel_variant"]["detailed_feedback"] = kv_grade_result.get("detailed_feedback", "")
            result["kernel_variant"]["major_issues"] = kv_grade_result.get("major_issues", "")
            result["kernel_variant"]["final_answer_correct"] = kv_grade_result.get("final_answer_correct", False)
            result["kernel_variant"]["reasoning_rigor_score"] = kv_grade_result.get("reasoning_rigor_score", 0)
            result["kernel_variant"]["overall_assessment"] = kv_grade_result.get("overall_assessment", "")

        # 5. Comparative analysis (only when both gradings succeeded).
        if (result["original"]["grade_status"] == "success" and
                result["kernel_variant"]["grade_status"] == "success"):

            orig_correct = result["original"]["grade"] == "CORRECT"
            kv_correct = result["kernel_variant"]["grade"] == "CORRECT"

            result["comparison"]["original_correct"] = orig_correct
            result["comparison"]["kernel_variant_correct"] = kv_correct
            result["comparison"]["both_correct"] = orig_correct and kv_correct
            result["comparison"]["both_incorrect"] = not orig_correct and not kv_correct
            result["comparison"]["original_harder"] = not orig_correct and kv_correct  # original is harder
            result["comparison"]["kernel_variant_harder"] = orig_correct and not kv_correct  # kernel variant is harder

            orig_rigor = result["original"]["reasoning_rigor_score"]
            kv_rigor = result["kernel_variant"]["reasoning_rigor_score"]
            # Positive = the original-problem solution was more rigorous.
            result["comparison"]["rigor_difference"] = orig_rigor - kv_rigor

        total_time = time.time() - start_time
        result["processing_time"] = total_time

        print(f"  ✅ Completed {index} in {total_time:.1f}s")
        if result["comparison"]:
            orig_status = "✅" if result["comparison"]["original_correct"] else "❌"
            kv_status = "✅" if result["comparison"]["kernel_variant_correct"] else "❌"
            print(f"     Original: {orig_status}, Kernel Variant: {kv_status}")

        return result

    except Exception as e:
        return {
            "index": index,
            "status": "error",
            "error": str(e),
            "error_type": type(e).__name__,
            "timestamp": time.time()
        }


async def save_detailed_results(results: List[Dict], output_file: str):
    """Save the per-file detailed results as JSON under RESULTS_DIR."""
    output_path = RESULTS_DIR / f"{output_file}_detailed.json"
    try:
        output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2), encoding='utf-8')
        print(f"💾 Detailed results saved to {output_path}")
    except Exception as e:
        print(f"❌ Failed to save detailed results: {e}")


def generate_summary_report(results: List[Dict]) -> Dict:
    """Aggregate the per-file results into a summary report dict."""
    summary = {
        "total_files": len(results),
        "completed": 0,
        "failed": 0,
        "skipped": 0,
        "by_problem_type": {
            "proof": {"count": 0, "original_correct": 0, "kv_correct": 0},
            "calculation": {"count": 0, "original_correct": 0, "kv_correct": 0}
        },
        "original_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "kernel_variant_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "comparison_stats": {
            "both_correct": 0,
            "both_incorrect": 0,
            "original_harder": 0,
            "kernel_variant_harder": 0,
            "total_compared": 0
        },
        "rigor_analysis": {
            "original_avg_rigor": 0,
            "kernel_variant_avg_rigor": 0,
            "rigor_difference_avg": 0
        }
    }

    orig_rigor_scores = []
    kv_rigor_scores = []
    rigor_differences = []

    for result in results:
        if result["status"] == "completed":
            summary["completed"] += 1

            # Tally by problem type.
            ptype = result.get("problem_type", "proof")
            if ptype in summary["by_problem_type"]:
                summary["by_problem_type"][ptype]["count"] += 1
                if result["original"].get("grade") == "CORRECT":
                    summary["by_problem_type"][ptype]["original_correct"] += 1
                if result["kernel_variant"].get("grade") == "CORRECT":
                    summary["by_problem_type"][ptype]["kv_correct"] += 1

            # Original-problem stats.
            if result["original"].get("grade_status") == "success":
                summary["original_stats"]["total_graded"] += 1
                if result["original"]["grade"] == "CORRECT":
                    summary["original_stats"]["correct"] += 1
                else:
                    summary["original_stats"]["incorrect"] += 1
                orig_rigor_scores.append(result["original"]["reasoning_rigor_score"])

            # Kernel-variant stats.
            if result["kernel_variant"].get("grade_status") == "success":
                summary["kernel_variant_stats"]["total_graded"] += 1
                if result["kernel_variant"]["grade"] == "CORRECT":
                    summary["kernel_variant_stats"]["correct"] += 1
                else:
                    summary["kernel_variant_stats"]["incorrect"] += 1
                kv_rigor_scores.append(result["kernel_variant"]["reasoning_rigor_score"])

            # Comparison stats.
            if result.get("comparison"):
                summary["comparison_stats"]["total_compared"] += 1
                comp = result["comparison"]
                if comp["both_correct"]:
                    summary["comparison_stats"]["both_correct"] += 1
                elif comp["both_incorrect"]:
                    summary["comparison_stats"]["both_incorrect"] += 1
                elif comp["original_harder"]:
                    summary["comparison_stats"]["original_harder"] += 1
                elif comp["kernel_variant_harder"]:
                    summary["comparison_stats"]["kernel_variant_harder"] += 1

                rigor_differences.append(comp["rigor_difference"])

        elif result["status"] == "skipped":
            summary["skipped"] += 1
        else:
            summary["failed"] += 1

    # Averages.
    if orig_rigor_scores:
        summary["rigor_analysis"]["original_avg_rigor"] = sum(orig_rigor_scores) / len(orig_rigor_scores)
    if kv_rigor_scores:
        summary["rigor_analysis"]["kernel_variant_avg_rigor"] = sum(kv_rigor_scores) / len(kv_rigor_scores)
    if rigor_differences:
        summary["rigor_analysis"]["rigor_difference_avg"] = sum(rigor_differences) / len(rigor_differences)

    # Accuracy rates.
    if summary["original_stats"]["total_graded"] > 0:
        summary["original_stats"]["accuracy"] = summary["original_stats"]["correct"] / summary["original_stats"]["total_graded"]

    if summary["kernel_variant_stats"]["total_graded"] > 0:
        summary["kernel_variant_stats"]["accuracy"] = summary["kernel_variant_stats"]["correct"] / summary["kernel_variant_stats"]["total_graded"]

    return summary


def print_summary_report(summary: Dict):
    """Pretty-print the summary report to stdout."""
    print("\n" + "=" * 80)
    print("📊 ORIGINAL vs KERNEL VARIANT COMPARISON REPORT")
    print("=" * 80)

    print(f"📁 Total files: {summary['total_files']}")
    print(f"✅ Completed: {summary['completed']}")
    print(f"⏭️ Skipped: {summary['skipped']}")
    print(f"❌ Failed: {summary['failed']}")

    print(f"\n📈 ACCURACY COMPARISON:")
    orig_acc = summary["original_stats"].get("accuracy", 0) * 100
    kv_acc = summary["kernel_variant_stats"].get("accuracy", 0) * 100
    print(f"Original Problems: {orig_acc:.1f}% ({summary['original_stats']['correct']}/{summary['original_stats']['total_graded']})")
    print(f"Kernel Variants: {kv_acc:.1f}% ({summary['kernel_variant_stats']['correct']}/{summary['kernel_variant_stats']['total_graded']})")

    # Compare whenever both sides were graded at all; a 0% accuracy is
    # valid data and must not suppress the comparison (previous code
    # required both accuracies to be > 0).
    if (summary["original_stats"]["total_graded"] > 0 and
            summary["kernel_variant_stats"]["total_graded"] > 0):
        diff = orig_acc - kv_acc
        if diff > 5:
            print(f"📉 Kernel variants are {diff:.1f}% harder (as expected)")
        elif diff < -5:
            print(f"📈 Original problems are {-diff:.1f}% harder (unexpected)")
        else:
            print(f"📊 Similar difficulty (difference: {diff:.1f}%)")

    print(f"\n🎯 BY PROBLEM TYPE:")
    for ptype, stats in summary["by_problem_type"].items():
        if stats["count"] > 0:
            orig_acc_type = (stats["original_correct"] / stats["count"]) * 100
            kv_acc_type = (stats["kv_correct"] / stats["count"]) * 100
            grading_note = " (STRICT grading)" if ptype == "proof" else " (LENIENT grading)"
            print(f"{ptype.upper()} Problems{grading_note}:")
            print(f"  Original: {orig_acc_type:.1f}% ({stats['original_correct']}/{stats['count']})")
            print(f"  Kernel Variant: {kv_acc_type:.1f}% ({stats['kv_correct']}/{stats['count']})")
            if stats["count"] >= 3:  # Only show difference if we have enough samples
                type_diff = orig_acc_type - kv_acc_type
                print(f"  Difference: {type_diff:+.1f}%")

    print(f"\n🔍 DETAILED COMPARISON:")
    comp = summary["comparison_stats"]
    total = comp["total_compared"]
    if total > 0:
        print(f"Both correct: {comp['both_correct']:3d} ({comp['both_correct']/total*100:.1f}%)")
        print(f"Both incorrect: {comp['both_incorrect']:3d} ({comp['both_incorrect']/total*100:.1f}%)")
        print(f"Original harder: {comp['original_harder']:3d} ({comp['original_harder']/total*100:.1f}%)")
        print(f"Kernel variant harder: {comp['kernel_variant_harder']:3d} ({comp['kernel_variant_harder']/total*100:.1f}%)")

    print(f"\n📏 REASONING RIGOR ANALYSIS:")
    rigor = summary["rigor_analysis"]
    print(f"Original avg rigor: {rigor['original_avg_rigor']:.2f}/10")
    print(f"Kernel variant rigor: {rigor['kernel_variant_avg_rigor']:.2f}/10")
    print(f"Difference: {rigor['rigor_difference_avg']:.2f} (positive = original more rigorous)")

    print("=" * 80)


@click.command()
@click.option("-c", "--concurrency", default=16, show_default=True,
              help="Maximum concurrent processing tasks")
@click.option("--max-files", default=50, show_default=True,
              help="Maximum number of files to test (for quick testing)")
@click.option("--file-pattern", default="*.json", show_default=True,
              help="File pattern to process")
@click.option("--output-prefix", default="comparison_test", show_default=True,
              help="Prefix for output files")
@click.option("--debug", is_flag=True, help="Enable debug output")
def main(concurrency: int, max_files: int, file_pattern: str, output_prefix: str, debug: bool):
    """Original vs. kernel-variant mathematical-ability comparison test."""
    print(f"🧪 Starting Original vs Kernel Variant Comparison Test")
    print(f"   Solver Model: {SOLVER_MODEL}")
    print(f"   Grader Model: {GRADER_MODEL}")
    print(f"   Max files: {max_files}")
    print(f"   Concurrency: {concurrency}")

    if not os.getenv("OPENAI_API_KEY"):
        print("❌ OPENAI_API_KEY environment variable not set!")
        return

    # Collect the files to test.
    all_files = sorted(SRC_DIR.glob(file_pattern))
    if max_files > 0:
        all_files = all_files[:max_files]

    print(f"📁 Testing {len(all_files)} files")

    if not all_files:
        print("❌ No files found to test!")
        return

    async def run_test():
        cli = AsyncOpenAI()
        sem = asyncio.Semaphore(concurrency)

        async def worker(file_path: pathlib.Path):
            async with sem:
                return await test_single_file(file_path, cli)

        # Run all files with bounded concurrency.
        results = []
        progress_bar = tqdm.tqdm(total=len(all_files), desc="Testing", unit="file")

        tasks = [worker(f) for f in all_files]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)
            progress_bar.update(1)

        progress_bar.close()
        return results

    # Execute the test run.
    results = asyncio.run(run_test())

    # Save the detailed per-file results.
    timestamp = int(time.time())
    output_name = f"{output_prefix}_{timestamp}"
    asyncio.run(save_detailed_results(results, output_name))

    # Build and display the summary report.
    summary = generate_summary_report(results)
    print_summary_report(summary)

    # Persist the summary report.
    summary_path = RESULTS_DIR / f"{output_name}_summary.json"
    try:
        summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding='utf-8')
        print(f"💾 Summary report saved to {summary_path}")
    except Exception as e:
        print(f"❌ Failed to save summary: {e}")


if __name__ == "__main__":
    main()
