Spaces:
Runtime error
Runtime error
import os | |
import json | |
import uvicorn | |
from fastapi import FastAPI, Request | |
from fastapi.middleware.cors import CORSMiddleware | |
from contextlib import asynccontextmanager | |
from models import load_models | |
from helperfunctions import * | |
from media_download import YoutubeDownloader | |
# from transcription import StableWhisper | |
# from summarizer import Extract_Summary, AudioBookNarration | |
# from audiobook import AudioBook | |
# Registry of loaded ML models. `lifespan` rebinds it at startup; the empty
# dict only keeps the name defined before the application has started.
MODELS = {}


### API Configurations
# Context Manager for FastAPI Start/Shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the ML models at startup and release them at shutdown.

    Intended to be passed as ``FastAPI(lifespan=lifespan)``: everything
    before ``yield`` is the startup code, everything after it is the
    shutdown code.

    Args:
        app: The FastAPI application being started (not used in the body).
    """
    # Without this declaration the assignment below would only create a
    # function-local name and the module-level MODELS would stay untouched.
    global MODELS

    ## FastAPI Startup Code
    # TODO
    # Loading ML models
    print('Loading ML Models..')
    MODELS = load_models()
    print('ML Models Loaded!')
    try:
        yield
    finally:
        ## FastAPI Shutdown Code
        # Cleaning ML Models & Releasing the Resources.
        # Runs even when the application exits through an exception.
        MODELS.clear()
# Initializing FastAPI App
# Startup/shutdown work is delegated to the `lifespan` callable.
app = FastAPI(lifespan=lifespan)

# Output Directory for Files Storage (handed to the downloader)
output_folder = 'Output'

# Per-session state, keyed by the user id returned from get_media_metadata
users_context = {}

# CORS (Cross-Origin Resource Sharing)
# Local development origins. They are currently NOT passed to the middleware
# below, which accepts every origin instead.
origins = ["http://localhost", "http://localhost:4200"]

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],  # origins,
    allow_headers=["*"],
    allow_methods=["*"],
)
### APIs | |
async def get_media_metadata(request: Request, url: str):
    """Fetch metadata for a media URL and open a session for the caller.

    NOTE(review): no route decorator is visible on this handler in this file;
    confirm how it is exposed on `app`.

    Args:
        request: Incoming request; only the client host is read from it.
        url: Media URL handed to `YoutubeDownloader`.

    Returns:
        dict with `status` (1 when metadata was obtained, otherwise 0),
        the generated `user_id`, and the `media_metadata` itself.
    """
    # The session id is produced by generate_uuid from the caller's IP and the URL
    client_ip = request.client.host
    session_id = generate_uuid(client_ip, url)

    # One downloader per session, writing into the shared output folder
    downloader = YoutubeDownloader(url, output_folder)
    metadata = downloader.get_media_metadata()

    succeeded = 1 if metadata else 0
    if succeeded:
        # Keep the downloader and URL so later calls for this session can reuse them
        users_context[session_id] = {'downloader': downloader, 'url': url}

    return {'status': succeeded, 'user_id': session_id, 'media_metadata': metadata}
async def get_media_formats(user_id: str):
    """List the media formats available for a session's URL.

    NOTE(review): no route decorator is visible on this handler in this file;
    confirm how it is exposed on `app`.

    Args:
        user_id: Session id previously returned by get_media_metadata.

    Returns:
        dict with `status` (1 when formats were obtained, otherwise 0) and
        `media_formats`.

    Raises:
        KeyError: If `user_id` has no stored session.
    """
    session = users_context[user_id]
    formats = session['downloader'].get_media_formats()

    if not formats:
        return {'status': 0, 'media_formats': formats}

    # Remember the formats in this user's session
    session['media_formats'] = formats
    return {'status': 1, 'media_formats': formats}
async def download_media(user_id: str, media_type: str, media_format: str, media_quality: str):
    """Download the session's media with the requested type, format and quality.

    NOTE(review): no route decorator is visible on this handler in this file;
    confirm how it is exposed on `app`.

    Args:
        user_id: Session id previously returned by get_media_metadata.
        media_type: Forwarded to the downloader; get_transcript later compares
            the stored value against 'video'.
        media_format: Forwarded to the downloader.
        media_quality: Forwarded to the downloader.

    Returns:
        dict with `status` (1 when a path was returned, otherwise 0) and
        `media_path`.

    Raises:
        KeyError: If `user_id` has no stored session.
    """
    session = users_context[user_id]
    downloaded_path = session['downloader'].download(media_type, media_format, media_quality)

    if not downloaded_path:
        return {'status': 0, 'media_path': downloaded_path}

    # Remember what was downloaded so the transcript step can find it
    session['media_path'] = downloaded_path
    session['media_type'] = media_type
    return {'status': 1, 'media_path': downloaded_path}
async def get_transcript(user_id: str, subtitle_format: str = 'srt', word_level: bool = False):
    """Return the transcript for the media downloaded in this session.

    The Whisper based transcription is currently commented out; a
    pre-generated transcript is loaded from the `temp` directory instead.

    NOTE(review): no route decorator is visible on this handler in this file;
    confirm how it is exposed on `app`.

    Args:
        user_id: Session id previously returned by get_media_metadata.
        subtitle_format: Only used by the disabled Whisper code path.
        word_level: Load the word-level transcript instead of the
            sentence-level one.

    Returns:
        dict with `status` (1 when the transcript is non-empty, otherwise 0)
        and `transcript`.

    Raises:
        KeyError: If the session or its downloaded media is missing.
        FileNotFoundError: If the pre-generated transcript file is absent.
    """
    session = users_context[user_id]

    # Retrieving the media_path from the context for this user's session
    media_path = session['media_path']

    # Checking if the media_type is Video, then extract its audio.
    # The result is not consumed while the transcription below is disabled.
    if session['media_type'] == 'video':
        media_path = extract_audio(media_path)

    # # Whisper based transcription
    # stable_whisper_transcript = StableWhisper(media_path, output_folder, subtitle_format=subtitle_format, word_level=word_level)
    # transcript = stable_whisper_transcript.generate_transcript()
    # transcript_path = stable_whisper_transcript.save_transcript()

    # Both granularities are read the same way; only the file name differs.
    transcript_name = 'word_level_transcript.json' if word_level else 'sentence_level_transcript.json'
    transcript_path = os.path.join('temp', transcript_name)

    # Explicit UTF-8 so decoding does not depend on the platform's locale.
    with open(transcript_path, 'r', encoding='utf-8') as json_file:
        transcript = json.load(json_file)

    # Getting Status
    status = 1 if transcript else 0
    if status:
        # Storing Transcript Info in the context for this user's session
        session['transcript'] = transcript
        session['transcript_path'] = transcript_path

    return {'status': status, "transcript": transcript}
async def get_translation(user_id: str, target_language: str = 'en'):
    """Return the translated transcript and subtitles for a session.

    The NLLB based translation is currently commented out; pre-generated
    results are loaded from the `temp` directory instead.

    NOTE(review): no route decorator is visible on this handler in this file;
    confirm how it is exposed on `app`.

    Args:
        user_id: Session id previously returned by get_media_metadata.
        target_language: Only used by the disabled NLLB code path.

    Returns:
        dict with `status` (1 when both results are non-empty, otherwise 0),
        `transcript` (translated text) and `subtitles` (translated subtitles).

    Raises:
        KeyError: If the session or its transcript is missing.
        FileNotFoundError: If a pre-generated file is absent.
    """
    session = users_context[user_id]

    # The transcript is only consumed by the disabled code below, but the
    # lookup still requires a transcript to exist for this session.
    transcript = session['transcript']

    # # # NLLB based Translation
    # nllb_translator = Translation(transcript, transcript['language'], target_language, 'output_path')
    # translated_transcript = nllb_translator.get_translated_transcript()
    # translated_subtitles = nllb_translator.get_translated_subtitles()

    fixtures_dir = 'temp'

    text_path = os.path.join(fixtures_dir, 'translated_transcript.txt')
    with open(text_path, "r", encoding="utf-8") as text_file:
        translated_text = text_file.read()

    subtitles_path = os.path.join(fixtures_dir, 'translated_subtitles.json')
    with open(subtitles_path, "r", encoding="utf-8") as subtitles_file:
        subtitles = json.load(subtitles_file)

    if not (translated_text and subtitles):
        return {'status': 0, "transcript": translated_text, "subtitles": subtitles}

    # Storing Translated Transcript Info in the context for this user's session
    session['translated_transcript'] = translated_text
    session['translated_subtitles'] = subtitles
    return {'status': 1, "transcript": translated_text, "subtitles": subtitles}
async def get_summary(user_id: str, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
                      Response_length: str, Writing_style: str, text_input: str = None):
    """Return a summary for the given text or for the session's transcript.

    The `Extract_Summary` chain is currently commented out; a pre-generated
    summary is read from `temp/summary.txt` instead, so the style parameters
    do not influence the result yet.

    NOTE(review): no route decorator is visible on this handler in this file;
    confirm how it is exposed on `app`.

    Args:
        user_id: Session id previously returned by get_media_metadata.
        Summary_type: Only used by the disabled summary chain.
        Summary_strategy: Only used by the disabled summary chain.
        Target_Person_type: Only used by the disabled summary chain.
        Response_length: Only used by the disabled summary chain.
        Writing_style: Only used by the disabled summary chain.
        text_input: Text to summarise; falls back to the session transcript
            when empty or omitted.

    Returns:
        dict with `status` (1 when the summary is non-empty, otherwise 0)
        and `summary`.

    Raises:
        KeyError: If the session (or its transcript, when needed) is missing.
        FileNotFoundError: If the pre-generated summary file is absent.
    """
    # Getting Transcript if not provided
    if not text_input:
        text_input = users_context[user_id]['transcript']

    # # Extracting Summary
    # summary_extractor = Extract_Summary(text_input=text_input)
    # output = summary_extractor.define_chain(Summary_type=Summary_type,
    #                                         Summary_strategy=Summary_strategy,
    #                                         Target_Person_type=Target_Person_type,
    #                                         Response_length=Response_length,
    #                                         Writing_style=Writing_style,
    #                                         key_information=False)

    file_path = os.path.join('temp', 'summary.txt')
    # Explicit UTF-8 so decoding does not depend on the platform's locale
    # (matches how get_translation reads its files).
    with open(file_path, 'r', encoding='utf-8') as file:
        output = file.read()

    # Getting Status
    status = 1 if output else 0
    if status:
        # Storing Summary Info in the context for this user's session
        users_context[user_id]['summary'] = output

    return {'status': status, "summary": output}
async def get_key_info(user_id: str, Summary_type: str, Summary_strategy: str, Target_Person_type: str,
                       Response_length: str, Writing_style: str, text_input: str = None):
    """Return key information for the given text or the session's transcript.

    The `Extract_Summary` chain is currently commented out; pre-generated key
    information is read from `temp/key_info.txt` instead, so the style
    parameters do not influence the result yet.

    NOTE(review): no route decorator is visible on this handler in this file;
    confirm how it is exposed on `app`.

    Args:
        user_id: Session id previously returned by get_media_metadata.
        Summary_type: Only used by the disabled summary chain.
        Summary_strategy: Only used by the disabled summary chain.
        Target_Person_type: Only used by the disabled summary chain.
        Response_length: Only used by the disabled summary chain.
        Writing_style: Only used by the disabled summary chain.
        text_input: Text to analyse; falls back to the session transcript
            when empty or omitted.

    Returns:
        dict with `status` (1 when the key info is non-empty, otherwise 0)
        and `key_info`.

    Raises:
        KeyError: If the session (or its transcript, when needed) is missing.
        FileNotFoundError: If the pre-generated key-info file is absent.
    """
    # Getting Transcript if not provided
    if not text_input:
        text_input = users_context[user_id]['transcript']

    # # Extracting Summary
    # summary_extractor = Extract_Summary(text_input=text_input)
    # output = summary_extractor.define_chain(Summary_type=Summary_type,
    #                                         Summary_strategy=Summary_strategy,
    #                                         Target_Person_type=Target_Person_type,
    #                                         Response_length=Response_length,
    #                                         Writing_style=Writing_style,
    #                                         key_information=True)

    file_path = os.path.join('temp', 'key_info.txt')
    # Explicit UTF-8 so decoding does not depend on the platform's locale
    # (matches how get_translation reads its files).
    with open(file_path, 'r', encoding='utf-8') as file:
        output = file.read()

    # Getting Status
    status = 1 if output else 0
    if status:
        # Storing Key Info in the context for this user's session
        users_context[user_id]['key_info'] = output

    return {'status': status, "key_info": output}
async def get_audiobook(user_id: str, narration_style: str, speaker: str = "male", text_input: str = None,
                        audio_format: str = "mp3", audio_quality: str = "128kbps"):
    """Return the audiobook path for the given text or the session's transcript.

    Narration and audio generation are currently commented out; the path of
    the placeholder file `temp/narration.txt` is returned instead, so the
    narration and audio parameters do not influence the result yet.

    NOTE(review): no route decorator is visible on this handler in this file;
    confirm how it is exposed on `app`.

    Args:
        user_id: Session id previously returned by get_media_metadata.
        narration_style: Only used by the disabled narration code.
        speaker: Only used by the disabled audio generation code.
        text_input: Text to narrate; falls back to the session transcript
            when empty or omitted.
        audio_format: Only used by the disabled conversion code.
        audio_quality: Only used by the disabled conversion code.

    Returns:
        dict with `status` and `audiobook_path`.

    Raises:
        KeyError: If the session (or its transcript, when needed) is missing.
    """
    # Fall back to the stored transcript when no text was supplied
    if not text_input:
        text_input = users_context[user_id]['transcript']

    # # Extracting Narration
    # narrator = AudioBookNarration(text_input=text_input)
    # output = narrator.define_chain(narration_style=narration_style)
    # # Generating Audiobook
    # audiobook = AudioBook(output_folder=output_folder)
    # audio_path = audiobook.generate_audio_from_text(output, speaker=speaker, filename="output_audio")
    # # Converting the Audio to Required Audio Parameters
    # audio_path = convert_audio(audio_path, audio_format, audio_quality)

    # Placeholder result while the generation code above is disabled
    placeholder_path = os.path.join('temp', 'narration.txt')

    generated = bool(placeholder_path)
    if generated:
        # Storing Audiobook path in the context for this user's session
        users_context[user_id]['audiobook_path'] = placeholder_path

    return {'status': int(generated), "audiobook_path": placeholder_path}
async def get_rendered_video(user_id: str, video_format: str, video_quality: str, subtitles_type: str = 'original'):
    """Download the session's video and burn the chosen subtitles into it.

    NOTE(review): no route decorator is visible on this handler in this file;
    confirm how it is exposed on `app`.

    Args:
        user_id: Session id previously returned by get_media_metadata.
        video_format: Forwarded to the downloader.
        video_quality: Forwarded to the downloader.
        subtitles_type: 'original' (stored transcript file) or 'translated'
            (stored translated subtitles); matched case-insensitively.

    Returns:
        dict with `status` (1 when a rendered path was produced, otherwise 0)
        and `rendered_video_path`.

    Raises:
        ValueError: If `subtitles_type` is neither 'original' nor 'translated'.
        KeyError: If the session or the requested subtitles are missing.
    """
    # Validate before doing any work: an unknown value used to surface only
    # after the download, as an UnboundLocalError on `subtitles_path`.
    subtitles_kind = subtitles_type.lower()
    if subtitles_kind not in ('original', 'translated'):
        raise ValueError(
            f"Unsupported subtitles_type {subtitles_type!r}; expected 'original' or 'translated'"
        )

    session = users_context[user_id]

    # Downloading Video with Required Video Parameters for User
    media_path = session['downloader'].download('video', video_format, video_quality)

    # Getting Required Subtitles
    if subtitles_kind == 'original':
        subtitles_path = session['transcript_path']
    else:
        # Getting Translated Subtitles from the context for this user's session
        translated_subtitles = session['translated_subtitles']
        # Saving Translated Subtitles
        subtitles_path = save_translated_subtitles(translated_subtitles, media_path)

    # Burning Subtitles & Rendering Video
    rendered_video_path = burn_subtitles(media_path, subtitles_path)

    # Getting Status
    status = 1 if rendered_video_path else 0

    return {'status': status, "rendered_video_path": rendered_video_path}
if __name__ == "__main__":
    # Direct execution: serve the app on the loopback interface, port 8000.
    uvicorn.run(
        app,
        port=8000,
        host="127.0.0.1",
    )