|
|
|
|
|
|
|
import openai |
|
import os |
|
from dotenv import load_dotenv |
|
import httpx |
|
import asyncio |
|
import aiometer |
|
from functools import partial |
|
from elevenlabs import generate, save, set_api_key |
|
from base64 import b64decode |
|
import re |
|
from moviepy.editor import ImageClip, AudioFileClip, CompositeVideoClip, concatenate_videoclips, concatenate_audioclips, TextClip, CompositeAudioClip |
|
from random import choice |
|
from uuid import uuid4 |
|
|
|
# Timeout handed to httpx for each chat-completion POST.
TIMEOUT = 300

# Passed to aiometer as ``max_per_second`` when fanning out chat requests.
RATE_LIMIT = 0.15

# Pull variables from a local .env file into the process environment
# before the API keys are looked up below.
load_dotenv()

# Both keys are read from lower-case environment variable names; a missing
# variable yields None, exactly as with os.getenv.
set_api_key(os.environ.get("elevenlabs_api_key"))
openai.api_key = os.environ.get("openai_api_key")
|
|
|
class ChatCompletion:
    """Small helper around the OpenAI chat-completion and image endpoints.

    Attributes:
        model: Name of the chat model sent with every request.
        temperature: Sampling temperature sent with every request.
        total_tokens: Running sum of ``usage.total_tokens`` over all
            successful chat requests made through this instance.
    """

    def __init__(self, temperature=0.8):
        # Plain string on purpose: a trailing comma here would silently
        # turn the model name into a 1-tuple.
        self.model = "gpt-3.5-turbo"
        self.temperature = temperature
        self.total_tokens = 0

    @staticmethod
    def _error_message(body):
        """Return the API error text from a response body, or a fallback.

        The fallback keeps the error path itself from raising when the
        body has no ``error.message`` entry.
        """
        try:
            return str(body["error"]["message"])
        except (KeyError, IndexError, TypeError):
            return "unexpected response format"

    def single_response(self, hist):
        """Send one chat history synchronously and return the reply text.

        Args:
            hist: List of message dicts forwarded as ``messages``.

        Returns:
            The reply content string, or ``-1`` when the response does not
            contain the expected usage/choices fields.
        """
        response = openai.ChatCompletion.create(
            model=self.model,
            messages=hist,
            temperature=self.temperature,
        )
        try:
            used = response["usage"]["total_tokens"]
            reply = response["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError):
            print("Error: " + self._error_message(response))
            return -1
        print("Tokens used: " + str(used))
        self.total_tokens += used
        return reply

    async def _async_response(self, payload):
        """POST ``payload`` to the chat-completions endpoint and return the raw response."""
        async with httpx.AsyncClient() as client:
            return await client.post(
                url="https://api.openai.com/v1/chat/completions",
                json=payload,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {openai.api_key}",
                },
                timeout=TIMEOUT,
            )

    async def _request(self, hist):
        """Send one chat history asynchronously and return the reply text.

        Args:
            hist: List of message dicts forwarded as ``messages``.

        Returns:
            The reply content string, or ``-1`` when the response is not
            JSON or lacks the expected usage/choices fields.
        """
        response = await self._async_response({
            "model": self.model,
            "messages": hist,
            "temperature": self.temperature,
        })
        # Decode the body once instead of re-parsing it for every field.
        try:
            body = response.json()
        except ValueError:
            print("Error: non-JSON response (HTTP " + str(response.status_code) + ")")
            return -1
        try:
            used = body["usage"]["total_tokens"]
            reply = body["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError):
            print("Error: " + self._error_message(body))
            return -1
        print("Tokens used: " + str(used))
        self.total_tokens += used
        return reply

    async def _multi_response(self, hists):
        """Run ``_request`` for every history, throttled to ``RATE_LIMIT`` per second."""
        return await aiometer.run_all(
            [partial(self._request, hist) for hist in hists],
            max_per_second=RATE_LIMIT,
        )

    def multi_response(self, hists):
        """Blocking wrapper: resolve all histories and return the list of results.

        Each entry is whatever ``_request`` returned for that history
        (reply text or ``-1``).
        """
        return asyncio.run(self._multi_response(hists))

    def safety_check(self, message):
        """Return True when ``message`` is at most 2000 characters long.

        Only the length is inspected; the content is not examined.
        """
        return len(message) <= 2000

    def decide_gender(self, message):
        """Return "male" or "female" chosen at random; ``message`` is not used."""
        return choice(["male", "female"])

    def generate_image(self, desc, speaker):
        """Generate one 256x256 image for ``desc`` and save it as ``<speaker>.png``.

        Args:
            desc: Prompt text sent to the image endpoint.
            speaker: Used as the file stem of the written PNG.

        Returns:
            The path of the written file (relative to the working directory).
        """
        response = openai.Image.create(
            prompt=desc,
            n=1,
            size="256x256",
            response_format="b64_json",
        )
        image_b64 = response["data"][0]["b64_json"]
        image_path = f"{speaker}.png"
        with open(image_path, "wb") as img:
            img.write(b64decode(image_b64))
        return image_path
|
|
|
def _str_check(message):
    """Tell whether ``message`` is free of disallowed characters.

    Allowed characters are ASCII letters, the digits 0-9 and whitespace.
    Returns True when nothing else occurs (including for an empty string),
    False as soon as any other character is present.
    """
    offending = re.search(r"[^A-Za-z\s0-9]", message)
    return offending is None
|
|
|
def generate_audio(speaker, message):
    """Synthesize ``message`` with the given voice and save it to a new file.

    Args:
        speaker: Voice passed to ``generate``; also the prefix of the
            output file name.
        message: Text to be spoken.

    Returns:
        The name of the written file (``<speaker><uuid4>.wav``), or ``-1``
        when ``generate`` raised.
    """
    try:
        speech = generate(text=message, voice=speaker, model="eleven_monolingual_v1")
    except Exception as err:
        print("Error:" + str(err))
        return -1

    # A fresh UUID per call keeps repeated lines by the same speaker from
    # overwriting each other.
    unique_part = str(uuid4())
    target = speaker + unique_part + ".wav"
    save(speech, target)
    return target
|
|
|
|
|
def get_user_name(chat, user_name):
    """Validate a user-supplied name.

    The name is first wrapped in a "My name is ..." sentence and passed to
    ``chat.safety_check``; only if that passes is the raw name run through
    ``_str_check``.

    Returns:
        ``user_name`` unchanged when both checks pass, ``-1`` when the
        safety check fails, ``-2`` when the character check fails.
    """
    introduction = f"My name is {user_name}"
    if chat.safety_check(introduction):
        if _str_check(user_name):
            return user_name
        print("Invalid name.")
        return -2
    print("Inappropriate name.")
    return -1
|
|
|
def generate_video(triples, output_path):
    """Render a slideshow video: one still image per audio segment.

    Each triple contributes a clip that shows its image for exactly as long
    as its audio lasts; the clips are concatenated in order and written to
    ``output_path`` at 24 fps. After a successful write the source audio
    files are deleted.

    Args:
        triples: Sequence of 3-item entries ``(_, audio_path, image_path)``;
            the first item is not used here.
        output_path: Destination file for the rendered video.

    Raises:
        ValueError: If ``triples`` is empty.
    """
    triples = list(triples)
    if not triples:
        # Fail early with a clear message instead of an opaque error from
        # inside the concatenation helpers.
        raise ValueError("generate_video requires at least one (speaker, audio, image) triple")

    video_clips = []
    audio_clips = []
    try:
        for _, audio_path, image_path in triples:
            image = ImageClip(image_path)
            audio = AudioFileClip(audio_path)
            # Track the audio clip immediately so it is closed even if a
            # later step in this loop fails.
            audio_clips.append(audio)

            image = image.set_duration(audio.duration)
            video = CompositeVideoClip([image])
            video = video.set_audio(audio)
            video_clips.append(video)

        final_video = concatenate_videoclips(video_clips, method="compose")
        final_audio = concatenate_audioclips(audio_clips)
        final_video = final_video.set_audio(final_audio)
        final_video.write_videofile(output_path, fps=24, verbose=False, logger=None)
    finally:
        # Release the audio readers before touching the files on disk;
        # an open reader can block deletion on some platforms.
        for audio in audio_clips:
            audio.close()

    # De-duplicate while keeping order so a path that appears in several
    # triples is only removed once.
    for audio_path in dict.fromkeys(path for _, path, _ in triples):
        os.remove(audio_path)