import gradio as gr
from pyannote.audio import Pipeline
import torch
import os
import zipfile
import tempfile
import shutil
from collections import defaultdict
from pydub import AudioSegment
import numpy as np

hf_token = os.getenv("HF_TOKEN")
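# Note: pyannote/speaker-diarization-3.1 is a gated model. The token above must belong to a
# Hugging Face account that has accepted the model's user conditions, or loading will fail.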

# Initialize the diarization pipeline
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

def process_zip(zip_file):
    # gr.File may hand us a tempfile-like object or a plain path string depending on the
    # Gradio version, so accept either.
    zip_path = getattr(zip_file, "name", zip_file)

    with tempfile.TemporaryDirectory() as temp_dir:
        # Step 1: Extract the zip file into its own subdirectory
        extract_dir = os.path.join(temp_dir, "extracted")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        
        # Create directories for each speaker
        speaker1_dir = os.path.join(temp_dir, "speaker1")
        speaker2_dir = os.path.join(temp_dir, "speaker2")
        os.makedirs(speaker1_dir, exist_ok=True)
        os.makedirs(speaker2_dir, exist_ok=True)
        
        # Step 2: Analyze each audio file (walk the tree in case the zip contains folders)
        for root, _, files in os.walk(extract_dir):
            for filename in files:
                if not filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')):
                    continue
                file_path = os.path.join(root, filename)

                # Load audio file and downmix to mono if needed
                audio = AudioSegment.from_file(file_path)
                samples = np.array(audio.get_array_of_samples()).astype(np.float32)
                if audio.channels > 1:
                    samples = samples.reshape((-1, audio.channels)).mean(axis=1)

                # Normalize to [-1, 1] using the actual sample width (e.g. 16-bit -> 32768)
                samples /= float(1 << (8 * audio.sample_width - 1))
                waveform = torch.from_numpy(samples).unsqueeze(0)  # shape: (channel, time)

                # Perform diarization, constrained to two speakers to match the two output groups
                diarization = pipeline(
                    {"waveform": waveform, "sample_rate": audio.frame_rate},
                    num_speakers=2,
                )

                # Determine the dominant speaker. pyannote labels speakers "SPEAKER_00",
                # "SPEAKER_01", ... so accumulate speaking time per label.
                speaker_times = defaultdict(float)
                for turn, _, speaker in diarization.itertracks(yield_label=True):
                    speaker_times[speaker] += turn.end - turn.start

                dominant = max(speaker_times, key=speaker_times.get, default="SPEAKER_00")

                # Move the file to the appropriate speaker directory
                target_dir = speaker1_dir if dominant == "SPEAKER_00" else speaker2_dir
                shutil.move(file_path, os.path.join(target_dir, filename))
        
        # Step 3: Create a zip file for each speaker. Write them outside the temporary
        # directory so they still exist when Gradio serves them after this function returns.
        output_dir = tempfile.mkdtemp()
        speaker1_zip = shutil.make_archive(os.path.join(output_dir, "speaker1"), 'zip', speaker1_dir)
        speaker2_zip = shutil.make_archive(os.path.join(output_dir, "speaker2"), 'zip', speaker2_dir)

        return speaker1_zip, speaker2_zip

iface = gr.Interface(
    fn=process_zip,
    inputs=gr.File(label="Upload ZIP file containing audio files"),
    outputs=[
        gr.File(label="Speaker 1 Audio Files"),
        gr.File(label="Speaker 2 Audio Files")
    ],
    title="Speaker Diarization and Audio Sorting",
    description="Upload a ZIP file containing audio files. The system will analyze each file and sort them into two groups based on the dominant speaker."
)

iface.launch()
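
# Optional local smoke test (bypasses the UI). The zip path below is only an example,
# not a file shipped with this app:
#
#   speaker1_zip, speaker2_zip = process_zip("recordings.zip")
#   print(speaker1_zip, speaker2_zip)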