zwgao's picture
add file
3fdcc70
raw
history blame
3.86 kB
import io
import os
import os.path as osp
import uuid
import requests
from pathlib import Path
import av
import numpy as np
import moviepy.editor as mpe
from cllm.services.utils import get_bytes_value
__ALL__ = [
"video_classification",
"video_captioning",
"image_to_video",
"text_to_video",
"video_to_webpage",
"dub_video",
]
HOST = os.environ.get("CLLM_SERVICES_HOST", "localhost")
PORT = os.environ.get("CLLM_SERVICES_PORT", 10056)
def setup(host="localhost", port=10056):
global HOST, PORT
HOST = host
PORT = port
def video_classification(video: str | Path | bytes, **kwargs):
host = kwargs.get("host", HOST)
port = kwargs.get("port", PORT)
url = f"http://{host}:{port}/video_classification"
files = {"video": (video, get_bytes_value(video))}
response = requests.post(url, files=files)
return response.json()
def video_captioning(video: str | Path, **kwargs):
host = kwargs.get("host", HOST)
port = kwargs.get("port", PORT)
url = f"http://{host}:{port}/video_captioning"
files = {"video": (video, get_bytes_value(video))}
response = requests.post(url, files=files)
return response.json()
def image_audio_to_video(image: str | Path, audio: str | Path, **kwargs):
host = kwargs.get("host", HOST)
port = kwargs.get("port", PORT)
url = f"http://{host}:{port}/image_audio_to_video"
files = {
"image": (image, get_bytes_value(image)),
"audio": (audio, get_bytes_value(audio)),
}
response = requests.post(url, files=files)
return response.content
def image_to_video(image: str | Path, **kwargs):
host = kwargs.get("host", HOST)
port = kwargs.get("port", PORT)
url = f"http://{host}:{port}/image_to_video"
files = {"image": (image, get_bytes_value(image))}
response = requests.post(url, files=files)
return response.content
def text_to_video(prompt: str, **kwargs):
host = kwargs.get("host", HOST)
port = kwargs.get("port", PORT)
url = f"http://{host}:{port}/text_to_video"
data = {"prompt": prompt}
response = requests.post(url, data=data)
return response.content
def video_to_webpage(
video: str | Path,
title: str,
tags: list[str],
description: str,
**kwargs,
):
host = kwargs.get("host", HOST)
port = kwargs.get("port", PORT)
url = f"http://{host}:{port}/video_to_webpage"
files = {"video": (video, get_bytes_value(video))}
data = {
"title": title,
"tags": tags,
"description": description,
}
response = requests.post(url, files=files, data=data)
return response.json()
def dub_video(video: str | Path | bytes, audio: str | Path | bytes, **kwargs):
root_dir = kwargs["root_dir"]
vid_file_location = osp.join(root_dir, video)
aud_file_location = osp.join(root_dir, audio)
video = mpe.VideoFileClip(vid_file_location)
# read audio file
audio = mpe.AudioFileClip(aud_file_location)
# set audio for video
new_video = video.set_audio(audio)
# export the video file
save_path = osp.join(root_dir, f"new_{str(uuid.uuid4())[:6]}.mp4")
new_video.write_videofile(save_path)
return open(save_path, "rb").read()
def decoding_key_frames(video: str | Path | bytes, **kwargs):
video = io.BytesIO(get_bytes_value(video))
container = av.open(video)
# extract evenly spaced frames from video
seg_len = container.streams.video[0].frames
indices = set(np.linspace(0, seg_len, num=4, endpoint=False).astype(np.int64))
frames = []
container.seek(0)
for i, frame in enumerate(container.decode(video=0)):
if i in indices:
stream = io.BytesIO()
# frame = frame.to_image().save(f"frame_{i}.png")
frame = frame.to_image().save(stream)
frames.append(frame)
return frames