# ahmetfirat's picture
# Update utils.py
# 1ee6732
# raw
# history blame
# 5.74 kB
# Install python-dotenv first (`pip install python-dotenv`) so that custom variables can be read from the .env file.
# Remember to call load_dotenv() before using os.getenv(), so that the values from .env are loaded into the environment.
import openai
import os
from dotenv import load_dotenv
import httpx
import asyncio
import aiometer
from functools import partial
from elevenlabs import generate, save, set_api_key
from base64 import b64decode
import re
from moviepy.editor import ImageClip, AudioFileClip, CompositeVideoClip, concatenate_videoclips, concatenate_audioclips, TextClip, CompositeAudioClip
from random import choice
from uuid import uuid4
# Timeout passed to httpx for each asynchronous chat completion request.
TIMEOUT = 300
# Value handed to aiometer as max_per_second when fanning out chat requests.
RATE_LIMIT = 0.15 # 9 requests per minute
# Load environment variables from the .env file
load_dotenv()
# The API keys are read from the "elevenlabs_api_key" and "openai_api_key"
# environment variables; os.getenv returns None when a variable is not set.
set_api_key(os.getenv("elevenlabs_api_key"))
openai.api_key = os.getenv("openai_api_key")
class ChatCompletion:
    """Small wrapper around the OpenAI chat-completion and image endpoints.

    The instance keeps a running total of the tokens reported by the API in
    ``total_tokens``. Chat methods return the assistant's reply text, or
    ``-1`` when the request fails or the response cannot be interpreted.
    """

    def __init__(self, temperature=0.8,):
        """Store the model name and sampling temperature used for chat calls.

        Args:
            temperature: Sampling temperature forwarded with every chat
                completion request.
        """
        # NOTE: this must be a plain string; a trailing comma here would turn
        # the attribute into a 1-tuple.
        self.model = "gpt-3.5-turbo"
        self.temperature = temperature
        self.total_tokens = 0

    def _extract_reply(self, data):
        """Pull the reply text out of a decoded chat-completion payload.

        On success the reported token usage is printed and added to
        ``self.total_tokens``. When the payload lacks the expected fields the
        API error message (if present) is printed instead.

        Args:
            data: Mapping decoded from the chat-completion response.

        Returns:
            The reply text, or ``-1`` when the payload is not a successful
            completion.
        """
        try:
            used = data["usage"]["total_tokens"]
            reply = data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError):
            # Error payloads carry {"error": {"message": ...}}; fall back to
            # printing the whole payload when even that is missing.
            error = data.get("error") if isinstance(data, dict) else None
            detail = error.get("message") if isinstance(error, dict) else data
            print("Error: " + str(detail))
            return -1
        print("Tokens used: " + str(used))
        self.total_tokens += used
        return reply

    def single_response(self, hist):
        """Send one chat history synchronously and return the reply.

        Args:
            hist: List of chat messages in the OpenAI ``messages`` format.

        Returns:
            The reply text, or ``-1`` on failure.
        """
        try:
            response = openai.ChatCompletion.create(
                model=self.model,
                temperature=self.temperature,
                messages=hist,
            )
        except openai.error.OpenAIError as exc:
            # The client raises on API failures rather than returning an
            # error payload, so report it here and keep the -1 convention.
            print("Error: " + str(exc))
            return -1
        return self._extract_reply(response)

    async def _async_response(self, payload):
        """POST ``payload`` to the chat-completions endpoint and return the httpx response."""
        async with httpx.AsyncClient() as client:
            return await client.post(
                url="https://api.openai.com/v1/chat/completions",
                json=payload,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {openai.api_key}",
                },
                timeout=TIMEOUT,
            )

    async def _request(self, hist):
        """Send one chat history asynchronously and return the reply.

        Args:
            hist: List of chat messages in the OpenAI ``messages`` format.

        Returns:
            The reply text, or ``-1`` on a transport error, a non-JSON body,
            or an error payload.
        """
        try:
            response = await self._async_response({
                "model": self.model,
                "temperature": self.temperature,
                "messages": hist,
            })
            # Decode the body once; ValueError covers a non-JSON response.
            data = response.json()
        except (httpx.HTTPError, ValueError) as exc:
            print("Error: " + str(exc))
            return -1
        return self._extract_reply(data)

    async def _multi_response(self, hists):
        """Run ``_request`` for every history, throttled to RATE_LIMIT starts per second."""
        return await aiometer.run_all(
            [partial(self._request, hist) for hist in hists],
            max_per_second=RATE_LIMIT,
        )

    def multi_response(self, hists):
        """Blocking entry point: return the list of replies for ``hists``.

        Each element is the reply text or ``-1`` for a failed request.
        """
        return asyncio.run(self._multi_response(hists))

    def safety_check(self, message):
        """Return True when ``message`` is at most 2000 characters long.

        NOTE: only the length is checked. An LLM-based yes/no check of whether
        the message is appropriate was sketched here previously but is
        disabled.
        """
        return len(message) <= 2000

    def decide_gender(self, message):
        """Return "male" or "female" chosen at random.

        ``message`` is currently ignored; an LLM-based selection was sketched
        here previously but is disabled.
        """
        return choice(["male", "female"])

    def generate_image(self, desc, speaker):
        """Generate a 256x256 image for ``desc`` and save it as ``{speaker}.png``.

        Args:
            desc: Text prompt sent to the image endpoint.
            speaker: Used as the stem of the output file name.

        Returns:
            The relative path of the PNG file that was written.
        """
        response = openai.Image.create(
            prompt=desc,
            n=1,
            size="256x256",
            response_format="b64_json",
        )
        image_b64 = response["data"][0]["b64_json"]
        image_path = f"{speaker}.png"
        with open(image_path, "wb") as img:
            img.write(b64decode(image_b64))
        return image_path
def _str_check(message):
unwanted = re.findall(r"[^A-Za-z\s0-9]", message)
if len(unwanted) > 0:
return False
return True
def generate_audio(speaker, message):
    """Synthesize ``message`` with the ElevenLabs voice ``speaker`` and save it.

    Args:
        speaker: Voice passed to ElevenLabs; also the prefix of the file name.
        message: Text to be spoken.

    Returns:
        The name of the saved file (``speaker`` + a random UUID + ".wav"),
        or ``-1`` when speech generation raises.
    """
    try:
        speech = generate(text=message, voice=speaker, model="eleven_monolingual_v1")
    except Exception as err:
        print(f"Error:{err}")
        return -1
    else:
        # The random UUID keeps repeated lines by the same speaker from colliding.
        out_path = speaker + f"{uuid4()}.wav"
        save(speech, out_path)
        return out_path
def get_user_name(chat, user_name):
    """Validate a user-supplied name.

    Args:
        chat: Object exposing ``safety_check(message)``.
        user_name: The name to validate.

    Returns:
        ``user_name`` when it passes both checks, ``-1`` when the safety
        check rejects it, ``-2`` when it contains disallowed characters.
    """
    passes_safety = chat.safety_check(f"My name is {user_name}")
    if passes_safety:
        if _str_check(user_name):
            return user_name
        print("Invalid name.")
        return -2
    print("Inappropriate name.")
    return -1
def generate_video(triples,output_path):
    """Build a slideshow video from still images and their audio tracks.

    Each still image is shown for exactly as long as its audio lasts; the
    segments are concatenated in order and written to ``output_path`` at
    24 fps. After a successful write the audio files are deleted.

    Args:
        triples: Iterable of 3-item tuples ``(_, audio_path, image_path)``.
            The first item is ignored here (presumably the spoken text;
            verify against the caller).
        output_path: Destination path of the rendered video file.

    Raises:
        ValueError: If ``triples`` is empty.
    """
    # Materialize once: the input is walked twice (build, then cleanup), which
    # would silently skip the cleanup for a generator.
    triples = list(triples)
    if not triples:
        raise ValueError("generate_video needs at least one (text, audio_path, image_path) triple")

    video_clips = []
    audio_clips = []
    try:
        for _, audio_path, image_path in triples:
            image = ImageClip(image_path)
            audio = AudioFileClip(audio_path)
            audio_clips.append(audio)
            image = image.set_duration(audio.duration)
            video = CompositeVideoClip([image]).set_audio(audio)
            video_clips.append(video)
        final_video = concatenate_videoclips(video_clips, method="compose")
        final_audio = concatenate_audioclips(audio_clips)
        final_video = final_video.set_audio(final_audio)
        final_video.write_videofile(output_path, fps=24, verbose=False, logger=None)
    finally:
        # Audio readers hold the source files open; release them before the
        # files are deleted and even when rendering fails.
        for audio in audio_clips:
            audio.close()

    # Delete each audio file once, even if several triples share a path.
    for audio_path in dict.fromkeys(path for _, path, _ in triples):
        os.remove(audio_path)