import asyncio
import base64
import os
import threading
import time
from collections import OrderedDict
from datetime import datetime
from io import BytesIO

import gradio as gr
import pymongo
import requests
from huggingface_hub import InferenceClient
from openai import OpenAI
from PIL import Image
from pymongo import MongoClient
from telethon import TelegramClient, events


def load_system_prompt():
    """Return the full text of ``prompt.txt`` from the working directory.

    Raises:
        OSError: If the file is missing or cannot be read.
    """
    # Explicit encoding so the prompt decodes the same way on every platform.
    with open('prompt.txt', 'r', encoding='utf-8') as file:
        return file.read()


system_prompt = load_system_prompt()

# Environment variables
api_id = os.getenv('api_id')
api_hash = os.getenv('api_hash')
bot_token = os.getenv('bot_token')
openai_api_key = os.getenv('glhf')   # API key for the chat completion client
ping_key = os.getenv('bolo')         # API key used by the keep-alive pinger
api_url = os.getenv('yolo')          # Base URL of the OpenAI-compatible API
model = os.getenv('model')           # Chat model name
model2 = os.getenv('model2')         # Image description model name
mongoURI = os.getenv('MONGO_URI')

# OpenAI and MongoDB clients
openai_client = OpenAI(api_key=openai_api_key, base_url=api_url)
mongo_client = MongoClient(mongoURI)
db = mongo_client['Scarlett']
chat_collection = db['chats']

# In-memory cache of recent users' histories, ordered least- to
# most-recently used so the oldest entry can be evicted first.
local_chat_history = OrderedDict()
MAX_LOCAL_USERS = 5
# Number of most recent messages kept per user, both locally and in MongoDB.
MAX_HISTORY_MESSAGES = 20


# Functions for MongoDB-based chat history storage and retrieval
def get_history_from_mongo(user_id):
    """Return the stored message list for ``user_id``, or ``[]`` if none."""
    result = chat_collection.find_one({"user_id": user_id})
    return result.get("messages", []) if result else []


def store_message_in_mongo(user_id, role, content):
    """Append one message to the user's document, creating it if needed.

    The ``$slice`` modifier trims the stored array to the most recent
    ``MAX_HISTORY_MESSAGES`` entries as part of the same update.
    """
    chat_collection.update_one(
        {"user_id": user_id},
        {
            "$push": {
                "messages": {
                    "$each": [{"role": role, "content": content}],
                    "$slice": -MAX_HISTORY_MESSAGES,
                }
            }
        },
        upsert=True,
    )


def get_chat_history(user_id):
    """Return the chat history for ``user_id``, using the local cache first.

    On a cache miss the history is loaded from MongoDB and cached; the least
    recently used user is evicted once more than ``MAX_LOCAL_USERS`` are held.
    """
    if user_id in local_chat_history:
        local_chat_history.move_to_end(user_id)
        return local_chat_history[user_id]

    history = get_history_from_mongo(user_id)
    local_chat_history[user_id] = history
    if len(local_chat_history) > MAX_LOCAL_USERS:
        local_chat_history.popitem(last=False)
    return history


def update_chat_history(user_id, role, content):
    """Record a message in both the local cache and MongoDB.

    Args:
        user_id: Key identifying the conversation.
        role: Message role, e.g. ``"user"`` or ``"assistant"``.
        content: Message text.
    """
    if user_id not in local_chat_history:
        local_chat_history[user_id] = get_history_from_mongo(user_id)

    local_chat_history[user_id].append({"role": role, "content": content})
    # Keep only the most recent messages, mirroring the MongoDB $slice.
    local_chat_history[user_id] = local_chat_history[user_id][-MAX_HISTORY_MESSAGES:]
    local_chat_history.move_to_end(user_id)

    if len(local_chat_history) > MAX_LOCAL_USERS:
        local_chat_history.popitem(last=False)

    store_message_in_mongo(user_id, role, content)


# Fixing image encoding
def encode_local_image(image_file):
    """Re-encode an image as PNG and return it as a base64 ASCII string.

    Args:
        image_file: Path or file object accepted by ``PIL.Image.open``.

    Returns:
        The base64 text, or ``None`` if the image could not be read or saved.
    """
    try:
        buffered = BytesIO()
        # Context manager closes the underlying file handle promptly.
        with Image.open(image_file) as im:
            im.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode('ascii')
    except Exception as e:
        print(f"Error encoding image: {e}")
        return None


# Image description function, calling external inference model
def inference_calling_idefics(image_path, question=""):
    """Ask the image model (``model2``) to describe or answer about an image.

    Args:
        image_path: Path of the image to send.
        question: Optional question about the image; when empty a generic
            description is requested.

    Returns:
        The model's streamed answer joined into one string, or an error
        message string if encoding or the inference call fails.
    """
    # Named differently from the module-level chat ``system_prompt`` to avoid
    # shadowing it.
    vision_system_prompt = os.getenv('USER_PROMPT')
    model_id = model2
    client = InferenceClient(model=model_id)

    # Use the fixed `encode_local_image` to encode the image
    image_base64 = encode_local_image(image_path)
    if not image_base64:
        return "Error: Invalid image or unable to encode image."

    image_info = f"data:image/png;base64,{image_base64}"
    prompt = question if question != "" else 'Describe this image without question mark'

    messages = []
    # Only send a system message when one is configured; a missing
    # USER_PROMPT would otherwise produce a message whose text is None.
    if vision_system_prompt:
        messages.append(
            {
                "role": "system",
                "content": [
                    {"type": "text", "text": vision_system_prompt},
                ],
            }
        )
    messages.append(
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_info}},
                {"type": "text", "text": prompt},
            ],
        }
    )

    try:
        pieces = []
        for message in client.chat_completion(
            # Previously referenced an undefined name, so every call failed.
            model=model_id,
            messages=messages,
            max_tokens=2048,
            stream=True,
        ):
            if not message.choices:
                continue
            delta = message.choices[0].delta.content
            # Stream chunks may carry no text (e.g. role-only or final chunk).
            if delta:
                pieces.append(delta)
        return "".join(pieces)
    except Exception as e:
        print(f"Error in inference call: {e}")
        return "Error while processing the image."


def describe_image(client, image_path, question=""):
    """Return a description of the image, never raising.

    Args:
        client: Unused; kept so existing callers keep working.
        image_path: Path of the image to describe.
        question: Optional question about the image.

    Returns:
        The model's answer, or a fallback error string on failure.
    """
    try:
        answer = inference_calling_idefics(image_path, question)
        return answer
    except Exception as e:
        print(e)
        return "Error while seeing the image."
# Telegram bot setup
client = TelegramClient('bot', api_id, api_hash).start(bot_token=bot_token)

# Cached id of the bot's own account; filled on first use so that handling a
# message does not need a get_me() round trip every time.
_bot_id = None

# Reply sent (and stored) when no completion text could be produced.
_FALLBACK_REPLY = "..........."


async def get_bot_id():
    """Return the bot's own user id, fetching it once and caching it."""
    global _bot_id
    if _bot_id is None:
        me = await client.get_me()
        _bot_id = me.id
    return _bot_id


def _collect_completion(messages):
    """Run the blocking streamed chat completion and return the joined text."""
    completion = openai_client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=512,
        temperature=0.7,
        top_p=1.0,
        frequency_penalty=1.0,
        presence_penalty=1.0,
        stream=True,
    )
    parts = []
    for chunk in completion:
        # Some chunks carry no choices or no text; skip them.
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta is not None:
            parts.append(delta)
    return "".join(parts)


# OpenAI completion handler
async def get_completion(client, event, user_id, prompt):
    """Generate a reply to ``prompt``, send it, and record the exchange.

    Args:
        client: Telegram client (unused here; kept for caller compatibility).
        event: Incoming message event; the reply is sent via ``event.respond``.
        user_id: Key under which the chat history is stored.
        prompt: The user's message text.

    Returns:
        The text that was sent, which is the fallback reply if generation
        failed or produced nothing.
    """
    history = get_chat_history(user_id)
    messages = [
        {"role": "system", "content": system_prompt},
        *history,
        {"role": "user", "content": prompt},
    ]
    try:
        # The OpenAI client call is synchronous; run it in a worker thread so
        # the event loop keeps serving other chats while the model streams.
        loop = asyncio.get_running_loop()
        message = await loop.run_in_executor(None, _collect_completion, messages)
        if not message.strip():
            # Telegram rejects empty messages; fall back explicitly.
            message = _FALLBACK_REPLY
        await event.respond(message)
    except Exception as e:
        message = _FALLBACK_REPLY
        await event.respond(message)
        print(e)

    update_chat_history(user_id, "user", prompt)
    update_chat_history(user_id, "assistant", message)
    return message


# Telegram bot commands
@client.on(events.NewMessage(pattern='/start'))
async def start(event):
    """Reply to /start with a greeting."""
    await event.respond("Hello!")


@client.on(events.NewMessage(pattern='/help'))
async def help(event):
    """Reply to /help with the list of supported commands."""
    await event.respond("Here is how I can help you:\n/start - To check if I am alive\n/help - Show this message\n/reset - Reset chat history")


@client.on(events.NewMessage(pattern='/reset'))
async def reset(event):
    """Delete the chat's stored history from MongoDB and the local cache."""
    user_id = event.chat_id
    chat_collection.delete_one({"user_id": user_id})
    if user_id in local_chat_history:
        del local_chat_history[user_id]
    await event.respond("History has been reset.")


@client.on(events.NewMessage)
async def handle_message(event):
    """Answer any non-command message, describing an attached photo first."""
    bot_id = await get_bot_id()
    if event.sender_id == bot_id:
        return

    user_id = event.chat_id
    user_message = event.raw_text

    # Commands have their own handlers; bail out before doing any expensive
    # download or inference work for them.
    if user_message.startswith(('/start', '/help', '/reset')):
        return

    if event.photo:
        photo = await event.download_media()
        if photo:
            loop = asyncio.get_running_loop()
            try:
                # describe_image blocks on network I/O; keep it off the loop.
                image_description = await loop.run_in_executor(
                    None, describe_image, client, photo, user_message
                )
            finally:
                # The downloaded file is only needed for the description.
                try:
                    os.remove(photo)
                except OSError:
                    pass
            user_message += f"\n\nI sent you an image. Content of the image: {image_description}"

    try:
        async with client.action(event.chat_id, 'typing'):
            await asyncio.sleep(2)
            await get_completion(client, event, user_id, user_message)
    except Exception as e:
        print(f"An error occurred: {e}")
        await event.respond("Whoopsie!")


# Gradio interface
def launch_gradio():
    """Build and launch the Gradio landing page."""
    # Built line by line at column 0 so Markdown never sees indented text
    # (which it would render as a code block).
    welcome_message = (
        "# Meet Scarlett!\n"
        "- Scarlett is a girl residing in Telegram\n"
        "- She can chat with you without any censorship.\n"
        "- Head over to https://t.me/Scarlett_Rogue_isNotBot\n"
        "- Thanks to https://glhf.chat for powering this!\n"
    )
    with gr.Blocks() as demo:
        gr.Markdown(f"# Follow me: https://example.com\n\n{welcome_message}")
    demo.launch(show_api=False)


# Keep-alive functionality for the bot
def keep_alive():
    """Send a tiny completion request every 30 minutes, forever."""
    ping_client = OpenAI(api_key=ping_key, base_url=api_url)
    while True:
        try:
            messages = [
                {"role": "system", "content": "Repeat what I say."},
                {"role": "user", "content": "Repeat: 'Ping success'"},
            ]
            request = ping_client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=10,
                temperature=0.1,
                top_p=0.1,
            )
            print(request.choices[0].message.content)
        except Exception as e:
            # A failed ping must not stop the loop; report and try again later.
            print(f"Keep-alive request failed: {e}")
        time.sleep(1800)


# Main execution
if __name__ == "__main__":
    threading.Thread(target=keep_alive).start()
    threading.Thread(target=launch_gradio).start()
    client.run_until_disconnected()