File size: 4,395 Bytes
1e06115 d48fce1 5b6c8ca d48fce1 2b12a37 5b6c8ca d48fce1 4471c86 1e06115 5b6c8ca 4471c86 1e06115 4471c86 5b6c8ca 1e06115 d76c658 d48fce1 d76c658 d48fce1 d76c658 1e06115 5b6c8ca 1e06115 d76c658 5b6c8ca d76c658 1e06115 5b6c8ca 1e06115 5b6c8ca 1e06115 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
import glob
import os
import random
import requests
from loguru import logger
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip
def check_file_exists(file_path):
    """Ensure that something exists at ``file_path``.

    Args:
        file_path: Path to test with ``os.path.exists``.

    Raises:
        FileNotFoundError: If the path does not exist.
    """
    if os.path.exists(file_path):
        return
    message = f"Tệp {file_path} không tồn tại."
    raise FileNotFoundError(message)
# Paths to the background-music input and the final rendered video.
# Both are relative, so they resolve against the current working directory.
bgm_file = "data/bg_music.mp3"
output_path = "output/final_video.mp4"
# Runs at import time: importing this module raises FileNotFoundError
# when the background-music file is missing.
check_file_exists(bgm_file)
def get_pexels_video(query):
    """Search Pexels for ``query`` and return the link of one random match.

    The API key is read from the ``Pexels_API_KEY`` environment variable.

    Args:
        query: Free-text search term. It is passed through ``params`` so that
            spaces, ``&`` and non-ASCII characters are URL-encoded correctly.

    Returns:
        The ``link`` of the first file of a randomly chosen video, or ``None``
        when the request did not return HTTP 200 or no usable video was found.

    Raises:
        ValueError: If the API key environment variable is not set.
        requests.exceptions.RequestException: On network failure or timeout.
    """
    api_key = os.getenv('Pexels_API_KEY')
    if not api_key:
        raise ValueError("API key không được tìm thấy trong biến môi trường.")

    response = requests.get(
        "https://api.pexels.com/videos/search",
        params={"query": query, "per_page": 30},
        headers={"Authorization": api_key},
        timeout=30,  # never hang forever on an unresponsive server
    )
    if response.status_code == 200:
        videos = response.json().get('videos') or []
        # Skip entries without downloadable files so indexing [0] is safe.
        candidates = [video for video in videos if video.get('video_files')]
        if candidates:
            return random.choice(candidates)['video_files'][0]['link']

    logger.error("Không tìm thấy video nào.")
    return None
def download_video(url, save_path="downloaded_video.mp4"):
    """Stream the file at ``url`` to ``save_path``.

    Args:
        url: Direct download URL of the video.
        save_path: Destination file; overwritten if it already exists.

    Returns:
        ``save_path`` on success, or ``None`` if the request failed (the
        failure is logged rather than raised).
    """
    try:
        # The context manager releases the pooled connection even when the
        # download is interrupted; the timeout prevents an indefinite hang.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
        return save_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download video from {url}: {e}")
        return None
def get_bgm_file(bgm_type: str = "random", bgm_file: str = ""):
    """Resolve which background-music file to use.

    Args:
        bgm_type: ``"random"`` picks any ``*.mp3`` from ``/data/bg-music``;
            a falsy value disables background music; any other value means
            "use ``bgm_file``".
        bgm_file: Explicit music path, used only when ``bgm_type`` is neither
            falsy nor ``"random"``.

    Returns:
        The chosen file path, or ``""`` when music is disabled, the music
        directory has no mp3 files, or ``bgm_file`` does not exist.
    """
    if bgm_type:
        if bgm_type == "random":
            candidates = glob.glob(os.path.join("/data/bg-music", "*.mp3"))
            if not candidates:
                return ""
            return random.choice(candidates)
        if os.path.exists(bgm_file):
            return bgm_file
    return ""
def add_background_music(video_path, bgm_file, output_path="video_with_bgm.mp4"):
    """Mix a background-music track into a video and write the result.

    Args:
        video_path: Source video file.
        bgm_file: Music file to mix in.
        output_path: Where the mixed video is written.

    Returns:
        ``output_path``.

    Raises:
        FileNotFoundError: If ``video_path`` or ``bgm_file`` does not exist.
    """
    check_file_exists(video_path)
    check_file_exists(bgm_file)

    video = VideoFileClip(video_path)
    bgm = None
    try:
        bgm = AudioFileClip(bgm_file)
        # Never cut past the end of the music: a track shorter than the video
        # simply ends early instead of failing while rendering.
        bgm_track = bgm.subclip(0, min(bgm.duration, video.duration))
        # A silent video has ``audio is None``, which cannot be composited.
        if video.audio is None:
            tracks = [bgm_track]
        else:
            tracks = [video.audio, bgm_track]
        final_audio = CompositeAudioClip(tracks)
        final_video = video.set_audio(final_audio)
        final_video.write_videofile(output_path, audio_codec="aac")
    finally:
        # Release the ffmpeg reader processes held by the clips.
        if bgm is not None:
            bgm.close()
        video.close()
    return output_path
def combine_videos(combined_video_path: str,
                   video_paths: list[str],
                   audio_file: str,
                   max_clip_duration: int = 5,
                   threads: int = 2,
                   ) -> str:
    """Concatenate silent snippets of the source videos to match an audio track.

    The sources are visited in random order, repeatedly if necessary, and each
    contributes at most ``max_clip_duration`` seconds until the combined length
    equals the duration of ``audio_file``. The result is written at 30 fps.

    Args:
        combined_video_path: Output file for the concatenated video.
        video_paths: Source video files; the caller's list is not modified.
        audio_file: Audio file whose duration defines the target length.
        max_clip_duration: Longest snippet, in seconds, taken from one source.
        threads: Thread count forwarded to ``write_videofile``.

    Returns:
        ``combined_video_path``.

    Raises:
        ValueError: If ``video_paths`` is empty, ``max_clip_duration`` is not
            positive, or no source yields any usable footage.
        FileNotFoundError: If the audio file or any video file is missing.
    """
    if not video_paths:
        # An empty list would otherwise spin forever in the fill loop below.
        raise ValueError("video_paths must contain at least one video file")
    if max_clip_duration <= 0:
        raise ValueError("max_clip_duration must be positive")

    check_file_exists(audio_file)
    for video_path in video_paths:
        check_file_exists(video_path)

    audio_clip = AudioFileClip(audio_file)
    try:
        audio_duration = audio_clip.duration
    finally:
        audio_clip.close()
    logger.info(f"Max duration of audio: {audio_duration} seconds")

    shuffled_paths = list(video_paths)  # shuffle a copy, not the caller's list
    opened = []  # every clip that owns an ffmpeg reader, closed in finally
    clips = []
    final_clip = None
    video_duration = 0
    try:
        while video_duration < audio_duration:
            duration_before_pass = video_duration
            random.shuffle(shuffled_paths)
            for video_path in shuffled_paths:
                remaining = audio_duration - video_duration
                if remaining <= 0:
                    # Target reached mid-pass; further clips would be empty.
                    break
                clip = VideoFileClip(video_path).without_audio()
                opened.append(clip)
                # Honour both the per-clip cap and the time still to fill.
                take = min(clip.duration, max_clip_duration, remaining)
                if take <= 0:
                    continue
                if take < clip.duration:
                    clip = clip.subclip(0, take)
                clip = clip.set_fps(30)
                clips.append(clip)
                video_duration += clip.duration
            if video_duration == duration_before_pass:
                # A whole pass added nothing: bail out instead of looping.
                raise ValueError("source videos provide no usable footage")

        if not clips:
            raise ValueError("audio track is empty; nothing to combine")

        final_clip = concatenate_videoclips(clips)
        final_clip = final_clip.set_fps(30)
        logger.info(f"Writing combined video to {combined_video_path}")
        final_clip.write_videofile(combined_video_path, threads=threads)
    finally:
        for clip in opened:
            clip.close()
        if final_clip is not None:
            final_clip.close()
    logger.success(f"Completed combining videos")
    return combined_video_path
def generate_video(video_paths: list[str],
                   audio_path: str,
                   output_file: str,
                   ) -> str:
    """Build a video from ``video_paths`` with ``audio_path`` as its soundtrack.

    The sources are first combined into a temporary silent video via
    ``combine_videos``; the audio is then attached and the result written to
    ``output_file``. The temporary file is removed afterwards, even on failure.

    Args:
        video_paths: Source video files.
        audio_path: Audio file used as the soundtrack.
        output_file: Destination of the final video.

    Returns:
        ``output_file``.
    """
    logger.info(f"Start generating video")
    combined_video_path = "temp_combined_video.mp4"
    video_clip = None
    audio_clip = None
    try:
        combine_videos(combined_video_path, video_paths, audio_path)

        video_clip = VideoFileClip(combined_video_path)
        audio_clip = AudioFileClip(audio_path)
        final_video = video_clip.set_audio(audio_clip)

        logger.info(f"Writing final video to {output_file}")
        final_video.write_videofile(output_file, audio_codec="aac")
    finally:
        # Close the readers before deleting: an open handle on the temporary
        # file makes os.remove fail on Windows.
        for clip in (audio_clip, video_clip):
            if clip is not None:
                clip.close()
        if os.path.exists(combined_video_path):
            try:
                os.remove(combined_video_path)
            except OSError as e:
                # Cleanup is best-effort; do not mask the original error.
                logger.warning(f"Could not remove {combined_video_path}: {e}")
    logger.success(f"Completed generating video: {output_file}")
    return output_file