"""CAPTCHA gate for chat join requests.

When a user asks to join the configured chat, the bot sends them a
multiple-choice image CAPTCHA in private.  Picking the right text approves
the join request; picking a wrong one declines it.  Pending CAPTCHAs expire
after a timeout.
"""

import os
import time
import json
import asyncio
import io
import re
import logging
import string
import random
import tempfile
from pyrogram import *
from pyrogram.types import *
from pyrogram.errors import *
from database import db
from logger import LOGS
from pyrogram import Client, filters
from pyrogram.enums import ChatMemberStatus, ChatMembersFilter, ChatType
from pyrogram.errors import RPCError
from pyrogram.types import (
    CallbackQuery,
    ChatJoinRequest,
    InlineKeyboardMarkup,
    InlineKeyboardButton,
    InputMediaPhoto,
    Message
)
from PIL import Image, ImageDraw, ImageFont, ImageFilter

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Ids of the non-bot administrators seen while handling join requests.
user_list = []

# user_id -> expected CAPTCHA text for every verification still pending.
captcha_texts = {}

# user_id -> id of the chat the user asked to join.  Kept per user so that
# concurrent join requests cannot overwrite or delete each other's chat id.
_captcha_chats = {}

# user_id -> running expiry task.  Holding the task keeps it from being
# garbage collected and lets a newer CAPTCHA cancel a stale timer.
_timeout_tasks = {}

CAPTCHA_TIMEOUT_SECONDS = 300
CAPTCHA_LENGTH = 5
CAPTCHA_ALPHABET = string.ascii_uppercase + string.digits
CAPTCHA_FONT_SIZE = 45

# NOTE(review): the emoji prefixes in the user-facing strings of this module
# look mis-encoded (mojibake).  They are kept byte-for-byte here; confirm the
# intended characters and the file encoding before changing them.
CAPTCHA_CAPTION = "ā—ļø **Verify that you are human!**\n\nā” Please select the correct CAPTCHA text shown in the image below."

# OS-backed randomness so the expected answer is not predictable.
_secure_random = random.SystemRandom()


def _cancel_timeout(user_id):
    """Cancel and forget the expiry timer of ``user_id``, if one is running."""
    task = _timeout_tasks.pop(user_id, None)
    if task is not None and not task.done():
        task.cancel()


def _start_timeout(client, user_id):
    """(Re)start the expiry timer for the pending CAPTCHA of ``user_id``."""
    _cancel_timeout(user_id)
    task = asyncio.create_task(remove_captcha_after_timeout(client, user_id))
    _timeout_tasks[user_id] = task

    def _forget(finished, uid=user_id):
        # Only drop the entry if it still points at this very task; a newer
        # timer for the same user must stay registered.
        if _timeout_tasks.get(uid) is finished:
            del _timeout_tasks[uid]

    task.add_done_callback(_forget)


def _clear_pending(user_id):
    """Drop all state kept for ``user_id`` and stop their expiry timer."""
    captcha_texts.pop(user_id, None)
    _captcha_chats.pop(user_id, None)
    _cancel_timeout(user_id)


def _remove_file(path):
    """Delete ``path``; a failure is logged instead of raised."""
    try:
        os.remove(path)
    except OSError:
        logger.warning("Could not remove temporary CAPTCHA image %s", path)


def _build_captcha_keyboard(user_id, choices):
    """Return the inline keyboard: one button per choice, then refresh/cancel."""
    rows = [
        [InlineKeyboardButton(choice, callback_data=f"verify_{user_id}_{choice}")]
        for choice in choices
    ]
    rows.append([InlineKeyboardButton("šŸ”„ Refresh CAPTCHA", callback_data="refresh_captcha")])
    rows.append([InlineKeyboardButton("āŒ Cancel", callback_data="cancel_captcha")])
    return InlineKeyboardMarkup(rows)


def _random_code():
    """Return a random code of ``CAPTCHA_LENGTH`` uppercase letters/digits."""
    return ''.join(_secure_random.choice(CAPTCHA_ALPHABET) for _ in range(CAPTCHA_LENGTH))


def _load_captcha_font():
    """Return the font used to render the CAPTCHA text.

    Tries ``arial.ttf`` first, then Pillow's default font at the same size
    (supported by newer Pillow releases), then the plain default font.
    """
    try:
        return ImageFont.truetype("arial.ttf", CAPTCHA_FONT_SIZE)
    except OSError:
        pass
    try:
        return ImageFont.load_default(size=CAPTCHA_FONT_SIZE)
    except (TypeError, OSError, ImportError):
        # Older Pillow: ``load_default`` takes no ``size`` argument.
        return ImageFont.load_default()


async def remove_captcha_after_timeout(client, user_id, delay=CAPTCHA_TIMEOUT_SECONDS):
    """Expire the pending CAPTCHA of ``user_id`` after ``delay`` seconds.

    Does nothing if the user no longer has a pending CAPTCHA when the delay
    elapses.  Otherwise the stored state is dropped and the user is notified.
    """
    await asyncio.sleep(delay)
    if user_id not in captcha_texts:
        return
    del captcha_texts[user_id]
    _captcha_chats.pop(user_id, None)
    try:
        await client.send_message(user_id, "ā° Your CAPTCHA verification has expired. Please try again.")
    except Exception:
        # Runs as a background task: a failed notification must not surface
        # as an unretrieved task exception.
        logger.exception("Could not notify user %s about the expired CAPTCHA", user_id)
    logger.info(f"CAPTCHA for user {user_id} has expired and been removed.")


def generate_captcha_multiple_choice():
    """Create a CAPTCHA image plus one correct and two wrong answers.

    Returns:
        A ``(captcha_text, img_path, choices)`` tuple: the correct text, the
        path of a PNG file showing it, and a shuffled list of three distinct
        candidate texts that contains the correct one.  The caller is
        responsible for deleting the file at ``img_path``.
    """
    captcha_text = _random_code()
    choices = [captcha_text]
    while len(choices) < 3:
        candidate = _random_code()
        if candidate not in choices:
            choices.append(candidate)
    _secure_random.shuffle(choices)

    width, height = 200, 80
    # Light pastel background.
    background_color = (random.randint(200, 255), random.randint(200, 255), random.randint(200, 255))
    img = Image.new('RGB', (width, height), color=background_color)
    d = ImageDraw.Draw(img)

    # Speckle noise; coordinates stay inside the image bounds.
    for _ in range(500):
        x = random.randint(0, width - 1)
        y = random.randint(0, height - 1)
        noise_color = (random.randint(150, 200), random.randint(150, 200), random.randint(150, 200))
        d.point((x, y), fill=noise_color)

    font = _load_captcha_font()

    # ``textsize`` was removed in Pillow 10; ``textbbox`` is its replacement.
    left, top, right, bottom = d.textbbox((0, 0), captcha_text, font=font)
    text_width = max(1, right - left)
    text_height = max(1, bottom - top)

    # Draw the text on its own transparent layer so it can be rotated.
    text_image = Image.new('RGBA', (text_width, text_height), (255, 255, 255, 0))
    text_draw = ImageDraw.Draw(text_image)
    # The bounding box may start at a non-zero offset; shift so nothing is cut.
    text_draw.text((-left, -top), captcha_text, font=font, fill=(0, 0, 0))
    rotated_text = text_image.rotate(random.randint(-25, 25), expand=True)

    # Centre the rotated layer (its size grew because of ``expand``).
    text_x = (width - rotated_text.width) // 2
    text_y = (height - rotated_text.height) // 2
    img.paste(rotated_text, (text_x, text_y), rotated_text)

    # A few random strokes across the text.
    for _ in range(5):
        start = (random.randint(0, width), random.randint(0, height))
        end = (random.randint(0, width), random.randint(0, height))
        line_color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150))
        d.line([start, end], fill=line_color, width=2)

    img = img.filter(ImageFilter.BLUR)

    # Unique temporary file: no name collisions and no dependency on the
    # current working directory being writable.
    fd, img_path = tempfile.mkstemp(prefix="captcha_", suffix=".png")
    with os.fdopen(fd, "wb") as handle:
        img.save(handle, format="PNG")

    return captcha_text, img_path, choices


@Client.on_chat_join_request(filters.chat("KillerXSupport"))
async def join_request(client: Client, event: ChatJoinRequest):
    """Send a private multiple-choice CAPTCHA to a user requesting to join."""
    member = await client.get_chat_member(event.chat.id, "me")
    if member.status != ChatMemberStatus.ADMINISTRATOR:
        return await client.send_message(event.chat.id, text="I am not an administrator in this group.")

    async for m in client.get_chat_members(event.chat.id, filter=ChatMembersFilter.ADMINISTRATORS):
        # Record each human administrator once instead of on every request.
        if m.user and not m.user.is_bot and m.user.id not in user_list:
            user_list.append(m.user.id)

    # Only supergroups get a CAPTCHA; bail out before creating any state or
    # image file that nothing would ever clean up.
    if event.chat.type != ChatType.SUPERGROUP:
        return

    user_id = event.from_user.id
    captcha_text, img_path, choices = generate_captcha_multiple_choice()
    captcha_texts[user_id] = captcha_text
    _captcha_chats[user_id] = event.chat.id
    keyboard = _build_captcha_keyboard(user_id, choices)

    try:
        await client.send_photo(
            user_id,
            photo=img_path,
            caption=CAPTCHA_CAPTION,
            reply_markup=keyboard
        )
    except Exception as e:
        # The user never received the CAPTCHA, so nothing is pending.
        _clear_pending(user_id)
        logger.exception("Failed to send CAPTCHA to user %s", user_id)
        await client.send_message(
            event.chat.id,
            text=str(e)
        )
    else:
        _start_timeout(client, user_id)
    finally:
        _remove_file(img_path)


@Client.on_callback_query(filters.regex("^cancel_captcha$"))
async def cancel_captcha_callback(client: Client, cb: CallbackQuery):
    """Abort the pending CAPTCHA of the user who pressed "Cancel"."""
    user_id = cb.from_user.id
    if user_id not in captcha_texts:
        await cb.answer("No active CAPTCHA verification found.", show_alert=True)
        return

    _clear_pending(user_id)
    logger.info(f"User {user_id} has canceled CAPTCHA verification.")
    await cb.edit_message_text(
        "āŒ CAPTCHA verification has been canceled. If you wish to try again,",
        disable_web_page_preview=True
    )
    await cb.answer("CAPTCHA verification canceled.", show_alert=False)


@Client.on_callback_query(filters.regex("^refresh_captcha$"))
async def refresh_captcha_callback(client: Client, cb: CallbackQuery):
    """Replace the user's pending CAPTCHA with a freshly generated one."""
    user_id = cb.from_user.id
    if user_id not in captcha_texts:
        # Always answer, otherwise the button keeps showing a loading state.
        await cb.answer("No active CAPTCHA verification found.", show_alert=True)
        return

    captcha_text, img_path, choices = generate_captcha_multiple_choice()
    captcha_texts[user_id] = captcha_text
    # Offer the new choices again; without them the user could not answer.
    keyboard = _build_captcha_keyboard(user_id, choices)
    try:
        await cb.edit_message_media(
            media=InputMediaPhoto(img_path, caption=CAPTCHA_CAPTION),
            reply_markup=keyboard
        )
    finally:
        _remove_file(img_path)
    await cb.answer("CAPTCHA refreshed!", show_alert=False)


# The pattern requires a numeric user id so that the "verify_captcha" button
# is not swallowed by this handler.
@Client.on_callback_query(filters.regex(r"^verify_\d+_"))
async def verify_captcha_multiple_choice_callback(client: Client, cb: CallbackQuery):
    """Check the chosen answer and approve or decline the join request."""
    data = cb.data.split("_")
    if len(data) != 3:
        await cb.answer("Invalid data format.", show_alert=True)
        return

    _, user_id_str, user_choice = data
    try:
        user_id = int(user_id_str)
    except ValueError:
        await cb.answer("Invalid user ID.", show_alert=True)
        return

    # Callback data is client-supplied: only the addressed user may answer.
    if cb.from_user.id != user_id:
        await cb.answer("This CAPTCHA does not belong to you.", show_alert=True)
        logger.warning("User %s tried to answer the CAPTCHA of user %s.", cb.from_user.id, user_id)
        return

    if user_id not in captcha_texts:
        await cb.answer("ā—ļø Please start the CAPTCHA verification first.", show_alert=True)
        logger.warning(f"User {user_id} attempted to verify CAPTCHA without starting verification.")
        return

    correct_captcha = captcha_texts.get(user_id)
    chat_id = _captcha_chats.get(user_id)
    # One attempt only: forget the CAPTCHA before talking to Telegram so a
    # failing API call cannot leave it answerable.
    _clear_pending(user_id)
    logger.info(f"User {user_id} selected CAPTCHA: {user_choice} (Expected: {correct_captcha})")

    passed = user_choice == correct_captcha
    if passed:
        await cb.edit_message_text("āœ… CAPTCHA verification successful!")
        logger.info(f"User {user_id} successfully verified CAPTCHA.")
    else:
        await cb.edit_message_text("āŒ Incorrect CAPTCHA. Please try again")
        logger.info(f"User {user_id} failed CAPTCHA verification.")
    await cb.answer()

    if chat_id is None:
        logger.error("No chat recorded for the join request of user %s.", user_id)
        return

    try:
        if passed:
            await client.approve_chat_join_request(chat_id=chat_id, user_id=user_id)
        else:
            await client.decline_chat_join_request(chat_id=chat_id, user_id=user_id)
    except RPCError:
        logger.exception("Could not resolve the join request of user %s in chat %s", user_id, chat_id)


@Client.on_callback_query(filters.regex("^verify_captcha$"))
async def verify_captcha_callback(client: Client, cb: CallbackQuery):
    """Ask the user to type the CAPTCHA text shown in the image."""
    user_id = cb.from_user.id
    if user_id not in captcha_texts:
        await cb.answer("ā—ļø Please start the CAPTCHA verification first", show_alert=True)
        logger.warning(f"User {user_id} attempted to verify CAPTCHA without starting verification.")
        return

    await cb.message.reply_text("šŸ” **Please enter the CAPTCHA text exactly as shown in the image:**")
    await cb.answer()
    logger.info(f"User {user_id} is attempting to verify CAPTCHA.")