# This script transcribes a downloaded YouTube video using Deepgram.
# The audio should be cleaned with UVR5 first, so the input file is FLAC.
# It uploads the full-length interview or podcast to Deepgram, which returns
# diarized speaker ids. The user must listen to the exported clips manually to
# find the wanted speaker, then discard the remaining speakers and any short clips.
#

import math
import os
import shutil
from os.path import join

from dotenv import load_dotenv
from pydub import AudioSegment
from deepgram import (
    DeepgramClient,
    PrerecordedOptions,
    FileSource,
)


def write_csv_file(csv_file, csv_data):
    with open(csv_file, 'w') as file:
        # Iterate over each row in the data
        for row in csv_data:
            # Create a string where each field is separated by a '|'
            row_string = '|'.join(str(item) for item in row)
            # Write the string to the file, followed by a newline character
            file.write(row_string + '\n')
    print(f"Data written to {csv_file}")


def process(audio_file, tag, progress):
    load_dotenv("myenv-variable.env")

    AUDIO_FILE = audio_file  # path to the cleaned audio file
    TAGS = tag  # YouTube source, used to categorize the exported clips
    API_KEY = os.getenv('API_DEEPGRAM')

    # Start from a clean output folder and remove any previous archive
    original_parent_folder = os.getcwd()
    output_folder = join(original_parent_folder, "output")
    if os.path.isdir(output_folder):
        shutil.rmtree(output_folder)
    if os.path.exists("output.zip"):
        os.remove("output.zip")
    os.mkdir(output_folder)

    deepgram = DeepgramClient(API_KEY)

    with open(AUDIO_FILE, "rb") as file:
        buffer_data = file.read()

    payload: FileSource = {
        "buffer": buffer_data,
    }

    # Configure Deepgram options for the transcription request
    options = PrerecordedOptions(
        model="nova-2",
        smart_format=True,
        filler_words=True,
        diarize=True,
    )

    progress(0.20)
    try:
        response = deepgram.listen.prerecorded.v("1").transcribe_file(payload, options)
    except Exception as e:
        print(e)
        raise  # without a response there is nothing to slice
    progress(0.30)

    audio = AudioSegment.from_file(AUDIO_FILE)
    # The SDK returns a dataclass-style response; convert it to a plain dict
    # so the nested fields can be read with key access
    data = response.to_dict()
    paragraphs = data['results']['channels'][0]['alternatives'][0]['paragraphs']['paragraphs']
    csv_data_dict = dict()
    i = 1
    progress(0.40)

    for paragraph in progress.tqdm(paragraphs, desc="Generating..."):
        sentences = paragraph['sentences']
        for sentence in sentences:
            # Convert the start and end time of the sentence to ms and add a 5 ms buffer on each side
            start_time_ms = math.floor(sentence['start'] * 1000) - 5
            end_time_ms = math.ceil(sentence['end'] * 1000) + 5
            duration_s = round(sentence['end'] - sentence['start'], 3)
            duration_ms = str(end_time_ms - start_time_ms).zfill(6)
            # Skip clips shorter than 2 seconds
            if duration_s < 2:
                continue

            speaker_id = paragraph['speaker']
            if speaker_id == 10:
                # Relabel this diarized speaker index before the folder and CSV keys
                # are built, so clip files, CSV rows, and folder names stay consistent
                speaker_id = "Tayr"

            folder_path = join(output_folder, "Speaker_" + str(speaker_id))
            if not os.path.isdir(folder_path):
                # First clip for this speaker: create its folder and CSV header row
                os.mkdir(folder_path)
                csv_data_dict.update({str(speaker_id): [["filename", "speaker", "text", "start_time", "end_time", "duration"]]})
                print(csv_data_dict)

            # Slice the audio segment
            segment = audio[start_time_ms:end_time_ms]
            # File name recorded in the CSV (assumes the clips end up in a "wavs" folder)
            file_name = join("wavs", f"{TAGS}_Speaker_{speaker_id}_i{str(i).zfill(3)}_d{duration_ms}.wav")
            # Export the segment into the speaker's folder
            temp_path = join(folder_path, f"{TAGS}_Speaker_{speaker_id}_i{str(i).zfill(3)}_d{duration_ms}.wav")
            segment.export(temp_path, format="wav")
            # Add data to the CSV list for this speaker
            csv_data_dict[str(speaker_id)].append([file_name, speaker_id, sentence['text'], start_time_ms, end_time_ms, duration_s])
            i += 1

    progress(0.80)
    # Write one pipe-delimited output .txt file per speaker
    for key, value in csv_data_dict.items():
        speaker_folder = join(output_folder, f"Speaker_{key}")
        csv_filename = join(speaker_folder, f"Speaker_{key}_{TAGS}_output.txt")
        write_csv_file(csv_filename, value)
    progress(0.90)
    shutil.make_archive("output", 'zip', output_folder)
    progress(1.00)
    return "output.zip"
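

# Minimal usage sketch (not part of the original script). It assumes process()
# is driven from a Gradio app, since it calls progress(...) and progress.tqdm(...),
# which match gradio.Progress; the interface labels below are illustrative.
if __name__ == "__main__":
    import gradio as gr

    def run(audio_path, tag, progress=gr.Progress()):
        # process() exports per-speaker clips plus a pipe-delimited .txt file,
        # zips the "output" folder, and returns the path to output.zip
        return process(audio_path, tag, progress)

    demo = gr.Interface(
        fn=run,
        inputs=[gr.Textbox(label="Path to cleaned FLAC"), gr.Textbox(label="Tag")],
        outputs=gr.File(label="output.zip"),
    )
    demo.launch()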