# Oterin Audio Codec demo: a Gradio app that encodes audio into SemantiCodec
# tokens, saves them as pickled ".oterin" files, and decodes them back to
# 16 kHz audio, with optional chunked "streaming" decoding.
import os
import pickle
import time
import uuid

import gradio as gr
import numpy as np
import spaces
import torch
from semanticodec import SemantiCodec


def load_model():
    model = SemantiCodec(token_rate=100, semantic_vocab_size=32768)
    if torch.cuda.is_available():
        model = model.to("cuda:0")
        # Warm-up: run a dummy forward pass through the decoder so CUDA
        # kernels and memory pools are initialized before the first request.
        dummy_input = torch.zeros(1, 1, 1, dtype=torch.long).cuda()
        try:
            with torch.no_grad():
                _ = model.decoder(dummy_input)
        except Exception as e:
            print(f"Dummy forward pass failed ({e}), but CUDA initialization was attempted")
    return model


semanticodec = load_model()

model_device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"Model initialized on device: {model_device}")

# SemantiCodec reconstructs audio at 16 kHz.
SAMPLE_RATE = 16000


@spaces.GPU(duration=20)
def encode_audio(audio_path):
    """Encode an audio file to tokens and return them as a downloadable file."""
    try:
        print(f"Encoding audio on device: {model_device}")
        semanticodec.to(model_device)
        tokens = semanticodec.encode(audio_path)
        print(f"Tokens device after encode: {tokens.device if isinstance(tokens, torch.Tensor) else 'numpy'}")

        if isinstance(tokens, torch.Tensor):
            tokens = tokens.cpu().numpy()

        # Normalize to the (batch, n_tokens, codebooks) layout expected downstream.
        if tokens.ndim == 1:
            tokens = tokens.reshape(1, -1, 1)

        token_data = {
            'tokens': tokens,
            'shape': tokens.shape,
            'device': str(model_device)
        }

        # Persist the tokens as a pickled ".oterin" file the user can download.
        temp_dir = "/tmp"
        os.makedirs(temp_dir, exist_ok=True)
        temp_file_path = os.path.join(temp_dir, f"tokens_{uuid.uuid4()}.oterin")

        with open(temp_file_path, "wb") as f:
            pickle.dump(token_data, f)

        if not os.path.exists(temp_file_path) or os.path.getsize(temp_file_path) == 0:
            raise RuntimeError("Failed to create token file")

        return temp_file_path, f"Encoded to {tokens.shape[1]} tokens"
    except Exception as e:
        print(f"Encoding error: {str(e)}")
        return None, f"Error encoding audio: {str(e)}"

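# For reference, a ".oterin" file is just the pickled dict written by
# encode_audio above. A minimal sketch of inspecting one outside this app
# (the file name is illustrative):
#
#   with open("tokens_<uuid>.oterin", "rb") as f:
#       data = pickle.load(f)
#   data["tokens"]   # np.ndarray of token ids, shape (1, n_tokens, n_codebooks)
#   data["shape"]    # the recorded tokens.shape
#   data["device"]   # device string in use at encode time
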
@spaces.GPU(duration=160)
def decode_tokens(token_file):
    """Decode a token file back to audio."""
    if not token_file or not os.path.exists(token_file):
        return None, "Error: Empty or missing token file"

    try:
        # Note: pickle.load on user-supplied files can execute arbitrary code;
        # only load token files from sources you trust.
        with open(token_file, "rb") as f:
            token_data = pickle.load(f)

        tokens = token_data['tokens']
        intended_device = token_data.get('device', model_device)
        print(f"Loaded tokens with shape {tokens.shape}, intended device: {intended_device}")

        semanticodec.to(model_device)
        print(f"Model device before tensor creation: {next(semanticodec.parameters()).device}")

        tokens_tensor = torch.tensor(tokens, dtype=torch.long)
        print(f"Tokens tensor created on device: {tokens_tensor.device} with dtype: {tokens_tensor.dtype}")

        tokens_tensor = tokens_tensor.to(model_device)
        print(f"Tokens moved to device: {tokens_tensor.device}")

        waveform = semanticodec.decode(tokens_tensor)
        print(f"Waveform device after decode: {waveform.device if isinstance(waveform, torch.Tensor) else 'numpy'}")

        if isinstance(waveform, torch.Tensor):
            waveform = waveform.cpu().numpy()

        # The decoder returns a (batch, channels, samples) array; take the mono channel.
        audio_data = waveform[0, 0]
        print(f"Audio data shape: {audio_data.shape}, dtype: {audio_data.dtype}")

        return (SAMPLE_RATE, audio_data), f"Decoded {tokens.shape[1]} tokens to audio"
    except Exception as e:
        print(f"Decoding error: {str(e)}")
        return None, f"Error decoding tokens: {str(e)}"

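# A hypothetical local round-trip, bypassing the Gradio UI (file names are
# illustrative, and writing the output would need an audio library such as
# soundfile):
#
#   token_path, status = encode_audio("input.wav")
#   (sr, audio), status = decode_tokens(token_path)
#   # soundfile.write("reconstructed.wav", audio, sr)
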
@spaces.GPU(duration=250)
def process_both(audio_path):
    """Encode and then decode the audio without saving an intermediate token file."""
    try:
        print(f"Processing both on device: {model_device}")
        semanticodec.to(model_device)

        tokens = semanticodec.encode(audio_path)
        print(f"Tokens device after encode: {tokens.device if isinstance(tokens, torch.Tensor) else 'numpy'}")

        if isinstance(tokens, torch.Tensor):
            tokens = tokens.cpu().numpy()

        # Normalize to the (batch, n_tokens, codebooks) layout expected by decode.
        if tokens.ndim == 1:
            tokens = tokens.reshape(1, -1, 1)

        tokens_tensor = torch.tensor(tokens, dtype=torch.long)
        print(f"Tokens tensor created on device: {tokens_tensor.device} with dtype: {tokens_tensor.dtype}")

        tokens_tensor = tokens_tensor.to(model_device)
        print(f"Tokens moved to device: {tokens_tensor.device}")

        semanticodec.to(model_device)
        print(f"Model device before decode: {next(semanticodec.parameters()).device}")

        waveform = semanticodec.decode(tokens_tensor)
        print(f"Waveform device after decode: {waveform.device if isinstance(waveform, torch.Tensor) else 'numpy'}")

        if isinstance(waveform, torch.Tensor):
            waveform = waveform.cpu().numpy()

        audio_data = waveform[0, 0]
        print(f"Audio data shape: {audio_data.shape}, dtype: {audio_data.dtype}")

        return (SAMPLE_RATE, audio_data), f"Encoded to {tokens.shape[1]} tokens\nDecoded {tokens.shape[1]} tokens to audio"
    except Exception as e:
        print(f"Processing error: {str(e)}")
        return None, f"Error processing audio: {str(e)}"


@spaces.GPU(duration=250)
def stream_both(audio_path):
    """Encode the audio, then decode it in chunks, yielding partial results."""
    try:
        print(f"Processing both (streaming) on device: {model_device}")
        semanticodec.to(model_device)

        tokens = semanticodec.encode(audio_path)
        if isinstance(tokens, torch.Tensor):
            tokens = tokens.cpu().numpy()

        if tokens.ndim == 1:
            tokens = tokens.reshape(1, -1, 1)

        print(f"Encoded audio to {tokens.shape[1]} tokens, now streaming decoding...")
        yield None, f"Encoded to {tokens.shape[1]} tokens, starting decoding..."

        # Short inputs are decoded in one pass.
        if tokens.shape[1] < 1500:
            tokens_tensor = torch.tensor(tokens, dtype=torch.long).to(model_device)
            semanticodec.to(model_device)
            waveform = semanticodec.decode(tokens_tensor)
            if isinstance(waveform, torch.Tensor):
                waveform = waveform.cpu().numpy()

            audio_data = waveform[0, 0]
            yield (SAMPLE_RATE, audio_data), f"Encoded to {tokens.shape[1]} tokens and decoded to audio"
            return

        # Longer inputs are decoded in fixed-size chunks. Each chunk is decoded
        # independently, so audible seams at chunk boundaries are possible.
        chunk_size = 1500
        num_chunks = (tokens.shape[1] + chunk_size - 1) // chunk_size

        all_audio_chunks = []

        for i in range(num_chunks):
            start_idx = i * chunk_size
            end_idx = min((i + 1) * chunk_size, tokens.shape[1])

            print(f"Decoding chunk {i+1}/{num_chunks}, tokens {start_idx} to {end_idx}")

            token_chunk = tokens[:, start_idx:end_idx, :]
            tokens_tensor = torch.tensor(token_chunk, dtype=torch.long).to(model_device)
            semanticodec.to(model_device)

            waveform = semanticodec.decode(tokens_tensor)
            if isinstance(waveform, torch.Tensor):
                waveform = waveform.cpu().numpy()

            audio_chunk = waveform[0, 0]
            all_audio_chunks.append(audio_chunk)

            # Yield everything decoded so far, so the UI can start playback
            # while the remaining chunks are still being processed.
            combined_audio = np.concatenate(all_audio_chunks)
            yield (SAMPLE_RATE, combined_audio), f"Encoded to {tokens.shape[1]} tokens\nDecoded chunk {i+1}/{num_chunks} ({end_idx}/{tokens.shape[1]} tokens)"

            # Brief pause to give the UI a chance to update between chunks.
            time.sleep(0.1)

        combined_audio = np.concatenate(all_audio_chunks)
        yield (SAMPLE_RATE, combined_audio), f"Completed: Encoded to {tokens.shape[1]} tokens and fully decoded"

    except Exception as e:
        print(f"Streaming process error: {str(e)}")
        yield None, f"Error processing audio: {str(e)}"


@spaces.GPU(duration=250)
def stream_decode_tokens(token_file):
    """Decode a token file to audio in streaming chunks."""
    if not token_file or not os.path.exists(token_file):
        yield None, "Error: Empty or missing token file"
        return

    try:
        with open(token_file, "rb") as f:
            token_data = pickle.load(f)

        tokens = token_data['tokens']
        intended_device = token_data.get('device', model_device)
        print(f"Loaded tokens with shape {tokens.shape}, intended device: {intended_device}")

        semanticodec.to(model_device)

        # Short inputs are decoded in one pass.
        if tokens.shape[1] < 1500:
            tokens_tensor = torch.tensor(tokens, dtype=torch.long)
            tokens_tensor = tokens_tensor.to(model_device)

            waveform = semanticodec.decode(tokens_tensor)
            if isinstance(waveform, torch.Tensor):
                waveform = waveform.cpu().numpy()

            audio_data = waveform[0, 0]
            yield (SAMPLE_RATE, audio_data), f"Decoded {tokens.shape[1]} tokens to audio"
            return

        chunk_size = 1500
        num_chunks = (tokens.shape[1] + chunk_size - 1) // chunk_size

        yield None, f"Starting decoding of {tokens.shape[1]} tokens in {num_chunks} chunks..."

        all_audio_chunks = []

        for i in range(num_chunks):
            start_idx = i * chunk_size
            end_idx = min((i + 1) * chunk_size, tokens.shape[1])

            print(f"Decoding chunk {i+1}/{num_chunks}, tokens {start_idx} to {end_idx}")

            token_chunk = tokens[:, start_idx:end_idx, :]

            tokens_tensor = torch.tensor(token_chunk, dtype=torch.long)
            tokens_tensor = tokens_tensor.to(model_device)
            semanticodec.to(model_device)

            waveform = semanticodec.decode(tokens_tensor)
            if isinstance(waveform, torch.Tensor):
                waveform = waveform.cpu().numpy()

            audio_chunk = waveform[0, 0]
            all_audio_chunks.append(audio_chunk)

            # Yield everything decoded so far so playback can start early.
            combined_audio = np.concatenate(all_audio_chunks)
            yield (SAMPLE_RATE, combined_audio), f"Decoded chunk {i+1}/{num_chunks} ({end_idx}/{tokens.shape[1]} tokens)"

            # Brief pause to give the UI a chance to update between chunks.
            time.sleep(0.1)

        combined_audio = np.concatenate(all_audio_chunks)
        yield (SAMPLE_RATE, combined_audio), f"Completed decoding all {tokens.shape[1]} tokens"

    except Exception as e:
        print(f"Streaming decode error: {str(e)}")
        yield None, f"Error decoding tokens: {str(e)}"

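# Rough sizing for the 1500-token streaming chunks above (assuming the
# token_rate=100 passed to SemantiCodec in load_model means 100 tokens per
# second of audio):
#
#   chunk_seconds = 1500 / 100   # each chunk covers roughly 15 s of audio
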
with gr.Blocks(title="Oterin Audio Codec") as demo:
    gr.Markdown("# Oterin Audio Codec")
    gr.Markdown("Upload an audio file to encode it to semantic tokens, decode tokens back to audio, or do both.")

    with gr.Tab("Encode Audio"):
        with gr.Row():
            encode_input = gr.Audio(type="filepath", label="Input Audio")
            encode_output = gr.File(label="Encoded Tokens (.oterin)", file_types=[".oterin"])
        encode_status = gr.Textbox(label="Status")
        encode_btn = gr.Button("Encode")
        encode_btn.click(encode_audio, inputs=encode_input, outputs=[encode_output, encode_status])

    with gr.Tab("Decode Tokens"):
        with gr.Row():
            decode_input = gr.File(label="Token File (.oterin)", file_types=[".oterin"])
            decode_output = gr.Audio(label="Decoded Audio")
        decode_status = gr.Textbox(label="Status")
        decode_btn = gr.Button("Decode")
        decode_btn.click(decode_tokens, inputs=decode_input, outputs=[decode_output, decode_status])

    with gr.Tab("Stream Decode (Listen while decoding)"):
        with gr.Row():
            stream_decode_input = gr.File(label="Token File (.oterin)", file_types=[".oterin"])
            stream_decode_output = gr.Audio(label="Streaming Audio Output")
        stream_decode_status = gr.Textbox(label="Status")
        stream_decode_btn = gr.Button("Start Streaming Decode")
        stream_decode_btn.click(
            stream_decode_tokens,
            inputs=stream_decode_input,
            outputs=[stream_decode_output, stream_decode_status],
            show_progress=True
        )

    with gr.Tab("Both (Encode & Decode)"):
        with gr.Row():
            both_input = gr.Audio(type="filepath", label="Input Audio")
            both_output = gr.Audio(label="Reconstructed Audio")
        both_status = gr.Textbox(label="Status")
        both_btn = gr.Button("Process")
        both_btn.click(process_both, inputs=both_input, outputs=[both_output, both_status])

    with gr.Tab("Both Streaming (Encode & Stream Decode)"):
        with gr.Row():
            stream_both_input = gr.Audio(type="filepath", label="Input Audio")
            stream_both_output = gr.Audio(label="Streaming Reconstructed Audio")
        stream_both_status = gr.Textbox(label="Status")
        stream_both_btn = gr.Button("Encode & Stream Decode")
        stream_both_btn.click(
            stream_both,
            inputs=stream_both_input,
            outputs=[stream_both_output, stream_both_status],
            show_progress=True
        )

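# Note: on some Gradio versions, generator-based handlers (the streaming tabs)
# only update incrementally when the queue is enabled. If streaming results
# appear only at the end, try enabling the queue explicitly:
#
#   demo.queue().launch(share=True)
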
if __name__ == "__main__":
    demo.launch(share=True)