1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
|
#!/usr/bin/env python3
"""
原题 vs Kernel Variant 数学能力对比测试
使用4o-mini解题,o3严格评分,比较两种题目的正确率差异
"""
import os
import json
import asyncio
import pathlib
import time
import re
import random
from typing import Dict, List, Tuple, Optional
import click
import tqdm
from openai import AsyncOpenAI, RateLimitError, APIError, APIConnectionError
# Configuration
SOLVER_MODEL = "gpt-4o-mini"  # model that attempts the problems
GRADER_MODEL = "o3"  # model that grades the attempts
SRC_DIR = pathlib.Path("raw/json")  # directory of input problem JSON files
RESULTS_DIR = pathlib.Path("results/comparison_test")  # where reports are written
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
RETRIES = 4  # max attempts per API call
TIMEOUT_BASE = 600  # base per-call timeout in seconds; doubled on each retry
RESP_FMT = {"type": "json_object"}  # request JSON-mode responses from the API

# Solver system prompt (used with SOLVER_MODEL).
SOLVER_SYSTEM_PROMPT = """You are an expert mathematician solving competition-level problems.
Provide detailed, step-by-step solutions with clear mathematical reasoning.
Requirements:
- Show all your work and intermediate steps
- Justify each major step of your reasoning
- Use proper mathematical notation
- Be thorough but concise
- State your final answer clearly
Solve the problem completely and rigorously."""

SOLVER_USER_TEMPLATE = """Please solve this mathematical problem:
{problem_statement}
Provide a complete solution with detailed reasoning. Return your response in JSON format:
{{"solution": "your complete step-by-step solution with mathematical reasoning",
"final_answer": "your final answer in a clear, concise form"}}"""

# Strict grading system prompt for PROOF problems (used with GRADER_MODEL).
PROOF_GRADER_SYSTEM_PROMPT = """You are an extremely strict mathematical grader evaluating competition-level PROOF problems.
GRADING STANDARDS (BE VERY STRICT):
- Mathematical rigor: Every step must be mathematically sound and justified
- Logical flow: The reasoning must be clear, complete, and logically connected
- Correctness: All calculations, algebraic manipulations, and conclusions must be correct
- Completeness: The solution must address all parts of the problem fully
- Precision: Mathematical statements must be precise and unambiguous
FAILING CRITERIA (Mark as INCORRECT if ANY of these apply):
- Any unjustified logical leap or gap in reasoning
- Any computational error, no matter how small
- Missing steps in critical parts of the argument
- Imprecise or ambiguous mathematical statements
- Incorrect final answer, even if approach is partially correct
- Circular reasoning or logical fallacies
- Misuse of mathematical theorems or definitions
BE EXTREMELY STRICT. Competition mathematics proofs require perfect precision."""

# Lenient, answer-focused grading system prompt for CALCULATION problems.
CALCULATION_GRADER_SYSTEM_PROMPT = """You are a mathematical grader evaluating competition-level CALCULATION problems.
GRADING STANDARDS FOR CALCULATION PROBLEMS:
- Primary focus: Is the final answer correct?
- Secondary focus: Is the overall approach reasonable and mathematically sound?
- Computation: Allow minor computational slips if the method is correct and final answer is right
GRADING CRITERIA:
- CORRECT: Final answer is correct AND approach is fundamentally sound
- INCORRECT: Final answer is wrong OR approach is fundamentally flawed
For calculation problems, the final numerical answer is the most important criterion.
Minor intermediate errors are acceptable if they don't affect the final result."""

PROOF_GRADER_USER_TEMPLATE = """Grade this PROOF solution with extreme strictness.
PROBLEM:
{problem_statement}
STUDENT SOLUTION:
{solution}
CORRECT REFERENCE SOLUTION:
{reference_solution}
Evaluate with maximum strictness. Every logical step must be perfect. Return JSON with:
{{"grade": "CORRECT" or "INCORRECT",
"detailed_feedback": "specific detailed analysis of what is right/wrong",
"major_issues": "list of significant mathematical errors or gaps",
"final_answer_correct": true or false,
"reasoning_rigor_score": 0-10 integer (10=perfect rigor, 0=severely flawed),
"overall_assessment": "comprehensive evaluation summary"}}"""

CALCULATION_GRADER_USER_TEMPLATE = """Grade this CALCULATION solution with focus on final answer correctness.
PROBLEM:
{problem_statement}
STUDENT SOLUTION:
{solution}
CORRECT REFERENCE SOLUTION:
{reference_solution}
Focus primarily on whether the final answer is correct. Return JSON with:
{{"grade": "CORRECT" or "INCORRECT",
"detailed_feedback": "specific detailed analysis of what is right/wrong",
"major_issues": "list of significant mathematical errors or gaps",
"final_answer_correct": true or false,
"reasoning_rigor_score": 0-10 integer (10=perfect rigor, 0=severely flawed),
"overall_assessment": "comprehensive evaluation summary"}}"""
# Greedy match from the first "{" to the last "}" — extracts a JSON object
# embedded in surrounding prose.
JSON_RE = re.compile(r"\{[\s\S]*\}")

def parse_json_response(raw: str) -> Optional[Dict]:
    """Parse a JSON object out of an LLM response, with fallback strategies.

    Strategies, in order:
      1. Parse the raw text directly.
      2. Extract the outermost `{...}` span via JSON_RE and parse that.
      3. Undo doubled escaping (`\\"` -> `"`, `\\\\` -> `\\`) and parse.

    Returns the parsed dict, or None if every strategy fails or *raw* is
    empty/None.
    """
    if not raw:
        return None
    # Bug fix: the original bare `except:` clauses also swallowed
    # KeyboardInterrupt/SystemExit; only parsing errors should be caught.
    # TypeError covers a non-string argument slipping through.
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        pass
    match = JSON_RE.search(raw)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            pass
    try:
        fixed = raw.replace('\\"', '"').replace('\\\\', '\\')
        return json.loads(fixed)
    except json.JSONDecodeError:
        pass
    return None
def to_str(x) -> str:
    """Coerce *x* to a string: None -> "", list/tuple -> newline-joined items."""
    if x is None:
        return ""
    if isinstance(x, str):
        return x
    if isinstance(x, (list, tuple)):
        return "\n".join(str(item) for item in x)
    return str(x)
async def call_api_with_retry(cli: AsyncOpenAI, model: str, messages: List[Dict]) -> Tuple[Optional[Dict], str]:
    """Make an OpenAI chat-completion call with retry logic.

    Retries up to RETRIES times with a per-attempt timeout that doubles each
    try (600s, 1200s, 2400s, 4800s) and exponential backoff between attempts.

    Returns:
        (parsed_dict, raw_text) on success, or (None, last_raw_text) once
        all attempts are exhausted.
    """
    raw_response = ""
    for attempt in range(1, RETRIES + 1):
        # Per-attempt timeout grows exponentially with the attempt number.
        timeout = TIMEOUT_BASE * (2 ** (attempt - 1))
        try:
            # Set temperature based on model:
            # o3, o3-mini, and o4-mini require temperature 1.0.
            # NOTE(review): substring matching means ANY model name containing
            # "o3" (the 'o3-mini'/'o4-mini' entries are redundant given 'o3'
            # and 'o4-mini' substrings) takes this branch — confirm intended.
            if any(model_name in model.lower() for model_name in ['o3', 'o3-mini', 'o4-mini']):
                temperature = 1.0
            else:
                # Use temperature 0.0 for deterministic solving with other models.
                temperature = 0.0
            response = await asyncio.wait_for(
                cli.chat.completions.create(
                    model=model,
                    messages=messages,
                    temperature=temperature,
                    response_format=RESP_FMT,  # force JSON-mode output
                ),
                timeout=timeout,
            )
            raw_response = response.choices[0].message.content or ""
            parsed = parse_json_response(raw_response)
            if parsed:
                return parsed, raw_response
            # An unparseable body is treated as a retryable failure: the
            # ValueError is caught by the generic handler below.
            raise ValueError("Failed to parse JSON response")
        except RateLimitError as e:
            print(f"🚫 RateLimitError (attempt {attempt}/{RETRIES}): {str(e)}")
            if "insufficient_quota" in str(e):
                # Quota exhaustion is not transient — back off for a long time.
                print("⏳ Detected quota exhaustion - sleeping 15 minutes")
                await asyncio.sleep(900)
            else:
                # Exponential backoff with jitter for ordinary rate limiting.
                sleep_time = 2 ** attempt + random.random()
                print(f" ⏰ Rate limited, sleeping {sleep_time:.1f}s")
                await asyncio.sleep(sleep_time)
        except (APIError, APIConnectionError, asyncio.TimeoutError, ValueError) as e:
            print(f"❌ {type(e).__name__} (attempt {attempt}/{RETRIES}): {str(e)}")
            if attempt == RETRIES:
                return None, raw_response
            sleep_time = 2 ** attempt + random.random()
            print(f" ⏰ Retrying in {sleep_time:.1f}s")
            await asyncio.sleep(sleep_time)
    # Reached only if the final attempt ended in a RateLimitError.
    return None, raw_response
async def solve_problem(cli: AsyncOpenAI, problem_statement: str) -> Tuple[Optional[Dict], str]:
    """Ask the solver model to solve *problem_statement*.

    Returns (parsed_json, raw_text) as produced by call_api_with_retry.
    """
    user_message = SOLVER_USER_TEMPLATE.format(problem_statement=problem_statement)
    conversation = [
        {"role": "system", "content": SOLVER_SYSTEM_PROMPT},
        {"role": "user", "content": user_message},
    ]
    return await call_api_with_retry(cli, SOLVER_MODEL, conversation)
async def grade_solution(cli: AsyncOpenAI, problem_statement: str, solution: str,
                         reference_solution: str, problem_type: str = "proof") -> Tuple[Optional[Dict], str]:
    """Have the grader model score a student solution against the reference.

    Calculation problems get the lenient, answer-focused rubric; every other
    problem type falls back to the strict proof rubric.
    """
    lenient = problem_type == "calculation"
    system_prompt = CALCULATION_GRADER_SYSTEM_PROMPT if lenient else PROOF_GRADER_SYSTEM_PROMPT
    user_template = CALCULATION_GRADER_USER_TEMPLATE if lenient else PROOF_GRADER_USER_TEMPLATE
    user_message = user_template.format(
        problem_statement=problem_statement,
        solution=solution,
        reference_solution=reference_solution,
    )
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message},
    ]
    return await call_api_with_retry(cli, GRADER_MODEL, conversation)
async def test_single_file(file_path: pathlib.Path, cli: AsyncOpenAI) -> Dict:
    """Test one JSON problem file: solve and grade both the original problem
    and its kernel variant, then compare the two outcomes.

    Returns a result dict whose "status" is one of "completed", "skipped",
    "failed", or "error". Completed results carry per-problem grading details
    plus a head-to-head comparison.
    """
    try:
        # Load the problem record.
        data = json.loads(file_path.read_text(encoding='utf-8'))
        index = data.get("index", file_path.stem)
        # Validate required fields.
        original_question = to_str(data.get("question", "")).strip()
        original_solution = to_str(data.get("solution", "")).strip()
        problem_type = data.get("problem_type", "proof")  # default to proof => strict grading
        kv = data.get("variants", {}).get("kernel_variant")
        if not kv:
            return {
                "index": index,
                "status": "skipped",
                "reason": "no_kernel_variant"
            }
        kernel_question = to_str(kv.get("question", "")).strip()
        kernel_solution = to_str(kv.get("solution", "")).strip()
        if not all([original_question, original_solution, kernel_question, kernel_solution]):
            return {
                "index": index,
                "status": "skipped",
                "reason": "missing_fields"
            }
        print(f"🧮 Testing {index} (Type: {problem_type.upper()})")
        start_time = time.time()
        result = {
            "index": index,
            "status": "completed",
            "timestamp": time.time(),
            "problem_type": problem_type,
            "original": {},
            "kernel_variant": {},
            "comparison": {}
        }
        # 1. Solve the original problem with the solver model.
        print(f" 📝 Solving original problem...")
        orig_solve_result, orig_solve_raw = await solve_problem(cli, original_question)
        if not orig_solve_result:
            result["original"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result
        orig_student_solution = to_str(orig_solve_result.get("solution", "")).strip()
        orig_final_answer = to_str(orig_solve_result.get("final_answer", "")).strip()
        result["original"]["student_solution"] = orig_student_solution
        result["original"]["student_final_answer"] = orig_final_answer
        result["original"]["solve_status"] = "success"
        # 2. Solve the kernel variant with the solver model.
        print(f" 📝 Solving kernel variant...")
        kv_solve_result, kv_solve_raw = await solve_problem(cli, kernel_question)
        if not kv_solve_result:
            result["kernel_variant"]["solve_status"] = "failed"
            result["status"] = "failed"
            return result
        kv_student_solution = to_str(kv_solve_result.get("solution", "")).strip()
        kv_final_answer = to_str(kv_solve_result.get("final_answer", "")).strip()
        result["kernel_variant"]["student_solution"] = kv_student_solution
        result["kernel_variant"]["student_final_answer"] = kv_final_answer
        result["kernel_variant"]["solve_status"] = "success"
        # 3. Grade the original solution (rubric depends on problem type).
        grading_style = "STRICT" if problem_type == "proof" else "LENIENT"
        print(f" 🔍 Grading original solution ({grading_style})...")
        orig_grade_result, orig_grade_raw = await grade_solution(
            cli, original_question, orig_student_solution, original_solution, problem_type
        )
        if not orig_grade_result:
            result["original"]["grade_status"] = "failed"
        else:
            result["original"]["grade_status"] = "success"
            result["original"]["grade"] = orig_grade_result.get("grade", "UNKNOWN")
            result["original"]["detailed_feedback"] = orig_grade_result.get("detailed_feedback", "")
            result["original"]["major_issues"] = orig_grade_result.get("major_issues", "")
            result["original"]["final_answer_correct"] = orig_grade_result.get("final_answer_correct", False)
            result["original"]["reasoning_rigor_score"] = orig_grade_result.get("reasoning_rigor_score", 0)
            result["original"]["overall_assessment"] = orig_grade_result.get("overall_assessment", "")
        # 4. Grade the kernel-variant solution with the same rubric.
        print(f" 🔍 Grading kernel variant solution ({grading_style})...")
        kv_grade_result, kv_grade_raw = await grade_solution(
            cli, kernel_question, kv_student_solution, kernel_solution, problem_type
        )
        if not kv_grade_result:
            result["kernel_variant"]["grade_status"] = "failed"
        else:
            result["kernel_variant"]["grade_status"] = "success"
            result["kernel_variant"]["grade"] = kv_grade_result.get("grade", "UNKNOWN")
            result["kernel_variant"]["detailed_feedback"] = kv_grade_result.get("detailed_feedback", "")
            result["kernel_variant"]["major_issues"] = kv_grade_result.get("major_issues", "")
            result["kernel_variant"]["final_answer_correct"] = kv_grade_result.get("final_answer_correct", False)
            result["kernel_variant"]["reasoning_rigor_score"] = kv_grade_result.get("reasoning_rigor_score", 0)
            result["kernel_variant"]["overall_assessment"] = kv_grade_result.get("overall_assessment", "")
        # 5. Head-to-head comparison, only when both gradings succeeded.
        if (result["original"]["grade_status"] == "success" and
            result["kernel_variant"]["grade_status"] == "success"):
            orig_correct = result["original"]["grade"] == "CORRECT"
            kv_correct = result["kernel_variant"]["grade"] == "CORRECT"
            result["comparison"]["original_correct"] = orig_correct
            result["comparison"]["kernel_variant_correct"] = kv_correct
            result["comparison"]["both_correct"] = orig_correct and kv_correct
            result["comparison"]["both_incorrect"] = not orig_correct and not kv_correct
            result["comparison"]["original_harder"] = not orig_correct and kv_correct  # original was harder
            result["comparison"]["kernel_variant_harder"] = orig_correct and not kv_correct  # kernel variant was harder
            orig_rigor = result["original"]["reasoning_rigor_score"]
            kv_rigor = result["kernel_variant"]["reasoning_rigor_score"]
            result["comparison"]["rigor_difference"] = orig_rigor - kv_rigor  # positive => original more rigorous
        total_time = time.time() - start_time
        result["processing_time"] = total_time
        print(f" ✅ Completed {index} in {total_time:.1f}s")
        if result["comparison"]:
            orig_status = "✅" if result["comparison"]["original_correct"] else "❌"
            kv_status = "✅" if result["comparison"]["kernel_variant_correct"] else "❌"
            print(f" Original: {orig_status}, Kernel Variant: {kv_status}")
        return result
    except Exception as e:
        # `index` may not exist yet if the JSON load itself failed, hence
        # the locals() check before using it.
        return {
            "index": index if 'index' in locals() else file_path.stem,
            "status": "error",
            "error": str(e),
            "error_type": type(e).__name__,
            "timestamp": time.time()
        }
async def save_detailed_results(results: List[Dict], output_file: str):
    """Write the per-file results as pretty-printed JSON under RESULTS_DIR."""
    output_path = RESULTS_DIR / f"{output_file}_detailed.json"
    try:
        payload = json.dumps(results, ensure_ascii=False, indent=2)
        output_path.write_text(payload, encoding='utf-8')
        print(f"💾 Detailed results saved to {output_path}")
    except Exception as e:
        print(f"❌ Failed to save detailed results: {e}")
def generate_summary_report(results: List[Dict]) -> Dict:
    """Aggregate per-file test results into a single summary report dict.

    Tallies completion/skip/failure counts, per-problem-type correctness,
    grading accuracy for originals vs kernel variants, head-to-head
    comparison outcomes, and average reasoning-rigor scores.
    """
    summary = {
        "total_files": len(results),
        "completed": 0,
        "failed": 0,
        "skipped": 0,
        "by_problem_type": {
            "proof": {"count": 0, "original_correct": 0, "kv_correct": 0},
            "calculation": {"count": 0, "original_correct": 0, "kv_correct": 0}
        },
        "original_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "kernel_variant_stats": {"correct": 0, "incorrect": 0, "total_graded": 0},
        "comparison_stats": {
            "both_correct": 0,
            "both_incorrect": 0,
            "original_harder": 0,
            "kernel_variant_harder": 0,
            "total_compared": 0
        },
        "rigor_analysis": {
            "original_avg_rigor": 0,
            "kernel_variant_avg_rigor": 0,
            "rigor_difference_avg": 0
        }
    }
    orig_rigor_scores = []
    kv_rigor_scores = []
    rigor_deltas = []
    for entry in results:
        status = entry["status"]
        if status == "skipped":
            summary["skipped"] += 1
            continue
        if status != "completed":
            summary["failed"] += 1
            continue
        summary["completed"] += 1
        # Per-problem-type tallies; unknown types are simply ignored.
        type_bucket = summary["by_problem_type"].get(entry.get("problem_type", "proof"))
        if type_bucket is not None:
            type_bucket["count"] += 1
            if entry["original"].get("grade") == "CORRECT":
                type_bucket["original_correct"] += 1
            if entry["kernel_variant"].get("grade") == "CORRECT":
                type_bucket["kv_correct"] += 1
        # Grading tallies for the original problem.
        if entry["original"].get("grade_status") == "success":
            bucket = summary["original_stats"]
            bucket["total_graded"] += 1
            bucket["correct" if entry["original"]["grade"] == "CORRECT" else "incorrect"] += 1
            orig_rigor_scores.append(entry["original"]["reasoning_rigor_score"])
        # Grading tallies for the kernel variant.
        if entry["kernel_variant"].get("grade_status") == "success":
            bucket = summary["kernel_variant_stats"]
            bucket["total_graded"] += 1
            bucket["correct" if entry["kernel_variant"]["grade"] == "CORRECT" else "incorrect"] += 1
            kv_rigor_scores.append(entry["kernel_variant"]["reasoning_rigor_score"])
        # Head-to-head comparison tallies (the four flags are mutually exclusive).
        comp = entry.get("comparison")
        if comp:
            stats = summary["comparison_stats"]
            stats["total_compared"] += 1
            for flag in ("both_correct", "both_incorrect", "original_harder", "kernel_variant_harder"):
                if comp[flag]:
                    stats[flag] += 1
                    break
            rigor_deltas.append(comp["rigor_difference"])
    # Average rigor scores (left at 0 when nothing was graded).
    rigor = summary["rigor_analysis"]
    if orig_rigor_scores:
        rigor["original_avg_rigor"] = sum(orig_rigor_scores) / len(orig_rigor_scores)
    if kv_rigor_scores:
        rigor["kernel_variant_avg_rigor"] = sum(kv_rigor_scores) / len(kv_rigor_scores)
    if rigor_deltas:
        rigor["rigor_difference_avg"] = sum(rigor_deltas) / len(rigor_deltas)
    # Accuracy ratios; the key is only present when something was graded.
    for side in ("original_stats", "kernel_variant_stats"):
        graded = summary[side]["total_graded"]
        if graded > 0:
            summary[side]["accuracy"] = summary[side]["correct"] / graded
    return summary
def print_summary_report(summary: Dict):
    """Pretty-print the summary dict produced by generate_summary_report."""
    print("\n" + "="*80)
    print("📊 ORIGINAL vs KERNEL VARIANT COMPARISON REPORT")
    print("="*80)
    print(f"📁 Total files: {summary['total_files']}")
    print(f"✅ Completed: {summary['completed']}")
    print(f"⏭️ Skipped: {summary['skipped']}")
    print(f"❌ Failed: {summary['failed']}")
    print(f"\n📈 ACCURACY COMPARISON:")
    orig_acc = summary["original_stats"].get("accuracy", 0) * 100
    kv_acc = summary["kernel_variant_stats"].get("accuracy", 0) * 100
    print(f"Original Problems: {orig_acc:.1f}% ({summary['original_stats']['correct']}/{summary['original_stats']['total_graded']})")
    print(f"Kernel Variants: {kv_acc:.1f}% ({summary['kernel_variant_stats']['correct']}/{summary['kernel_variant_stats']['total_graded']})")
    # Bug fix: gate on whether both sides were actually graded.  The old
    # condition (orig_acc > 0 and kv_acc > 0) silently skipped the difficulty
    # comparison whenever either model scored exactly 0%.
    if (summary["original_stats"]["total_graded"] > 0
            and summary["kernel_variant_stats"]["total_graded"] > 0):
        diff = orig_acc - kv_acc
        if diff > 5:
            print(f"📉 Kernel variants are {diff:.1f}% harder (as expected)")
        elif diff < -5:
            print(f"📈 Original problems are {-diff:.1f}% harder (unexpected)")
        else:
            print(f"📊 Similar difficulty (difference: {diff:.1f}%)")
    print(f"\n🎯 BY PROBLEM TYPE:")
    for ptype, stats in summary["by_problem_type"].items():
        if stats["count"] > 0:
            orig_acc_type = (stats["original_correct"] / stats["count"]) * 100
            kv_acc_type = (stats["kv_correct"] / stats["count"]) * 100
            grading_note = " (STRICT grading)" if ptype == "proof" else " (LENIENT grading)"
            print(f"{ptype.upper()} Problems{grading_note}:")
            print(f" Original: {orig_acc_type:.1f}% ({stats['original_correct']}/{stats['count']})")
            print(f" Kernel Variant: {kv_acc_type:.1f}% ({stats['kv_correct']}/{stats['count']})")
            if stats["count"] >= 3:  # Only show difference if we have enough samples
                type_diff = orig_acc_type - kv_acc_type
                print(f" Difference: {type_diff:+.1f}%")
    print(f"\n🔍 DETAILED COMPARISON:")
    comp = summary["comparison_stats"]
    total = comp["total_compared"]
    if total > 0:
        print(f"Both correct: {comp['both_correct']:3d} ({comp['both_correct']/total*100:.1f}%)")
        print(f"Both incorrect: {comp['both_incorrect']:3d} ({comp['both_incorrect']/total*100:.1f}%)")
        print(f"Original harder: {comp['original_harder']:3d} ({comp['original_harder']/total*100:.1f}%)")
        print(f"Kernel variant harder: {comp['kernel_variant_harder']:3d} ({comp['kernel_variant_harder']/total*100:.1f}%)")
    print(f"\n📏 REASONING RIGOR ANALYSIS:")
    rigor = summary["rigor_analysis"]
    print(f"Original avg rigor: {rigor['original_avg_rigor']:.2f}/10")
    print(f"Kernel variant rigor: {rigor['kernel_variant_avg_rigor']:.2f}/10")
    print(f"Difference: {rigor['rigor_difference_avg']:.2f} (positive = original more rigorous)")
    print("="*80)
@click.command()
@click.option("-c", "--concurrency", default=16, show_default=True,
              help="Maximum concurrent processing tasks")
@click.option("--max-files", default=50, show_default=True,
              help="Maximum number of files to test (for quick testing)")
@click.option("--file-pattern", default="*.json", show_default=True,
              help="File pattern to process")
@click.option("--output-prefix", default="comparison_test", show_default=True,
              help="Prefix for output files")
@click.option("--debug", is_flag=True, help="Enable debug output")
def main(concurrency: int, max_files: int, file_pattern: str, output_prefix: str, debug: bool):
    """Original vs Kernel Variant math-ability comparison test."""
    # NOTE(review): the --debug flag is accepted but never used in this
    # function — confirm whether debug output was meant to be wired up.
    print(f"🧪 Starting Original vs Kernel Variant Comparison Test")
    print(f" Solver Model: {SOLVER_MODEL}")
    print(f" Grader Model: {GRADER_MODEL}")
    print(f" Max files: {max_files}")
    print(f" Concurrency: {concurrency}")
    if not os.getenv("OPENAI_API_KEY"):
        print("❌ OPENAI_API_KEY environment variable not set!")
        return
    # Collect the test files (sorted for deterministic ordering).
    all_files = sorted(SRC_DIR.glob(file_pattern))
    if max_files > 0:
        all_files = all_files[:max_files]
    print(f"📁 Testing {len(all_files)} files")
    if not all_files:
        print("❌ No files found to test!")
        return
    async def run_test():
        # One shared client; concurrency is bounded by the semaphore.
        cli = AsyncOpenAI()
        sem = asyncio.Semaphore(concurrency)
        async def worker(file_path: pathlib.Path):
            async with sem:
                return await test_single_file(file_path, cli)
        # Run all files, updating the progress bar as each completes.
        results = []
        progress_bar = tqdm.tqdm(total=len(all_files), desc="Testing", unit="file")
        tasks = [worker(f) for f in all_files]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)
            progress_bar.update(1)
        progress_bar.close()
        return results
    # Run the whole test batch.
    results = asyncio.run(run_test())
    # Persist the detailed per-file results.
    timestamp = int(time.time())
    output_name = f"{output_prefix}_{timestamp}"
    # save_detailed_results is async, so it needs its own (short-lived) loop.
    asyncio.run(save_detailed_results(results, output_name))
    # Build and display the aggregate report.
    summary = generate_summary_report(results)
    print_summary_report(summary)
    # Persist the summary report alongside the detailed results.
    summary_path = RESULTS_DIR / f"{output_name}_summary.json"
    try:
        summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding='utf-8')
        print(f"💾 Summary report saved to {summary_path}")
    except Exception as e:
        print(f"❌ Failed to save summary: {e}")

if __name__ == "__main__":
    main()
|