Spaces:
Running
Running
File size: 3,647 Bytes
f9ad4ce b9cccba 27af330 f9ad4ce af123b0 f9ad4ce af123b0 a620fad f9ad4ce af123b0 f9ad4ce b9cccba 10d4350 b9cccba f9ad4ce b9cccba 6e4e6a0 82e0689 971a06e f9ad4ce af123b0 f9ad4ce 27af330 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
import logging
import re
from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from pyrogram.types import *
from akenoai import AkenoXToJs
from config import API_ID, API_HASH, BOT_TOKEN
import hashlib
# Root logging at INFO; pyrogram's own logger is raised to WARNING so that
# only warnings and errors from the library are emitted.
logging.basicConfig(level=logging.INFO)
logging.getLogger("pyrogram").setLevel(logging.WARNING)

# Greeting sent in reply to /start (Indonesian). The single `{}` placeholder
# is filled with the sender's first name.
WELCOME_TEXT = """
Halo {}
Saya adalah bot untuk mengunduh video tiktok di telegram.
Saya dapat mengunduh video dengan tanda air atau tanpa tanda air dan mengunduh audio dari url. Kirimkan saja saya url tiktok.
"""

# Bot client; "TTK-BOT" is the session name, credentials come from config.
client = Client("TTK-BOT", api_id=API_ID, api_hash=API_HASH, bot_token=BOT_TOKEN)

# Maps callback-data strings (see generate_callback_data) to the TikTok URL
# they were generated for. In-memory only.
link_storage = {}

# API key passed to AkenoXToJs.randydev for the "dl/tiktok" endpoint.
# NOTE(review): this key is committed in source; consider loading it from
# config like the Telegram credentials.
DEMO_API_KEY = "akeno_OSrXhljIomunACd5JY18jFIeIuuB6Pdx"
def generate_callback_data(user_id, query):
    """Create the callback payload for an audio-download button.

    The payload has the form ``audiodownload_<user_id>_<md5 hex of query>``.
    As a side effect, ``query`` is stored in ``link_storage`` under that
    payload so the callback handler can look the URL up again.

    Args:
        user_id: Identifier of the user the button is created for.
        query: The URL text to associate with the button.

    Returns:
        The callback payload string.
    """
    # MD5 is used only to derive a short, fixed-length key from the URL.
    digest = hashlib.md5(query.encode()).hexdigest()
    payload = "audiodownload_{}_{}".format(user_id, digest)
    link_storage[payload] = query
    return payload
@client.on_message(filters.command("start") & filters.private)
async def welcome_start(client: Client, message: Message):
    """Answer the /start command in a private chat.

    Replies with ``WELCOME_TEXT`` formatted with the sender's first name and
    attaches a single inline button linking to the bot's channel.
    """
    channel_button = InlineKeyboardButton(
        text="📢 Saluran Bot",
        url="https://t.me/RendyProjects",
    )
    markup = InlineKeyboardMarkup([[channel_button]])
    greeting = WELCOME_TEXT.format(message.from_user.first_name)
    await message.reply_text(greeting, reply_markup=markup)
@client.on_callback_query(filters.regex("^audiodownload_"))
async def callback_button(client: Client, cb: CallbackQuery):
    """Handle a press of an audio-download button.

    Looks up the URL stored in ``link_storage`` under the callback data,
    requests the "dl/tiktok" endpoint through ``AkenoXToJs.randydev`` and
    sends the ``"play"`` entry of the returned ``music_info`` to the user
    as audio. If no URL is stored for the callback data, the user gets an
    "Invalid or expired link." alert instead.

    Any exception is logged with its traceback and reported to the user in
    an alert.
    """
    try:
        query = link_storage.get(cb.data)
        if not query:
            await cb.answer("Invalid or expired link.", show_alert=True)
            return
        response = await AkenoXToJs.randydev(
            "dl/tiktok",
            api_key=DEMO_API_KEY,
            custom_dev=True,
            url=query,
        )
        # presumably a direct URL to the audio track; verify against the API
        music_tiktok = response.results.data.music_info.get("play")
        await client.send_audio(cb.from_user.id, music_tiktok)
        await cb.answer("Audio sent successfully!")
    except Exception as e:
        logging.getLogger(__name__).exception(
            "Audio download failed for callback %r", cb.data
        )
        # Callback answers are limited to 200 characters; an over-long
        # exception message would otherwise make this handler raise too.
        await cb.answer(f"Error: {e}"[:200], show_alert=True)
# Matches TikTok links on the bare domain or the www/vt/vm/m subdomains,
# followed by any non-empty path. The path may start with "@", as in the
# canonical "https://www.tiktok.com/@user/video/<id>" form.
_TIKTOK_URL_RE = re.compile(
    r"https?://(?:(?:www|vt|vm|m)\.)?tiktok\.com/\S+",
    re.IGNORECASE,
)


def is_tiktok_url(url):
    """Return True if ``url`` contains a TikTok link.

    The link may appear anywhere in the text (``re.search`` semantics).

    Args:
        url: Text to inspect. Non-string values (e.g. ``None``) are treated
            as "not a TikTok URL" instead of raising.

    Returns:
        bool: Whether a TikTok link was found.
    """
    if not isinstance(url, str):
        return False
    return _TIKTOK_URL_RE.search(url) is not None
@client.on_message(filters.text & filters.private)
async def tiktok_downloader(client: Client, message: Message):
    """Download the TikTok video for a URL sent in a private chat.

    Flow:
      1. Reject text that is not a TikTok URL with "Invalid link".
      2. Register an audio-download callback for the URL and build the
         inline keyboard carrying it.
      3. Post a temporary "Processing...." message, delete the user's
         message, fetch the video via the "dl/tiktok" endpoint and send it
         with the audio button attached.
      4. Remove the temporary message; on failure, log and report the error.
    """
    if not message.text:
        return
    query_url = message.text
    if not is_tiktok_url(query_url):
        return await message.reply_text("Invalid link")

    callback_data = generate_callback_data(message.from_user.id, query_url)
    audio_button = InlineKeyboardButton(
        text="Audio Download",
        callback_data=callback_data,
    )
    keyboard = InlineKeyboardMarkup([[audio_button]])

    # Bound before the try so the except branch can tell whether the status
    # message was ever created.
    dll = None
    try:
        dll = await message.reply_text("Processing....")
        await message.delete()
        response = await AkenoXToJs.randydev(
            "dl/tiktok",
            api_key=DEMO_API_KEY,
            custom_dev=True,
            url=query_url,
        )
        await message.reply_video(
            response.results.data.get("play"),
            reply_markup=keyboard,
        )
        await dll.delete()
    except Exception as e:
        logging.getLogger(__name__).exception(
            "TikTok download failed for %r", query_url
        )
        if dll is not None:
            await dll.delete()
        await message.reply_text(f"Error: {str(e)}")
# Start the bot and block until it is stopped.
client.run()