Spaces:
Sleeping
Sleeping
from pyannote.audio import Pipeline | |
from pydub import AudioSegment | |
import os | |
import re | |
import torch | |
def perform_diarization(audio_file_path, translated_file_path, output_dir='./audio/diarization'): | |
# Initialize diarization pipeline | |
accesstoken = os.environ['Diarization'] | |
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=accesstoken ) | |
# Send pipeline to GPU (when available) | |
pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu")) | |
# Load audio file | |
audio = AudioSegment.from_wav(audio_file_path) | |
# Apply pretrained pipeline | |
diarization = pipeline(audio_file_path) | |
os.makedirs(output_dir, exist_ok=True) | |
# Process and save each speaker's audio segments | |
speaker_segments_audio = {} | |
for turn, _, speaker in diarization.itertracks(yield_label=True): | |
start_ms = int(turn.start * 1000) # Convert to milliseconds | |
end_ms = int(turn.end * 1000) # Convert to milliseconds | |
segment = audio[start_ms:end_ms] | |
if speaker in speaker_segments_audio: | |
speaker_segments_audio[speaker] += segment | |
else: | |
speaker_segments_audio[speaker] = segment | |
# Save audio segments | |
for speaker, segment in speaker_segments_audio.items(): | |
output_path = os.path.join(output_dir, f"{speaker}.wav") | |
segment.export(output_path, format="wav") | |
print(f"Combined audio for speaker {speaker} saved in {output_path}") | |
# Load translated text | |
with open(translated_file_path, "r") as file: | |
translated_lines = file.readlines() | |
# Process and align translated text with diarization data | |
last_speaker = None | |
aligned_text = [] | |
timestamp_pattern = re.compile(r'\[(\d+\.\d+)\-(\d+\.\d+)\]') | |
for line in translated_lines: | |
match = timestamp_pattern.match(line) | |
if match: | |
start_time = float(match.group(1)) | |
end_time = float(match.group(2)) | |
text = line[match.end():].strip() # Extract text part | |
speaker_found = False | |
# Find corresponding speaker | |
for turn, _, speaker in diarization.itertracks(yield_label=True): | |
speaker_start = turn.start | |
speaker_end = turn.end | |
# Check for overlap between speaker segment and line timestamp | |
if max(speaker_start, start_time) < min(speaker_end, end_time): | |
aligned_text.append(f"[{speaker}] [{start_time}-{end_time}] {text}") | |
speaker_found = True | |
last_speaker = speaker | |
break | |
# If no speaker found, use the last speaker | |
if not speaker_found: | |
if last_speaker is not None: | |
aligned_text.append(f"[{last_speaker}] [{start_time}-{end_time}] {text}") | |
else: | |
aligned_text.append(f"[Unknown Speaker] [{start_time}-{end_time}] {text}") | |
# Save aligned text to a single file | |
aligned_text_output_path = os.path.join(output_dir, "aligned_text.txt") | |
with open(aligned_text_output_path, "w") as aligned_text_file: | |
aligned_text_file.write('\n'.join(aligned_text)) | |
print(f"Aligned text saved in {aligned_text_output_path}") | |
# The rest of your script, if any | |