Spaces:
Running
Running
import gradio as gr | |
import librosa | |
import numpy as np | |
import soundfile as sf | |
import os | |
from transformers import pipeline | |
import torchaudio | |
from pyannote.audio import Pipeline | |
hf_token = os.getenv("diarizationToken") | |
# Charger le modèle de reconnaissance vocale | |
print("Chargement du modèle Wav2Vec2...") | |
stt_pipeline = pipeline("automatic-speech-recognition", model="boumehdi/wav2vec2-large-xlsr-moroccan-darija") | |
print("Modèle chargé avec succès !") | |
# Charger le pipeline de diarisation (détection des speakers) | |
print("Chargement du modèle de diarisation...") | |
#diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=hf_token) | |
diarization_pipeline = Pipeline.from_pretrained("Revai/reverb-diarization-v2", use_auth_token=hf_token) | |
print("Modèle de diarisation chargé !") | |
def reduce_noise(audio, sr): | |
"""Réduction du bruit pour améliorer la transcription""" | |
audio = librosa.effects.preemphasis(audio) | |
noise_threshold = np.percentile(np.abs(audio), 10) | |
audio = np.where(np.abs(audio) > noise_threshold, audio, 0) | |
return audio | |
def calculate_min_silence_duration(audio_path): | |
# Charger l'audio pour obtenir sa durée | |
audio, sr = librosa.load(audio_path, sr=None) | |
total_duration = len(audio) / sr | |
dynamic_min_silence = total_duration * 0.05 # 5% de la durée totale de l'audio | |
return dynamic_min_silence | |
def diarize_audio(audio_path): | |
""" | |
Diarisation de l'audio : détecte qui parle et à quel moment. | |
Retourne une liste de (speaker, début, fin). | |
""" | |
# Calculer la durée minimale de silence en fonction de la durée de l'audio | |
min_silence_duration = calculate_min_silence_duration(audio_path) | |
print(f"Durée minimale de silence ajustée à : {min_silence_duration} secondes") | |
diarization = diarization_pipeline(audio_path) | |
speaker_segments = {} | |
previous_speaker = None | |
last_end = 0 | |
for turn, _, speaker in diarization.itertracks(yield_label=True): | |
start, end = turn.start, turn.end | |
# Si le silence entre deux segments est trop court, on fusionne avec le speaker précédent | |
if previous_speaker is not None and start - last_end < min_silence_duration: | |
speaker_segments[previous_speaker].append((start, end)) | |
else: | |
# Nouveau speaker ou silence long : on l'ajoute comme un segment distinct | |
if speaker not in speaker_segments: | |
speaker_segments[speaker] = [] | |
speaker_segments[speaker].append((start, end)) | |
previous_speaker = speaker | |
last_end = end | |
return speaker_segments | |
def merge_speaker_segments(audio, sr, speaker_segments): | |
""" | |
Fusionne les segments d’un même speaker pour améliorer la précision. | |
Retourne un dictionnaire {speaker: signal_audio_fusionné}. | |
""" | |
merged_audio = {} | |
for speaker, segments in speaker_segments.items(): | |
combined_audio = np.array([]) | |
for start, end in segments: | |
start_sample = int(start * sr) | |
end_sample = int(end * sr) | |
combined_audio = np.concatenate((combined_audio, audio[start_sample:end_sample])) | |
merged_audio[speaker] = combined_audio | |
return merged_audio | |
def process_audio(audio_path): | |
print(f"Fichier reçu : {audio_path}") | |
try: | |
# Charger l'audio | |
audio, sr = librosa.load(audio_path, sr=None) | |
print(f"Audio chargé avec {len(audio)} échantillons à {sr} Hz") | |
# Réduction du bruit | |
audio = reduce_noise(audio, sr) | |
# Étape de diarisation : détection des speakers | |
speaker_segments = diarize_audio(audio_path) | |
print(f"Speakers détectés : {list(speaker_segments.keys())}") | |
# Fusionner les segments de chaque speaker | |
merged_audio = merge_speaker_segments(audio, sr, speaker_segments) | |
# Transcrire chaque speaker | |
result = [] | |
for speaker, audio_data in merged_audio.items(): | |
temp_filename = f"temp_{speaker}.wav" | |
sf.write(temp_filename, np.array(audio_data), sr) | |
# Transcription du segment fusionné | |
transcription = stt_pipeline(temp_filename) | |
text = transcription["text"].strip() | |
if text: | |
result.append(f"{speaker}: {text}") | |
# Supprimer le fichier temporaire | |
os.remove(temp_filename) | |
if not result: | |
return "Aucune parole détectée." | |
return "\n".join(result) | |
except Exception as e: | |
print(f"Erreur : {e}") | |
return f"Une erreur s'est produite. + {e}" | |
# Interface Gradio | |
print("Démarrage de Gradio...") | |
iface = gr.Interface( | |
fn=process_audio, | |
inputs=gr.Audio(type="filepath"), | |
outputs="text", | |
title="Transcription avec Diarisation", | |
description="Upload un fichier audio pour une transcription avec détection des speakers." | |
) | |
iface.launch() | |
print("Interface lancée avec succès !") | |