Embedding / App /TTS /utils /Descript.py
Mbonea's picture
updates the voices list
91050d5
raw
history blame
No virus
14.5 kB
import aiohttp
import asyncio
import json, pprint, uuid, os, datetime
import tempfile, shutil
from typing import List, Optional
from datetime import datetime, timedelta
from pydantic import BaseModel, HttpUrl
class Metadata(BaseModel):
filename: str
type: str
class Artifact(BaseModel):
asset_id: str
created_at: datetime
file_extension: str
id: str
is_segmented: bool
lookup_key: HttpUrl
md5: str
metadata: Metadata
read_url: HttpUrl
size: int
status: str
uploaded_by: str
class TTSResponse(BaseModel):
artifacts: List[Artifact]
created_at: datetime
created_by: str
id: str
lookup_key: HttpUrl
metadata: Optional[dict]
class DescriptTTS:
def __init__(self, refresh_token=None):
self.client_id = "VDfu7rg4pdCELWsrQjcw2tG63a8Qlymi"
self.refresh_token_url = "https://auth0.descript.com/oauth/token"
self.project_id = "f734c6d7-e39d-4c1d-8f41-417f94cd37ce"
self.bearer_token = None
self.voice_ids = {
"Henry": "569fffb0-05a3-48a2-96a3-bf411c376477",
"Malcom": "75f8b86e-d05d-4862-a228-8d96fdf55258",
"Lawrance": "042460c0-98a5-41ae-9f31-33672ebb9016",
## de
}
self.refresh_token = refresh_token
self.tau_id = "90f9e0ad-594e-4203-9297-d4c7cc691e5x"
async def login_and_get_bearer_token(self):
# Step 1: Use refresh token to get a new access token
new_bearer_token, new_refresh_token = await self.refresh_access_token()
# Step 2: Update the new refresh token to the Firebase Realtime Database
await self.update_refresh_token(new_refresh_token)
# Step 3: Set the new bearer token for further use
self.bearer_token = new_bearer_token
self.refresh_token = new_refresh_token
async def refresh_access_token(self):
# Load the existing refresh token from Firebase
if self.refresh_token == None:
await self.load_existing_refresh_token()
# Prepare the payload for token refresh
payload = {
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": self.client_id,
}
# Request a new access token using the refresh token
async with aiohttp.ClientSession() as session:
async with session.post(self.refresh_token_url, data=payload) as response:
if response.status == 200:
# Parse the response to get the new access token and refresh token
response_data = await response.json()
new_bearer_token = response_data.get("access_token")
new_refresh_token = response_data.get("refresh_token")
return new_bearer_token, new_refresh_token
else:
raise Exception(
f"Failed to refresh access token. Status code: {response.status}, Error: {await response.text()}"
)
async def load_existing_refresh_token(self):
# Load the existing refresh token from Firebase
async with aiohttp.ClientSession() as session:
async with session.get(
"https://herokuserver-185316.firebaseio.com/refresh_token_descript.json"
) as response:
if response.status == 200:
# Parse the response to get the existing refresh token
data = await response.json()
self.refresh_token = data.get("refresh_token")
else:
raise Exception(
f"Failed to load existing refresh token. Status code: {response.status}, Error: {await response.text()}"
)
async def download_and_store_file(self, access_url):
temp_dir = tempfile.mkdtemp()
# Generate a unique random filename
random_filename = str(uuid.uuid4()) + ".wav"
file_path = os.path.join(temp_dir, random_filename)
async with aiohttp.ClientSession() as session:
async with session.get(access_url) as response:
if response.status == 200:
with open(file_path, "wb") as file:
while True:
chunk = await response.content.read(1024)
if not chunk:
break
file.write(chunk)
# Schedule the file for deletion after 10 minutes
delete_time = datetime.now() + timedelta(minutes=10)
async def schedule_delete():
while datetime.now() < delete_time:
await asyncio.sleep(60) # Check every minute
shutil.rmtree(
temp_dir, ignore_errors=True
) # Delete the temporary directory
asyncio.ensure_future(schedule_delete())
return file_path
async def search_unsplash_images(self, query_terms):
url = "https://api.descript.com/v2/cloud_libraries/providers/unsplash/image/search"
data = {
'tracking_info': {'project_id': self.project_id},
'pagination_info': {'page': 2, 'page_size': 25},
'query': {'terms': query_terms}
}
try:
response = await self.make_authenticated_request(url, method="POST", data=data)
return response
except Exception as e:
print(f"Failed to search Unsplash images: {e}")
return None
async def search_sound_effects(self, query_terms):
url = "https://api.descript.com/v2/cloud_libraries/providers/stock-sfx/audio/search"
headers = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'en-US,en;q=0.9',
'content-type': 'application/json',
'authorization': f'Bearer {self.bearer_token}', # Use the valid bearer token
}
data = {
'tracking_info': {'project_id': self.project_id},
'pagination_info': {'page': 1, 'page_size': 25},
'query': {'terms': query_terms}
}
try:
response = await self.make_authenticated_request(url, method="POST", data=data)
return response
except Exception as e:
print(f"Failed to search sound effects: {e}")
return {'status':str(e)}
async def get_voices(self):
url = "https://api.descript.com/v2/users/me/voices"
try:
response = await self.make_authenticated_request(url)
voices = response
self.voice_ids = {voice['name']: voice['id'] for voice in voices}
return voices
except Exception as e:
print(f"Failed to fetch voices: {e}")
return None
async def start_token_refresh_schedule(self):
while True:
try:
new_bearer_token, new_refresh_token = await self.refresh_access_token()
self.bearer_token = new_bearer_token
self.refresh_token = new_refresh_token
# Step 2: Update the new refresh token to the Firebase Realtime Database
await self.update_refresh_token(new_refresh_token)
print("Token refreshed successfully")
except Exception as e:
print(f"Failed to refresh token: {e}")
# Wait for 24 hours before the next refresh
await asyncio.sleep(24 * 60 * 60)
async def update_refresh_token(self, new_refresh_token):
# Update the new refresh token to Firebase
data = {"refresh_token": new_refresh_token}
async with aiohttp.ClientSession() as session:
async with session.put(
"https://herokuserver-185316.firebaseio.com/refresh_token_descript.json",
json=data,
) as response:
if response.status != 200:
raise Exception(
f"Failed to update refresh token. Status code: {response.status}, Error: {await response.text()}"
)
async def make_authenticated_request(self, url, method="GET", data=None):
if not self.bearer_token:
await self.login_and_get_bearer_token() # Make sure we have a valid bearer token
headers = {
"authority": "api.descript.com",
"accept": "application/json, text/plain, */*",
"accept-language": "en-US,en;q=0.9",
"accept-version": "v1",
"authorization": f"Bearer {self.bearer_token}",
"cache-control": "no-cache",
"content-type": "application/json",
"origin": "https://web.descript.com",
"pragma": "no-cache",
"referer": "https://web.descript.com/",
"sec-ch-ua": '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"x-descript-app-build-number": "20231206.146",
"x-descript-app-build-type": "release",
"x-descript-app-id": "48db7358-5ebc-4866-b672-10b412ac39c1",
"x-descript-app-name": "web",
"x-descript-app-version": "78.2.4",
"x-descript-auth": "auth0",
}
async with aiohttp.ClientSession() as session:
async with session.request(
method, url, headers=headers, json=data
) as response:
if response.status < 300:
return await response.json()
elif response.status == 401:
# Retry the request after refreshing the token
await self.login_and_get_bearer_token()
headers["authorization"] = f"Bearer {self.bearer_token}"
async with session.request(
method, url, headers=headers, json=data
) as retry_response:
if retry_response.status == 200:
return await retry_response.json()
else:
raise Exception(
f"Request failed even after refreshing token. Status code: {retry_response.status}, Error: {await retry_response.text()}"
)
else:
raise Exception(
f"Request failed. Status code: {response.status}, Error: {await response.text()}"
)
async def get_assets(self):
url = "https://api.descript.com/v2/projects/f734c6d7-e39d-4c1d-8f41-417f94cd37ce/media_assets?include_artifacts=true&cursor=1702016922390&include_placeholder=true"
try:
result = await self.make_authenticated_request(url)
return result
except Exception as e:
print(f"Failed to get assets: {str(e)}")
async def overdub_text(self, text, speaker="Lawrance",_voice_id=None):
url = "https://api.descript.com/v2/projects/f734c6d7-e39d-4c1d-8f41-417f94cd37ce/overdub"
voice_id = _voice_id or self.voice_ids[speaker]
data = {
"text": text,
"voice_id": voice_id,
"concatenate_audio": True,
"tau_id": self.tau_id,
"allow_prefix_expansion": True,
"allow_suffix_expansion": True,
}
try:
result = await self.make_authenticated_request(
url, method="POST", data=data
)
return result
except Exception as e:
# Retry the request after refreshing the token if the failure is due to authorization
if "authorization" in str(e).lower():
await self.login_and_get_bearer_token()
result = await self.make_authenticated_request(
url, method="POST", data=data
)
print(result)
return result
else:
print(f"Failed to perform overdub: {str(e)}")
async def overdub_staus(self, id):
url = f"https://api.descript.com/v2/projects/f734c6d7-e39d-4c1d-8f41-417f94cd37ce/overdub/{id}"
try:
result = await self.make_authenticated_request(url, method="GET")
print(result)
return result
except Exception as e:
# Retry the request after refreshing the token if the failure is due to authorization
if "authorization" in str(e).lower():
await self.login_and_get_bearer_token()
result = await self.make_authenticated_request(
url, method="POST", data=data
)
print(result)
return result
else:
print(f"Failed to perform overdub: {str(e)}")
async def request_status(self, id):
status = await self.overdub_staus(id)
if status["state"] == "done":
asset_id=status["result"]["imputation_audio_asset_id"]
overdub = await self.get_assets()
for asset in overdub["data"]:
if asset["id"] == asset_id:
data = TTSResponse(**asset)
url = data.artifacts[0].read_url
return {'url':url,'status':'done'}
return status
async def say(self, text, speaker="Henry"):
overdub = await self.overdub_text(text, speaker=speaker)
asset_id = None
while True:
status = await self.overdub_staus(overdub["id"])
# print(status)
if status["state"] == "done":
# print(status)
asset_id = status["result"]["imputation_audio_asset_id"]
break
await asyncio.sleep(3)
overdub = await self.get_assets()
for asset in overdub["data"]:
if asset["id"] == asset_id:
data = TTSResponse(**asset)
url = data.artifacts[0].read_url
print(url)
path = await self.download_and_store_file(str(url))
return path, url