# Audio_Files.py
#########################################
# Audio Processing Library
# This library is used to download or load audio files from a local directory.
#
####
#
# Functions:
#
# download_audio_file(url, current_whisper_model="", use_cookies=False, cookies=None)
# process_audio(audio_file_path, num_speakers=2, whisper_model="small.en", ...)
# process_single_audio(audio_file_path, whisper_model, api_name, api_key, keep_original, ...)
# process_audio_files(audio_urls, audio_file, whisper_model, api_name, api_key, ...)
# download_youtube_audio(url)
# process_podcast(url, title, author, keywords, custom_prompt, api_name, api_key, whisper_model, ...)
#
#########################################
# Imports
import json
import logging
import os
import shutil
import subprocess
import tempfile
import uuid
from datetime import datetime
from pathlib import Path

import requests
import yt_dlp

from App_Function_Libraries.Audio.Audio_Transcription_Lib import speech_to_text
from App_Function_Libraries.Chunk_Lib import improved_chunking_process
#
# Local Imports
from App_Function_Libraries.DB.DB_Manager import add_media_to_database, add_media_with_keywords, \
    check_media_and_whisper_model
from App_Function_Libraries.Summarization.Summarization_General_Lib import save_transcription_and_summary, perform_transcription, \
    perform_summarization
from App_Function_Libraries.Utils.Utils import create_download_directory, save_segments_to_json, downloaded_files, \
    sanitize_filename
from App_Function_Libraries.Video_DL_Ingestion_Lib import extract_metadata
#
#######################################################################################################################
# Function Definitions
#

# Upper bound (in bytes) for any audio file this module will download or process.
MAX_FILE_SIZE = 500 * 1024 * 1024

# (connect, read) timeouts in seconds for HTTP downloads. The read timeout applies
# between received chunks, not to the whole transfer, so large files still work.
DOWNLOAD_TIMEOUT = (10, 60)


def download_audio_file(url, current_whisper_model="", use_cookies=False, cookies=None):
    """Download an audio file from ``url`` into the local ``downloads`` directory.

    The download is skipped when ``check_media_and_whisper_model`` reports that
    it is not needed for this URL / whisper model combination.

    Args:
        url: Direct URL of the audio file.
        current_whisper_model: Whisper model name forwarded to
            ``check_media_and_whisper_model``.
        use_cookies: When true (and ``cookies`` is non-empty) a ``Cookie``
            header is built from ``cookies``.
        cookies: JSON string encoding an object of cookie name/value pairs.

    Returns:
        The path of the saved file, or ``None`` when the download was skipped.

    Raises:
        requests.RequestException: On network errors or a bad HTTP status.
        ValueError: When the file exceeds ``MAX_FILE_SIZE``.
    """
    save_path = None
    completed = False
    try:
        # Check if media already exists in the database and compare whisper models
        should_download, reason = check_media_and_whisper_model(
            url=url,
            current_whisper_model=current_whisper_model
        )

        if not should_download:
            logging.info(f"Skipping audio download: {reason}")
            return None

        logging.info(f"Proceeding with audio download: {reason}")

        # Set up the request headers
        headers = {}
        if use_cookies and cookies:
            try:
                cookie_dict = json.loads(cookies)
                headers['Cookie'] = '; '.join([f'{k}={v}' for k, v in cookie_dict.items()])
            except (json.JSONDecodeError, TypeError, AttributeError):
                # Malformed JSON, a non-string payload, or JSON that is not an
                # object: all are treated as "no usable cookies".
                logging.warning("Invalid cookie format. Proceeding without cookies.")

        # The context manager guarantees the streamed connection is released.
        with requests.get(url, headers=headers, stream=True, timeout=DOWNLOAD_TIMEOUT) as response:
            # Raise an exception for bad status codes
            response.raise_for_status()

            # Reject early when the server announces an oversized body
            file_size = int(response.headers.get('content-length', 0))
            if file_size > MAX_FILE_SIZE:
                raise ValueError("File size exceeds the 500MB limit.")

            # Generate a unique filename
            file_name = f"audio_{uuid.uuid4().hex[:8]}.mp3"

            # Ensure the downloads directory exists
            os.makedirs('downloads', exist_ok=True)
            save_path = os.path.join('downloads', file_name)

            # Download the file. The limit is enforced again while streaming
            # because content-length may be missing or wrong.
            bytes_written = 0
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    bytes_written += len(chunk)
                    if bytes_written > MAX_FILE_SIZE:
                        raise ValueError("File size exceeds the 500MB limit.")
                    f.write(chunk)

        logging.info(f"Audio file downloaded successfully: {save_path}")
        completed = True
        return save_path

    except requests.RequestException as e:
        logging.error(f"Error downloading audio file: {str(e)}")
        raise
    except ValueError as e:
        logging.error(str(e))
        raise
    except Exception as e:
        logging.error(f"Unexpected error downloading audio file: {str(e)}")
        raise
    finally:
        # Do not leave a truncated file behind when the download failed.
        if not completed and save_path and os.path.exists(save_path):
            try:
                os.remove(save_path)
            except OSError as cleanup_error:
                logging.warning(f"Could not remove partial download {save_path}: {cleanup_error}")


def process_audio(
        audio_file_path,
        num_speakers=2,
        whisper_model="small.en",
        custom_prompt_input=None,
        offset=0,
        api_name=None,
        api_key=None,
        vad_filter=False,
        rolling_summarization=False,
        detail_level=0.01,
        keywords="default,no_keyword_set",
        chunk_text_by_words=False,
        max_words=0,
        chunk_text_by_sentences=False,
        max_sentences=0,
        chunk_text_by_paragraphs=False,
        max_paragraphs=0,
        chunk_text_by_tokens=False,
        max_tokens=0
):
    """Transcribe a local audio file, optionally summarize it, and store the result.

    Args:
        audio_file_path: Path of the audio file to transcribe.
        num_speakers: Currently unused by this function.
        whisper_model: Whisper model name used for transcription.
        custom_prompt_input: Prompt forwarded to the summarizer and the database.
        offset: Forwarded to ``perform_transcription``.
        api_name: Summarization API name; when falsy no summary is generated.
        api_key: Key for the summarization API.
        vad_filter: Forwarded to ``perform_transcription``.
        rolling_summarization: Request rolling summarization. Not implemented
            yet; the standard summarization is used instead.
        detail_level, chunk_text_by_*, max_*: Reserved for rolling
            summarization; currently unused.
        keywords: Keywords stored with the media entry.

    Returns:
        A 6-tuple ``(transcription_text, summary_text, json_file_path,
        summary_file_path, None, None)``. On failure the first element is an
        error message string and the remaining elements are ``None``.
    """
    try:
        # Perform transcription
        audio_file_path, segments = perform_transcription(audio_file_path, offset, whisper_model, vad_filter)

        if audio_file_path is None or segments is None:
            logging.error("Process_Audio: Transcription failed or segments not available.")
            return "Process_Audio: Transcription failed.", None, None, None, None, None

        logging.debug(f"Process_Audio: Transcription audio_file: {audio_file_path}")
        logging.debug(f"Process_Audio: Transcription segments: {segments}")

        transcription_text = {'audio_file': audio_file_path, 'transcription': segments}
        logging.debug(f"Process_Audio: Transcription text: {transcription_text}")

        # Save segments to JSON
        segments_json_path = save_segments_to_json(segments)

        # Perform summarization
        summary_text = None
        if api_name:
            if rolling_summarization:
                # FIXME rolling summarization is not implemented. Previously this
                # branch was taken unconditionally (`is not None` is always true
                # for a boolean flag), so no summary was ever produced.
                logging.warning("Rolling summarization is not implemented; using standard summarization.")
            summary_text = perform_summarization(api_name, segments_json_path, custom_prompt_input, api_key)

            if summary_text is None:
                logging.error("Summary text is None. Check summarization function.")
        else:
            summary_text = 'Summary not available'

        # Save transcription and summary
        download_path = create_download_directory("Audio_Processing")
        json_file_path, summary_file_path = save_transcription_and_summary(transcription_text, summary_text,
                                                                           download_path)

        # Update function call to add_media_to_database so that it properly applies the title, author and file type
        # Add to database
        add_media_to_database(None, {'title': 'Audio File', 'author': 'Unknown'}, segments, summary_text, keywords,
                              custom_prompt_input, whisper_model)

        return transcription_text, summary_text, json_file_path, summary_file_path, None, None

    except Exception as e:
        logging.error(f"Error in process_audio: {str(e)}")
        return str(e), None, None, None, None, None


def process_single_audio(audio_file_path, whisper_model, api_name, api_key, keep_original, custom_keywords, source,
                         custom_prompt_input, chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
                         use_multi_level_chunking, chunk_language):
    """Transcribe, optionally summarize, and store a single local audio file.

    Args:
        audio_file_path: Path of the audio file on disk.
        whisper_model: Whisper model name used for transcription.
        api_name: Summarization API name.
        api_key: Summarization API key; a summary is only produced when both
            ``api_name`` and ``api_key`` are truthy.
        keep_original: When false and ``source`` is not ``"Uploaded File"``,
            the audio file is deleted after it was stored.
        custom_keywords: Extra comma-separated keywords appended to the
            default ``"audio,transcription"``.
        source: Stored as the media URL and used in progress messages.
        custom_prompt_input, chunk_method, max_chunk_size, chunk_overlap,
            use_adaptive_chunking, use_multi_level_chunking, chunk_language:
            Accepted for interface compatibility; not used by this function.

    Returns:
        A 3-tuple ``(progress_log, transcription, summary)`` of strings.
    """
    progress = []
    transcription = ""
    summary = ""

    def update_progress(message):
        progress.append(message)
        return "\n".join(progress)

    try:
        # Check file size before processing
        file_size = os.path.getsize(audio_file_path)
        if file_size > MAX_FILE_SIZE:
            update_progress(f"File size ({file_size / (1024 * 1024):.2f} MB) exceeds the maximum limit of {MAX_FILE_SIZE / (1024 * 1024):.2f} MB. Skipping this file.")
            return "\n".join(progress), "", ""

        # Perform transcription
        update_progress("Starting transcription...")
        segments = speech_to_text(audio_file_path, whisper_model=whisper_model)
        transcription = " ".join([segment['Text'] for segment in segments])
        update_progress("Audio transcribed successfully.")

        # Perform summarization if API is provided
        if api_name and api_key:
            update_progress("Starting summarization...")
            summary = perform_summarization(api_name, transcription, "Summarize the following audio transcript",
                                            api_key)
            update_progress("Audio summarized successfully.")
        else:
            summary = "No summary available"

        # Prepare keywords
        keywords = "audio,transcription"
        if custom_keywords:
            keywords += f",{custom_keywords}"

        # Add to database
        add_media_with_keywords(
            url=source,
            title=os.path.basename(audio_file_path),
            media_type='audio',
            content=transcription,
            keywords=keywords,
            prompt="Summarize the following audio transcript",
            summary=summary,
            transcription_model=whisper_model,
            author="Unknown",
            ingestion_date=None  # This will use the current date
        )
        update_progress("Audio file added to database successfully.")

        if not keep_original and source != "Uploaded File":
            os.remove(audio_file_path)
            update_progress(f"Temporary file {audio_file_path} removed.")
        elif keep_original and source != "Uploaded File":
            update_progress(f"Original audio file kept at: {audio_file_path}")

    except Exception as e:
        update_progress(f"Error processing {source}: {str(e)}")
        transcription = f"Error: {str(e)}"
        summary = "No summary due to error"

    return "\n".join(progress), transcription, summary


def process_audio_files(audio_urls, audio_file, whisper_model, api_name, api_key, use_cookies, cookies, keep_original,
                        custom_keywords, custom_prompt_input, chunk_method, max_chunk_size, chunk_overlap,
                        use_adaptive_chunking, use_multi_level_chunking, chunk_language, diarize):
    """Download/convert, transcribe, summarize and store several audio sources.

    Args:
        audio_urls: Newline-separated URLs to download; may be empty or ``None``.
        audio_file: Optional uploaded file object; only its ``.name`` (a path)
            is read.
        whisper_model: Whisper model name used for transcription.
        api_name: Summarization API name.
        api_key: Summarization API key.
        use_cookies: Whether to send ``cookies`` with the download request.
        cookies: JSON string of cookies, see ``download_audio_file``.
        keep_original: When false, downloaded and intermediate files are
            removed after successful processing.
        custom_keywords: Keywords stored with each media entry.
        custom_prompt_input: Prompt forwarded to the summarizer.
        chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
            use_multi_level_chunking, chunk_language: Collected into the
            options dict passed to ``improved_chunking_process``.
        diarize: Whether to request diarization from ``speech_to_text``.

    Returns:
        A 3-tuple ``(progress_log, transcriptions, summaries)`` of strings.
        On failure the last two elements are empty strings.
    """
    progress = []
    temp_files = []
    all_transcriptions = []
    all_summaries = []

    def update_progress(message):
        progress.append(message)
        return "\n".join(progress)

    def cleanup_files():
        for file in temp_files:
            try:
                if os.path.exists(file):
                    os.remove(file)
                    update_progress(f"Temporary file {file} removed.")
            except Exception as e:
                update_progress(f"Failed to remove temporary file {file}: {str(e)}")

    def reencode_mp3(mp3_file_path):
        try:
            # splitext (instead of str.replace on ".mp3") guarantees the output
            # path differs from the input even for non-.mp3 uploads.
            base_path, _ = os.path.splitext(mp3_file_path)
            reencoded_mp3_path = f"{base_path}_reencoded.mp3"
            subprocess.run([ffmpeg_cmd, '-i', mp3_file_path, '-codec:a', 'libmp3lame', reencoded_mp3_path],
                           check=True)
            update_progress(f"Re-encoded {mp3_file_path} to {reencoded_mp3_path}.")
            return reencoded_mp3_path
        except subprocess.CalledProcessError as e:
            update_progress(f"Error re-encoding {mp3_file_path}: {str(e)}")
            raise

    def convert_mp3_to_wav(mp3_file_path):
        try:
            base_path, _ = os.path.splitext(mp3_file_path)
            wav_file_path = f"{base_path}.wav"
            subprocess.run([ffmpeg_cmd, '-i', mp3_file_path, wav_file_path], check=True)
            update_progress(f"Converted {mp3_file_path} to {wav_file_path}.")
            return wav_file_path
        except subprocess.CalledProcessError as e:
            update_progress(f"Error converting {mp3_file_path} to WAV: {str(e)}")
            raise

    try:
        # Check and set the ffmpeg command
        global ffmpeg_cmd
        if os.name == "nt":
            logging.debug("Running on Windows")
            ffmpeg_cmd = os.path.join(os.getcwd(), "Bin", "ffmpeg.exe")
        else:
            ffmpeg_cmd = 'ffmpeg'  # Assume 'ffmpeg' is in PATH for non-Windows systems

        # Ensure ffmpeg is accessible
        if not os.path.exists(ffmpeg_cmd) and os.name == "nt":
            raise FileNotFoundError(f"ffmpeg executable not found at path: {ffmpeg_cmd}")

        # Define chunk options early to avoid undefined errors
        chunk_options = {
            'method': chunk_method,
            'max_size': max_chunk_size,
            'overlap': chunk_overlap,
            'adaptive': use_adaptive_chunking,
            'multi_level': use_multi_level_chunking,
            'language': chunk_language
        }

        # Process multiple URLs (tolerate a missing URL field for upload-only use)
        urls = [url.strip() for url in (audio_urls or "").split('\n') if url.strip()]
        for i, url in enumerate(urls):
            update_progress(f"Processing URL {i + 1}/{len(urls)}: {url}")

            # Download and process audio file. The whisper model must be passed
            # explicitly: the second positional parameter of download_audio_file
            # is current_whisper_model, not use_cookies.
            audio_file_path = download_audio_file(url, whisper_model, use_cookies, cookies)
            if audio_file_path is None:
                # download_audio_file returns None when it decided to skip.
                update_progress(f"Download skipped for {url} (see log for the reason).")
                continue
            if not os.path.exists(audio_file_path):
                update_progress(f"Downloaded file not found: {audio_file_path}")
                continue

            temp_files.append(audio_file_path)
            update_progress("Audio file downloaded successfully.")

            # Re-encode MP3 to fix potential issues
            reencoded_mp3_path = reencode_mp3(audio_file_path)
            if not os.path.exists(reencoded_mp3_path):
                update_progress(f"Re-encoded file not found: {reencoded_mp3_path}")
                continue

            temp_files.append(reencoded_mp3_path)

            # Convert re-encoded MP3 to WAV
            wav_file_path = convert_mp3_to_wav(reencoded_mp3_path)
            if not os.path.exists(wav_file_path):
                update_progress(f"Converted WAV file not found: {wav_file_path}")
                continue

            temp_files.append(wav_file_path)

            # Initialize transcription
            transcription = ""

            # Transcribe audio
            if diarize:
                segments = speech_to_text(wav_file_path, whisper_model=whisper_model, diarize=True)
            else:
                segments = speech_to_text(wav_file_path, whisper_model=whisper_model)

            # Handle segments nested under 'segments' key
            if isinstance(segments, dict) and 'segments' in segments:
                segments = segments['segments']

            if isinstance(segments, list):
                transcription = " ".join([segment.get('Text', '') for segment in segments])
                update_progress("Audio transcribed successfully.")
            else:
                update_progress("Unexpected segments format received from speech_to_text.")
                logging.error(f"Unexpected segments format: {segments}")
                continue

            if not transcription.strip():
                update_progress("Transcription is empty.")
            else:
                # Apply chunking
                chunked_text = improved_chunking_process(transcription, chunk_options)

                # Summarize
                if api_name:
                    try:
                        summary = perform_summarization(api_name, chunked_text, custom_prompt_input, api_key)
                        update_progress("Audio summarized successfully.")
                    except Exception as e:
                        logging.error(f"Error during summarization: {str(e)}")
                        summary = "Summary generation failed"
                else:
                    summary = "No summary available (API not provided)"

                all_transcriptions.append(transcription)
                all_summaries.append(summary)

                # Add to database
                add_media_with_keywords(
                    url=url,
                    title=os.path.basename(wav_file_path),
                    media_type='audio',
                    content=transcription,
                    keywords=custom_keywords,
                    prompt=custom_prompt_input,
                    summary=summary,
                    transcription_model=whisper_model,
                    author="Unknown",
                    ingestion_date=datetime.now().strftime('%Y-%m-%d')
                )
                update_progress("Audio file processed and added to database.")

        # Process uploaded file if provided
        if audio_file:
            if os.path.getsize(audio_file.name) > MAX_FILE_SIZE:
                update_progress(
                    f"Uploaded file size exceeds the maximum limit of {MAX_FILE_SIZE / (1024 * 1024):.2f}MB. Skipping this file.")
            else:
                # Re-encode MP3 to fix potential issues
                reencoded_mp3_path = reencode_mp3(audio_file.name)
                if not os.path.exists(reencoded_mp3_path):
                    update_progress(f"Re-encoded file not found: {reencoded_mp3_path}")
                    # Same cleanup as the generic failure path below, so that
                    # files from already-processed URLs are not leaked.
                    cleanup_files()
                    return update_progress("Processing failed: Re-encoded file not found"), "", ""

                temp_files.append(reencoded_mp3_path)

                # Convert re-encoded MP3 to WAV
                wav_file_path = convert_mp3_to_wav(reencoded_mp3_path)
                if not os.path.exists(wav_file_path):
                    update_progress(f"Converted WAV file not found: {wav_file_path}")
                    cleanup_files()
                    return update_progress("Processing failed: Converted WAV file not found"), "", ""

                temp_files.append(wav_file_path)

                # Initialize transcription
                transcription = ""

                if diarize:
                    segments = speech_to_text(wav_file_path, whisper_model=whisper_model, diarize=True)
                else:
                    segments = speech_to_text(wav_file_path, whisper_model=whisper_model)

                # Handle segments nested under 'segments' key
                if isinstance(segments, dict) and 'segments' in segments:
                    segments = segments['segments']

                if isinstance(segments, list):
                    transcription = " ".join([segment.get('Text', '') for segment in segments])
                else:
                    update_progress("Unexpected segments format received from speech_to_text.")
                    logging.error(f"Unexpected segments format: {segments}")

                if not transcription.strip():
                    # Mirror the URL branch: never store an empty transcription.
                    update_progress("Transcription is empty.")
                else:
                    chunked_text = improved_chunking_process(transcription, chunk_options)

                    if api_name and api_key:
                        try:
                            summary = perform_summarization(api_name, chunked_text, custom_prompt_input, api_key)
                            update_progress("Audio summarized successfully.")
                        except Exception as e:
                            logging.error(f"Error during summarization: {str(e)}")
                            summary = "Summary generation failed"
                    else:
                        summary = "No summary available (API not provided)"

                    all_transcriptions.append(transcription)
                    all_summaries.append(summary)

                    add_media_with_keywords(
                        url="Uploaded File",
                        title=os.path.basename(wav_file_path),
                        media_type='audio',
                        content=transcription,
                        keywords=custom_keywords,
                        prompt=custom_prompt_input,
                        summary=summary,
                        transcription_model=whisper_model,
                        author="Unknown",
                        ingestion_date=datetime.now().strftime('%Y-%m-%d')
                    )
                    update_progress("Uploaded file processed and added to database.")

        # Final cleanup
        if not keep_original:
            cleanup_files()

        final_progress = update_progress("All processing complete.")
        final_transcriptions = "\n\n".join(all_transcriptions)
        final_summaries = "\n\n".join(all_summaries)

        return final_progress, final_transcriptions, final_summaries

    except Exception as e:
        logging.error(f"Error processing audio files: {str(e)}")
        cleanup_files()
        return update_progress(f"Processing failed: {str(e)}"), "", ""


def download_youtube_audio(url):
    """Download a video's audio track with yt-dlp and store it as MP3 in ``downloads``.

    Args:
        url: URL handed to yt-dlp.

    Returns:
        ``(file_path, message)`` on success, ``(None, error_message)`` on
        failure. The saved path is also appended to ``downloaded_files``.
    """
    try:
        # Determine ffmpeg path based on the operating system.
        ffmpeg_path = './Bin/ffmpeg.exe' if os.name == 'nt' else 'ffmpeg'

        # Create a temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            # Extract information about the video
            with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
                info_dict = ydl.extract_info(url, download=False)
                sanitized_title = sanitize_filename(info_dict['title'])

            # Setup the temporary filenames
            temp_video_path = Path(temp_dir) / f"{sanitized_title}_temp.mp4"
            temp_audio_path = Path(temp_dir) / f"{sanitized_title}.mp3"

            # Initialize yt-dlp with options for downloading
            ydl_opts = {
                'format': 'bestaudio[ext=m4a]/best[height<=480]',  # Prefer best audio, or video up to 480p
                'ffmpeg_location': ffmpeg_path,
                'outtmpl': str(temp_video_path),
                'noplaylist': True,
                'quiet': True
            }

            # Execute yt-dlp to download the video/audio
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            # Check if the file exists
            if not temp_video_path.exists():
                raise FileNotFoundError(f"Expected file was not found: {temp_video_path}")

            # Use ffmpeg to extract audio
            ffmpeg_command = [
                ffmpeg_path,
                '-i', str(temp_video_path),
                '-vn',  # No video
                '-acodec', 'libmp3lame',
                '-b:a', '192k',
                str(temp_audio_path)
            ]
            subprocess.run(ffmpeg_command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            # Check if the audio file was created
            if not temp_audio_path.exists():
                raise FileNotFoundError(f"Expected audio file was not found: {temp_audio_path}")

            # Create a persistent directory for the download if it doesn't exist
            persistent_dir = Path("downloads")
            persistent_dir.mkdir(exist_ok=True)

            # Move the file from the temporary directory to the persistent directory
            persistent_file_path = persistent_dir / f"{sanitized_title}.mp3"
            try:
                os.replace(str(temp_audio_path), str(persistent_file_path))
            except OSError:
                # os.replace cannot cross filesystem boundaries (the temp dir is
                # often on a different mount); fall back to copy-and-delete.
                shutil.move(str(temp_audio_path), str(persistent_file_path))

            # Add the file to the list of downloaded files
            downloaded_files.append(str(persistent_file_path))

            return str(persistent_file_path), f"Audio downloaded successfully: {sanitized_title}.mp3"
    except Exception as e:
        return None, f"Error downloading audio: {str(e)}"


def process_podcast(url, title, author, keywords, custom_prompt, api_name, api_key, whisper_model,
                    keep_original=False, enable_diarization=False, use_cookies=False, cookies=None,
                    chunk_method=None, max_chunk_size=300, chunk_overlap=0, use_adaptive_chunking=False,
                    use_multi_level_chunking=False, chunk_language='english'):
    """Download, transcribe, optionally summarize, and store a podcast episode.

    Args:
        url: URL of the podcast audio.
        title: Title override; falls back to the extracted metadata.
        author: Author override; falls back to the extracted metadata.
        keywords: Comma-separated keywords; series/episode/season keywords
            from the metadata are appended.
        custom_prompt: Prompt forwarded to the summarizer and the database.
        api_name: Summarization API name.
        api_key: Summarization API key; a summary is only produced when both
            ``api_name`` and ``api_key`` are truthy.
        whisper_model: Whisper model name used for transcription.
        keep_original: When false, the downloaded file is removed afterwards.
        enable_diarization: Whether to request diarization from ``speech_to_text``.
        use_cookies: Whether to send ``cookies`` with the download request.
        cookies: JSON string of cookies, see ``download_audio_file``.
        chunk_method, max_chunk_size, chunk_overlap, use_adaptive_chunking,
            use_multi_level_chunking, chunk_language: Collected into the
            options dict passed to ``improved_chunking_process``.

    Returns:
        A 7-tuple ``(progress_log, full_content, summary, title, author,
        keywords, error_message)``. On failure the middle five elements are
        empty strings and the last element holds the error text.
    """
    progress = []
    error_message = ""
    temp_files = []

    def update_progress(message):
        progress.append(message)
        return "\n".join(progress)

    def cleanup_files():
        if not keep_original:
            for file in temp_files:
                try:
                    if os.path.exists(file):
                        os.remove(file)
                        update_progress(f"Temporary file {file} removed.")
                except Exception as e:
                    update_progress(f"Failed to remove temporary file {file}: {str(e)}")

    try:
        # Download podcast. The whisper model must be passed explicitly: the
        # second positional parameter of download_audio_file is
        # current_whisper_model, not use_cookies.
        audio_file = download_audio_file(url, whisper_model, use_cookies, cookies)
        if audio_file is None:
            # download_audio_file returns None when it decided to skip; fail
            # with a clear message instead of a TypeError further down.
            raise ValueError("Podcast download was skipped (see log for the reason).")
        temp_files.append(audio_file)
        update_progress("Podcast downloaded successfully.")

        # Extract metadata
        metadata = extract_metadata(url) or {}
        title = title or metadata.get('title', 'Unknown Podcast')
        author = author or metadata.get('uploader', 'Unknown Author')

        # Format metadata for storage
        metadata_text = f"""
Metadata:
Title: {title}
Author: {author}
Series: {metadata.get('series', 'N/A')}
Episode: {metadata.get('episode', 'N/A')}
Season: {metadata.get('season', 'N/A')}
Upload Date: {metadata.get('upload_date', 'N/A')}
Duration: {metadata.get('duration', 'N/A')} seconds
Description: {metadata.get('description', 'N/A')}
"""

        # Update keywords
        new_keywords = []
        if metadata.get('series'):
            new_keywords.append(f"series:{metadata['series']}")
        if metadata.get('episode'):
            new_keywords.append(f"episode:{metadata['episode']}")
        if metadata.get('season'):
            new_keywords.append(f"season:{metadata['season']}")

        # Join only non-empty parts so no dangling comma is produced when the
        # metadata contributes no keywords.
        keyword_parts = [keywords] if keywords else []
        keyword_parts.extend(new_keywords)
        keywords = ','.join(keyword_parts)

        update_progress(f"Metadata extracted - Title: {title}, Author: {author}, Keywords: {keywords}")

        # Transcribe the podcast
        try:
            if enable_diarization:
                segments = speech_to_text(audio_file, whisper_model=whisper_model, diarize=True)
            else:
                segments = speech_to_text(audio_file, whisper_model=whisper_model)
            transcription = " ".join([segment['Text'] for segment in segments])
            update_progress("Podcast transcribed successfully.")
        except Exception as e:
            error_message = f"Transcription failed: {str(e)}"
            raise

        # Apply chunking
        chunk_options = {
            'method': chunk_method,
            'max_size': max_chunk_size,
            'overlap': chunk_overlap,
            'adaptive': use_adaptive_chunking,
            'multi_level': use_multi_level_chunking,
            'language': chunk_language
        }
        chunked_text = improved_chunking_process(transcription, chunk_options)

        # Combine metadata and transcription
        full_content = metadata_text + "\n\nTranscription:\n" + transcription

        # Summarize if API is provided
        summary = None
        if api_name and api_key:
            try:
                summary = perform_summarization(api_name, chunked_text, custom_prompt, api_key)
                update_progress("Podcast summarized successfully.")
            except Exception as e:
                error_message = f"Summarization failed: {str(e)}"
                raise

        # Add to database
        try:
            add_media_with_keywords(
                url=url,
                title=title,
                media_type='podcast',
                content=full_content,
                keywords=keywords,
                prompt=custom_prompt,
                summary=summary or "No summary available",
                transcription_model=whisper_model,
                author=author,
                ingestion_date=datetime.now().strftime('%Y-%m-%d')
            )
            update_progress("Podcast added to database successfully.")
        except Exception as e:
            error_message = f"Error adding podcast to database: {str(e)}"
            raise

        # Cleanup
        cleanup_files()

        return (update_progress("Processing complete."), full_content, summary or "No summary generated.",
                title, author, keywords, error_message)

    except Exception as e:
        logging.error(f"Error processing podcast: {str(e)}")
        cleanup_files()
        return update_progress(f"Processing failed: {str(e)}"), "", "", "", "", "", str(e)

#
#
#######################################################################################################################