Spaces:
Runtime error
Runtime error
import gradio as gr | |
import discord | |
from discord.ext import commands, tasks | |
from collections import defaultdict | |
import json | |
import os | |
from dotenv import load_dotenv | |
import os | |
from huggingface_hub import HfApi, HfFolder | |
from datetime import datetime, timedelta | |
import pytz | |
HfFolder.save_token(os.getenv("HUGGING_FACE_TOKEN")) # Save the token for authentication | |
# Calculate the sleep time until the next midnight in IST | |
ist_timezone = pytz.timezone('Asia/Kolkata') | |
current_time = datetime.now(ist_timezone) | |
next_midnight = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) | |
sleep_time = int((next_midnight - current_time).total_seconds()) # Convert to integer | |
api = HfApi() | |
api.set_space_sleep_time(repo_id="bhagatsuryainatom/Vesperal", sleep_time=sleep_time) | |
load_dotenv() | |
intents = discord.Intents.default() | |
intents.messages = True | |
intents.message_content = True | |
intents.voice_states = True | |
intents.reactions = True | |
bot = commands.Bot(command_prefix='!', intents=intents) | |
# Place your bot's token here | |
TOKEN = os.getenv('TOKEN') | |
YOUR_CHANNEL_ID= os.getenv('YOUR_CHANNEL_ID') | |
# Initialize the sessions and pending_messages dictionaries | |
sessions = {} | |
pending_messages = {} | |
class Session: | |
def __init__(self): | |
self.camera_required = False | |
self.time_without_video = defaultdict(int) | |
self.warnings_given = defaultdict(int) | |
self.exemptions = set() | |
self.instructions_sent = False # Flag to track if instructions have been sent | |
def to_dict(self): | |
# Convert sets of Member IDs to lists for JSON serialization | |
return { | |
"camera_required": self.camera_required, | |
"time_without_video": {str(member_id): time for member_id, time in self.time_without_video.items()}, | |
"warnings_given": {str(member_id): warnings for member_id, warnings in self.warnings_given.items()}, | |
"exemptions": list(self.exemptions) # Assuming this already stores member IDs | |
} | |
def from_dict(cls, data): | |
session = cls() | |
session.camera_required = data.get("camera_required", False) | |
session.time_without_video = defaultdict(int, data.get("time_without_video", {})) | |
session.warnings_given = defaultdict(int, data.get("warnings_given", {})) | |
session.exemptions = set(data.get("exemptions", [])) | |
session.instructions_sent = data.get("instructions_sent", False) | |
return session | |
def save_sessions(): | |
with open('sessions.json', 'w') as f: | |
json.dump({str(vc_id): session.to_dict() for vc_id, session in sessions.items()}, f, ensure_ascii=False, indent=4) | |
def correct_and_load_json(path): | |
try: | |
with open(path, 'r') as file: | |
return json.load(file) | |
except json.JSONDecodeError: | |
print("JSON format error detected. Attempting to correct.") | |
with open(path, 'r') as file: | |
file_content = file.read() | |
corrected_content = file_content.replace("'", '"') # Replace single quotes with double quotes | |
# Implement other corrections here as needed | |
with open(path, 'w') as file: | |
file.write(corrected_content) | |
with open(path, 'r') as file: | |
return json.load(file) # Attempt to load corrected content | |
def load_sessions(): | |
if os.path.exists('sessions.json') and os.path.getsize('sessions.json') > 0: | |
try: | |
session_data = correct_and_load_json('sessions.json') | |
for vc_id, data in session_data.items(): | |
sessions[int(vc_id)] = Session.from_dict(data) | |
except json.JSONDecodeError: | |
print("Failed to correct JSON file. Please check the file format manually.") | |
else: | |
print("Session file not found or is empty, starting fresh.") | |
async def on_ready(): | |
load_sessions() | |
print(f'Logged in as {bot.user.name}') | |
check_camera_status.start() | |
# Ensure save_sessions_task starts properly within the on_ready event | |
if not save_sessions_task.is_running(): | |
save_sessions_task.start() | |
async def test(ctx): | |
await ctx.send("Test successful!") | |
async def check_camera_status(): | |
for vc_id, session in sessions.items(): | |
if not session.camera_required: | |
continue | |
vc = bot.get_channel(vc_id) | |
if vc is None: | |
continue | |
for member in vc.members: | |
if member.id in session.exemptions: | |
continue | |
if not member.voice.self_video: | |
session.time_without_video[member] += 30 | |
if session.time_without_video[member] == 30: | |
session.warnings_given[member] = 1 | |
await member.send("Warning 1: Please turn on your camera within the next 30 seconds.") | |
elif session.time_without_video[member] >= 60: | |
session.warnings_given[member] = 2 | |
await member.send("Final Warning: You are being removed for not turning on your camera.") | |
await member.move_to(None) | |
# Reset the member's session data | |
session.time_without_video.pop(member, None) | |
session.warnings_given.pop(member, None) | |
else: | |
# Reset the member's session data if their camera is on | |
session.time_without_video.pop(member, None) | |
session.warnings_given.pop(member, None) | |
# Save sessions every 5 minutes | |
async def save_sessions_task(): | |
save_sessions() | |
print("Sessions saved.") | |
async def before_save_sessions(): | |
await bot.wait_until_ready() | |
async def on_voice_state_update(member, before, after): | |
if member.bot: | |
return | |
if after.channel and (not before.channel or before.channel.id != after.channel.id): | |
session = sessions.get(after.channel.id) | |
if session is None: | |
session = Session() | |
sessions[after.channel.id] = session | |
if any(role.name == "Admin" for role in member.roles): | |
other_admins_in_channel = [m for m in after.channel.members if m != member and any(role.name == "Admin" for role in m.roles)] | |
if not other_admins_in_channel: | |
# Replace "YOUR_CHANNEL_ID" with your actual general channel ID | |
general_channel_id = YOUR_CHANNEL_ID | |
general_channel = member.guild.get_channel(int(general_channel_id)) | |
if not general_channel: | |
print(f"Couldn't find the channel with ID: {general_channel_id}.") | |
return | |
message_content = f"Admin {member.mention}, do you require cameras to be ON for this session in {after.channel.name}? React with π· for YES or β for NO." | |
try: | |
message = await general_channel.send(message_content) | |
pending_messages[message.id] = after.channel.id | |
await message.add_reaction("π·") | |
await message.add_reaction("β") | |
except discord.DiscordException as e: | |
print(f"Failed to send message or add reactions due to: {e}") | |
async def setperms(ctx, channel: discord.TextChannel = None): | |
channel = channel or ctx.channel | |
if not channel.permissions_for(ctx.guild.me).manage_channels: | |
await ctx.send("I don't have permission to manage channels here.") | |
return | |
bot_member = ctx.guild.get_member(bot.user.id) | |
overwrites = { | |
ctx.guild.default_role: discord.PermissionOverwrite(read_messages=False), | |
bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True, manage_messages=True, add_reactions=True) | |
} | |
try: | |
await channel.set_permissions(ctx.guild.default_role, read_messages=False) | |
await channel.set_permissions(bot_member, **overwrites) | |
await ctx.send(f"Permissions set for {channel.mention} successfully!") | |
except Exception as e: | |
await ctx.send(f"Failed to set permissions for {channel.mention} due to: {e}") | |
async def on_reaction_add(reaction, user): | |
await process_reaction(reaction, user) | |
async def on_reaction_remove(reaction, user): | |
await process_reaction(reaction, user) | |
async def process_reaction(reaction, user): | |
if reaction.message.id in pending_messages and not user.bot: | |
channel_id = pending_messages[reaction.message.id] | |
session = sessions.get(channel_id) | |
if session and any(role.name == "Admin" for role in user.roles): | |
if str(reaction.emoji) == "π·": | |
session.camera_required = True | |
session.exemptions.clear() | |
elif str(reaction.emoji) == "β": | |
session.camera_required = False | |
session.exemptions.clear() | |
# Edit the message to reflect the updated session status | |
await reaction.message.edit(content=f"Camera requirement for {reaction.message.channel.guild.get_channel(channel_id).name} has been {'UPDATED to ON' if session.camera_required else 'UPDATED to OFF'}.") | |
# Send instructions only if they haven't been sent before | |
if not session.instructions_sent: | |
follow_up_message = ( | |
"**Next Steps for Admins:**\n" | |
"- Use `#camoff @user` to exempt any specific user from the camera requirement.\n" | |
"- Use `#camoff` to exempt yourself if actively participating in the session.\n" | |
"- Use `#end` to conclude the session and reset all settings.\n" | |
"**Note:** These commands work within the context of the current voice session." | |
) | |
await reaction.message.channel.send(follow_up_message) | |
session.instructions_sent = True # Update the flag to prevent re-sending | |
# Cleanup | |
del pending_messages[reaction.message.id] | |
def dummy_function(input_text): | |
return "This is a dummy function for the Gradio UI." | |
iface = gr.Interface(fn=dummy_function, | |
inputs=gr.Textbox(label="Input"), | |
outputs=gr.Textbox(label="Output"), | |
title="Discord Bot", | |
description="This Space runs a Discord bot in the background.") | |
async def on_message(message): | |
# Skip bot messages and non-admin users | |
if message.author.bot or not any(role.name == "Admin" for role in message.author.roles): | |
await bot.process_commands(message) # Ensure other commands are still processed | |
return | |
# Process commands only if the author is in a voice channel | |
if message.author.voice: | |
vc_id = message.author.voice.channel.id | |
session = sessions.get(vc_id) | |
if not session: | |
await message.channel.send("No active session found for this voice channel.") | |
await bot.process_commands(message) | |
return | |
# Admin commands | |
if "#camoff" in message.content: | |
# If no users are mentioned, exempt the author; otherwise, exempt mentioned users | |
users_to_exempt = message.mentions if message.mentions else [message.author] | |
for user in users_to_exempt: | |
session.exemptions.add(user.id) | |
await message.channel.send(f"{user.display_name} is now exempt from the camera requirement.") | |
elif "#end" in message.content: | |
# Clear exemptions and reset camera requirement | |
session.exemptions.clear() | |
session.camera_required = False | |
await message.channel.send("Session ended: Camera requirements and exemptions have been reset.") | |
await bot.process_commands(message) # Ensure this is at the end to process other commands | |
# Process other commands | |
await bot.process_commands(message) | |
async def before_camera_check(): | |
await bot.wait_until_ready() | |
bot.run(TOKEN) | |
iface.launch(inline=False, share=True) | |