# vmoras's picture
# Uploading files to GCP and refactor
# e0d9c8e
import os
import time
import requests
def get_video(link_audio: str, image_url: str, path_video: str,) -> None:
    """
    Creates a video with d-id and saves it.

    The talk is created from the audio and the base image, its result url is
    fetched, and the video behind that url is downloaded to
    ``{path_video}.mp4``.

    :param link_audio: url of the audio in the bucket used for the video
    :param image_url: url with the base image used for the video
    :param path_video: path for saving the video file, without extension
        ('.mp4' is appended)
    :return: None
    :raises Exception: if there was a problem with D-ID, or while downloading
        or saving the video
    """
    # Errors raised by the two D-ID helpers propagate unchanged to the caller.
    id_video = _create_talk(link_audio, image_url)
    link_video = _get_url_talk(id_video)

    # Saves the video into a file to later upload it to the cloud
    name = f'{path_video}.mp4'
    try:
        # Without a timeout requests waits forever on a stalled connection.
        # The value applies to the connect phase and to each read, not to the
        # download as a whole.
        with requests.get(link_video, timeout=60) as r:
            r.raise_for_status()  # Raises an exception for HTTP errors
            # Any response that passed raise_for_status() is written, so a
            # successful call never returns without having produced the file.
            with open(name, 'wb') as outfile:
                outfile.write(r.content)
    except requests.exceptions.RequestException as e:
        raise Exception(f"Network-related error while downloading the video: {e}") from e
    except ValueError as e:
        raise Exception(e) from e
    except Exception as e:
        # Covers, among others, OSError raised while opening/writing the file
        raise Exception(f"An unexpected error occurred: {e}") from e
def _create_talk(link_audio: str, image_url: str) -> str:
    """
    Creates and returns the id of the talk made with d-id.

    :param link_audio: url of the audio in the bucket used for the video
    :param image_url: url with the base image used for the video
    :return: id of the talk
    :raises Exception: if the D_ID_KEY environment variable is not set, or if
        there was a problem while generating the talk
    """
    api_key = os.getenv('D_ID_KEY')
    if not api_key:
        # Fail before any network call instead of sending "Basic None"
        raise Exception("D_ID_KEY environment variable is not set")

    url = "https://api.d-id.com/talks"
    payload = {
        "script": {
            "type": "audio",
            "provider": {
                "type": "microsoft",
                "voice_id": "en-US-JennyNeural"
            },
            "ssml": "false",
            "audio_url": link_audio
        },
        "config": {
            "fluent": "false",
            "pad_audio": "0.0",
            "stitch": True
        },
        "source_url": image_url
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Basic {api_key}"
    }

    # Without a timeout requests waits forever on a stalled connection
    response = requests.post(url, json=payload, headers=headers, timeout=30)

    try:
        r = response.json()
    except ValueError as e:
        # The body was not JSON (e.g. an HTML error page from a proxy)
        raise Exception(
            f"D-ID returned a non-JSON response (HTTP {response.status_code}): {response.text}"
        ) from e

    # Probably there are no more available credits
    if not isinstance(r, dict) or 'id' not in r:
        raise Exception(f"D-ID response is missing 'id' key. Returned error: {r}")

    return r['id']
def _get_url_talk(id_video: str, timeout: float = 600.0, poll_interval: float = 1.0) -> str:
    """
    Gets the url of the finished talk

    Polls the talk endpoint until its status is 'done'. Polling stops with an
    error when the talk reports a failure status, when the response carries no
    status at all, or when the talk is still not done after ``timeout`` seconds.

    :param id_video: id of the previously created talk
    :param timeout: maximum number of seconds to wait for the talk to finish
    :param poll_interval: seconds to sleep between two status checks
    :return: url of the video
    :raises Exception: if the talk failed, the response could not be
        interpreted, or the talk was not ready in time
    """
    url = f"https://api.d-id.com/talks/{id_video}"
    # The headers do not change between polls, so they are built once
    headers = {
        "accept": "application/json",
        "authorization": f"Basic {os.getenv('D_ID_KEY')}"
    }
    deadline = time.monotonic() + timeout

    while True:
        # Without a timeout requests waits forever on a stalled connection
        response = requests.get(url, headers=headers, timeout=30)
        try:
            r = response.json()
        except ValueError as e:
            raise Exception(
                f"D-ID returned a non-JSON response (HTTP {response.status_code}): {response.text}"
            ) from e

        status = r.get('status') if isinstance(r, dict) else None
        if status == 'done':
            return r['result_url']

        # A missing status means the request itself failed (e.g. bad
        # credentials); 'error' and 'rejected' are final, so polling again
        # would never succeed.
        if status is None or status in ('error', 'rejected'):
            raise Exception(
                f"D-ID could not generate the talk {id_video}. Returned response: {r}"
            )

        if time.monotonic() >= deadline:
            raise Exception(
                f"Timed out after {timeout} seconds waiting for the D-ID talk {id_video}"
            )

        time.sleep(poll_interval)  # Sleep until the video is ready