# Telegram chatbot: Groq-backed chat completions with idefics2 image
# descriptions, a Gradio landing page, and a keep-alive pinger.
import os | |
import time | |
import threading | |
import requests | |
from groq import Groq | |
from telethon import TelegramClient, events | |
import gradio as gr | |
import asyncio | |
from PIL import Image | |
import base64 | |
from io import BytesIO | |
from huggingface_hub import InferenceClient | |
from transformers import AutoProcessor, AutoTokenizer | |
def load_system_prompt():
    """Read the chatbot's system prompt from prompt.txt.

    Returns:
        str: full contents of prompt.txt.

    Raises:
        FileNotFoundError: if prompt.txt is missing from the working directory.
    """
    # Explicit encoding avoids platform-dependent decoding (e.g. cp1252 on Windows).
    with open('prompt.txt', 'r', encoding='utf-8') as file:
        return file.read()
# Persona/system prompt loaded once at import time.
system_prompt = load_system_prompt()

# Credentials come from environment variables (HF Space secrets).
api_id = os.getenv('api_id')
api_hash = os.getenv('api_hash')
bot_token = os.getenv('bot_token')
openai_api_key = os.getenv('glhf')  # NOTE(review): env var is named 'glhf' but feeds the Groq client — confirm intended.
yolo = os.getenv('yolo')  # NOTE(review): read but never used in this file — confirm it is needed.

# Chat-completion client (Groq SDK, OpenAI-compatible interface).
openai_client = Groq(
    api_key=openai_api_key,
)

# Vision stack: processor builds prompts locally, the InferenceClient calls
# the hosted idefics2-8b-chatty endpoint.
idefics_processor = AutoProcessor.from_pretrained("HuggingFaceM4/idefics2-8b")
idefics_client = InferenceClient("HuggingFaceM4/idefics2-8b-chatty")
tokenizer = AutoTokenizer.from_pretrained("HuggingFaceM4/idefics2-8b")

# Minimal chat template installed on the tokenizer for image description.
chat_template = """<|user|>: Describe this image at its finest, mentioning the exact names of the objects present in it.
<|assistant|>:"""
tokenizer.chat_template = chat_template
def encode_local_image(image):
    """Encode an image file as a base64 JPEG data URI.

    Args:
        image: path or file-like object accepted by PIL.Image.open.

    Returns:
        str: "data:image/jpeg;base64,..." data URI.
    """
    pil_image = Image.open(image)
    # BUG FIX: JPEG has no alpha channel; saving an RGBA/P image (e.g. a PNG
    # downloaded from Telegram) raised OSError. Normalize to RGB first.
    if pil_image.mode != "RGB":
        pil_image = pil_image.convert("RGB")
    buffer = BytesIO()
    pil_image.save(buffer, format="JPEG")
    base64_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
    return f"data:image/jpeg;base64,{base64_image}"
def describe_image(image_path):
    """Describe a local image via the idefics2-8b-chatty inference endpoint.

    Args:
        image_path: path to a local image file.

    Returns:
        str: decoded raw response from the text-generation endpoint.
    """
    image_string = encode_local_image(image_path)
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image"},
                {"type": "text", "text": "Describe this image in detail and explain what is in this image basically."},
            ],
        },
    ]
    prompt_with_template = idefics_processor.apply_chat_template(
        messages, add_generation_prompt=True
    )
    # BUG FIX: the "<image>" placeholder must become "{}" so .format() injects
    # the base64 data URI; replacing it with " " silently dropped the image
    # from the prompt, so the model never saw it.
    prompt_with_images = prompt_with_template.replace("<image>", "{}").format(image_string)
    payload = {
        "inputs": prompt_with_images,
        "parameters": {
            "return_full_text": False,
            "max_new_tokens": 2048,
        },
    }
    response = idefics_client.post(json=payload).decode()
    return response
# Connect and sign in the Telegram bot at import time; .start() returns the
# connected TelegramClient instance used by all handlers below.
client = TelegramClient('bot', api_id, api_hash).start(bot_token=bot_token)
class CircularBuffer:
    """Fixed-capacity FIFO of chat messages; the oldest entry is evicted
    when the ring fills up (capacity is effectively size - 1 messages)."""

    def __init__(self, size: int):
        """Create an empty ring with `size` slots."""
        self.size = size
        self.buffer = [None] * size
        self.start = 0  # read pointer: oldest stored message
        self.end = 0    # write pointer: next free slot

    def add(self, role: str, content: str):
        """Store one {'role', 'content'} message, dropping the oldest if full."""
        slot = self.end
        self.buffer[slot] = {'role': role, 'content': content}
        self.end = (slot + 1) % self.size
        # Write pointer caught up with read pointer: evict the oldest entry.
        if self.start == self.end:
            self.start = (self.start + 1) % self.size

    def get_history(self):
        """Return stored messages in insertion order, oldest first."""
        count = (self.end - self.start) % self.size
        return [self.buffer[(self.start + k) % self.size] for k in range(count)]

    def reset(self):
        """Discard every stored message and rewind both pointers."""
        self.buffer = [None] * self.size
        self.start = 0
        self.end = 0
# Per-user conversation buffers, keyed by Telegram sender id.
user_histories = {}


def get_user_history(user_id):
    """Return the CircularBuffer for `user_id`, creating one on first use."""
    history = user_histories.get(user_id)
    if history is None:
        history = CircularBuffer(99)
        user_histories[user_id] = history
    return history
async def get_completion(prompt: str, user_id) -> str:
    """Generate an assistant reply for `prompt` using the per-user history.

    Args:
        prompt: the user's message (possibly augmented with an image description).
        user_id: Telegram sender id keying the rolling history buffer.

    Returns:
        str: the streamed assistant reply, or an "Error: ..." string on failure.
    """
    user_history = get_user_history(user_id)
    messages = [
        {"role": "system", "content": system_prompt},
        *user_history.get_history(),
        {"role": "user", "content": prompt},
    ]

    def _stream_completion() -> str:
        # The Groq SDK is synchronous; consume the whole stream here so it can
        # run in a worker thread.
        completion = openai_client.chat.completions.create(
            model="llama-3.1-70b-versatile",
            messages=messages,
            stream=True,
            temperature=0.8,
            top_p=0.9,
            max_tokens=4096,
            frequency_penalty=0.2,
            presence_penalty=0.6
        )
        parts = []
        for chunk in completion:
            delta = chunk.choices[0].delta.content
            if delta is not None:
                parts.append(delta)
        return ''.join(parts)

    try:
        # BUG FIX: the blocking streaming call previously ran directly inside
        # the coroutine, stalling the whole asyncio event loop (and therefore
        # the Telegram client) for the duration of the request.
        message = await asyncio.to_thread(_stream_completion)
    except Exception as e:
        message = f"Error: {str(e)}"
        print(e)
    # Record the exchange (even error strings, matching original behavior).
    user_history.add("user", prompt)
    user_history.add("assistant", message)
    return message
@client.on(events.NewMessage(pattern='/start'))
async def start(event):
    """Handle /start: greet the user.

    BUG FIX: this handler was never registered with the Telegram client (the
    `events` import was unused); add the missing @client.on decorator.
    """
    await event.respond("Hello! I am your boo.")
@client.on(events.NewMessage(pattern='/help'))
async def help(event):
    """Handle /help: list the bot's commands.

    BUG FIX: this handler was never registered with the Telegram client; add
    the missing @client.on decorator. (Name shadows the builtin `help`, but it
    is kept for interface compatibility.)
    """
    await event.respond("Here is how I can help you:\n/start - Start the bot\n/help - Get help\n/reset - Reset chat history")
@client.on(events.NewMessage(pattern='/reset'))
async def reset(event):
    """Handle /reset: clear the sender's conversation history.

    BUG FIX: this handler was never registered with the Telegram client; add
    the missing @client.on decorator.
    """
    user_history = get_user_history(event.sender_id)
    user_history.reset()
    await event.respond("History has been reset.")
@client.on(events.NewMessage)
async def handle_message(event):
    """Reply to every non-command message; photos are described first and the
    description is appended to the user's text before completion.

    BUG FIX: this catch-all handler was never registered with the Telegram
    client; add the missing @client.on decorator. The command guard below only
    makes sense for a catch-all handler, confirming that intent.
    """
    # Commands are handled by their dedicated handlers above.
    if event.raw_text.startswith(('/start', '/help', '/reset')):
        return
    try:
        # Show a "typing" indicator while the reply is produced.
        async with client.action(event.chat_id, 'typing'):
            await asyncio.sleep(3)
            user_id = event.sender_id
            if event.photo:
                photo = await event.download_media()
                # describe_image makes a blocking HTTP call; keep it off the
                # event loop so other chats stay responsive.
                image_description = await asyncio.to_thread(describe_image, photo)
                user_message = event.raw_text + "\n\nContent of the image: " + image_description
            else:
                user_message = event.raw_text
            response = await get_completion(user_message, user_id)
            await event.respond(response)
    except Exception as e:
        # Best-effort: log and send a friendly fallback instead of crashing.
        print(f"An error occurred: {e}")
        await event.respond("Whoopsie 🤭")
def launch_gradio():
    """Serve a minimal Gradio landing page pointing visitors at the bot."""
    welcome_message = """
    # Meet Scarlett!
    - Scarlett is a girl residing in Telegram
    - She can chat with you without any censorship.
    - Head over to https://t.me/Scarlett_Rogue_isNotBot
    """
    with gr.Blocks() as page:
        body = f"""
    # Follow me: https://example.com
    {welcome_message}
    """
        gr.Markdown(body)
    # show_api=False hides the auto-generated API docs link.
    page.launch(show_api=False)
def keep_alive():
    """Ping the public Space URL every 30 minutes so the host keeps it awake.

    Runs forever; intended to be started in a background thread.
    """
    while True:
        try:
            # BUG FIX: requests.get without a timeout can hang this thread
            # indefinitely; cap the wait so the loop always keeps ticking.
            requests.get("https://rbn2008k-Scarlett.hf.space", timeout=30)
        except Exception as e:
            # Best-effort ping: log and try again next cycle.
            print(f"Keep-alive request failed: {e}")
        time.sleep(1800)
if __name__ == "__main__":
    # Gradio UI and keep-alive pinger run in background threads; note they are
    # non-daemon threads, so they keep the process alive on their own.
    threading.Thread(target=launch_gradio).start()
    threading.Thread(target=keep_alive).start()
    # Block the main thread on the Telegram client until it disconnects.
    client.run_until_disconnected()