Spaces:
Running
Running
File size: 11,920 Bytes
9cf96c4 79a7353 9cf96c4 4b6b17e 9cf96c4 a3fa62e a38c7a5 97136b4 9cf96c4 79a7353 bb67179 9cf96c4 400b811 9cf96c4 e5419b0 79a7353 b442dbe 4b6b17e bb67179 79a7353 bb67179 400b811 79a7353 97136b4 79a7353 f1315c3 97136b4 79a7353 97136b4 79a7353 97136b4 79a7353 97136b4 79a7353 97136b4 79a7353 a38c7a5 bb67179 22156af f1315c3 bb67179 400b811 bb67179 79a7353 400b811 bb67179 400b811 bb67179 400b811 bb67179 400b811 bb67179 400b811 bb67179 9cf96c4 b442dbe 0a11892 9cf96c4 79a7353 bb67179 9cf96c4 6ac2b5f 9cf96c4 bb67179 9cf96c4 bb67179 9cf96c4 79a7353 9cf96c4 bb67179 9cf96c4 79a7353 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
# This code is based on the following example:
# https://discordpy.readthedocs.io/en/stable/quickstart.html#a-minimal-bot
import sys
import os
import discord
from discord import app_commands
from discord.ext import commands
from threading import Thread
import json
from horde import HordeAPI
import inspect
from async_eval import eval
# Load the bot configuration from discord.json.
# JSON text is UTF-8 by specification, so decode it explicitly instead of
# relying on the platform's default text encoding.
with open("discord.json", "r", encoding="utf-8") as f:
    json_data = json.load(f)
# Predicates used by the message rules: each receives the rule's content and
# the incoming message and reports whether the rule matches.
def _rule_equals(content, message):
    """Match when the message text is exactly the rule content."""
    return message.content == content


def _rule_contains(content, message):
    """Match when the rule content occurs anywhere in the message text."""
    return content in message.content


def _rule_startswith(content, message):
    """Match when the message text begins with the rule content."""
    return message.content.startswith(content)


def _rule_endswith(content, message):
    """Match when the message text ends with the rule content."""
    return message.content.endswith(content)


# Dictionary mapping each rule type to its condition-check function.
check_functions = {
    "equals": _rule_equals,
    "contains": _rule_contains,
    "startswith": _rule_startswith,
    "endswith": _rule_endswith,
}
# Define the bot and its application-command tree.
# The message-content intent is switched on; the message rules in this file
# inspect message.content.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(
    # Fall back to '>' when the configuration does not set a prefix.
    command_prefix=json_data.get("command_prefix", '>'),
    intents=intents,
)
tree = bot.tree
# Prefix prepended to the names of the generated slash-command callbacks.
COMMAND_NAME_PREFIX = json_data.get("command_name_prefix", "discord_bot_call_")
def load_module_by_function_name(name: str = ""):
    """
    Import the module that a qualified function name refers to.

    Only the first dotted component of ``name`` is imported, and only when
    ``name`` contains a dot after its first character. The imported module is
    bound under that component in this module's globals, so that code which
    later refers to ``module.function`` by name can resolve it. A name that is
    already bound in the globals is left untouched.

    Args:
        name: function name, optionally qualified as ``module.function``
    Raises:
        ModuleNotFoundError: if the module part cannot be imported
    """
    import importlib

    # Unqualified names (no dot, or a leading dot) need no import.
    if name.find(".") <= 0:
        return
    module_name = name.split(".")[0]
    namespace = globals()
    if module_name in namespace:
        return
    # An `exec("import ...")` inside a function would bind the module only in
    # the function's temporary locals; bind it in the module namespace instead.
    namespace[module_name] = importlib.import_module(module_name)
def generate_discord_command_callback_param_str(pre: str = '', parameters: list = []) -> str:
    """
    Generate the parameter list source for a discord command callback.

    Each parameter is rendered as ``, name: type`` followed by `` = default``
    when a default is configured. Defaults of ``str`` parameters are emitted
    as escaped Python string literals so that quotes, backslashes or newlines
    in the configured value cannot break the generated code.

    Args:
        pre: fixed leading parameter(s) emitted before the configured ones
        parameters: list of parameter dicts with 'name', 'type' and optional 'default'
    Returns:
        str: the generated parameter source, wrapped in leading/trailing newlines
    """
    # `parameters` is only read, never mutated, so the shared default list is safe.
    rendered = []
    for param in parameters:
        piece = f", {param['name']}: {param['type']}"
        if 'default' in param:
            if param['type'] == 'str':
                # repr() yields a correctly escaped string literal.
                piece += f" = {str(param['default'])!r}"
            else:
                piece += f" = {param['default']}"
        rendered.append(piece)
    return "\n" + pre + ''.join(rendered) + "\n"
def generate_user_function_param_str(parameters: list = []) -> str:
    """
    Generate the keyword-argument source used to call a user-defined function.

    Every parameter is passed through under its own name (``name=name``).

    Args:
        parameters: list of parameter dicts, each with a 'name' entry
    Returns:
        str: comma-separated ``name=name`` pairs, empty when there are no parameters
    """
    keyword_args = ("{0}={0}".format(param['name']) for param in parameters)
    return ', '.join(keyword_args)
def generate_tree_add_command(command: dict = {}) -> str:
    """
    Generate the source that registers a slash command on the command tree.

    The command name and description are emitted as escaped Python string
    literals so that quotes or backslashes in the configuration cannot break
    the generated statement. The callback is referenced by name as
    COMMAND_NAME_PREFIX followed by the command name.

    Args:
        command: command dict with 'name' and 'description'
    Returns:
        str: the generated ``tree.add_command(...)`` source
    """
    name = str(command['name'])
    description = str(command['description'])
    return f'''
tree.add_command(app_commands.Command(
    name={name!r},
    description={description!r},
    callback={COMMAND_NAME_PREFIX}{name}
))
'''
def generate_discord_command_callback_function_str(command: dict = {}) -> str:
    """
    Generate the source of the async callback function for a slash command.

    The generated function is named COMMAND_NAME_PREFIX + command name. It
    defers the interaction response, awaits the configured user function with
    the configured parameters, and sends the result as a follow-up unless the
    result is None.

    Args:
        command: command dict with 'name', 'function' and optional 'parameters'
    Returns:
        str: the generated function definition source
    """
    parameters = command.get('parameters', [])
    signature = generate_discord_command_callback_param_str(
        'interaction: discord.Interaction', parameters
    )
    call_args = generate_user_function_param_str(parameters)
    # The body lines must be indented inside the template, otherwise exec()
    # of the generated source fails with an IndentationError.
    return f'''
async def {COMMAND_NAME_PREFIX}{command['name']}({signature}):
    await interaction.response.defer()
    result = await {command['function']}({call_args})
    if result is not None:
        await interaction.followup.send(result)
'''
def generate_bot_command_send(command: dict = {}) -> str:
    """
    Generate the ``ctx.send`` statement for a prefix command with a fixed response.

    The response is emitted as an escaped Python string literal, so quotes,
    backslashes and newlines in the configured text stay inside the literal
    and the statement always fits on one line.

    Args:
        command: command dict with a 'response' entry
    Returns:
        str: the generated ``await ctx.send(...)`` statement
    """
    response_literal = repr(str(command['response']))
    return f"await ctx.send({response_literal})"
def generate_bot_command_callback(command: dict = {}) -> str:
    """
    Generate the statements that call the user function of a prefix command
    and send its result.

    Args:
        command: command dict with 'function' and optional 'parameters'
    Returns:
        str: two statements separated by a newline: the awaited call stored in
        ``result`` and the ``ctx.send(result)`` that reports it
    """
    function_name = command['function']
    call_args = generate_user_function_param_str(command.get('parameters', []))
    call_line = f"result = await {function_name}({call_args})"
    send_line = "await ctx.send(result)"
    return "\n".join((call_line, send_line))
def generate_bot_command_def(command: dict = {}) -> str:
    """
    Generate the source of a ``@bot.command()`` function for a prefix command.

    A command with a 'response' entry sends that fixed response; otherwise the
    configured user function is awaited and its result is sent.

    Args:
        command: command dict with 'name', optional 'parameters', and either
            'response' or 'function'
    Returns:
        str: the generated decorated function definition source
    """
    signature = generate_discord_command_callback_param_str(
        'ctx', command.get('parameters', [])
    )
    if "response" in command:
        body = generate_bot_command_send(command)
    else:
        body = generate_bot_command_callback(command)
    # The body is a flat sequence of statements. Re-indent every line to one
    # level so it is valid inside the function, whatever leading whitespace
    # the helper produced.
    indented_body = "\n".join("    " + line.strip() for line in body.splitlines())
    return f'''
@bot.command()
async def {command['name']}({signature}):
{indented_body}
'''
# Register the configured commands by generating their source and executing it.
# These exec() calls run at module level, so the generated definitions are
# created in this module's namespace, where the generated add_command source
# looks the callback up by name.
# NOTE(review): the generated source embeds values from discord.json verbatim;
# the configuration file must be trusted.
for command in json_data.get("app_command", []):
    load_module_by_function_name(command["function"])
    exec(generate_discord_command_callback_function_str(command))
    exec(generate_tree_add_command(command))

for command in json_data.get("command", []):
    # Response-only commands have no user function to import.
    if "function" in command:
        load_module_by_function_name(command["function"])
    exec(generate_bot_command_def(command))
async def get_help(command_name: str = ''):
    """
    Build the help text for the configured message rules, prefix commands and
    slash commands.

    Args:
        command_name: content of a message rule or name of a command to look
            up; when empty or None every configured entry is listed
    Returns:
        str: one help entry per matching configuration item, or a "not found"
        notice when nothing matches
    """
    # Wording inserted after "If message is " for each message-rule type;
    # unknown types (and "equals") add nothing.
    match_phrases = {
        "equals": '',
        "contains": 'contains ',
        "startswith": 'starts with ',
        "endswith": 'ends with ',
    }

    def generate_command_help_info(message: dict):
        # Describe what the entry does: fixed response, user function, or nothing.
        if 'response' in message:
            return f'response "{message["response"]}"' + '\n'
        if 'function' not in message:
            return 'do nothing\n'
        parts = []
        description = message.get('description')
        if description:
            # Lower-case the first letter so the description continues the sentence.
            parts.append(f'{description[:1].lower()}{description[1:]}')
        else:
            # Missing or empty description: fall back to naming the function
            # (indexing an empty description would raise IndexError).
            parts.append(f'execulate function which named "{message["function"]}"')
        if 'parameters' in message and len(message['parameters']) > 0:
            parts.append(', with parameter(s): ')
            for param in message['parameters']:
                parts.append(f'\n\t- {param["name"]}: {param["type"]}')
                if 'default' in param:
                    # Only string defaults are shown in quotes.
                    if param['type'] == 'str':
                        parts.append(f' = "{param["default"]}"')
                    else:
                        parts.append(f' = {param["default"]}')
                if 'description' in param:
                    parts.append(f'\n\t {param["description"]}')
        parts.append('\n')
        return ''.join(parts)

    def do_with_message(message: dict):
        # Help entry for a message rule.
        phrase = match_phrases.get(message['type'], '')
        intro = f'If message is {phrase}"{message["content"]}", then this app will '
        return intro + generate_command_help_info(message)

    def do_with_command(command: dict):
        # Help entry for a prefix command.
        intro = f'If message is starts with "{command["name"]}", then this app will '
        return intro + generate_command_help_info(command)

    def do_with_app_command(command: dict):
        # Help entry for a slash command.
        intro = f'If message is starts with "/{command["name"]}", then this app will '
        return intro + generate_command_help_info(command)

    list_all = command_name is None or command_name == ''
    # (configuration section, key compared with command_name, describer)
    sections = (
        ('message', 'content', do_with_message),
        ('command', 'name', do_with_command),
        ('app_command', 'name', do_with_app_command),
    )
    entries = []
    for section, lookup_key, describe in sections:
        for item in json_data.get(section, []):
            if list_all or item[lookup_key] == command_name:
                entries.append(describe(item))
    result = ''.join(entries)
    if result == '':
        result = f'Can\'t not find "{command_name}", you can type "/help" to query all commands, or "/help [command name]" to query the command.'
    return result
async def greet(name: str):
    """
    Return a greeting for the given name.

    Args:
        name: the name to greet
    Returns:
        str: ``Hello, <name>!``
    """
    greeting = "Hello, {}!".format(name)
    return greeting
async def get_kudos():
    """
    Report the kudos amount from the Horde user details.

    Returns:
        str: the kudos message, or an error message built from the response's
        'code' and 'reason' when the response has no 'kudos' entry
    """
    async with HordeAPI.getUserDetails() as details:
        if "kudos" in details:
            return f'The amount of Kudos this user has is {details["kudos"]}'
        # No kudos in the response: report it as an error.
        return f'Error: {details["code"]} {details["reason"]}'
async def generate_status(id: str):
    """
    Report the state of a Horde generation request.

    Args:
        id: identifier passed to HordeAPI.generateCheck / generateStatus
    Returns:
        str: an error message, a failure notice, the 'img' value of the first
        finished generation, a processing progress line, or the queue
        position with the wait time
    """
    async with HordeAPI.generateCheck(id) as details:
        if "kudos" not in details:
            return f'Check Error: {details["code"]} {details["reason"]}'
        if not details["is_possible"]:
            return "This generation is impossible."
        if details["faulted"]:
            return "This generation is faulted."
        if details["done"]:
            async with HordeAPI.generateStatus(id) as generation_detail:
                if "generations" not in generation_detail:
                    return f'Status Error: {generation_detail["code"]} {generation_detail["reason"]}'
                generations = generation_detail["generations"]
                # Only the first generation is reported; an empty list falls
                # through to the progress/queue report below.
                if generations:
                    return generations[0]["img"]
        if int(details["processing"]) > 0:
            # Sum all counters. Written as separate lines starting with "+",
            # the extra terms were standalone no-op statements and never
            # reached `total`.
            total = sum(
                int(details[key])
                for key in ("finished", "processing", "queue_position", "restarted", "waiting")
            )
            return f'Processing image: {details["processing"]}/{total}'
        return f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s'
@bot.event
async def on_ready():
    """Sync the application commands and print the logged-in user."""
    # `tree` is `bot.tree`, so a single sync covers it; syncing the same tree
    # twice only issued a redundant request.
    await tree.sync()
    print(f'We have logged in as {bot.user}')
@bot.event
async def on_message(message):
    """
    Apply the configured message rules to an incoming message, then hand the
    message to the command processor.

    Every matching rule fires: a rule with 'function' evaluates that function
    and sends its result to the channel, otherwise a rule with 'response'
    sends the predefined response. Messages authored by the bot are ignored.
    """
    if message.author == bot.user:
        return
    for rule in json_data.get("message", []):
        rule_type = rule["type"]
        content = rule["content"]
        # Look up the check function for the rule type; unknown types never match.
        matcher = check_functions.get(rule_type)
        if matcher is None or not matcher(content, message):
            continue
        if "function" in rule:
            # The rule names a function: call it and send what it returns.
            # NOTE(review): the function name comes straight from discord.json
            # and is evaluated as code; the configuration must be trusted.
            function_name = rule["function"]
            result = eval(f"await {function_name}()")
            await message.channel.send(result)
        elif "response" in rule:
            # Otherwise send the predefined response message.
            await message.channel.send(rule["response"])
    # Make sure the command system keeps working.
    await bot.process_commands(message)
async def sendMessageToChannelHelper(data):
    """
    Send ``data`` as an embed to the channel named by the CHANNEL_ID
    environment variable.

    The title is 'log_tag', else 'id', else a placeholder; the description is
    'log_message', else 'model', else a placeholder. The embed is red for log
    entries whose 'log_level' is "assert" or "error" and green otherwise.
    An 'img' entry becomes the embed image; every other entry becomes an
    inline field.

    Args:
        data: mapping of field names to values (assumed dict-like — it is
            queried with ``in``, ``get`` and ``items``)
    """
    # NOTE(review): the channel id is passed as the raw environment string
    # (None when unset) — confirm CHANNEL_ID is always configured.
    channel = await bot.fetch_channel(os.environ.get("CHANNEL_ID"))
    # Create the embed object.
    title = data.get("log_tag", data.get("id", "Empty Title"))
    description = data.get("log_message", data.get("model", "Empty Description"))
    is_log_entry = "log_tag" in data or "log_message" in data
    # Use .get(): a log entry without 'log_level' must not raise KeyError.
    if is_log_entry and data.get("log_level") in ("assert", "error"):
        color = 0xff0000
    else:
        color = 0x00ff00
    embed = discord.Embed(title=title, description=description, color=color)
    # Add the data entries to the embed.
    for field, value in data.items():
        if field == "img":
            embed.set_image(url=value)
        else:
            embed.add_field(name=field, value=value, inline=True)
    # Send the embed message.
    await channel.send(embed=embed)
def sendMessageToChannel(data):
    """
    Schedule sendMessageToChannelHelper(data) as a task on the bot's event
    loop without waiting for it to finish.

    Args:
        data: mapping forwarded unchanged to sendMessageToChannelHelper
    """
    # NOTE(review): create_task is called directly on bot.loop; if this is
    # ever invoked from another thread, confirm that is safe.
    schedule = bot.loop.create_task
    schedule(sendMessageToChannelHelper(data))
def run():
    """
    Start the bot with the token from the configuration or the TOKEN
    environment variable.

    Raises:
        Exception: when no token is configured
        discord.HTTPException: re-raised unless its status is 429, in which
            case a rate-limit hint is printed instead
    """
    try:
        # The configured token wins; otherwise fall back to the environment.
        token = json_data.get("token", os.environ.get("TOKEN") or "")
        if token == "":
            raise Exception("Please add your token to the Secrets pane.")
        bot.run(token)
    except discord.HTTPException as e:
        if e.status != 429:
            raise e
        # 429: too many requests.
        print(
            "The Discord servers denied the connection for making too many requests"
        )
        print(
            "Get help from https://stackoverflow.com/questions/66724687/in-discord-py-how-to-solve-the-error-for-toomanyrequests"
        )
def discord_bot():
    """Entry point: announce start-up on stdout, then start the bot via run()."""
    print("Running discord_bot")
    run()
# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    discord_bot()