import io import json import os import time import uuid import base64 from datetime import datetime from io import BytesIO from tempfile import NamedTemporaryFile from xmlrpc.client import Binary import jwt import threading import tempfile # Add this import at the top of your file import numpy as np import requests import scipy import soundfile as sf import streamlit as st import streamlit_vertical_slider as svs from pydub import AudioSegment from scipy.signal import butter, sosfilt from streamlit import session_state as st_state from woocommerce import API from wordpress_xmlrpc import Client from wordpress_xmlrpc.compat import xmlrpc_client from wordpress_xmlrpc.methods import media import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart # Add enhanced CSS to block download buttons in audio players st.markdown(""" """, unsafe_allow_html=True) # Try to get API_URL from environment variables, if not found set to a default value try: API_URL = os.environ["API_URL"] except KeyError: st.error("API_URL environment variable is not set.") st.stop() # Try to get the Bearer token from environment variables, if not found set to a default value try: BEARER_TOKEN = os.environ["BEARER_TOKEN"] except KeyError: st.error("BEARER_TOKEN environment variable is not set.") st.stop() page_bg_img = ''' ''' st.markdown(page_bg_img, unsafe_allow_html=True) # Ensure session state keys are initialized if "jwt_token" not in st.session_state: st.session_state["jwt_token"] = None if "session_id" not in st.session_state: st.session_state["session_id"] = str(uuid.uuid4()) if 'login_needed' not in st.session_state: st.session_state['login_needed'] = False if 'generate_audio_params' not in st.session_state: st.session_state['generate_audio_params'] = None if 'audio_bytes' not in st.session_state: st.session_state['audio_bytes'] = None if 'playing_state' not in st.session_state: st.session_state['playing_state'] = 'none' # 'none', 'generated', 'processed' if 'audio_metadata' not in st.session_state: st.session_state['audio_metadata'] = None if 'login_refresh_needed' not in st.session_state: st.session_state['login_refresh_needed'] = False if 'promo_code_applied' not in st.session_state: st.session_state['promo_code_applied'] = False # Initialize session state variables for audio processing if "vocal_audio" not in st_state: st_state.vocal_audio = None if "vocal_sample_rate" not in st_state: st_state.vocal_sample_rate = None if "audio" not in st_state: st_state.audio = None if "audio_pydub" not in st_state: st_state.audio_pydub = None if "audio_sample_rate" not in st_state: st_state.audio_sample_rate = None if "augmented_audio" not in st_state: st_state.augmented_audio = None if "augmented_audio_pydub" not in st_state: st_state.augmented_audio_pydub = None if "augmented_audio_sample_rate" not in st_state: st_state.augmented_audio_sample_rate = None def secure_audio_player(audio_bytes, format="audio/wav"): """ Enhanced secure audio player that completely blocks downloads Parameters: audio_bytes: Audio data as bytes format: Audio format mimetype """ if audio_bytes is None: return # Encode to base64 b64 = base64.b64encode(audio_bytes).decode() # Custom HTML with additional JavaScript security custom_html = f"""
""" st.markdown(custom_html, unsafe_allow_html=True) def get_api_headers(): return { "Authorization": f"Bearer {BEARER_TOKEN}", "Content-Type": "application/json", } def get_auth_headers(): jwt_token = st.session_state.get("jwt_token") if jwt_token: return { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json", "Cache-Control": "no-store", } else: return { "Content-Type": "application/json", "Cache-Control": "no-store", } def get_current_subscription_status(): """Get current user's subscription status with improved debugging""" if st.session_state.get("jwt_token"): auth_headers = get_auth_headers() subscription_url = "https://songlabai.com/wp-json/custom-api/v1/subscription" try: print(f"Fetching subscription status from: {subscription_url}") subscription_response = requests.get(subscription_url, headers=auth_headers) if subscription_response.status_code == 200: subscription_data = subscription_response.json() print(f"Subscription response: {subscription_data}") # Convert subscription_plan_id to string for comparison subscription_plan_id = subscription_data.get("subscription_plan_id") # Handle both numeric and string plan IDs if subscription_plan_id is not None: if isinstance(subscription_plan_id, int): subscription_plan_id = str(subscription_plan_id) # CORRECTED PLAN IDs: # 576: Free tier # 2397: Tier 1 ($24.99/month) # 305: Tier 2 ($99.99/month) # 306: Tier 3 ($269.97/month) # Define premium plan IDs (any non-free plan) premium_plan_ids = ["2397", "305", "306"] # Log the plan ID for debugging print(f"Checking plan ID: {subscription_plan_id}, type: {type(subscription_plan_id)}") print(f"Is premium: {subscription_plan_id in premium_plan_ids}") is_free_tier = ( subscription_plan_id == "576" or subscription_plan_id not in premium_plan_ids or subscription_data.get("status") == "no_subscription" ) else: is_free_tier = True return { 'subscription_plan_id': subscription_plan_id, 'is_free_tier': is_free_tier, 'has_exceeded_download_limit': subscription_data.get("has_exceeded_download_limit", False), 'has_exceeded_generation_limit': subscription_data.get("has_exceeded_generation_limit", False) } else: print(f"Error fetching subscription: HTTP {subscription_response.status_code}") print(f"Response: {subscription_response.text}") except Exception as e: print(f"Exception in get_current_subscription_status: {e}") import traceback print(traceback.format_exc()) # Default to free tier if not logged in or error occurs return { 'subscription_plan_id': None, 'is_free_tier': True, 'has_exceeded_download_limit': False, 'has_exceeded_generation_limit': False } def get_usage_counts(): """Get current user's usage counts""" if st.session_state.get("jwt_token"): auth_headers = get_auth_headers() usage_url = "https://songlabai.com/wp-json/custom-api/v1/usage-counts" try: print(f"Fetching usage counts from: {usage_url}") usage_response = requests.get(usage_url, headers=auth_headers) if usage_response.status_code == 200: usage_data = usage_response.json() print(f"Usage counts response: {usage_data}") return { 'generation_count': usage_data.get('generation_count', 0), 'download_count': usage_data.get('download_count', 0), 'total_generation_count': usage_data.get('total_generation_count', 0), 'total_download_count': usage_data.get('total_download_count', 0) } else: print(f"Error fetching usage counts: HTTP {usage_response.status_code}") print(f"Response: {usage_response.text}") except Exception as e: print(f"Exception in get_usage_counts: {e}") # Default values if not logged in or error occurs return { 'generation_count': 0, 'download_count': 0, 'total_generation_count': 0, 'total_download_count': 0 } def show_sidebar(): """Display sidebar with login/user info and subscription details""" with st.sidebar: st.image("https://songlabai.com/wp-content/uploads/2024/03/songlabAI-logo-white-1024x267.png", width=200) # Display login section if not logged in if not st.session_state.get("jwt_token"): st.markdown("### 👤 User Login") with st.form("sidebar_login_form"): username = st.text_input("Username", key="sidebar_username") password = st.text_input("Password", type="password", key="sidebar_password") submit_login = st.form_submit_button("Log In") if submit_login and username and password: with st.spinner("Logging in..."): login_url = "https://songlabai.com/wp-json/jwt-auth/v1/token" data = { "username": username, "password": password } response = requests.post(login_url, data=data) if response.status_code == 200 and 'token' in response.json(): result = response.json() st.session_state["jwt_token"] = result["token"] st.session_state["session_id"] = str(uuid.uuid4()) # Set flag to refresh generate button st.session_state["login_refresh_needed"] = True st.success("✅ Logged in successfully!") st.rerun() else: st.error("❌ Invalid username or password") st.markdown("Don't have an account? [Register here](https://songlabai.com/register-music-generated-by-ai/)") # If logged in, display user info and logout option else: try: # Display user information decoded_token = jwt.decode( st.session_state["jwt_token"], options={"verify_signature": False} ) user_info = decoded_token.get('data', {}).get('user', {}) st.markdown(f"### 👋 Welcome, {user_info.get('display_name', 'User')}!") # Get subscription status and usage information subscription_status = get_current_subscription_status() usage_counts = get_usage_counts() # Identify subscription tier based on plan ID subscription_tier = "Free Tier" if not subscription_status['is_free_tier']: plan_id = subscription_status.get('subscription_plan_id') # Convert to string for comparison if needed if isinstance(plan_id, int): plan_id = str(plan_id) # Debug output to help troubleshoot plan ID issues print(f"DEBUG: User subscription plan_id = {plan_id!r}") # CORRECTED PLAN IDs MAPPING: if plan_id == "2397": subscription_tier = "Tier 1 ($24.99/month)" elif plan_id == "305": subscription_tier = "Tier 2 ($99.99/month)" elif plan_id == "306": subscription_tier = "Tier 3 ($269.97/month)" else: subscription_tier = f"Premium Plan (ID: {plan_id})" # Create subscription info display st.markdown("### 📊 Your Subscription") st.info(f"**Current Plan:** {subscription_tier}") # Show appropriate limits based on subscription if subscription_status['is_free_tier']: # Display for free tier - uses total counts total_gen_count = usage_counts.get('total_generation_count', 0) st.markdown("#### Usage Limits") st.markdown(f"- đŸŽĩ **Generations:** {total_gen_count}/3 total (Free tier)") st.markdown("- đŸ“Ĩ **Downloads:** Not available") st.markdown("- ⏱ī¸ **Duration:** Max 30 seconds") # Show upgrade button prominently for free users st.markdown("### 🚀 Upgrade Your Plan") st.markdown("Get more generations, downloads, and longer tracks!") if st.button("Upgrade Now", type="primary"): st.markdown("[Click here to view plans](https://songlabai.com/subscribe/)") else: # Format for premium users - CORRECTED LIMITS PER TIER daily_limit = 0 download_limit = 0 duration_limit = 140 plan_id = subscription_status.get('subscription_plan_id') if isinstance(plan_id, int): plan_id = str(plan_id) if plan_id == "2397": # Tier 1 ($24.99) daily_limit = 3 download_limit = 1 elif plan_id == "305": # Tier 2 ($99.99) daily_limit = 5 download_limit = 5 elif plan_id == "306": # Tier 3 ($269.97) daily_limit = 20 download_limit = 15 # Get current usage counts daily_gen_count = usage_counts.get('generation_count', 0) monthly_dl_count = usage_counts.get('download_count', 0) # Display premium user limits with actual counts st.markdown("#### 📊 Your Usage") st.markdown(f"- đŸŽĩ **Generations:** {daily_gen_count}/{daily_limit} per day") st.markdown(f"- đŸ“Ĩ **Downloads:** {monthly_dl_count}/{download_limit} per month") st.markdown(f"- ⏱ī¸ **Duration:** Up to {duration_limit} seconds") # Show warning if limits are close to being reached if subscription_status['has_exceeded_generation_limit']: st.warning("⚠ī¸ You've reached your daily generation limit") elif subscription_status['has_exceeded_download_limit']: st.warning("⚠ī¸ You've reached your monthly download limit") # Logout button if st.button("Log Out", key="sidebar_logout"): st.session_state.clear() st.success("Logged out successfully!") st.rerun() except Exception as e: st.error(f"Error displaying user info: {str(e)}") st.code(traceback.format_exc()) # Show stack trace for debugging if st.button("Log Out", key="sidebar_error_logout"): st.session_state.clear() st.rerun() # Help and support section st.markdown("### ℹī¸ Need Help?") with st.expander("Tips & FAQs"): st.markdown(""" **Generation Tips:** - Use detailed descriptions - Experiment with different tempos - Try various genres for unique sounds **Common Issues:** - Server warmup may take several minutes - Free tier has limited features - Some effects work better with certain genres **Contact Support:** For technical issues, email support@songlabai.com """) def wp_create_nonce(action=''): """Generate a WordPress-like nonce for REST API requests""" import hashlib import time # Simple nonce generation based on current timestamp timestamp = int(time.time()) data = f"{action}|{timestamp}" return hashlib.md5(data.encode()).hexdigest() def direct_upload_to_wordpress(audio_bytes, user_id, metadata=None): """ Improved function to upload audio to WordPress with better error handling Parameters: audio_bytes: The audio data as bytes user_id: User ID for tracking metadata: Optional dictionary with track metadata (genre, energy, tempo, etc.) Returns: tuple: (file_url, file_id) """ try: # Generate a unique filename filename = f"songlab_track_{datetime.now().timestamp()}.wav" # Get user info from JWT token user_email = "" user_name = "" subscription_tier = "" if st.session_state.get("jwt_token"): try: decoded = jwt.decode(st.session_state["jwt_token"], options={"verify_signature": False}) user_data = decoded.get('data', {}).get('user', {}) user_email = user_data.get('user_email', '') user_name = user_data.get('display_name', '') # Get subscription info subscription_status = get_current_subscription_status() if not subscription_status['is_free_tier']: plan_id = subscription_status.get('subscription_plan_id') if plan_id == "2397": subscription_tier = "Tier 1 ($24.99)" elif plan_id == "305": subscription_tier = "Tier 2 ($99.99)" elif plan_id == "306": subscription_tier = "Tier 3 ($269.97)" else: subscription_tier = "Free Tier" except Exception as e: print(f"Error decoding JWT: {str(e)}") # Get auth headers with JWT token auth_headers = get_auth_headers() # Remove Content-Type from headers for file upload upload_headers = auth_headers.copy() if 'Content-Type' in upload_headers: del upload_headers['Content-Type'] # Try the direct upload AJAX endpoint first try: print("Trying direct AJAX upload endpoint (most reliable)") ajax_url = "https://songlabai.com/wp-admin/admin-ajax.php?action=upload_audio_direct" # Create files dict for upload files = { 'audio_file': (filename, audio_bytes, 'audio/wav') } # Create form data form_data = { 'user_id': str(user_id), 'user_email': user_email, 'user_name': user_name, 'subscription_tier': subscription_tier } # Add metadata to form data if available if metadata: for key, value in metadata.items(): if value is not None: form_data[key] = str(value) # Send the request response = requests.post( ajax_url, headers=upload_headers, files=files, data=form_data ) print(f"AJAX upload response: HTTP {response.status_code}") if response.status_code == 200: try: response_data = response.json() if response_data.get('success'): file_url = response_data.get('data', {}).get('file_url', '#') file_id = response_data.get('data', {}).get('attachment_id', 'N/A') notification_id = response_data.get('data', {}).get('notification_id', 'N/A') print(f"AJAX upload success! File URL: {file_url}, ID: {file_id}, Notification ID: {notification_id}") return file_url, file_id else: print(f"AJAX upload reported failure: {response_data.get('message', 'Unknown error')}") except Exception as e: print(f"Error parsing AJAX response: {str(e)}") else: print(f"AJAX upload failed with status {response.status_code}: {response.text}") except Exception as e: print(f"AJAX upload exception: {str(e)}") import traceback print(traceback.format_exc()) # Try the REST API v2 endpoint next try: print("Trying REST API v2 endpoint") v2_url = "https://songlabai.com/wp-json/songlab/v2/upload-audio" # Create files dict for upload files = { 'file': (filename, audio_bytes, 'audio/wav') } # Create form data form_data = { 'user_id': str(user_id), 'user_email': user_email, 'user_name': user_name, 'subscription_tier': subscription_tier } # Add metadata to form data if available if metadata: for key, value in metadata.items(): if value is not None: form_data[key] = str(value) # Send the request response = requests.post( v2_url, headers=upload_headers, files=files, data=form_data ) print(f"v2 API response: HTTP {response.status_code}") if response.status_code == 200: response_data = response.json() if response_data.get('success'): file_url = response_data.get('file_url', '#') file_id = response_data.get('file_id', 'N/A') print(f"v2 API success! File URL: {file_url}, ID: {file_id}") return file_url, file_id else: print(f"v2 API error: {response_data.get('message', 'Unknown error')}") else: print(f"v2 API failed with status {response.status_code}: {response.text}") except Exception as e: print(f"v2 API exception: {str(e)}") # No other methods worked, create a notification with placeholder values try: print("All direct upload methods failed, creating notification with placeholders") notification_url = "https://songlabai.com/wp-json/songlab/v1/notify" notification_payload = { "link": "#", "product_code": "pending-upload-" + datetime.now().strftime('%Y%m%d%H%M%S'), "user_id": user_id } notify_response = requests.post( notification_url, headers=auth_headers, json=notification_payload ) print(f"Placeholder notification response: HTTP {notify_response.status_code}") print(f"Response text: {notify_response.text}") try: notify_data = notify_response.json() if notify_data.get('success'): # Return placeholder values but log that we submitted a notification print("Placeholder notification created successfully") # Try to extract notification ID notification_id = notify_data.get('data', {}).get('id', 'N/A') if notification_id != 'N/A': print(f"Notification ID: {notification_id}") except: pass except Exception as e: print(f"Placeholder notification exception: {str(e)}") # Return placeholder values print("All upload methods failed, returning placeholder values") return "#", "N/A" except Exception as e: import traceback print(f"Critical error in direct_upload_to_wordpress: {str(e)}") print(traceback.format_exc()) return "#", "N/A" def save_track_to_dashboard(audio_array, sample_rate, metadata=None): """ Save the current track to the user's dashboard with improved error handling Parameters: audio_array: The audio data as a numpy array sample_rate: The sample rate of the audio metadata: Optional dictionary with track metadata (genre, energy, tempo, etc.) Returns: bool: True if successful, False otherwise """ if not st.session_state.get("jwt_token"): st.error("❌ You must be logged in to save tracks to your dashboard.") return False try: # Check subscription status subscription_status = get_current_subscription_status() if subscription_status['is_free_tier']: st.warning("⚠ī¸ Saving tracks to dashboard requires a premium subscription.") st.info("💡 Upgrade to a premium plan to save tracks!\n\n" + "👉 [Upgrade Now](https://songlabai.com/subscribe/)") return False # Get user ID from JWT token user_id = None try: decoded = jwt.decode(st.session_state["jwt_token"], options={"verify_signature": False}) user_id = decoded.get('data', {}).get('user', {}).get('id') print(f"Saving track for user ID: {user_id}") except Exception as e: print(f"Error decoding JWT: {str(e)}") st.error("Failed to identify user. Please try logging out and back in.") return False if not user_id: st.error("Could not identify user ID from login token.") return False # Convert the audio to WAV format wav_bytes = BytesIO() sf.write(wav_bytes, audio_array, samplerate=sample_rate, format='WAV') wav_bytes.seek(0) # Upload file to WordPress using the improved function with st.spinner("💾 Uploading track to the server..."): file_url, product_id = direct_upload_to_wordpress(wav_bytes.getvalue(), user_id, metadata) if file_url != "#" and product_id != "N/A": st.success("✅ Track saved to your dashboard successfully!") st.info("Your track will be reviewed by our team. If approved, it will be published and you'll earn 70% of all sales revenue.") return True else: # Even if the direct upload failed, we would have created a notification entry st.warning("⚠ī¸ Track has been saved with limited functionality. Our team will contact you if needed.") return True except Exception as e: import traceback print(f"Error saving track to dashboard: {str(e)}") print(traceback.format_exc()) st.error(f"❌ Error saving track to dashboard: {str(e)}") return False def display_download_button(audio_bytes, is_premium_user=False, has_exceeded_limit=False, location="main"): """ Display download button based on subscription status Parameters: audio_bytes: Audio data as bytes is_premium_user: Whether user has premium subscription has_exceeded_limit: Whether user has exceeded download limits location: Location identifier for unique key generation for buttons """ if audio_bytes is None: return # Show download button only for eligible premium users if is_premium_user and not has_exceeded_limit: st.download_button( label="âŦ‡ī¸ Download Audio (Premium Feature)", data=audio_bytes, file_name=f"songlab_audio_{location}.wav", mime="audio/wav", on_click=update_download_count, key=f"download_btn_{location}" ) st.success("✅ Your premium subscription allows you to download this track!", icon="✅") # For premium users who exceeded limits elif is_premium_user: st.warning("⚠ī¸ You've reached your download limit for this period. Your limits will reset automatically.", icon="⚠ī¸") # For free users else: st.info("⭐ **Premium Feature**: Downloading audio files requires a subscription. " "[Upgrade Now](https://songlabai.com/subscribe/)", icon="ℹī¸") # Update the display_audio_player function to include save to dashboard option def display_audio_player(is_final=False): """ Centralized function to display the appropriate audio player based on the current state with added save to dashboard functionality Parameters: is_final: Whether this is the final processed audio or the initial generated audio """ # Get subscription status subscription_status = get_current_subscription_status() is_premium = not subscription_status['is_free_tier'] has_exceeded_limit = subscription_status['has_exceeded_download_limit'] if is_final and st_state.augmented_audio_pydub is not None: # Display processed audio audio_bytes = st_state.augmented_audio_pydub.export(format="wav").read() st.markdown("### 🎧 Final Processed Audio") secure_audio_player(audio_bytes) # Display columns for download and save buttons col1, col2 = st.columns(2) with col1: # Show download button for final audio display_download_button( audio_bytes, is_premium_user=is_premium, has_exceeded_limit=has_exceeded_limit, location="final" ) with col2: # Add Save to Dashboard button for premium users if st.session_state.get("jwt_token") and is_premium: if st.button("💾 Save to Dashboard", key="save_processed_btn"): # Get audio array from audio bytes with BytesIO(audio_bytes) as buf: processed_audio_array, processed_sample_rate = sf.read(buf) # Get metadata from session state and add processing info metadata = st.session_state.get('audio_metadata', {}).copy() metadata['processed'] = True metadata['processing_date'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Save to dashboard save_track_to_dashboard(processed_audio_array, processed_sample_rate, metadata) elif st.session_state.get("jwt_token"): # User is logged in but not premium st.info("⭐ **Premium Feature**: Saving tracks requires a subscription. " "[Upgrade Now](https://songlabai.com/subscribe/)") else: # User is not logged in st.info("👤 **Log in** using the sidebar to save tracks to your dashboard") elif not is_final and st.session_state.get('audio_bytes') is not None: # Display initial generated audio st.markdown("### đŸŽĩ Generated Audio") secure_audio_player(st.session_state.audio_bytes) # Display columns for download and save buttons col1, col2 = st.columns(2) with col1: display_download_button( st.session_state.audio_bytes, is_premium_user=is_premium, has_exceeded_limit=has_exceeded_limit, location="generated" ) with col2: # Add Save to Dashboard button for premium users if st.session_state.get("jwt_token") and is_premium: if st.button("💾 Save to Dashboard", key="save_generated_btn"): # Get audio array with BytesIO(st.session_state.audio_bytes) as buf: generated_audio_array, generated_sample_rate = sf.read(buf) # Get metadata metadata = st.session_state.get('audio_metadata', {}) # Save to dashboard save_track_to_dashboard(generated_audio_array, generated_sample_rate, metadata) elif st.session_state.get("jwt_token"): # User is logged in but not premium st.info("⭐ **Premium Feature**: Saving tracks requires a subscription. " "[Upgrade Now](https://songlabai.com/subscribe/)") else: # User is not logged in st.info("👤 **Log in** using the sidebar to save tracks to your dashboard") # Show track information if available if st.session_state.get('audio_metadata'): meta = st.session_state.audio_metadata st.markdown("### đŸŽĩ Track Information") st.markdown(f"**Genre:** {meta.get('genre', 'Not specified')}") st.markdown(f"**Energy:** {meta.get('energy_level', 'Not specified')}") st.markdown(f"**Tempo:** {meta.get('tempo', 'Not specified')} BPM") st.markdown(f"**Duration:** {meta.get('duration', 'Not specified')} seconds") if meta.get('description'): st.markdown(f"**Description:** {meta.get('description')}") # Enhanced function to upload to WordPress def enhanced_save_to_wordpress(audio_array, sample_rate): """ Enhanced function to upload audio to WordPress with better error handling and debugging information """ # Convert audio_array to float32 if not already audio_array = audio_array.astype(np.float32) # Save the audio to a BytesIO buffer wav_bytes = BytesIO() sf.write(wav_bytes, audio_array, samplerate=sample_rate, format='WAV') wav_bytes.seek(0) # Define your WordPress site URL and authentication credentials wordpress_url = "https://songlabai.com/xmlrpc.php" woocommerce_url = "https://songlabai.com" consumer_key = "ck_93d516ba12289a6fd0eced56bbc0b05ecbf98735" consumer_secret = "cs_9d5eb716d631db408a4c47796b5d18b0313d8559" username = "admin_h2ibbgql" password = "Pressabc1!" # Authenticate with WordPress XML-RPC API title = f"generated_audio_{datetime.now().timestamp()}.wav" file_data = { "name": title, "type": "audio/x-wav", "bits": xmlrpc_client.Binary(wav_bytes.getvalue()), } print(f"Uploading file to WordPress: {title}") wp_client = Client(wordpress_url, username, password) for attempt in range(4): try: print(f"Attempt {attempt+1} to upload file to WordPress") # Upload the file to WordPress Media Library media_response = wp_client.call(media.UploadFile(file_data)) # Handle the response if media_response: print(f"File successfully uploaded to WordPress with attachment ID: {media_response}") print(f"File URL: {media_response.get('url', 'Unknown')}") # Create product data for WooCommerce product_data = { "status": "pending", "name": title, "type": "simple", "regular_price": "1.00", # Set the price as needed "sku": str(uuid.uuid4()), "downloadable": True, "download_limit": -1, "download_expiry": -1, } # Authenticate with WooCommerce API wc_api = API( url=woocommerce_url, consumer_key=consumer_key, consumer_secret=consumer_secret, version="wc/v3", ) # Create the product print("Creating product in WooCommerce") response = wc_api.post("products", product_data) # Handle the response if response.status_code == 201: print(f"Product successfully created in WooCommerce: {response.json().get('id')}") # Update product to add downloadable file URL product_update_data = { "downloads": [ { "name": media_response["title"], "file": media_response["url"], # Use direct URL instead of permalink } ] } product_id = response.json().get("id") print(f"Updating product {product_id} with downloadable file") response = wc_api.put(f"products/{product_id}", product_update_data) if response.status_code == 200: print(f"Downloadable file URL added to product: {response.json().get('permalink')}") # Get direct file URL for notifications file_url = media_response["url"] # Get user ID from JWT token with improved debugging user_id = None if st.session_state.get("jwt_token"): try: decoded = jwt.decode(st.session_state["jwt_token"], options={"verify_signature": False}) user_id = decoded.get('data', {}).get('user', {}).get('id') print(f"Decoded JWT token user data: {decoded.get('data', {}).get('user', {})}") print(f"Extracted user ID: {user_id}") except Exception as e: print(f"Error decoding JWT: {str(e)}") user_id = None # Send notification using WordPress REST API if user ID exists if user_id: try: notification_url = "https://songlabai.com/wp-json/songlab/v1/notify" notification_payload = { "link": file_url, "product_code": product_id, "user_id": user_id } print(f"Sending notification with payload: {notification_payload}") notification_response = requests.post(notification_url, json=notification_payload) print(f"Notification response status: {notification_response.status_code}") print(f"Notification response: {notification_response.text}") except Exception as e: print(f"Error sending notification: {str(e)}") else: print("No user ID available, skipping notification") return file_url, str(product_id) else: print(f"Error adding downloadable file URL to product: {response.text}") else: print(f"Error creating product in WooCommerce: {response.text}") else: print("Error uploading file to WordPress - empty response.") # Break the loop if we got this far, even with errors break except Exception as e: print(f"Error in WordPress upload (attempt {attempt+1}): {str(e)}") import traceback print(traceback.format_exc()) # Only continue retrying if we haven't reached the max attempts if attempt < 3: print(f"Retrying upload... ({attempt+2}/4)") continue else: print("All upload attempts failed.") # If upload fails, return placeholders return "#", "N/A" def convert_audio_segment_to_float_array(audio_pydub): """ Convert a pydub AudioSegment to a NumPy array of type float32. Args: audio_pydub (AudioSegment): The AudioSegment object to be converted. Returns: np.ndarray: A NumPy array containing the audio data as float32. """ # Get the raw audio data as a sequence of samples samples = audio_pydub.get_array_of_samples() # Convert the samples to a NumPy array and normalize to float32 audio_array = np.array(samples).astype(np.float32) # Normalize the audio array to range between -1.0 and 1.0 max_val = 2**15 # Assuming 16-bit audio, modify this if using different bit depths audio_array /= max_val return audio_array def time_post_request(api_url, headers=None, payload=None, timeout=None): """ Times the execution of a POST request. Parameters: - api_url (str): The URL to which the POST request is sent. - headers (dict): The headers to include in the POST request. - payload (dict): The payload to include in the POST request. - timeout (int): The timeout value in seconds for the POST request (optional). Returns: - response (requests.Response): The response object returned by the POST request. - execution_time (float): The time it took to execute the POST request. """ start_time = time.time() response = requests.post(api_url, headers=headers, json=payload, timeout=timeout) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {execution_time} seconds") return response def check_server_status(): """Check if the server is warm by making a minimal request""" warmup_payload = {"inputs": {"prompt": "warmup", "duration": 1}} try: response = time_post_request( API_URL, headers=get_api_headers(), payload=warmup_payload, timeout=10 # Short timeout for quick check ) return response is not None and response.status_code == 200 except: return False def wait_for_server_warmup(status_placeholder, progress_placeholder, subscription_info_placeholder, stage_placeholder): """Handle server warmup with detailed progress stages and proceed immediately when server is ready""" stages = [ {"time": 0, "message": "🚀 Initializing server resources..."}, {"time": 60, "message": "⚙ī¸ Loading the AI model into memory..."}, {"time": 180, "message": "🔧 Optimizing model parameters..."}, {"time": 300, "message": "📊 Configuring generation settings..."}, {"time": 360, "message": "✨ Finalizing server preparation..."} ] subscription_info = """ ### 📋 **Subscription Tiers** **Free User** - đŸŽĩ **Generations:** 3 music generations - đŸ“Ĩ **Downloads:** No downloads allowed - đŸŽŧ **Submit your track from dashboard** - 💰 **Receive 70% commission** - đŸ’ĩ **Tracks sale for on website** **Tier 2 - $24.99/month** - đŸŽĩ **Generations:** Up to 3 music generations per day - đŸ“Ĩ **Downloads:** 1 download per month - 🔒 **Private Tracks:** Create unpublished, private tracks - 📊 **Dashboard:** Access your profile and save up to 2 tracks **Tier 3 - $99.99/month** - đŸŽĩ **Generations:** Up to 5 music generations per day - đŸ“Ĩ **Downloads:** 5 downloads per month - 📊 **Dashboard:** Full profile access for managing and saving your tracks - 💾 **Storage:** Save up to 10 tracks **Tier 4 - $269.97/month** - đŸŽĩ **Generations:** Up to 20 music generations per day - đŸ“Ĩ **Downloads:** 15 downloads per month - đŸĸ **Commercial Use:** Includes rights for commercial purposes - 📊 **Dashboard:** Advanced profile management for commercial accounts ### 🚀 **Why Subscribe?** Upgrade now to unlock more features and enhance your music creation experience! """ start_time = time.time() total_warmup_time = 420 # 7 minutes current_stage = 0 last_check_time = 0 while time.time() - start_time < total_warmup_time: elapsed = time.time() - start_time remaining = total_warmup_time - elapsed progress = elapsed / total_warmup_time # Update current stage while current_stage < len(stages) - 1 and elapsed > stages[current_stage + 1]["time"]: current_stage += 1 minutes_remaining = int(remaining // 60) seconds_remaining = int(remaining % 60) # Update UI status_placeholder.markdown("### 🔄 AI Server Initialization in Progress") progress_placeholder.progress(progress) stage_placeholder.info( f"{stages[current_stage]['message']}\n\n" f"âŗ Estimated time remaining: {minutes_remaining}m {seconds_remaining}s\n\n" "ℹī¸ **Why the wait?** The AI model needs time to load and prepare when the server is inactive.\n" "Upgrading to a premium plan keeps the server active longer, reducing or eliminating wait times!\n" "👉 [Upgrade Now](https://songlabai.com/subscribe/)" ) subscription_info_placeholder.markdown(subscription_info) # Check if server is ready every 5 seconds if elapsed - last_check_time >= 5: last_check_time = elapsed if check_server_status(): status_placeholder.success("🎉 Server is ready! Proceeding to music generation...") return True time.sleep(0.1) return False def update_download_count(): """Update download count for the current user""" auth_headers = get_auth_headers() try: update_url = "https://songlabai.com/wp-json/custom-api/v1/update-download-count" requests.post(update_url, headers=auth_headers) return True except Exception as e: print(f"× Error updating download count: {str(e)}") return False # Update the load_and_play_generated_audio function to use direct notification def load_and_play_generated_audio(response, genre=None, energy_level=None, tempo=None, description=None, duration=None): """ Load and display the generated audio with better error handling for CUDA/GPU errors """ try: # Parse the response JSON response_eval = json.loads(response.content) # Check for error response from the model server if 'error' in response_eval: error_message = response_eval['error'] # Handle CUDA errors specially if 'CUDA error' in error_message: raise RuntimeError(f"GPU error on the server. Please try again later: {error_message.split('CUDA')[0]}") else: raise RuntimeError(f"Server error: {error_message}") # Now we know the response is not an error, proceed normally # For successful responses, the structure should be a list with the first element containing the audio data if not isinstance(response_eval, list) or len(response_eval) == 0: raise ValueError(f"Unexpected response format: {type(response_eval)}") generated_audio = response_eval[0].get("generated_audio") if generated_audio is None: raise ValueError("No audio data found in response") sample_rate = response_eval[0].get("sample_rate") if sample_rate is None: raise ValueError("No sample rate found in response") # Check if the audio is stereo or mono if isinstance(generated_audio[0], list): # Stereo audio_array = np.array(generated_audio).T else: # Mono audio_array = np.array(generated_audio) # Convert the float32 audio data to int16 max_val = np.max(np.abs(audio_array)) if max_val > 0: audio_array = audio_array / max_val # Normalize to [-1.0, 1.0] int_audio = np.int16(audio_array * 32767) # Write the audio data to a BytesIO buffer using soundfile audio_buffer = BytesIO() sf.write(audio_buffer, int_audio, sample_rate, format='WAV', subtype='PCM_16') audio_buffer.seek(0) # Read the audio data into an AudioSegment audio_segment = AudioSegment.from_file(audio_buffer, format="wav") # Store in session state st_state.audio_pydub = audio_segment st_state.audio_sample_rate = sample_rate st_state.augmented_audio_pydub = st_state.audio_pydub st.session_state.audio_bytes = audio_buffer.getvalue() st.session_state.playing_state = 'generated' # Store metadata st.session_state.audio_metadata = { 'genre': genre, 'energy_level': energy_level, 'tempo': tempo, 'description': description, 'duration': duration, 'generated_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } # Get user ID from JWT token for notification user_id = None user_email = '' user_name = '' subscription_tier = 'Free Tier' if st.session_state.get("jwt_token"): try: decoded = jwt.decode(st.session_state["jwt_token"], options={"verify_signature": False}) user_data = decoded.get('data', {}).get('user', {}) user_id = user_data.get('id') user_email = user_data.get('user_email', '') user_name = user_data.get('display_name', '') # Get subscription info subscription_status = get_current_subscription_status() if not subscription_status['is_free_tier']: plan_id = subscription_status.get('subscription_plan_id') if plan_id == "2397": subscription_tier = "Tier 1 ($24.99)" elif plan_id == "305": subscription_tier = "Tier 2 ($99.99)" elif plan_id == "306": subscription_tier = "Tier 3 ($269.97)" except Exception as e: print(f"Error decoding JWT: {str(e)}") import traceback print(traceback.format_exc()) # Upload the audio file to WordPress audio_buffer.seek(0) file_url, product_id = direct_upload_to_wordpress( audio_buffer.read(), user_id, st.session_state.audio_metadata ) # Send notification with complete user info if user_id: # Create notification with full user details notification_url = "https://songlabai.com/wp-json/songlab/v1/notify" notification_payload = { "link": file_url, "product_code": product_id, "user_id": user_id, "user_email": user_email, # Include email "user_name": user_name, # Include name "subscription_tier": subscription_tier # Include tier } # Send the notification try: auth_headers = get_auth_headers() notification_response = requests.post( notification_url, headers=auth_headers, json=notification_payload ) print(f"Auto-notification response: {notification_response.status_code}") print(f"Response text: {notification_response.text}") except Exception as e: print(f"Error in auto-notification: {str(e)}") import traceback print(traceback.format_exc()) # Display the audio using the centralized player function display_audio_player(is_final=False) return audio_array, sample_rate except RuntimeError as re: # Handle server-side errors (like CUDA/GPU errors) print(f"Runtime error: {re}") st.error(f"Server error: The AI model is currently experiencing technical difficulties. Please try again in a few minutes.") st.info("💡 This is a temporary issue with the AI server's GPU resources and not related to your account.") return None, None except Exception as e: # Handle other errors import traceback error_details = traceback.format_exc() print(f"Error in load_and_play_generated_audio: {e}") print(f"Error details: {error_details}") st.error(f"Failed to process generated audio: {e}") return None, None def generate_audio(genre, energy_level, tempo, description, duration): # Create placeholders status_placeholder = st.empty() progress_placeholder = st.empty() subscription_placeholder = st.empty() stage_placeholder = st.empty() try: # First, verify user is logged in and has permissions if st.session_state.get("jwt_token") is None: st.error("You must be logged in to generate audio.") st.session_state['login_needed'] = True st.session_state['generate_audio_params'] = { 'genre': genre, 'energy_level': energy_level, 'tempo': tempo, 'description': description, 'duration': duration } st.rerun() # Show login form immediately return None auth_headers = get_auth_headers() # Check subscription status subscription_status = get_current_subscription_status() if subscription_status['has_exceeded_generation_limit']: if subscription_status['is_free_tier']: st.error("❌ You've reached your generation limit on the free tier.") st.info("💡 Upgrade to a premium subscription for more generations!") else: st.warning("⚠ī¸ You've reached your daily generation limit. Limits will reset tomorrow.") return None # Check for free tier duration restriction is_free_tier = subscription_status['is_free_tier'] if is_free_tier: if duration > 30: st.warning("⚠ī¸ Free tier users are limited to 30-second generations.") st.info("💡 Upgrade to a premium plan to generate longer tracks!\n\n" + "**Available Plans:**\n" + "- **$24.99/month:** Up to 3 generations per day, 1 download per month\n" + "- **$99.99/month:** Up to 5 generations per day, 5 downloads per month\n" + "- **$269.97/month:** Up to 20 generations per day, 15 downloads per month\n\n" + "👉 [Upgrade Now](https://songlabai.com/subscribe/)") duration = 30 # Force duration to 30 seconds for free tier # Check if server is cold if not check_server_status(): status_placeholder.warning( "🔄 **Server is currently starting up.**\n\n" "Our AI model needs some time to load and prepare when the server has been inactive.\n" "This can take up to **7 minutes** for free users.\n\n" "💡 **Upgrade to a premium plan to reduce or eliminate wait times!**\n" "👉 [Upgrade Now](https://songlabai.com/subscribe/)" ) # Wait for server to warm up with progress display server_ready = wait_for_server_warmup( status_placeholder, progress_placeholder, subscription_placeholder, stage_placeholder ) if not server_ready: st.error( "❌ Server initialization timed out.\n\n" "Consider upgrading to premium for reliable, fast access.\n\n" "👉 [Upgrade Now](https://songlabai.com/subscribe/)" ) return None # Clear warmup messages for placeholder in [status_placeholder, progress_placeholder, subscription_placeholder, stage_placeholder]: placeholder.empty() # Prepare generation request prompt = f"Genre: {genre}, Energy Level: {energy_level}, Tempo: {tempo}, Description: {description}" payload = {"inputs": {"prompt": prompt, "duration": duration}} api_headers = get_api_headers() # Make the generation request with st.spinner("đŸŽĩ Generating your music... This may take a few moments."): response = time_post_request(API_URL, headers=api_headers, payload=payload, timeout=1200) if response and response.status_code == 200: # Check for error in response before showing success message try: response_data = json.loads(response.content) if 'error' in response_data: error_msg = response_data['error'] if 'CUDA error' in error_msg: st.error("❌ The AI server is currently experiencing GPU resource issues.") st.info("Please try again in a few minutes. This is a temporary server issue.") print(f"CUDA error from server: {error_msg}") return None else: st.error(f"❌ Server error: {error_msg}") return None # If we get here, response is valid st.success("✨ Music generated successfully!") except Exception as parse_error: print(f"Error parsing response to check for errors: {parse_error}") # Continue with normal flow - will be caught in load_and_play if there's an issue st.success("✨ Music generated successfully!") # Load and play the generated audio result = load_and_play_generated_audio( response=response, genre=genre, energy_level=energy_level, tempo=tempo, description=description, duration=duration ) # Only update count if audio was successfully generated if result[0] is not None: # Update generation count on WordPress update_generation_url = "https://songlabai.com/wp-json/custom-api/v1/update-generation-count" requests.post(update_generation_url, headers=auth_headers) return response # Return the response for future use else: error_code = response.status_code if response else "No response" st.error( f"❌ Failed to generate audio (Error {error_code}).\n\n" "This might be due to high server load or an error.\n\n" "💡 **Tip:** Premium users experience fewer failures and faster generation times.\n\n" "👉 [Upgrade Now](https://songlabai.com/subscribe/)" ) return None except Exception as e: import traceback print(f"Detailed error in generate_audio: {traceback.format_exc()}") st.error(f"An unexpected error occurred: {e}") return None def update_generate_button(genre, energy_level, tempo, description, duration): """Create or update the generate button based on current state""" generate_btn_col, login_prompt_col = st.columns([1, 2]) with generate_btn_col: generate_button_enabled = True tooltip = "" # Determine if button should be enabled if not st.session_state.get("jwt_token"): generate_button_enabled = False tooltip = "Please log in using the sidebar to generate music" elif genre and energy_level and tempo: if not confirmed_description: # Check confirmed_description instead generate_button_enabled = False tooltip = "Please add and confirm a description" else: # Always fetch fresh subscription status when checking button state subscription_status = get_current_subscription_status() if subscription_status['has_exceeded_generation_limit']: generate_button_enabled = False tooltip = "You've reached your generation limit" else: generate_button_enabled = False tooltip = "Please fill in all required fields" # Create the button (disabled if needed) if generate_button_enabled: if st.button("đŸŽĩ Generate Audio", key="generate_audio_button", type="primary"): generate_audio(genre, energy_level, tempo, description, duration) else: # Create disabled button with tooltip st.markdown( f"""
""", unsafe_allow_html=True ) with login_prompt_col: # Show login prompt if not logged in if not st.session_state.get("jwt_token"): st.info("👈 Please log in using the sidebar to generate music") # If we just logged in, clear the flag and rerun to refresh the button if st.session_state.get("login_refresh_needed"): st.session_state["login_refresh_needed"] = False st.rerun() # Call the sidebar function at the beginning of the app show_sidebar() # Streamlit app title st.title("Songlab AI") # Main UI components genres = [ "Pop", "Rock", "Hip Hop", "Jazz", "Blues", "Country", "Classical", "Electronic", "Reggae", "Folk", "R&B", "Metal", "Punk", "Indie", "Dance", "World", "Gospel", "Soul", "Funk", "Ambient", "Techno", "Disco", "House", "Trance", "Dubstep", ] genre = st.selectbox("Select Genre:", genres) energy_levels = ["Low", "Medium", "High"] energy_level = st.radio("Energy Level:", energy_levels, horizontal=True) desc_col, button_col = st.columns([4, 1]) with desc_col: description = st.text_input( "Description:", "", key="description_input", placeholder="Enter a description of your desired music" ) with button_col: # Add some vertical spacing to align with the text box st.markdown("
", unsafe_allow_html=True) confirm_clicked = st.button("Confirm", key="confirm_description") if confirm_clicked: # This will trigger a rerun with the current description value st.session_state["confirmed_description"] = description st.rerun() # Use the confirmed description for button enablement confirmed_description = st.session_state.get("confirmed_description", "") tempo = st.slider("Tempo (in bpm):", min_value=40, max_value=100, value=60, step=5) # Get subscription status for duration limits subscription_status = get_current_subscription_status() is_free_tier = subscription_status['is_free_tier'] duration_col, promo_col = st.columns([2, 1]) with duration_col: # Modify the existing duration slider logic if is_free_tier and not st.session_state.get('promo_code_applied', False): duration = st.slider( "Duration (in seconds):", min_value=15, max_value=30, # Restricted to 30 seconds for free tier value=30, step=1, help="Free tier users are limited to 30-second generations. Upgrade to create longer tracks or use a promo code!" ) else: duration = st.slider( "Duration (in seconds):", min_value=15, max_value=140, value=30, step=1 ) with promo_col: # Add the promo code text input promo_code = st.text_input("Promo Code:", "", help="Enter a promo code to unlock premium features") # Check for valid promo code if promo_code == "promosongtester" and not st.session_state.get('promo_code_applied', False): if st.button("Apply Code"): st.session_state['promo_code_applied'] = True st.success("✅ Promo code applied! You now have access to full 3-minute duration.") st.rerun() # Rerun to update the UI # Show status if promo code is already applied elif st.session_state.get('promo_code_applied', False): st.success("✅ Promo code active") # Add option to remove promo code if st.button("Remove Code"): st.session_state['promo_code_applied'] = False st.rerun() # Rerun to update the UI # Use the updated generate button function update_generate_button(genre, energy_level, tempo, description, duration) # Add this section after your main generation UI but before post-processing options st.divider() st.header("đŸŽĩ Load Previous Music") st.markdown(""" Upload a previously downloaded music clip to continue post-processing, add vocals, or apply audio effects. Supported formats: WAV, MP3, OGG, FLAC. """) uploaded_music = st.file_uploader( "Upload previous music clip", type=["wav", "mp3", "ogg", "flac"], help="Upload music you previously generated and downloaded from Songlab AI" ) if uploaded_music: # Show a preview player for the uploaded music st.audio(uploaded_music, format=f"audio/{uploaded_music.name.split('.')[-1]}") # Process button if st.button("Load for Post-Processing", key="load_uploaded_music"): try: # Read the uploaded music into memory with st.spinner("Loading audio..."): # Convert the uploaded file to an AudioSegment audio_data = uploaded_music.read() # Create a temporary file-like object audio_buffer = BytesIO(audio_data) audio_buffer.seek(0) # Load into AudioSegment based on file extension file_extension = uploaded_music.name.split('.')[-1].lower() audio_segment = AudioSegment.from_file(audio_buffer, format=file_extension) # Convert to the same format used in your application if file_extension != 'wav': wav_buffer = BytesIO() audio_segment.export(wav_buffer, format='wav') wav_buffer.seek(0) audio_segment = AudioSegment.from_wav(wav_buffer) # Store in session state st_state.audio_pydub = audio_segment st_state.audio_sample_rate = audio_segment.frame_rate st_state.augmented_audio_pydub = audio_segment st_state.augmented_audio_sample_rate = audio_segment.frame_rate # Convert to bytes for player wav_bytes = BytesIO() audio_segment.export(wav_bytes, format='wav') wav_bytes.seek(0) st.session_state.audio_bytes = wav_bytes.getvalue() # Set playing state st.session_state.playing_state = 'generated' # Clear any existing metadata st.session_state.audio_metadata = { 'genre': 'Unknown (Uploaded)', 'energy_level': 'Unknown', 'tempo': 'Unknown', 'description': f'Uploaded file: {uploaded_music.name}', 'duration': len(audio_segment)/1000 # Convert ms to seconds } # Success message st.success("🎉 Music loaded successfully! Scroll down to use post-processing tools.") # Force refresh to update the UI with the loaded audio st.rerun() except Exception as e: st.error(f"❌ Error loading audio file: {str(e)}") st.info("Please make sure the file is a valid audio file in one of the supported formats.") # Add this helper info section with st.expander("ℹī¸ About uploading previous music"): st.markdown(""" ### Why upload previous music? This feature allows you to: 1. **Continue editing** music you previously downloaded 2. **Add vocals** to your previously generated instrumental tracks 3. **Apply different effects** to experiment with various sounds 4. **Combine multiple tracks** by uploading them sequentially ### Supported file formats - **WAV** (.wav) - Highest quality, uncompressed - **MP3** (.mp3) - Common compressed format - **OGG** (.ogg) - Free, open container format - **FLAC** (.flac) - Lossless compression format For best results, upload WAV files that were previously downloaded from Songlab AI. """) # This divider separates the upload section from post-processing options st.divider() # Post-processing options - only show if we have audio to process if st_state.audio_pydub is not None: st.header("Post-processing Options") vocal_file = st.file_uploader( "Upload Vocal File", type=["mp3", "wav", "ogg", "flac", "aac"] ) if vocal_file: st_state.vocal_audio = vocal_file.read() # Mixing mix_vocals = st.checkbox("Mix Vocals") if mix_vocals and st_state.vocal_audio is not None: with NamedTemporaryFile() as f: f.write(st_state.vocal_audio) st_state.vocal_audio = AudioSegment.from_file(f.name) st_state.augmented_audio_pydub = st_state.augmented_audio_pydub.overlay( st_state.vocal_audio, position=100 ) st_state.augmented_audio = convert_audio_segment_to_float_array( st_state.augmented_audio_pydub ) st_state.augmented_audio_sample_rate = st_state.augmented_audio_pydub.frame_rate st.session_state.playing_state = 'processed' elif not mix_vocals and st_state.vocal_audio is not None: st_state.augmented_audio_pydub = st_state.audio_pydub st_state.augmented_audio_sample_rate = st_state.audio_pydub.frame_rate # Mastering st.header("Mastering") st.markdown("") # Volume Balance, Compression Ratio, and Reverb Amount vol_col, pitch_shift, buttons_col = st.columns([2, 2, 2.5]) with buttons_col: with st.container(height=371, border=True): st.markdown("") apply_stereo = st.button("Apply Stereo Effect") st.markdown("") reverse = st.button("Apply Audio Reverse") st.markdown("") reset_post_processing = st.button("Undo All Post-processings") st.markdown("") with vol_col: with st.container(border=True): volume_balance = svs.vertical_slider( "Volume Balance", min_value=-10.0, max_value=10.0, default_value=0.0, step=0.1, slider_color="green", track_color="lightgray", thumb_color="red", thumb_shape="pill", ) vol_button = st.button("Apply Vol-Balance") # Pitch shifting with pitch_shift: with st.container(border=True): pitch_semitones = svs.vertical_slider( label="Pitch (semitones)", min_value=-12, max_value=12, default_value=0, step=1, slider_color="red", track_color="lightgray", thumb_color="red", thumb_shape="pill", ) pitch_shift_button = st.button("Apply Pitch Shift") effect_applied = False if st_state.augmented_audio_pydub is not None: if vol_button: st_state.augmented_audio_pydub = st_state.augmented_audio_pydub + volume_balance effect_applied = True st.session_state.playing_state = 'processed' if apply_stereo: st_state.augmented_audio_pydub = st_state.augmented_audio_pydub.pan( -0.5 ).overlay(st_state.augmented_audio_pydub.pan(0.5)) effect_applied = True st.session_state.playing_state = 'processed' if reverse: st_state.augmented_audio_pydub = st_state.augmented_audio_pydub.reverse() effect_applied = True st.session_state.playing_state = 'processed' if pitch_shift_button: st_state.augmented_audio_pydub = st_state.augmented_audio_pydub._spawn( st_state.augmented_audio_pydub.raw_data, overrides={ "frame_rate": int( st_state.augmented_audio_pydub.frame_rate * (2 ** (pitch_semitones / 12.0)) ) }, ) effect_applied = True st.session_state.playing_state = 'processed' if reset_post_processing and st_state.audio_pydub is not None: st_state.augmented_audio_pydub = st_state.audio_pydub st.session_state.playing_state = 'generated' effect_applied = True # Display the final audio if processed if effect_applied or st.session_state.playing_state == 'processed': display_audio_player(is_final=True) # Force rerun if any effect was applied to show the updated audio if effect_applied: st.rerun()