# Description: AWS utility functions for Resonate. This file contains the code to parse the AWS Transcribe output. # Documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/transcribe/client/start_transcription_job.html import json import os import re import time import boto3 import dotenv import pandas as pd import webvtt from datetime import datetime from IPython.display import HTML, display class resonate_aws_transcribe: def create_client( self, aws_access_key: str, aws_secret_access_key: str, aws_region_name: str, ) -> tuple[boto3.client, boto3.client]: """ Create and return AWS Transcribe and S3 clients with the specified AWS region. """ session = boto3.Session( aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_access_key, region_name=aws_region_name, ) return session.client("transcribe"), session.client("s3") def create_s3_bucket( self, s3: boto3.client, bucket_name: str, aws_region_name: str ) -> bool: """ Create an S3 bucket using the provided AWS S3 client if it doesn't exist. """ try: s3.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={"LocationConstraint": aws_region_name}, ) print(f"S3 bucket '{bucket_name}' created successfully.") return True except s3.exceptions.BucketAlreadyExists: print(f"S3 bucket '{bucket_name}' already exists.") return True except Exception as e: print(f"Error creating S3 bucket '{bucket_name}': {e}") return False def upload_to_s3( self, s3: boto3.client, file_path: str, bucket_name: str, object_name=None ) -> str: """ Upload the audio file to S3 bucket using the provided AWS S3 client. """ if object_name is None: object_name = file_path try: s3.upload_file(file_path, bucket_name, object_name) uri = f"s3://{bucket_name}/{object_name}" print(f"File '{file_path}' uploaded successfully to '{uri}'") return uri except Exception as e: print( f"Error uploading file '{file_path}' to '{bucket_name}/{object_name}': {e}" ) return "" def download_from_s3( self, s3: boto3.client, object_name: str, bucket_name: str, local_directory: str, ) -> bool: """ Download the .json and .vtt files from an S3 bucket to a local directory. """ local_file_json = f"{local_directory}/{object_name}.json" local_file_vtt = f"{local_directory}/{object_name}.vtt" try: s3.download_file(bucket_name, object_name + ".json", local_file_json) print(f"File '{object_name}' (JSON) downloaded successfully to '{local_file_json}'") s3.download_file(bucket_name, object_name + ".vtt", local_file_vtt) print(f"File '{object_name}' (VTT) downloaded successfully to '{local_file_vtt}'") return True except Exception as e: print(f"Error downloading file '{object_name}' from '{bucket_name}': {e}") return False def delete_from_s3( self, s3: boto3.client, bucket_name: str, object_name: str ) -> bool: """ Delete the file from an S3 bucket using the provided AWS S3 client. """ try: s3.delete_object(Bucket=bucket_name, Key=object_name) print(f"File '{object_name}' deleted successfully from '{bucket_name}'") return True except Exception as e: print(f"Error deleting file '{object_name}' from '{bucket_name}': {e}") return False def delete_s3_bucket(self, s3: boto3.client, bucket_name: str) -> bool: """ Delete a S3 bucket along with its contents using the provided AWS S3 client. """ try: objects = s3.list_objects(Bucket=bucket_name).get("Contents", []) for obj in objects: s3.delete_object(Bucket=bucket_name, Key=obj["Key"]) print( f"Object '{obj['Key']}' deleted successfully from '{bucket_name}'" ) s3.delete_bucket(Bucket=bucket_name) print(f"S3 bucket '{bucket_name}' and its contents deleted successfully.") return True except Exception as e: return e def transcribe_audio( self, transcribe_client: boto3.client, uri: str, output_bucket: str, transcribe_job_name: str = "job", ) -> dict: """ Start a transcription job for audio stored in an S3 bucket using the AWS Transcribe service. """ print("Calling AWS Transcribe Job...") response = transcribe_client.start_transcription_job( TranscriptionJobName=transcribe_job_name, LanguageCode="en-US", MediaFormat="wav", Settings={ "ShowSpeakerLabels": True, "MaxSpeakerLabels": 10, "ChannelIdentification": False, }, Media={"MediaFileUri": uri}, Subtitles={"Formats": ["vtt"]}, OutputBucketName=output_bucket, ) return response def combine_files(self, file_name: str, local_directory: str) -> pd.DataFrame: """ Combines information from a JSON file and a WebVTT file into a CSV file. """ json_file_path = f"{local_directory}/{file_name}.json" with open(json_file_path, "r") as f: data = json.load(f) segments = data["results"]["speaker_labels"]["segments"] df = pd.DataFrame(segments) df["start_time"] = df["start_time"].astype(float) / 60 df["end_time"] = df["end_time"].astype(float) / 60 df = df.rename( columns={ "start_time": "start_time", "end_time": "end_time", "speaker_label": "speaker_label", } ) vtt_file_path = f"{local_directory}/{file_name}.vtt" subtitles = webvtt.read(vtt_file_path) data = [ ( subtitle.start_in_seconds / 60, subtitle.end_in_seconds / 60, subtitle.text.strip(), ) for subtitle in subtitles ] titles = pd.DataFrame(data, columns=["start_time", "end_time", "text"]) transcript = pd.merge_asof( titles.sort_values("start_time"), df.sort_values("start_time"), on="start_time", direction="backward", ) transcript = transcript.dropna(subset=["speaker_label"]) transcript = transcript[["start_time", "end_time_x", "speaker_label", "text"]] transcript.columns = ["start_time", "end_time", "speaker_label", "text"] # Reset the index transcript = transcript.reset_index(drop=True) print("Combined transcript successfully!") return transcript def aws_transcribe_parser( self, transcript_df: pd.DataFrame, output_filename: str ) -> pd.DataFrame: """ Parses the AWS Transcribe output by cleaning duplicate texts and merging consecutive rows with the same speaker. """ prev_text = None # Initialize prev_text transcript_df["text"] = transcript_df["text"].apply( lambda x: re.sub(r"[\"\'\--]+", "", x) ) for index, row in transcript_df.iterrows(): if row["text"] == prev_text and row["speaker_label"] == prev_speaker: transcript_df.at[merge_start, "end_time"] = row["end_time"] transcript_df.drop(index, inplace=True) else: merge_start = index prev_text = row["text"] prev_speaker = row["speaker_label"] transcript_df["group"] = ( transcript_df["speaker_label"] != transcript_df["speaker_label"].shift() ).cumsum() result_df = transcript_df.groupby( ["group", "speaker_label"], as_index=False ).agg({"start_time": "first", "end_time": "last", "text": " ".join}) result_df = result_df.drop(columns=["group"]) result_df.to_csv( "./data/transcriptFiles/" + output_filename + ".csv", index=False ) return result_df def delete_local_temp_file(self, tempFiles: str) -> bool: """ Delete a local temporary file specified by the file path. """ if os.path.exists("./data/tempFiles/" + tempFiles + ".json"): os.remove("./data/tempFiles/" + tempFiles + ".json") if os.path.exists("./data/tempFiles/" + tempFiles + ".vtt"): os.remove("./data/tempFiles/" + tempFiles + ".vtt") def runner( self, file_name: str, input_bucket: str, output_bucket: str, transcribe_job_name: str, aws_access_key: str, aws_secret_access_key: str, aws_region_name: str, ) -> None: """ Run the transcription process for an audio file using AWS Transcribe. """ transcribe_client, s3_client = self.create_client( aws_access_key=aws_access_key, aws_secret_access_key=aws_secret_access_key, aws_region_name=aws_region_name, ) print("Transcribe_client created: ", transcribe_client) print("s3_client created: ", s3_client) # Create S3 buckets print( f"Create S3 Bucket {input_bucket} : ", self.create_s3_bucket(s3_client, input_bucket, aws_region_name), ) print( f"Create S3 Bucket {output_bucket} : ", self.create_s3_bucket(s3_client, output_bucket, aws_region_name), ) URI = self.upload_to_s3( s3_client, "./data/audioFiles/" + file_name, input_bucket ) print("Upload completed now will initiate transcription job.") self.transcribe_audio( transcribe_client, URI, output_bucket, transcribe_job_name=transcribe_job_name, ) # Check status of transcription job while ( transcribe_client.get_transcription_job( TranscriptionJobName=transcribe_job_name )["TranscriptionJob"]["TranscriptionJobStatus"] != "COMPLETED" ): time.sleep(3) # Download transcription job output print( "Download from S3 : ", self.download_from_s3( s3_client, transcribe_job_name, output_bucket, local_directory="./data/tempFiles/", ), ) print( "Delete S3 Bucket Input Bucket : ", self.delete_s3_bucket(s3_client, input_bucket), ) print( "Delete S3 Bucket Output Bucket: ", self.delete_s3_bucket(s3_client, output_bucket), ) try: transcribe_client.delete_transcription_job( TranscriptionJobName=transcribe_job_name ) except: print("Transcription Job does not exist.") # Close clients transcribe_client.close() s3_client.close() # combine the json and vtt results to create a transcript df_transcript_combined = self.combine_files( transcribe_job_name, local_directory="./data/tempFiles/" ) df_transcript_combined_parsed = self.aws_transcribe_parser( transcript_df=df_transcript_combined, output_filename=transcribe_job_name ) print("Transcript parsed successfully") self.delete_local_temp_file(tempFiles=transcribe_job_name) return df_transcript_combined_parsed if __name__ == "__main__": dotenv.load_dotenv("./config/.env") current_timestamp = str.lower(datetime.now().strftime("%Y-%b-%d-%I-%M-%p")) aws_access_key = os.getenv("AWS_ACCESS_KEY") aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") print(aws_access_key, aws_secret_access_key) aws_region_name = "us-east-2" file_name = "test.wav" input_bucket = f"resonate-input-{str(current_timestamp)}" output_bucket = f"resonate-output-{str(current_timestamp)}" transcribe_job_name = f"resonate-job-{str(current_timestamp)}" rat = resonate_aws_transcribe() df = rat.runner( file_name=file_name, input_bucket=input_bucket, output_bucket=output_bucket, transcribe_job_name=transcribe_job_name, aws_access_key=aws_access_key, aws_secret_access_key=aws_secret_access_key, aws_region_name=aws_region_name, ) print(df)