|
import os |
|
import pathlib |
|
import shutil |
|
import subprocess |
|
import sys |
|
import uuid |
|
from pathlib import Path |
|
|
|
import gradio as gr |
|
import opengraph |
|
import requests |
|
from moviepy.editor import AudioFileClip |
|
|
|
# Root folder for everything this module writes to disk. It is anchored to the
# working directory at import time and created eagerly so later code can
# assume it exists.
output_dir = Path.cwd() / "temp"
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
class SpotifyApi:
    """Fetch the audio, cover image and title of one Spotify episode.

    Audio is downloaded by shelling out to the ``spodcast`` command-line
    tool; the title and image URL are read from the ``opengraph.OpenGraph``
    object built for the episode URL.
    """

    # Scratch area that spodcast downloads into; each instance gets its own
    # randomly named sub-folder below it.
    spotify_directory = Path(output_dir / "spotify")

    def __init__(self, url):
        """Prepare credentials and per-episode paths for ``url``.

        Args:
            url: Spotify episode URL, passed to both OpenGraph and spodcast.
        """
        self.setup_spotify()
        self.url = url
        self.opengraph = opengraph.OpenGraph(url=url)
        # Short random token so concurrent requests do not share file names.
        self.random_string = str(uuid.uuid4())[:8]
        # Extension-less stem; ".mp3" / ".jpg" are appended when files are written.
        self.perma_output_path = Path(output_dir / self.random_string)
        self.temp_output_path = Path(
            self.spotify_directory / self.random_string
        ).resolve()
        # Not assigned anywhere else in this class; kept for compatibility.
        self.folder_dir: Path = None

    def setup_spotify(self) -> None:
        """Create ``spotify.rc`` from the environment and hand it to spodcast.

        Does nothing when ``spotify.rc`` already exists in the working
        directory.

        Raises:
            KeyError: if ``SPOTIFY_USERNAME`` or ``SPOTIFY_PASSWORD`` is unset.
        """
        if not os.path.exists("spotify.rc"):
            with open("spotify.rc", "w") as f:
                f.write(
                    f"{os.environ['SPOTIFY_USERNAME']} {os.environ['SPOTIFY_PASSWORD']}"
                )
            # Run only after the file is closed so spodcast sees the full content.
            # NOTE(review): "-l" presumably performs the login - confirm in spodcast docs.
            subprocess.call(["spodcast", "-l", "spotify.rc"])

    def download_episode(self) -> str:
        """Download the episode with spodcast and return the mp3 path.

        Returns:
            POSIX-style path of the mp3 copied into ``output_dir``.

        Raises:
            RuntimeError: if no ``.mp3`` file was found after spodcast ran.
        """
        out_path = self.temp_output_path.resolve()
        subprocess.call(["spodcast", "--root-path", str(out_path), self.url])
        mp3_path = self.get_final_mp3()
        # Explicit raise instead of ``assert``: asserts vanish under ``python -O``
        # and would let ``None`` leak out as the audio path.
        if mp3_path is None:
            raise RuntimeError(
                f"spodcast did not produce an mp3 for {self.url!r} in {out_path}"
            )
        return mp3_path

    def download_image(self):
        """Download the episode's OpenGraph image next to the mp3.

        Returns:
            Absolute ``Path`` of the written ``.jpg`` file.

        Raises:
            requests.HTTPError: if the image URL answers with an error status.
        """
        image = self.opengraph["image"]
        # Without a timeout a stalled server would block the request forever.
        r = requests.get(image, allow_redirects=True, timeout=30)
        # Do not save an HTTP error page under a .jpg name.
        r.raise_for_status()
        path = self.perma_output_path.with_suffix(".jpg").absolute()
        # write_bytes opens and closes the file itself (no leaked handle).
        path.write_bytes(r.content)
        return path

    def get_title(self) -> str:
        """Return the episode title from the OpenGraph metadata."""
        return self.opengraph["title"]

    def get_final_mp3(self):
        """Copy the first downloaded mp3 into ``output_dir`` and clean up.

        Walks ``temp_output_path``; on the first ``.mp3`` found the file is
        copied to ``<perma_output_path>.mp3`` and the whole temporary folder
        is removed.

        Returns:
            POSIX-style path of the copied mp3, or ``None`` when the
            temporary folder contains no mp3.
        """
        for root, _dirs, files in os.walk(self.temp_output_path.resolve()):
            for file in files:
                if file.endswith(".mp3"):
                    final_mp3 = self.perma_output_path.with_suffix(".mp3").absolute()
                    shutil.copy(os.path.join(root, file), final_mp3)
                    shutil.rmtree(self.temp_output_path.absolute())
                    return final_mp3.as_posix()
        return None
|
|
|
|
|
class AudioInput:
    """Plain mutable record: an audio file plus the slice of it to use."""

    def __init__(self, path: str, start_time: int, run_for: int):
        """Store the three values unchanged as same-named attributes.

        Args:
            path: Location of the audio file.
            start_time: Offset at which the clip starts.
            run_for: Length of the clip, counted from ``start_time``.
        """
        self.path, self.start_time, self.run_for = path, start_time, run_for
|
|
|
|
|
def process_inputs(
    prompt: str, audio_path: str, spotify_url: str, start_time: int, run_for: int
) -> str:
    """Turn a prompt plus an audio source into a video clip.

    The audio comes from ``spotify_url`` when one is given (the episode is
    downloaded and replaces ``audio_path``), otherwise from the uploaded
    file at ``audio_path``.

    Args:
        prompt: Text sent to the image generator.
        audio_path: Path of an uploaded audio file; may be empty/``None``
            when a Spotify link is supplied instead.
        spotify_url: Spotify episode URL; may be empty.
        start_time: Clip start offset within the audio.
        run_for: Clip length, counted from ``start_time``.

    Returns:
        Whatever ``animate_images`` returns for the finished video.

    Raises:
        ValueError: if neither an audio file nor a Spotify link was given.
    """
    # Fail before the slow image generation instead of deep inside moviepy.
    if not spotify_url and not audio_path:
        raise ValueError("Provide either an mp3 upload or a Spotify episode link.")

    audio_input = AudioInput(audio_path, start_time, run_for)
    # Only Spotify episodes come with an image. Binding the name up front
    # keeps the upload-only path from hitting an UnboundLocalError below.
    spotify_image = None
    if spotify_url:
        spotify = SpotifyApi(spotify_url)
        audio_input.path = spotify.download_episode()
        spotify_image = spotify.download_image()
    images = get_stable_diffusion_images(prompt)
    return animate_images(images, audio_input, spotify_image)


def animate_images(
    image_paths: list[str],
    audio_input: AudioInput,
    overlay_image_path: "str | Path | None" = None,
) -> str:
    """Render ``image_paths`` into an mp4 with the selected audio slice.

    Args:
        image_paths: Image files handed to ``get_video_frames``.
        audio_input: Audio file plus the start offset and length to cut out.
        overlay_image_path: Image passed on as the overlay; a ``str`` or a
            ``Path`` is accepted, ``None`` means no overlay image is available.

    Returns:
        The value returned by ``create_mp4_with_audio``.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably because ``animate`` needs the frame_interpolation
    # checkout that is only put on sys.path at startup - confirm.
    from animate import create_mp4_with_audio, get_video_frames

    # Each render gets its own randomly named folder below output_dir.
    foldername = str(uuid.uuid4())[:8]
    vid_output_dir = Path(output_dir / foldername)
    vid_output_dir.mkdir(exist_ok=True, parents=True)

    audio_clip = AudioFileClip(audio_input.path)
    audio_clip = audio_clip.subclip(
        audio_input.start_time, audio_input.start_time + audio_input.run_for
    )
    video_frames, cv2_images = get_video_frames(image_paths, vid_output_dir)
    path = Path(vid_output_dir / "output_final.mp4")

    # Normalise through Path so both str and Path inputs work; the previous
    # direct ``.as_posix()`` call crashed on the ``str`` the signature advertised.
    # NOTE(review): create_mp4_with_audio lives outside this file - confirm it
    # accepts ``None`` for the overlay argument.
    overlay = (
        None if overlay_image_path is None else Path(overlay_image_path).as_posix()
    )
    return create_mp4_with_audio(
        video_frames,
        cv2_images,
        audio_clip.duration,
        audio_clip,
        path,
        overlay,
    )
|
|
|
|
|
def get_stable_diffusion_images(prompt: str) -> list[str]:
    """Generate images for ``prompt`` and return the paths of two of them.

    Loads the ``spaces/stabilityai/stable-diffusion`` Space through Gradio,
    calls its function with index 2 and lists the directory it returns.

    Args:
        prompt: Text prompt forwarded to the Space.

    Returns:
        Up to two file paths inside the returned gallery directory.
    """
    stable_diffusion = gr.Blocks.load(name="spaces/stabilityai/stable-diffusion")
    gallery_dir = stable_diffusion(prompt, fn_index=2)
    # os.listdir order is arbitrary, so sort to make the pick reproducible.
    # JSON files are skipped so a metadata side-car is never treated as an image.
    # NOTE(review): gradio may write a captions.json next to gallery images -
    # confirm for the installed gradio version.
    image_names = sorted(
        name for name in os.listdir(gallery_dir) if not name.lower().endswith(".json")
    )
    return [os.path.join(gallery_dir, name) for name in image_names[:2]]
|
|
|
|
|
# Input widgets, listed in the positional order of process_inputs' parameters:
# prompt, audio_path, spotify_url, start_time, run_for.
_interface_inputs = [
    gr.Textbox(label="Describe your podcast clip"),
    gr.Audio(type="filepath", label="Upload an mp3"),
    gr.Textbox(label="Or Paste a spotify episode link"),
    gr.Number(label="Start time (in seconds)"),
    gr.Number(label="Run for (in seconds)"),
]

# One pre-filled example row, same column order as the inputs above
# (no uploaded file, episode link instead).
_interface_examples = [
    [
        "A podcast clip",
        None,
        "https://open.spotify.com/episode/31u9tI8t5IFrdv3QhZtPHI",
        50,
        60,
    ],
]

# The Gradio UI: calls process_inputs and shows its result as a video.
iface = gr.Interface(
    fn=process_inputs,
    inputs=_interface_inputs,
    outputs="video",
    examples=_interface_examples,
)
|
|
|
if __name__ == "__main__":
    # Resolve the checkout location BEFORE the chdir below: a relative
    # sys.path entry is interpreted against the current working directory at
    # import time, so "frame_interpolation" would otherwise point into
    # output_dir once the directory has been changed.
    frame_interpolation_dir = Path("frame_interpolation").absolute()

    # Clone only once; re-running "git clone" into an existing checkout just fails.
    if not frame_interpolation_dir.exists():
        subprocess.call(
            [
                "git",
                "clone",
                "https://github.com/google-research/frame-interpolation",
                str(frame_interpolation_dir),
            ]
        )
    sys.path.append(str(frame_interpolation_dir))

    # From here on, relative paths (e.g. "spotify.rc") resolve inside output_dir.
    os.chdir(output_dir)
    iface.launch()
|
|