Very low RPS of SeamlessM4T model during load test

#28
by gangen - opened

Has anyone conducted a load test on Seamless models for the S2TT task?

The models are deployed within a Docker container as a Flask application served by Gunicorn on an Amazon EC2 r5.8xlarge instance.

The resource constraints are configured as follows:
- CPU: Minimum of 8 vCPUs, maximum of 16 vCPUs.
- Memory: Minimum of 8 GiB, maximum of 60 GiB.

Locust is used for load testing. My input consists of audio files, each ranging from 1 to 5 seconds.

Throughout the load test, requests per second (RPS) remain below 2.

Can someone provide any assistance or share any benchmark load test performance data for this setup?

Here is the Locust script. The audio files are base64-encoded and sent in a JSON request.

import os
import base64
import random
import uuid
from locust import HttpUser, task

# Directory holding the sample audio clips, resolved relative to the parent
# of the current working directory.
user_dir = os.path.dirname(os.getcwd())
path = os.path.join(user_dir, 'Downloads', 'seamless', 'transcribe_gold_standard_voice_samples', 'output')

# Load-test knobs, overridable through environment variables.
n_samples = int(os.getenv('N_SAMPLES', 30))   # upper bound on clips to preload
n_requests = int(os.getenv('N_REQUESTS', 1))  # clips bundled into each HTTP request
model_name = os.getenv('MODEL_TYPE', 'seamless')

# os.listdir() returns entries in arbitrary order and also lists
# sub-directories. Sort so the selected samples are reproducible between
# runs, and keep regular files only so open() below cannot hit a directory.
audio_file_names = sorted(
    name for name in os.listdir(path)
    if os.path.isfile(os.path.join(path, name))
)[:n_samples]

# Encode every clip once up front so the tasks only do list lookups.
encoded_audios = []
for file_nm in audio_file_names:
    file_path = os.path.join(path, file_nm)
    with open(file_path, 'rb') as audio_file:
        audio_data = audio_file.read()
    encoded_audios.append(base64.b64encode(audio_data).decode('utf-8'))

# The directory may contain fewer clips than N_SAMPLES asked for. Shrink
# n_samples to what was actually loaded so that indices drawn from
# range(n_samples) are always valid positions in encoded_audios.
n_samples = len(encoded_audios)

class QuickstartUser(HttpUser):
    """Locust user that posts pre-encoded audio clips to the transcription endpoint."""

    @task
    def hello_world(self):
        """Send one request containing ``n_requests`` randomly chosen clips."""
        chosen = random.sample(range(n_samples), n_requests)

        # One audio context entry per selected clip.
        audio_contexts = []
        for sample_idx in chosen:
            audio_contexts.append({
                "audioId": sample_idx,
                "base64Encoding": encoded_audios[sample_idx],
                "sampleRate": 16000,
                "frameWidth": 2,
                "numberOfChannels": 1,
            })

        options = {
            "transcriptionModel": model_name,
            "modelType": model_name,
            "multilingualSupport": "N",
        }

        payload = {
            "requestId": str(uuid.uuid4()),
            "audioContexts": {
                "audioContext": audio_contexts,
                "transcriptionOptions": options,
            },
        }

        endpoint = "https://seamless-api-load.social-dev-use1.stellar.eadp.cloud/seamless/transcribe"
        self.client.post(endpoint, json=payload)

Below is the server code that handles the request: it decodes the audio, transcribes it with the model selected by MODEL_TYPE (seamless here), and sends back the response.

from flask import Flask, request, jsonify
import os
import io
import base64
import numpy as np
import time
import librosa
import logging
import threading
import transformers

import torch
import torchaudio
from transformers import AutoProcessor, SeamlessM4TModel
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

# Pick the first CUDA GPU with half precision when one is available,
# otherwise fall back to CPU with single precision.
if torch.cuda.is_available():
    device = "cuda:0"
    torch_dtype = torch.float16
else:
    device = "cpu"
    torch_dtype = torch.float32

# Root logger configuration; the level name comes from the LOGLEVEL
# environment variable (default INFO) and is looked up on the logging module.
log_level = os.getenv('LOGLEVEL', 'INFO').upper()
log_format = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=log_format, level=getattr(logging, log_level))

app = Flask(__name__)

def load_models(model_type):
    """Load the Seamless processor and model for the requested model type.

    Args:
        model_type: Model selector. Only the exact string ``'seamless'``
            triggers a load.

    Returns:
        A ``(processor, model)`` tuple. Both entries are ``None`` when
        ``model_type`` is anything other than ``'seamless'``.
    """
    if model_type != 'seamless':
        return None, None

    checkpoint = "facebook/hf-seamless-m4t-medium"

    # Time the load so start-up cost shows up in the logs.
    began = time.time()
    processor = AutoProcessor.from_pretrained(checkpoint)
    model = SeamlessM4TModel.from_pretrained(checkpoint, low_cpu_mem_usage=True)
    seamless_load_time = round(time.time() - began, 2)

    logging.info(f'Loaded Seamless model in {seamless_load_time}s')

    return processor, model

# Load the model selected by MODEL_TYPE (default 'seamless') once, at import time.
init_model = os.environ.get('MODEL_TYPE', 'seamless').lower()
seamless_processor, seamless_model = load_models(init_model)

# Lock shared by request handlers to serialize calls into the model.
model_lock = threading.Lock()

# Cap the number of threads PyTorch uses unless SET_THREAD_LIMIT is set to
# something other than 'true' (case-insensitive). THREAD_LIMIT defaults to 8.
set_thread_limit = os.environ.get('SET_THREAD_LIMIT', 'True')
if set_thread_limit.lower() == 'true':
    thread_limit = int(os.environ.get('THREAD_LIMIT', 8))
    torch.set_num_threads(thread_limit)




@app.route('/seamless/transcribe', methods=['POST'])
def transcribe():
    """Handle a transcription request and return the results as JSON.

    The JSON body must provide ``requestId`` and an ``audioContexts`` object
    holding the ``audioContext`` list and ``transcriptionOptions.modelType``.

    Returns:
        ``(response, 200)`` on success, or ``({"error": message}, 500)`` when
        anything raises while parsing the request or transcribing.
    """
    try:
        startTime = time.time()

        # Parse the body once and reuse it for every field lookup.
        payload = request.json
        requestId = payload['requestId']
        audio_contexts = payload['audioContexts']
        audio_records = audio_contexts['audioContext']
        model_type = audio_contexts['transcriptionOptions']['modelType']

        # Identifies the in-process model object in logs and responses.
        modelHash = str(hash(seamless_model)) if model_type == 'seamless' else ''

        response, audio_ids, audio_decoding_times, transcription_times = get_transcriptions(requestId, audio_records, model_type, modelHash)
        turnaroundTime = round(time.time() - startTime, 2)

        logging.info(f'RequestId {requestId}|modelType: {model_type}|ModelHash: {modelHash}|BatchSize: {len(audio_records)}|AudioIds {audio_ids}|TAT: {turnaroundTime}|DecodingTimes: {audio_decoding_times}|TranscriptionTimes: {transcription_times}')

        return jsonify(response), 200
    except Exception as e:
        # Top-level request boundary: log with traceback so failures can be
        # diagnosed, and report the message to the client.
        logging.exception(f"Error transcribing: {str(e)}")
        return jsonify({"error": str(e)}), 500



@app.route('/whisper/health', methods=['GET'])
def health_check():
    """Liveness probe: always responds with ``{"status": "OK"}`` and HTTP 200."""
    # NOTE(review): the path says "whisper" although this service exposes
    # /seamless/transcribe; left unchanged because external health checks may
    # be pointed at this URL. Confirm before renaming.
    return jsonify({"status": "OK"}), 200

def seamless(dec_audio, orig_freq=16_000, new_freq=16_000):
    """Run the Seamless model on one decoded audio clip and return English text.

    Args:
        dec_audio: Decoded waveform; it is converted with ``torch.tensor`` and
            handed to the processor as 16 kHz audio. No resampling happens
            here, so the caller must already supply 16 kHz samples.
        orig_freq: Currently unused.
        new_freq: Currently unused.

    Returns:
        The decoded text for the first generated sequence, with special
        tokens stripped.
    """
    waveform = torch.tensor(dec_audio)
    features = seamless_processor(audios=waveform, return_tensors="pt", sampling_rate=16_000)

    # Text-only generation: speech synthesis is switched off.
    generated = seamless_model.generate(**features, tgt_lang="eng", generate_speech=False)

    first_sequence_ids = generated[0].tolist()[0]
    return seamless_processor.decode(first_sequence_ids, skip_special_tokens=True)

def get_transcriptions(requestId, audio_records, model_type, modelHash):
    """Decode and transcribe every audio record in a request.

    Args:
        requestId: Identifier echoed back in the response.
        audio_records: List of dicts, each providing ``audioId``,
            ``base64Encoding`` (base64 text of an audio file) and
            ``sampleRate`` (the rate the audio is resampled to).
        model_type: Model selector; only ``'seamless'`` is supported.
        modelHash: Value echoed back as ``modelHashValue``.

    Returns:
        A tuple ``(response, audio_ids, audio_decoding_times,
        transcription_times)`` where ``response`` is the JSON-serializable
        reply and the three lists are aligned with ``audio_records``.

    Raises:
        ValueError: If ``model_type`` is not supported. Previously this
            surfaced later as an unrelated IndexError while building results.
    """
    if model_type != 'seamless':
        raise ValueError(f"Unsupported model type: {model_type!r}")

    audio_ids = []
    decoded_audios = []
    audio_decoding_times = []

    for record in audio_records:
        audio_ids.append(record['audioId'])
        decode_start_time = time.time()
        audio_data = io.BytesIO(base64.b64decode(record['base64Encoding']))
        # Ask librosa for the target rate directly. Its default is to
        # resample to 22050 Hz, which forced a second resample afterwards.
        data, _ = librosa.load(audio_data, sr=record['sampleRate'])
        decoded_audios.append(data)
        audio_decoding_times.append(round(time.time() - decode_start_time, 2))

    transcriptions = []
    transcription_times = []
    for decoded_audio in decoded_audios:
        # Timing starts before the lock is taken, so it includes any wait
        # for other requests that currently hold the model.
        transcribe_start_time = time.time()
        with model_lock:
            transcriptions.append(seamless(decoded_audio))
        transcription_times.append(round(time.time() - transcribe_start_time, 2))

    results = [
        {'audioId': audio_id, 'transcription': text, 'transcriptionTime': transcribe_time, 'decodingTime': decode_time}
        for audio_id, text, transcribe_time, decode_time
        in zip(audio_ids, transcriptions, transcription_times, audio_decoding_times)
    ]

    response = {'requestId': requestId, 'transcriptionResponse': results, 'modelType': model_type, 'modelHashValue': modelHash}

    return response, audio_ids, audio_decoding_times, transcription_times

if __name__ == "__main__":
    # Direct-execution entry point: start Flask's built-in server listening
    # on all network interfaces.
    app.run(host="0.0.0.0")

Sign up or log in to comment