import os import time import threading import requests from telethon import TelegramClient, events import gradio as gr import asyncio from io import BytesIO from pymongo import MongoClient # MongoDB connection setup mongo_uri = os.getenv('MONGO_URI') mongo_client = MongoClient(mongo_uri) db = mongo_client['Scarlett'] user_histories_collection = db['chats'] def load_system_prompt(): with open('prompt.txt', 'r') as file: return file.read() system_prompt = load_system_prompt() api_id = os.getenv('api_id') api_hash = os.getenv('api_hash') bot_token = os.getenv('bot_token') cloudflare_api_key = os.getenv('glhf') cloudflare_base_url = os.getenv('yolo') client = TelegramClient('bot', api_id, api_hash).start(bot_token=bot_token) class MongoDBHistory: def __init__(self, user_id, size=99): self.user_id = str(user_id) self.size = size def add(self, role: str, content: str): user_history = user_histories_collection.find_one({'user_id': self.user_id}) if user_history: messages = user_history['history'] else: messages = [] messages.append({'role': role, 'content': content}) if len(messages) > self.size: messages = messages[-self.size:] user_histories_collection.update_one( {'user_id': self.user_id}, {'$set': {'history': messages}}, upsert=True ) def get_history(self): user_history = user_histories_collection.find_one({'user_id': self.user_id}) if user_history: return user_history['history'] return [] def reset(self): user_histories_collection.delete_one({'user_id': self.user_id}) # Only store history for 3 users user_histories = {} def get_user_history(user_id): if user_id not in user_histories: user_histories[user_id] = MongoDBHistory(user_id) return user_histories[user_id] def fetch_image_as_u8(image_binary): return list(image_binary) async def get_completion(prompt: str, user_id, image_u8=None) -> str: user_history = get_user_history(user_id) # Prepare the message content messages = [ {"role": "system", "content": system_prompt}, *user_history.get_history(), {"role": "user", "content": prompt} ] try: # Prepare the payload with additional parameters data = { "messages": messages, "temperature": 0.5, # Adjust creativity "top_p": 0.9, # Nucleus sampling parameter "max_tokens": 512, # Maximum token limit "frequency_penalty": 1.6, # Penalize repeated tokens "presence_penalty": 1.9 # Encourage new topics } # If there is an image, include it as an array of unsigned 8-bit integers (u8) if image_u8: print(image_u8) data["image"] = image_u8 # Send as array of integers headers = { "Authorization": f"Bearer {cloudflare_api_key}" } # Send request to Cloudflare API request = requests.post( url=cloudflare_base_url, json=data, headers=headers ) response = request.json() print(response) if response['success'] == True: message = response["result"]["response"] else: message = f"Error: {response['errors'][0]['message']}" except Exception as e: message = f"Error: {str(e)}" print(e) # Add the user prompt and assistant response to the history user_history.add("user", prompt) user_history.add("assistant", message) return message @client.on(events.NewMessage(pattern='/start')) async def start(event): await event.respond("Hello!") @client.on(events.NewMessage(pattern='/help')) async def help(event): await event.respond("Here is how I can help you:\n/start - Start\n/help - Show this message\n/reset - Reset chat history") @client.on(events.NewMessage(pattern='/reset')) async def reset(event): user_history = get_user_history(event.sender_id) user_history.reset() await event.respond("History has been reset.") @client.on(events.NewMessage) async def handle_message(event): if event.raw_text.startswith('/start') or event.raw_text.startswith('/help') or event.raw_text.startswith('/reset'): return try: async with client.action(event.chat_id, 'typing'): await asyncio.sleep(3) user_id = event.sender_id user_message = event.raw_text # Check if the message contains an image image_u8 = None if event.photo: image = await event.download_media(file=BytesIO()) image_u8 = fetch_image_as_u8(image.getvalue()) # Convert to u8 response = await get_completion(user_message, user_id, image_u8) await event.respond(response) except Exception as e: print(f"An error occurred: {e}") await event.respond("Whoopsie 🤭") def launch_gradio(): welcome_message = """ # Meet Scarlett! - Scarlett is a girl residing in Telegram - She can chat with you without any censorship. - Head over to https://t.me/Scarlett_Rogue_isNotBot """ with gr.Blocks() as demo: gr.Markdown(f""" # Follow me: https://example.com {welcome_message} """) demo.launch(show_api=False) def keep_alive(): while True: try: requests.get("https://huggingface.co/spaces/rbn2008k/Scarlett/") except Exception as e: print(f"Keep-alive request failed: {e}") time.sleep(180) if __name__ == "__main__": threading.Thread(target=launch_gradio).start() threading.Thread(target=keep_alive).start() client.run_until_disconnected()