| author | = <=> | 2025-06-04 11:49:37 +0800 |
|---|---|---|
| committer | = <=> | 2025-06-04 11:49:37 +0800 |
| commit | 947d9dfdf16ae37109898111a5caacae7377b96d (patch) | |
| tree | ff4e884020fb7d968a6192106f370b215647f569 /code_eval/OpenCodeEval/benchmark | |
| parent | 5e163b529a78d528b745b8b57ba794b7b2bba97a (diff) | |
update code and kk eval
Diffstat (limited to 'code_eval/OpenCodeEval/benchmark')
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/BigCodeBench.py | 113 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/Bird.py | 123 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/HumanEval.py | 114 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/LeetCode.py | 121 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/LiveCodeBench.py | 76 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/MBPP.py | 126 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/Spider.py | 118 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/__init__.py | 0 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/base.py | 124 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/mbpp.py | 153 |
| -rw-r--r-- | code_eval/OpenCodeEval/benchmark/understandml.py | 152 |
11 files changed, 1220 insertions, 0 deletions
diff --git a/code_eval/OpenCodeEval/benchmark/BigCodeBench.py b/code_eval/OpenCodeEval/benchmark/BigCodeBench.py
new file mode 100644
index 0000000..abc4faf
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/BigCodeBench.py
@@ -0,0 +1,113 @@
+import os
+from typing import Literal
+
+ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from OpenCodeEval.benchmark.base import Benchmark, PYTHON_STOP, PYTHON_IMPORTS
+from OpenCodeEval.utils import refine_text, stream_jsonl
+from OpenCodeEval.eval.func_eval import check_correctness
+from OpenCodeEval.eval.sanitize import sanitize
+
+class BigCodeBench(Benchmark):
+
+    name: str = "BigCodeBench"
+    path: str = None
+
+    fullset_path = os.path.abspath(os.path.join(ROOT, "../data/BigCodeBench.jsonl"))
+    subset_path = os.path.abspath(os.path.join(ROOT, "../data/BigCodeBench_Hard.jsonl"))
+
+    imports_code = PYTHON_IMPORTS
+    chat_stop = PYTHON_STOP
+    base_stop = ['\n"""', "\nassert"]
+
+    def __init__(self,
+                 name: str = "BigCodeBench",
+                 timeout: float = 10.0,
+                 prompt_type: Literal["Completion", "Instruction"] = "Completion"
+                 ):
+
+        super().__init__()
+
+        self.name = name
+        self.timeout = timeout
+        self.prompt_type = prompt_type
+
+        if self.name == "BigCodeHard":
+            self.path = self.subset_path
+        elif self.name == "BigCodeBench":
+            self.path = self.fullset_path
+
+        self.tasks = self.get_task()
+
+    def get_task(self):
+        """
+        Get the task data from the jsonl file into a dictionary.
+        """
+
+        tasks = {}
+
+        for task_data in stream_jsonl(filename=self.path):
+
+            task_id = int(task_data["task_id"].split("/")[-1])
+
+            tasks[task_id] = task_data
+
+        return tasks
+
+    def get_prompt(self):
+        """
+        Builds the prompt for the LM to generate from.
+        """
+
+        prompts = []
+        for task_id, task_data in self.tasks.items():
+
+            if self.prompt_type == "Completion":
+                prompt = task_data['complete_prompt']
+            elif self.prompt_type == "Instruction":
+                prompt = task_data['instruct_prompt']
+
+            prompts.append(
+                dict(
+                    task_id = task_id,
+                    prompt = refine_text(prompt)
+                )
+            )
+
+        return prompts
+
+    def postprocess_generation(self, generation):
+        """
+        Postprocess the generations.
+        """
+
+        entry_point = self.tasks[generation['task_id']]["entry_point"]
+
+        result = dict(
+            task_id = generation['task_id'],
+            completion_id = generation['completion_id'],
+            solution = sanitize(generation['completion'], entry_point)
+        )
+
+        return result
+
+    def process_results(self, solution):
+        """
+        Takes the list of LM generations and evaluates them against the test cases
+        """
+
+        task_data = self.tasks[solution['task_id']]
+
+        code = (
+            task_data["code_prompt"] + "\n"
+            + "    pass\n" + "\n"
+            + solution['solution'] + "\n"
+        )
+
+        result = check_correctness(solution['task_id'],
+                                   solution['completion_id'],
+                                   code,
+                                   task_data["test"],
+                                   self.timeout)
+
+        return result
\ No newline at end of file
diff --git a/code_eval/OpenCodeEval/benchmark/Bird.py b/code_eval/OpenCodeEval/benchmark/Bird.py
new file mode 100644
index 0000000..b4359fb
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/Bird.py
@@ -0,0 +1,123 @@
+import os
+import sys
+
+from loguru import logger
+from typing import Literal
+
+from OpenCodeEval.benchmark.base import Benchmark
+from OpenCodeEval.utils import refine_text, program_extract, stream_jsonl
+from OpenCodeEval.eval.sql_test import check_correctness
+
+class Bird(Benchmark):
+
+    name: str = "Bird"
+
+    def __init__(
+        self,
+        split: Literal["train", "dev"] = "dev",
+        time_out: float = 30.0,
+        prompt_type: str = "Instruction"
+    ):
+
+        super().__init__()
+
+        self.split = split
+        self.time_out = time_out
+        self.prompt_type = prompt_type
+
+        if self.prompt_type == "Completion":
+            logger.error("Completion prompt type not supported for Bird")
+
+        self.database = os.path.join(self.path, f"{self.name}/{self.split}/database")
+        self.path = os.path.join(self.path, f"{self.name}/{self.split}/data.jsonl")
+
+        self.tasks = self.get_task()
+
+    def get_task(self):
+        """
+        Get the task data from the json file into a dictionary.
+        """
+
+        tasks = {}
+
+        for task_data in stream_jsonl(filename = self.path):
+
+            tasks[int(task_data['id'])] = task_data
+
+        return tasks
+
+    def get_prompt(self):
+        """
+        Builds the prompt for the LM to generate from.
+        """
+
+        def construct_prompt(data):
+            instruction = data['o_schema']
+            instruction += f"\n\n-- External Knowledge: {data['evidence']}\n\n"
+            instruction += "-- Using valid SQLite and understanding External Knowledge, answer the following questions for the tables provided above.\n\n"
+            instruction += f"Question: {data['question']}\n"
+            return instruction
+
+        prompts = []
+
+        for task_id, task_data in self.tasks.items():
+
+            prompt = construct_prompt(task_data)
+
+            prompts.append(
+                dict(
+                    task_id = task_id,
+                    prompt = refine_text(prompt)
+                )
+            )
+
+        return prompts
+
+    def postprocess_generation(self, generation):
+        """
+        Postprocess the generations.
+        """
+
+        def one_line_sql(response):
+
+            response = program_extract(response, "sql", "last").strip()
+
+            lines = response.splitlines()
+            lines = [l.strip() for l in lines if l.strip()]
+            sql = " ".join(lines)
+
+            return sql
+
+        result = dict(
+            task_id = generation['task_id'],
+            completion_id = generation['completion_id'],
+            solution = one_line_sql(generation['completion'])
+        )
+
+        return result
+
+    def process_results(self, solution):
+        """
+        Takes the list of LM generations and evaluates them against the test cases
+        """
+
+        task_data = self.tasks[solution['task_id']]
+
+        db_path = self.database + f"/{task_data['db_id']}/{task_data['db_id']}.sqlite"
+
+        result, passed, sql_return = check_correctness(
+            solution['solution'],
+            task_data['sql'],
+            db_path,
+            self.time_out,
+            "set_match"
+        )
+
+        return dict(
+            task_id = solution['task_id'],
+            completion_id = solution['completion_id'],
+            passed = passed,
+            result = result,
+            solution = solution['solution'],
+            sql_return = sql_return
+        )
\ No newline at end of file
diff --git a/code_eval/OpenCodeEval/benchmark/HumanEval.py b/code_eval/OpenCodeEval/benchmark/HumanEval.py
new file mode 100644
index 0000000..3c3aece
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/HumanEval.py
@@ -0,0 +1,114 @@
+import os
+from typing import Literal
+
+from OpenCodeEval.benchmark.base import Benchmark, PYTHON_STOP, PYTHON_IMPORTS
+from OpenCodeEval.utils import refine_text, stream_jsonl, program_extract
+from OpenCodeEval.eval.func_eval import check_correctness
+from OpenCodeEval.eval.sanitize import sanitize
+
+class HumanEval(Benchmark):
+
+    name: str = "HumanEval"
+
+    imports_code = PYTHON_IMPORTS
+    chat_stop = PYTHON_STOP
+    base_stop = ["\ndef ", "\nclass ", "\nimport ", "\nfrom ", "\nassert "]
+
+    def __init__(
+        self,
+        split: Literal["base", "hard"] = "base",
+        time_out: float = 3.0,
+        prompt_type: str = "Completion"
+    ):
+
+        super().__init__()
+
+        self.split = split
+        self.time_out = time_out
+        self.prompt_type = prompt_type
+
+        self.path = os.path.join(self.path, f"{self.name}/{self.split}.jsonl")
+
+        self.tasks = self.get_task()
+
+    def get_task(self):
+        """
+        Get the task data from the jsonl file into a dictionary.
+        """
+
+        tasks = {}
+
+        for task_data in stream_jsonl(filename=self.path):
+
+            task_id = int(task_data["task_id"].split("/")[-1])
+
+            tasks[task_id] = task_data
+
+        return tasks
+
+    def get_prompt(self):
+        """
+        Builds the prompt for the LM to generate from.
+        """
+
+        assert self.prompt_type == "Completion", "Prompt type must be Completion for HumanEval"
+
+        prompts = []
+        for task_id, task_data in self.tasks.items():
+            prompts.append(
+                dict(
+                    task_id = task_id,
+                    prompt = refine_text(task_data['prompt'])
+                )
+            )
+        return prompts
+
+    def postprocess_generation(self, generation):
+        """
+        Postprocess the generations.
+        """
+
+        entry_point = self.tasks[generation['task_id']]["entry_point"]
+
+        try:
+            completion = '\n'.join(generation['completion'].splitlines()[-200:])
+
+            if '</think>' in completion:
+                completion = completion.split('</think>')[1]
+
+            solution = sanitize(completion, entry_point)
+        except Exception:
+            solution = program_extract(generation['completion'], program="python", mode="all")
+
+        result = dict(
+            task_id = generation['task_id'],
+            completion_id = generation['completion_id'],
+            solution = solution
+        )
+
+        return result
+
+    def process_results(self, solution):
+        """
+        Takes the list of LM generations and evaluates them against the test cases
+        """
+
+        task_data = self.tasks[solution['task_id']]
+
+        code = (
+            "\n".join(self.imports_code) + "\n"
+            + task_data["prompt"] + "\n"
+            + "    pass\n" + "\n"
+            + solution['solution'] + "\n"
+            + task_data['test'] + "\n"
+            + f"check({task_data['entry_point']})"
+        )
+
+        result = check_correctness(
+            solution['task_id'],
+            solution['completion_id'],
+            code,
+            self.time_out
+        )
+
+        return result
\ No newline at end of file
diff --git a/code_eval/OpenCodeEval/benchmark/LeetCode.py b/code_eval/OpenCodeEval/benchmark/LeetCode.py
new file mode 100644
index 0000000..97b3489
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/LeetCode.py
@@ -0,0 +1,121 @@
+import os
+from typing import Literal
+from loguru import logger
+
+from OpenCodeEval.benchmark.base import Benchmark, PYTHON_IMPORTS, LEETCODE_IMPORTS, PYTHON_STOP
+from OpenCodeEval.utils import refine_text, stream_jsonl
+from OpenCodeEval.eval.func_eval import check_correctness
+from OpenCodeEval.eval.sanitize import sanitize
+
+class LeetCode(Benchmark):
+
+    name: str = "LeetCode"
+
+    imports_code = PYTHON_IMPORTS + LEETCODE_IMPORTS
+    chat_stop = PYTHON_STOP
+    base_stop = ["\ndef ", "\nclass ", "\nimport ", "\nfrom ", "\nassert "]
+
+    def __init__(
+        self,
+        split: Literal["contest", "train", "validation", "test"] = "contest",
+        time_out: float = 3.0,
+        prompt_type: Literal["Completion", "Instruction"] = "Instruction"
+    ):
+
+        super().__init__()
+
+        self.split = split
+        self.time_out = time_out
+        self.prompt_type = prompt_type
+
+        if self.split != "contest" and self.prompt_type == "Completion":
+            logger.error(f"Completion prompt type is not supported for the {self.split} split")
+
+        self.path = os.path.join(self.path, f"{self.name}/{self.split}.jsonl")
+        self.tasks = self.get_task()
+
+    def get_task(self):
+        """
+        Get the task data from the jsonl file into a dictionary.
+        """
+
+        tasks = {}
+
+        for task_data in stream_jsonl(filename=self.path):
+
+            if self.split == "contest":
+                task_id = int(task_data["meta"]["questionId"])
+            else:
+                task_id = int(task_data["meta"]["question_id"])
+            tasks[task_id] = task_data
+
+        return tasks
+
+    def get_prompt(self):
+        """
+        Builds the prompt for the LM to generate from.
+        """
+
+        prompts = []
+        for task_id, task_data in self.tasks.items():
+
+            if self.split == "contest":
+                if self.prompt_type == "Completion":
+                    prompt = task_data['prompt']
+                elif self.prompt_type == "Instruction":
+                    prompt = task_data['prompt_sft']
+            else:
+                prompt = task_data['meta']['query']
+
+            prompts.append(
+                dict(
+                    task_id = task_id,
+                    prompt = refine_text(prompt)
+                )
+            )
+
+        return prompts
+
+    def postprocess_generation(self, generation):
+        """
+        Postprocess the generations.
+        """
+
+        return dict(
+            task_id = generation['task_id'],
+            completion_id = generation['completion_id'],
+            solution = sanitize(
+                text = generation['completion'],
+                entrypoint = "Solution",
+            )
+        )
+
+    def process_results(self, solution):
+        """
+        Takes the list of LM generations and evaluates them against the test cases
+        """
+
+        task_data = self.tasks[solution['task_id']]
+
+        if self.split == "contest":
+            code = (
+                "\n".join(self.imports_code) + "\n\n"
+                + solution['solution'] + "\n\n"
+                + task_data['test']
+            )
+        else:
+            code = (
+                "\n".join(self.imports_code) + "\n\n"
+                + task_data['meta']['lang_code'] + "\n"
+                + "    pass\n" + "\n"
+                + solution['solution'] + "\n"
+                + task_data['test'] + "\n"
+                + f"check({task_data['entry_point']})"
+            )
+
+        result = check_correctness(solution['task_id'],
+                                   solution['completion_id'],
+                                   code,
+                                   self.time_out)
+
+        return result
\ No newline at end of file
diff --git a/code_eval/OpenCodeEval/benchmark/LiveCodeBench.py b/code_eval/OpenCodeEval/benchmark/LiveCodeBench.py
new file mode 100644
index 0000000..8e0ccd9
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/LiveCodeBench.py
@@ -0,0 +1,76 @@
+import os
+from typing import Literal
+
+from loguru import logger
+
+from OpenCodeEval.benchmark.base import Benchmark, PYTHON_STOP, PYTHON_IMPORTS
+from OpenCodeEval.utils import refine_text, stream_jsonl, program_extract
+from OpenCodeEval.eval.func_eval import check_correctness
+from OpenCodeEval.eval.sanitize import sanitize
+
+class LiveCodeBench(Benchmark):
+
+    name: str = "LiveCodeBench"
+    path: str = None
+
+    platform_dict = dict(
+        atcoder = 1,
+        codeforces = 2,
+        leetcode = 3,
+    )
+
+    def __init__(
+        self,
+        split: Literal["v1", "v2", "v3", "v4", "v5"] = "v5",
+        time_out: float = 3.0,
+        prompt_type: str = "Instruction"
+    ):
+
+        super().__init__()
+
+        self.split = split
+        self.time_out = time_out
+        self.prompt_type = prompt_type
+
+        self.path = os.path.join(self.path, self.name)
+
+        self.tasks = self.get_task()
+
+    def get_task_id(self, data):
+        """
+        Get the task id for the task.
+        """
+
+        from datetime import datetime
+
+        date_id = datetime.fromisoformat(data['contest_date'])
+
+        # reformat the date to YYYYMMDD
+        date_id = date_id.strftime("%Y%m%d")
+
+        if data['platform'] == 'atcoder':
+
+            platform_id = "1"
+            contest, letter = data['question_id'].split('_')
+            contest = ''.join(token for token in contest if token.isdigit())
+            contest = contest.zfill(4)
+
+            task_id = platform_id + contest + str(ord(letter) - ord('a') + 1)
+
+        elif data['platform'] == 'codeforces':
+            platform_id = "2"
+            contest, letter = data['question_id'].split('_')
+            task_id = platform_id + contest + str(ord(letter) - ord('A') + 1)
+
+        elif data['platform'] == 'leetcode':
+            platform_id = "3"
+            task_id = platform_id + data['question_id'] + "0"
+
+        else:
+            logger.error(f"Invalid platform: {data['platform']}")
+
+        return int(task_id)
+
+    def get_task(self):
+        """
+        Get the task data from the jsonl file into a dictionary.
+        """
+
+        version = int(self.split.split('v')[1])
+
+        for i in range(1, version + 1):
+
\ No newline at end of file
diff --git a/code_eval/OpenCodeEval/benchmark/MBPP.py b/code_eval/OpenCodeEval/benchmark/MBPP.py
new file mode 100644
index 0000000..0242893
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/MBPP.py
@@ -0,0 +1,126 @@
+import os
+import sys
+
+ROOT = os.path.dirname(os.path.abspath(__file__))
+
+from OpenCodeEval.benchmark.base import Benchmark, PYTHON_STOP, PYTHON_IMPORTS
+from OpenCodeEval.utils import refine_text, stream_jsonl, program_extract
+from OpenCodeEval.eval.func_eval import check_correctness
+from OpenCodeEval.eval.sanitize import sanitize
+
+class MBPP(Benchmark):
+
+    name: str = "MBPP"
+
+    imports_code = PYTHON_IMPORTS
+    chat_stop = PYTHON_STOP
+    base_stop = ['\n"""', "\nassert"]
+    # TODO: add more stop words, e.g. "\nif __name__", "\ndef main(", "\nprint(", '\n```\n'
+
+    def __init__(
+        self,
+        split: str = "base",
+        time_out: float = 3.0,
+        prompt_type: str = "Instruction"
+    ):
+
+        super().__init__()
+
+        self.split = split
+        self.time_out = time_out
+        self.prompt_type = prompt_type
+
+        self.path = os.path.join(self.path, f"{self.name}/data.jsonl")
+
+        self.tasks = self.get_task()
+
+    def get_task(self):
+        """
+        Get the task data from the jsonl file into a dictionary.
+        """
+
+        tasks = {}
+
+        for task_data in stream_jsonl(filename=self.path):
+
+            task_id = int(task_data["task_id"])
+
+            tasks[task_id] = task_data
+
+        return tasks
+
+    def format_prompt(self,
+                      problem: str,
+                      test: str,
+                      ) -> str:
+        problem = f"You are an expert Python programmer, and here is your task:\n{problem}"
+        test = f"Your code should pass the test:\n{test}"
+        prompt = problem + "\n" + test
+        return prompt
+
+    def get_prompt(self):
+        """
+        Builds the prompt for the LM to generate from.
+        """
+
+        assert self.prompt_type == "Instruction", "Prompt type must be Instruction for MBPP"
+
+        prompts = []
+        for task_id, task_data in self.tasks.items():
+            prompts.append(
+                dict(
+                    task_id = task_id,
+                    prompt = refine_text(self.format_prompt(task_data["text"], task_data["test_list"][0]))
+                )
+            )
+        return prompts
+
+    def postprocess_generation(self, generation):
+        """
+        Postprocess the generations.
+        """
+
+        entry_point = self.tasks[generation['task_id']]["entry_point"]
+
+        try:
+            # generation['completion'] = program_extract(generation['completion'], program="python", mode="last")
+            solution = sanitize(generation['completion'], entry_point)
+            # solution = solution.replace("func0", entry_point)
+        except Exception:
+            solution = program_extract(generation['completion'], program="python", mode="all")
+
+        return dict(
+            task_id = generation['task_id'],
+            completion_id = generation['completion_id'],
+            solution = solution
+        )
+
+    def process_results(self, solution):
+        """
+        Takes a postprocessed solution and evaluates it against the task's test cases.
+        """
+
+        task_data = self.tasks[solution['task_id']]
+
+        if self.split == "base":
+            test_code = "\n".join(task_data['test_imports']) + "\n\n" + "\n".join(task_data['test_list'])
+        elif self.split == "plus":
+            test_code = "\n".join(task_data['test_imports']) + "\n\n" + task_data['test']
+
+        code = (
+            "\n".join(self.imports_code) + "\n"
+            + solution['solution'] + "\n"
+            + test_code
+        )
+
+        result = check_correctness(solution['task_id'],
+                                   solution['completion_id'],
+                                   code,
+                                   self.time_out)
+
+        return result
\ No newline at end of file
diff --git a/code_eval/OpenCodeEval/benchmark/Spider.py b/code_eval/OpenCodeEval/benchmark/Spider.py
new file mode 100644
index 0000000..dc18afa
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/Spider.py
@@ -0,0 +1,118 @@
+import os
+import json
+
+from loguru import logger
+from typing import Literal
+
+from OpenCodeEval.benchmark.base import Benchmark
+from OpenCodeEval.utils import refine_text, program_extract, markdown_extract, stream_jsonl
+from OpenCodeEval.eval.sql_test import check_correctness
+
+class Spider(Benchmark):
+
+    name: str = "Spider"
+
+    def __init__(
+        self,
+        split: Literal["train", "dev"] = "dev",
+        time_out: float = 3.0,
+        prompt_type: str = "Instruction"
+    ):
+
+        super().__init__()
+
+        self.split = split
+        self.time_out = time_out
+        self.prompt_type = prompt_type
+
+        if self.prompt_type == "Completion":
+            logger.error("Completion prompt type not supported for Spider")
+
+        self.database = os.path.join(self.path, f"{self.name}/{self.split}/database")
+        self.path = os.path.join(self.path, f"{self.name}/{self.split}/data.jsonl")
+
+        self.tasks = self.get_task()
+
+    def get_task(self):
+        """
+        Get the task data from the json file into a dictionary.
+        """
+
+        tasks = {}
+
+        for task_data in stream_jsonl(filename = self.path):
+
+            tasks[int(task_data['id'])] = task_data
+
+        return tasks
+
+    def get_prompt(self):
+        """
+        Builds the prompt for the LM to generate from.
+        """
+
+        prompts = []
+
+        for task_id, task_data in self.tasks.items():
+
+            prompt = task_data['instruction']
+
+            prompts.append(
+                dict(
+                    task_id = task_id,
+                    prompt = refine_text(prompt)
+                )
+            )
+
+        return prompts
+
+    def postprocess_generation(self, generation):
+        """
+        Postprocess the generations.
+        """
+
+        solution = ' '.join(program_extract(
+            text = generation['completion'],
+            program = 'sql',
+            mode = 'last').splitlines()
+        )
+
+        if solution == "":
+            solution = ' '.join(markdown_extract(
+                text = generation['completion'],
+                mode = 'last').splitlines()
+            )
+
+        result = dict(
+            task_id = generation['task_id'],
+            completion_id = generation['completion_id'],
+            solution = solution
+        )
+
+        return result
+
+    def process_results(self, solution):
+        """
+        Takes the list of LM generations and evaluates them against the test cases
+        """
+
+        task_data = self.tasks[solution['task_id']]
+
+        db_path = os.path.join(self.database, f"{task_data['db_id']}/{task_data['db_id']}.sqlite")
+
+        result, passed, sql_return = check_correctness(
+            solution['solution'],
+            task_data['output'],
+            db_path,
+            self.time_out,
+            "exact_match"
+        )
+
+        return dict(
+            task_id = solution['task_id'],
+            completion_id = solution['completion_id'],
+            passed = passed,
+            result = result,
+            solution = solution['solution'],
+            sql_return = sql_return
+        )
\ No newline at end of file
diff --git a/code_eval/OpenCodeEval/benchmark/__init__.py b/code_eval/OpenCodeEval/benchmark/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/__init__.py
diff --git a/code_eval/OpenCodeEval/benchmark/base.py b/code_eval/OpenCodeEval/benchmark/base.py
new file mode 100644
index 0000000..4bd9750
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/base.py
@@ -0,0 +1,124 @@
+import os
+import sys
+
+from abc import ABC, abstractmethod
+
+ROOT = os.path.dirname(os.path.abspath(__file__))
+
+PYTHON_STOP = ["\nif __name__",
+               "\ndef main(",
+               "\nprint("
+               ]
+
+PYTHON_IMPORTS = ["import math",
+                  "import re",
+                  "import sys",
+                  "import copy",
+                  "import datetime",
+                  "import itertools",
+                  "import collections",
+                  "import heapq",
+                  "import functools",
+                  "import hashlib",
+                  "import numpy",
+                  "import numpy as np",
+                  "import string",
+                  "from typing import *",
+                  "from collections import *"
+                  ]
+
+LEETCODE_IMPORTS = [
+    'from typing import *',
+    'from functools import *',
+    'from collections import *',
+    'from itertools import *',
+    'from heapq import *',
+    'from bisect import *',
+    'from string import *',
+    'from operator import *',
+    'from math import *',
+    'import math',
+    'import datetime',
+    "inf = float('inf')",
+]
+
+class Benchmark(ABC):
+
+    name: str = None
+    split: str = None
+    path: str = os.path.abspath(os.path.join(os.path.dirname(__file__), "data/"))
+
+    imports = []
+    chat_stop = []
+    base_stop = []
+
+    def __init__(self):
+        """
+        :param stop_words: list
+            list of stop words if the generation uses a stopping criteria during generation
+        :param requires_execution: bool
+            whether the task requires code execution during evaluation or not
+        """
+        pass
+
+    def fewshot_examples(self):
+        """Loads and returns the few-shot examples for the task if they exist."""
+        pass
+
+    @abstractmethod
+    def get_task(self):
+        """Builds the task for the LM to generate from.
+        """
+        pass
+
+    @abstractmethod
+    def get_prompt(self, doc):
+        """Builds the prompt for the LM to generate from.
+        :param doc: dict[str: str]
+            sample from the test dataset
+        """
+        pass
+
+    def get_reference(self, doc):
+        """Builds the reference solution for the doc.
+        :param doc: dict[str: str]
+            sample from the test dataset
+        """
+        pass
+
+    @abstractmethod
+    def postprocess_generation(self, task, generation):
+        """Defines the postprocessing for a LM generation.
+        :param generation: str
+            code generation from LM
+        :param idx: int
+            index of doc in the dataset to which the generation belongs
+        """
+        pass
+
+    @abstractmethod
+    def process_results(self, generations, references):
+        """Takes the list of LM generations and evaluates them against ground truth references,
+        returning the metric for the generations as in {"metric_name": result}.
+        :param generations: list(list(str))
+            list of lists containing generations
+        :param references: list(str)
+            list of str containing references
+        :return: dict[str: float]
+        """
+        pass
+
+    @staticmethod
+    def _stop_at_stop_token(decoded_string, stop_tokens):
+        """
+        Produces the prefix of decoded_string that ends at the first occurrence of
+        a stop_token.
+        WARNING: the decoded_string *must not* include the prompt, which may have stop tokens
+        itself.
+        """
+        min_stop_index = len(decoded_string)
+        for stop_token in stop_tokens:
+            stop_index = decoded_string.find(stop_token)
+            if stop_index != -1 and stop_index < min_stop_index:
+                min_stop_index = stop_index
+        return decoded_string[:min_stop_index]
\ No newline at end of file
diff --git a/code_eval/OpenCodeEval/benchmark/mbpp.py b/code_eval/OpenCodeEval/benchmark/mbpp.py
new file mode 100644
index 0000000..ee522a2
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/mbpp.py
@@ -0,0 +1,153 @@
+import os
+
+from OpenCodeEval.benchmark.base import Benchmark, PYTHON_STOP, PYTHON_IMPORTS
+from OpenCodeEval.utils import refine_text, stream_jsonl
+from OpenCodeEval.eval.func_eval import check_correctness
+from OpenCodeEval.eval.sanitize import sanitize
+
+from typing import List, Literal
+
+class mbpp(Benchmark):
+
+    name: str = "mbpp"
+
+    imports_code = PYTHON_IMPORTS
+    chat_stop = PYTHON_STOP
+    base_stop = ['\n"""', "\nassert"]
+
+    def __init__(
+        self,
+        split: Literal["full", "sanitized"] = "full",
+        time_out: float = 3.0,
+        prompt_type: str = "Instruction"
+    ):
+
+        super().__init__()
+
+        self.split = split
+        self.time_out = time_out
+        self.prompt_type = prompt_type
+
+        self.path = os.path.join(self.path, f"{self.name}/{self.split}.jsonl")
+        self.tasks = self.get_task()
+
+        self.few_shots_prompt = self.get_few_shots_prompts() if split == "full" else ""
+
+    def get_task(self):
+        """
+        Get the task data from the jsonl file into a dictionary.
+        """
+
+        tasks = {}
+
+        for task_data in stream_jsonl(filename = self.path):
+            task_id = int(task_data["task_id"])
+
+            task_data['text'] = refine_text(task_data['text'])
+
+            tasks[task_id] = task_data
+
+        return tasks
+
+    def fewshot_examples(self):
+
+        few_shots_start = 1
+        few_shots_end = 4
+
+        few_shots = []
+
+        for task_id, task_data in self.tasks.items():
+            if task_id >= few_shots_start and task_id < few_shots_end:
+                few_shots.append(task_data)
+
+        return few_shots
+
+    def format_prompt(self,
+                      problem: str,
+                      tests: List[str],
+                      code: str = None
+                      ) -> str:
+        problem = f"You are an expert Python programmer, and here is your task:\n{problem}"
+        test = "\n".join(tests)
+        test = f"Your code should pass these tests:\n{test}\n"
+        prompt = problem + test
+        if code:
+            code = refine_text(code)
+            code = f"\n```python\n{code}\n```\n"
+            prompt = prompt + code
+        else:
+            prompt = prompt + "\n```python\n"
+        return prompt
+
+    def get_few_shots_prompts(self):
+
+        few_shots_prompts = []
+        for few_shot in self.fewshot_examples():
+            few_shots_prompts.append(self.format_prompt(few_shot["text"], few_shot["test_list"], few_shot["code"]))
+
+        return '\n'.join(few_shots_prompts)
+
+    def get_prompt(self):
+        """
+        Builds the prompt for the LM to generate from.
+        """
+
+        assert self.prompt_type == "Instruction", "Prompt type must be Instruction for mbpp"
+
+        if self.split == "full":
+            test_start = 10
+            test_end = 510
+        elif self.split == "sanitized":
+            test_start = 0
+            test_end = 974
+
+        prompts = []
+
+        for task_id, task_data in self.tasks.items():
+            if task_id >= test_start and task_id < test_end:
+
+                prompt = self.few_shots_prompt + '\n' + self.format_prompt(task_data["text"], task_data["test_list"])
+                prompts.append({
+                    'task_id': task_id,
+                    'prompt': prompt
+                })
+
+        return prompts
+
+    def postprocess_generation(self, generation):
+        """
+        Postprocess the generations.
+        """
+
+        if generation['completion'].startswith(self.few_shots_prompt):
+            generation['completion'] = generation['completion'][len(self.few_shots_prompt):]
+
+        # if "```python" not in generation['completion']:
+        #     generation['completion'] = ""
+
+        return dict(
+            task_id = generation['task_id'],
+            completion_id = generation['completion_id'],
+            solution = sanitize(generation['completion'])
+        )
+
+    def process_results(self, solution):
+        """
+        Takes the list of LM generations and evaluates them against the test cases
+        """
+
+        task_data = self.tasks[solution['task_id']]
+
+        code = (
+            "\n".join(self.imports_code) + "\n"
+            + task_data['test_setup_code'] + "\n"
+            + solution['solution'] + "\n"
+            + "\n".join(task_data['test_list'])
+        )
+
+        result = check_correctness(solution['task_id'],
+                                   solution['completion_id'],
+                                   code,
+                                   self.time_out)
+
+        return result
diff --git a/code_eval/OpenCodeEval/benchmark/understandml.py b/code_eval/OpenCodeEval/benchmark/understandml.py
new file mode 100644
index 0000000..590db3f
--- /dev/null
+++ b/code_eval/OpenCodeEval/benchmark/understandml.py
@@ -0,0 +1,152 @@
+import os
+
+from OpenCodeEval.benchmark.base import Benchmark, PYTHON_STOP, PYTHON_IMPORTS
+from OpenCodeEval.utils import program_extract, stream_jsonl
+from OpenCodeEval.eval.func_eval import check_correctness
+from OpenCodeEval.eval.sanitize import sanitize
+
+from typing import List, Literal
+
+from typing import *
+from functools import *
+from collections import *
+from itertools import *
+from heapq import *
+from bisect import *
+from string import *
+from operator import *
+from math import *
+
+import numpy as np
+from numpy import *
+
+import datetime
+import copy
+
+inf = float('inf')
+
+if os.environ.get("MAX_LINES"):
+    MAX_LINES = int(os.environ.get("MAX_LINES"))
+else:
+    MAX_LINES = 200
+
+def base_prompt(data):
+    prompt = 'You are an expert Python programmer, and here is your task:\n'
+    prompt = prompt + f'# Task: {data["title"]}\n'
+    prompt = prompt + f'# Description:\n{data["description"]}\n'
+    # prompt = prompt + f'# Examples:\n'
+    # for example_idx, (example, reasoning) in enumerate(zip(data["examples"], data["reasoning"])):
+    #     prompt = prompt + f'## Example {example_idx + 1}:\n'
+    #     prompt = prompt + f'### Input:\n{example["input"]}\n'
+    #     prompt = prompt + f'### Output:\n{example["output"]}\n'
+    #     prompt = prompt + f'### Reasoning:\n{reasoning}\n'
+    input_code = (data["import_code"] + "\n" + data["starter_code"]).strip()
+    prompt = prompt + f'# Your code should start with:\n```python\n{input_code}\n```\n'
+    if data['output_constrains'].strip():
+        prompt = prompt + f'# Output Constraints:\n{data["output_constrains"].strip()}\n'
+
+    return prompt
+
+class understandml(Benchmark):
+
+    name: str = "understandml"
+
+    imports_code = PYTHON_IMPORTS
+    chat_stop = PYTHON_STOP
+    base_stop = ['\n"""', "\nassert"]
+
+    def __init__(
+        self,
+        split: Literal["human", "model"] = "model",
+        time_out: float = 3.0,
+        prompt_type: str = "Instruction"
+    ):
+
+        super().__init__()
+
+        self.split = split
+        self.time_out = time_out
+        self.prompt_type = prompt_type
+
+        self.path = os.path.join(self.path, f"{self.name}/{self.split}_benchmark.jsonl")
+        self.tasks = self.get_task()
+
+    def get_task(self):
+        """
+        Get the task data from the jsonl file into a dictionary.
+        """
+
+        tasks = {}
+
+        for task_data in stream_jsonl(filename = self.path):
+            task_id = int(task_data["id"])
+
+            tasks[task_id] = task_data
+
+        return tasks
+
+    def get_prompt(self):
+        """
+        Builds the prompt for the LM to generate from.
+        """
+
+        assert self.prompt_type == "Instruction", "Prompt type must be Instruction for understandml"
+
+        prompts = []
+
+        for task_id, task_data in self.tasks.items():
+
+            prompt = base_prompt(task_data)
+            prompts.append({
+                'task_id': task_id,
+                'prompt': prompt
+            })
+
+        return prompts
+
+    def postprocess_generation(self, generation):
+        """
+        Postprocess the generations.
+        """
+
+        entry_point = self.tasks[generation['task_id']]["entry_point"]
+
+        try:
+            completion = '\n'.join(generation['completion'].splitlines()[-MAX_LINES:])
+
+            if '</think>' in completion:
+                completion = completion.split('</think>')[1]
+
+            solution = sanitize(completion, entry_point)
+        except Exception:
+            solution = program_extract(generation['completion'], program="python", mode="all")
+
+        result = dict(
+            task_id = generation['task_id'],
+            completion_id = generation['completion_id'],
+            solution = solution
+        )
+
+        return result
+
+    def process_results(self, solution):
+        """
+        Takes the list of LM generations and evaluates them against the test cases
+        """
+
+        task_data = self.tasks[solution['task_id']]
+
+        code = (
+            task_data['import_code'] + "\n"
+            + solution['solution'] + "\n"
+            + "\n".join(task_data['test_cases'])
+        )
+
+        result = check_correctness(solution['task_id'],
+                                   solution['completion_id'],
+                                   code,
+                                   self.time_out)
+
+        return result
\ No newline at end of file
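
The benchmark classes added in this commit all share the same lifecycle: `get_prompt()` builds the prompts, the model's raw outputs are passed through `postprocess_generation()`, and `process_results()` executes the extracted solution against the task's tests. Below is a minimal sketch of that flow, assuming the `OpenCodeEval` package from this commit is importable and the HumanEval data file exists under the package's `data/` directory; the completion string is a made-up illustration, not a real model generation.

```python
from OpenCodeEval.benchmark.HumanEval import HumanEval

# Load the benchmark; this reads HumanEval/base.jsonl from the package data directory.
benchmark = HumanEval(split="base", time_out=3.0, prompt_type="Completion")

# 1. Build the prompts to feed the model.
prompts = benchmark.get_prompt()
first = prompts[0]

# 2. Stand-in for a model generation (illustrative only).
generation = {
    "task_id": first["task_id"],
    "completion_id": 0,
    "completion": first["prompt"] + "    return 1\n",
}

# 3. Extract a clean solution, then run it against the task's test suite.
solution = benchmark.postprocess_generation(generation)
result = benchmark.process_results(solution)
print(result)
```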
