Very low RPS of SeamlessM4T model during load test
Has anyone conducted a load test on Seamless models for the S2TT task?
The models are deployed inside a Docker container as a Flask application served by Gunicorn on an AWS EC2 r5.8xlarge instance; an illustrative Gunicorn configuration is sketched after the list below.
The container's resource constraints are configured as follows:
- CPU: Minimum of 8 vCPUs, maximum of 16 vCPUs.
- Memory: Minimum of 8 GiB, maximum of 60 GiB.
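For context, here is a minimal sketch of the kind of Gunicorn configuration file involved; the bind address, worker and thread counts are illustrative assumptions, not my exact values:
# gunicorn.conf.py -- illustrative sketch only; actual values may differ
import os

bind = "0.0.0.0:5000"                             # address the Flask app is served on
workers = int(os.getenv("GUNICORN_WORKERS", 2))   # each worker process loads its own model copy
threads = 1                                       # inference is CPU-bound, so extra threads per worker help little
timeout = 120                                     # allow slow transcriptions to finish before the worker is killed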
Locust is used for load testing. The inputs are audio files, each between 1 and 5 seconds long.
Throughout the load test, requests per second (RPS) remain below 2.
Can anyone offer guidance or share benchmark load-test results for a comparable setup?
Here is the Locust script. The audio files are base64-encoded and sent as part of a JSON request.
import os
import base64
import random
import uuid

from locust import HttpUser, task

user_dir = os.path.dirname(os.getcwd())
path = os.path.join(user_dir, 'Downloads', 'seamless', 'transcribe_gold_standard_voice_samples', 'output')
n_samples = int(os.getenv('N_SAMPLES', 30))
n_requests = int(os.getenv('N_REQUESTS', 1))
model_name = os.getenv('MODEL_TYPE', 'seamless')

# Pre-encode the sample audio files once so each request only has to build the JSON body.
audio_file_names = os.listdir(path)[:n_samples]
encoded_audios = []
for file_nm in audio_file_names:
    file_path = os.path.join(path, file_nm)
    with open(file_path, 'rb') as audio_file:
        audio_data = audio_file.read()
        encoded_audios.append(base64.b64encode(audio_data).decode('utf-8'))


class QuickstartUser(HttpUser):
    @task
    def hello_world(self):
        # Pick n_requests random clips and send them in a single batch request.
        idxs = random.sample(range(n_samples), n_requests)
        json_req = {
            "requestId": str(uuid.uuid4()),
            "audioContexts": {
                "audioContext": [
                    {
                        "audioId": idx,
                        "base64Encoding": encoded_audios[idx],
                        "sampleRate": 16000,
                        "frameWidth": 2,
                        "numberOfChannels": 1
                    }
                    for idx in idxs
                ],
                "transcriptionOptions": {
                    "transcriptionModel": model_name,
                    "modelType": model_name,
                    "multilingualSupport": "N"
                }
            }
        }
        self.client.post("https://seamless-api-load.social-dev-use1.stellar.eadp.cloud/seamless/transcribe", json=json_req)
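For a single-request sanity check outside Locust, the same payload can be posted with requests; this is a hypothetical standalone snippet (the sample.wav path is a placeholder), not part of the load test itself:
import base64
import uuid
import requests

# Hypothetical smoke test: post one clip and print the server response.
with open('sample.wav', 'rb') as f:  # any short 16 kHz mono clip
    encoded = base64.b64encode(f.read()).decode('utf-8')

payload = {
    "requestId": str(uuid.uuid4()),
    "audioContexts": {
        "audioContext": [{
            "audioId": 0,
            "base64Encoding": encoded,
            "sampleRate": 16000,
            "frameWidth": 2,
            "numberOfChannels": 1
        }],
        "transcriptionOptions": {
            "transcriptionModel": "seamless",
            "modelType": "seamless",
            "multilingualSupport": "N"
        }
    }
}
resp = requests.post(
    "https://seamless-api-load.social-dev-use1.stellar.eadp.cloud/seamless/transcribe",
    json=payload, timeout=120)
print(resp.status_code, resp.json())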
And below is the code that handles the request, decodes the audio, transcribes it with the model selected by MODEL_TYPE (seamless here), and sends back the response.
from flask import Flask, request, jsonify
import os
import io
import base64
import numpy as np
import time
import librosa
import logging
import threading
import transformers
import torch
import torchaudio
from transformers import AutoProcessor, SeamlessM4TModel
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
log_level = os.getenv('LOGLEVEL', 'INFO').upper()
log_format = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=getattr(logging, log_level), format=log_format)
app = Flask(__name__)
def load_models(model_type):
    seamless_processor = None
    seamless_model = None
    if model_type == 'seamless':
        # seamless
        start = time.time()
        seamless_processor = AutoProcessor.from_pretrained("facebook/hf-seamless-m4t-medium")
        seamless_model = SeamlessM4TModel.from_pretrained("facebook/hf-seamless-m4t-medium", low_cpu_mem_usage=True)
        seamless_load_time = round(time.time() - start, 2)
        logging.info(f'Loaded Seamless model in {seamless_load_time}s')
    return seamless_processor, seamless_model
init_model = os.getenv('MODEL_TYPE', 'seamless').lower()
seamless_processor, seamless_model = load_models(init_model)
model_lock = threading.Lock()
set_thread_limit = os.getenv('SET_THREAD_LIMIT', 'True')
if set_thread_limit.lower() == 'true':
    thread_limit = int(os.getenv('THREAD_LIMIT', 8))
    torch.set_num_threads(thread_limit)  # caps the intra-op CPU threads PyTorch uses in this process
@app.route('/seamless/transcribe', methods=['POST'])
def transcribe():
    try:
        startTime = time.time()
        requestId = request.json['requestId']
        audio_records = request.json['audioContexts']['audioContext']
        model_type = request.json['audioContexts']['transcriptionOptions']['modelType']
        modelHash = str(hash(seamless_model)) if model_type == 'seamless' else ''
        response, audio_ids, audio_decoding_times, transcription_times = get_transcriptions(requestId, audio_records, model_type, modelHash)
        turnaroundTime = round(time.time() - startTime, 2)
        logging.info(f'RequestId {requestId}|modelType: {model_type}|ModelHash: {modelHash}|BatchSize: {len(audio_records)}|AudioIds {audio_ids}|TAT: {turnaroundTime}|DecodingTimes: {audio_decoding_times}|TranscriptionTimes: {transcription_times}')
        return jsonify(response), 200
    except Exception as e:
        logging.error(f"Error transcribing: {str(e)}")
        return jsonify({"error": str(e)}), 500
@app.route('/whisper/health', methods=['GET'])
def health_check():
    return jsonify({"status": "OK"}), 200
def seamless(dec_audio, orig_freq=16_000, new_freq=16_000):
    # audio = torchaudio.functional.resample(dec_audio, orig_freq=orig_freq, new_freq=16_000)  # must be a 16 kHz waveform array
    audio_inputs = seamless_processor(audios=torch.tensor(dec_audio), return_tensors="pt", sampling_rate=16_000)
    output_tokens = seamless_model.generate(**audio_inputs, tgt_lang="eng", generate_speech=False)
    translated_text_from_audio = seamless_processor.decode(output_tokens[0].tolist()[0], skip_special_tokens=True)
    return translated_text_from_audio
def get_transcriptions(requestId, audio_records, model_type, modelHash):
    batch_size = len(audio_records)
    audio_ids = []
    decoded_audios = []
    transcriptions = []
    audio_decoding_times = []
    transcription_times = []
    for record in audio_records:
        audio_ids.append(record['audioId'])
        decode_start_time = time.time()
        decoded_audio = base64.b64decode(record['base64Encoding'])
        audio_data = io.BytesIO(decoded_audio)
        # librosa.load resamples to its default 22050 Hz here because sr is not specified.
        data, sr = librosa.load(audio_data)
        if sr != record['sampleRate']:
            resampled_audio = librosa.resample(data, orig_sr=sr, target_sr=record['sampleRate'])
            decoded_audios.append(resampled_audio)
        else:
            decoded_audios.append(data)
        audio_decoding_times.append(round(time.time() - decode_start_time, 2))
    if model_type == 'seamless':
        # Clips are transcribed one at a time; model_lock serializes access to the shared model.
        for decoded_audio in decoded_audios:
            transcribe_start_time = time.time()
            with model_lock:
                transcriptions.append(seamless(decoded_audio))
            transcription_times.append(round(time.time() - transcribe_start_time, 2))
    result = np.empty(batch_size, dtype=dict)
    for idx in range(batch_size):
        result[idx] = {'audioId': audio_records[idx]['audioId'], 'transcription': transcriptions[idx], 'transcriptionTime': transcription_times[idx], 'decodingTime': audio_decoding_times[idx]}
    response = {'requestId': requestId, 'transcriptionResponse': list(result), 'modelType': model_type, 'modelHashValue': modelHash}
    return response, audio_ids, audio_decoding_times, transcription_times
if __name__ == '__main__':
    app.run(host='0.0.0.0')
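One thing I have been considering, in case it is relevant to the low RPS: each clip in a request is currently transcribed separately under model_lock. A batched variant would look roughly like the sketch below; seamless_batch is a hypothetical helper, and it assumes the SeamlessM4T processor pads a list of variable-length waveforms (I have not verified or benchmarked this):
def seamless_batch(dec_audios):
    # Hypothetical sketch: run all decoded 16 kHz waveforms of one request through a single generate() call.
    # Assumes the processor accepts and pads a list of numpy arrays; unverified.
    audio_inputs = seamless_processor(audios=list(dec_audios), return_tensors="pt",
                                      padding=True, sampling_rate=16_000)
    with model_lock:
        output_tokens = seamless_model.generate(**audio_inputs, tgt_lang="eng", generate_speech=False)
    # Mirrors the single-clip path above: output_tokens[0] holds one token sequence per input.
    return [seamless_processor.decode(seq, skip_special_tokens=True)
            for seq in output_tokens[0].tolist()]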