# Diarization_Lib.py ######################################### # Diarization Library # This library is used to perform diarization of audio files. # Currently, uses FIXME for transcription. # #################### #################### # Function List # # 1. speaker_diarize(video_file_path, segments, embedding_model = "pyannote/embedding", embedding_size=512, num_speakers=0) # #################### # Import necessary libraries import logging from pathlib import Path from typing import Dict, List, Any # # Import Local Libraries from App_Function_Libraries.Audio.Audio_Transcription_Lib import speech_to_text # # Import 3rd Party Libraries from pyannote.audio.pipelines.speaker_diarization import SpeakerDiarization import yaml # ####################################################################################################################### # Function Definitions # def load_pipeline_from_pretrained(path_to_config: str | Path) -> SpeakerDiarization: path_to_config = Path(path_to_config).resolve() logging.debug(f"Loading pyannote pipeline from {path_to_config}...") if not path_to_config.exists(): raise FileNotFoundError(f"Config file not found: {path_to_config}") # Load the YAML configuration with open(path_to_config, 'r') as config_file: config = yaml.safe_load(config_file) # Debug: print the entire config logging.debug(f"Loaded config: {config}") # Create the SpeakerDiarization pipeline try: pipeline = SpeakerDiarization( segmentation=config['pipeline']['params']['segmentation'], embedding=config['pipeline']['params']['embedding'], clustering=config['pipeline']['params']['clustering'], ) except KeyError as e: logging.error(f"Error accessing config key: {e}") raise # Set other parameters try: pipeline_params = { "segmentation": {}, "clustering": {}, } if 'params' in config and 'segmentation' in config['params']: if 'min_duration_off' in config['params']['segmentation']: pipeline_params["segmentation"]["min_duration_off"] = config['params']['segmentation']['min_duration_off'] if 'params' in config and 'clustering' in config['params']: if 'method' in config['params']['clustering']: pipeline_params["clustering"]["method"] = config['params']['clustering']['method'] if 'min_cluster_size' in config['params']['clustering']: pipeline_params["clustering"]["min_cluster_size"] = config['params']['clustering']['min_cluster_size'] if 'threshold' in config['params']['clustering']: pipeline_params["clustering"]["threshold"] = config['params']['clustering']['threshold'] if 'pipeline' in config and 'params' in config['pipeline']: if 'embedding_batch_size' in config['pipeline']['params']: pipeline_params["embedding_batch_size"] = config['pipeline']['params']['embedding_batch_size'] if 'embedding_exclude_overlap' in config['pipeline']['params']: pipeline_params["embedding_exclude_overlap"] = config['pipeline']['params']['embedding_exclude_overlap'] if 'segmentation_batch_size' in config['pipeline']['params']: pipeline_params["segmentation_batch_size"] = config['pipeline']['params']['segmentation_batch_size'] logging.debug(f"Pipeline params: {pipeline_params}") pipeline.instantiate(pipeline_params) except KeyError as e: logging.error(f"Error accessing config key: {e}") raise except Exception as e: logging.error(f"Error instantiating pipeline: {e}") raise return pipeline def audio_diarization(audio_file_path: str) -> list: logging.info('audio-diarization: Loading pyannote pipeline') base_dir = Path(__file__).parent.resolve() config_path = base_dir / 'models' / 'pyannote_diarization_config.yaml' logging.info(f"audio-diarization: Loading pipeline from {config_path}") try: pipeline = load_pipeline_from_pretrained(config_path) except Exception as e: logging.error(f"Failed to load pipeline: {str(e)}") raise logging.info(f"audio-diarization: Audio file path: {audio_file_path}") try: logging.info('audio-diarization: Starting diarization...') diarization_result = pipeline(audio_file_path) segments = [] for turn, _, speaker in diarization_result.itertracks(yield_label=True): segment = { "start": turn.start, "end": turn.end, "speaker": speaker } logging.debug(f"Segment: {segment}") segments.append(segment) logging.info("audio-diarization: Diarization completed with pyannote") return segments except Exception as e: logging.error(f"audio-diarization: Error performing diarization: {str(e)}") raise RuntimeError("audio-diarization: Error performing diarization") from e # Old # def audio_diarization(audio_file_path): # logging.info('audio-diarization: Loading pyannote pipeline') # # #config file loading # current_dir = os.path.dirname(os.path.abspath(__file__)) # # Construct the path to the config file # config_path = os.path.join(current_dir, 'Config_Files', 'config.txt') # # Read the config file # config = configparser.ConfigParser() # config.read(config_path) # processing_choice = config.get('Processing', 'processing_choice', fallback='cpu') # # base_dir = Path(__file__).parent.resolve() # config_path = base_dir / 'models' / 'config.yaml' # pipeline = load_pipeline_from_pretrained(config_path) # # time_start = time.time() # if audio_file_path is None: # raise ValueError("audio-diarization: No audio file provided") # logging.info("audio-diarization: Audio file path: %s", audio_file_path) # # try: # _, file_ending = os.path.splitext(audio_file_path) # out_file = audio_file_path.replace(file_ending, ".diarization.json") # prettified_out_file = audio_file_path.replace(file_ending, ".diarization_pretty.json") # if os.path.exists(out_file): # logging.info("audio-diarization: Diarization file already exists: %s", out_file) # with open(out_file) as f: # global diarization_result # diarization_result = json.load(f) # return diarization_result # # logging.info('audio-diarization: Starting diarization...') # diarization_result = pipeline(audio_file_path) # # segments = [] # for turn, _, speaker in diarization_result.itertracks(yield_label=True): # chunk = { # "Time_Start": turn.start, # "Time_End": turn.end, # "Speaker": speaker # } # logging.debug("Segment: %s", chunk) # segments.append(chunk) # logging.info("audio-diarization: Diarization completed with pyannote") # # output_data = {'segments': segments} # # logging.info("audio-diarization: Saving prettified JSON to %s", prettified_out_file) # with open(prettified_out_file, 'w') as f: # json.dump(output_data, f, indent=2) # # logging.info("audio-diarization: Saving JSON to %s", out_file) # with open(out_file, 'w') as f: # json.dump(output_data, f) # # except Exception as e: # logging.error("audio-diarization: Error performing diarization: %s", str(e)) # raise RuntimeError("audio-diarization: Error performing diarization") # return segments def combine_transcription_and_diarization(audio_file_path: str) -> List[Dict[str, Any]]: logging.info('combine-transcription-and-diarization: Starting transcription and diarization...') try: logging.info('Performing speech-to-text...') transcription_result = speech_to_text(audio_file_path) logging.info(f"Transcription result type: {type(transcription_result)}") logging.info(f"Transcription result: {transcription_result[:3] if isinstance(transcription_result, list) and len(transcription_result) > 3 else transcription_result}") logging.info('Performing audio diarization...') diarization_result = audio_diarization(audio_file_path) logging.info(f"Diarization result type: {type(diarization_result)}") logging.info(f"Diarization result sample: {diarization_result[:3] if isinstance(diarization_result, list) and len(diarization_result) > 3 else diarization_result}") if not transcription_result: logging.error("Empty result from transcription") return [] if not diarization_result: logging.error("Empty result from diarization") return [] # Handle the case where transcription_result is a dict with a 'segments' key if isinstance(transcription_result, dict) and 'segments' in transcription_result: transcription_segments = transcription_result['segments'] elif isinstance(transcription_result, list): transcription_segments = transcription_result else: logging.error(f"Unexpected transcription result format: {type(transcription_result)}") return [] logging.info(f"Number of transcription segments: {len(transcription_segments)}") logging.info(f"Transcription segments sample: {transcription_segments[:3] if len(transcription_segments) > 3 else transcription_segments}") if not isinstance(diarization_result, list): logging.error(f"Unexpected diarization result format: {type(diarization_result)}") return [] combined_result = [] for transcription_segment in transcription_segments: if not isinstance(transcription_segment, dict): logging.warning(f"Unexpected transcription segment format: {transcription_segment}") continue for diarization_segment in diarization_result: if not isinstance(diarization_segment, dict): logging.warning(f"Unexpected diarization segment format: {diarization_segment}") continue try: trans_start = transcription_segment.get('Time_Start', 0) trans_end = transcription_segment.get('Time_End', 0) diar_start = diarization_segment.get('start', 0) diar_end = diarization_segment.get('end', 0) if trans_start >= diar_start and trans_end <= diar_end: combined_segment = { "Time_Start": trans_start, "Time_End": trans_end, "Speaker": diarization_segment.get('speaker', 'Unknown'), "Text": transcription_segment.get('Text', '') } combined_result.append(combined_segment) break except Exception as e: logging.error(f"Error processing segment: {str(e)}") logging.error(f"Transcription segment: {transcription_segment}") logging.error(f"Diarization segment: {diarization_segment}") continue logging.info(f"Combined result length: {len(combined_result)}") logging.info(f"Combined result sample: {combined_result[:3] if len(combined_result) > 3 else combined_result}") return combined_result except Exception as e: logging.error(f"Error in combine_transcription_and_diarization: {str(e)}", exc_info=True) return [] # # #######################################################################################################################