Spaces:
Sleeping
Sleeping
import time | |
from scipy.io.wavfile import write | |
# from typing import Union | |
# from pydantic import BaseModel | |
from fastapi import FastAPI | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import FileResponse | |
# from fastapi.staticfiles import StaticFiles | |
# from fastapi.responses import FileResponse | |
import torch | |
# from transformers import pipeline | |
from transformers import SeamlessM4Tv2Model | |
from transformers import AutoProcessor | |
# Checkpoint shared by the processor and the model.
# Alternative checkpoint (disabled): "facebook/hf-seamless-m4t-medium"
model_name = "facebook/seamless-m4t-v2-large"

# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"
# torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

# Both objects are loaded once at import time and reused by every request.
processor = AutoProcessor.from_pretrained(model_name)
model = SeamlessM4Tv2Model.from_pretrained(model_name)
model.to(device)
# Number 8 is kept as a module-level constant; it is not referenced in the
# visible part of this file.
BATCH_SIZE = 8

# The interactive API documentation is served under /api/docs.
app = FastAPI(docs_url="/api/docs")

# NOTE(review): every origin, method and header is allowed while credentials
# are also enabled. Confirm that credentialed cross-origin requests are really
# required; if they are, list the trusted origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
def getDevice():
    """Return the module-level ``device`` value.

    The elapsed time between entering the function and logging is printed
    before returning.
    """
    began = time.time()
    elapsed = time.time() - began
    message = "Time took to process the request and return response is {} sec"
    print(message.format(elapsed))
    return device
def transcribe(inputs, src_lang="eng", tgt_lang="por"):
    """Translate text from ``src_lang`` into ``tgt_lang`` and return the text.

    Args:
        inputs: Source text; passed to the processor as ``text=``.
        src_lang: Language code of ``inputs``. Defaults to ``"eng"``.
        tgt_lang: Language code to translate into. Defaults to ``"por"``.

    Returns:
        The decoded output of ``model.generate`` with special tokens skipped.

    Raises:
        ValueError: If ``inputs`` is ``None``.
    """
    start_time = time.time()
    if inputs is None:
        # Raising a plain string is itself a TypeError in Python 3, so the
        # message would never reach the caller; raise a real exception.
        raise ValueError(
            "No text submitted! Please provide the text to translate "
            "before submitting your request."
        )

    text_inputs = processor(
        text=inputs, src_lang=src_lang, return_tensors="pt"
    ).to(device)

    # generate_speech=False asks the model for text tokens only.
    output_tokens = model.generate(
        **text_inputs, tgt_lang=tgt_lang, generate_speech=False
    )
    translated_text_from_text = processor.decode(
        output_tokens[0].tolist()[0], skip_special_tokens=True
    )

    print("Time took to process the request and return response is {} sec".format(
        time.time() - start_time))
    return translated_text_from_text
async def audio(inputs, src_lang="eng", tgt_lang="por", speaker_id=5):
    """Translate text and return the synthesized speech as a WAV file response.

    Args:
        inputs: Source text; passed to the processor as ``text=``.
        src_lang: Language code of ``inputs``. Defaults to ``"eng"``.
        tgt_lang: Language code of the generated speech. Defaults to ``"por"``.
        speaker_id: Speaker selector forwarded to ``model.generate``; it is
            coerced with ``int()`` so numeric strings are accepted.

    Returns:
        A ``FileResponse`` serving the WAV file written under ``/tmp``.

    Raises:
        ValueError: If ``inputs`` is ``None``.
    """
    start_time = time.time()
    if inputs is None:
        # Raising a plain string is itself a TypeError in Python 3, so the
        # message would never reach the caller; raise a real exception.
        raise ValueError(
            "No text submitted! Please provide the text to synthesize "
            "before submitting your request."
        )

    text_inputs = processor(
        text=inputs, src_lang=src_lang, return_tensors="pt"
    ).to(device)

    # NOTE(review): generate() is a blocking call inside an async function;
    # confirm whether it should be moved off the event loop.
    generated = model.generate(
        **text_inputs, tgt_lang=tgt_lang, speaker_id=int(speaker_id)
    )
    # The first element of the result is the waveform; move it to the CPU and
    # drop singleton dimensions before writing it out.
    audio_array_from_text = generated[0].cpu().numpy().squeeze()

    print("Time took to process the request and return response is {} sec".format(
        time.time() - start_time))
    sampling_rate = model.config.sampling_rate
    print(f"sampling_rate {sampling_rate}")

    # Build the path once so the file written and the file served cannot
    # drift apart.
    # NOTE(review): the file is not removed after the response is sent.
    output_path = f"/tmp/output{start_time}.wav"
    write(output_path, sampling_rate, audio_array_from_text)

    # The payload is a WAV file, so advertise it as such (not audio/mpeg).
    return FileResponse(output_path, media_type="audio/wav")