SabziAi / app.py
apaxray's picture
Update app.py
cdfaa37 verified
raw
history blame
6.04 kB
import os
from transformers import pipeline
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import openai
import gc
import psutil
# تعریف مدل‌ها
MODEL_CONFIG = {
"translation": "PontifexMaximus/opus-mt-iir-en-finetuned-fa-to-en", # ترجمه
"qa": "HooshvareLab/bert-fa-base-uncased", # پاسخ به سوال
"math": "OpenAI", # ریاضی با OpenAI
"persian_nlp": "HooshvareLab/bert-fa-zwnj-base", # پردازش زبان فارسی
"custom_ai": "universitytehran/PersianMind-v1.0", # شخصی‌سازی شده
}
# بارگذاری توکن از متغیر محیطی
TOKEN = os.getenv("Passsssssss")
if not TOKEN:
raise ValueError("API token is missing! Set it as 'Passsssssss' in environment variables.")
# تنظیم کلید API OpenAI
openai.api_key = os.getenv("OPENAI_API_KEY")
if not openai.api_key:
raise ValueError("OpenAI API key is missing! Set it as 'OPENAI_API_KEY' in environment variables.")
# محدودیت حافظه (برحسب گیگابایت)
MEMORY_LIMIT_GB = 15
class MemoryManager:
@staticmethod
def check_memory_usage():
"""بررسی مصرف حافظه و خروج در صورت تجاوز از محدودیت."""
process = psutil.Process(os.getpid())
memory_usage = process.memory_info().rss / (1024**3) # تبدیل به گیگابایت
if memory_usage > MEMORY_LIMIT_GB:
raise MemoryError(f"Memory usage exceeded {MEMORY_LIMIT_GB} GB! Current usage: {memory_usage:.2f} GB.")
class MultiModelSystem:
def __init__(self):
"""مدیریت مدل‌های مختلف و پردازش موازی وظایف."""
self.models = {}
self.queue = Queue()
self.executor = ThreadPoolExecutor(max_workers=3) # محدود کردن وظایف همزمان
self.load_models()
def load_models(self):
"""بارگذاری مدل‌ها به صورت دینامیک."""
try:
for task, model_id in MODEL_CONFIG.items():
if model_id == "OpenAI":
self.models[task] = None # مدل OpenAI نیازی به بارگذاری ندارد
else:
self.models[task] = model_id # ذخیره نام مدل، نه بارگذاری کامل
except Exception as e:
print(f"Error loading models: {e}")
raise
def load_model_for_task(self, task):
"""بارگذاری مدل موردنیاز فقط هنگام درخواست."""
model_id = self.models.get(task)
if not model_id:
return None
try:
MemoryManager.check_memory_usage()
return pipeline(
task=self.get_task_type(task),
model=model_id,
use_auth_token=TOKEN,
device=0 if psutil.cpu_count(logical=False) > 1 else -1 # استفاده از GPU در صورت وجود
)
except Exception as e:
print(f"Error loading model for task '{task}': {e}")
return None
@staticmethod
def get_task_type(task):
"""بازگرداندن نوع وظیفه براساس مدل."""
task_map = {
"translation": "translation",
"qa": "question-answering",
"persian_nlp": "text-classification",
"custom_ai": "text-generation",
"math": "text-generation"
}
return task_map.get(task, "text-generation")
def process_task(self, task, **kwargs):
"""پردازش وظایف به صورت غیرهمزمان."""
if task not in self.models:
raise ValueError(f"Model for task '{task}' not loaded.")
def task_handler():
try:
if task == "math":
result = self.process_math_task(kwargs.get("text"))
else:
model = self.load_model_for_task(task)
if model:
result = model(**kwargs)
else:
result = None
self.queue.put(result)
except Exception as e:
print(f"Error processing task '{task}': {e}")
self.queue.put(None)
finally:
gc.collect() # آزادسازی حافظه پس از هر وظیفه
self.executor.submit(task_handler)
def process_math_task(self, text):
"""پردازش ریاضی با OpenAI."""
try:
MemoryManager.check_memory_usage()
response = openai.Completion.create(
engine="text-davinci-003",
prompt=text,
max_tokens=100
)
return response.choices[0].text.strip()
except Exception as e:
print(f"Error processing math task with OpenAI: {e}")
return None
def get_result(self):
"""دریافت نتیجه از صف."""
return self.queue.get()
# نمونه استفاده
if __name__ == "__main__":
system = MultiModelSystem()
# وظایف مختلف
tasks = [
{"task": "translation", "kwargs": {"text": "سلام دنیا!", "src_lang": "fa", "tgt_lang": "en"}},
{"task": "qa", "kwargs": {"question": "پایتخت ایران چیست؟", "context": "ایران کشوری در خاورمیانه است و پایتخت آن تهران است."}},
{"task": "math", "kwargs": {"text": "What is the integral of x^2?"}},
{"task": "persian_nlp", "kwargs": {"text": "این یک جمله فارسی است."}},
{"task": "custom_ai", "kwargs": {"text": "تحلیل متن‌های تاریخی طبری."}}
]
# ارسال وظایف برای پردازش
for task_info in tasks:
system.process_task(task_info["task"], **task_info["kwargs"])
# بازیابی نتایج
for _ in range(len(tasks)):
print("Result:", system.get_result())