Spaces:
Running
Running
# This code is based on the following example: | |
# https://discordpy.readthedocs.io/en/stable/quickstart.html#a-minimal-bot | |
import sys | |
import os | |
import discord | |
from discord import app_commands | |
from discord.ext import commands | |
from threading import Thread | |
import json | |
from horde import HordeAPI | |
import inspect | |
from async_eval import eval | |
# 读取json | |
with open("discord.json", "r") as f: | |
json_data = json.load(f) | |
# 创建一个字典,将规则类型映射到相应的条件检查函数 | |
check_functions = { | |
"equals": lambda content, message: message.content == content, | |
"contains": lambda content, message: content in message.content, | |
"startswith": lambda content, message: message.content.startswith(content), | |
"endswith": lambda content, message: message.content.endswith(content) | |
} | |
# 定义bot和tree | |
intents = discord.Intents.default() | |
intents.message_content = True | |
bot = commands.Bot( | |
command_prefix=json_data["command_prefix"] if "command_prefix" in json_data else '>', | |
intents=intents | |
) | |
tree = bot.tree | |
COMMAND_NAME_PREFIX=json_data["command_name_prefix"] if "command_name_prefix" in json_data else "discord_bot_call_" | |
def load_module_by_function_name(name: str = ""): | |
""" | |
根据函数名称,导入对应的模块 | |
Args: | |
name: 函数名称 | |
""" | |
module_name=name.split(".")[0] | |
if name.find(".") > 0 and module_name not in sys.modules: | |
exec(f"import {module_name}") | |
def generate_discord_command_callback_param_str(pre: str = '', parameters: list = []) -> str: | |
""" | |
生成discord command的callback的调用参数 | |
Args: | |
pre: 前置固定参数 | |
parameters: 参数列表 | |
Returns: | |
string: 生成的调用参数的代码 | |
""" | |
return f''' | |
{pre}{''.join([f", {param['name']}: {param['type']}" + (f' = "{param["default"]}"' if 'default' in param and param['type'] == 'str' else f" = {param['default']}" if 'default' in param else '') | |
for param in parameters])} | |
''' | |
def generate_user_function_param_str(parameters: list = []) -> str: | |
""" | |
生成调用用户自定义的函数的参数 | |
Args: | |
parameters: 参数列表 | |
Returns: | |
str: 生成的调用参数的代码 | |
""" | |
return f'''{', '.join([f"{param['name']}={param['name']}" | |
for param in parameters])}''' | |
def generate_tree_add_command(command: dict = {}) -> str: | |
""" | |
生成tree add_command | |
Args: | |
command: command | |
Returns: | |
str: 生成的调用代码 | |
""" | |
return f''' | |
tree.add_command(app_commands.Command( | |
name="{command['name']}", | |
description="{command['description']}", | |
callback={COMMAND_NAME_PREFIX}{command['name']} | |
)) | |
''' | |
def generate_discord_command_callback_function_str(command: dict = {}) -> str: | |
""" | |
生成discord command的callback的调用函数 | |
Args: | |
command: command | |
Returns: | |
str: 生成的调用代码 | |
""" | |
return f''' | |
async def {COMMAND_NAME_PREFIX}{command['name']}({generate_discord_command_callback_param_str('interaction: discord.Interaction', command['parameters'] if 'parameters' in command else [])}): | |
await interaction.response.defer() | |
result = await {command['function']}({generate_user_function_param_str(command['parameters'] if 'parameters' in command else [])}) | |
if result is not None: | |
await interaction.followup.send(result) | |
''' | |
def generate_bot_command_send(command: dict = {}) -> str: | |
""" | |
生成bot.command的send语句 | |
Args: | |
command: command | |
Returns: | |
str: 生成的调用代码 | |
""" | |
return f'''await ctx.send(\"{command['response']}\")''' | |
def generate_bot_command_callback(command: dict = {}) -> str: | |
""" | |
生成bot.command的调用用户函数和send语句 | |
Args: | |
command: command | |
Returns: | |
str: 生成的调用代码 | |
""" | |
return f'''result = await {command['function']}({generate_user_function_param_str(command['parameters'] if 'parameters' in command else [])}) | |
await ctx.send(result)''' | |
def generate_bot_command_def(command: dict = {}) -> str: | |
""" | |
生成bot.command的定义函数 | |
Args: | |
command: command | |
Returns: | |
str: 生成的调用代码 | |
""" | |
return f''' | |
@bot.command() | |
async def {command['name']}({generate_discord_command_callback_param_str('ctx', command['parameters'] if 'parameters' in command else [])}): | |
{generate_bot_command_send(command) if "response" in command else generate_bot_command_callback(command)} | |
''' | |
if "app_command" in json_data: | |
for command in json_data["app_command"]: | |
load_module_by_function_name(command["function"]) | |
exec(generate_discord_command_callback_function_str(command)) | |
exec(generate_tree_add_command(command)) | |
if "command" in json_data: | |
for command in json_data["command"]: | |
if "function" in command: | |
load_module_by_function_name(command["function"]) | |
exec(generate_bot_command_def(command)) | |
async def get_help(command_name: str = ''): | |
def do_with_message(message: dict): | |
result = '' | |
result += f'If message is ' | |
match message['type']: | |
case "equals": | |
result += '' | |
case "contains": | |
result += 'contains ' | |
case "startswith": | |
result += 'starts with ' | |
case "endswith": | |
result += 'ends with ' | |
result += f'"{message["content"]}", then this app will ' | |
result += generate_command_help_info(message) | |
return result | |
def do_with_command(command: dict): | |
result = '' | |
result += f'If message is starts with "{command["name"]}", then this app will ' | |
result += generate_command_help_info(command) | |
return result | |
def do_with_app_command(command: dict): | |
result = '' | |
result += f'If message is starts with "/{command["name"]}", then this app will ' | |
result += generate_command_help_info(command) | |
return result | |
def generate_command_help_info(message: dict): | |
result = '' | |
if 'response' in message: | |
result += f'response "{message["response"]}"' | |
elif 'function' in message: | |
if 'description' in message: | |
result += f'{message["description"][0].lower()}{message["description"][1:]}' | |
else: | |
result += f'execulate function which named "{message["function"]}"' | |
if 'parameters' in message and len(message['parameters']) > 0: | |
result += ', with parameter(s): ' | |
for param in message['parameters']: | |
result += '\n\t' | |
result += f'- {param["name"]}: {param["type"]}' | |
if 'default' in param: | |
result += ' = ' | |
match param['type']: | |
case 'str': | |
result += f'"{param["default"]}"' | |
case _: | |
result += f'"{param["default"]}"' | |
if 'description' in param: | |
result += f'\n\t {param["description"]}' | |
else: | |
result += 'do nothing' | |
result += '\n' | |
return result | |
result = '' | |
if command_name is None or command_name == '': | |
if 'message' in json_data: | |
for message in json_data['message']: | |
result += do_with_message(message) | |
if 'command' in json_data: | |
for command in json_data['command']: | |
result += do_with_command(command) | |
if 'app_command' in json_data: | |
for command in json_data['app_command']: | |
result += do_with_app_command(command) | |
else: | |
if 'message' in json_data: | |
for message in json_data['message']: | |
if message['content'] != command_name: | |
continue | |
result += do_with_message(message) | |
if 'command' in json_data: | |
for command in json_data['command']: | |
if command['name'] != command_name: | |
continue | |
result += do_with_command(command) | |
if 'app_command' in json_data: | |
for command in json_data['app_command']: | |
if command['name'] != command_name: | |
continue | |
result += do_with_app_command(command) | |
if result is None or result == '': | |
result = f'Can\'t not find "{command_name}", you can type "/help" to query all commands, or "/help [command name]" to query the command.' | |
return result | |
async def greet(name: str): | |
return f"Hello, {name}!" | |
async def get_kudos(): | |
async with HordeAPI.getUserDetails() as details: | |
if "kudos" not in details: | |
return f'Error: {details["code"]} {details["reason"]}' | |
return f'The amount of Kudos this user has is {details["kudos"]}' | |
async def generate_status(id: str): | |
async with HordeAPI.generateCheck(id) as details: | |
if "kudos" not in details: | |
return f'Check Error: {details["code"]} {details["reason"]}' | |
if bool(details["is_possible"]) is False: | |
return "This generation is impossible." | |
if bool(details["faulted"]) is True: | |
return "This generation is faulted." | |
if bool(details["done"]) is True: | |
async with HordeAPI.generateStatus(id) as generation_detail: | |
if "generations" not in generation_detail: | |
return f'Status Error: {generation_detail["code"]} {generation_detail["reason"]}' | |
for i in range(len(generation_detail["generations"])): | |
return generation_detail["generations"][i]["img"] | |
if int(details["processing"]) > 0: | |
total = int(details["finished"]) | |
+ int(details["processing"]) | |
+ int(details["queue_position"]) | |
+ int(details["restarted"]) | |
+ int(details["waiting"]) | |
return f'Processing image: {details["processing"]}/{total}' | |
return f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s' | |
async def on_ready(): | |
await tree.sync() | |
await bot.tree.sync() | |
print('We have logged in as {0.user}'.format(bot)) | |
async def on_message(message): | |
if message.author == bot.user: | |
return | |
if "message" in json_data: | |
for rule in json_data["message"]: | |
rule_type = rule["type"] | |
content = rule["content"] | |
# 根据规则类型动态调用对应的判断函数 | |
if check_functions.get(rule_type, lambda c, m: False)(content, message): | |
# 如果规则指定了函数,则调用对应的函数 | |
if "function" in rule: | |
function_name = rule["function"] | |
result = eval(f"await {function_name}()") | |
await message.channel.send(result) | |
# 否则发送预定义的响应消息 | |
elif "response" in rule: | |
await message.channel.send(rule["response"]) | |
# 确保命令系统正常工作 | |
await bot.process_commands(message) | |
async def sendMessageToChannelHelper(data): | |
channel = await bot.fetch_channel(os.environ.get("CHANNEL_ID")) | |
# 创建一个 embed 对象 | |
mTitle = "Empty Title" | |
if "id" in data: | |
mTitle = data["id"] | |
if "log_tag" in data: | |
mTitle = data["log_tag"] | |
mDescription = "Empty Description" | |
if "model" in data: | |
mDescription = data["model"] | |
if "log_message" in data: | |
mDescription = data["log_message"] | |
mColor = 0x00ff00 | |
if ("log_tag" in data or "log_message" in data) and (data["log_level"] == "assert" or data["log_level"] == "error"): | |
mColor = 0xff0000 | |
embed = discord.Embed(title=mTitle, description=mDescription, color=mColor) | |
# 将 fields 数据加入 embed | |
for field in data: | |
if field == "img": | |
embed.set_image(url=data[field]) | |
else: | |
embed.add_field(name=field, value=data[field], inline=True) | |
# 发送 embed 消息 | |
await channel.send(embed=embed) | |
def sendMessageToChannel(data): | |
bot.loop.create_task(sendMessageToChannelHelper(data)) | |
def run(): | |
try: | |
token = json_data["token"] if "token" in json_data else (os.environ.get("TOKEN") or "") | |
if token == "": | |
raise Exception("Please add your token to the Secrets pane.") | |
bot.run(token) | |
except discord.HTTPException as e: | |
if e.status == 429: | |
print( | |
"The Discord servers denied the connection for making too many requests" | |
) | |
print( | |
"Get help from https://stackoverflow.com/questions/66724687/in-discord-py-how-to-solve-the-error-for-toomanyrequests" | |
) | |
else: | |
raise e | |
def discord_bot(): | |
print("Running discord_bot") | |
run() | |
if __name__ == "__main__": | |
discord_bot() |