Spaces: Runtime error
import gradio as gr
from pyannote.audio import Pipeline
import torch
import os
import zipfile
import tempfile
import shutil
from pydub import AudioSegment
import numpy as np

hf_token = os.getenv("HF_TOKEN")
# Initialize the diarization pipeline
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
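# Note: pyannote/speaker-diarization-3.1 is a gated model, so loading it will typically
# fail unless the model's terms have been accepted on the Hub and HF_TOKEN is available
# (for a Space, set it as a repository secret). A missing or invalid token is a common
# cause of a runtime error at startup.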
def process_zip(zip_file):
    # gr.File may pass a tempfile-like object or a plain filepath string depending on
    # the Gradio version, so handle both
    zip_path = zip_file.name if hasattr(zip_file, "name") else zip_file
    with tempfile.TemporaryDirectory() as temp_dir:
        # Step 1: Extract the zip file
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
        # Create directories for each speaker
        speaker1_dir = os.path.join(temp_dir, "speaker1")
        speaker2_dir = os.path.join(temp_dir, "speaker2")
        os.makedirs(speaker1_dir, exist_ok=True)
        os.makedirs(speaker2_dir, exist_ok=True)
        # Step 2: Analyze each audio file
        for filename in os.listdir(temp_dir):
            if filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')):
                file_path = os.path.join(temp_dir, filename)
                # Load audio file
                audio = AudioSegment.from_file(file_path)
                samples = np.array(audio.get_array_of_samples())
                # Convert to mono if stereo
                if audio.channels == 2:
                    samples = samples.reshape((-1, 2)).mean(axis=1)
                # Normalize to float32 in [-1, 1] using the actual sample width
                # (the original hard-coded 32768.0 assumed 16-bit audio)
                max_val = float(2 ** (8 * audio.sample_width - 1))
                waveform = torch.tensor(samples).float() / max_val
                waveform = waveform.unsqueeze(0)  # pyannote expects shape (channel, time)
                # Perform diarization on the in-memory waveform
                diarization = pipeline({"waveform": waveform, "sample_rate": audio.frame_rate})
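                # (Hedged note: pyannote pipelines can usually also be called on the file
                # path directly, e.g. pipeline(file_path), letting pyannote decode the
                # audio itself; whether that works for mp3/ogg depends on the installed
                # audio backend, so the explicit waveform dict above is kept.)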
                # Determine the dominant speaker; pyannote labels speakers
                # "SPEAKER_00", "SPEAKER_01", ... so tally time per label
                speaker_times = {}
                for turn, _, speaker in diarization.itertracks(yield_label=True):
                    speaker_times[speaker] = speaker_times.get(speaker, 0.0) + (turn.end - turn.start)
                dominant = max(speaker_times, key=speaker_times.get) if speaker_times else "SPEAKER_00"
                # Move file to the appropriate speaker directory
                if dominant == "SPEAKER_00":
                    shutil.move(file_path, os.path.join(speaker1_dir, filename))
                else:
                    shutil.move(file_path, os.path.join(speaker2_dir, filename))
        # Step 3: Create zip files for each speaker. Write them to a separate
        # directory that outlives the TemporaryDirectory, otherwise the returned
        # paths are deleted before Gradio can serve them.
        output_dir = tempfile.mkdtemp()
        speaker1_zip = shutil.make_archive(os.path.join(output_dir, "speaker1"), 'zip', speaker1_dir)
        speaker2_zip = shutil.make_archive(os.path.join(output_dir, "speaker2"), 'zip', speaker2_dir)
        return speaker1_zip, speaker2_zip
iface = gr.Interface(
    fn=process_zip,
    inputs=gr.File(label="Upload ZIP file containing audio files"),
    outputs=[
        gr.File(label="Speaker 1 Audio Files"),
        gr.File(label="Speaker 2 Audio Files")
    ],
    title="Speaker Diarization and Audio Sorting",
    description="Upload a ZIP file containing audio files. The system will analyze each file and sort them into two groups based on the dominant speaker."
)

iface.launch()
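
For debugging the sorting logic outside the Space, one option is to temporarily comment out iface.launch() and call process_zip directly; this is only a minimal sketch, and "test_audio.zip" below is a placeholder path, not part of the original app.

# Local smoke test (hypothetical archive path; works because process_zip also accepts a plain string)
s1_zip, s2_zip = process_zip("test_audio.zip")
print("Speaker 1 archive:", s1_zip)
print("Speaker 2 archive:", s2_zip)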