rafaaa2105 committed
Commit 0a1b45f · verified · 1 Parent(s): f418fac

Update app.py

Files changed (1)
  1. app.py +54 -52
app.py CHANGED
@@ -2,10 +2,10 @@ import gradio as gr
 from pyannote.audio import Pipeline
 import torch
 import os
-import numpy as np
-from pydub import AudioSegment
-import io
 import zipfile
+import tempfile
+import shutil
+import librosa

 hf_token = os.getenv("HF_TOKEN")
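Note: both the old and the new code assume the gated pyannote/speaker-diarization-3.1 checkpoint is reachable through the HF_TOKEN Space secret. A minimal guard, not part of this commit, would make a missing secret fail loudly instead of surfacing later as an opaque download error:

import os

hf_token = os.getenv("HF_TOKEN")
if hf_token is None:
    # pyannote/speaker-diarization-3.1 is gated; Pipeline.from_pretrained
    # cannot fetch it anonymously.
    raise RuntimeError("HF_TOKEN is not set; add it as a secret for this Space.")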
@@ -13,59 +13,61 @@ hf_token = os.getenv("HF_TOKEN")
 pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
 pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

-def diarize_and_split(audio, sr):
-    # Convert to mono if stereo
-    if len(audio.shape) > 1:
-        audio = np.mean(audio, axis=1)
-
-    # Perform diarization
-    diarization = pipeline({"waveform": torch.from_numpy(audio), "sample_rate": sr})
-
-    # Create an AudioSegment from the numpy array
-    audio_segment = AudioSegment(
-        audio.tobytes(),
-        frame_rate=sr,
-        sample_width=audio.dtype.itemsize,
-        channels=1
-    )
-
-    speaker_segments = {}
-
-    for turn, _, speaker in diarization.itertracks(yield_label=True):
-        start_ms = int(turn.start * 1000)
-        end_ms = int(turn.end * 1000)
-        segment = audio_segment[start_ms:end_ms]
+def process_zip(zip_file):
+    with tempfile.TemporaryDirectory() as temp_dir:
+        # Step 1: Extract the zip file
+        with zipfile.ZipFile(zip_file.name, 'r') as zip_ref:
+            zip_ref.extractall(temp_dir)

-        if speaker not in speaker_segments:
-            speaker_segments[speaker] = []
-        speaker_segments[speaker].append(segment)
-
-    # Create zip files for each speaker
-    zip_files = {}
-    for speaker, segments in speaker_segments.items():
-        zip_buffer = io.BytesIO()
-        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
-            for i, segment in enumerate(segments):
-                segment_buffer = io.BytesIO()
-                segment.export(segment_buffer, format="wav")
-                zip_file.writestr(f"{speaker}_segment_{i}.wav", segment_buffer.getvalue())
+        # Create directories for each speaker
+        speaker1_dir = os.path.join(temp_dir, "speaker1")
+        speaker2_dir = os.path.join(temp_dir, "speaker2")
+        os.makedirs(speaker1_dir, exist_ok=True)
+        os.makedirs(speaker2_dir, exist_ok=True)

-        zip_buffer.seek(0)
-        zip_files[f"{speaker}.zip"] = zip_buffer.getvalue()
-
-    return zip_files
-
-def process_audio(audio):
-    sr, audio_data = audio
-    zip_files = diarize_and_split(audio_data, sr)
-    return list(zip_files.values())
+        # Step 2: Analyze each audio file
+        for filename in os.listdir(temp_dir):
+            if filename.endswith(('.wav', '.mp3', '.ogg', '.flac')):
+                file_path = os.path.join(temp_dir, filename)
+
+                # Load audio file
+                waveform, sample_rate = librosa.load(file_path, sr=None)
+
+                # Perform diarization
+                diarization = pipeline({"waveform": torch.from_numpy(waveform), "sample_rate": sample_rate})
+
+                # Determine dominant speaker
+                speaker_times = {1: 0, 2: 0}
+                for turn, _, speaker in diarization.itertracks(yield_label=True):
+                    speaker_num = int(speaker.split('_')[-1])
+                    speaker_times[speaker_num] += turn.end - turn.start
+
+                dominant_speaker = 1 if speaker_times[1] > speaker_times[2] else 2
+
+                # Move file to appropriate speaker directory
+                if dominant_speaker == 1:
+                    shutil.move(file_path, os.path.join(speaker1_dir, filename))
+                else:
+                    shutil.move(file_path, os.path.join(speaker2_dir, filename))
+
+        # Step 3: Create zip files for each speaker
+        speaker1_zip = os.path.join(temp_dir, "speaker1.zip")
+        speaker2_zip = os.path.join(temp_dir, "speaker2.zip")
+
+        shutil.make_archive(os.path.join(temp_dir, "speaker1"), 'zip', speaker1_dir)
+        shutil.make_archive(os.path.join(temp_dir, "speaker2"), 'zip', speaker2_dir)
+
+        return speaker1_zip, speaker2_zip

 iface = gr.Interface(
-    fn=process_audio,
-    inputs=gr.Audio(type="numpy"),
-    outputs=[gr.File(label="Speaker Zip Files") for _ in range(10)],  # Assuming max 10 speakers
-    title="Speaker Diarization and Audio Splitting",
-    description="Upload an audio file to split it into separate files for each speaker."
+    fn=process_zip,
+    inputs=gr.File(label="Upload ZIP file containing audio files"),
+    outputs=[
+        gr.File(label="Speaker 1 Audio Files"),
+        gr.File(label="Speaker 2 Audio Files")
+    ],
+    title="Speaker Diarization and Audio Sorting",
+    description="Upload a ZIP file containing audio files. The system will analyze each file and sort them into two groups based on the dominant speaker."
 )

 iface.launch()
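A caveat on the new per-file diarization call: librosa.load returns a one-dimensional float32 array, while pyannote.audio 3.x pipelines expect an in-memory waveform as a (channel, samples) tensor, so the call as committed would likely need an explicit channel dimension. A sketch, assuming that input convention:

waveform, sample_rate = librosa.load(file_path, sr=None, mono=True)
diarization = pipeline({
    "waveform": torch.from_numpy(waveform).unsqueeze(0),  # shape (1, num_samples)
    "sample_rate": sample_rate,
})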
 
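Two further assumptions in the committed code are worth flagging. pyannote emits labels such as SPEAKER_00 and SPEAKER_01, so int(speaker.split('_')[-1]) yields 0 and 1 and would raise KeyError against speaker_times = {1: 0, 2: 0}. Separately, the returned zip paths point inside the TemporaryDirectory, which is deleted the moment the with block exits, likely before Gradio can serve the files. A sketch of both fixes under those assumptions:

import os
import shutil
import tempfile
from collections import defaultdict

def dominant_label(diarization):
    # Tally speaking time per raw pyannote label ("SPEAKER_00", ...)
    # instead of assuming 1-based integer suffixes.
    times = defaultdict(float)
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        times[speaker] += turn.end - turn.start
    return max(times, key=times.get)

# Build the archives in a directory that survives the handler's return;
# tempfile.mkdtemp is never cleaned up automatically, so Gradio can still
# read the files when it serves them. shutil.make_archive returns the
# path of the archive it wrote; speaker1_dir / speaker2_dir are the
# directories created in process_zip.
output_dir = tempfile.mkdtemp()
speaker1_zip = shutil.make_archive(os.path.join(output_dir, "speaker1"), "zip", speaker1_dir)
speaker2_zip = shutil.make_archive(os.path.join(output_dir, "speaker2"), "zip", speaker2_dir)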