|
import aiohttp |
|
import asyncio |
|
import json, pprint, uuid, os, datetime |
|
import tempfile, shutil |
|
from typing import List, Optional |
|
from datetime import datetime, timedelta |
|
from pydantic import BaseModel, HttpUrl |
|
|
|
|
|
class Metadata(BaseModel):
    """Descriptive metadata carried by an :class:`Artifact`.

    Both fields are required strings. Their exact vocabulary is defined by
    the remote API and is not constrained here.
    """

    # File name reported for the artifact.
    filename: str
    # Type label reported for the artifact (free-form string as far as this
    # model is concerned).
    type: str
|
|
|
|
|
class Artifact(BaseModel):
    """One entry of the ``artifacts`` list of a :class:`TTSResponse`.

    Every field is required. Of these, ``DescriptTTS`` only consumes
    ``read_url`` (it is the location the generated audio is downloaded from);
    the remaining fields are validated and kept as-is.
    """

    # --- identity -----------------------------------------------------
    asset_id: str
    created_at: datetime
    file_extension: str
    id: str

    # --- storage ------------------------------------------------------
    is_segmented: bool
    lookup_key: HttpUrl
    md5: str
    metadata: Metadata
    # URL the artifact's content is fetched from.
    read_url: HttpUrl
    # presumably the size in bytes - TODO confirm against the API.
    size: int

    # --- bookkeeping --------------------------------------------------
    status: str
    uploaded_by: str
|
|
|
|
|
class TTSResponse(BaseModel):
    """A media asset entry together with its stored artifacts.

    ``DescriptTTS`` builds this model from one element of the ``data`` list
    returned by the media-assets endpoint and then reads
    ``artifacts[0].read_url`` from it.
    """

    # Files stored for this asset; callers use the first entry.
    artifacts: List[Artifact]
    created_at: datetime
    created_by: str
    id: str
    lookup_key: HttpUrl
    # Explicit default so the field is optional under both Pydantic v1 (where
    # a bare ``Optional[...]`` already defaulted to None) and Pydantic v2
    # (where a bare ``Optional[...]`` is *required* but nullable).
    metadata: Optional[dict] = None
|
|
|
|
|
class DescriptTTS:
    """Asynchronous client for a subset of Descript's web API.

    The class covers three areas:

    * authentication - exchanges a refresh token for a bearer token and
      stores the refresh token returned by the token endpoint in an external
      JSON document so that it survives restarts;
    * stock media search - Unsplash images and sound effects;
    * Overdub text-to-speech - submits text, polls the job, resolves the
      generated audio asset and downloads it to a temporary file.

    Every network call opens its own ``aiohttp.ClientSession``.
    """

    # Base URL shared by every API endpoint used below.
    _API_BASE_URL = "https://api.descript.com/v2"

    # JSON document holding the most recent refresh token between runs.
    # NOTE(review): requests to this URL carry no credentials in this class -
    # confirm that access is restricted on the server side.
    _TOKEN_STORE_URL = (
        "https://herokuserver-185316.firebaseio.com/refresh_token_descript.json"
    )

    # Seconds a downloaded file is kept before its directory is removed.
    _DOWNLOAD_TTL_SECONDS = 10 * 60
    # Bytes requested from the response stream per read while downloading.
    _DOWNLOAD_CHUNK_SIZE = 64 * 1024

    # Strong references to pending cleanup tasks. The event loop only keeps
    # weak references to tasks, so an unreferenced task may be garbage
    # collected before it runs. Shared by all instances on purpose: it is a
    # registry, not per-instance state.
    _cleanup_tasks = set()

    def __init__(self, refresh_token: Optional[str] = None) -> None:
        """Create a client.

        Args:
            refresh_token: Refresh token to start from. When omitted, the
                token is loaded from the external token store the first time
                a bearer token is needed.
        """
        self.client_id = "VDfu7rg4pdCELWsrQjcw2tG63a8Qlymi"
        self.refresh_token_url = "https://auth0.descript.com/oauth/token"
        self.project_id = "f734c6d7-e39d-4c1d-8f41-417f94cd37ce"
        self.bearer_token = None
        # Speaker name -> voice id. Replaced wholesale by get_voices().
        self.voice_ids = {
            "Henry": "569fffb0-05a3-48a2-96a3-bf411c376477",
            "Malcom": "75f8b86e-d05d-4862-a228-8d96fdf55258",
            "Lawrance": "042460c0-98a5-41ae-9f31-33672ebb9016",
        }
        self.refresh_token = refresh_token
        # Sent verbatim as "tau_id" with every overdub request.
        self.tau_id = "90f9e0ad-594e-4203-9297-d4c7cc691e5x"

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    async def login_and_get_bearer_token(self) -> None:
        """Obtain fresh tokens, adopt them and persist the refresh token.

        Raises:
            Exception: if the token endpoint or the token store rejects the
                request.
        """
        new_bearer_token, new_refresh_token = await self.refresh_access_token()

        # Adopt the tokens in memory *before* persisting them: if the store
        # update fails, this instance can still authenticate with the tokens
        # it has just been issued.
        self.bearer_token = new_bearer_token
        self.refresh_token = new_refresh_token

        await self.update_refresh_token(new_refresh_token)

    async def refresh_access_token(self):
        """Exchange the current refresh token for a new token pair.

        Loads the refresh token from the token store first when none is set.

        Returns:
            A ``(access_token, refresh_token)`` tuple taken from the token
            endpoint's JSON response; either element is ``None`` when the
            response does not contain the corresponding key.

        Raises:
            Exception: if the token endpoint answers with a status other
                than 200.
        """
        if self.refresh_token is None:
            await self.load_existing_refresh_token()

        payload = {
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token,
            "client_id": self.client_id,
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(self.refresh_token_url, data=payload) as response:
                if response.status != 200:
                    raise Exception(
                        f"Failed to refresh access token. Status code: {response.status}, Error: {await response.text()}"
                    )
                response_data = await response.json()

        return response_data.get("access_token"), response_data.get("refresh_token")

    async def load_existing_refresh_token(self) -> None:
        """Load the persisted refresh token into ``self.refresh_token``.

        Raises:
            Exception: if the token store answers with a status other than
                200, or if the stored document holds no refresh token.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(self._TOKEN_STORE_URL) as response:
                if response.status != 200:
                    raise Exception(
                        f"Failed to load existing refresh token. Status code: {response.status}, Error: {await response.text()}"
                    )
                data = await response.json()

        # The document may be JSON null or lack the key; fail here with a
        # clear message instead of sending an empty token to the auth server.
        token = data.get("refresh_token") if isinstance(data, dict) else None
        if not token:
            raise Exception(
                "Failed to load existing refresh token. The token store holds no refresh token."
            )
        self.refresh_token = token

    async def update_refresh_token(self, new_refresh_token) -> None:
        """Persist ``new_refresh_token`` in the external token store.

        Raises:
            Exception: if the token store answers with a status other than
                200.
        """
        data = {"refresh_token": new_refresh_token}
        async with aiohttp.ClientSession() as session:
            async with session.put(self._TOKEN_STORE_URL, json=data) as response:
                if response.status != 200:
                    raise Exception(
                        f"Failed to update refresh token. Status code: {response.status}, Error: {await response.text()}"
                    )

    async def start_token_refresh_schedule(self) -> None:
        """Refresh the tokens once every 24 hours, forever.

        A failed refresh is reported on stdout and retried on the next cycle;
        this coroutine never returns on its own.
        """
        while True:
            try:
                await self.login_and_get_bearer_token()
                print("Token refreshed successfully")
            except Exception as e:
                print(f"Failed to refresh token: {e}")

            await asyncio.sleep(24 * 60 * 60)

    # ------------------------------------------------------------------
    # HTTP plumbing
    # ------------------------------------------------------------------

    def _project_url(self, suffix: str) -> str:
        """Return the API URL of ``suffix`` below the configured project."""
        return f"{self._API_BASE_URL}/projects/{self.project_id}/{suffix}"

    def _request_headers(self) -> dict:
        """Build the header set sent with every authenticated API request."""
        return {
            "authority": "api.descript.com",
            "accept": "application/json, text/plain, */*",
            "accept-language": "en-US,en;q=0.9",
            "accept-version": "v1",
            "authorization": f"Bearer {self.bearer_token}",
            "cache-control": "no-cache",
            "content-type": "application/json",
            "origin": "https://web.descript.com",
            "pragma": "no-cache",
            "referer": "https://web.descript.com/",
            "sec-ch-ua": '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "x-descript-app-build-number": "20231206.146",
            "x-descript-app-build-type": "release",
            "x-descript-app-id": "48db7358-5ebc-4866-b672-10b412ac39c1",
            "x-descript-app-name": "web",
            "x-descript-app-version": "78.2.4",
            "x-descript-auth": "auth0",
        }

    async def make_authenticated_request(self, url, method="GET", data=None):
        """Send a JSON request with the bearer token and return the parsed body.

        Logs in first when no bearer token is held. A 401 answer triggers one
        token refresh followed by a single retry of the same request.

        Args:
            url: Absolute URL to call.
            method: HTTP method name.
            data: Optional JSON-serialisable request body.

        Returns:
            The response body parsed as JSON.

        Raises:
            Exception: if the (possibly retried) request ends with a status
                of 300 or above.
        """
        if not self.bearer_token:
            await self.login_and_get_bearer_token()

        headers = self._request_headers()

        async with aiohttp.ClientSession() as session:
            async with session.request(
                method, url, headers=headers, json=data
            ) as response:
                if response.status < 300:
                    return await response.json()
                if response.status != 401:
                    raise Exception(
                        f"Request failed. Status code: {response.status}, Error: {await response.text()}"
                    )

            # 401: the bearer token was rejected - refresh it and retry once.
            await self.login_and_get_bearer_token()
            headers["authorization"] = f"Bearer {self.bearer_token}"
            async with session.request(
                method, url, headers=headers, json=data
            ) as retry_response:
                # Same success criterion as the first attempt, so that a
                # 201/202 answer is not reported as a failure after a refresh.
                if retry_response.status < 300:
                    return await retry_response.json()
                raise Exception(
                    f"Request failed even after refreshing token. Status code: {retry_response.status}, Error: {await retry_response.text()}"
                )

    # ------------------------------------------------------------------
    # Downloads
    # ------------------------------------------------------------------

    @staticmethod
    async def _remove_dir_later(directory: str, delay: float) -> None:
        """Wait ``delay`` seconds, then delete ``directory`` (best effort)."""
        await asyncio.sleep(delay)
        shutil.rmtree(directory, ignore_errors=True)

    async def download_and_store_file(self, access_url) -> str:
        """Download ``access_url`` into a fresh temporary directory.

        The file gets a random ``.wav`` name. Its directory is scheduled for
        deletion ``_DOWNLOAD_TTL_SECONDS`` after the download, provided the
        event loop is still running by then.

        Returns:
            Path of the downloaded file.

        Raises:
            Exception: if the server answers with a status other than 200.
                The temporary directory is removed before the error
                propagates.
        """
        temp_dir = tempfile.mkdtemp()
        file_path = os.path.join(temp_dir, str(uuid.uuid4()) + ".wav")

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(access_url) as response:
                    if response.status != 200:
                        raise Exception(
                            f"Failed to download file. Status code: {response.status}, Error: {await response.text()}"
                        )
                    with open(file_path, "wb") as file:
                        async for chunk in response.content.iter_chunked(
                            self._DOWNLOAD_CHUNK_SIZE
                        ):
                            file.write(chunk)
        except BaseException:
            # Includes cancellation: never leave a directory behind for a
            # file the caller will not receive.
            shutil.rmtree(temp_dir, ignore_errors=True)
            raise

        cleanup = asyncio.ensure_future(
            self._remove_dir_later(temp_dir, self._DOWNLOAD_TTL_SECONDS)
        )
        # Hold a reference until the task finishes (see _cleanup_tasks).
        self._cleanup_tasks.add(cleanup)
        cleanup.add_done_callback(self._cleanup_tasks.discard)

        return file_path

    # ------------------------------------------------------------------
    # Stock media and voices
    # ------------------------------------------------------------------

    async def search_unsplash_images(self, query_terms, page=2, page_size=25):
        """Search the Unsplash image provider.

        Args:
            query_terms: Search terms, forwarded as ``query.terms``.
            page: Result page to request.
                NOTE(review): the default of 2 is the value this method has
                always sent, while the sound-effect search sends 1 - confirm
                which one is intended.
            page_size: Number of results per page.

        Returns:
            The parsed JSON response, or ``None`` if the request failed (the
            failure is reported on stdout).
        """
        url = f"{self._API_BASE_URL}/cloud_libraries/providers/unsplash/image/search"
        data = {
            'tracking_info': {'project_id': self.project_id},
            'pagination_info': {'page': page, 'page_size': page_size},
            'query': {'terms': query_terms},
        }

        try:
            return await self.make_authenticated_request(url, method="POST", data=data)
        except Exception as e:
            print(f"Failed to search Unsplash images: {e}")
            return None

    async def search_sound_effects(self, query_terms, page=1, page_size=25):
        """Search the stock sound-effect provider.

        Args:
            query_terms: Search terms, forwarded as ``query.terms``.
            page: Result page to request.
            page_size: Number of results per page.

        Returns:
            The parsed JSON response, or ``{'status': <error text>}`` if the
            request failed (the failure is also reported on stdout).
        """
        url = f"{self._API_BASE_URL}/cloud_libraries/providers/stock-sfx/audio/search"
        data = {
            'tracking_info': {'project_id': self.project_id},
            'pagination_info': {'page': page, 'page_size': page_size},
            'query': {'terms': query_terms},
        }

        try:
            return await self.make_authenticated_request(url, method="POST", data=data)
        except Exception as e:
            print(f"Failed to search sound effects: {e}")
            return {'status': str(e)}

    async def get_voices(self):
        """Fetch the account's voices and rebuild ``self.voice_ids`` from them.

        Returns:
            The parsed JSON response (iterated here as a sequence of mappings
            with ``name`` and ``id`` keys), or ``None`` if the request or the
            rebuild failed (the failure is reported on stdout).
        """
        url = f"{self._API_BASE_URL}/users/me/voices"
        try:
            voices = await self.make_authenticated_request(url)
            self.voice_ids = {voice['name']: voice['id'] for voice in voices}
            return voices
        except Exception as e:
            print(f"Failed to fetch voices: {e}")
            return None

    # ------------------------------------------------------------------
    # Overdub text-to-speech
    # ------------------------------------------------------------------

    async def get_assets(self):
        """List the project's media assets including their artifacts.

        NOTE(review): the ``cursor`` query parameter is a fixed value; how it
        limits the listing depends on the API - confirm it is still wanted.

        Returns:
            The parsed JSON response, or ``None`` if the request failed (the
            failure is reported on stdout).
        """
        url = self._project_url(
            "media_assets?include_artifacts=true&cursor=1702016922390&include_placeholder=true"
        )
        try:
            return await self.make_authenticated_request(url)
        except Exception as e:
            print(f"Failed to get assets: {str(e)}")
            return None

    async def overdub_text(self, text, speaker="Lawrance", _voice_id=None):
        """Submit ``text`` for speech synthesis.

        Args:
            text: Text to synthesise.
            speaker: Key into ``self.voice_ids``; ignored when ``_voice_id``
                is given.
            _voice_id: Explicit voice id overriding ``speaker``.

        Returns:
            The parsed JSON response describing the job, or ``None`` if the
            request failed for a reason unrelated to authorization (the
            failure is reported on stdout).

        Raises:
            KeyError: if ``speaker`` is unknown and no ``_voice_id`` is given.
            Exception: if the request still fails after re-authenticating.
        """
        url = self._project_url("overdub")
        # Resolved outside the try block on purpose: an unknown speaker is a
        # caller error and must not be reported as a failed request.
        voice_id = _voice_id or self.voice_ids[speaker]
        data = {
            "text": text,
            "voice_id": voice_id,
            "concatenate_audio": True,
            "tau_id": self.tau_id,
            "allow_prefix_expansion": True,
            "allow_suffix_expansion": True,
        }

        try:
            return await self.make_authenticated_request(
                url, method="POST", data=data
            )
        except Exception as e:
            if "authorization" not in str(e).lower():
                print(f"Failed to perform overdub: {str(e)}")
                return None

            # The error text mentions authorization: log in again and retry
            # once; a second failure propagates to the caller.
            await self.login_and_get_bearer_token()
            result = await self.make_authenticated_request(
                url, method="POST", data=data
            )
            print(result)
            return result

    async def overdub_staus(self, id):
        """Fetch the state of the overdub job ``id``.

        The method name keeps its historical spelling because callers depend
        on it.

        Returns:
            The parsed JSON response, or ``None`` if the request failed for a
            reason unrelated to authorization (the failure is reported on
            stdout).

        Raises:
            Exception: if the request still fails after re-authenticating.
        """
        url = self._project_url(f"overdub/{id}")

        try:
            result = await self.make_authenticated_request(url, method="GET")
            print(result)
            return result
        except Exception as e:
            if "authorization" not in str(e).lower():
                print(f"Failed to get overdub status: {str(e)}")
                return None

            # Retry the *same* GET after logging in again. The previous code
            # issued a POST with an undefined ``data`` here, which raised
            # NameError instead of retrying.
            await self.login_and_get_bearer_token()
            result = await self.make_authenticated_request(url, method="GET")
            print(result)
            return result

    async def _find_asset_read_url(self, asset_id):
        """Return the read URL of the first artifact of ``asset_id``.

        Returns ``None`` when the asset listing is unavailable or does not
        contain the asset.
        """
        assets = await self.get_assets()
        if not assets:
            return None
        for asset in assets.get("data", []):
            if asset["id"] == asset_id:
                return TTSResponse(**asset).artifacts[0].read_url
        return None

    async def request_status(self, id):
        """Report the state of overdub job ``id``, resolving the audio URL.

        Returns:
            ``{'url': <read url>, 'status': 'done'}`` when the job is done
            and its asset is listed; otherwise whatever ``overdub_staus``
            returned (which is ``None`` when the status lookup failed).
        """
        status = await self.overdub_staus(id)
        if status and status["state"] == "done":
            url = await self._find_asset_read_url(
                status["result"]["imputation_audio_asset_id"]
            )
            if url is not None:
                return {'url': url, 'status': 'done'}
        return status

    async def _wait_for_overdub_asset(self, job_id, poll_interval):
        """Poll job ``job_id`` until it is done and return its audio asset id.

        NOTE(review): only the "done" state ends the loop; if the API reports
        a terminal failure state, the loop keeps polling until the caller's
        timeout (if any) expires - confirm the possible states.

        Raises:
            Exception: if a status lookup fails.
        """
        while True:
            status = await self.overdub_staus(job_id)
            if not status:
                raise Exception(f"Failed to get overdub status for job {job_id}.")
            if status["state"] == "done":
                return status["result"]["imputation_audio_asset_id"]
            await asyncio.sleep(poll_interval)

    async def say(self, text, speaker="Henry", *, poll_interval=3, timeout=None):
        """Synthesise ``text`` and download the resulting audio.

        Args:
            text: Text to synthesise.
            speaker: Key into ``self.voice_ids``.
            poll_interval: Seconds to wait between two status checks.
            timeout: Maximum number of seconds to wait for the job to finish;
                ``None`` waits without limit.

        Returns:
            A ``(file_path, read_url)`` tuple, or ``None`` when the finished
            asset cannot be found in the asset listing.

        Raises:
            KeyError: if ``speaker`` is unknown.
            asyncio.TimeoutError: if ``timeout`` expires before the job is
                done.
            Exception: if the job cannot be created, its status cannot be
                read, or the download fails.
        """
        overdub = await self.overdub_text(text, speaker=speaker)
        if not overdub:
            # overdub_text already reported the cause on stdout.
            raise Exception("Failed to perform overdub: no job was created.")

        asset_id = await asyncio.wait_for(
            self._wait_for_overdub_asset(overdub["id"], poll_interval),
            timeout,
        )

        url = await self._find_asset_read_url(asset_id)
        if url is None:
            return None

        print(url)
        path = await self.download_and_store_file(str(url))
        return path, url
|
|
|
|
|
|