File size: 11,920 Bytes
9cf96c4
 
 
79a7353
9cf96c4
 
4b6b17e
9cf96c4
 
 
a3fa62e
a38c7a5
97136b4
9cf96c4
 
79a7353
 
 
 
bb67179
 
 
 
 
 
 
 
 
9cf96c4
400b811
9cf96c4
e5419b0
79a7353
 
 
 
b442dbe
4b6b17e
bb67179
79a7353
bb67179
400b811
79a7353
 
 
 
 
 
 
 
 
97136b4
 
79a7353
 
 
 
 
 
 
 
 
 
f1315c3
 
97136b4
 
 
 
79a7353
 
 
 
 
 
 
 
 
97136b4
 
 
 
79a7353
 
 
 
 
 
 
 
97136b4
 
 
 
 
 
 
 
 
79a7353
 
 
 
 
 
 
 
 
97136b4
79a7353
97136b4
 
 
 
 
79a7353
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a38c7a5
bb67179
22156af
f1315c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb67179
 
 
400b811
bb67179
 
 
 
 
79a7353
400b811
bb67179
 
 
400b811
bb67179
400b811
bb67179
400b811
 
 
 
 
 
bb67179
400b811
 
 
 
 
bb67179
 
 
 
9cf96c4
 
b442dbe
0a11892
9cf96c4
 
 
 
 
 
 
 
79a7353
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb67179
 
 
 
9cf96c4
 
6ac2b5f
9cf96c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb67179
9cf96c4
 
 
bb67179
9cf96c4
 
79a7353
9cf96c4
 
 
 
 
 
 
 
 
 
 
 
 
 
bb67179
9cf96c4
 
 
79a7353
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# This code is based on the following example:
# https://discordpy.readthedocs.io/en/stable/quickstart.html#a-minimal-bot

import sys
import os
import discord
from discord import app_commands
from discord.ext import commands
from threading import Thread
import json
from horde import HordeAPI
import inspect
from async_eval import eval


# 读取json
with open("discord.json", "r") as f:
    json_data = json.load(f)


# 创建一个字典,将规则类型映射到相应的条件检查函数
check_functions = {
    "equals": lambda content, message: message.content == content,
    "contains": lambda content, message: content in message.content,
    "startswith": lambda content, message: message.content.startswith(content),
    "endswith": lambda content, message: message.content.endswith(content)
}


# 定义bot和tree
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(
	command_prefix=json_data["command_prefix"] if "command_prefix" in json_data else '>',
	intents=intents
)
tree = bot.tree


COMMAND_NAME_PREFIX=json_data["command_name_prefix"] if "command_name_prefix" in json_data else "discord_bot_call_"


def load_module_by_function_name(name: str = ""):
	"""
	根据函数名称,导入对应的模块
	Args:
		name: 函数名称
	"""
	module_name=name.split(".")[0]
	if name.find(".") > 0 and module_name not in sys.modules:
		exec(f"import {module_name}")


def generate_discord_command_callback_param_str(pre: str = '', parameters: list = []) -> str:
	"""
	生成discord command的callback的调用参数
	Args:
		pre: 前置固定参数
		parameters: 参数列表
	Returns:
		string: 生成的调用参数的代码
	"""
	return f'''
    {pre}{''.join([f", {param['name']}: {param['type']}" + (f' = "{param["default"]}"' if 'default' in param and param['type'] == 'str' else f" = {param['default']}" if 'default' in param else '')
    for param in parameters])}
'''


def generate_user_function_param_str(parameters: list = []) -> str:
	"""
	生成调用用户自定义的函数的参数
	Args:
		parameters: 参数列表
	Returns:
		str: 生成的调用参数的代码
	
	"""
	return f'''{', '.join([f"{param['name']}={param['name']}"
	for param in parameters])}'''


def generate_tree_add_command(command: dict = {}) -> str:
	"""
	生成tree add_command
	Args:
		command: command
	Returns:
		str: 生成的调用代码
	"""
	return f'''
tree.add_command(app_commands.Command(
	name="{command['name']}",
	description="{command['description']}",
	callback={COMMAND_NAME_PREFIX}{command['name']}
))
'''


def generate_discord_command_callback_function_str(command: dict = {}) -> str:
	"""
	生成discord command的callback的调用函数
	Args:
		command: command
	Returns:
		str: 生成的调用代码
	"""
	return f'''
async def {COMMAND_NAME_PREFIX}{command['name']}({generate_discord_command_callback_param_str('interaction: discord.Interaction', command['parameters'] if 'parameters' in command else [])}):
	await interaction.response.defer()
	result = await {command['function']}({generate_user_function_param_str(command['parameters'] if 'parameters' in command else [])})
	if result is not None:
		await interaction.followup.send(result)
'''


def generate_bot_command_send(command: dict = {}) -> str:
	"""
	生成bot.command的send语句
	Args:
		command: command
	Returns:
		str: 生成的调用代码
	"""
	return f'''await ctx.send(\"{command['response']}\")'''


def generate_bot_command_callback(command: dict = {}) -> str:
	"""
	生成bot.command的调用用户函数和send语句
	Args:
		command: command
	Returns:
		str: 生成的调用代码
	"""
	return f'''result = await {command['function']}({generate_user_function_param_str(command['parameters'] if 'parameters' in command else [])})
	await ctx.send(result)'''


def generate_bot_command_def(command: dict = {}) -> str:
	"""
	生成bot.command的定义函数
	Args:
		command: command
	Returns:
		str: 生成的调用代码
	"""
	return f'''
@bot.command()
async def {command['name']}({generate_discord_command_callback_param_str('ctx', command['parameters'] if 'parameters' in command else [])}):
	{generate_bot_command_send(command) if "response" in command else generate_bot_command_callback(command)}
'''


if "app_command" in json_data:
	for command in json_data["app_command"]:
		load_module_by_function_name(command["function"])
		exec(generate_discord_command_callback_function_str(command))
		exec(generate_tree_add_command(command))


if "command" in json_data:
	for command in json_data["command"]:
		if "function" in command:
			load_module_by_function_name(command["function"])
		exec(generate_bot_command_def(command))


async def get_help(command_name: str = ''):
	def do_with_message(message: dict):
		result = ''
		result += f'If message is '
		match message['type']:
				case "equals":
					result += ''
				case "contains":
					result += 'contains '
				case "startswith":
					result += 'starts with '
				case "endswith":
					result += 'ends with '
		result += f'"{message["content"]}", then this app will '
		result += generate_command_help_info(message)
		return result
	def do_with_command(command: dict):
		result = ''
		result += f'If message is starts with "{command["name"]}", then this app will '
		result += generate_command_help_info(command)
		return result
	def do_with_app_command(command: dict):
		result = ''
		result += f'If message is starts with "/{command["name"]}", then this app will '
		result += generate_command_help_info(command)
		return result
	def generate_command_help_info(message: dict):
		result = ''
		if 'response' in message:
			result += f'response "{message["response"]}"'
		elif 'function' in message:
			if 'description' in message:
				result += f'{message["description"][0].lower()}{message["description"][1:]}'	
			else:
				result += f'execulate function which named "{message["function"]}"'
			if 'parameters' in message and len(message['parameters']) > 0:
				result += ', with parameter(s): '
				for param in message['parameters']:
					result += '\n\t'
					result += f'- {param["name"]}: {param["type"]}'
					if 'default' in param:
						result += ' = '
						match param['type']:
							case 'str':
								result += f'"{param["default"]}"'
							case _:
								result += f'"{param["default"]}"'
					if 'description' in param:
						result += f'\n\t  {param["description"]}'
		else:
			result += 'do nothing'
		result += '\n'
		return result
	result = ''
	if command_name is None or command_name == '':
		if 'message' in json_data:
			for message in json_data['message']:
				result += do_with_message(message)
		if 'command' in json_data:
			for command in json_data['command']:
				result += do_with_command(command)
		if 'app_command' in json_data:
			for command in json_data['app_command']:
				result += do_with_app_command(command)
	else:
		if 'message' in json_data:
			for message in json_data['message']:
				if message['content'] != command_name:
					continue
				result += do_with_message(message)
		if 'command' in json_data:
			for command in json_data['command']:
				if command['name'] != command_name:
					continue
				result += do_with_command(command)
		if 'app_command' in json_data:
			for command in json_data['app_command']:
				if command['name'] != command_name:
					continue
				result += do_with_app_command(command)
	if result is None or result == '':
		result = f'Can\'t not find "{command_name}", you can type "/help" to query all commands, or "/help [command name]" to query the command.'
	return result


async def greet(name: str):
    return f"Hello, {name}!"

async def get_kudos():
    async with HordeAPI.getUserDetails() as details:
        if "kudos" not in details:
            return f'Error: {details["code"]} {details["reason"]}'
        return f'The amount of Kudos this user has is {details["kudos"]}'


async def generate_status(id: str):
    async with HordeAPI.generateCheck(id) as details:
        if "kudos" not in details:
            return f'Check Error: {details["code"]} {details["reason"]}'
        if bool(details["is_possible"]) is False:
            return "This generation is impossible."
        if bool(details["faulted"]) is True:
            return "This generation is faulted."
        if bool(details["done"]) is True:
            async with HordeAPI.generateStatus(id) as generation_detail:
                if "generations" not in generation_detail:
                    return f'Status Error: {generation_detail["code"]} {generation_detail["reason"]}'
                for i in range(len(generation_detail["generations"])):
                    return generation_detail["generations"][i]["img"]
        if int(details["processing"]) > 0:
            total = int(details["finished"])
            + int(details["processing"])
            + int(details["queue_position"])
            + int(details["restarted"])
            + int(details["waiting"])
            return f'Processing image: {details["processing"]}/{total}'
        return f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s'


@bot.event
async def on_ready():
    await tree.sync()
    await bot.tree.sync()
    print('We have logged in as {0.user}'.format(bot))


@bot.event
async def on_message(message):
    if message.author == bot.user:
        return

    if "message" in json_data:
	    for rule in json_data["message"]:
	        rule_type = rule["type"]
	        content = rule["content"]
	        
	        # 根据规则类型动态调用对应的判断函数
	        if check_functions.get(rule_type, lambda c, m: False)(content, message):
	            # 如果规则指定了函数,则调用对应的函数
	            if "function" in rule:
	                function_name = rule["function"]
	                result = eval(f"await {function_name}()")
	                await message.channel.send(result)
	            # 否则发送预定义的响应消息
	            elif "response" in rule:
	                await message.channel.send(rule["response"])

    # 确保命令系统正常工作
    await bot.process_commands(message)


async def sendMessageToChannelHelper(data):
    channel = await bot.fetch_channel(os.environ.get("CHANNEL_ID"))
    # 创建一个 embed 对象
    mTitle = "Empty Title"
    if "id" in data:
        mTitle = data["id"]
    if "log_tag" in data:
        mTitle = data["log_tag"]
    mDescription = "Empty Description"
    if "model" in data:
        mDescription = data["model"]
    if "log_message" in data:
        mDescription = data["log_message"]
    mColor = 0x00ff00
    if ("log_tag" in data or "log_message" in data) and (data["log_level"] == "assert" or data["log_level"] == "error"):
        mColor = 0xff0000
    embed = discord.Embed(title=mTitle, description=mDescription, color=mColor)
    # 将 fields 数据加入 embed
    for field in data:
        if field == "img":
            embed.set_image(url=data[field])
        else:
            embed.add_field(name=field, value=data[field], inline=True)
    # 发送 embed 消息
    await channel.send(embed=embed)


def sendMessageToChannel(data):
    bot.loop.create_task(sendMessageToChannelHelper(data))


def run():
    try:
      token = json_data["token"] if "token" in json_data else (os.environ.get("TOKEN") or "")
      if token == "":
        raise Exception("Please add your token to the Secrets pane.")
      bot.run(token)
    except discord.HTTPException as e:
        if e.status == 429:
            print(
                "The Discord servers denied the connection for making too many requests"
            )
            print(
                "Get help from https://stackoverflow.com/questions/66724687/in-discord-py-how-to-solve-the-error-for-toomanyrequests"
            )
        else:
            raise e


def discord_bot():
    print("Running discord_bot")
    run()

if __name__ == "__main__":
	discord_bot()