Vesperal / app.py
bhagatsuryainatom's picture
Update app.py
4a06617 verified
import gradio as gr
import discord
from discord.ext import commands, tasks
from collections import defaultdict
import json
import os
from dotenv import load_dotenv
import os
from huggingface_hub import HfApi, HfFolder
from datetime import datetime, timedelta
import pytz
HfFolder.save_token(os.getenv("HUGGING_FACE_TOKEN")) # Save the token for authentication
# Calculate the sleep time until the next midnight in IST
ist_timezone = pytz.timezone('Asia/Kolkata')
current_time = datetime.now(ist_timezone)
next_midnight = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
sleep_time = int((next_midnight - current_time).total_seconds()) # Convert to integer
api = HfApi()
api.set_space_sleep_time(repo_id="bhagatsuryainatom/Vesperal", sleep_time=sleep_time)
load_dotenv()
intents = discord.Intents.default()
intents.messages = True
intents.message_content = True
intents.voice_states = True
intents.reactions = True
bot = commands.Bot(command_prefix='!', intents=intents)
# Place your bot's token here
TOKEN = os.getenv('TOKEN')
YOUR_CHANNEL_ID= os.getenv('YOUR_CHANNEL_ID')
# Initialize the sessions and pending_messages dictionaries
sessions = {}
pending_messages = {}
class Session:
def __init__(self):
self.camera_required = False
self.time_without_video = defaultdict(int)
self.warnings_given = defaultdict(int)
self.exemptions = set()
self.instructions_sent = False # Flag to track if instructions have been sent
def to_dict(self):
# Convert sets of Member IDs to lists for JSON serialization
return {
"camera_required": self.camera_required,
"time_without_video": {str(member_id): time for member_id, time in self.time_without_video.items()},
"warnings_given": {str(member_id): warnings for member_id, warnings in self.warnings_given.items()},
"exemptions": list(self.exemptions) # Assuming this already stores member IDs
}
@classmethod
def from_dict(cls, data):
session = cls()
session.camera_required = data.get("camera_required", False)
session.time_without_video = defaultdict(int, data.get("time_without_video", {}))
session.warnings_given = defaultdict(int, data.get("warnings_given", {}))
session.exemptions = set(data.get("exemptions", []))
session.instructions_sent = data.get("instructions_sent", False)
return session
def save_sessions():
with open('sessions.json', 'w') as f:
json.dump({str(vc_id): session.to_dict() for vc_id, session in sessions.items()}, f, ensure_ascii=False, indent=4)
def correct_and_load_json(path):
try:
with open(path, 'r') as file:
return json.load(file)
except json.JSONDecodeError:
print("JSON format error detected. Attempting to correct.")
with open(path, 'r') as file:
file_content = file.read()
corrected_content = file_content.replace("'", '"') # Replace single quotes with double quotes
# Implement other corrections here as needed
with open(path, 'w') as file:
file.write(corrected_content)
with open(path, 'r') as file:
return json.load(file) # Attempt to load corrected content
def load_sessions():
if os.path.exists('sessions.json') and os.path.getsize('sessions.json') > 0:
try:
session_data = correct_and_load_json('sessions.json')
for vc_id, data in session_data.items():
sessions[int(vc_id)] = Session.from_dict(data)
except json.JSONDecodeError:
print("Failed to correct JSON file. Please check the file format manually.")
else:
print("Session file not found or is empty, starting fresh.")
@bot.event
async def on_ready():
load_sessions()
print(f'Logged in as {bot.user.name}')
check_camera_status.start()
# Ensure save_sessions_task starts properly within the on_ready event
if not save_sessions_task.is_running():
save_sessions_task.start()
@bot.command()
async def test(ctx):
await ctx.send("Test successful!")
@tasks.loop(seconds=30)
async def check_camera_status():
for vc_id, session in sessions.items():
if not session.camera_required:
continue
vc = bot.get_channel(vc_id)
if vc is None:
continue
for member in vc.members:
if member.id in session.exemptions:
continue
if not member.voice.self_video:
session.time_without_video[member] += 30
if session.time_without_video[member] == 30:
session.warnings_given[member] = 1
await member.send("Warning 1: Please turn on your camera within the next 30 seconds.")
elif session.time_without_video[member] >= 60:
session.warnings_given[member] = 2
await member.send("Final Warning: You are being removed for not turning on your camera.")
await member.move_to(None)
# Reset the member's session data
session.time_without_video.pop(member, None)
session.warnings_given.pop(member, None)
else:
# Reset the member's session data if their camera is on
session.time_without_video.pop(member, None)
session.warnings_given.pop(member, None)
@tasks.loop(minutes=5) # Save sessions every 5 minutes
async def save_sessions_task():
save_sessions()
print("Sessions saved.")
@save_sessions_task.before_loop
async def before_save_sessions():
await bot.wait_until_ready()
@bot.event
async def on_voice_state_update(member, before, after):
if member.bot:
return
if after.channel and (not before.channel or before.channel.id != after.channel.id):
session = sessions.get(after.channel.id)
if session is None:
session = Session()
sessions[after.channel.id] = session
if any(role.name == "Admin" for role in member.roles):
other_admins_in_channel = [m for m in after.channel.members if m != member and any(role.name == "Admin" for role in m.roles)]
if not other_admins_in_channel:
# Replace "YOUR_CHANNEL_ID" with your actual general channel ID
general_channel_id = YOUR_CHANNEL_ID
general_channel = member.guild.get_channel(int(general_channel_id))
if not general_channel:
print(f"Couldn't find the channel with ID: {general_channel_id}.")
return
message_content = f"Admin {member.mention}, do you require cameras to be ON for this session in {after.channel.name}? React with πŸ“· for YES or ❌ for NO."
try:
message = await general_channel.send(message_content)
pending_messages[message.id] = after.channel.id
await message.add_reaction("πŸ“·")
await message.add_reaction("❌")
except discord.DiscordException as e:
print(f"Failed to send message or add reactions due to: {e}")
@bot.command()
@commands.has_permissions(manage_channels=True)
async def setperms(ctx, channel: discord.TextChannel = None):
channel = channel or ctx.channel
if not channel.permissions_for(ctx.guild.me).manage_channels:
await ctx.send("I don't have permission to manage channels here.")
return
bot_member = ctx.guild.get_member(bot.user.id)
overwrites = {
ctx.guild.default_role: discord.PermissionOverwrite(read_messages=False),
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True, manage_messages=True, add_reactions=True)
}
try:
await channel.set_permissions(ctx.guild.default_role, read_messages=False)
await channel.set_permissions(bot_member, **overwrites)
await ctx.send(f"Permissions set for {channel.mention} successfully!")
except Exception as e:
await ctx.send(f"Failed to set permissions for {channel.mention} due to: {e}")
@bot.event
async def on_reaction_add(reaction, user):
await process_reaction(reaction, user)
@bot.event
async def on_reaction_remove(reaction, user):
await process_reaction(reaction, user)
async def process_reaction(reaction, user):
if reaction.message.id in pending_messages and not user.bot:
channel_id = pending_messages[reaction.message.id]
session = sessions.get(channel_id)
if session and any(role.name == "Admin" for role in user.roles):
if str(reaction.emoji) == "πŸ“·":
session.camera_required = True
session.exemptions.clear()
elif str(reaction.emoji) == "❌":
session.camera_required = False
session.exemptions.clear()
# Edit the message to reflect the updated session status
await reaction.message.edit(content=f"Camera requirement for {reaction.message.channel.guild.get_channel(channel_id).name} has been {'UPDATED to ON' if session.camera_required else 'UPDATED to OFF'}.")
# Send instructions only if they haven't been sent before
if not session.instructions_sent:
follow_up_message = (
"**Next Steps for Admins:**\n"
"- Use `#camoff @user` to exempt any specific user from the camera requirement.\n"
"- Use `#camoff` to exempt yourself if actively participating in the session.\n"
"- Use `#end` to conclude the session and reset all settings.\n"
"**Note:** These commands work within the context of the current voice session."
)
await reaction.message.channel.send(follow_up_message)
session.instructions_sent = True # Update the flag to prevent re-sending
# Cleanup
del pending_messages[reaction.message.id]
def dummy_function(input_text):
return "This is a dummy function for the Gradio UI."
iface = gr.Interface(fn=dummy_function,
inputs=gr.Textbox(label="Input"),
outputs=gr.Textbox(label="Output"),
title="Discord Bot",
description="This Space runs a Discord bot in the background.")
@bot.event
async def on_message(message):
# Skip bot messages and non-admin users
if message.author.bot or not any(role.name == "Admin" for role in message.author.roles):
await bot.process_commands(message) # Ensure other commands are still processed
return
# Process commands only if the author is in a voice channel
if message.author.voice:
vc_id = message.author.voice.channel.id
session = sessions.get(vc_id)
if not session:
await message.channel.send("No active session found for this voice channel.")
await bot.process_commands(message)
return
# Admin commands
if "#camoff" in message.content:
# If no users are mentioned, exempt the author; otherwise, exempt mentioned users
users_to_exempt = message.mentions if message.mentions else [message.author]
for user in users_to_exempt:
session.exemptions.add(user.id)
await message.channel.send(f"{user.display_name} is now exempt from the camera requirement.")
elif "#end" in message.content:
# Clear exemptions and reset camera requirement
session.exemptions.clear()
session.camera_required = False
await message.channel.send("Session ended: Camera requirements and exemptions have been reset.")
await bot.process_commands(message) # Ensure this is at the end to process other commands
# Process other commands
await bot.process_commands(message)
@check_camera_status.before_loop
async def before_camera_check():
await bot.wait_until_ready()
bot.run(TOKEN)
iface.launch(inline=False, share=True)