|
remove comments |
|
split code to many file clean code |
|
use one client |
|
use .env |
|
|
|
import csv |
|
import asyncio |
|
import time |
|
from telethon import TelegramClient |
|
from tqdm import tqdm |
|
from telethon.tl.functions.channels import JoinChannelRequest |
|
from telethon.tl.functions.messages import ImportChatInviteRequest |
|
from telethon.errors.rpcerrorlist import InviteHashExpiredError |
|
from flask import Flask, jsonify, send_from_directory |
|
|
|
from flask import Flask, render_template, send_from_directory |
|
from telethon.tl.functions.channels import GetParticipantsRequest |
|
from telethon.tl.types import ChannelParticipantsSearch |
|
from telethon.errors import FloodWaitError, UserAdminInvalidError |
|
import json |
|
import asyncio |
|
import nest_asyncio |
|
import logging |
|
from telethon import TelegramClient, events |
|
from supabase import create_client, Client |
|
from flask import Flask, jsonify |
|
from threading import Thread |
|
from multiprocessing import Process, Queue |
|
import unicodedata |
|
from telegram.helpers import escape_markdown |
|
import re |
|
import os |
|
|
|
from telethon.tl.functions.channels import JoinChannelRequest, InviteToChannelRequest |
|
from telethon.tl.functions.channels import EditBannedRequest |
|
from telethon.tl.types import ChatBannedRights |
|
from telethon.errors.rpcerrorlist import UserAdminInvalidError, UserNotParticipantError |
|
from telethon.errors.rpcerrorlist import InviteHashExpiredError, UserAlreadyParticipantError |
|
from telethon.tl.types import Channel, Chat |
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format="%(asctime)s - %(levelname)s - %(message)s", |
|
handlers=[ |
|
logging.FileHandler("join_groups.log"), |
|
logging.StreamHandler() |
|
] |
|
) |
|
|
|
API_ID = 25216912 |
|
API_HASH = "f65f6050fe283ab5e" |
|
PHONE_NUMBER = "+96743" |
|
OUTPUT_CSV = "groups_with_status.csv" |
|
|
|
CSV_FILENAME = "8.csv" |
|
session_dir = "mbot1" |
|
FILE_DIRECTORY = os.getcwd() |
|
|
|
SLEEP_TIME = 5 |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
@app.route('/') |
|
def index(): |
|
"""Show available files for download as an HTML page.""" |
|
files = os.listdir(FILE_DIRECTORY) |
|
return render_template("index.html", files=files) |
|
|
|
|
|
@app.route('/download/<filename>') |
|
def download_file(filename): |
|
"""Allow downloading any file from the directory.""" |
|
return send_from_directory(FILE_DIRECTORY, filename, as_attachment=True) |
|
|
|
|
|
def run_flask(): |
|
app.run(host='0.0.0.0', port=7860) |
|
|
|
|
|
USER_CSV = "user_list.csv" |
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") |
|
|
|
BATCH_SIZE = 30 |
|
MAX_USERS = 200 |
|
|
|
|
|
async def fetch_users_from_source_group(source_group, max_users=MAX_USERS): |
|
""" |
|
Fetch up to `max_users` from a Telegram group using pagination and save them to a CSV file. |
|
|
|
:param source_group: The source group username or ID. |
|
:param max_users: The maximum number of users to fetch. Use `None` to fetch all users. |
|
""" |
|
logging.info(f"Fetching users from {source_group} (Limit: {max_users if max_users else 'All'})...") |
|
|
|
async with TelegramClient(session_dir, API_ID, API_HASH) as client: |
|
await client.start(PHONE_NUMBER) |
|
|
|
try: |
|
entity = await client.get_entity(source_group) |
|
offset = 0 |
|
total_users = 0 |
|
csv_filename = f"{source_group}.csv" |
|
|
|
with open(csv_filename, mode="w", newline="", encoding="utf-8") as file: |
|
writer = csv.writer(file) |
|
writer.writerow(["user_id", "username", "first_name", "last_name"]) |
|
|
|
while max_users is None or total_users < max_users: |
|
remaining_users = max_users - total_users if max_users else BATCH_SIZE |
|
batch_size = min(BATCH_SIZE, remaining_users) |
|
|
|
try: |
|
participants = await client(GetParticipantsRequest( |
|
entity, ChannelParticipantsSearch(''), offset, batch_size, hash=0 |
|
)) |
|
|
|
if not participants.users: |
|
break |
|
|
|
for user in participants.users: |
|
writer.writerow([user.id, user.username or "N/A", user.first_name or "N/A", user.last_name or "N/A"]) |
|
logging.info(f"β User saved: {user.id} | {user.username}") |
|
|
|
total_users += len(participants.users) |
|
offset += len(participants.users) |
|
|
|
except FloodWaitError as e: |
|
logging.warning(f"β οΈ Telegram rate limit hit! Waiting {e.seconds} seconds...") |
|
await asyncio.sleep(e.seconds) |
|
|
|
await asyncio.sleep(SLEEP_TIME) |
|
|
|
logging.info(f"β
Fetched {total_users} users from {source_group}. Saved to {csv_filename}.") |
|
|
|
except Exception as e: |
|
logging.error(f"β Failed to fetch users from {source_group}: {e}") |
|
|
|
|
|
async def add_users_to_destination_group(destination_group, csvfile): |
|
""" |
|
Reads users from the CSV file and adds them to the destination group while handling rate limits. |
|
Before adding, it fetches current members of the destination group and filters out any users already present. |
|
|
|
:param destination_group: The destination group username or ID. |
|
:param csvfile: The CSV file containing the list of user IDs. |
|
""" |
|
logging.info(f"Adding users to {destination_group} from {csvfile}...") |
|
|
|
async with TelegramClient(session_dir, API_ID, API_HASH) as client: |
|
await client.start(PHONE_NUMBER) |
|
|
|
try: |
|
|
|
dest_entity = await client.get_entity(destination_group) |
|
|
|
|
|
existing_user_ids = set() |
|
async for user in client.iter_participants(dest_entity, limit=None): |
|
existing_user_ids.add(user.id) |
|
logging.info(f"Fetched {len(existing_user_ids)} existing users from {destination_group}.") |
|
|
|
|
|
users = [] |
|
with open(csvfile, mode="r", encoding="utf-8") as file: |
|
reader = csv.reader(file) |
|
|
|
header = next(reader, None) |
|
for row in reader: |
|
|
|
try: |
|
user_id = int(row[0].strip()) |
|
except ValueError: |
|
logging.debug(f"Skipping row with non-numeric user_id: {row}") |
|
continue |
|
if user_id not in existing_user_ids: |
|
users.append(user_id) |
|
|
|
logging.info(f"Filtered CSV: {len(users)} users to add after removing existing members.") |
|
|
|
count = 0 |
|
for index, user_id in enumerate(users, start=1): |
|
try: |
|
logging.info(f"[{index}/{len(users)}] Adding user {user_id} to {destination_group}...") |
|
await client(InviteToChannelRequest(dest_entity, [user_id])) |
|
logging.info(f"β
Successfully added user {user_id}.") |
|
|
|
count += 1 |
|
if count % BATCH_SIZE == 0: |
|
logging.info(f"β³ Waiting {SLEEP_TIME} seconds to avoid rate limits...") |
|
await asyncio.sleep(SLEEP_TIME) |
|
|
|
except FloodWaitError as e: |
|
logging.warning(f"β οΈ FloodWait: Waiting {e.seconds} seconds...") |
|
await asyncio.sleep(e.seconds) |
|
|
|
except UserAdminInvalidError: |
|
logging.error(f"β Cannot add {user_id}: Bot lacks admin rights.") |
|
|
|
except Exception as e: |
|
logging.error(f"β Failed to add {user_id}: {e}") |
|
|
|
logging.info(f"β
Process completed: Added {count} new users to {destination_group}.") |
|
|
|
except Exception as e: |
|
logging.error(f"β Failed to add users to {destination_group}: {e}") |
|
|
|
async def get_user_groups(client): |
|
"""Fetch all groups/channels the user is already a member of using get_dialogs.""" |
|
joined_groups = set() |
|
dialogs = await client.get_dialogs() |
|
|
|
|
|
groups = [d for d in dialogs if d.is_group or d.is_channel] |
|
|
|
for group in groups: |
|
username = f"https://t.me/{group.entity.username}" if hasattr(group.entity, "username") and group.entity.username else "private_group" |
|
joined_groups.add(username) |
|
logging.info(f"Joined group/channel: {group.entity.title} (ID: {username})") |
|
|
|
return joined_groups |
|
|
|
|
|
async def join_groups(): |
|
async with TelegramClient(session_dir, API_ID, API_HASH) as client: |
|
await client.start(PHONE_NUMBER) |
|
me = await client.get_me() |
|
logging.info(f"Logged in as {me.first_name} (ID: {me.id})") |
|
|
|
|
|
user_groups = await get_user_groups(client) |
|
logging.info(f"β
Retrieved {len(user_groups)} joined groups/channels.") |
|
logging.info(f"β
Retrieved {user_groups} joined groups/channels.") |
|
|
|
with open(CSV_FILENAME, mode="r", newline="", encoding="utf-8") as file: |
|
reader = csv.reader(file) |
|
header = next(reader) |
|
groups = [row for row in reader] |
|
|
|
|
|
filtered_groups = [] |
|
for row in groups: |
|
phone_number, group_name, username, group_id = row |
|
|
|
if username and username in user_groups: |
|
logging.info(f"β‘ Already a member: {group_name} ({username}) - Skipping") |
|
else: |
|
filtered_groups.append(row) |
|
|
|
|
|
with open(OUTPUT_CSV, mode="a", newline="", encoding="utf-8") as output_file: |
|
writer = csv.writer(output_file) |
|
writer.writerow(header + ["status"]) |
|
|
|
for index, row in enumerate(filtered_groups, start=2): |
|
phone_number, group_name, username, group_id = row |
|
status = "" |
|
|
|
try: |
|
if username != "private_group": |
|
|
|
await client(JoinChannelRequest(username)) |
|
status = "Joined (public)" |
|
logging.info(f"[{index}/{len(filtered_groups)}] β
Joined public group: {group_name} ({username})") |
|
|
|
|
|
time.sleep(SLEEP_TIME) |
|
else: |
|
|
|
await client(ImportChatInviteRequest(group_id)) |
|
status = "Joined (private)" |
|
logging.info(f"[{index}/{len(filtered_groups)}] β
Joined private group: {group_name}") |
|
|
|
|
|
time.sleep(SLEEP_TIME) |
|
|
|
except UserAlreadyParticipantError: |
|
status = "Already a member" |
|
logging.info(f"[{index}/{len(filtered_groups)}] β‘ Already a member: {group_name} ({username})") |
|
except InviteHashExpiredError: |
|
status = "Failed (private) - Invite link expired" |
|
logging.error(f"[{index}/{len(filtered_groups)}] β Failed to join private group: {group_name} - Invite link expired") |
|
except Exception as e: |
|
status = f"Failed - {e}" |
|
logging.error(f"[{index}/{len(filtered_groups)}] β Failed to join {group_name}: {e}") |
|
|
|
writer.writerow(row + [status]) |
|
|
|
logging.info(f"β
Process completed. Results saved to {OUTPUT_CSV}") |
|
|
|
|
|
def run_telegram(): |
|
asyncio.run(join_groups()) |
|
|
|
def run_telegram_mov(): |
|
asyncio.run(fetch_users_from_source_group("UT_CHEM", None)) |
|
def run_telegram_mov2(): |
|
asyncio.run(add_users_to_destination_group("@searchai090", 'UT_CHEM.csv')) |
|
|
|
if __name__ == "__main__": |
|
p1 = Process(target=run_flask) |
|
|
|
p2 = Process(target=run_telegram_mov2) |
|
p1.start() |
|
p2.start() |
|
|
|
p1.join() |
|
p2.join() |
|
|