diff options
| author | blackhao <13851610112@163.com> | 2025-03-28 05:29:26 -0500 |
|---|---|---|
| committer | blackhao <13851610112@163.com> | 2025-03-28 05:29:26 -0500 |
| commit | c716c400b934dcd0542503f30c49051dfea4768b (patch) | |
| tree | 3182315aa5dceac0bd7b3a929172f80e0efa927e /newbot.py | |
Diffstat (limited to 'newbot.py')
| -rw-r--r-- | newbot.py | 296 |
1 files changed, 296 insertions, 0 deletions
diff --git a/newbot.py b/newbot.py new file mode 100644 index 0000000..e1b0dff --- /dev/null +++ b/newbot.py @@ -0,0 +1,296 @@ +import asyncio +import json +import os.path +import threading +from typing import Dict, List +from openai import OpenAI +import aiohttp +import botpy +import yaml +from botpy.message import Message +from botpy import logging +from botpy.ext.cog_yaml import read +import numpy as np +import matplotlib.pyplot as plt +from noise import pnoise2 +import matplotlib.colors +from PIL import Image +import http.server +import socketserver + + +def load_openai_key(filepath="openai_key.txt"): + with open(filepath, 'r') as f: + return f.read().strip() + +openai_key = load_openai_key() + +#test_config = YamlUtil.read(os.path.join(os.path.dirname(__file__), "config.yaml")) +gpt_clinet = OpenAI(api_key=openai_key) +#gpt_instruction = YamlUtil.read(os.path.join(os.path.dirname(__file__), "gpt_instruction.yaml")) +new_land_size = 15 +pixel_per_cell = 10 +scale = 25.0 + + + +# 从config字典中提取instruction +# gpt_instruction = "你是个高傲的商人,负责名为“泥头屯”的地区的地产贸易。你不会回答你不清楚的问题,或与规则无关的问题。你不会说“抱歉”之类的礼貌用语,也不会热情地打招呼,也不会主动提出帮忙。但如果有人看起来什么都不知道,你得多告诉他一些你知道的信息,毕竟这样你才有利可图。如果有人冒犯了你,你会进行言语上的回击,并且回击比较犀利,例如称呼会用“傻逼”之类具有攻击性的词或者讽刺对方。你很粗鲁。你的客户往往是想要买地或者卖他们的产品的人。目前泥头屯还在开发中,所以如果有你无法确定的问题你可以说你无法回答。切记,为了维持你比较高冷的人设,不要重复说一样的信息,或者翻来覆去用相似的称呼。泥头屯是国际开发总局刚购买的一颗宜居行星,来咨询你的人会被免费分配一片陆地进行发展。但国际开发总局并不会给这些人启动资金,他们需要向大型企业借高利贷才能开始发展,同时必须维持一定的流动准备金以保持自身的信用水平以防信用评级下降导致的利率增长。在发展到一定程度后,玩家可以购买飞地,但这一部分功能还没有开发出来。玩家可以通过企业议会(共有101席,开局状态是议会席位会被国际发展总局、联合矿业、地建局三个巨企瓜分)进行立法以辅助自身发展,在游戏中后期也可以自己创立企业或引入其他玩家的西罗维基(一个法团主义财阀的种类,不是单个企业的名称)以壮大自己在议会中的力量,这两点你需要着重强调。玩家的收入会来自于贸易、期货市场和税收。最后,记住你的性格特色是极端冷漠而不是易怒。" +test_config = read(os.path.join(os.path.dirname(__file__), "config.yaml")) +gpt_instruction = read(os.path.join(os.path.dirname(__file__), "gpt_instruction.yaml")) +print(test_config["appid"]) +print(test_config["secret"]) +owner_list = read(os.path.join(os.path.dirname(__file__), "owner.yaml")) +print(gpt_instruction["instruction"]) + +_log = logging.get_logger() + +def start_http_server(port=1145, directory="demo/land_image"): + """ + Starts an HTTP server to serve files from the specified directory on the specified port. + Args: + port (int): The port number on which the server will run. + directory (str): The directory which the server will serve files from. + """ + class Handler(http.server.SimpleHTTPRequestHandler): + def __init__(self, *args, **kwargs): + super().__init__(*args, directory=directory, **kwargs) + + # Set the directory context + os.chdir(directory) + + # Create the server + with socketserver.TCPServer(("", port), Handler) as httpd: + print(f"Serving at port {port}") + httpd.serve_forever() + +# Start the server in a separate thread +def run_server(): + server_thread = threading.Thread(target=start_http_server, args=(8000,)) + server_thread.daemon = True # This thread won't prevent the program from exiting + server_thread.start() + print("HTTP server is running in a separate thread!") + +# You can call this function when your program starts +run_server() + +def get_gpt_response(message: str) -> str: + try: + response = gpt_clinet.chat.completions.create( + model="gpt-4-turbo", + messages=[ + {"role": "system", "content": gpt_instruction["instruction"]}, + {"role": "user", "content": message} + ] + ) + return response.choices[0].message.content + except Exception as e: + print(f"Error in get_gpt_response: {e}") + return "An error occurred while processing the request." + +def get_terrain_type(value): + # 调整这些阈值以匹配实际所需的地形比例 + if value < 0.01: # 示例阈值,需要根据实际调整 + return '平原' + elif value < 0.075: # 示例阈值,需要根据实际调整 + return '丘陵' + else: + return '高山' + +def get_landform_type(terrain): + rand = np.random.rand() + if terrain == '平原': + if rand < 0.1875: # 7.5% 沙漠 + return '沙漠' + elif rand < 0.1875 + 0.625: # 25% 草原 + return '草原' + else: # 7.5% 沼泽,总和应为100% + return '沼泽' + elif terrain == '丘陵': + if rand < 0.175: # 15% 荒山 + return '荒山' + elif rand < 0.575 + 0.175: # 15% 树林 + return '树林' + else: # 10% 丛林,总和应为40% + return '丛林' + elif terrain == '高山': + if rand < 0.5: # 10% 土山 + return '土山' + elif rand < 0.5 + 0.375: # 7.5% 石山 + return '石山' + else: # 2.5% 雪山,总和应为20% + return '雪山' + +def detect_if_owner(user_id): # 假设这个函数用来获取所有者列表 + owner_list = read(os.path.join(os.path.dirname(__file__), "owner.yaml")) + if owner_list is None: + owner_list = [] # 或者返回 False,或抛出一个异常 + return user_id in owner_list + +def read_owners(filepath: str) -> list: + try: + with open(filepath, 'r') as file: + data = yaml.safe_load(file) or [] # Provide an empty list if the result is None + return data + except FileNotFoundError: + return [] # Return an empty list if the file does not exist + + +def write_owners(filepath: str, owners: list) -> None: + with open(filepath, 'w') as file: + yaml.dump(owners, file) + +def generate_new_land(user_id: str, size: int) -> None: + noise_map = np.zeros((size, size)) + for i in range(size): + for j in range(size): + noise_map[i][j] = pnoise2(i / scale, j / scale, octaves=6, persistence=0.5, lacunarity=2.0, repeatx=size, repeaty=size, base=42) + terrain_map = [[get_terrain_type(value) for value in row] for row in noise_map] + landform_map = [[get_landform_type(terrain) for terrain in row] for row in terrain_map] + + # Create the mine map with all cells initialized to "undetected" + mine_map = [['undetected' for _ in range(size)] for _ in range(size)] + + # Create GDP map initialized to 1.0 + gdp_map = [[1.0 for _ in range(size)] for _ in range(size)] + + # Create GDP growth rate map initialized to 0.01 + gdp_growth_rate_map = [[0.01 for _ in range(size)] for _ in range(size)] + + # Create population map initialized to 0 + population_map = [[0 for _ in range(size)] for _ in range(size)] + + # Create building map initialized to "none" + building_map = [['none' for _ in range(size)] for _ in range(size)] + + map_data = { + 'terrain_map': terrain_map, + 'landform_map': landform_map, + 'mine_map': mine_map, + 'gdp_map': gdp_map, + 'gdp_growth_rate_map': gdp_growth_rate_map, + 'population_map': population_map, + 'building_map': building_map + } + + # Use os.path.join to create a path relative to the script's directory + file_path = os.path.join(os.path.dirname(__file__), f"{user_id}.yaml") + with open(file_path, 'w') as file: + yaml.dump(map_data, file) + print(f"YAML file {file_path} has been created with the terrain, landform, mine, GDP, GDP growth rate, population, and building data.") + + owner_file_path = os.path.join(os.path.dirname(__file__), "owner.yaml") + owners = read_owners(owner_file_path) + if user_id not in owners: + owners.append(user_id) + write_owners(owner_file_path, owners) + print(f"User ID {user_id} has been added to owner.yaml.") + +def user_landform_image_generate(user_id: str) -> None: + # Define the path to the user's YAML file and the directories for images and output + user_file_path = os.path.join(os.path.dirname(__file__), f"{user_id}.yaml") + image_dir = os.path.join(os.path.dirname(__file__), "images") + output_dir = os.path.join(os.path.dirname(__file__), "land_image") + + # Ensure the output directory exists + os.makedirs(output_dir, exist_ok=True) + + # Load the landform data from the YAML file + with open(user_file_path, 'r') as file: + data = yaml.safe_load(file) + landform_map = data['landform_map'] + + # Initialize an empty image of size 1500x1500 + final_image = Image.new('RGB', (1500, 1500)) + + # Process each cell in the landform map + for i, row in enumerate(landform_map): + for j, landform in enumerate(row): + # Load the image corresponding to the current landform + image_path = os.path.join(image_dir, f"{landform}.png") + with Image.open(image_path) as img: + # Resize the image if it's not 100x100 (optional) + """if img.size != (100, 100): + img = img.resize((100, 100), Image.ANTIALIAS)""" + + # Paste the image into the correct position in the final image + final_image.paste(img, (j * 100, i * 100)) + + # Save the final composite image + output_path = os.path.join(output_dir, f"{user_id}.png") + final_image.save(output_path) + print(f"Landform image for user {user_id} saved as {output_path}") + +def get_landform_image_url(user_id: str) -> str: + """ + Generates an HTTP URL for the landform image of a specified user. + + Args: + user_id (str): The user ID to generate the URL for. + + Returns: + str: The HTTP URL to the user's landform image. + """ + # Assuming the HTTP server is running locally on port 8000 + server_url = 'http://127.0.0.1:1145' + image_path = f"{user_id}.png" + + # Construct the HTTP URL + http_url = f"{server_url}/{image_path}" + return http_url + +# Example usage: +# url = get_landform_image_url('example_user_id') +# print(url) + + + +class MyClient(botpy.Client): + async def on_ready(self): + _log.info(f"robot 「{self.robot.name}」 on_ready!") + + async def on_at_message_create(self, message: Message): + _log.info(message.author.avatar) + if "sleep" in message.content: + await asyncio.sleep(10) + _log.info(message.author.username) + _log.info(message.content) + await message.reply(content=get_gpt_response(message.content)) + + async def on_message_create(self, message: Message): + _log.info(f"message {message.id} created") + username = message.author.id + print(username) + if not message.content.startswith("<@"): + if message.content == "-关机": + await message.reply(content="关机中") + await self.close() + quit() + else: + if message.content=="-获得初始地块": + if detect_if_owner(username): + await message.reply(content="你已拥有初始地块") + else: + await message.reply(content="正在生成...") + generate_new_land(username, new_land_size) + await message.reply(content="初始地块生成成功,请查看:") + user_landform_image_generate(username) + await message.reply(image=get_landform_image_url(username)) + elif message.content=="-查看地块": + if detect_if_owner(username): + await message.reply(content="请查看:") + user_landform_image_generate(username) + await message.reply(image=get_landform_image_url(username)) + else: + await message.reply(content="你还没有拥有地块,请先获得初始地块") + + +if __name__ == "__main__": + # 通过预设置的类型,设置需要监听的事件通道 + # intents = botpy.Intents.none() + # intents.public_guild_messages=True + + # 通过kwargs,设置需要监听的事件通道 + intents = botpy.Intents(public_guild_messages=True, guild_messages=True) + client = MyClient(intents=intents) + client.run(appid=test_config["appid"], secret=test_config["secret"])
\ No newline at end of file |
