import io
import json
import os
import time
import uuid
import base64
from datetime import datetime
from io import BytesIO
from tempfile import NamedTemporaryFile
from xmlrpc.client import Binary
import jwt
import threading
import tempfile # Add this import at the top of your file
import numpy as np
import requests
import scipy
import soundfile as sf
import streamlit as st
import streamlit_vertical_slider as svs
from pydub import AudioSegment
from scipy.signal import butter, sosfilt
from streamlit import session_state as st_state
from woocommerce import API
from wordpress_xmlrpc import Client
from wordpress_xmlrpc.compat import xmlrpc_client
from wordpress_xmlrpc.methods import media
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Add enhanced CSS to block download buttons in audio players
st.markdown("""
""", unsafe_allow_html=True)
# Try to get API_URL from environment variables, if not found set to a default value
try:
API_URL = os.environ["API_URL"]
except KeyError:
st.error("API_URL environment variable is not set.")
st.stop()
# Try to get the Bearer token from environment variables, if not found set to a default value
try:
BEARER_TOKEN = os.environ["BEARER_TOKEN"]
except KeyError:
st.error("BEARER_TOKEN environment variable is not set.")
st.stop()
page_bg_img = '''
'''
st.markdown(page_bg_img, unsafe_allow_html=True)
# Ensure session state keys are initialized
if "jwt_token" not in st.session_state:
st.session_state["jwt_token"] = None
if "session_id" not in st.session_state:
st.session_state["session_id"] = str(uuid.uuid4())
if 'login_needed' not in st.session_state:
st.session_state['login_needed'] = False
if 'generate_audio_params' not in st.session_state:
st.session_state['generate_audio_params'] = None
if 'audio_bytes' not in st.session_state:
st.session_state['audio_bytes'] = None
if 'playing_state' not in st.session_state:
st.session_state['playing_state'] = 'none' # 'none', 'generated', 'processed'
if 'audio_metadata' not in st.session_state:
st.session_state['audio_metadata'] = None
if 'login_refresh_needed' not in st.session_state:
st.session_state['login_refresh_needed'] = False
if 'promo_code_applied' not in st.session_state:
st.session_state['promo_code_applied'] = False
# Initialize session state variables for audio processing
if "vocal_audio" not in st_state:
st_state.vocal_audio = None
if "vocal_sample_rate" not in st_state:
st_state.vocal_sample_rate = None
if "audio" not in st_state:
st_state.audio = None
if "audio_pydub" not in st_state:
st_state.audio_pydub = None
if "audio_sample_rate" not in st_state:
st_state.audio_sample_rate = None
if "augmented_audio" not in st_state:
st_state.augmented_audio = None
if "augmented_audio_pydub" not in st_state:
st_state.augmented_audio_pydub = None
if "augmented_audio_sample_rate" not in st_state:
st_state.augmented_audio_sample_rate = None
def secure_audio_player(audio_bytes, format="audio/wav"):
"""
Enhanced secure audio player that completely blocks downloads
Parameters:
audio_bytes: Audio data as bytes
format: Audio format mimetype
"""
if audio_bytes is None:
return
# Encode to base64
b64 = base64.b64encode(audio_bytes).decode()
# Custom HTML with additional JavaScript security
custom_html = f"""
"""
st.markdown(custom_html, unsafe_allow_html=True)
def get_api_headers():
return {
"Authorization": f"Bearer {BEARER_TOKEN}",
"Content-Type": "application/json",
}
def get_auth_headers():
jwt_token = st.session_state.get("jwt_token")
if jwt_token:
return {
"Authorization": f"Bearer {jwt_token}",
"Content-Type": "application/json",
"Cache-Control": "no-store",
}
else:
return {
"Content-Type": "application/json",
"Cache-Control": "no-store",
}
def get_current_subscription_status():
"""Get current user's subscription status with improved debugging"""
if st.session_state.get("jwt_token"):
auth_headers = get_auth_headers()
subscription_url = "https://songlabai.com/wp-json/custom-api/v1/subscription"
try:
print(f"Fetching subscription status from: {subscription_url}")
subscription_response = requests.get(subscription_url, headers=auth_headers)
if subscription_response.status_code == 200:
subscription_data = subscription_response.json()
print(f"Subscription response: {subscription_data}")
# Convert subscription_plan_id to string for comparison
subscription_plan_id = subscription_data.get("subscription_plan_id")
# Handle both numeric and string plan IDs
if subscription_plan_id is not None:
if isinstance(subscription_plan_id, int):
subscription_plan_id = str(subscription_plan_id)
# CORRECTED PLAN IDs:
# 576: Free tier
# 2397: Tier 1 ($24.99/month)
# 305: Tier 2 ($99.99/month)
# 306: Tier 3 ($269.97/month)
# Define premium plan IDs (any non-free plan)
premium_plan_ids = ["2397", "305", "306"]
# Log the plan ID for debugging
print(f"Checking plan ID: {subscription_plan_id}, type: {type(subscription_plan_id)}")
print(f"Is premium: {subscription_plan_id in premium_plan_ids}")
is_free_tier = (
subscription_plan_id == "576" or
subscription_plan_id not in premium_plan_ids or
subscription_data.get("status") == "no_subscription"
)
else:
is_free_tier = True
return {
'subscription_plan_id': subscription_plan_id,
'is_free_tier': is_free_tier,
'has_exceeded_download_limit': subscription_data.get("has_exceeded_download_limit", False),
'has_exceeded_generation_limit': subscription_data.get("has_exceeded_generation_limit", False)
}
else:
print(f"Error fetching subscription: HTTP {subscription_response.status_code}")
print(f"Response: {subscription_response.text}")
except Exception as e:
print(f"Exception in get_current_subscription_status: {e}")
import traceback
print(traceback.format_exc())
# Default to free tier if not logged in or error occurs
return {
'subscription_plan_id': None,
'is_free_tier': True,
'has_exceeded_download_limit': False,
'has_exceeded_generation_limit': False
}
def get_usage_counts():
"""Get current user's usage counts"""
if st.session_state.get("jwt_token"):
auth_headers = get_auth_headers()
usage_url = "https://songlabai.com/wp-json/custom-api/v1/usage-counts"
try:
print(f"Fetching usage counts from: {usage_url}")
usage_response = requests.get(usage_url, headers=auth_headers)
if usage_response.status_code == 200:
usage_data = usage_response.json()
print(f"Usage counts response: {usage_data}")
return {
'generation_count': usage_data.get('generation_count', 0),
'download_count': usage_data.get('download_count', 0),
'total_generation_count': usage_data.get('total_generation_count', 0),
'total_download_count': usage_data.get('total_download_count', 0)
}
else:
print(f"Error fetching usage counts: HTTP {usage_response.status_code}")
print(f"Response: {usage_response.text}")
except Exception as e:
print(f"Exception in get_usage_counts: {e}")
# Default values if not logged in or error occurs
return {
'generation_count': 0,
'download_count': 0,
'total_generation_count': 0,
'total_download_count': 0
}
def show_sidebar():
"""Display sidebar with login/user info and subscription details"""
with st.sidebar:
st.image("https://songlabai.com/wp-content/uploads/2024/03/songlabAI-logo-white-1024x267.png", width=200)
# Display login section if not logged in
if not st.session_state.get("jwt_token"):
st.markdown("### đ¤ User Login")
with st.form("sidebar_login_form"):
username = st.text_input("Username", key="sidebar_username")
password = st.text_input("Password", type="password", key="sidebar_password")
submit_login = st.form_submit_button("Log In")
if submit_login and username and password:
with st.spinner("Logging in..."):
login_url = "https://songlabai.com/wp-json/jwt-auth/v1/token"
data = {
"username": username,
"password": password
}
response = requests.post(login_url, data=data)
if response.status_code == 200 and 'token' in response.json():
result = response.json()
st.session_state["jwt_token"] = result["token"]
st.session_state["session_id"] = str(uuid.uuid4())
# Set flag to refresh generate button
st.session_state["login_refresh_needed"] = True
st.success("â Logged in successfully!")
st.rerun()
else:
st.error("â Invalid username or password")
st.markdown("Don't have an account? [Register here](https://songlabai.com/register-music-generated-by-ai/)")
# If logged in, display user info and logout option
else:
try:
# Display user information
decoded_token = jwt.decode(
st.session_state["jwt_token"],
options={"verify_signature": False}
)
user_info = decoded_token.get('data', {}).get('user', {})
st.markdown(f"### đ Welcome, {user_info.get('display_name', 'User')}!")
# Get subscription status and usage information
subscription_status = get_current_subscription_status()
usage_counts = get_usage_counts()
# Identify subscription tier based on plan ID
subscription_tier = "Free Tier"
if not subscription_status['is_free_tier']:
plan_id = subscription_status.get('subscription_plan_id')
# Convert to string for comparison if needed
if isinstance(plan_id, int):
plan_id = str(plan_id)
# Debug output to help troubleshoot plan ID issues
print(f"DEBUG: User subscription plan_id = {plan_id!r}")
# CORRECTED PLAN IDs MAPPING:
if plan_id == "2397":
subscription_tier = "Tier 1 ($24.99/month)"
elif plan_id == "305":
subscription_tier = "Tier 2 ($99.99/month)"
elif plan_id == "306":
subscription_tier = "Tier 3 ($269.97/month)"
else:
subscription_tier = f"Premium Plan (ID: {plan_id})"
# Create subscription info display
st.markdown("### đ Your Subscription")
st.info(f"**Current Plan:** {subscription_tier}")
# Show appropriate limits based on subscription
if subscription_status['is_free_tier']:
# Display for free tier - uses total counts
total_gen_count = usage_counts.get('total_generation_count', 0)
st.markdown("#### Usage Limits")
st.markdown(f"- đĩ **Generations:** {total_gen_count}/3 total (Free tier)")
st.markdown("- đĨ **Downloads:** Not available")
st.markdown("- âąī¸ **Duration:** Max 30 seconds")
# Show upgrade button prominently for free users
st.markdown("### đ Upgrade Your Plan")
st.markdown("Get more generations, downloads, and longer tracks!")
if st.button("Upgrade Now", type="primary"):
st.markdown("[Click here to view plans](https://songlabai.com/subscribe/)")
else:
# Format for premium users - CORRECTED LIMITS PER TIER
daily_limit = 0
download_limit = 0
duration_limit = 140
plan_id = subscription_status.get('subscription_plan_id')
if isinstance(plan_id, int):
plan_id = str(plan_id)
if plan_id == "2397": # Tier 1 ($24.99)
daily_limit = 3
download_limit = 1
elif plan_id == "305": # Tier 2 ($99.99)
daily_limit = 5
download_limit = 5
elif plan_id == "306": # Tier 3 ($269.97)
daily_limit = 20
download_limit = 15
# Get current usage counts
daily_gen_count = usage_counts.get('generation_count', 0)
monthly_dl_count = usage_counts.get('download_count', 0)
# Display premium user limits with actual counts
st.markdown("#### đ Your Usage")
st.markdown(f"- đĩ **Generations:** {daily_gen_count}/{daily_limit} per day")
st.markdown(f"- đĨ **Downloads:** {monthly_dl_count}/{download_limit} per month")
st.markdown(f"- âąī¸ **Duration:** Up to {duration_limit} seconds")
# Show warning if limits are close to being reached
if subscription_status['has_exceeded_generation_limit']:
st.warning("â ī¸ You've reached your daily generation limit")
elif subscription_status['has_exceeded_download_limit']:
st.warning("â ī¸ You've reached your monthly download limit")
# Logout button
if st.button("Log Out", key="sidebar_logout"):
st.session_state.clear()
st.success("Logged out successfully!")
st.rerun()
except Exception as e:
st.error(f"Error displaying user info: {str(e)}")
st.code(traceback.format_exc()) # Show stack trace for debugging
if st.button("Log Out", key="sidebar_error_logout"):
st.session_state.clear()
st.rerun()
# Help and support section
st.markdown("### âšī¸ Need Help?")
with st.expander("Tips & FAQs"):
st.markdown("""
**Generation Tips:**
- Use detailed descriptions
- Experiment with different tempos
- Try various genres for unique sounds
**Common Issues:**
- Server warmup may take several minutes
- Free tier has limited features
- Some effects work better with certain genres
**Contact Support:**
For technical issues, email support@songlabai.com
""")
def wp_create_nonce(action=''):
"""Generate a WordPress-like nonce for REST API requests"""
import hashlib
import time
# Simple nonce generation based on current timestamp
timestamp = int(time.time())
data = f"{action}|{timestamp}"
return hashlib.md5(data.encode()).hexdigest()
def direct_upload_to_wordpress(audio_bytes, user_id, metadata=None):
"""
Improved function to upload audio to WordPress with better error handling
Parameters:
audio_bytes: The audio data as bytes
user_id: User ID for tracking
metadata: Optional dictionary with track metadata (genre, energy, tempo, etc.)
Returns:
tuple: (file_url, file_id)
"""
try:
# Generate a unique filename
filename = f"songlab_track_{datetime.now().timestamp()}.wav"
# Get user info from JWT token
user_email = ""
user_name = ""
subscription_tier = ""
if st.session_state.get("jwt_token"):
try:
decoded = jwt.decode(st.session_state["jwt_token"], options={"verify_signature": False})
user_data = decoded.get('data', {}).get('user', {})
user_email = user_data.get('user_email', '')
user_name = user_data.get('display_name', '')
# Get subscription info
subscription_status = get_current_subscription_status()
if not subscription_status['is_free_tier']:
plan_id = subscription_status.get('subscription_plan_id')
if plan_id == "2397":
subscription_tier = "Tier 1 ($24.99)"
elif plan_id == "305":
subscription_tier = "Tier 2 ($99.99)"
elif plan_id == "306":
subscription_tier = "Tier 3 ($269.97)"
else:
subscription_tier = "Free Tier"
except Exception as e:
print(f"Error decoding JWT: {str(e)}")
# Get auth headers with JWT token
auth_headers = get_auth_headers()
# Remove Content-Type from headers for file upload
upload_headers = auth_headers.copy()
if 'Content-Type' in upload_headers:
del upload_headers['Content-Type']
# Try the direct upload AJAX endpoint first
try:
print("Trying direct AJAX upload endpoint (most reliable)")
ajax_url = "https://songlabai.com/wp-admin/admin-ajax.php?action=upload_audio_direct"
# Create files dict for upload
files = {
'audio_file': (filename, audio_bytes, 'audio/wav')
}
# Create form data
form_data = {
'user_id': str(user_id),
'user_email': user_email,
'user_name': user_name,
'subscription_tier': subscription_tier
}
# Add metadata to form data if available
if metadata:
for key, value in metadata.items():
if value is not None:
form_data[key] = str(value)
# Send the request
response = requests.post(
ajax_url,
headers=upload_headers,
files=files,
data=form_data
)
print(f"AJAX upload response: HTTP {response.status_code}")
if response.status_code == 200:
try:
response_data = response.json()
if response_data.get('success'):
file_url = response_data.get('data', {}).get('file_url', '#')
file_id = response_data.get('data', {}).get('attachment_id', 'N/A')
notification_id = response_data.get('data', {}).get('notification_id', 'N/A')
print(f"AJAX upload success! File URL: {file_url}, ID: {file_id}, Notification ID: {notification_id}")
return file_url, file_id
else:
print(f"AJAX upload reported failure: {response_data.get('message', 'Unknown error')}")
except Exception as e:
print(f"Error parsing AJAX response: {str(e)}")
else:
print(f"AJAX upload failed with status {response.status_code}: {response.text}")
except Exception as e:
print(f"AJAX upload exception: {str(e)}")
import traceback
print(traceback.format_exc())
# Try the REST API v2 endpoint next
try:
print("Trying REST API v2 endpoint")
v2_url = "https://songlabai.com/wp-json/songlab/v2/upload-audio"
# Create files dict for upload
files = {
'file': (filename, audio_bytes, 'audio/wav')
}
# Create form data
form_data = {
'user_id': str(user_id),
'user_email': user_email,
'user_name': user_name,
'subscription_tier': subscription_tier
}
# Add metadata to form data if available
if metadata:
for key, value in metadata.items():
if value is not None:
form_data[key] = str(value)
# Send the request
response = requests.post(
v2_url,
headers=upload_headers,
files=files,
data=form_data
)
print(f"v2 API response: HTTP {response.status_code}")
if response.status_code == 200:
response_data = response.json()
if response_data.get('success'):
file_url = response_data.get('file_url', '#')
file_id = response_data.get('file_id', 'N/A')
print(f"v2 API success! File URL: {file_url}, ID: {file_id}")
return file_url, file_id
else:
print(f"v2 API error: {response_data.get('message', 'Unknown error')}")
else:
print(f"v2 API failed with status {response.status_code}: {response.text}")
except Exception as e:
print(f"v2 API exception: {str(e)}")
# No other methods worked, create a notification with placeholder values
try:
print("All direct upload methods failed, creating notification with placeholders")
notification_url = "https://songlabai.com/wp-json/songlab/v1/notify"
notification_payload = {
"link": "#",
"product_code": "pending-upload-" + datetime.now().strftime('%Y%m%d%H%M%S'),
"user_id": user_id
}
notify_response = requests.post(
notification_url,
headers=auth_headers,
json=notification_payload
)
print(f"Placeholder notification response: HTTP {notify_response.status_code}")
print(f"Response text: {notify_response.text}")
try:
notify_data = notify_response.json()
if notify_data.get('success'):
# Return placeholder values but log that we submitted a notification
print("Placeholder notification created successfully")
# Try to extract notification ID
notification_id = notify_data.get('data', {}).get('id', 'N/A')
if notification_id != 'N/A':
print(f"Notification ID: {notification_id}")
except:
pass
except Exception as e:
print(f"Placeholder notification exception: {str(e)}")
# Return placeholder values
print("All upload methods failed, returning placeholder values")
return "#", "N/A"
except Exception as e:
import traceback
print(f"Critical error in direct_upload_to_wordpress: {str(e)}")
print(traceback.format_exc())
return "#", "N/A"
def save_track_to_dashboard(audio_array, sample_rate, metadata=None):
"""
Save the current track to the user's dashboard with improved error handling
Parameters:
audio_array: The audio data as a numpy array
sample_rate: The sample rate of the audio
metadata: Optional dictionary with track metadata (genre, energy, tempo, etc.)
Returns:
bool: True if successful, False otherwise
"""
if not st.session_state.get("jwt_token"):
st.error("â You must be logged in to save tracks to your dashboard.")
return False
try:
# Check subscription status
subscription_status = get_current_subscription_status()
if subscription_status['is_free_tier']:
st.warning("â ī¸ Saving tracks to dashboard requires a premium subscription.")
st.info("đĄ Upgrade to a premium plan to save tracks!\n\n" +
"đ [Upgrade Now](https://songlabai.com/subscribe/)")
return False
# Get user ID from JWT token
user_id = None
try:
decoded = jwt.decode(st.session_state["jwt_token"], options={"verify_signature": False})
user_id = decoded.get('data', {}).get('user', {}).get('id')
print(f"Saving track for user ID: {user_id}")
except Exception as e:
print(f"Error decoding JWT: {str(e)}")
st.error("Failed to identify user. Please try logging out and back in.")
return False
if not user_id:
st.error("Could not identify user ID from login token.")
return False
# Convert the audio to WAV format
wav_bytes = BytesIO()
sf.write(wav_bytes, audio_array, samplerate=sample_rate, format='WAV')
wav_bytes.seek(0)
# Upload file to WordPress using the improved function
with st.spinner("đž Uploading track to the server..."):
file_url, product_id = direct_upload_to_wordpress(wav_bytes.getvalue(), user_id, metadata)
if file_url != "#" and product_id != "N/A":
st.success("â Track saved to your dashboard successfully!")
st.info("Your track will be reviewed by our team. If approved, it will be published and you'll earn 70% of all sales revenue.")
return True
else:
# Even if the direct upload failed, we would have created a notification entry
st.warning("â ī¸ Track has been saved with limited functionality. Our team will contact you if needed.")
return True
except Exception as e:
import traceback
print(f"Error saving track to dashboard: {str(e)}")
print(traceback.format_exc())
st.error(f"â Error saving track to dashboard: {str(e)}")
return False
def display_download_button(audio_bytes, is_premium_user=False, has_exceeded_limit=False, location="main"):
"""
Display download button based on subscription status
Parameters:
audio_bytes: Audio data as bytes
is_premium_user: Whether user has premium subscription
has_exceeded_limit: Whether user has exceeded download limits
location: Location identifier for unique key generation for buttons
"""
if audio_bytes is None:
return
# Show download button only for eligible premium users
if is_premium_user and not has_exceeded_limit:
st.download_button(
label="âŦī¸ Download Audio (Premium Feature)",
data=audio_bytes,
file_name=f"songlab_audio_{location}.wav",
mime="audio/wav",
on_click=update_download_count,
key=f"download_btn_{location}"
)
st.success("â Your premium subscription allows you to download this track!", icon="â ")
# For premium users who exceeded limits
elif is_premium_user:
st.warning("â ī¸ You've reached your download limit for this period. Your limits will reset automatically.", icon="â ī¸")
# For free users
else:
st.info("â **Premium Feature**: Downloading audio files requires a subscription. "
"[Upgrade Now](https://songlabai.com/subscribe/)", icon="âšī¸")
# Update the display_audio_player function to include save to dashboard option
def display_audio_player(is_final=False):
"""
Centralized function to display the appropriate audio player based on the current state
with added save to dashboard functionality
Parameters:
is_final: Whether this is the final processed audio or the initial generated audio
"""
# Get subscription status
subscription_status = get_current_subscription_status()
is_premium = not subscription_status['is_free_tier']
has_exceeded_limit = subscription_status['has_exceeded_download_limit']
if is_final and st_state.augmented_audio_pydub is not None:
# Display processed audio
audio_bytes = st_state.augmented_audio_pydub.export(format="wav").read()
st.markdown("### đ§ Final Processed Audio")
secure_audio_player(audio_bytes)
# Display columns for download and save buttons
col1, col2 = st.columns(2)
with col1:
# Show download button for final audio
display_download_button(
audio_bytes,
is_premium_user=is_premium,
has_exceeded_limit=has_exceeded_limit,
location="final"
)
with col2:
# Add Save to Dashboard button for premium users
if st.session_state.get("jwt_token") and is_premium:
if st.button("đž Save to Dashboard", key="save_processed_btn"):
# Get audio array from audio bytes
with BytesIO(audio_bytes) as buf:
processed_audio_array, processed_sample_rate = sf.read(buf)
# Get metadata from session state and add processing info
metadata = st.session_state.get('audio_metadata', {}).copy()
metadata['processed'] = True
metadata['processing_date'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Save to dashboard
save_track_to_dashboard(processed_audio_array, processed_sample_rate, metadata)
elif st.session_state.get("jwt_token"):
# User is logged in but not premium
st.info("â **Premium Feature**: Saving tracks requires a subscription. "
"[Upgrade Now](https://songlabai.com/subscribe/)")
else:
# User is not logged in
st.info("đ¤ **Log in** using the sidebar to save tracks to your dashboard")
elif not is_final and st.session_state.get('audio_bytes') is not None:
# Display initial generated audio
st.markdown("### đĩ Generated Audio")
secure_audio_player(st.session_state.audio_bytes)
# Display columns for download and save buttons
col1, col2 = st.columns(2)
with col1:
display_download_button(
st.session_state.audio_bytes,
is_premium_user=is_premium,
has_exceeded_limit=has_exceeded_limit,
location="generated"
)
with col2:
# Add Save to Dashboard button for premium users
if st.session_state.get("jwt_token") and is_premium:
if st.button("đž Save to Dashboard", key="save_generated_btn"):
# Get audio array
with BytesIO(st.session_state.audio_bytes) as buf:
generated_audio_array, generated_sample_rate = sf.read(buf)
# Get metadata
metadata = st.session_state.get('audio_metadata', {})
# Save to dashboard
save_track_to_dashboard(generated_audio_array, generated_sample_rate, metadata)
elif st.session_state.get("jwt_token"):
# User is logged in but not premium
st.info("â **Premium Feature**: Saving tracks requires a subscription. "
"[Upgrade Now](https://songlabai.com/subscribe/)")
else:
# User is not logged in
st.info("đ¤ **Log in** using the sidebar to save tracks to your dashboard")
# Show track information if available
if st.session_state.get('audio_metadata'):
meta = st.session_state.audio_metadata
st.markdown("### đĩ Track Information")
st.markdown(f"**Genre:** {meta.get('genre', 'Not specified')}")
st.markdown(f"**Energy:** {meta.get('energy_level', 'Not specified')}")
st.markdown(f"**Tempo:** {meta.get('tempo', 'Not specified')} BPM")
st.markdown(f"**Duration:** {meta.get('duration', 'Not specified')} seconds")
if meta.get('description'):
st.markdown(f"**Description:** {meta.get('description')}")
# Enhanced function to upload to WordPress
def enhanced_save_to_wordpress(audio_array, sample_rate):
"""
Enhanced function to upload audio to WordPress with better error handling
and debugging information
"""
# Convert audio_array to float32 if not already
audio_array = audio_array.astype(np.float32)
# Save the audio to a BytesIO buffer
wav_bytes = BytesIO()
sf.write(wav_bytes, audio_array, samplerate=sample_rate, format='WAV')
wav_bytes.seek(0)
# Define your WordPress site URL and authentication credentials
wordpress_url = "https://songlabai.com/xmlrpc.php"
woocommerce_url = "https://songlabai.com"
consumer_key = "ck_93d516ba12289a6fd0eced56bbc0b05ecbf98735"
consumer_secret = "cs_9d5eb716d631db408a4c47796b5d18b0313d8559"
username = "admin_h2ibbgql"
password = "Pressabc1!"
# Authenticate with WordPress XML-RPC API
title = f"generated_audio_{datetime.now().timestamp()}.wav"
file_data = {
"name": title,
"type": "audio/x-wav",
"bits": xmlrpc_client.Binary(wav_bytes.getvalue()),
}
print(f"Uploading file to WordPress: {title}")
wp_client = Client(wordpress_url, username, password)
for attempt in range(4):
try:
print(f"Attempt {attempt+1} to upload file to WordPress")
# Upload the file to WordPress Media Library
media_response = wp_client.call(media.UploadFile(file_data))
# Handle the response
if media_response:
print(f"File successfully uploaded to WordPress with attachment ID: {media_response}")
print(f"File URL: {media_response.get('url', 'Unknown')}")
# Create product data for WooCommerce
product_data = {
"status": "pending",
"name": title,
"type": "simple",
"regular_price": "1.00", # Set the price as needed
"sku": str(uuid.uuid4()),
"downloadable": True,
"download_limit": -1,
"download_expiry": -1,
}
# Authenticate with WooCommerce API
wc_api = API(
url=woocommerce_url,
consumer_key=consumer_key,
consumer_secret=consumer_secret,
version="wc/v3",
)
# Create the product
print("Creating product in WooCommerce")
response = wc_api.post("products", product_data)
# Handle the response
if response.status_code == 201:
print(f"Product successfully created in WooCommerce: {response.json().get('id')}")
# Update product to add downloadable file URL
product_update_data = {
"downloads": [
{
"name": media_response["title"],
"file": media_response["url"], # Use direct URL instead of permalink
}
]
}
product_id = response.json().get("id")
print(f"Updating product {product_id} with downloadable file")
response = wc_api.put(f"products/{product_id}", product_update_data)
if response.status_code == 200:
print(f"Downloadable file URL added to product: {response.json().get('permalink')}")
# Get direct file URL for notifications
file_url = media_response["url"]
# Get user ID from JWT token with improved debugging
user_id = None
if st.session_state.get("jwt_token"):
try:
decoded = jwt.decode(st.session_state["jwt_token"], options={"verify_signature": False})
user_id = decoded.get('data', {}).get('user', {}).get('id')
print(f"Decoded JWT token user data: {decoded.get('data', {}).get('user', {})}")
print(f"Extracted user ID: {user_id}")
except Exception as e:
print(f"Error decoding JWT: {str(e)}")
user_id = None
# Send notification using WordPress REST API if user ID exists
if user_id:
try:
notification_url = "https://songlabai.com/wp-json/songlab/v1/notify"
notification_payload = {
"link": file_url,
"product_code": product_id,
"user_id": user_id
}
print(f"Sending notification with payload: {notification_payload}")
notification_response = requests.post(notification_url, json=notification_payload)
print(f"Notification response status: {notification_response.status_code}")
print(f"Notification response: {notification_response.text}")
except Exception as e:
print(f"Error sending notification: {str(e)}")
else:
print("No user ID available, skipping notification")
return file_url, str(product_id)
else:
print(f"Error adding downloadable file URL to product: {response.text}")
else:
print(f"Error creating product in WooCommerce: {response.text}")
else:
print("Error uploading file to WordPress - empty response.")
# Break the loop if we got this far, even with errors
break
except Exception as e:
print(f"Error in WordPress upload (attempt {attempt+1}): {str(e)}")
import traceback
print(traceback.format_exc())
# Only continue retrying if we haven't reached the max attempts
if attempt < 3:
print(f"Retrying upload... ({attempt+2}/4)")
continue
else:
print("All upload attempts failed.")
# If upload fails, return placeholders
return "#", "N/A"
def convert_audio_segment_to_float_array(audio_pydub):
"""
Convert a pydub AudioSegment to a NumPy array of type float32.
Args:
audio_pydub (AudioSegment): The AudioSegment object to be converted.
Returns:
np.ndarray: A NumPy array containing the audio data as float32.
"""
# Get the raw audio data as a sequence of samples
samples = audio_pydub.get_array_of_samples()
# Convert the samples to a NumPy array and normalize to float32
audio_array = np.array(samples).astype(np.float32)
# Normalize the audio array to range between -1.0 and 1.0
max_val = 2**15 # Assuming 16-bit audio, modify this if using different bit depths
audio_array /= max_val
return audio_array
def time_post_request(api_url, headers=None, payload=None, timeout=None):
"""
Times the execution of a POST request.
Parameters:
- api_url (str): The URL to which the POST request is sent.
- headers (dict): The headers to include in the POST request.
- payload (dict): The payload to include in the POST request.
- timeout (int): The timeout value in seconds for the POST request (optional).
Returns:
- response (requests.Response): The response object returned by the POST request.
- execution_time (float): The time it took to execute the POST request.
"""
start_time = time.time()
response = requests.post(api_url, headers=headers, json=payload, timeout=timeout)
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")
return response
def check_server_status():
"""Check if the server is warm by making a minimal request"""
warmup_payload = {"inputs": {"prompt": "warmup", "duration": 1}}
try:
response = time_post_request(
API_URL,
headers=get_api_headers(),
payload=warmup_payload,
timeout=10 # Short timeout for quick check
)
return response is not None and response.status_code == 200
except:
return False
def wait_for_server_warmup(status_placeholder, progress_placeholder, subscription_info_placeholder, stage_placeholder):
"""Handle server warmup with detailed progress stages and proceed immediately when server is ready"""
stages = [
{"time": 0, "message": "đ Initializing server resources..."},
{"time": 60, "message": "âī¸ Loading the AI model into memory..."},
{"time": 180, "message": "đ§ Optimizing model parameters..."},
{"time": 300, "message": "đ Configuring generation settings..."},
{"time": 360, "message": "⨠Finalizing server preparation..."}
]
subscription_info = """
### đ **Subscription Tiers**
**Free User**
- đĩ **Generations:** 3 music generations
- đĨ **Downloads:** No downloads allowed
- đŧ **Submit your track from dashboard**
- đ° **Receive 70% commission**
- đĩ **Tracks sale for on website**
**Tier 2 - $24.99/month**
- đĩ **Generations:** Up to 3 music generations per day
- đĨ **Downloads:** 1 download per month
- đ **Private Tracks:** Create unpublished, private tracks
- đ **Dashboard:** Access your profile and save up to 2 tracks
**Tier 3 - $99.99/month**
- đĩ **Generations:** Up to 5 music generations per day
- đĨ **Downloads:** 5 downloads per month
- đ **Dashboard:** Full profile access for managing and saving your tracks
- đž **Storage:** Save up to 10 tracks
**Tier 4 - $269.97/month**
- đĩ **Generations:** Up to 20 music generations per day
- đĨ **Downloads:** 15 downloads per month
- đĸ **Commercial Use:** Includes rights for commercial purposes
- đ **Dashboard:** Advanced profile management for commercial accounts
### đ **Why Subscribe?**
Upgrade now to unlock more features and enhance your music creation experience!
"""
start_time = time.time()
total_warmup_time = 420 # 7 minutes
current_stage = 0
last_check_time = 0
while time.time() - start_time < total_warmup_time:
elapsed = time.time() - start_time
remaining = total_warmup_time - elapsed
progress = elapsed / total_warmup_time
# Update current stage
while current_stage < len(stages) - 1 and elapsed > stages[current_stage + 1]["time"]:
current_stage += 1
minutes_remaining = int(remaining // 60)
seconds_remaining = int(remaining % 60)
# Update UI
status_placeholder.markdown("### đ AI Server Initialization in Progress")
progress_placeholder.progress(progress)
stage_placeholder.info(
f"{stages[current_stage]['message']}\n\n"
f"âŗ Estimated time remaining: {minutes_remaining}m {seconds_remaining}s\n\n"
"âšī¸ **Why the wait?** The AI model needs time to load and prepare when the server is inactive.\n"
"Upgrading to a premium plan keeps the server active longer, reducing or eliminating wait times!\n"
"đ [Upgrade Now](https://songlabai.com/subscribe/)"
)
subscription_info_placeholder.markdown(subscription_info)
# Check if server is ready every 5 seconds
if elapsed - last_check_time >= 5:
last_check_time = elapsed
if check_server_status():
status_placeholder.success("đ Server is ready! Proceeding to music generation...")
return True
time.sleep(0.1)
return False
def update_download_count():
"""Update download count for the current user"""
auth_headers = get_auth_headers()
try:
update_url = "https://songlabai.com/wp-json/custom-api/v1/update-download-count"
requests.post(update_url, headers=auth_headers)
return True
except Exception as e:
print(f"Ã Error updating download count: {str(e)}")
return False
# Update the load_and_play_generated_audio function to use direct notification
def load_and_play_generated_audio(response, genre=None, energy_level=None, tempo=None, description=None, duration=None):
"""
Load and display the generated audio with better error handling for CUDA/GPU errors
"""
try:
# Parse the response JSON
response_eval = json.loads(response.content)
# Check for error response from the model server
if 'error' in response_eval:
error_message = response_eval['error']
# Handle CUDA errors specially
if 'CUDA error' in error_message:
raise RuntimeError(f"GPU error on the server. Please try again later: {error_message.split('CUDA')[0]}")
else:
raise RuntimeError(f"Server error: {error_message}")
# Now we know the response is not an error, proceed normally
# For successful responses, the structure should be a list with the first element containing the audio data
if not isinstance(response_eval, list) or len(response_eval) == 0:
raise ValueError(f"Unexpected response format: {type(response_eval)}")
generated_audio = response_eval[0].get("generated_audio")
if generated_audio is None:
raise ValueError("No audio data found in response")
sample_rate = response_eval[0].get("sample_rate")
if sample_rate is None:
raise ValueError("No sample rate found in response")
# Check if the audio is stereo or mono
if isinstance(generated_audio[0], list): # Stereo
audio_array = np.array(generated_audio).T
else: # Mono
audio_array = np.array(generated_audio)
# Convert the float32 audio data to int16
max_val = np.max(np.abs(audio_array))
if max_val > 0:
audio_array = audio_array / max_val # Normalize to [-1.0, 1.0]
int_audio = np.int16(audio_array * 32767)
# Write the audio data to a BytesIO buffer using soundfile
audio_buffer = BytesIO()
sf.write(audio_buffer, int_audio, sample_rate, format='WAV', subtype='PCM_16')
audio_buffer.seek(0)
# Read the audio data into an AudioSegment
audio_segment = AudioSegment.from_file(audio_buffer, format="wav")
# Store in session state
st_state.audio_pydub = audio_segment
st_state.audio_sample_rate = sample_rate
st_state.augmented_audio_pydub = st_state.audio_pydub
st.session_state.audio_bytes = audio_buffer.getvalue()
st.session_state.playing_state = 'generated'
# Store metadata
st.session_state.audio_metadata = {
'genre': genre,
'energy_level': energy_level,
'tempo': tempo,
'description': description,
'duration': duration,
'generated_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
# Get user ID from JWT token for notification
user_id = None
user_email = ''
user_name = ''
subscription_tier = 'Free Tier'
if st.session_state.get("jwt_token"):
try:
decoded = jwt.decode(st.session_state["jwt_token"], options={"verify_signature": False})
user_data = decoded.get('data', {}).get('user', {})
user_id = user_data.get('id')
user_email = user_data.get('user_email', '')
user_name = user_data.get('display_name', '')
# Get subscription info
subscription_status = get_current_subscription_status()
if not subscription_status['is_free_tier']:
plan_id = subscription_status.get('subscription_plan_id')
if plan_id == "2397":
subscription_tier = "Tier 1 ($24.99)"
elif plan_id == "305":
subscription_tier = "Tier 2 ($99.99)"
elif plan_id == "306":
subscription_tier = "Tier 3 ($269.97)"
except Exception as e:
print(f"Error decoding JWT: {str(e)}")
import traceback
print(traceback.format_exc())
# Upload the audio file to WordPress
audio_buffer.seek(0)
file_url, product_id = direct_upload_to_wordpress(
audio_buffer.read(),
user_id,
st.session_state.audio_metadata
)
# Send notification with complete user info
if user_id:
# Create notification with full user details
notification_url = "https://songlabai.com/wp-json/songlab/v1/notify"
notification_payload = {
"link": file_url,
"product_code": product_id,
"user_id": user_id,
"user_email": user_email, # Include email
"user_name": user_name, # Include name
"subscription_tier": subscription_tier # Include tier
}
# Send the notification
try:
auth_headers = get_auth_headers()
notification_response = requests.post(
notification_url,
headers=auth_headers,
json=notification_payload
)
print(f"Auto-notification response: {notification_response.status_code}")
print(f"Response text: {notification_response.text}")
except Exception as e:
print(f"Error in auto-notification: {str(e)}")
import traceback
print(traceback.format_exc())
# Display the audio using the centralized player function
display_audio_player(is_final=False)
return audio_array, sample_rate
except RuntimeError as re:
# Handle server-side errors (like CUDA/GPU errors)
print(f"Runtime error: {re}")
st.error(f"Server error: The AI model is currently experiencing technical difficulties. Please try again in a few minutes.")
st.info("đĄ This is a temporary issue with the AI server's GPU resources and not related to your account.")
return None, None
except Exception as e:
# Handle other errors
import traceback
error_details = traceback.format_exc()
print(f"Error in load_and_play_generated_audio: {e}")
print(f"Error details: {error_details}")
st.error(f"Failed to process generated audio: {e}")
return None, None
def generate_audio(genre, energy_level, tempo, description, duration):
# Create placeholders
status_placeholder = st.empty()
progress_placeholder = st.empty()
subscription_placeholder = st.empty()
stage_placeholder = st.empty()
try:
# First, verify user is logged in and has permissions
if st.session_state.get("jwt_token") is None:
st.error("You must be logged in to generate audio.")
st.session_state['login_needed'] = True
st.session_state['generate_audio_params'] = {
'genre': genre,
'energy_level': energy_level,
'tempo': tempo,
'description': description,
'duration': duration
}
st.rerun() # Show login form immediately
return None
auth_headers = get_auth_headers()
# Check subscription status
subscription_status = get_current_subscription_status()
if subscription_status['has_exceeded_generation_limit']:
if subscription_status['is_free_tier']:
st.error("â You've reached your generation limit on the free tier.")
st.info("đĄ Upgrade to a premium subscription for more generations!")
else:
st.warning("â ī¸ You've reached your daily generation limit. Limits will reset tomorrow.")
return None
# Check for free tier duration restriction
is_free_tier = subscription_status['is_free_tier']
if is_free_tier:
if duration > 30:
st.warning("â ī¸ Free tier users are limited to 30-second generations.")
st.info("đĄ Upgrade to a premium plan to generate longer tracks!\n\n" +
"**Available Plans:**\n" +
"- **$24.99/month:** Up to 3 generations per day, 1 download per month\n" +
"- **$99.99/month:** Up to 5 generations per day, 5 downloads per month\n" +
"- **$269.97/month:** Up to 20 generations per day, 15 downloads per month\n\n" +
"đ [Upgrade Now](https://songlabai.com/subscribe/)")
duration = 30 # Force duration to 30 seconds for free tier
# Check if server is cold
if not check_server_status():
status_placeholder.warning(
"đ **Server is currently starting up.**\n\n"
"Our AI model needs some time to load and prepare when the server has been inactive.\n"
"This can take up to **7 minutes** for free users.\n\n"
"đĄ **Upgrade to a premium plan to reduce or eliminate wait times!**\n"
"đ [Upgrade Now](https://songlabai.com/subscribe/)"
)
# Wait for server to warm up with progress display
server_ready = wait_for_server_warmup(
status_placeholder,
progress_placeholder,
subscription_placeholder,
stage_placeholder
)
if not server_ready:
st.error(
"â Server initialization timed out.\n\n"
"Consider upgrading to premium for reliable, fast access.\n\n"
"đ [Upgrade Now](https://songlabai.com/subscribe/)"
)
return None
# Clear warmup messages
for placeholder in [status_placeholder, progress_placeholder, subscription_placeholder, stage_placeholder]:
placeholder.empty()
# Prepare generation request
prompt = f"Genre: {genre}, Energy Level: {energy_level}, Tempo: {tempo}, Description: {description}"
payload = {"inputs": {"prompt": prompt, "duration": duration}}
api_headers = get_api_headers()
# Make the generation request
with st.spinner("đĩ Generating your music... This may take a few moments."):
response = time_post_request(API_URL, headers=api_headers, payload=payload, timeout=1200)
if response and response.status_code == 200:
# Check for error in response before showing success message
try:
response_data = json.loads(response.content)
if 'error' in response_data:
error_msg = response_data['error']
if 'CUDA error' in error_msg:
st.error("â The AI server is currently experiencing GPU resource issues.")
st.info("Please try again in a few minutes. This is a temporary server issue.")
print(f"CUDA error from server: {error_msg}")
return None
else:
st.error(f"â Server error: {error_msg}")
return None
# If we get here, response is valid
st.success("⨠Music generated successfully!")
except Exception as parse_error:
print(f"Error parsing response to check for errors: {parse_error}")
# Continue with normal flow - will be caught in load_and_play if there's an issue
st.success("⨠Music generated successfully!")
# Load and play the generated audio
result = load_and_play_generated_audio(
response=response,
genre=genre,
energy_level=energy_level,
tempo=tempo,
description=description,
duration=duration
)
# Only update count if audio was successfully generated
if result[0] is not None:
# Update generation count on WordPress
update_generation_url = "https://songlabai.com/wp-json/custom-api/v1/update-generation-count"
requests.post(update_generation_url, headers=auth_headers)
return response # Return the response for future use
else:
error_code = response.status_code if response else "No response"
st.error(
f"â Failed to generate audio (Error {error_code}).\n\n"
"This might be due to high server load or an error.\n\n"
"đĄ **Tip:** Premium users experience fewer failures and faster generation times.\n\n"
"đ [Upgrade Now](https://songlabai.com/subscribe/)"
)
return None
except Exception as e:
import traceback
print(f"Detailed error in generate_audio: {traceback.format_exc()}")
st.error(f"An unexpected error occurred: {e}")
return None
def update_generate_button(genre, energy_level, tempo, description, duration):
"""Create or update the generate button based on current state"""
generate_btn_col, login_prompt_col = st.columns([1, 2])
with generate_btn_col:
generate_button_enabled = True
tooltip = ""
# Determine if button should be enabled
if not st.session_state.get("jwt_token"):
generate_button_enabled = False
tooltip = "Please log in using the sidebar to generate music"
elif genre and energy_level and tempo:
if not confirmed_description: # Check confirmed_description instead
generate_button_enabled = False
tooltip = "Please add and confirm a description"
else:
# Always fetch fresh subscription status when checking button state
subscription_status = get_current_subscription_status()
if subscription_status['has_exceeded_generation_limit']:
generate_button_enabled = False
tooltip = "You've reached your generation limit"
else:
generate_button_enabled = False
tooltip = "Please fill in all required fields"
# Create the button (disabled if needed)
if generate_button_enabled:
if st.button("đĩ Generate Audio", key="generate_audio_button", type="primary"):
generate_audio(genre, energy_level, tempo, description, duration)
else:
# Create disabled button with tooltip
st.markdown(
f"""
""",
unsafe_allow_html=True
)
with login_prompt_col:
# Show login prompt if not logged in
if not st.session_state.get("jwt_token"):
st.info("đ Please log in using the sidebar to generate music")
# If we just logged in, clear the flag and rerun to refresh the button
if st.session_state.get("login_refresh_needed"):
st.session_state["login_refresh_needed"] = False
st.rerun()
# Call the sidebar function at the beginning of the app
show_sidebar()
# Streamlit app title
st.title("Songlab AI")
# Main UI components
genres = [
"Pop",
"Rock",
"Hip Hop",
"Jazz",
"Blues",
"Country",
"Classical",
"Electronic",
"Reggae",
"Folk",
"R&B",
"Metal",
"Punk",
"Indie",
"Dance",
"World",
"Gospel",
"Soul",
"Funk",
"Ambient",
"Techno",
"Disco",
"House",
"Trance",
"Dubstep",
]
genre = st.selectbox("Select Genre:", genres)
energy_levels = ["Low", "Medium", "High"]
energy_level = st.radio("Energy Level:", energy_levels, horizontal=True)
desc_col, button_col = st.columns([4, 1])
with desc_col:
description = st.text_input(
"Description:",
"",
key="description_input",
placeholder="Enter a description of your desired music"
)
with button_col:
# Add some vertical spacing to align with the text box
st.markdown("", unsafe_allow_html=True)
confirm_clicked = st.button("Confirm", key="confirm_description")
if confirm_clicked:
# This will trigger a rerun with the current description value
st.session_state["confirmed_description"] = description
st.rerun()
# Use the confirmed description for button enablement
confirmed_description = st.session_state.get("confirmed_description", "")
tempo = st.slider("Tempo (in bpm):", min_value=40, max_value=100, value=60, step=5)
# Get subscription status for duration limits
subscription_status = get_current_subscription_status()
is_free_tier = subscription_status['is_free_tier']
duration_col, promo_col = st.columns([2, 1])
with duration_col:
# Modify the existing duration slider logic
if is_free_tier and not st.session_state.get('promo_code_applied', False):
duration = st.slider(
"Duration (in seconds):",
min_value=15,
max_value=30, # Restricted to 30 seconds for free tier
value=30,
step=1,
help="Free tier users are limited to 30-second generations. Upgrade to create longer tracks or use a promo code!"
)
else:
duration = st.slider(
"Duration (in seconds):",
min_value=15,
max_value=140,
value=30,
step=1
)
with promo_col:
# Add the promo code text input
promo_code = st.text_input("Promo Code:", "",
help="Enter a promo code to unlock premium features")
# Check for valid promo code
if promo_code == "promosongtester" and not st.session_state.get('promo_code_applied', False):
if st.button("Apply Code"):
st.session_state['promo_code_applied'] = True
st.success("â Promo code applied! You now have access to full 3-minute duration.")
st.rerun() # Rerun to update the UI
# Show status if promo code is already applied
elif st.session_state.get('promo_code_applied', False):
st.success("â Promo code active")
# Add option to remove promo code
if st.button("Remove Code"):
st.session_state['promo_code_applied'] = False
st.rerun() # Rerun to update the UI
# Use the updated generate button function
update_generate_button(genre, energy_level, tempo, description, duration)
# Add this section after your main generation UI but before post-processing options
st.divider()
st.header("đĩ Load Previous Music")
st.markdown("""
Upload a previously downloaded music clip to continue post-processing, add vocals,
or apply audio effects. Supported formats: WAV, MP3, OGG, FLAC.
""")
uploaded_music = st.file_uploader(
"Upload previous music clip",
type=["wav", "mp3", "ogg", "flac"],
help="Upload music you previously generated and downloaded from Songlab AI"
)
if uploaded_music:
# Show a preview player for the uploaded music
st.audio(uploaded_music, format=f"audio/{uploaded_music.name.split('.')[-1]}")
# Process button
if st.button("Load for Post-Processing", key="load_uploaded_music"):
try:
# Read the uploaded music into memory
with st.spinner("Loading audio..."):
# Convert the uploaded file to an AudioSegment
audio_data = uploaded_music.read()
# Create a temporary file-like object
audio_buffer = BytesIO(audio_data)
audio_buffer.seek(0)
# Load into AudioSegment based on file extension
file_extension = uploaded_music.name.split('.')[-1].lower()
audio_segment = AudioSegment.from_file(audio_buffer, format=file_extension)
# Convert to the same format used in your application
if file_extension != 'wav':
wav_buffer = BytesIO()
audio_segment.export(wav_buffer, format='wav')
wav_buffer.seek(0)
audio_segment = AudioSegment.from_wav(wav_buffer)
# Store in session state
st_state.audio_pydub = audio_segment
st_state.audio_sample_rate = audio_segment.frame_rate
st_state.augmented_audio_pydub = audio_segment
st_state.augmented_audio_sample_rate = audio_segment.frame_rate
# Convert to bytes for player
wav_bytes = BytesIO()
audio_segment.export(wav_bytes, format='wav')
wav_bytes.seek(0)
st.session_state.audio_bytes = wav_bytes.getvalue()
# Set playing state
st.session_state.playing_state = 'generated'
# Clear any existing metadata
st.session_state.audio_metadata = {
'genre': 'Unknown (Uploaded)',
'energy_level': 'Unknown',
'tempo': 'Unknown',
'description': f'Uploaded file: {uploaded_music.name}',
'duration': len(audio_segment)/1000 # Convert ms to seconds
}
# Success message
st.success("đ Music loaded successfully! Scroll down to use post-processing tools.")
# Force refresh to update the UI with the loaded audio
st.rerun()
except Exception as e:
st.error(f"â Error loading audio file: {str(e)}")
st.info("Please make sure the file is a valid audio file in one of the supported formats.")
# Add this helper info section
with st.expander("âšī¸ About uploading previous music"):
st.markdown("""
### Why upload previous music?
This feature allows you to:
1. **Continue editing** music you previously downloaded
2. **Add vocals** to your previously generated instrumental tracks
3. **Apply different effects** to experiment with various sounds
4. **Combine multiple tracks** by uploading them sequentially
### Supported file formats
- **WAV** (.wav) - Highest quality, uncompressed
- **MP3** (.mp3) - Common compressed format
- **OGG** (.ogg) - Free, open container format
- **FLAC** (.flac) - Lossless compression format
For best results, upload WAV files that were previously downloaded from Songlab AI.
""")
# This divider separates the upload section from post-processing options
st.divider()
# Post-processing options - only show if we have audio to process
if st_state.audio_pydub is not None:
st.header("Post-processing Options")
vocal_file = st.file_uploader(
"Upload Vocal File", type=["mp3", "wav", "ogg", "flac", "aac"]
)
if vocal_file:
st_state.vocal_audio = vocal_file.read()
# Mixing
mix_vocals = st.checkbox("Mix Vocals")
if mix_vocals and st_state.vocal_audio is not None:
with NamedTemporaryFile() as f:
f.write(st_state.vocal_audio)
st_state.vocal_audio = AudioSegment.from_file(f.name)
st_state.augmented_audio_pydub = st_state.augmented_audio_pydub.overlay(
st_state.vocal_audio, position=100
)
st_state.augmented_audio = convert_audio_segment_to_float_array(
st_state.augmented_audio_pydub
)
st_state.augmented_audio_sample_rate = st_state.augmented_audio_pydub.frame_rate
st.session_state.playing_state = 'processed'
elif not mix_vocals and st_state.vocal_audio is not None:
st_state.augmented_audio_pydub = st_state.audio_pydub
st_state.augmented_audio_sample_rate = st_state.audio_pydub.frame_rate
# Mastering
st.header("Mastering")
st.markdown("")
# Volume Balance, Compression Ratio, and Reverb Amount
vol_col, pitch_shift, buttons_col = st.columns([2, 2, 2.5])
with buttons_col:
with st.container(height=371, border=True):
st.markdown("")
apply_stereo = st.button("Apply Stereo Effect")
st.markdown("")
reverse = st.button("Apply Audio Reverse")
st.markdown("")
reset_post_processing = st.button("Undo All Post-processings")
st.markdown("")
with vol_col:
with st.container(border=True):
volume_balance = svs.vertical_slider(
"Volume Balance",
min_value=-10.0,
max_value=10.0,
default_value=0.0,
step=0.1,
slider_color="green",
track_color="lightgray",
thumb_color="red",
thumb_shape="pill",
)
vol_button = st.button("Apply Vol-Balance")
# Pitch shifting
with pitch_shift:
with st.container(border=True):
pitch_semitones = svs.vertical_slider(
label="Pitch (semitones)",
min_value=-12,
max_value=12,
default_value=0,
step=1,
slider_color="red",
track_color="lightgray",
thumb_color="red",
thumb_shape="pill",
)
pitch_shift_button = st.button("Apply Pitch Shift")
effect_applied = False
if st_state.augmented_audio_pydub is not None:
if vol_button:
st_state.augmented_audio_pydub = st_state.augmented_audio_pydub + volume_balance
effect_applied = True
st.session_state.playing_state = 'processed'
if apply_stereo:
st_state.augmented_audio_pydub = st_state.augmented_audio_pydub.pan(
-0.5
).overlay(st_state.augmented_audio_pydub.pan(0.5))
effect_applied = True
st.session_state.playing_state = 'processed'
if reverse:
st_state.augmented_audio_pydub = st_state.augmented_audio_pydub.reverse()
effect_applied = True
st.session_state.playing_state = 'processed'
if pitch_shift_button:
st_state.augmented_audio_pydub = st_state.augmented_audio_pydub._spawn(
st_state.augmented_audio_pydub.raw_data,
overrides={
"frame_rate": int(
st_state.augmented_audio_pydub.frame_rate
* (2 ** (pitch_semitones / 12.0))
)
},
)
effect_applied = True
st.session_state.playing_state = 'processed'
if reset_post_processing and st_state.audio_pydub is not None:
st_state.augmented_audio_pydub = st_state.audio_pydub
st.session_state.playing_state = 'generated'
effect_applied = True
# Display the final audio if processed
if effect_applied or st.session_state.playing_state == 'processed':
display_audio_player(is_final=True)
# Force rerun if any effect was applied to show the updated audio
if effect_applied:
st.rerun()