oceansweep's picture
Upload Utils.py
dda5666 verified
raw
history blame
19.5 kB
# Utils.py
#########################################
# General Utilities Library
# This library is used to hold random utilities used by various other libraries.
#
####
####################
# Function List
#
# 1. extract_text_from_segments(segments: List[Dict]) -> str
# 2. download_file(url, dest_path, expected_checksum=None, max_retries=3, delay=5)
# 3. verify_checksum(file_path, expected_checksum)
# 4. create_download_directory(title)
# 5. sanitize_filename(filename)
# 6. normalize_title(title)
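# 7. clean_youtube_url(url)
# 8. extract_video_info(url)
# 9. import_data(file)
# 10. safe_read_file(file_path)
# 11. cleanup_downloads()
# 12. load_comprehensive_config()
# 13. load_and_log_configs()
# 14. format_metadata_as_text(metadata)
# 15. convert_to_seconds(time_str)
# 16. save_to_file(video_urls, filename)
# 17. save_segments_to_json(segments, file_name="transcription_segments.json")
# 18. generate_unique_filename(base_path, base_filename)
# 19. generate_unique_identifier(file_path)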
#
#
#
####################
# Import necessary libraries
import configparser
import hashlib
import json
import logging
import os
import re
import time
from datetime import timedelta
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import requests
import unicodedata
from tqdm import tqdm
from App_Function_Libraries.Video_DL_Ingestion_Lib import get_youtube
#######################################################################################################################
# Function Definitions
#
def extract_text_from_segments(segments):
    """
    Pull the transcription text out of a (possibly nested) segments structure.

    Parameters:
        segments: Any nesting of dicts and lists. Text is taken from values
            stored under the key 'Text'.

    Returns:
        str: The extracted text with surrounding whitespace stripped, or the
        string "Error: Unable to extract transcription" when nothing was found.
    """
    logging.debug(f"Segments received: {segments}")
    logging.debug(f"Type of segments: {type(segments)}")

    def _find_text(node):
        # Lists: gather the text of every element and join the non-empty results.
        if isinstance(node, list):
            pieces = [_find_text(child) for child in node]
            return ' '.join(piece for piece in pieces if piece)

        # Anything that is neither a list nor a dict carries no text.
        if not isinstance(node, dict):
            return None

        # Dicts: a 'Text' entry wins immediately; otherwise descend into nested
        # containers and stop at the first one that yields something.
        for name, child in node.items():
            if name == 'Text':
                return child
            if isinstance(child, (dict, list)):
                found = _find_text(child)
                if found:
                    return found
        return None

    text = _find_text(segments)
    if not text:
        logging.error(f"Unable to extract text from segments: {segments}")
        return "Error: Unable to extract transcription"
    return text.strip()
def download_file(url, dest_path, expected_checksum=None, max_retries=3, delay=5):
    """
    Download a URL to dest_path, resuming partial downloads and retrying on failure.

    Data is streamed into '<dest_path>.tmp' and only moved to dest_path after the
    transfer (and the optional checksum verification) succeeded.

    Parameters:
        url (str): Address to download from.
        dest_path (str): Final location of the downloaded file.
        expected_checksum (str, optional): Expected digest, checked with
            verify_checksum(); skipped when falsy.
        max_retries (int): Maximum number of attempts.
        delay (int | float): Seconds to sleep between attempts.

    Returns:
        str: dest_path on success.

    Raises:
        Exception: The error of the final attempt is re-raised once max_retries
            attempts have failed (e.g. a requests error, or ValueError on a
            checksum mismatch).
    """
    temp_path = dest_path + '.tmp'

    for attempt in range(max_retries):
        try:
            # Ask the server to continue where an earlier attempt stopped.
            initial_pos = os.path.getsize(temp_path) if os.path.exists(temp_path) else 0
            resume_header = {'Range': f'bytes={initial_pos}-'} if initial_pos else {}

            # The context manager releases the connection even if streaming fails.
            with requests.get(url, stream=True, headers=resume_header) as response:
                if initial_pos and response.status_code == 416:
                    # The offset cannot be satisfied (stale or already complete
                    # partial file): discard it so the next attempt starts over.
                    os.remove(temp_path)
                response.raise_for_status()

                # Only a 206 Partial Content reply contains just the missing tail.
                # A plain 200 means the server ignored the Range header and is
                # sending the whole file, so the partial data must be overwritten.
                resumed = bool(initial_pos) and response.status_code == 206
                if not resumed:
                    initial_pos = 0
                mode = 'ab' if resumed else 'wb'

                # content-length covers only the bytes of this response; add what
                # is already on disk so the progress bar shows the real total.
                remaining = int(response.headers.get('content-length', 0))
                total_size = initial_pos + remaining if remaining else None

                with open(temp_path, mode) as temp_file, tqdm(
                    total=total_size, unit='B', unit_scale=True, desc=dest_path,
                    initial=initial_pos, ascii=True
                ) as pbar:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:  # filter out keep-alive new chunks
                            temp_file.write(chunk)
                            pbar.update(len(chunk))

            # Verify the checksum if provided
            if expected_checksum:
                if not verify_checksum(temp_path, expected_checksum):
                    # A corrupt file must not be resumed on the next attempt.
                    os.remove(temp_path)
                    raise ValueError("Downloaded file's checksum does not match the expected checksum")

            # Move the file to the final destination; os.replace also overwrites
            # an existing destination on Windows, where os.rename would fail.
            os.replace(temp_path, dest_path)
            print("Download complete and verified!")
            return dest_path

        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                print("Max retries reached. Download failed.")
                raise
def verify_checksum(file_path, expected_checksum):
    """
    Check a file against an expected SHA-256 checksum.

    Parameters:
        file_path (str): File to hash.
        expected_checksum (str): Hex digest the file is expected to have.

    Returns:
        bool: True when the file's SHA-256 hex digest equals expected_checksum.
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        # Hash in 4 KiB blocks so the file never has to fit in memory.
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    actual_checksum = digest.hexdigest()
    return actual_checksum == expected_checksum
def create_download_directory(title):
    """
    Make sure a per-title directory exists below "Results" and return its path.

    Parameters:
        title (str): Title to derive the directory name from; it is passed
            through normalize_title() first.

    Returns:
        str: Path of the directory ("Results/<normalized title>").
    """
    safe_title = normalize_title(title)
    logging.debug(f"{title} successfully normalized")

    session_path = os.path.join("Results", safe_title)
    if os.path.exists(session_path):
        logging.debug(f"Directory already exists for downloaded video: {session_path}")
        return session_path

    os.makedirs(session_path, exist_ok=True)
    logging.debug(f"Created directory for downloaded video: {session_path}")
    return session_path
def sanitize_filename(filename):
    """
    Remove characters that are invalid in file names and tidy up whitespace.

    The characters < > : " / | ? * and the backslash are dropped. Every run of
    whitespace is then collapsed into a single space (spaces are kept, not
    replaced), and leading/trailing whitespace is trimmed.

    Parameters:
        filename (str): Raw file name.

    Returns:
        str: The cleaned file name.
    """
    invalid_chars = re.compile(r'[<>:"/\\|?*]')
    whitespace_run = re.compile(r'\s+')

    without_invalid = invalid_chars.sub('', filename)
    single_spaced = whitespace_run.sub(' ', without_invalid)
    return single_spaced.strip()
def normalize_title(title):
    """
    Reduce a title to ASCII and neutralise characters unsuitable for paths.

    The title is NFKD-normalized and every non-ASCII character is dropped.
    Slash, backslash and colon become underscores; the characters " * ? < > |
    are removed.

    Parameters:
        title (str): Title to normalize.

    Returns:
        str: The normalized title.
    """
    decomposed = unicodedata.normalize('NFKD', title)
    ascii_only = decomposed.encode('ascii', 'ignore').decode('ascii')

    # One translation table covers both the replacements and the removals
    # (a None target deletes the character).
    substitutions = str.maketrans({
        '/': '_',
        '\\': '_',
        ':': '_',
        '"': None,
        '*': None,
        '?': None,
        '<': None,
        '>': None,
        '|': None,
    })
    return ascii_only.translate(substitutions)
def clean_youtube_url(url):
    """
    Return the URL with its 'list' query parameter removed.

    Parameters:
        url (str): URL to clean.

    Returns:
        str: The URL rebuilt from its parts, without any 'list' parameter; the
        remaining query parameters are re-encoded.
    """
    parts = urlparse(url)
    params = parse_qs(parts.query)
    params.pop('list', None)
    rebuilt_query = urlencode(params, doseq=True)
    return urlunparse(parts._replace(query=rebuilt_query))
def extract_video_info(url):
    """
    Fetch the info dictionary for a video URL together with its title.

    Parameters:
        url (str): URL handed to get_youtube().

    Returns:
        tuple: (info_dict, title), where title is info_dict's 'title' entry or
        'Untitled' when that key is absent.
    """
    info_dict = get_youtube(url)
    return info_dict, info_dict.get('title', 'Untitled')
def import_data(file):
    """Placeholder for importing data from a file; not implemented yet, returns None."""
    return None
def safe_read_file(file_path):
    """
    Read a text file, trying several encodings until one decodes it.

    Parameters:
        file_path (str): File to read.

    Returns:
        str: The decoded file content. On failure no exception is raised;
        instead a human-readable error message is returned ("File not found:
        ...", "An error occurred: ...", or a note that no encoding worked).
    """
    # Strict codecs go first. 'latin-1' (and its alias 'iso-8859-1') maps every
    # byte value and therefore never fails, so it has to come last as the
    # catch-all; 'cp1252' sits before it because anything after it is never tried.
    encodings = ['utf-8', 'utf-16', 'ascii', 'cp1252', 'latin-1', 'iso-8859-1']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as file:
                return file.read()
        except UnicodeDecodeError:
            # Wrong guess: fall through to the next candidate encoding.
            continue
        except FileNotFoundError:
            return f"File not found: {file_path}"
        except Exception as e:
            return f"An error occurred: {e}"
    return f"Unable to decode the file {file_path} with any of the attempted encodings: {encodings}"
#
#
#######################
# Temp file cleanup
#
# Global list to keep track of downloaded files
downloaded_files = []


def cleanup_downloads():
    """
    Delete every file listed in downloaded_files (meant to run when the server exits).

    Paths that no longer exist are skipped. A failure on one file is printed
    and does not stop the remaining files from being processed. The list
    itself is left unchanged.
    """
    for tracked_path in downloaded_files:
        try:
            if not os.path.exists(tracked_path):
                continue
            os.remove(tracked_path)
            print(f"Cleaned up file: {tracked_path}")
        except Exception as e:
            print(f"Error cleaning up file {tracked_path}: {e}")
#
#
#######################
# Config loading
#
def load_comprehensive_config():
    """
    Load 'config.txt' from the directory one level above this module.

    Returns:
        configparser.ConfigParser: Parser populated from the config file.

    Raises:
        FileNotFoundError: If the config file could not be read.
    """
    # This module lives one directory below the project root.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(os.path.dirname(module_dir), 'config.txt')

    parser = configparser.ConfigParser()
    # read() returns the list of files it managed to parse; empty means missing.
    if parser.read(config_path):
        return parser
    raise FileNotFoundError(f"Config file not found at {config_path}")
def _mask_api_key(api_key):
    """Return a short preview of an API key for logging, or None when no key is set."""
    if not api_key:
        return None
    return f"{api_key[:5]}...{api_key[-5:]}"


def load_and_log_configs():
    """
    Load the project configuration and return the settings as nested dicts.

    Returns:
        dict | None: A dictionary with the keys 'api_keys', 'models',
        'local_api_ip', 'output_path', 'processing_choice' and 'prompt_path',
        or None when the configuration could not be loaded (the error is
        logged instead of raised).
    """
    try:
        config = load_comprehensive_config()
        if config is None:
            logging.error("Config is None, cannot proceed")
            return None

        # API Keys
        # Keys are optional (fallback=None), so only a masked preview is logged;
        # slicing a missing key directly would raise and abort the whole load.
        anthropic_api_key = config.get('API', 'anthropic_api_key', fallback=None)
        logging.debug(f"Loaded Anthropic API Key: {_mask_api_key(anthropic_api_key)}")

        cohere_api_key = config.get('API', 'cohere_api_key', fallback=None)
        logging.debug(f"Loaded Cohere API Key: {_mask_api_key(cohere_api_key)}")

        groq_api_key = config.get('API', 'groq_api_key', fallback=None)
        logging.debug(f"Loaded Groq API Key: {_mask_api_key(groq_api_key)}")

        openai_api_key = config.get('API', 'openai_api_key', fallback=None)
        logging.debug(f"Loaded OpenAI API Key: {_mask_api_key(openai_api_key)}")

        huggingface_api_key = config.get('API', 'huggingface_api_key', fallback=None)
        logging.debug(f"Loaded HuggingFace API Key: {_mask_api_key(huggingface_api_key)}")

        openrouter_api_key = config.get('API', 'openrouter_api_key', fallback=None)
        logging.debug(f"Loaded OpenRouter API Key: {_mask_api_key(openrouter_api_key)}")

        deepseek_api_key = config.get('API', 'deepseek_api_key', fallback=None)
        logging.debug(f"Loaded DeepSeek API Key: {_mask_api_key(deepseek_api_key)}")

        # Models
        anthropic_model = config.get('API', 'anthropic_model', fallback='claude-3-sonnet-20240229')
        cohere_model = config.get('API', 'cohere_model', fallback='command-r-plus')
        groq_model = config.get('API', 'groq_model', fallback='llama3-70b-8192')
        openai_model = config.get('API', 'openai_model', fallback='gpt-4-turbo')
        huggingface_model = config.get('API', 'huggingface_model', fallback='CohereForAI/c4ai-command-r-plus')
        openrouter_model = config.get('API', 'openrouter_model', fallback='microsoft/wizardlm-2-8x22b')
        deepseek_model = config.get('API', 'deepseek_model', fallback='deepseek-chat')

        logging.debug(f"Loaded Anthropic Model: {anthropic_model}")
        logging.debug(f"Loaded Cohere Model: {cohere_model}")
        logging.debug(f"Loaded Groq Model: {groq_model}")
        logging.debug(f"Loaded OpenAI Model: {openai_model}")
        logging.debug(f"Loaded HuggingFace Model: {huggingface_model}")
        logging.debug(f"Loaded OpenRouter Model: {openrouter_model}")
        logging.debug(f"Loaded DeepSeek Model: {deepseek_model}")

        # Local-Models
        kobold_api_ip = config.get('Local-API', 'kobold_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        kobold_api_key = config.get('Local-API', 'kobold_api_key', fallback='')

        llama_api_IP = config.get('Local-API', 'llama_api_IP', fallback='http://127.0.0.1:8080/v1/chat/completions')
        llama_api_key = config.get('Local-API', 'llama_api_key', fallback='')

        ooba_api_IP = config.get('Local-API', 'ooba_api_IP', fallback='http://127.0.0.1:5000/v1/chat/completions')
        ooba_api_key = config.get('Local-API', 'ooba_api_key', fallback='')

        tabby_api_IP = config.get('Local-API', 'tabby_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        tabby_api_key = config.get('Local-API', 'tabby_api_key', fallback=None)
        # NOTE(review): read from the 'models' section, unlike the other local
        # models which use 'Local-API' - confirm this is intentional.
        tabby_model = config.get('models', 'tabby_model', fallback=None)

        # NOTE(review): the fallback port is 500 while the other local fallbacks
        # use 5000/8080/11434 - possibly a typo; confirm before changing.
        vllm_api_url = config.get('Local-API', 'vllm_api_IP', fallback='http://127.0.0.1:500/api/v1/chat/completions')
        vllm_api_key = config.get('Local-API', 'vllm_api_key', fallback=None)
        vllm_model = config.get('Local-API', 'vllm_model', fallback=None)

        ollama_api_url = config.get('Local-API', 'ollama_api_IP', fallback='http://127.0.0.1:11434/api/generate')
        ollama_api_key = config.get('Local-API', 'ollama_api_key', fallback=None)
        ollama_model = config.get('Local-API', 'ollama_model', fallback=None)

        logging.debug(f"Loaded Kobold API IP: {kobold_api_ip}")
        logging.debug(f"Loaded Llama API IP: {llama_api_IP}")
        logging.debug(f"Loaded Ooba API IP: {ooba_api_IP}")
        logging.debug(f"Loaded Tabby API IP: {tabby_api_IP}")
        logging.debug(f"Loaded VLLM API URL: {vllm_api_url}")

        # Retrieve output paths from the configuration file
        output_path = config.get('Paths', 'output_path', fallback='results')
        logging.debug(f"Output path set to: {output_path}")

        # Retrieve processing choice from the configuration file
        processing_choice = config.get('Processing', 'processing_choice', fallback='cpu')
        logging.debug(f"Processing choice set to: {processing_choice}")

        # Prompts
        prompt_path = config.get('Prompts', 'prompt_path', fallback='prompts.db')

        return {
            'api_keys': {
                'anthropic': anthropic_api_key,
                'cohere': cohere_api_key,
                'groq': groq_api_key,
                'openai': openai_api_key,
                'huggingface': huggingface_api_key,
                'openrouter': openrouter_api_key,
                'deepseek': deepseek_api_key,
                'kobold': kobold_api_key,
                'llama': llama_api_key,
                'ooba': ooba_api_key,
                'tabby': tabby_api_key,
                'vllm': vllm_api_key,
                'ollama': ollama_api_key
            },
            'models': {
                'anthropic': anthropic_model,
                'cohere': cohere_model,
                'groq': groq_model,
                'openai': openai_model,
                'huggingface': huggingface_model,
                'openrouter': openrouter_model,
                'deepseek': deepseek_model,
                'vllm': vllm_model,
                'tabby': tabby_model,
                'ollama': ollama_model
            },
            'local_api_ip': {
                'kobold': kobold_api_ip,
                'llama': llama_api_IP,
                'ooba': ooba_api_IP,
                'tabby': tabby_api_IP,
                'vllm': vllm_api_url,
                'ollama': ollama_api_url
            },
            'output_path': output_path,
            'processing_choice': processing_choice,
            # Additional key; the existing keys above are unchanged.
            'prompt_path': prompt_path
        }

    except Exception as e:
        logging.error(f"Error loading config: {str(e)}")
        return None
# Log file
# logging.basicConfig(filename='debug-runtime.log', encoding='utf-8', level=logging.DEBUG)
def format_metadata_as_text(metadata):
    """
    Render a metadata dictionary as human-readable "Key: value" lines.

    Parameters:
        metadata (dict): Metadata to format. Entries whose value is None are
            skipped. Lists are comma-joined, 'upload_date' values of eight
            characters become YYYY-MM-DD, 'view_count'/'like_count' get
            thousands separators and 'duration' (seconds) becomes HH:MM:SS.
            Values that do not fit their special format are shown unchanged
            instead of raising.

    Returns:
        str: "Video Metadata:" followed by one line per entry, or
        "No metadata available" when metadata is empty or None.
    """
    if not metadata:
        return "No metadata available"

    lines = ["Video Metadata:"]
    for key, value in metadata.items():
        if value is None:
            continue

        if isinstance(value, list):
            # Join list items with commas
            formatted_value = ", ".join(str(item) for item in value)
        elif key == 'upload_date' and len(str(value)) == 8:
            # Format date as YYYY-MM-DD; slice the text form so an integer
            # date such as 20230615 works as well as the string '20230615'.
            date_text = str(value)
            formatted_value = f"{date_text[:4]}-{date_text[4:6]}-{date_text[6:]}"
        elif key in ('view_count', 'like_count'):
            # Format large numbers with commas; non-numeric values are kept as-is.
            try:
                formatted_value = f"{value:,}"
            except (TypeError, ValueError):
                formatted_value = str(value)
        elif key == 'duration':
            # Convert seconds to HH:MM:SS format; the ':02d' format needs whole
            # numbers, so fractional seconds are truncated.
            try:
                total_seconds = int(value)
            except (TypeError, ValueError, OverflowError):
                formatted_value = str(value)
            else:
                hours, remainder = divmod(total_seconds, 3600)
                minutes, seconds = divmod(remainder, 60)
                formatted_value = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
        else:
            formatted_value = str(value)

        lines.append(f"{key.capitalize()}: {formatted_value}")

    return "\n".join(lines).strip()
# # Example usage:
# example_metadata = {
# 'title': 'Sample Video Title',
# 'uploader': 'Channel Name',
# 'upload_date': '20230615',
# 'view_count': 1000000,
# 'like_count': 50000,
# 'duration': 3725, # 1 hour, 2 minutes, 5 seconds
# 'tags': ['tag1', 'tag2', 'tag3'],
# 'description': 'This is a sample video description.'
# }
#
# print(format_metadata_as_text(example_metadata))
def convert_to_seconds(time_str):
    """
    Convert a time specification to a number of seconds.

    Parameters:
        time_str (str | int | float | None): Either a number of seconds, or a
            string in the format HH:MM:SS, MM:SS or SS. Empty values
            (None, "", 0) yield 0.

    Returns:
        int: The time in whole seconds.

    Raises:
        ValueError: If the string has more than three ':'-separated parts or a
            part is not an integer.
    """
    if not time_str:
        return 0

    # If it's already a number, assume it's in seconds
    if isinstance(time_str, (int, float)):
        return int(time_str)

    # Parse time string in format HH:MM:SS, MM:SS, or SS
    time_parts = str(time_str).strip().split(':')
    if len(time_parts) > 3:
        raise ValueError(f"Invalid time format: {time_str}")
    try:
        values = [int(part) for part in time_parts]
    except ValueError:
        raise ValueError(f"Invalid time format: {time_str}") from None

    # Each further part is one unit smaller by a factor of 60 (h -> m -> s).
    total_seconds = 0
    for value in values:
        total_seconds = total_seconds * 60 + value
    return total_seconds
def save_to_file(video_urls, filename):
    """
    Write the given URLs to a text file, one per line, and report it on stdout.

    Parameters:
        video_urls (iterable of str): URLs to store.
        filename (str): File to (over)write.
    """
    with open(filename, 'w') as url_file:
        joined_urls = '\n'.join(video_urls)
        url_file.write(joined_urls)
    print(f"Video URLs saved to {filename}")
def save_segments_to_json(segments, file_name="transcription_segments.json"):
    """
    Write transcription segments as JSON into the "Results" directory.

    Parameters:
        segments (list): Transcription segments to serialize
        file_name (str): Name of the JSON file inside "Results"
            (default: "transcription_segments.json")

    Returns:
        str: Path to the written JSON file
    """
    results_dir = "Results"
    # Create the target directory on first use.
    os.makedirs(results_dir, exist_ok=True)

    json_file_path = os.path.join(results_dir, file_name)
    with open(json_file_path, 'w', encoding='utf-8') as out_file:
        # ensure_ascii=False keeps non-ASCII text readable in the file.
        json.dump(segments, out_file, ensure_ascii=False, indent=4)
    return json_file_path
def generate_unique_filename(base_path, base_filename):
    """
    Find a file name that does not exist yet inside base_path.

    Parameters:
        base_path (str): Directory to check for existing files.
        base_filename (str): Preferred file name.

    Returns:
        str: base_filename if it is free, otherwise the first free name of the
        form "<name>_<counter><ext>" with the counter starting at 1.
    """
    stem, extension = os.path.splitext(base_filename)
    candidate = base_filename
    attempt = 0
    while os.path.exists(os.path.join(base_path, candidate)):
        attempt += 1
        candidate = f"{stem}_{attempt}{extension}"
    return candidate
def generate_unique_identifier(file_path):
    """
    Build an identifier for a local file from the current time and its content.

    Parameters:
        file_path (str): File to identify.

    Returns:
        str: "local:<unix timestamp>:<first 8 hex characters of the MD5 of the
        file content>:<file name>".
    """
    filename = os.path.basename(file_path)
    timestamp = int(time.time())

    # Generate a hash of the file content. The file is read in chunks so that
    # large files are not loaded into memory as a whole.
    hasher = hashlib.md5()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            hasher.update(chunk)
    content_hash = hasher.hexdigest()[:8]  # Use first 8 characters of the hash

    return f"local:{timestamp}:{content_hash}:{filename}"
#
#
#######################################################################################################################
#
# Backup code
#
# End of backup code
#######################################################################################################################