# Audio_Files.py
#########################################
# Audio Processing Library
# This library is used to download or load audio files from a local directory.
#
####
#
# Functions:
#
# download_audio_file(url, current_whisper_model="", use_cookies=False, cookies=None)
# process_audio_files(audio_urls, audio_file, whisper_model, api_name, api_key, ...)
# format_transcription_with_timestamps(segments, keep_timestamps)
# download_youtube_audio(url)
# process_podcast(url, title, author, keywords, custom_prompt, api_name, api_key, whisper_model, ...)
#
#
#########################################
# Imports
import json
import logging
import os
import shutil
import subprocess
import tempfile
import time
import uuid
from datetime import datetime
from pathlib import Path
#
# External Imports
import requests
import yt_dlp
#
# Local Imports
from App_Function_Libraries.DB.DB_Manager import add_media_with_keywords, \
    check_media_and_whisper_model
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram
from App_Function_Libraries.Summarization.Summarization_General_Lib import perform_summarization
from App_Function_Libraries.Utils.Utils import downloaded_files, \
    sanitize_filename, generate_unique_id, temp_files
from App_Function_Libraries.Video_DL_Ingestion_Lib import extract_metadata
from App_Function_Libraries.Audio.Audio_Transcription_Lib import speech_to_text
from App_Function_Libraries.Chunk_Lib import improved_chunking_process
#
#######################################################################################################################
# Function Definitions
#

# Upper bound (in bytes) for any audio file this module will download or accept as an upload.
MAX_FILE_SIZE = 500 * 1024 * 1024

# (connect, read) timeout in seconds for HTTP downloads, so a stalled server cannot hang processing forever.
_REQUEST_TIMEOUT = (10, 120)


def _build_cookie_header(cookies):
    """
    Builds the value of an HTTP ``Cookie`` header.

    Parameters:
        cookies (str | dict): Either a JSON object string or an already-decoded dict of cookie names to values.

    Returns:
        str | None: ``"k1=v1; k2=v2"`` style header value, or None when the input is not usable
        (invalid JSON, or JSON that does not decode to an object).
    """
    if isinstance(cookies, str):
        try:
            cookies = json.loads(cookies)
        except json.JSONDecodeError:
            return None
    if not isinstance(cookies, dict):
        return None
    return '; '.join(f'{k}={v}' for k, v in cookies.items())


def _remove_partial_download(path):
    """Best-effort removal of an incomplete download; failures are logged, never raised."""
    try:
        if os.path.exists(path):
            os.remove(path)
    except OSError as e:
        logging.warning(f"Could not remove partial download {path}: {str(e)}")


def download_audio_file(url, current_whisper_model="", use_cookies=False, cookies=None):
    """
    Downloads an audio file over HTTP into the local ``downloads`` directory.

    The database is consulted first (via check_media_and_whisper_model); when it reports that the
    download is not needed, nothing is fetched.

    Parameters:
        url (str): URL of the audio file.
        current_whisper_model (str): Whisper model that will be used, passed to the database check.
        use_cookies (bool): Whether to send cookies with the request.
        cookies (str | dict): Cookies as a JSON object string or as a dict. Unusable cookies are
            ignored with a warning.

    Returns:
        str | None: Path of the downloaded file, or None when the download was skipped.

    Raises:
        requests.RequestException: On network errors, timeouts or bad HTTP status codes.
        ValueError: When the file is larger than MAX_FILE_SIZE.
    """
    save_path = None
    completed = False
    try:
        # Check if media already exists in the database and compare whisper models
        should_download, reason = check_media_and_whisper_model(
            url=url,
            current_whisper_model=current_whisper_model
        )

        if not should_download:
            logging.info(f"Skipping audio download: {reason}")
            return None

        logging.info(f"Proceeding with audio download: {reason}")

        # Set up the request headers
        headers = {}
        if use_cookies and cookies:
            cookie_header = _build_cookie_header(cookies)
            if cookie_header is None:
                logging.warning("Invalid cookie format. Proceeding without cookies.")
            elif cookie_header:
                headers['Cookie'] = cookie_header

        size_error = f"File size exceeds the {MAX_FILE_SIZE // (1024 * 1024)}MB limit."

        # The context manager guarantees the streamed connection is released on every exit path.
        with requests.get(url, headers=headers, stream=True, timeout=_REQUEST_TIMEOUT) as response:
            # Raise an exception for bad status codes
            response.raise_for_status()

            # Fast rejection when the server announces the size up front
            file_size = int(response.headers.get('content-length', 0))
            if file_size > MAX_FILE_SIZE:
                raise ValueError(size_error)

            # Generate a unique filename
            file_name = f"audio_{uuid.uuid4().hex[:8]}.mp3"
            os.makedirs('downloads', exist_ok=True)
            save_path = os.path.join('downloads', file_name)

            # Content-Length may be missing or wrong, so the limit is enforced again while streaming.
            bytes_written = 0
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    bytes_written += len(chunk)
                    if bytes_written > MAX_FILE_SIZE:
                        raise ValueError(size_error)
                    f.write(chunk)

        completed = True
        logging.info(f"Audio file downloaded successfully: {save_path}")
        return save_path

    except requests.RequestException as e:
        logging.error(f"Error downloading audio file: {str(e)}")
        raise
    except ValueError as e:
        logging.error(str(e))
        raise
    except Exception as e:
        logging.error(f"Unexpected error downloading audio file: {str(e)}")
        raise
    finally:
        # Do not leave a truncated file behind when the download did not finish.
        if save_path and not completed:
            _remove_partial_download(save_path)


def process_audio_files(audio_urls, audio_file, whisper_model, api_name, api_key, use_cookies, cookies, keep_original,
                        custom_keywords, custom_prompt_input, chunk_method, max_chunk_size, chunk_overlap,
                        use_adaptive_chunking, use_multi_level_chunking, chunk_language, diarize,
                        keep_timestamps, custom_title):
    """
    Downloads and/or loads audio, transcribes it, optionally summarizes it and stores the results.

    Each URL (one per line in ``audio_urls``) and the optional uploaded file are processed
    independently: a failure of one item is recorded in the progress log and does not stop the others.

    Parameters:
        audio_urls (str): Newline-separated audio URLs; may be empty.
        audio_file: Uploaded file object exposing a ``name`` attribute with its path; may be None.
        whisper_model (str): Whisper model used for transcription.
        api_name (str): Summarization API name; falsy or "none" disables summarization.
        api_key (str): API key for summarization.
        use_cookies (bool): Whether to send cookies when downloading.
        cookies (str): JSON-formatted cookies string.
        keep_original (bool): When False, files created during processing are removed afterwards.
        custom_keywords (str): Keywords stored with the media entry.
        custom_prompt_input (str): Custom prompt for summarization.
        chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
        use_multi_level_chunking, chunk_language: Chunking options forwarded to improved_chunking_process.
        diarize (bool): Whether to enable speaker diarization.
        keep_timestamps (bool): Whether to keep timestamps in the transcription.
        custom_title (str): Title to store; defaults to the WAV file name.

    Returns:
        tuple: (progress_log, transcriptions, summaries) as strings.
    """
    # Still published as a module-level global, as before, in case other code reads it.
    global ffmpeg_cmd

    start_time = time.time()  # Start time for processing
    processed_count = 0
    failed_count = 0
    progress = []
    all_transcriptions = []
    all_summaries = []
    created_files = []  # Files created here that are removed unless keep_original is set
    metric_labels = {"whisper_model": whisper_model, "api_name": api_name}

    chunk_options = {
        'method': chunk_method,
        'max_size': max_chunk_size,
        'overlap': chunk_overlap,
        'adaptive': use_adaptive_chunking,
        'multi_level': use_multi_level_chunking,
        'language': chunk_language
    }

    def update_progress(message):
        progress.append(message)
        return "\n".join(progress)

    def cleanup_files():
        for file in created_files:
            try:
                if os.path.exists(file):
                    os.remove(file)
                    update_progress(f"Temporary file {file} removed.")
            except Exception as e:
                update_progress(f"Failed to remove temporary file {file}: {str(e)}")

    def reencode_mp3(mp3_file_path):
        try:
            # splitext (not str.replace) so inputs without an .mp3 suffix never map onto themselves,
            # and ".mp3" appearing elsewhere in the path is left alone.
            root, _ = os.path.splitext(mp3_file_path)
            reencoded_mp3_path = f"{root}_reencoded.mp3"
            # -y: overwrite stale output from an earlier run instead of prompting on stdin.
            subprocess.run([ffmpeg_cmd, '-y', '-i', mp3_file_path, '-codec:a', 'libmp3lame', reencoded_mp3_path],
                           check=True)
            update_progress(f"Re-encoded {mp3_file_path} to {reencoded_mp3_path}.")
            return reencoded_mp3_path
        except subprocess.CalledProcessError as e:
            update_progress(f"Error re-encoding {mp3_file_path}: {str(e)}")
            raise

    def convert_mp3_to_wav(mp3_file_path):
        try:
            root, _ = os.path.splitext(mp3_file_path)
            wav_file_path = f"{root}.wav"
            subprocess.run([ffmpeg_cmd, '-y', '-i', mp3_file_path, wav_file_path], check=True)
            update_progress(f"Converted {mp3_file_path} to {wav_file_path}.")
            return wav_file_path
        except subprocess.CalledProcessError as e:
            update_progress(f"Error converting {mp3_file_path} to WAV: {str(e)}")
            raise

    def transcribe_and_store(source_path, source_url):
        """Runs convert -> transcribe -> summarize -> store for one audio file; raises on failure."""
        reencoded_mp3_path = reencode_mp3(source_path)
        created_files.append(reencoded_mp3_path)

        wav_file_path = convert_mp3_to_wav(reencoded_mp3_path)
        created_files.append(wav_file_path)

        # Transcribe audio
        segments = speech_to_text(wav_file_path, whisper_model=whisper_model, diarize=diarize)

        # Handle segments format
        if isinstance(segments, dict) and 'segments' in segments:
            segments = segments['segments']

        if not isinstance(segments, list):
            raise ValueError("Unexpected segments format received from speech_to_text")

        transcription = format_transcription_with_timestamps(segments, keep_timestamps)
        if not transcription.strip():
            raise ValueError("Empty transcription generated")

        # Summarization is best-effort: a failure here must not discard the transcription.
        summary = "No summary available"
        if api_name and api_name.lower() != "none":
            try:
                chunked_text = improved_chunking_process(transcription, chunk_options)
                summary_result = perform_summarization(api_name, chunked_text, custom_prompt_input, api_key)
                if summary_result:
                    summary = summary_result
                    update_progress("Audio summarized successfully.")
            except Exception as e:
                logging.error(f"Summarization failed: {str(e)}")
                summary = "Summary generation failed"

        # Add to results
        all_transcriptions.append(transcription)
        all_summaries.append(summary)

        # Add to database
        title = custom_title if custom_title else os.path.basename(wav_file_path)
        add_media_with_keywords(
            url=source_url,
            title=title,
            media_type='audio',
            content=transcription,
            keywords=custom_keywords,
            prompt=custom_prompt_input,
            summary=summary,
            transcription_model=whisper_model,
            author="Unknown",
            ingestion_date=datetime.now().strftime('%Y-%m-%d')
        )

    try:
        # Check and set the ffmpeg command
        if os.name == "nt":
            logging.debug("Running on Windows")
            ffmpeg_cmd = os.path.join(os.getcwd(), "Bin", "ffmpeg.exe")
            # Ensure ffmpeg is accessible
            if not os.path.exists(ffmpeg_cmd):
                raise FileNotFoundError(f"ffmpeg executable not found at path: {ffmpeg_cmd}")
        else:
            ffmpeg_cmd = 'ffmpeg'  # Assume 'ffmpeg' is in PATH for non-Windows systems

        # Process URLs if provided
        if audio_urls:
            urls = [url.strip() for url in audio_urls.split('\n') if url.strip()]
            for i, url in enumerate(urls):
                try:
                    update_progress(f"Processing URL {i + 1}/{len(urls)}: {url}")

                    # Download and process audio file. The whisper model must be passed explicitly:
                    # the second positional parameter is current_whisper_model, not use_cookies.
                    audio_file_path = download_audio_file(url, whisper_model, use_cookies, cookies)
                    if not audio_file_path:
                        raise FileNotFoundError(f"Failed to download audio from URL: {url}")

                    created_files.append(audio_file_path)

                    transcribe_and_store(audio_file_path, url)

                    processed_count += 1
                    update_progress(f"Successfully processed URL {i + 1}")
                    log_counter("audio_files_processed_total", 1, metric_labels)

                except Exception as e:
                    failed_count += 1
                    update_progress(f"Failed to process URL {i + 1}: {str(e)}")
                    log_counter("audio_files_failed_total", 1, metric_labels)
                    continue

        # Process uploaded file if provided
        if audio_file:
            try:
                update_progress("Processing uploaded file...")
                if os.path.getsize(audio_file.name) > MAX_FILE_SIZE:
                    raise ValueError(f"File size exceeds maximum limit of {MAX_FILE_SIZE / (1024 * 1024):.2f}MB")

                # The upload itself is never added to created_files, so it is never deleted here.
                transcribe_and_store(audio_file.name, "Uploaded File")

                processed_count += 1
                update_progress("Successfully processed uploaded file")
                log_counter("audio_files_processed_total", 1, metric_labels)

            except Exception as e:
                failed_count += 1
                update_progress(f"Failed to process uploaded file: {str(e)}")
                log_counter("audio_files_failed_total", 1, metric_labels)

        # Cleanup temporary files
        if not keep_original:
            cleanup_files()

        # Log processing metrics
        processing_time = time.time() - start_time
        log_histogram("audio_processing_time_seconds", processing_time, metric_labels)
        log_counter("total_audio_files_processed", processed_count, metric_labels)
        log_counter("total_audio_files_failed", failed_count, metric_labels)

        # Prepare final output
        final_progress = update_progress(f"Processing complete. Processed: {processed_count}, Failed: {failed_count}")
        final_transcriptions = "\n\n".join(all_transcriptions) if all_transcriptions else "No transcriptions available"
        final_summaries = "\n\n".join(all_summaries) if all_summaries else "No summaries available"

        return final_progress, final_transcriptions, final_summaries

    except Exception as e:
        logging.error(f"Error in process_audio_files: {str(e)}")
        log_counter("audio_files_failed_total", 1, metric_labels)
        if not keep_original:
            cleanup_files()
        return update_progress(f"Processing failed: {str(e)}"), "No transcriptions available", "No summaries available"


def format_transcription_with_timestamps(segments, keep_timestamps):
    """
    Formats the transcription segments with or without timestamps.

    Parameters:
        segments (list): List of transcription segments (dicts with 'Time_Start', 'Time_End', 'Text').
        keep_timestamps (bool): Whether to include timestamps.

    Returns:
        str: Formatted transcription, one segment per line.
    """
    def segment_text(segment):
        # `or ''` also covers a 'Text' key that is present but None.
        return (segment.get('Text') or '').strip()

    if not keep_timestamps:
        return "\n".join(segment_text(segment) for segment in segments)

    formatted_segments = []
    for segment in segments:
        start = segment.get('Time_Start', 0)
        end = segment.get('Time_End', 0)
        formatted_segments.append(f"[{start:.2f}-{end:.2f}] {segment_text(segment)}")
    return "\n".join(formatted_segments)


def download_youtube_audio(url):
    """
    Downloads the audio track of a video with yt-dlp and stores it as an MP3 in ``downloads``.

    Parameters:
        url (str): URL of the video.

    Returns:
        tuple: (path, message). ``path`` is the MP3 path on success, or None on failure, in which
        case ``message`` describes the error.
    """
    try:
        # Determine ffmpeg path based on the operating system.
        ffmpeg_path = './Bin/ffmpeg.exe' if os.name == 'nt' else 'ffmpeg'

        # Create a temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract information about the video
            with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
                info_dict = ydl.extract_info(url, download=False)
                sanitized_title = sanitize_filename(info_dict['title'])

            # Setup the temporary filenames
            temp_video_path = Path(temp_dir) / f"{sanitized_title}_temp.mp4"
            temp_audio_path = Path(temp_dir) / f"{sanitized_title}.mp3"

            # Initialize yt-dlp with options for downloading
            ydl_opts = {
                'format': 'bestaudio[ext=m4a]/best[height<=480]',  # Prefer best audio, or video up to 480p
                'ffmpeg_location': ffmpeg_path,
                'outtmpl': str(temp_video_path),
                'noplaylist': True,
                'quiet': True
            }

            # Execute yt-dlp to download the video/audio
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            # Check if the file exists
            if not temp_video_path.exists():
                raise FileNotFoundError(f"Expected file was not found: {temp_video_path}")

            # Use ffmpeg to extract audio
            ffmpeg_command = [
                ffmpeg_path,
                '-i', str(temp_video_path),
                '-vn',  # No video
                '-acodec', 'libmp3lame',
                '-b:a', '192k',
                str(temp_audio_path)
            ]
            subprocess.run(ffmpeg_command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            # Check if the audio file was created
            if not temp_audio_path.exists():
                raise FileNotFoundError(f"Expected audio file was not found: {temp_audio_path}")

            # Create a persistent directory for the download if it doesn't exist
            persistent_dir = Path("downloads")
            persistent_dir.mkdir(exist_ok=True)

            # shutil.move rather than os.replace: the temp directory and ./downloads may be on
            # different filesystems, where a plain rename fails.
            persistent_file_path = persistent_dir / f"{sanitized_title}.mp3"
            shutil.move(str(temp_audio_path), str(persistent_file_path))

            # Add the file to the list of downloaded files
            downloaded_files.append(str(persistent_file_path))

            return str(persistent_file_path), f"Audio downloaded successfully: {sanitized_title}.mp3"
    except Exception as e:
        return None, f"Error downloading audio: {str(e)}"


def process_podcast(url, title, author, keywords, custom_prompt, api_name, api_key, whisper_model,
                    keep_original=False, enable_diarization=False, use_cookies=False, cookies=None,
                    chunk_method=None, max_chunk_size=300, chunk_overlap=0, use_adaptive_chunking=False,
                    use_multi_level_chunking=False, chunk_language='english', keep_timestamps=True):
    """
    Processes a podcast by downloading the audio, transcribing it, summarizing the transcription,
    and adding the results to the database. Metrics are logged throughout the process.

    Parameters:
        url (str): URL of the podcast.
        title (str): Title of the podcast.
        author (str): Author of the podcast.
        keywords (str): Comma-separated keywords.
        custom_prompt (str): Custom prompt for summarization.
        api_name (str): API name for summarization.
        api_key (str): API key for summarization.
        whisper_model (str): Whisper model to use for transcription.
        keep_original (bool): Whether to keep the original audio file.
        enable_diarization (bool): Whether to enable speaker diarization.
        use_cookies (bool): Whether to use cookies for authenticated downloads.
        cookies (str): JSON-formatted cookies string.
        chunk_method (str): Method for chunking text.
        max_chunk_size (int): Maximum size for each text chunk.
        chunk_overlap (int): Overlap size between chunks.
        use_adaptive_chunking (bool): Whether to use adaptive chunking.
        use_multi_level_chunking (bool): Whether to use multi-level chunking.
        chunk_language (str): Language for chunking.
        keep_timestamps (bool): Whether to keep timestamps in transcription.

    Returns:
        tuple: (progress_message, transcription, summary, title, author, keywords, error_message)
    """
    start_time = time.time()  # Start time for processing
    error_message = ""
    downloaded_paths = []  # Files removed afterwards unless keep_original is set
    progress = []  # Progress messages

    # Define labels for metrics
    labels = {
        "whisper_model": whisper_model,
        "api_name": api_name if api_name else "None"
    }

    def update_progress(message):
        """
        Updates the progress messages.

        Parameters:
            message (str): Progress message to append.

        Returns:
            str: Combined progress messages.
        """
        progress.append(message)
        return "\n".join(progress)

    def cleanup_files():
        if keep_original:
            return
        for file in downloaded_paths:
            try:
                if os.path.exists(file):
                    os.remove(file)
                    update_progress(f"Temporary file {file} removed.")
            except Exception as e:
                update_progress(f"Failed to remove temporary file {file}: {str(e)}")

    try:
        # Cookies are passed through untouched: download_audio_file decodes them itself, and
        # decoding here as well made every cookie-authenticated download fail.
        audio_file = download_audio_file(url, whisper_model, use_cookies, cookies)
        if not audio_file:
            raise RuntimeError("Failed to download podcast audio.")
        downloaded_paths.append(audio_file)
        update_progress("Podcast downloaded successfully.")

        # Extract metadata from the podcast; fall back to an empty mapping if none is returned
        metadata = extract_metadata(url) or {}
        title = title or metadata.get('title', 'Unknown Podcast')
        author = author or metadata.get('uploader', 'Unknown Author')

        # Format metadata for storage
        metadata_text = f"""
Metadata:
Title: {title}
Author: {author}
Series: {metadata.get('series', 'N/A')}
Episode: {metadata.get('episode', 'N/A')}
Season: {metadata.get('season', 'N/A')}
Upload Date: {metadata.get('upload_date', 'N/A')}
Duration: {metadata.get('duration', 'N/A')} seconds
Description: {metadata.get('description', 'N/A')}
"""

        # Update keywords with metadata information
        new_keywords = [
            f"{field}:{metadata[field]}"
            for field in ('series', 'episode', 'season')
            if metadata.get(field)
        ]
        # Join only the non-empty parts so no leading/trailing comma is produced.
        keywords = ','.join(part for part in (keywords, ','.join(new_keywords)) if part)

        update_progress(f"Metadata extracted - Title: {title}, Author: {author}, Keywords: {keywords}")

        # Transcribe the podcast audio
        try:
            # `diarize` is only passed when requested, leaving speech_to_text's default otherwise.
            stt_kwargs = {'whisper_model': whisper_model}
            if enable_diarization:
                stt_kwargs['diarize'] = True
            segments = speech_to_text(audio_file, **stt_kwargs)

            if isinstance(segments, dict) and 'segments' in segments:
                segments = segments['segments']

            if not isinstance(segments, list):
                raise ValueError("Unexpected segments format received from speech_to_text.")

            transcription = format_transcription_with_timestamps(segments, keep_timestamps)
            update_progress("Podcast transcribed successfully.")

            if not transcription.strip():
                raise ValueError("Transcription is empty.")
        except Exception as e:
            error_message = f"Transcription failed: {str(e)}"
            raise RuntimeError(error_message) from e

        # Apply chunking to the transcription
        chunk_options = {
            'method': chunk_method,
            'max_size': max_chunk_size,
            'overlap': chunk_overlap,
            'adaptive': use_adaptive_chunking,
            'multi_level': use_multi_level_chunking,
            'language': chunk_language
        }
        chunked_text = improved_chunking_process(transcription, chunk_options)

        # Combine metadata and transcription
        full_content = metadata_text + "\n\nTranscription:\n" + transcription

        # Summarize the transcription if API is provided
        if api_name:
            try:
                summary = perform_summarization(api_name, chunked_text, custom_prompt, api_key)
                update_progress("Podcast summarized successfully.")
            except Exception as e:
                error_message = f"Summarization failed: {str(e)}"
                raise RuntimeError(error_message) from e
        else:
            summary = "No summary available (API not provided)"

        # Add the processed podcast to the database
        try:
            add_media_with_keywords(
                url=url,
                title=title,
                media_type='podcast',
                content=full_content,
                keywords=keywords,
                prompt=custom_prompt,
                summary=summary or "No summary available",
                transcription_model=whisper_model,
                author=author,
                ingestion_date=datetime.now().strftime('%Y-%m-%d')
            )
            update_progress("Podcast added to database successfully.")
        except Exception as e:
            error_message = f"Error adding podcast to database: {str(e)}"
            raise RuntimeError(error_message) from e

        # Cleanup temporary files if required
        cleanup_files()

        # Calculate processing time
        processing_time = time.time() - start_time

        # Log successful processing
        log_counter(
            metric_name="podcasts_processed_total",
            labels=labels,
            value=1
        )

        # Log processing time
        log_histogram(
            metric_name="podcast_processing_time_seconds",
            value=processing_time,
            labels=labels
        )

        # Return the final outputs
        final_progress = update_progress("Processing complete.")
        return (final_progress, full_content, summary or "No summary generated.",
                title, author, keywords, error_message)

    except Exception as e:
        # Calculate processing time up to the point of failure
        processing_time = time.time() - start_time

        # Log failed processing
        log_counter(
            metric_name="podcasts_failed_total",
            labels=labels,
            value=1
        )

        # Log processing time even on failure
        log_histogram(
            metric_name="podcast_processing_time_seconds",
            value=processing_time,
            labels=labels
        )

        logging.error(f"Error processing podcast: {str(e)}")
        cleanup_files()
        final_progress = update_progress(f"Processing failed: {str(e)}")
        return (final_progress, "", "", "", "", "", str(e))

#
#
#######################################################################################################################