# Utils.py
#########################################
# General Utilities Library
# This library is used to hold random utilities used by various other libraries.
#
####
####################
# Function List
#
# 1. extract_text_from_segments(segments: List[Dict]) -> str
# 2. download_file(url, dest_path, expected_checksum=None, max_retries=3, delay=5)
# 3. verify_checksum(file_path, expected_checksum)
# 4. create_download_directory(title)
# 5. sanitize_filename(filename)
# 6. normalize_title(title)
# 7.
#
#
####################
# Import necessary libraries
import configparser
import hashlib
import json
import logging
import os
import re
import time
from datetime import timedelta
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

import requests
import unicodedata
from tqdm import tqdm

from App_Function_Libraries.Video_DL_Ingestion_Lib import get_youtube

#######################################################################################################################
# Function Definitions
#

def extract_text_from_segments(segments):
    logging.debug(f"Segments received: {segments}")
    logging.debug(f"Type of segments: {type(segments)}")

    def extract_text_recursive(data):
        if isinstance(data, dict):
            for key, value in data.items():
                if key == 'Text':
                    return value
                elif isinstance(value, (dict, list)):
                    result = extract_text_recursive(value)
                    if result:
                        return result
        elif isinstance(data, list):
            return ' '.join(filter(None, [extract_text_recursive(item) for item in data]))
        return None

    text = extract_text_recursive(segments)

    if text:
        return text.strip()
    else:
        logging.error(f"Unable to extract text from segments: {segments}")
        return "Error: Unable to extract transcription"


def download_file(url, dest_path, expected_checksum=None, max_retries=3, delay=5):
    temp_path = dest_path + '.tmp'

    for attempt in range(max_retries):
        try:
            # Check if a partial download exists and get its size
            resume_header = {}
            if os.path.exists(temp_path):
                resume_header = {'Range': f'bytes={os.path.getsize(temp_path)}-'}

            response = requests.get(url, stream=True, headers=resume_header)
            response.raise_for_status()

            # A 206 (Partial Content) response means the server honored the Range request,
            # so append to the existing partial file; otherwise start from scratch.
            resuming = response.status_code == 206
            initial_pos = os.path.getsize(temp_path) if resuming else 0
            mode = 'ab' if resuming else 'wb'

            # Total size for the progress bar: bytes remaining plus bytes already downloaded
            total_size = int(response.headers.get('content-length', 0)) + initial_pos

            with open(temp_path, mode) as temp_file, tqdm(
                total=total_size, unit='B', unit_scale=True, desc=dest_path, initial=initial_pos, ascii=True
            ) as pbar:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive new chunks
                        temp_file.write(chunk)
                        pbar.update(len(chunk))

            # Verify the checksum if provided
            if expected_checksum:
                if not verify_checksum(temp_path, expected_checksum):
                    os.remove(temp_path)
                    raise ValueError("Downloaded file's checksum does not match the expected checksum")

            # Move the file to the final destination
            os.rename(temp_path, dest_path)
            print("Download complete and verified!")
            return dest_path

        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                print("Max retries reached. Download failed.")
                raise
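#
# Illustrative usage sketch (not part of the original module); the URL, destination path,
# and checksum below are placeholders:
#
# dest = download_file(
#     "https://example.com/archive.zip",
#     os.path.join("Results", "archive.zip"),
#     expected_checksum="<sha256 hex digest>",  # or None to skip verification
# )
# print(f"Saved to: {dest}")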
def verify_checksum(file_path, expected_checksum):
    sha256_hash = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for byte_block in iter(lambda: f.read(4096), b''):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest() == expected_checksum


def create_download_directory(title):
    base_dir = "Results"
    # Remove characters that are illegal in Windows filenames and normalize
    safe_title = normalize_title(title)
    logging.debug(f"{title} successfully normalized")
    session_path = os.path.join(base_dir, safe_title)
    if not os.path.exists(session_path):
        os.makedirs(session_path, exist_ok=True)
        logging.debug(f"Created directory for downloaded video: {session_path}")
    else:
        logging.debug(f"Directory already exists for downloaded video: {session_path}")
    return session_path


def sanitize_filename(filename):
    # Remove invalid characters and collapse runs of whitespace into a single space
    sanitized = re.sub(r'[<>:"/\\|?*]', '', filename)
    sanitized = re.sub(r'\s+', ' ', sanitized).strip()
    return sanitized


def normalize_title(title):
    # Normalize the string to 'NFKD' form and encode to 'ascii' ignoring non-ascii characters
    title = unicodedata.normalize('NFKD', title).encode('ascii', 'ignore').decode('ascii')
    title = title.replace('/', '_').replace('\\', '_').replace(':', '_').replace('"', '').replace('*', '').replace(
        '?', '').replace('<', '').replace('>', '').replace('|', '')
    return title


def clean_youtube_url(url):
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    if 'list' in query_params:
        query_params.pop('list')
    cleaned_query = urlencode(query_params, doseq=True)
    cleaned_url = urlunparse(parsed_url._replace(query=cleaned_query))
    return cleaned_url


def extract_video_info(url):
    info_dict = get_youtube(url)
    title = info_dict.get('title', 'Untitled')
    return info_dict, title


def import_data(file):
    # Implement this function to import data from a file
    pass


def safe_read_file(file_path):
    encodings = ['utf-8', 'utf-16', 'ascii', 'latin-1', 'iso-8859-1', 'cp1252']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as file:
                return file.read()
        except UnicodeDecodeError:
            continue
        except FileNotFoundError:
            return f"File not found: {file_path}"
        except Exception as e:
            return f"An error occurred: {e}"
    return f"Unable to decode the file {file_path} with any of the attempted encodings: {encodings}"

#
#
#######################
# Temp file cleanup
#
# Global list to keep track of downloaded files
downloaded_files = []


def cleanup_downloads():
    """Function to clean up downloaded files when the server exits."""
    for file_path in downloaded_files:
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                print(f"Cleaned up file: {file_path}")
        except Exception as e:
            print(f"Error cleaning up file {file_path}: {e}")
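#
# Illustrative usage sketch (not part of the original module): cleanup_downloads() is meant to run
# when the server exits, so a caller could register it with the standard-library atexit module,
# assuming the download code appends the paths it creates to the global downloaded_files list.
#
# import atexit
# atexit.register(cleanup_downloads)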
#
#
#######################
# Config loading
#

def load_comprehensive_config():
    # Get the directory of the current script
    current_dir = os.path.dirname(os.path.abspath(__file__))
    # Go up one level to the project root directory
    project_root = os.path.dirname(current_dir)
    # Construct the path to the config file in the project root directory
    config_path = os.path.join(project_root, 'config.txt')
    # Create a ConfigParser object
    config = configparser.ConfigParser()
    # Read the configuration file
    files_read = config.read(config_path)
    if not files_read:
        raise FileNotFoundError(f"Config file not found at {config_path}")
    return config


# FIXME - update to include prompt path in return statement
def load_and_log_configs():
    try:
        config = load_comprehensive_config()
        if config is None:
            logging.error("Config is None, cannot proceed")
            return None

        # API Keys
        anthropic_api_key = config.get('API', 'anthropic_api_key', fallback=None)
        logging.debug(
            f"Loaded Anthropic API Key: {anthropic_api_key[:5] + '...' + anthropic_api_key[-5:] if anthropic_api_key else None}")

        cohere_api_key = config.get('API', 'cohere_api_key', fallback=None)
        logging.debug(
            f"Loaded Cohere API Key: {cohere_api_key[:5] + '...' + cohere_api_key[-5:] if cohere_api_key else None}")

        groq_api_key = config.get('API', 'groq_api_key', fallback=None)
        logging.debug(
            f"Loaded Groq API Key: {groq_api_key[:5] + '...' + groq_api_key[-5:] if groq_api_key else None}")

        openai_api_key = config.get('API', 'openai_api_key', fallback=None)
        logging.debug(
            f"Loaded OpenAI API Key: {openai_api_key[:5] + '...' + openai_api_key[-5:] if openai_api_key else None}")

        huggingface_api_key = config.get('API', 'huggingface_api_key', fallback=None)
        logging.debug(
            f"Loaded HuggingFace API Key: {huggingface_api_key[:5] + '...' + huggingface_api_key[-5:] if huggingface_api_key else None}")

        openrouter_api_key = config.get('API', 'openrouter_api_key', fallback=None)
        logging.debug(
            f"Loaded OpenRouter API Key: {openrouter_api_key[:5] + '...' + openrouter_api_key[-5:] if openrouter_api_key else None}")

        deepseek_api_key = config.get('API', 'deepseek_api_key', fallback=None)
        logging.debug(
            f"Loaded DeepSeek API Key: {deepseek_api_key[:5] + '...' + deepseek_api_key[-5:] if deepseek_api_key else None}")

        # Models
        anthropic_model = config.get('API', 'anthropic_model', fallback='claude-3-sonnet-20240229')
        cohere_model = config.get('API', 'cohere_model', fallback='command-r-plus')
        groq_model = config.get('API', 'groq_model', fallback='llama3-70b-8192')
        openai_model = config.get('API', 'openai_model', fallback='gpt-4-turbo')
        huggingface_model = config.get('API', 'huggingface_model', fallback='CohereForAI/c4ai-command-r-plus')
        openrouter_model = config.get('API', 'openrouter_model', fallback='microsoft/wizardlm-2-8x22b')
        deepseek_model = config.get('API', 'deepseek_model', fallback='deepseek-chat')

        logging.debug(f"Loaded Anthropic Model: {anthropic_model}")
        logging.debug(f"Loaded Cohere Model: {cohere_model}")
        logging.debug(f"Loaded Groq Model: {groq_model}")
        logging.debug(f"Loaded OpenAI Model: {openai_model}")
        logging.debug(f"Loaded HuggingFace Model: {huggingface_model}")
        logging.debug(f"Loaded OpenRouter Model: {openrouter_model}")

        # Local-Models
        kobold_api_ip = config.get('Local-API', 'kobold_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        kobold_api_key = config.get('Local-API', 'kobold_api_key', fallback='')

        llama_api_IP = config.get('Local-API', 'llama_api_IP', fallback='http://127.0.0.1:8080/v1/chat/completions')
        llama_api_key = config.get('Local-API', 'llama_api_key', fallback='')

        ooba_api_IP = config.get('Local-API', 'ooba_api_IP', fallback='http://127.0.0.1:5000/v1/chat/completions')
        ooba_api_key = config.get('Local-API', 'ooba_api_key', fallback='')

        tabby_api_IP = config.get('Local-API', 'tabby_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        tabby_api_key = config.get('Local-API', 'tabby_api_key', fallback=None)
        tabby_model = config.get('models', 'tabby_model', fallback=None)

        vllm_api_url = config.get('Local-API', 'vllm_api_IP', fallback='http://127.0.0.1:500/api/v1/chat/completions')
        vllm_api_key = config.get('Local-API', 'vllm_api_key', fallback=None)
        vllm_model = config.get('Local-API', 'vllm_model', fallback=None)

        ollama_api_url = config.get('Local-API', 'ollama_api_IP', fallback='http://127.0.0.1:11434/api/generate')
        ollama_api_key = config.get('Local-API', 'ollama_api_key', fallback=None)
        ollama_model = config.get('Local-API', 'ollama_model', fallback=None)

        logging.debug(f"Loaded Kobold API IP: {kobold_api_ip}")
        logging.debug(f"Loaded Llama API IP: {llama_api_IP}")
        logging.debug(f"Loaded Ooba API IP: {ooba_api_IP}")
        logging.debug(f"Loaded Tabby API IP: {tabby_api_IP}")
        logging.debug(f"Loaded VLLM API URL: {vllm_api_url}")

        # Retrieve output paths from the configuration file
        output_path = config.get('Paths', 'output_path', fallback='results')
        logging.debug(f"Output path set to: {output_path}")

        # Retrieve processing choice from the configuration file
        processing_choice = config.get('Processing', 'processing_choice', fallback='cpu')
        logging.debug(f"Processing choice set to: {processing_choice}")

        # Prompts - FIXME
        prompt_path = config.get('Prompts', 'prompt_path', fallback='prompts.db')

        return {
            'api_keys': {
                'anthropic': anthropic_api_key,
                'cohere': cohere_api_key,
                'groq': groq_api_key,
                'openai': openai_api_key,
                'huggingface': huggingface_api_key,
                'openrouter': openrouter_api_key,
                'deepseek': deepseek_api_key,
                'kobold': kobold_api_key,
                'llama': llama_api_key,
                'ooba': ooba_api_key,
                'tabby': tabby_api_key,
                'vllm': vllm_api_key,
                'ollama': ollama_api_key
            },
            'models': {
                'anthropic': anthropic_model,
                'cohere': cohere_model,
                'groq': groq_model,
                'openai': openai_model,
                'huggingface': huggingface_model,
                'openrouter': openrouter_model,
                'deepseek': deepseek_model,
                'vllm': vllm_model,
                'tabby': tabby_model,
                'ollama': ollama_model
            },
            'local_api_ip': {
                'kobold': kobold_api_ip,
                'llama': llama_api_IP,
                'ooba': ooba_api_IP,
                'tabby': tabby_api_IP,
                'vllm': vllm_api_url,
                'ollama': ollama_api_url
            },
            'output_path': output_path,
            'processing_choice': processing_choice
        }

    except Exception as e:
        logging.error(f"Error loading config: {str(e)}")
        return None
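#
# Illustrative usage sketch (not part of the original module); the key layout matches the
# dictionary returned by load_and_log_configs() above:
#
# loaded_config = load_and_log_configs()
# if loaded_config is not None:
#     openai_api_key = loaded_config['api_keys']['openai']
#     openai_model = loaded_config['models']['openai']
#     output_path = loaded_config['output_path']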
# Log file
# logging.basicConfig(filename='debug-runtime.log', encoding='utf-8', level=logging.DEBUG)


def format_metadata_as_text(metadata):
    if not metadata:
        return "No metadata available"

    formatted_text = "Video Metadata:\n"
    for key, value in metadata.items():
        if value is not None:
            if isinstance(value, list):
                # Join list items with commas
                formatted_value = ", ".join(str(item) for item in value)
            elif key == 'upload_date' and len(str(value)) == 8:
                # Format date as YYYY-MM-DD
                formatted_value = f"{value[:4]}-{value[4:6]}-{value[6:]}"
            elif key in ['view_count', 'like_count']:
                # Format large numbers with commas
                formatted_value = f"{value:,}"
            elif key == 'duration':
                # Convert seconds to HH:MM:SS format
                hours, remainder = divmod(value, 3600)
                minutes, seconds = divmod(remainder, 60)
                formatted_value = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
            else:
                formatted_value = str(value)
            formatted_text += f"{key.capitalize()}: {formatted_value}\n"
    return formatted_text.strip()

#
# Example usage:
# example_metadata = {
#     'title': 'Sample Video Title',
#     'uploader': 'Channel Name',
#     'upload_date': '20230615',
#     'view_count': 1000000,
#     'like_count': 50000,
#     'duration': 3725,  # 1 hour, 2 minutes, 5 seconds
#     'tags': ['tag1', 'tag2', 'tag3'],
#     'description': 'This is a sample video description.'
# }
#
# print(format_metadata_as_text(example_metadata))


def convert_to_seconds(time_str):
    if not time_str:
        return 0

    # If it's already a number, assume it's in seconds
    if time_str.isdigit():
        return int(time_str)

    # Parse time string in format HH:MM:SS, MM:SS, or SS
    time_parts = time_str.split(':')
    if len(time_parts) == 3:
        return int(timedelta(hours=int(time_parts[0]),
                             minutes=int(time_parts[1]),
                             seconds=int(time_parts[2])).total_seconds())
    elif len(time_parts) == 2:
        return int(timedelta(minutes=int(time_parts[0]),
                             seconds=int(time_parts[1])).total_seconds())
    elif len(time_parts) == 1:
        return int(time_parts[0])
    else:
        raise ValueError(f"Invalid time format: {time_str}")
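#
# Illustrative examples (not part of the original module):
#
# convert_to_seconds("01:02:05")  # -> 3725
# convert_to_seconds("02:05")     # -> 125
# convert_to_seconds("90")        # -> 90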
def save_to_file(video_urls, filename):
    with open(filename, 'w') as file:
        file.write('\n'.join(video_urls))
    print(f"Video URLs saved to {filename}")


def save_segments_to_json(segments, file_name="transcription_segments.json"):
    """
    Save transcription segments to a JSON file.

    Parameters:
    segments (list): List of transcription segments
    file_name (str): Name of the JSON file to save (default: "transcription_segments.json")

    Returns:
    str: Path to the saved JSON file
    """
    # Ensure the Results directory exists
    os.makedirs("Results", exist_ok=True)

    # Full path for the JSON file
    json_file_path = os.path.join("Results", file_name)

    # Save segments to JSON file
    with open(json_file_path, 'w', encoding='utf-8') as json_file:
        json.dump(segments, json_file, ensure_ascii=False, indent=4)

    return json_file_path


def generate_unique_filename(base_path, base_filename):
    """Generate a unique filename by appending a counter if necessary."""
    filename = base_filename
    counter = 1
    while os.path.exists(os.path.join(base_path, filename)):
        name, ext = os.path.splitext(base_filename)
        filename = f"{name}_{counter}{ext}"
        counter += 1
    return filename


def generate_unique_identifier(file_path):
    filename = os.path.basename(file_path)
    timestamp = int(time.time())

    # Generate a hash of the file content
    hasher = hashlib.md5()
    with open(file_path, 'rb') as f:
        buf = f.read()
        hasher.update(buf)
    content_hash = hasher.hexdigest()[:8]  # Use first 8 characters of the hash

    return f"local:{timestamp}:{content_hash}:{filename}"

#
#
#######################################################################################################################
#
# Backup code
#
#
# End of backup code
#######################################################################################################################