Spaces:
Running
Running
import io | |
import os | |
import os.path as osp | |
import uuid | |
import requests | |
from pathlib import Path | |
import av | |
import numpy as np | |
import moviepy.editor as mpe | |
from cllm.services.utils import get_bytes_value | |
__ALL__ = [ | |
"video_classification", | |
"video_captioning", | |
"image_to_video", | |
"text_to_video", | |
"video_to_webpage", | |
"dub_video", | |
] | |
HOST = os.environ.get("CLLM_SERVICES_HOST", "localhost") | |
PORT = os.environ.get("CLLM_SERVICES_PORT", 10056) | |
def setup(host="localhost", port=10056): | |
global HOST, PORT | |
HOST = host | |
PORT = port | |
def video_classification(video: str | Path | bytes, **kwargs): | |
host = kwargs.get("host", HOST) | |
port = kwargs.get("port", PORT) | |
url = f"http://{host}:{port}/video_classification" | |
files = {"video": (video, get_bytes_value(video))} | |
response = requests.post(url, files=files) | |
return response.json() | |
def video_captioning(video: str | Path, **kwargs): | |
host = kwargs.get("host", HOST) | |
port = kwargs.get("port", PORT) | |
url = f"http://{host}:{port}/video_captioning" | |
files = {"video": (video, get_bytes_value(video))} | |
response = requests.post(url, files=files) | |
return response.json() | |
def image_audio_to_video(image: str | Path, audio: str | Path, **kwargs): | |
host = kwargs.get("host", HOST) | |
port = kwargs.get("port", PORT) | |
url = f"http://{host}:{port}/image_audio_to_video" | |
files = { | |
"image": (image, get_bytes_value(image)), | |
"audio": (audio, get_bytes_value(audio)), | |
} | |
response = requests.post(url, files=files) | |
return response.content | |
def image_to_video(image: str | Path, **kwargs): | |
host = kwargs.get("host", HOST) | |
port = kwargs.get("port", PORT) | |
url = f"http://{host}:{port}/image_to_video" | |
files = {"image": (image, get_bytes_value(image))} | |
response = requests.post(url, files=files) | |
return response.content | |
def text_to_video(prompt: str, **kwargs): | |
host = kwargs.get("host", HOST) | |
port = kwargs.get("port", PORT) | |
url = f"http://{host}:{port}/text_to_video" | |
data = {"prompt": prompt} | |
response = requests.post(url, data=data) | |
return response.content | |
def video_to_webpage( | |
video: str | Path, | |
title: str, | |
tags: list[str], | |
description: str, | |
**kwargs, | |
): | |
host = kwargs.get("host", HOST) | |
port = kwargs.get("port", PORT) | |
url = f"http://{host}:{port}/video_to_webpage" | |
files = {"video": (video, get_bytes_value(video))} | |
data = { | |
"title": title, | |
"tags": tags, | |
"description": description, | |
} | |
response = requests.post(url, files=files, data=data) | |
return response.json() | |
def dub_video(video: str | Path | bytes, audio: str | Path | bytes, **kwargs): | |
root_dir = kwargs["root_dir"] | |
vid_file_location = osp.join(root_dir, video) | |
aud_file_location = osp.join(root_dir, audio) | |
video = mpe.VideoFileClip(vid_file_location) | |
# read audio file | |
audio = mpe.AudioFileClip(aud_file_location) | |
# set audio for video | |
new_video = video.set_audio(audio) | |
# export the video file | |
save_path = osp.join(root_dir, f"new_{str(uuid.uuid4())[:6]}.mp4") | |
new_video.write_videofile(save_path) | |
return open(save_path, "rb").read() | |
def decoding_key_frames(video: str | Path | bytes, **kwargs): | |
video = io.BytesIO(get_bytes_value(video)) | |
container = av.open(video) | |
# extract evenly spaced frames from video | |
seg_len = container.streams.video[0].frames | |
indices = set(np.linspace(0, seg_len, num=4, endpoint=False).astype(np.int64)) | |
frames = [] | |
container.seek(0) | |
for i, frame in enumerate(container.decode(video=0)): | |
if i in indices: | |
stream = io.BytesIO() | |
# frame = frame.to_image().save(f"frame_{i}.png") | |
frame = frame.to_image().save(stream) | |
frames.append(frame) | |
return frames | |