from pyrogram.errors import UserNotParticipant from pyrogram.enums.parse_mode import ParseMode from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message from FileStream.utils.translation import LANG from FileStream.utils.database import Database from FileStream.utils.human_readable import humanbytes from FileStream.config import Telegram, Server from FileStream.bot import FileStream db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) async def is_user_joined(bot, message: Message): try: user = await bot.get_chat_member(Telegram.UPDATES_CHANNEL, message.chat.id) if user.status == "BANNED": await message.reply_text( text=LANG.BAN_TEXT.format(Telegram.OWNER_ID), parse_mode=ParseMode.MARKDOWN, disable_web_page_preview=True ) return False except UserNotParticipant: await message.reply_text( text = "Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐", reply_markup=InlineKeyboardMarkup( [[ InlineKeyboardButton("Jᴏɪɴ ɴᴏᴡ 🔓", url=f"https://t.me/{Telegram.UPDATES_CHANNEL}") ]] ), parse_mode=ParseMode.HTML ) return False except Exception: await message.reply_text( text = f"Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ [ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]", parse_mode=ParseMode.HTML, disable_web_page_preview=True) return False return True #---------------------[ PRIVATE GEN LINK + CALLBACK ]---------------------# async def gen_link(_id): file_info = await db.get_file(_id) file_name = file_info['file_name'] file_size = humanbytes(file_info['file_size']) mime_type = file_info['mime_type'] page_link = f"{Server.URL}watch/{_id}" stream_link = f"{Server.URL}dl/{_id}" file_link = f"https://t.me/{FileStream.username}?start=file_{_id}" if "video" in mime_type: stream_text = LANG.STREAM_TEXT.format(file_name, file_size, stream_link, page_link, file_link) reply_markup = InlineKeyboardMarkup( [ [InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], [InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelpvt_{_id}")], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] ] ) else: stream_text = LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link, file_link) reply_markup = InlineKeyboardMarkup( [ [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], [InlineKeyboardButton("ɢᴇᴛ ғɪʟᴇ", url=file_link), InlineKeyboardButton("ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelpvt_{_id}")], [InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")] ] ) return reply_markup, stream_text #---------------------[ GEN STREAM LINKS FOR CHANNEL ]---------------------# async def gen_linkx(m:Message , _id, name: list): file_info = await db.get_file(_id) file_name = file_info['file_name'] mime_type = file_info['mime_type'] file_size = humanbytes(file_info['file_size']) page_link = f"{Server.URL}watch/{_id}" stream_link = f"{Server.URL}dl/{_id}" file_link = f"https://t.me/{FileStream.username}?start=file_{_id}" if "video" in mime_type: stream_text= LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link, page_link) reply_markup = InlineKeyboardMarkup( [ [InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)] ] ) else: stream_text= LANG.STREAM_TEXT_X.format(file_name, file_size, stream_link, file_link) reply_markup = InlineKeyboardMarkup( [ [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)] ] ) return reply_markup, stream_text #---------------------[ USER BANNED ]---------------------# async def is_user_banned(message): if await db.is_user_banned(message.from_user.id): await message.reply_text( text=LANG.BAN_TEXT.format(Telegram.OWNER_ID), parse_mode=ParseMode.MARKDOWN, disable_web_page_preview=True ) return True return False #---------------------[ CHANNEL BANNED ]---------------------# async def is_channel_banned(bot, message): if await db.is_user_banned(message.chat.id): await bot.edit_message_reply_markup( chat_id=message.chat.id, message_id=message.id, reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton(f"ᴄʜᴀɴɴᴇʟ ɪs ʙᴀɴɴᴇᴅ", callback_data="N/A")]]) ) return True return False #---------------------[ USER AUTH ]---------------------# async def is_user_authorized(message): if hasattr(Telegram, 'AUTH_USERS') and Telegram.AUTH_USERS: user_id = message.from_user.id if user_id == Telegram.OWNER_ID: return True if not (user_id in Telegram.AUTH_USERS): await message.reply_text( text="You are not authorized to use this bot.", parse_mode=ParseMode.MARKDOWN, disable_web_page_preview=True ) return False return True #---------------------[ USER EXIST ]---------------------# async def is_user_exist(bot, message): if not bool(await db.get_user(message.from_user.id)): await db.add_user(message.from_user.id) await bot.send_message( Telegram.LOG_CHANNEL, f"**#NᴇᴡUsᴇʀ**\n**⬩ ᴜsᴇʀ ɴᴀᴍᴇ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**⬩ ᴜsᴇʀ ɪᴅ :** `{message.from_user.id}`" ) async def is_channel_exist(bot, message): if not bool(await db.get_user(message.chat.id)): await db.add_user(message.chat.id) members = await bot.get_chat_members_count(message.chat.id) await bot.send_message( Telegram.LOG_CHANNEL, f"**#NᴇᴡCʜᴀɴɴᴇʟ** \n**⬩ ᴄʜᴀᴛ ɴᴀᴍᴇ :** `{message.chat.title}`\n**⬩ ᴄʜᴀᴛ ɪᴅ :** `{message.chat.id}`\n**⬩ ᴛᴏᴛᴀʟ ᴍᴇᴍʙᴇʀs :** `{members}`" ) async def verify_user(bot, message): if not await is_user_authorized(message): return False if await is_user_banned(message): return False await is_user_exist(bot, message) if Telegram.FORCE_UPDATES_CHANNEL: if not await is_user_joined(bot, message): return False return True