# HuggingMod / app.py
# lunarflu's picture
# lunarflu HF staff
# big updates to roles; track who creates/deletes + info on permission changes
# 70a5666 verified
import discord
import os
import threading
import gradio as gr
import requests
import json
import random
import time
import re
from discord import Embed, Color
from discord.ext import commands
from gradio_client import Client
from PIL import Image
from ratelimiter import RateLimiter
from datetime import datetime, timedelta # for times
from pytz import timezone # for times
import asyncio # check if used
import logging
# Timezone object passed to convert_to_timezone() when formatting embed timestamps.
zurich_tz = timezone("Europe/Zurich")
def convert_to_timezone(dt, tz):
    """Format ``dt`` as ``YYYY-MM-DD HH:MM:SS TZ`` in the timezone ``tz``.

    Args:
        dt: A ``datetime``. Naive values are interpreted as UTC (which is what
            ``datetime.utcnow()`` produces); aware values are converted as-is.
        tz: A ``tzinfo`` object describing the target timezone.

    Returns:
        The formatted time string, ending with the timezone name.
    """
    if dt.tzinfo is None:
        # ``astimezone`` treats naive datetimes as *system local* time, which
        # shifts the result on any host that is not running in UTC.
        # Local import: the module-level name ``timezone`` is pytz's factory.
        from datetime import timezone as _stdlib_timezone
        dt = dt.replace(tzinfo=_stdlib_timezone.utc)
    return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S %Z")
# Bot token read from the environment; None when the variable is not set.
DISCORD_TOKEN = os.environ.get("DISCORD_TOKEN", None)
# Request every gateway intent; the handlers below read message content and member data.
intents = discord.Intents.all()
# Commands use the '!' prefix; max_messages is passed through to the discord.py client.
bot = commands.Bot(command_prefix='!', intents=intents, max_messages=1000000)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
#rate_limiter = RateLimiter(max_calls=10, period=60) # needs testing
# Messages fetched from channel history in on_ready, keyed by message ID;
# entries are popped by on_raw_message_delete.
message_cache = {}
# stats stuff ---------------------------------------------------------------------------------------------------------------------------------------------------------
# Running count of non-bot messages handled by on_message since startup.
number_of_messages = 0
# Per-author posting record kept by on_message: author ID -> {'count': int, 'timestamp': datetime}.
user_cooldowns = {}
@bot.event
async def on_message(message):
    """Back up, screen and rate-check each non-bot message, then run commands.

    The three moderation stages are isolated from one another so that a
    failure in one (for example an undeliverable DM) does not prevent the
    remaining stages or command processing from running.

    Args:
        message: The ``discord.Message`` that was received.
    """
    try:
        if message.author != bot.user:
            lunarflu = bot.get_user(811235357663297546)
            cakiki = bot.get_user(416019758492680203)
            # Not every channel type exposes ``name`` (e.g. DMs); use a label there.
            channel_name = getattr(message.channel, "name", "DM")
            message_link = f"[#{channel_name}]({message.jump_url})"

            # Backup
            try:
                await _send_backup_dm(message, message_link, lunarflu)
            except Exception as e:
                print(f"on_message Error: {e}")

            # Antispam: detecting certain unwanted strings
            if await _flag_forbidden_strings(message, message_link, lunarflu, cakiki):
                # A staff member posted a flagged string: stop here, which
                # (as before) also skips the rate check and command handling.
                return

            # Antispam: posting too fast
            try:
                await _track_posting_rate(message, cakiki)
            except Exception as e:
                print(f"on_message Error: {e}")
        await bot.process_commands(message)
    except Exception as e:
        print(f"on_message Error: {e}")


async def _send_backup_dm(message, message_link, recipient):
    """DM ``recipient`` a text backup of ``message`` including attachment and embed info."""
    global number_of_messages
    number_of_messages += 1
    parts = [
        f"{number_of_messages} | {message_link} | {message.author.id} | {message.author}: {message.content}"
    ]
    parts.extend(f"\nAttachment: {attachment.url}" for attachment in message.attachments)
    parts.extend(
        f"\nEmbed Title: {embed.title}\nEmbed Description: {embed.description}"
        for embed in message.embeds
    )
    if recipient is None:
        # get_user() only consults the local cache and may return None.
        print("on_message Error: backup recipient not found")
        return
    # Discord rejects message content longer than 2000 characters.
    await recipient.send("".join(parts)[:2000])


async def _flag_forbidden_strings(message, message_link, lunarflu, cakiki):
    """DM moderators about messages containing blocked text; return True if on_message must stop."""
    try:
        forbidden_strings = ["@everyone", "@here", "discord.gg", "discord.com/invite", "discord.com", "discord-premium"]
        lowered = message.content.lower()
        if not any(string.lower() in lowered for string in forbidden_strings):
            return False
        ignored_role_ids = [897381378172264449, 897376942817419265]  # admins, huggingfolks
        if any(role.id in ignored_role_ids for role in message.author.roles):
            # Staff are exempt; lunarflu is deliberately not exempt.
            if message.author != lunarflu:
                return True
        for recipient in (lunarflu, cakiki):
            if recipient is None:
                continue
            warning = f" {recipient.mention} [experimental] SUSPICIOUS MESSAGE: {message_link} | {message.author}: {message.content}"
            await recipient.send(warning[:2000])
    except Exception as e:
        print(f"Antispam->Detecting certain unwanted strings Error: {e}")
    return False


async def _track_posting_rate(message, cakiki):
    """Count rapid consecutive posts per author and alert moderators past the threshold."""
    # cooldown_duration determines the time window within which the bot tracks
    # a user's posting behavior. This is useful for detecting "staggered"
    # instances of spam, where 20-50 messages are sent 2-10s apart, over
    # timespans of typically a few minutes.
    # If a user hasn't posted anything for longer than cooldown_duration, their
    # record is reset and they start fresh when they post again. If they post
    # within cooldown_duration, their count is incremented.
    # Increasing cooldown_duration = more robust at detecting "staggered" /
    # "delayed" spam, but more false positives (fast chatters).
    cooldown_duration = 5  # seconds allowed between messages before the count resets
    false_positive_threshold = 5  # big = alert less (catch less spam), small = alert more
    # Auto-ban is intentionally disabled. Its settings were timeout_threshold = 10
    # messages and timeout_duration = 168 hours (1 week):
    #     if spam_count >= timeout_threshold:
    #         await member.send("You have been auto-banned for spamming. ...")
    #         await member.ban()

    author_id = message.author.id
    record = user_cooldowns.get(author_id)
    if record is None:
        user_cooldowns[author_id] = {'count': 1, 'timestamp': message.created_at}
        return

    previous = record['timestamp']
    elapsed = (message.created_at - previous).total_seconds()
    if elapsed > cooldown_duration:
        print(f"seconds since last message by {message.author}: ({message.created_at} - {previous}).seconds = {elapsed}")
        # if we wait longer than cooldown_duration, count will reset
        user_cooldowns[author_id] = {'count': 1, 'timestamp': message.created_at}
        return

    record['count'] += 1
    # Slide the window before alerting so a failed send cannot freeze it.
    record['timestamp'] = message.created_at
    spam_count = record['count']
    if spam_count < false_positive_threshold:  # n in a row, helps avoid false positives for posting in threads
        return

    channel = message.channel
    # Public warning only once, when the threshold is first reached.
    # admin channel excluded due to how automod messages are categorized by the discord bot
    if spam_count == false_positive_threshold and channel.id != 996580741121065091:
        await channel.send(f"{message.author.mention}, you may be posting too quickly! Please slow down a bit 🤗")

    print(f"seconds since last message by {message.author}: {elapsed}")
    print(f"spam_count: {spam_count}")

    # Default to the normal role so an unset TEST_SERVER no longer raises NameError.
    if os.environ.get('TEST_SERVER') == 'True':
        alert = "<@&1106995261487710411>"  # test @alerts role
    else:
        alert = "<@&1108342563628404747>"  # normal @alerts role

    # Keep the quoted content short enough that the report fits in one message.
    report = (
        f"[EXPERIMENTAL ALERT] {message.author} may be posting too quickly! \n"
        f"Spam count: {spam_count}\n"
        f"Message content: {message.content[:1500]}\n"
        f"[Jump to message!]({message.jump_url})\n"
    )
    # bot.log_channel is only assigned in on_ready, so it may not exist yet.
    log_channel = getattr(bot, "log_channel", None)
    if log_channel is not None:
        await log_channel.send(f"{report}{alert}")
    if cakiki is not None:
        await cakiki.send(report)
# moderation stuff-----------------------------------------------------------------------------------------------------------------------------------------------------
@bot.event
async def on_message_edit(before, after):
    """Log a text edit of a non-bot message to the admin log channel.

    Args:
        before: The message as it was before the edit.
        after: The message after the edit.
    """
    try:
        if before.author == bot.user:
            return
        if before.content == after.content:
            # Nothing textual changed, so there is nothing to log.
            return

        author = before.author
        icon_url = author.avatar.url if author.avatar else bot.user.avatar.url
        # Cap each side so the description stays under Discord's 4096-character
        # limit and the "After" text is never cut off entirely.
        old_text = (before.content or '*(empty message)*')[:2000]
        new_text = (after.content or '*(empty message)*')[:2000]

        embed = Embed(color=Color.orange())
        embed.set_author(name=f"{author} ID: {author.id}", icon_url=icon_url)
        embed.title = "Message Edited"
        embed.description = f"**Before:** {old_text}\n**After:** {new_text}"
        embed.add_field(name="Author Username", value=author.name, inline=True)
        embed.add_field(name="Channel", value=before.channel.mention, inline=True)
        embed.add_field(name="Message Created On", value=convert_to_timezone(before.created_at, zurich_tz), inline=True)
        embed.add_field(name="Message ID", value=before.id, inline=True)
        # jump_url yields the same link for guild messages and also works in DMs.
        embed.add_field(name="Message Jump URL", value=f"[Jump to message!]({before.jump_url})", inline=True)
        if before.attachments:
            attachment_urls = "\n".join(attachment.url for attachment in before.attachments)
            # Embed field values are limited to 1024 characters.
            embed.add_field(name="Attachments", value=attachment_urls[:1024], inline=False)
        # Use an aware "now" so the footer does not depend on the host timezone.
        embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
        await bot.log_channel.send(embed=embed)
    except Exception as e:
        print(f"on_message_edit Error: {e}")
@bot.event
async def on_raw_message_delete(payload):
    """Log the deletion of a non-bot message to the admin log channel.

    The message body is looked up first in ``message_cache`` (filled from
    channel history in ``on_ready``) and then in the copy discord.py attaches
    to the payload, so messages sent after startup are covered as well.
    Deletions of messages found in neither place are ignored.

    Args:
        payload: The raw delete event supplied by discord.py.
    """
    try:
        message = message_cache.pop(payload.message_id, None)
        if message is None:
            # message_cache is never updated after startup; fall back to the
            # client's own cached copy (None when it is not cached either).
            message = payload.cached_message
        if message is None or message.author == bot.user:
            return

        author = message.author
        icon_url = author.avatar.url if author.avatar else bot.user.avatar.url

        embed = Embed(color=Color.red())
        embed.set_author(name=f"{author} ID: {author.id}", icon_url=icon_url)
        embed.title = "Message Deleted"
        # Embed descriptions are limited to 4096 characters.
        embed.description = (message.content or "*(empty message)*")[:4096]
        embed.add_field(name="Author Username", value=author.name, inline=True)
        embed.add_field(name="Channel", value=message.channel.mention, inline=True)
        embed.add_field(name="Message Created On", value=convert_to_timezone(message.created_at, zurich_tz), inline=True)
        embed.add_field(name="Message ID", value=message.id, inline=True)
        # jump_url yields the same link for guild messages and also works in DMs.
        embed.add_field(name="Message Jump URL", value=f"[Jump to message!]({message.jump_url})", inline=True)
        if message.attachments:
            attachment_urls = "\n".join(attachment.url for attachment in message.attachments)
            # Embed field values are limited to 1024 characters.
            embed.add_field(name="Attachments", value=attachment_urls[:1024], inline=False)
        # Use an aware "now" so the footer does not depend on the host timezone.
        embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
        await bot.log_channel.send(embed=embed)
    except Exception as e:
        print(f"on_message_delete Error: {e}")
# nickname stuff ---------------------------------------------------------------------------------------------------------------------------
@bot.event
async def on_member_update(before, after):
    """Log nickname changes and role additions/removals for a member.

    Args:
        before: The member's state before the update.
        after: The member's state after the update.
    """
    try:
        # Disabled experiment: dump recent audit-log entries on username change.
        #     if before.name != after.name:
        #         async for entry in before.guild.audit_logs(limit=5):
        #             print(f'{entry.user} did {entry.action} to {entry.target}')
        if before.nick != after.nick:
            embed = Embed(color=Color.orange())
            embed.set_author(name=f"{after} ID: {after.id}", icon_url=after.avatar.url if after.avatar else bot.user.avatar.url)
            embed.title = "Nickname Modified"
            embed.add_field(name="Mention", value=after.mention, inline=True)
            embed.add_field(name="Old", value=before.nick, inline=True)
            embed.add_field(name="New", value=after.nick, inline=True)
            embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
            await bot.log_channel.send(embed=embed)

        # roles being added/removed
        before_roles = set(before.roles)
        after_roles = set(after.roles)
        for role in after_roles - before_roles:
            await _log_role_change(after, role, added=True)
        for role in before_roles - after_roles:
            await _log_role_change(after, role, added=False)
    except Exception as e:
        print(f"on_member_update Error: {e}")


async def _log_role_change(member, role, *, added):
    """Send a "Role Added"/"Role Removed" embed for ``member``, naming the moderator when the audit log shows one."""
    moderator = None
    async for entry in member.guild.audit_logs(action=discord.AuditLogAction.member_role_update, limit=5):
        # Added roles are listed on the "after" side of the diff, removed
        # roles on the "before" side.
        diff = entry.changes.after if added else entry.changes.before
        if entry.target == member and role in getattr(diff, "roles", []):
            moderator = entry.user
            break

    embed = Embed(color=Color.green() if added else Color.red())
    embed.set_author(name=f"{member} ID: {member.id}", icon_url=member.avatar.url if member.avatar else bot.user.avatar.url)
    embed.title = "Role Added" if added else "Role Removed"
    embed.add_field(name="User", value=member.mention, inline=True)
    embed.add_field(name="Role", value=f"{role.name} ({role.mention})", inline=True)
    # The audit-log user can be a Member or a plain User; both have ``mention``.
    embed.add_field(name="Added By" if added else "Removed By", value=getattr(moderator, "mention", "Unknown"), inline=True)
    embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
    await bot.log_channel.send(embed=embed)
@bot.event
async def on_member_ban(guild, banned_user):
    """Log a ban to the admin log channel and DM appeal instructions to the user.

    Args:
        guild: The guild the user was banned from.
        banned_user: The user (or member) that was banned.
    """
    try:
        await asyncio.sleep(1)
        ban_entry = await guild.fetch_ban(banned_user)
        ban_reason = ban_entry.reason
        print(f"ban_reason: {ban_reason}")

        # Match the audit entry to this user instead of trusting the newest
        # one, which may belong to a different ban issued at the same time.
        moderator = None
        async for entry in guild.audit_logs(action=discord.AuditLogAction.ban, limit=5):
            if getattr(entry.target, "id", None) == banned_user.id:
                moderator = entry.user
                break

        if ban_reason:
            print(f'{moderator} banned {banned_user} for {ban_reason}')
        else:
            print(f'{moderator} banned {banned_user} (no reason specified)')

        content = "<@&1108342563628404747>"  # @alerts role
        embed = Embed(color=Color.red())
        embed.set_author(name=f"{banned_user} ID: {banned_user.id}", icon_url=banned_user.avatar.url if banned_user.avatar else bot.user.avatar.url)
        embed.title = "User Banned"
        embed.add_field(name="User", value=banned_user.mention, inline=True)
        embed.add_field(name="Moderator", value=getattr(moderator, "mention", "Unknown"), inline=True)
        # "Nickname" is the moderator's nickname; a plain User has no ``nick``.
        embed.add_field(name="Nickname", value=getattr(moderator, "nick", None), inline=True)
        embed.add_field(name="Reason", value=ban_reason, inline=False)
        embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
        await bot.log_channel.send(content=content, embed=embed)

        try:
            await banned_user.send(f"You've been banned from the Hugging Face Discord. To appeal, reach out to <@811235357663297546> via DM")
        except Exception as e:
            print(f"Could not send DM to banned user: {e}")
    except Exception as e:
        print(f"on_member_ban Error: {e}")
@bot.event
async def on_member_unban(guild, unbanned_user):
    """Log an unban, with the responsible moderator when known, to the admin log channel.

    Args:
        guild: The guild the user was unbanned from.
        unbanned_user: The user that was unbanned.
    """
    try:
        await asyncio.sleep(5)

        # Default to None so a missing or mismatched audit entry still
        # produces a log line instead of a NameError.
        moderator = None
        async for entry in guild.audit_logs(action=discord.AuditLogAction.unban, limit=5):
            if getattr(entry.target, "id", None) == unbanned_user.id:  # verify that unbanned user is in audit log
                moderator = entry.user
                break

        created_and_age = f"{unbanned_user.created_at}"
        content = "<@&1108342563628404747>"  # @alerts role
        embed = Embed(color=Color.red())
        embed.set_author(name=f"{unbanned_user} ID: {unbanned_user.id}", icon_url=unbanned_user.avatar.url if unbanned_user.avatar else bot.user.avatar.url)
        embed.title = "User Unbanned"
        embed.add_field(name="User", value=unbanned_user.mention, inline=True)
        embed.add_field(name="Account Created At", value=created_and_age, inline=True)
        embed.add_field(name="Moderator", value=getattr(moderator, "mention", "Unknown"), inline=True)
        # "Nickname" is the moderator's nickname; a plain User has no ``nick``.
        embed.add_field(name="Nickname", value=getattr(moderator, "nick", None), inline=True)
        embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
        await bot.log_channel.send(content=content, embed=embed)
    except Exception as e:
        print(f"on_member_unban Error: {e}")
# admin stuff-----------------------------------------------------------------------------------------------------------------------
@bot.event
async def on_member_join(member):
    """Give a new member the lvl1 role and post a "User Joined" embed.

    Role assignment is best-effort: if it fails, the join is still logged.

    Args:
        member: The member that joined.
    """
    try:
        await asyncio.sleep(5)

        # initialize lvl1 on join
        try:
            guild = bot.get_guild(879548962464493619)
            lvl1 = guild.get_role(1171861537699397733) if guild else None
            # Re-fetch the member: they may have left during the delay above.
            current_member = guild.get_member(member.id) if guild else None
            if lvl1 is None or current_member is None:
                print(f"on_member_join Error: could not assign lvl1 to {member}")
            else:
                await current_member.add_roles(lvl1)
        except Exception as e:
            print(f"on_member_join Error: {e}")

        embed = Embed(color=Color.blue())
        avatar_url = member.avatar.url if member.avatar else bot.user.avatar.url
        embed.set_author(name=f"{member} ID: {member.id}", icon_url=avatar_url)
        embed.title = "User Joined"
        embed.add_field(name="Mention", value=member.mention, inline=True)
        embed.add_field(name="Nickname", value=member.nick, inline=True)
        embed.add_field(name="Account Created At", value=member.created_at, inline=True)
        embed.set_footer(text=f"{convert_to_timezone(datetime.now(zurich_tz), zurich_tz)}")
        await bot.log_channel.send(embed=embed)
    except Exception as e:
        print(f"on_member_join Error: {e}")
@bot.event
async def on_member_remove(member):
    """Post a "User Left" embed to the admin log channel.

    Args:
        member: The member that left the guild.
    """
    try:
        left_embed = Embed(color=Color.blue())
        # Fall back to the bot's avatar when the member has none.
        if member.avatar:
            icon = member.avatar.url
        else:
            icon = bot.user.avatar.url
        left_embed.set_author(name=f"{member} ID: {member.id}", icon_url=icon)
        left_embed.title = "User Left"
        details = (
            ("Mention", member.mention),
            ("Nickname", member.nick),
            ("Account Created At", member.created_at),
        )
        for label, value in details:
            left_embed.add_field(name=label, value=value, inline=True)
        stamp = convert_to_timezone(datetime.utcnow(), zurich_tz)
        left_embed.set_footer(text=f"{stamp}")
        await bot.log_channel.send(embed=left_embed)
    except Exception as err:
        print(f"on_member_remove Error: {err}")
@bot.event
async def on_guild_channel_create(channel):
    """Announce a newly created channel in the admin log channel.

    Args:
        channel: The channel that was created.
    """
    try:
        notice = Embed(
            description=f'Channel {channel.mention} was created',
            color=Color.green(),
        )
        await bot.log_channel.send(embed=notice)
    except Exception as err:
        print(f"on_guild_channel_create Error: {err}")
@bot.event
async def on_guild_channel_delete(channel):
    """Announce a deleted channel in the admin log channel.

    Args:
        channel: The channel that was deleted.
    """
    # NOTE: the original author intended to ping @alerts here; not implemented.
    try:
        notice = Embed(
            description=f'Channel {channel.name} ({channel.mention}) was deleted',
            color=Color.red(),
        )
        await bot.log_channel.send(embed=notice)
    except Exception as err:
        print(f"on_guild_channel_delete Error: {err}")
@bot.event
async def on_guild_role_create(role):
    """Log a newly created role and, when the audit log shows it, who created it.

    Args:
        role: The role that was created.
    """
    try:
        # Stays None when none of the five most recent entries match the role.
        creator = None
        recent_entries = role.guild.audit_logs(action=discord.AuditLogAction.role_create, limit=5)
        async for entry in recent_entries:
            if entry.target.id == role.id:
                creator = entry.user
                break

        if creator:
            created_by = creator.mention
        else:
            created_by = "Unknown"

        notice = Embed(description=f'Role {role.mention} was created', color=Color.green())
        notice.add_field(name="Role Name", value=role.name, inline=True)
        notice.add_field(name="Created By", value=created_by, inline=True)
        notice.set_footer(text=f"Role ID: {role.id}")
        await bot.log_channel.send(embed=notice)
    except Exception as err:
        print(f"on_guild_role_create Error: {err}")
@bot.event
async def on_guild_role_delete(role):
    """Log a deleted role and, when the audit log shows it, who deleted it.

    Args:
        role: The role that was deleted.
    """
    # NOTE: the original author intended to ping @alerts here; not implemented.
    try:
        # Stays None when none of the five most recent entries match the role.
        deleter = None
        recent_entries = role.guild.audit_logs(action=discord.AuditLogAction.role_delete, limit=5)
        async for entry in recent_entries:
            if entry.target.id == role.id:
                deleter = entry.user
                break

        if deleter:
            deleted_by = deleter.mention
        else:
            deleted_by = "Unknown"

        notice = Embed(description=f'Role {role.name} ({role.mention}) was deleted', color=Color.red())
        notice.add_field(name="Deleted By", value=deleted_by, inline=True)
        notice.set_footer(text=f"Role ID: {role.id}")
        await bot.log_channel.send(embed=notice)
    except Exception as err:
        print(f"on_guild_role_delete Error: {err}")
@bot.event
async def on_guild_role_update(before, after):
    """Log role renames and permission changes, pinging @alerts on administrator changes.

    This single handler replaces two same-named definitions; the later one
    overrode the earlier one at registration, which dropped the @alerts ping
    for administrator-permission changes.

    Args:
        before: The role before the update.
        after: The role after the update.
    """
    try:
        # Track name changes
        if before.name != after.name:
            changer = await _find_role_updater(after, require_name_change=True)
            embed = Embed(description=f'Role {before.mention} was renamed to {after.name}', color=Color.orange())
            embed.add_field(name="Changed By", value=changer.mention if changer else "Unknown", inline=True)
            await bot.log_channel.send(embed=embed)

        if before.permissions != after.permissions:
            changer = await _find_role_updater(after)

            # what changed?
            changed_permissions = []
            for perm, value in after.permissions:
                if getattr(before.permissions, perm) != value:
                    change_status = "enabled" if value else "disabled"
                    changed_permissions.append(f"{perm.replace('_', ' ').title()}: {change_status}")

            # Compare the flag directly: entries in changed_permissions look
            # like "Administrator: enabled", so a membership test for
            # "administrator" never matched.
            admin_changed = before.permissions.administrator != after.permissions.administrator
            # changes involving the administrator permission ping the alerts role
            content = "<@&1108342563628404747>" if admin_changed else None  # @alerts role

            embed = Embed(color=Color.red() if admin_changed else Color.orange())
            # Pass None rather than an empty string when the guild has no icon.
            embed.set_author(name=f"{after.name} Role Updated", icon_url=after.guild.icon.url if after.guild.icon else None)
            embed.add_field(name="Changed By", value=changer.mention if changer else "Unknown", inline=True)
            changes_text = "\n".join(changed_permissions) if changed_permissions else "No permissions changed"
            # Embed field values are limited to 1024 characters.
            embed.add_field(name="Changes", value=changes_text[:1024], inline=False)
            embed.set_footer(text=f"Role ID: {after.id}")
            await bot.log_channel.send(content=content, embed=embed)
    except Exception as e:
        print(f"on_guild_role_update Error: {e}")


async def _find_role_updater(role, *, require_name_change=False):
    """Return the user behind a recent audit-log update of ``role``, or None if no entry matches."""
    async for entry in role.guild.audit_logs(action=discord.AuditLogAction.role_update, limit=5):
        if entry.target.id != role.id:
            continue
        # The changes object is not a container; changed attributes live on
        # its ``after`` diff, so test for the attribute there.
        if require_name_change and not hasattr(entry.changes.after, "name"):
            continue
        return entry.user
    return None
@bot.event
async def on_voice_state_update(member, before, after):
    """Log voice mute and deafen state changes to the admin log channel.

    Args:
        member: The member whose voice state changed.
        before: The voice state before the change.
        after: The voice state after the change.
    """
    try:
        # muting members
        if before.mute != after.mute:
            mute_word = "muted" if after.mute else "unmuted"
            mute_notice = Embed(description=f'{member} was {mute_word} in voice chat', color=Color.orange())
            await bot.log_channel.send(embed=mute_notice)
        # deafening members
        if before.deaf != after.deaf:
            deaf_word = "deafened" if after.deaf else "undeafened"
            deaf_notice = Embed(description=f'{member} was {deaf_word} in voice chat', color=Color.orange())
            await bot.log_channel.send(embed=deaf_notice)
    except Exception as err:
        print(f"on_voice_state_update Error: {err}")
# github test stuff -------------------------------------------------------------------------------------------------------------------
"""
async def check_github():
url = f'https://api.github.com/repos/{github_repo}/pulls'
response = requests.get(url)
pulls = response.json()
for pull in pulls:
# Check if the pull request was just opened
if pull['state'] == 'open' and pull['created_at'] == pull['updated_at']:
channel = client.get_channel(channel_id)
if channel:
await channel.send(f'New PR opened: {pull["title"]}')
"""
# bot stuff ---------------------------------------------------------------------------------------------------------------------------
@bot.event
async def on_ready():
    """Resolve the admin log channel and pre-fill ``message_cache`` from channel history.

    Up to 10000 messages per text channel are fetched so that deletions of
    older messages can still be logged after a restart.
    """
    await asyncio.sleep(5)
    print('Logged on as', bot.user)
    await asyncio.sleep(5)
    bot.log_channel = bot.get_channel(1036960509586587689)  # admin-logs
    await asyncio.sleep(5)
    print(bot.log_channel)

    home_guild = bot.get_guild(879548962464493619)
    # helps with more accurate logging across restarts
    for text_channel in home_guild.text_channels:
        try:
            # Build the whole mapping first so a failed fetch adds nothing.
            fetched = {msg.id: msg async for msg in text_channel.history(limit=10000)}
            message_cache.update(fetched)
            print(f"Finished caching messages for channel: {text_channel.name}")
        except Exception as err:
            print(f"An error occurred while fetching messages from {text_channel.name}: {err}")
        await asyncio.sleep(0.1)
def run_bot():
    """Run the Discord bot with the token read from the environment."""
    bot.run(DISCORD_TOKEN)


# Start the bot on its own thread; the Gradio app is launched afterwards.
threading.Thread(
    target=run_bot,
).start()
# Minimal Gradio page that describes the bot and links to its source.
with gr.Blocks() as demo:
    # The markdown lines stay at column 0 so the rendered text is unchanged.
    gr.Markdown(
        r"""
# Client for the [HuggingFace Discord](https://hf.co/join/discord) bot
All code for this bot is under the [app.py](https://huggingface.co/spaces/discord-community/HuggingMod/blob/main/app.py) file.
"""
    )

demo.launch()