Spaces:
Running
Running
File size: 4,996 Bytes
1053c8b 0a5cfb8 fc3405f d32b773 557b689 a5cedb3 e392559 d32b773 1053c8b d19f3e0 e5c4db0 d32b773 1170fb5 33aca12 d32b773 fc3405f d32b773 fc3405f 65b377d d32b773 65b377d d32b773 1b222a0 d32b773 1b222a0 d32b773 fc3405f d32b773 fc3405f d32b773 fc3405f 1053c8b d19f3e0 54472e1 1053c8b fc3405f d32b773 fc3405f d32b773 0a5cfb8 d32b773 fc3405f d32b773 fc3405f c3303c6 fc3405f c3303c6 d32b773 54472e1 fc3405f 54472e1 c3303c6 d19f3e0 1053c8b d19f3e0 aa8f6c2 1053c8b d19f3e0 1053c8b d32b773 1053c8b d19f3e0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import gradio as gr
import librosa
import numpy as np
import soundfile as sf
import os
from transformers import pipeline
import torchaudio
from pyannote.audio import Pipeline
hf_token = os.getenv("diarizationToken")
# Charger le modèle de reconnaissance vocale
print("Chargement du modèle Wav2Vec2...")
stt_pipeline = pipeline("automatic-speech-recognition", model="boumehdi/wav2vec2-large-xlsr-moroccan-darija")
print("Modèle chargé avec succès !")
# Charger le pipeline de diarisation (détection des speakers)
print("Chargement du modèle de diarisation...")
#diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=hf_token)
diarization_pipeline = Pipeline.from_pretrained("Revai/reverb-diarization-v2", use_auth_token=hf_token)
print("Modèle de diarisation chargé !")
def reduce_noise(audio, sr):
"""Réduction du bruit pour améliorer la transcription"""
audio = librosa.effects.preemphasis(audio)
noise_threshold = np.percentile(np.abs(audio), 10)
audio = np.where(np.abs(audio) > noise_threshold, audio, 0)
return audio
def calculate_min_silence_duration(audio_path):
# Charger l'audio pour obtenir sa durée
audio, sr = librosa.load(audio_path, sr=None)
total_duration = len(audio) / sr
dynamic_min_silence = total_duration * 0.05 # 5% de la durée totale de l'audio
return dynamic_min_silence
def diarize_audio(audio_path):
"""
Diarisation de l'audio : détecte qui parle et à quel moment.
Retourne une liste de (speaker, début, fin).
"""
# Calculer la durée minimale de silence en fonction de la durée de l'audio
min_silence_duration = calculate_min_silence_duration(audio_path)
print(f"Durée minimale de silence ajustée à : {min_silence_duration} secondes")
diarization = diarization_pipeline(audio_path)
speaker_segments = {}
previous_speaker = None
last_end = 0
for turn, _, speaker in diarization.itertracks(yield_label=True):
start, end = turn.start, turn.end
# Si le silence entre deux segments est trop court, on fusionne avec le speaker précédent
if previous_speaker is not None and start - last_end < min_silence_duration:
speaker_segments[previous_speaker].append((start, end))
else:
# Nouveau speaker ou silence long : on l'ajoute comme un segment distinct
if speaker not in speaker_segments:
speaker_segments[speaker] = []
speaker_segments[speaker].append((start, end))
previous_speaker = speaker
last_end = end
return speaker_segments
def merge_speaker_segments(audio, sr, speaker_segments):
"""
Fusionne les segments d’un même speaker pour améliorer la précision.
Retourne un dictionnaire {speaker: signal_audio_fusionné}.
"""
merged_audio = {}
for speaker, segments in speaker_segments.items():
combined_audio = np.array([])
for start, end in segments:
start_sample = int(start * sr)
end_sample = int(end * sr)
combined_audio = np.concatenate((combined_audio, audio[start_sample:end_sample]))
merged_audio[speaker] = combined_audio
return merged_audio
def process_audio(audio_path):
print(f"Fichier reçu : {audio_path}")
try:
# Charger l'audio
audio, sr = librosa.load(audio_path, sr=None)
print(f"Audio chargé avec {len(audio)} échantillons à {sr} Hz")
# Réduction du bruit
audio = reduce_noise(audio, sr)
# Étape de diarisation : détection des speakers
speaker_segments = diarize_audio(audio_path)
print(f"Speakers détectés : {list(speaker_segments.keys())}")
# Fusionner les segments de chaque speaker
merged_audio = merge_speaker_segments(audio, sr, speaker_segments)
# Transcrire chaque speaker
result = []
for speaker, audio_data in merged_audio.items():
temp_filename = f"temp_{speaker}.wav"
sf.write(temp_filename, np.array(audio_data), sr)
# Transcription du segment fusionné
transcription = stt_pipeline(temp_filename)
text = transcription["text"].strip()
if text:
result.append(f"{speaker}: {text}")
# Supprimer le fichier temporaire
os.remove(temp_filename)
if not result:
return "Aucune parole détectée."
return "\n".join(result)
except Exception as e:
print(f"Erreur : {e}")
return f"Une erreur s'est produite. + {e}"
# Interface Gradio
print("Démarrage de Gradio...")
iface = gr.Interface(
fn=process_audio,
inputs=gr.Audio(type="filepath"),
outputs="text",
title="Transcription avec Diarisation",
description="Upload un fichier audio pour une transcription avec détection des speakers."
)
iface.launch()
print("Interface lancée avec succès !")
|