File size: 4,395 Bytes
1e06115
 
 
 
 
d48fce1
 
5b6c8ca
 
 
 
d48fce1
2b12a37
5b6c8ca
 
 
d48fce1
4471c86
1e06115
5b6c8ca
 
 
4471c86
1e06115
 
 
 
4471c86
 
5b6c8ca
1e06115
 
d76c658
d48fce1
 
 
d76c658
 
 
 
d48fce1
 
d76c658
 
1e06115
 
 
 
 
5b6c8ca
1e06115
 
 
 
 
 
 
 
d76c658
5b6c8ca
 
 
d76c658
 
 
 
 
 
 
1e06115
 
 
 
 
 
5b6c8ca
 
1e06115
 
 
 
 
 
 
 
 
 
 
5b6c8ca
1e06115
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import glob
import os
import random
import requests
from loguru import logger
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip

def check_file_exists(file_path):
    """Raise ``FileNotFoundError`` unless *file_path* exists on disk.

    Returns ``None`` when the path exists (file or directory).
    """
    if os.path.exists(file_path):
        return
    raise FileNotFoundError(f"Tệp {file_path} không tồn tại.")

# Paths (relative to the working directory) of the background-music track
# and of the final output video.
bgm_file = "data/bg_music.mp3"
output_path = "output/final_video.mp4"

# Runs at import time: raises FileNotFoundError if the music track is missing.
check_file_exists(bgm_file)

def get_pexels_video(query, timeout=30):
    """Return the download link of a random Pexels video matching *query*.

    Args:
        query: Free-text search term. It is passed via ``params`` so that
            spaces, ``&``, ``#`` and non-ASCII characters are URL-encoded
            instead of corrupting the request URL.
        timeout: Seconds to wait for the Pexels API before giving up.

    Returns:
        The ``link`` of the first file of a randomly chosen video, or
        ``None`` when the API answers with a non-200 status or no usable
        video is found.

    Raises:
        ValueError: If the ``Pexels_API_KEY`` environment variable is unset.
        requests.exceptions.RequestException: On network failure or timeout.
    """
    api_key = os.getenv('Pexels_API_KEY')
    if not api_key:
        raise ValueError("API key không được tìm thấy trong biến môi trường.")

    response = requests.get(
        "https://api.pexels.com/videos/search",
        params={"query": query, "per_page": 30},
        headers={"Authorization": api_key},
        timeout=timeout,
    )
    if response.status_code == 200:
        videos = response.json().get('videos') or []
        # Only keep videos that actually expose a downloadable file, so an
        # entry with an empty ``video_files`` list cannot raise IndexError.
        links = []
        for video in videos:
            files = video.get('video_files') or []
            if files and files[0].get('link'):
                links.append(files[0]['link'])
        if links:
            return random.choice(links)
    else:
        logger.error(f"Pexels API request failed with HTTP status {response.status_code}")
    logger.error("Không tìm thấy video nào.")
    return None

def download_video(url, save_path="downloaded_video.mp4", timeout=30):
    """Stream the video at *url* to *save_path*.

    Args:
        url: Direct link to the video file.
        save_path: Destination file; overwritten if it already exists.
        timeout: Seconds to wait for the connection and for each read.

    Returns:
        *save_path* on success, or ``None`` if the HTTP request failed (the
        error is logged and any partially written file is removed).
    """
    file_opened = False
    try:
        # The context manager releases the streamed connection even when the
        # download is interrupted part-way through.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                file_opened = True
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
        return save_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download video from {url}: {e}")
        if file_opened:
            # Best-effort cleanup: a truncated file must not be mistaken for
            # a complete download by a later existence check.
            try:
                os.remove(save_path)
            except OSError:
                pass
        return None

def get_bgm_file(bgm_type: str = "random", bgm_file: str = ""):
    """Resolve which background-music file to use.

    Args:
        bgm_type: Empty/falsy disables music; ``"random"`` picks any
            ``*.mp3`` from the ``/data/bg-music`` directory; any other value
            selects the explicit *bgm_file*.
        bgm_file: Path used when *bgm_type* is neither empty nor ``"random"``.

    Returns:
        The chosen path, or ``""`` when music is disabled, the directory has
        no mp3 files, or *bgm_file* does not exist.
    """
    if not bgm_type:
        return ""

    if bgm_type != "random":
        return bgm_file if os.path.exists(bgm_file) else ""

    mp3_pattern = os.path.join("/data/bg-music", "*.mp3")
    candidates = glob.glob(mp3_pattern)
    if not candidates:
        return ""
    return random.choice(candidates)

def add_background_music(video_path, bgm_file, output_path="video_with_bgm.mp4"):
    """Mix a background-music track into a video and write the result.

    Args:
        video_path: Source video file.
        bgm_file: Music file to lay under the video's own audio.
        output_path: Destination of the rendered video.

    Returns:
        *output_path*.

    Raises:
        FileNotFoundError: If *video_path* or *bgm_file* does not exist.
    """
    check_file_exists(video_path)
    check_file_exists(bgm_file)

    video = VideoFileClip(video_path)
    bgm = None
    try:
        bgm = AudioFileClip(bgm_file)
        # Never request audio beyond the end of the music track: trim to
        # whichever of the two media is shorter.
        bgm_track = bgm.subclip(0, min(bgm.duration, video.duration))
        # A silent video has ``audio`` set to None, which must not be handed
        # to CompositeAudioClip.
        if video.audio is None:
            final_audio = bgm_track
        else:
            final_audio = CompositeAudioClip([video.audio, bgm_track])
        final_video = video.set_audio(final_audio)
        final_video.write_videofile(output_path, audio_codec="aac")
    finally:
        # Release the underlying ffmpeg readers even if rendering failed.
        if bgm is not None:
            bgm.close()
        video.close()
    return output_path

def combine_videos(combined_video_path: str,
                   video_paths: list[str],
                   audio_file: str,
                   max_clip_duration: int = 5,
                   threads: int = 2,
                   ) -> str:
    """Concatenate muted clips until they cover the length of *audio_file*.

    The source videos are visited in random order, repeatedly if needed. Each
    clip is trimmed to at most *max_clip_duration* seconds and the last one is
    trimmed so the total matches the audio duration.

    Args:
        combined_video_path: Where the concatenated video is written.
        video_paths: Source video files; the caller's list is not modified.
        audio_file: Audio file whose duration defines the target length.
        max_clip_duration: Upper bound, in seconds, for each clip.
        threads: Thread count passed to ``write_videofile``.

    Returns:
        *combined_video_path*.

    Raises:
        ValueError: If *video_paths* is empty, or a full pass over the videos
            adds no duration (both cases would otherwise loop forever).
        FileNotFoundError: If the audio file or any video file is missing.
    """
    if not video_paths:
        raise ValueError("video_paths must contain at least one video file")

    check_file_exists(audio_file)
    for video_path in video_paths:
        check_file_exists(video_path)

    # The audio is only needed for its duration; release it right away.
    audio_clip = AudioFileClip(audio_file)
    try:
        audio_duration = audio_clip.duration
    finally:
        audio_clip.close()
    logger.info(f"Max duration of audio: {audio_duration} seconds")

    # Shuffle a copy so the caller's list keeps its order.
    shuffled_paths = list(video_paths)
    clips = []
    video_duration = 0

    try:
        while video_duration < audio_duration:
            duration_before_pass = video_duration
            random.shuffle(shuffled_paths)

            for video_path in shuffled_paths:
                remaining = audio_duration - video_duration
                if remaining <= 0:
                    # Target reached: stop instead of appending empty clips.
                    break
                clip = VideoFileClip(video_path).without_audio()
                # Respect both limits at once: the per-clip cap and the time
                # still missing to match the audio.
                target = min(remaining, max_clip_duration)
                if target < clip.duration:
                    clip = clip.subclip(0, target)
                clip = clip.set_fps(30)
                clips.append(clip)
                video_duration += clip.duration

            if video_duration <= duration_before_pass:
                raise ValueError("Source videos add no duration; cannot reach the audio length")

        final_clip = concatenate_videoclips(clips)
        final_clip = final_clip.set_fps(30)
        logger.info(f"Writing combined video to {combined_video_path}")
        final_clip.write_videofile(combined_video_path, threads=threads)
    finally:
        # Release every ffmpeg reader opened above, on success or failure.
        for clip in clips:
            clip.close()

    logger.success("Completed combining videos")
    return combined_video_path

def generate_video(video_paths: list[str],
                   audio_path: str,
                   output_file: str,
                   ) -> str:
    """Build a video from *video_paths* with *audio_path* as its soundtrack.

    The clips are first combined into a temporary file in the working
    directory, then the audio is attached and the result is written to
    *output_file*. The temporary file is removed whether or not rendering
    succeeds.

    Args:
        video_paths: Source video files passed to ``combine_videos``.
        audio_path: Audio file that sets the length and becomes the soundtrack.
        output_file: Destination of the final video.

    Returns:
        *output_file*.
    """
    logger.info("Start generating video")
    combined_video_path = "temp_combined_video.mp4"

    final_video = None
    audio_clip = None
    try:
        combine_videos(combined_video_path, video_paths, audio_path)

        final_video = VideoFileClip(combined_video_path)
        audio_clip = AudioFileClip(audio_path)
        final_video = final_video.set_audio(audio_clip)

        logger.info(f"Writing final video to {output_file}")
        final_video.write_videofile(output_file, audio_codec="aac")
    finally:
        # Close the readers before deleting: an open handle blocks removal on
        # Windows and leaks an ffmpeg process elsewhere.
        for clip in (final_video, audio_clip):
            if clip is not None:
                clip.close()
        if os.path.exists(combined_video_path):
            os.remove(combined_video_path)

    logger.success(f"Completed generating video: {output_file}")
    return output_file