Spaces:
Sleeping
Sleeping
import os | |
import shutil | |
import zipfile | |
from pathlib import Path | |
import gradio as gr | |
import torch | |
from pydub import AudioSegment | |
from transformers import pipeline | |
# ------------------------ | |
# CONFIG | |
# ------------------------ | |
MODEL_NAME = "openai/whisper-large-v3" | |
device = 0 if torch.cuda.is_available() else "cpu" | |
pipe = pipeline( | |
task="automatic-speech-recognition", | |
model=MODEL_NAME, | |
device=device, | |
model_kwargs={"low_cpu_mem_usage": True}, | |
) | |
TEMP_DIR = "./temp_audio" | |
os.makedirs(TEMP_DIR, exist_ok=True) | |
# On stocke la liste des métadonnées (segments) dans un State | |
# pour la conserver entre les étapes (transcription, découpe, zip). | |
def init_metadata_state(): | |
return [] | |
# ------------------------ | |
# FONCTIONS | |
# ------------------------ | |
def transcribe_audio(audio_path): | |
""" | |
Étape 2 : Transcription du fichier audio via Whisper | |
+ récupération de la transcription brute. | |
""" | |
if not audio_path: | |
return "Aucun fichier audio fourni.", [], None | |
# Transcription Whisper | |
result = pipe(audio_path, return_timestamps="word") | |
text = result["text"] | |
chunks = result["chunks"] # liste de { 'timestamp': (start, end), 'text': ... } | |
raw_transcription = " ".join([c["text"] for c in chunks]) | |
# Le 3e retour = chemin du fichier audio, qu'on renverra tel quel pour la découpe | |
return raw_transcription, [], audio_path | |
def validate_segments(audio_path, table_data, metadata_state): | |
""" | |
Étape 5 : Découpe de l'audio en fonction des segments | |
et mise à jour du State `metadata_state`. | |
- `table_data` doit contenir : [ [Texte, Début(s), Fin(s), ID], ... ] | |
- Retourne : | |
1) Une liste de chemins (extraits audio) pour les players | |
2) La liste des nouvelles métadonnées (mise à jour). | |
""" | |
if not audio_path: | |
return ["Aucun fichier audio..."], metadata_state | |
# Nettoyage du dossier temporaire avant recréation des extraits | |
if os.path.exists(TEMP_DIR): | |
shutil.rmtree(TEMP_DIR) | |
os.makedirs(TEMP_DIR, exist_ok=True) | |
original_audio = AudioSegment.from_file(audio_path) | |
segment_paths = [] | |
updated_metadata = [] | |
for i, row in enumerate(table_data): | |
# row = [ segment_text, start_time, end_time, seg_id ] | |
if len(row) < 4: | |
continue | |
seg_text, start_time, end_time, seg_id = row | |
if not seg_text or start_time is None or end_time is None: | |
continue | |
# Si l'utilisateur n'a pas mis d'ID, en créer un | |
if not seg_id: | |
seg_id = f"seg_{i+1:02d}" | |
# Découpe | |
start_ms = int(float(start_time) * 1000) | |
end_ms = int(float(end_time) * 1000) | |
extract = original_audio[start_ms:end_ms] | |
# Nom de fichier | |
stem_name = Path(audio_path).stem | |
segment_filename = f"{stem_name}_{seg_id}.wav" | |
segment_filepath = os.path.join(TEMP_DIR, segment_filename) | |
extract.export(segment_filepath, format="wav") | |
segment_paths.append(segment_filepath) | |
# Stocker la métadonnée | |
updated_metadata.append({ | |
"audio_file": segment_filename, | |
"text": seg_text, | |
"start_time": start_time, | |
"end_time": end_time, | |
"id": seg_id, | |
}) | |
return segment_paths, updated_metadata | |
def generate_zip(metadata_state): | |
""" | |
Étape 8 : Générer un ZIP contenant tous les extraits + un metadata.csv | |
Retourne le chemin vers le ZIP final. | |
""" | |
if not metadata_state: | |
return None | |
# Supprimer un ancien zip si présent | |
zip_path = os.path.join(TEMP_DIR, "dataset.zip") | |
if os.path.exists(zip_path): | |
os.remove(zip_path) | |
# Créer metadata.csv | |
metadata_csv_path = os.path.join(TEMP_DIR, "metadata.csv") | |
with open(metadata_csv_path, "w", encoding="utf-8") as f: | |
f.write("audio_file|text|speaker_name|API\n") | |
for seg in metadata_state: | |
# Ajuste speaker_name ou API selon ton besoin | |
line = f"{seg['audio_file']}|{seg['text']}|projectname|/API_PHONETIC/\n" | |
f.write(line) | |
# Créer le ZIP | |
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: | |
# Ajouter chaque extrait | |
for seg in metadata_state: | |
seg_file = os.path.join(TEMP_DIR, seg["audio_file"]) | |
if os.path.exists(seg_file): | |
zf.write(seg_file, seg["audio_file"]) | |
# Ajouter le metadata.csv | |
zf.write(metadata_csv_path, "metadata.csv") | |
return zip_path | |
def distribute_segments_to_players(segments): | |
""" | |
Transforme la liste de segments en un tuple de 20 valeurs max | |
(pour 20 players). | |
Si moins de 20 segments, on complète avec None. | |
""" | |
max_players = 20 | |
result = [] | |
for i in range(max_players): | |
if i < len(segments): | |
result.append(segments[i]) | |
else: | |
result.append(None) | |
return tuple(result) | |
# ------------------------ | |
# CONSTRUCTION UI GRADIO | |
# ------------------------ | |
with gr.Blocks(css="style.css") as demo: | |
gr.Markdown("# Application de Découpage Audio + Transcription (jusqu'à 20 extraits)") | |
metadata_state = gr.State(init_metadata_state()) | |
# Étape 1 : Chargement de l'audio | |
with gr.Box(): | |
gr.Markdown("### 1. Téléversez votre fichier audio") | |
audio_input = gr.Audio(source="upload", type="filepath", label="Fichier audio") | |
# Étape 3 : Transcription brute | |
raw_transcription = gr.Textbox( | |
label="Transcription brute (Whisper)", | |
placeholder="Le texte s'affichera ici après la transcription...", | |
interactive=False | |
) | |
# Étape 4 : Tableau pour 20 segments max | |
gr.Markdown("### 2. Définissez jusqu'à 20 segments") | |
gr.Markdown("""**Colonnes :** | |
1) Texte (phrase ou portion copiée depuis la transcription) | |
2) Début (en secondes) | |
3) Fin (en secondes) | |
4) ID segment (optionnel)""") | |
table = gr.Dataframe( | |
headers=["Texte", "Début (s)", "Fin (s)", "ID"], | |
datatype=["str", "number", "number", "str"], | |
row_count=20, # <-- 20 lignes | |
col_count=4 | |
) | |
validate_button = gr.Button("Valider et générer les extraits") | |
# Étape 6 : 20 players audio pour l'écoute | |
# On les organise en 5 rangées de 4 players | |
players = [] | |
for i in range(20): | |
players.append(gr.Audio(label=f"Extrait {i+1}", interactive=False)) | |
# Groupons-les en blocs de 4 | |
for i in range(0, 20, 4): | |
with gr.Row(): | |
for j in range(i, i+4): | |
players[j] | |
# Étape 8 : Génération ZIP | |
generate_button = gr.Button("Générer le fichier ZIP") | |
zip_file = gr.File(label="Télécharger le ZIP (audios + metadata.csv)") | |
# 1) Callback quand on charge l'audio => Transcription | |
audio_input.change( | |
fn=transcribe_audio, | |
inputs=audio_input, | |
outputs=[raw_transcription, table, audio_input] | |
) | |
# 2) Callback quand on valide => Découpe audio + maj metadata | |
validate_button.click( | |
fn=validate_segments, | |
inputs=[audio_input, table, metadata_state], | |
outputs=[ # 1) chemins extraits (list) 2) metadata (list) | |
players, # Les 20 players | |
metadata_state | |
], | |
# On va mapper la liste de segments sur 20 players | |
).then( | |
fn=distribute_segments_to_players, | |
inputs=None, # la sortie "players" (chemins) est déjà captée | |
outputs=players | |
) | |
# 3) Génération ZIP | |
generate_button.click( | |
fn=generate_zip, | |
inputs=metadata_state, | |
outputs=zip_file | |
) | |
demo.queue().launch() |