rafaaa2105's picture
Update app.py
c1d27f0 verified
raw
history blame
3.46 kB
import gradio as gr
from pyannote.audio import Pipeline
import torch
import os
import zipfile
import tempfile
import shutil
from pydub import AudioSegment
import numpy as np
# Hugging Face token is required to download the gated pyannote model.
hf_token = os.getenv("HF_TOKEN")

# Load the pretrained speaker-diarization pipeline once at startup,
# then move it to the GPU when one is available (CPU otherwise).
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
pipeline.to(device)
def _load_waveform(file_path):
    """Load an audio file as a (1, num_samples) float32 torch tensor in [-1, 1].

    Returns (waveform, sample_rate). Multi-channel audio is downmixed to mono
    by averaging the channels, and samples are scaled by the file's actual
    sample width rather than assuming 16-bit audio.
    """
    audio = AudioSegment.from_file(file_path)
    samples = np.array(audio.get_array_of_samples(), dtype=np.float32)
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels)).mean(axis=1)
    # Full-scale value for this sample width (e.g. 32768 for 16-bit audio).
    full_scale = float(1 << (8 * audio.sample_width - 1))
    waveform = torch.from_numpy(samples / full_scale).float().unsqueeze(0)
    return waveform, audio.frame_rate


def _dominant_group(diarization):
    """Return 1 or 2: which of the two diarized speakers talks the most.

    pyannote 3.x labels speakers "SPEAKER_00", "SPEAKER_01", ... so the label
    suffix is 0-based; the original code assumed 1-based labels and raised
    KeyError. Here we total talk time per label and map the longest-talking
    label's sorted position to group 1 or 2. Files with no detected speech
    default to group 1.
    """
    talk_time = {}
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        talk_time[speaker] = talk_time.get(speaker, 0.0) + (turn.end - turn.start)
    if not talk_time:
        return 1
    dominant_label = max(talk_time, key=talk_time.get)
    # sorted() puts SPEAKER_00 first -> group 1, SPEAKER_01 -> group 2.
    return 1 if sorted(talk_time).index(dominant_label) == 0 else 2


def process_zip(zip_file):
    """Sort the audio files inside a ZIP into two ZIPs by dominant speaker.

    Args:
        zip_file: the uploaded ZIP, either a tempfile-like object with a
            ``.name`` attribute (older Gradio) or a plain filepath string.

    Returns:
        (speaker1_zip_path, speaker2_zip_path): paths to two ZIP archives,
        each containing the files whose dominant speaker was speaker 1 / 2.
    """
    zip_path = getattr(zip_file, "name", zip_file)
    # The output archives must outlive the processing directory below
    # (TemporaryDirectory is deleted when the `with` block exits, which
    # previously left Gradio holding paths to already-deleted files),
    # so they get their own non-auto-deleted directory.
    output_dir = tempfile.mkdtemp(prefix="diarization_out_")
    with tempfile.TemporaryDirectory() as temp_dir:
        # Step 1: extract the uploaded archive.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)

        speaker1_dir = os.path.join(temp_dir, "speaker1")
        speaker2_dir = os.path.join(temp_dir, "speaker2")
        os.makedirs(speaker1_dir, exist_ok=True)
        os.makedirs(speaker2_dir, exist_ok=True)

        # Step 2: diarize each audio file and move it to its speaker's folder.
        for filename in os.listdir(temp_dir):
            if not filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')):
                continue
            file_path = os.path.join(temp_dir, filename)
            waveform, sample_rate = _load_waveform(file_path)
            # num_speakers=2: the app sorts into exactly two groups, so
            # constrain the pipeline instead of letting it find more.
            diarization = pipeline(
                {"waveform": waveform, "sample_rate": sample_rate},
                num_speakers=2,
            )
            target_dir = speaker1_dir if _dominant_group(diarization) == 1 else speaker2_dir
            shutil.move(file_path, os.path.join(target_dir, filename))

        # Step 3: zip each speaker folder into the persistent output dir.
        # make_archive returns the final archive path including ".zip".
        speaker1_zip = shutil.make_archive(os.path.join(output_dir, "speaker1"), 'zip', speaker1_dir)
        speaker2_zip = shutil.make_archive(os.path.join(output_dir, "speaker2"), 'zip', speaker2_dir)
    return speaker1_zip, speaker2_zip
# Web UI: one uploaded ZIP in, two speaker-sorted ZIPs out.
zip_input = gr.File(label="Upload ZIP file containing audio files")
speaker_outputs = [
    gr.File(label="Speaker 1 Audio Files"),
    gr.File(label="Speaker 2 Audio Files"),
]

iface = gr.Interface(
    fn=process_zip,
    inputs=zip_input,
    outputs=speaker_outputs,
    title="Speaker Diarization and Audio Sorting",
    description="Upload a ZIP file containing audio files. The system will analyze each file and sort them into two groups based on the dominant speaker.",
)
iface.launch()