import librosa from transformers import Wav2Vec2ForCTC, AutoProcessor import torch import numpy as np from pathlib import Path import concurrent.futures import gc import psutil # Constants ASR_SAMPLING_RATE = 16_000 CHUNK_LENGTH_S = 30 # Reduced from 60s MAX_CONCURRENT_CHUNKS = 2 # Reduced from 4 DEVICE_MEMORY_THRESHOLD = 0.85 # 85% memory threshold # Load language mapping ASR_LANGUAGES = {} with open(f"data/asr/all_langs.tsv") as f: for line in f: iso, name = line.split(" ", 1) ASR_LANGUAGES[iso.strip()] = name.strip() MODEL_ID = "facebook/mms-1b-all" processor = AutoProcessor.from_pretrained(MODEL_ID) model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID) def check_memory(): memory = psutil.virtual_memory() return memory.percent / 100 < DEVICE_MEMORY_THRESHOLD def load_audio(audio_data): if isinstance(audio_data, tuple): sr, audio_samples = audio_data audio_samples = (audio_samples / 32768.0).astype(np.float32) if sr != ASR_SAMPLING_RATE: audio_samples = librosa.resample( audio_samples, orig_sr=sr, target_sr=ASR_SAMPLING_RATE ) elif isinstance(audio_data, np.ndarray): audio_samples = audio_data elif isinstance(audio_data, str): audio_samples = librosa.load(audio_data, sr=ASR_SAMPLING_RATE, mono=True)[0] else: raise ValueError(f"Invalid Audio Input Instance: {type(audio_data)}") return audio_samples def process_chunk(chunk, device): try: inputs = processor(chunk, sampling_rate=ASR_SAMPLING_RATE, return_tensors="pt").to(device) with torch.no_grad(): outputs = model(**inputs).logits ids = torch.argmax(outputs, dim=-1)[0] # Clear memory del outputs torch.cuda.empty_cache() if device.type == "cuda" else gc.collect() return processor.decode(ids) except RuntimeError as e: if "out of memory" in str(e): # Fallback to CPU device = torch.device("cpu") return process_chunk(chunk, device) raise e def transcribe(audio_data=None, lang="eng (English)"): if audio_data is None or (isinstance(audio_data, np.ndarray) and audio_data.size == 0): return "<>" try: # Clear memory before starting torch.cuda.empty_cache() if torch.cuda.is_available() else gc.collect() audio_samples = load_audio(audio_data) lang_code = lang.split()[0] processor.tokenizer.set_target_lang(lang_code) try: model.load_adapter(lang_code) except Exception as e: return f"<>" device = torch.device("cuda" if torch.cuda.is_available() and check_memory() else "cpu") model.to(device) chunk_length = int(CHUNK_LENGTH_S * ASR_SAMPLING_RATE) chunks = [audio_samples[i:i+chunk_length] for i in range(0, len(audio_samples), chunk_length)] transcriptions = [] with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT_CHUNKS) as executor: future_to_chunk = {executor.submit(process_chunk, chunk, device): chunk for chunk in chunks} for future in concurrent.futures.as_completed(future_to_chunk): try: result = future.result() transcriptions.append(result) except Exception as e: transcriptions.append(f"<>") # Clear memory after processing torch.cuda.empty_cache() if device.type == "cuda" else gc.collect() return " ".join(transcriptions) except Exception as e: return f"<>" # Example usage ASR_EXAMPLES = [ ["upload/english.mp3", "eng (English)"], # ["upload/tamil.mp3", "tam (Tamil)"], # ["upload/burmese.mp3", "mya (Burmese)"], ] # Memory monitoring wrapper def monitor_memory_usage(): print(f"CPU Memory Usage: {psutil.Process().memory_percent()}%") if torch.cuda.is_available(): print(f"GPU Memory Usage: {torch.cuda.memory_allocated() / torch.cuda.max_memory_allocated() * 100:.2f}%")