discordbot / discord_bot.py
joytou's picture
Fixed get_help function not async
22156af
raw
history blame
11.9 kB
# This code is based on the following example:
# https://discordpy.readthedocs.io/en/stable/quickstart.html#a-minimal-bot
import sys
import os
import discord
from discord import app_commands
from discord.ext import commands
from threading import Thread
import json
from horde import HordeAPI
import inspect
from async_eval import eval
# 读取json
with open("discord.json", "r") as f:
json_data = json.load(f)
# 创建一个字典,将规则类型映射到相应的条件检查函数
check_functions = {
"equals": lambda content, message: message.content == content,
"contains": lambda content, message: content in message.content,
"startswith": lambda content, message: message.content.startswith(content),
"endswith": lambda content, message: message.content.endswith(content)
}
# 定义bot和tree
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(
command_prefix=json_data["command_prefix"] if "command_prefix" in json_data else '>',
intents=intents
)
tree = bot.tree
COMMAND_NAME_PREFIX=json_data["command_name_prefix"] if "command_name_prefix" in json_data else "discord_bot_call_"
def load_module_by_function_name(name: str = ""):
"""
根据函数名称,导入对应的模块
Args:
name: 函数名称
"""
module_name=name.split(".")[0]
if name.find(".") > 0 and module_name not in sys.modules:
exec(f"import {module_name}")
def generate_discord_command_callback_param_str(pre: str = '', parameters: list = []) -> str:
"""
生成discord command的callback的调用参数
Args:
pre: 前置固定参数
parameters: 参数列表
Returns:
string: 生成的调用参数的代码
"""
return f'''
{pre}{''.join([f", {param['name']}: {param['type']}" + (f' = "{param["default"]}"' if 'default' in param and param['type'] == 'str' else f" = {param['default']}" if 'default' in param else '')
for param in parameters])}
'''
def generate_user_function_param_str(parameters: list = []) -> str:
"""
生成调用用户自定义的函数的参数
Args:
parameters: 参数列表
Returns:
str: 生成的调用参数的代码
"""
return f'''{', '.join([f"{param['name']}={param['name']}"
for param in parameters])}'''
def generate_tree_add_command(command: dict = {}) -> str:
"""
生成tree add_command
Args:
command: command
Returns:
str: 生成的调用代码
"""
return f'''
tree.add_command(app_commands.Command(
name="{command['name']}",
description="{command['description']}",
callback={COMMAND_NAME_PREFIX}{command['name']}
))
'''
def generate_discord_command_callback_function_str(command: dict = {}) -> str:
"""
生成discord command的callback的调用函数
Args:
command: command
Returns:
str: 生成的调用代码
"""
return f'''
async def {COMMAND_NAME_PREFIX}{command['name']}({generate_discord_command_callback_param_str('interaction: discord.Interaction', command['parameters'] if 'parameters' in command else [])}):
await interaction.response.defer()
result = await {command['function']}({generate_user_function_param_str(command['parameters'] if 'parameters' in command else [])})
if result is not None:
await interaction.followup.send(result)
'''
def generate_bot_command_send(command: dict = {}) -> str:
"""
生成bot.command的send语句
Args:
command: command
Returns:
str: 生成的调用代码
"""
return f'''await ctx.send(\"{command['response']}\")'''
def generate_bot_command_callback(command: dict = {}) -> str:
"""
生成bot.command的调用用户函数和send语句
Args:
command: command
Returns:
str: 生成的调用代码
"""
return f'''result = await {command['function']}({generate_user_function_param_str(command['parameters'] if 'parameters' in command else [])})
await ctx.send(result)'''
def generate_bot_command_def(command: dict = {}) -> str:
"""
生成bot.command的定义函数
Args:
command: command
Returns:
str: 生成的调用代码
"""
return f'''
@bot.command()
async def {command['name']}({generate_discord_command_callback_param_str('ctx', command['parameters'] if 'parameters' in command else [])}):
{generate_bot_command_send(command) if "response" in command else generate_bot_command_callback(command)}
'''
if "app_command" in json_data:
for command in json_data["app_command"]:
load_module_by_function_name(command["function"])
exec(generate_discord_command_callback_function_str(command))
exec(generate_tree_add_command(command))
if "command" in json_data:
for command in json_data["command"]:
if "function" in command:
load_module_by_function_name(command["function"])
exec(generate_bot_command_def(command))
async def get_help(command_name: str = ''):
def do_with_message(message: dict):
result = ''
result += f'If message is '
match message['type']:
case "equals":
result += ''
case "contains":
result += 'contains '
case "startswith":
result += 'starts with '
case "endswith":
result += 'ends with '
result += f'"{message["content"]}", then this app will '
result += generate_command_help_info(message)
return result
def do_with_command(command: dict):
result = ''
result += f'If message is starts with "{command["name"]}", then this app will '
result += generate_command_help_info(command)
return result
def do_with_app_command(command: dict):
result = ''
result += f'If message is starts with "/{command["name"]}", then this app will '
result += generate_command_help_info(command)
return result
def generate_command_help_info(message: dict):
result = ''
if 'response' in message:
result += f'response "{message["response"]}"'
elif 'function' in message:
if 'description' in message:
result += f'{message["description"][0].lower()}{message["description"][1:]}'
else:
result += f'execulate function which named "{message["function"]}"'
if 'parameters' in message and len(message['parameters']) > 0:
result += ', with parameter(s): '
for param in message['parameters']:
result += '\n\t'
result += f'- {param["name"]}: {param["type"]}'
if 'default' in param:
result += ' = '
match param['type']:
case 'str':
result += f'"{param["default"]}"'
case _:
result += f'"{param["default"]}"'
if 'description' in param:
result += f'\n\t {param["description"]}'
else:
result += 'do nothing'
result += '\n'
return result
result = ''
if command_name is None or command_name == '':
if 'message' in json_data:
for message in json_data['message']:
result += do_with_message(message)
if 'command' in json_data:
for command in json_data['command']:
result += do_with_command(command)
if 'app_command' in json_data:
for command in json_data['app_command']:
result += do_with_app_command(command)
else:
if 'message' in json_data:
for message in json_data['message']:
if message['content'] != command_name:
continue
result += do_with_message(message)
if 'command' in json_data:
for command in json_data['command']:
if command['name'] != command_name:
continue
result += do_with_command(command)
if 'app_command' in json_data:
for command in json_data['app_command']:
if command['name'] != command_name:
continue
result += do_with_app_command(command)
if result is None or result == '':
result = f'Can\'t not find "{command_name}", you can type "/help" to query all commands, or "/help [command name]" to query the command.'
return result
async def greet(name: str):
return f"Hello, {name}!"
async def get_kudos():
async with HordeAPI.getUserDetails() as details:
if "kudos" not in details:
return f'Error: {details["code"]} {details["reason"]}'
return f'The amount of Kudos this user has is {details["kudos"]}'
async def generate_status(id: str):
async with HordeAPI.generateCheck(id) as details:
if "kudos" not in details:
return f'Check Error: {details["code"]} {details["reason"]}'
if bool(details["is_possible"]) is False:
return "This generation is impossible."
if bool(details["faulted"]) is True:
return "This generation is faulted."
if bool(details["done"]) is True:
async with HordeAPI.generateStatus(id) as generation_detail:
if "generations" not in generation_detail:
return f'Status Error: {generation_detail["code"]} {generation_detail["reason"]}'
for i in range(len(generation_detail["generations"])):
return generation_detail["generations"][i]["img"]
if int(details["processing"]) > 0:
total = int(details["finished"])
+ int(details["processing"])
+ int(details["queue_position"])
+ int(details["restarted"])
+ int(details["waiting"])
return f'Processing image: {details["processing"]}/{total}'
return f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s'
@bot.event
async def on_ready():
await tree.sync()
await bot.tree.sync()
print('We have logged in as {0.user}'.format(bot))
@bot.event
async def on_message(message):
if message.author == bot.user:
return
if "message" in json_data:
for rule in json_data["message"]:
rule_type = rule["type"]
content = rule["content"]
# 根据规则类型动态调用对应的判断函数
if check_functions.get(rule_type, lambda c, m: False)(content, message):
# 如果规则指定了函数,则调用对应的函数
if "function" in rule:
function_name = rule["function"]
result = eval(f"await {function_name}()")
await message.channel.send(result)
# 否则发送预定义的响应消息
elif "response" in rule:
await message.channel.send(rule["response"])
# 确保命令系统正常工作
await bot.process_commands(message)
async def sendMessageToChannelHelper(data):
channel = await bot.fetch_channel(os.environ.get("CHANNEL_ID"))
# 创建一个 embed 对象
mTitle = "Empty Title"
if "id" in data:
mTitle = data["id"]
if "log_tag" in data:
mTitle = data["log_tag"]
mDescription = "Empty Description"
if "model" in data:
mDescription = data["model"]
if "log_message" in data:
mDescription = data["log_message"]
mColor = 0x00ff00
if ("log_tag" in data or "log_message" in data) and (data["log_level"] == "assert" or data["log_level"] == "error"):
mColor = 0xff0000
embed = discord.Embed(title=mTitle, description=mDescription, color=mColor)
# 将 fields 数据加入 embed
for field in data:
if field == "img":
embed.set_image(url=data[field])
else:
embed.add_field(name=field, value=data[field], inline=True)
# 发送 embed 消息
await channel.send(embed=embed)
def sendMessageToChannel(data):
bot.loop.create_task(sendMessageToChannelHelper(data))
def run():
try:
token = json_data["token"] if "token" in json_data else (os.environ.get("TOKEN") or "")
if token == "":
raise Exception("Please add your token to the Secrets pane.")
bot.run(token)
except discord.HTTPException as e:
if e.status == 429:
print(
"The Discord servers denied the connection for making too many requests"
)
print(
"Get help from https://stackoverflow.com/questions/66724687/in-discord-py-how-to-solve-the-error-for-toomanyrequests"
)
else:
raise e
def discord_bot():
print("Running discord_bot")
run()
if __name__ == "__main__":
discord_bot()