# This code is based on the following example:
# https://discordpy.readthedocs.io/en/stable/quickstart.html#a-minimal-bot
import importlib
import inspect
import json
import os
import sys
import textwrap
from threading import Thread

import discord
from discord import app_commands
from discord.ext import commands
# NOTE: this deliberately shadows the builtin ``eval`` with a variant that
# accepts top-level ``await`` (used by ``on_message`` below).
from async_eval import eval

from horde import HordeAPI

# Load the bot configuration.
with open("discord.json", "r", encoding="utf-8") as f:
    json_data = json.load(f)

# Maps a rule type from the configuration to the predicate that decides
# whether an incoming message matches the rule's content.
check_functions = {
    "equals": lambda content, message: message.content == content,
    "contains": lambda content, message: content in message.content,
    "startswith": lambda content, message: message.content.startswith(content),
    "endswith": lambda content, message: message.content.endswith(content)
}

# Define the bot and its application-command tree.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(
    command_prefix=json_data.get("command_prefix", '>'),
    intents=intents
)
tree = bot.tree

# Prefix of the generated slash-command callback function names.
COMMAND_NAME_PREFIX = json_data.get("command_name_prefix", "discord_bot_call_")


def _literal(value) -> str:
    """Return *value* rendered as a Python string literal for generated code."""
    return repr(str(value))


def _parameters_of(command: dict) -> list:
    """Return the parameter descriptions of *command* (empty when absent)."""
    return command.get('parameters') or []


def load_module_by_function_name(name: str = ""):
    """
    Import the module that a dotted function name refers to.

    The module is bound in this file's global namespace, because that is
    where the generated command callbacks look the name up when they run.
    (``exec("import x")`` inside a function would only bind ``x`` in a
    temporary local namespace.)

    Args:
        name: function name, optionally qualified as ``module.function``
    """
    module_name, dot, _ = name.partition(".")
    # Unqualified names (or names starting with a dot) need no import.
    if not dot or not module_name:
        return
    # Already usable from generated code (imported module, class, ...).
    if module_name in globals():
        return
    globals()[module_name] = importlib.import_module(module_name)


def generate_discord_command_callback_param_str(pre: str = '', parameters: list | None = None) -> str:
    """
    Generate the parameter list of a generated command callback.

    Args:
        pre: fixed leading parameter(s), e.g. ``ctx``
        parameters: parameter descriptions with ``name``, ``type`` and an
            optional ``default``

    Returns:
        str: source code of the parameter list
    """
    pieces = [pre]
    for param in parameters or []:
        piece = f", {param['name']}: {param['type']}"
        if 'default' in param:
            if param['type'] == 'str':
                # repr() keeps quotes/backslashes in the default from
                # breaking the generated source.
                piece += f" = {_literal(param['default'])}"
            else:
                piece += f" = {param['default']}"
        pieces.append(piece)
    return ''.join(pieces)


def generate_user_function_param_str(parameters: list | None = None) -> str:
    """
    Generate the keyword arguments used to call the user-defined function.

    Args:
        parameters: parameter descriptions (only ``name`` is used)

    Returns:
        str: source code of the call arguments
    """
    return ', '.join(f"{param['name']}={param['name']}" for param in parameters or [])


def generate_tree_add_command(command: dict | None = None) -> str:
    """
    Generate the ``tree.add_command`` statement for a slash command.

    Args:
        command: command description with ``name`` and ``description``

    Returns:
        str: generated source code
    """
    command = command or {}
    return (
        "tree.add_command(app_commands.Command(\n"
        f"    name=\"{command['name']}\",\n"
        f"    description={_literal(command['description'])},\n"
        f"    callback={COMMAND_NAME_PREFIX}{command['name']}\n"
        "))\n"
    )


def generate_discord_command_callback_function_str(command: dict | None = None) -> str:
    """
    Generate the callback function of a slash command.

    The generated coroutine defers the interaction, awaits the configured
    user function and sends its result as a follow-up unless it is None.

    Args:
        command: command description with ``name``, ``function`` and
            optional ``parameters``

    Returns:
        str: generated source code
    """
    command = command or {}
    parameters = _parameters_of(command)
    signature = generate_discord_command_callback_param_str('interaction: discord.Interaction', parameters)
    call_args = generate_user_function_param_str(parameters)
    return (
        f"async def {COMMAND_NAME_PREFIX}{command['name']}({signature}):\n"
        "    await interaction.response.defer()\n"
        f"    result = await {command['function']}({call_args})\n"
        "    if result is not None:\n"
        "        await interaction.followup.send(result)\n"
    )


def generate_bot_command_send(command: dict | None = None) -> str:
    """
    Generate the ``ctx.send`` statement of a fixed-response prefix command.

    Args:
        command: command description with ``response``

    Returns:
        str: generated source code (not indented)
    """
    command = command or {}
    return f"await ctx.send({_literal(command['response'])})"


def generate_bot_command_callback(command: dict | None = None) -> str:
    """
    Generate the statements that call the user function and send its result.

    Args:
        command: command description with ``function`` and optional
            ``parameters``

    Returns:
        str: generated source code (not indented)
    """
    command = command or {}
    call_args = generate_user_function_param_str(_parameters_of(command))
    return (
        f"result = await {command['function']}({call_args})\n"
        # Discord rejects empty messages, so skip sending a None result.
        "if result is not None:\n"
        "    await ctx.send(result)"
    )


def generate_bot_command_def(command: dict | None = None) -> str:
    """
    Generate the definition of a prefix command (``@bot.command()``).

    Args:
        command: command description with ``name`` and either ``response``
            or ``function`` (plus optional ``parameters``)

    Returns:
        str: generated source code
    """
    command = command or {}
    signature = generate_discord_command_callback_param_str('ctx', _parameters_of(command))
    if "response" in command:
        body = generate_bot_command_send(command)
    else:
        body = generate_bot_command_callback(command)
    return (
        "@bot.command()\n"
        f"async def {command['name']}({signature}):\n"
        f"{textwrap.indent(body, '    ')}\n"
    )


# Register the configured commands.
# NOTE(security): the generated source is built from discord.json and run
# with exec(); only use configuration files you trust.
# These loops intentionally run before the helper coroutines below are
# defined, so a generated command with the same name never replaces them.
for command in json_data.get("app_command", []):
    load_module_by_function_name(command["function"])
    exec(generate_discord_command_callback_function_str(command))
    exec(generate_tree_add_command(command))

for command in json_data.get("command", []):
    if "function" in command:
        load_module_by_function_name(command["function"])
    exec(generate_bot_command_def(command))


# Wording used in the help text for each message rule type.
_MATCH_PHRASES = {
    "equals": '',
    "contains": 'contains ',
    "startswith": 'starts with ',
    "endswith": 'ends with ',
}


def _help_for_action(entry: dict) -> str:
    """Describe what a rule/command does (response, function or nothing)."""
    if 'response' in entry:
        text = f'response "{entry["response"]}"'
    elif 'function' in entry:
        if 'description' in entry:
            description = entry["description"]
            # Slicing (instead of [0]) tolerates an empty description.
            text = f'{description[:1].lower()}{description[1:]}'
        else:
            text = f'execulate function which named "{entry["function"]}"'
        parameters = _parameters_of(entry)
        if parameters:
            text += ', with parameter(s): '
            for param in parameters:
                text += f'\n\t- {param["name"]}: {param["type"]}'
                if 'default' in param:
                    # Defaults are shown quoted regardless of their type.
                    text += f' = "{param["default"]}"'
                if 'description' in param:
                    text += f'\n\t {param["description"]}'
    else:
        text = 'do nothing'
    return text + '\n'


def _help_for_message(rule: dict) -> str:
    """Describe a message rule; unknown rule types add no match wording."""
    phrase = _MATCH_PHRASES.get(rule['type'], '')
    return f'If message is {phrase}"{rule["content"]}", then this app will ' + _help_for_action(rule)


def _help_for_command(command: dict) -> str:
    """Describe a prefix command."""
    return f'If message is starts with "{command["name"]}", then this app will ' + _help_for_action(command)


def _help_for_app_command(command: dict) -> str:
    """Describe a slash command."""
    return f'If message is starts with "/{command["name"]}", then this app will ' + _help_for_action(command)


async def get_help(command_name: str = ''):
    """
    Build the help text from the configuration.

    Args:
        command_name: when empty or None, describe every message rule,
            prefix command and slash command; otherwise only the entries
            whose content (message rules) or name (commands) equals it

    Returns:
        str: the help text, or a "not found" hint when nothing matched
    """
    sections = (
        ('message', 'content', _help_for_message),
        ('command', 'name', _help_for_command),
        ('app_command', 'name', _help_for_app_command),
    )
    parts = []
    for section, key, describe in sections:
        for entry in json_data.get(section, []):
            if command_name and entry[key] != command_name:
                continue
            parts.append(describe(entry))
    result = ''.join(parts)
    if not result:
        result = f'Can\'t not find "{command_name}", you can type "/help" to query all commands, or "/help [command name]" to query the command.'
    return result


async def greet(name: str):
    """Return a greeting for *name*."""
    return f"Hello, {name}!"


async def get_kudos():
    """Return a message with the user's kudos, or the error that was reported."""
    async with HordeAPI.getUserDetails() as details:
        if "kudos" not in details:
            return f'Error: {details["code"]} {details["reason"]}'
        return f'The amount of Kudos this user has is {details["kudos"]}'


async def generate_status(id: str):
    """
    Report the state of a generation request.

    Args:
        id: identifier passed to ``HordeAPI.generateCheck`` /
            ``HordeAPI.generateStatus``

    Returns:
        str: an error message, a failure notice, the image value(s) of a
            finished generation (one per line), or the progress / queue
            position
    """
    async with HordeAPI.generateCheck(id) as details:
        if "kudos" not in details:
            return f'Check Error: {details["code"]} {details["reason"]}'
        if not details["is_possible"]:
            return "This generation is impossible."
        if details["faulted"]:
            return "This generation is faulted."
        if details["done"]:
            async with HordeAPI.generateStatus(id) as generation_detail:
                if "generations" not in generation_detail:
                    return f'Status Error: {generation_detail["code"]} {generation_detail["reason"]}'
                generations = generation_detail["generations"]
                if generations:
                    # Return every generated image, not only the first one.
                    return "\n".join(str(generation["img"]) for generation in generations)
        if int(details["processing"]) > 0:
            total = sum(
                int(details[key])
                for key in ("finished", "processing", "queue_position", "restarted", "waiting")
            )
            return f'Processing image: {details["processing"]}/{total}'
        return f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s'


@bot.event
async def on_ready():
    """Sync the application commands once the bot is connected."""
    # ``tree`` is ``bot.tree``; a single sync is enough.
    await tree.sync()
    print(f'We have logged in as {bot.user}')


@bot.event
async def on_message(message):
    """Apply the configured message rules, then run prefix commands."""
    if message.author == bot.user:
        return

    for rule in json_data.get("message", []):
        # Pick the predicate for this rule type; unknown types never match.
        matches = check_functions.get(rule["type"])
        if matches is None or not matches(rule["content"], message):
            continue
        if "function" in rule:
            # The rule names a function: call it and send its result.
            function_name = rule["function"]
            # NOTE(security): evaluates a name taken from discord.json via
            # async_eval; only use configuration files you trust.
            result = eval(f"await {function_name}()")
            # Discord rejects empty messages, so skip sending a None result.
            if result is not None:
                await message.channel.send(result)
        elif "response" in rule:
            # Otherwise send the predefined response.
            await message.channel.send(rule["response"])

    # Make sure the prefix command system still works.
    await bot.process_commands(message)


async def sendMessageToChannelHelper(data):
    """
    Send *data* as an embed to the channel named by the CHANNEL_ID env var.

    The title is ``log_tag`` (else ``id``), the description is
    ``log_message`` (else ``model``); the embed is red for log entries whose
    ``log_level`` is "assert" or "error" and green otherwise. Every key of
    *data* becomes a field, except ``img`` which becomes the embed image.

    Args:
        data: mapping of field names to values
    """
    channel = await bot.fetch_channel(os.environ.get("CHANNEL_ID"))

    # Later keys take precedence over earlier ones.
    title = "Empty Title"
    for key in ("id", "log_tag"):
        if key in data:
            title = data[key]
    description = "Empty Description"
    for key in ("model", "log_message"):
        if key in data:
            description = data[key]

    is_log = "log_tag" in data or "log_message" in data
    # A log entry without a level is treated as non-error.
    is_error = "log_level" in data and data["log_level"] in ("assert", "error")
    color = 0xff0000 if is_log and is_error else 0x00ff00

    # Create the embed object.
    embed = discord.Embed(title=title, description=description, color=color)

    # Add the data fields to the embed.
    for field in data:
        if field == "img":
            embed.set_image(url=data[field])
        else:
            embed.add_field(name=field, value=data[field], inline=True)

    # Send the embed message.
    await channel.send(embed=embed)


def sendMessageToChannel(data):
    """Schedule ``sendMessageToChannelHelper(data)`` on the bot's event loop."""
    bot.loop.create_task(sendMessageToChannelHelper(data))


def run():
    """
    Start the bot with the token from discord.json or the TOKEN env var.

    Raises:
        Exception: when no token is configured
        discord.HTTPException: for HTTP errors other than 429
    """
    try:
        token = json_data["token"] if "token" in json_data else os.environ.get("TOKEN")
        if not token:
            raise Exception("Please add your token to the Secrets pane.")
        bot.run(token)
    except discord.HTTPException as e:
        if e.status != 429:
            raise
        print(
            "The Discord servers denied the connection for making too many requests"
        )
        print(
            "Get help from https://stackoverflow.com/questions/66724687/in-discord-py-how-to-solve-the-error-for-toomanyrequests"
        )


def discord_bot():
    """Entry point: announce the start and run the bot."""
    print("Running discord_bot")
    run()


if __name__ == "__main__":
    discord_bot()