# pip install python-dotenv  (required to read custom environment variables from .env)
# Remember to call load_dotenv() before reading those variables with os.getenv().

import openai
import os
from dotenv import load_dotenv
import httpx
import asyncio
import aiometer
from functools import partial
from elevenlabs import generate, save, set_api_key
from base64 import b64decode
import re
from moviepy.editor import ImageClip, AudioFileClip, CompositeVideoClip, concatenate_videoclips, concatenate_audioclips, TextClip, CompositeAudioClip
from random import choice
from uuid import uuid4

TIMEOUT = 300
RATE_LIMIT = 0.15 # 9 requests per minute

# Load environment variables from the .env file
load_dotenv()

set_api_key(os.getenv("elevenlabs_api_key"))
openai.api_key = os.getenv("openai_api_key")
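
# The .env file is expected to define the two keys read above, e.g.:
#   elevenlabs_api_key=<your ElevenLabs key>
#   openai_api_key=<your OpenAI key>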

class ChatCompletion:
    def __init__(self, temperature=0.8):
        self.model = "gpt-3.5-turbo"
        self.temperature = temperature
        self.total_tokens = 0  # running total across all requests


    def single_response(self, hist):
        response = openai.ChatCompletion.create(
            model=self.model,
            temperature=self.temperature,
            messages=hist)
        try:
            print("Tokens used: " + str(response["usage"]["total_tokens"]))
            self.total_tokens += response["usage"]["total_tokens"]
        except KeyError:
            print("Error: " + str(response["error"]["message"]))
            return -1
        return response["choices"][0]["message"]["content"]

    async def _async_response(self, payload):
        async with httpx.AsyncClient() as client:
            return await client.post(
                url="https://api.openai.com/v1/chat/completions",
                json=payload,
                headers={"Content-Type": "application/json", "Authorization": f"Bearer {openai.api_key}"},
                timeout=TIMEOUT,
            )

    async def _request(self, hist):
        response = await self._async_response({
            "model": self.model,
            "temperature": self.temperature,
            "messages": hist,
        })
        data = response.json()  # decode the body once instead of per access
        try:
            print("Tokens used: " + str(data["usage"]["total_tokens"]))
            self.total_tokens += data["usage"]["total_tokens"]
            return data["choices"][0]["message"]["content"]
        except KeyError:
            print("Error: " + str(data["error"]["message"]))
            return -1

    async def _multi_response(self, hists):
        # Fire all requests concurrently, throttled to RATE_LIMIT per second.
        return await aiometer.run_all(
            [partial(self._request, hist) for hist in hists],
            max_per_second=RATE_LIMIT,
        )

    def multi_response(self, hists):
        return asyncio.run(self._multi_response(hists))

    def safety_check(self, message):
        # Length cap only; the model-based check below is currently disabled.
        return len(message) <= 2000
        # else:
        #     text = f"""Just answer with "yes" or "no". Is the following message appropriate in DND game context?
            
        #     {message}"""
        #     hist = [{"role": "user", "content": text}]
        #     response = self.single_response(hist).lower()
        #     if(response=="no." or response=="no"):
        #         return False
        #     else:
        #         return True
    
    def decide_gender(self, message):
        # Random pick for now; the model-based version below is disabled.
        return choice(["male", "female"])
        # text = f"""Only reply with "male" or "female". Select a gender for {message}. If unknown or both just arbitrarily select one gender."""
        # hist = [{"role": "user", "content": text}]
        # response = self.single_response(hist).lower()
        # match = re.search(r"female", response)
        # if match:
        #     return "female"
        # return "male"

    def generate_image(self, desc, speaker):
        response = openai.Image.create(
            prompt=desc,
            n=1,
            size="256x256",
            response_format="b64_json",
        )
        # Decode the base64 payload and save it as <speaker>.png.
        image_b64 = response["data"][0]["b64_json"]
        with open(f"{speaker}.png", "wb") as img:
            img.write(b64decode(image_b64))
        return f"{speaker}.png"
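
# Usage sketch (illustrative prompts; `hist` follows the OpenAI chat format,
# i.e. a list of {"role": ..., "content": ...} dicts):
#
#   chat = ChatCompletion()
#   reply = chat.single_response([{"role": "user", "content": "Hello"}])
#   replies = chat.multi_response([
#       [{"role": "user", "content": "Prompt A"}],
#       [{"role": "user", "content": "Prompt B"}],
#   ])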
    
def _str_check(message):
    # Allow only letters, digits, and whitespace.
    return not re.findall(r"[^A-Za-z\s0-9]", message)

def generate_audio(speaker, message):
    try:
        audio = generate(
            text=message,
            voice=speaker,
            model="eleven_monolingual_v1"
        )
    except Exception as e:
        print("Error: " + str(e))
        return -1
    # ElevenLabs returns MP3 bytes by default, so use a matching extension.
    file_name = speaker + str(uuid4()) + ".mp3"
    save(audio, file_name)
    return file_name
    

def get_user_name(chat, user_name):
    if not chat.safety_check(f"My name is {user_name}"):
        print("Inappropriate name.")
        return -1
    if not _str_check(user_name):
        print("Invalid name.")
        return -2
    return user_name

def generate_video(triples, output_path):
    video_clips = []
    audio_clips = []
    for _, audio_path, image_path in triples:
        # Show each image for exactly as long as its narration lasts.
        image = ImageClip(image_path)
        audio = AudioFileClip(audio_path)
        image = image.set_duration(audio.duration)
        #txt_clip = TextClip(text, fontsize=24, color='white', stroke_width=3).set_pos(('left', 'top'))
        video = CompositeVideoClip([image])#, txt_clip])
        video = video.set_audio(audio)
        video_clips.append(video)
        audio_clips.append(audio)

    # Rebuild the soundtrack separately and reattach it, so the audio stays
    # in sync with the composed video timeline.
    final_video = concatenate_videoclips(video_clips, method="compose")
    final_audio = concatenate_audioclips(audio_clips)
    final_video = final_video.set_audio(final_audio)
    final_video.write_videofile(output_path, fps=24, verbose=False, logger=None)
    # The narration files are temporary; clean them up after rendering.
    for _, audio_path, _ in triples:
        os.remove(audio_path)
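
# ---------------------------------------------------------------------------
# Minimal end-to-end sketch, assuming a populated .env and the stock
# ElevenLabs voice "Rachel"; the prompt and output names below are
# illustrative, not part of the module.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    chat = ChatCompletion()

    # One synchronous completion to produce a line of narration.
    line = chat.single_response(
        [{"role": "user", "content": "Narrate one sentence of a DND tavern scene."}]
    )
    if line == -1:
        raise SystemExit("Chat request failed.")

    # Matching image and narration for that line.
    image_path = chat.generate_image(line, "narrator")
    audio_path = generate_audio("Rachel", line)
    if audio_path == -1:
        raise SystemExit("Audio generation failed.")

    # One (text, audio, image) triple becomes one narrated frame.
    generate_video([(line, audio_path, image_path)], "scene.mp4")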