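"""GPTube: a Gradio app that transcribes a YouTube video with Whisper and
auto-replies to new top-level comments in Korean via a Llama-3 model on the
Hugging Face Inference API, forwarding each reply to a Pabbly webhook."""
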
import gradio as gr
import requests
import re
import os
import json
import time
import threading
from googleapiclient.discovery import build
from huggingface_hub import InferenceClient
from pytube import YouTube
import whisper
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

# Whisper λͺ¨λΈ λ‘œλ“œ
model = whisper.load_model("base")

# YouTube Data API key. Read from the environment instead of hard-coding the
# credential in source; the variable name YOUTUBE_API_KEY is a suggested convention.
API_KEY = os.getenv('YOUTUBE_API_KEY')

# Build the YouTube Data API v3 service
youtube = build('youtube', 'v3', developerKey=API_KEY)

# Hugging Face Inference API client (expects HF_TOKEN in the environment)
client = InferenceClient(model="meta-llama/Meta-Llama-3-70B-Instruct", token=os.getenv("HF_TOKEN"))

WEBHOOK_URL = "https://connect.pabbly.com/workflow/sendwebhookdata/IjU3NjUwNTZhMDYzMDA0MzA1MjZhNTUzMzUxM2Ii_pc"
COMMENTS_FILE = 'comments.json'

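# The default system prompt is deliberately in Korean: it instructs the model to
# introduce itself as 'GPTube', greet in Korean, and answer in Korean within 250 tokens.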
DEFAULT_SYSTEM_PROMPT = "λŒ€ν™”μ‹œ λ°˜λ“œμ‹œ λ‚˜μ˜ 이름 'GPTube'λ₯Ό 밝히며 ν•œκΈ€λ‘œ 인사λ₯Όν•˜λΌ. λ°˜λ“œμ‹œ 'ν•œκΈ€'(ν•œκ΅­μ–΄)둜 250 토큰 μ΄λ‚΄λ‘œ 닡변을 μƒμ„±ν•˜κ³  좜λ ₯ν•˜λΌ. Respond to the following YouTube comment in a friendly and helpful manner:"

stop_event = threading.Event()  # Signals the background comment-polling thread to stop

def load_existing_comments():
    if os.path.exists(COMMENTS_FILE):
        with open(COMMENTS_FILE, 'r', encoding='utf-8') as file:
            return json.load(file)
    return []

def save_comments(comments):
    with open(COMMENTS_FILE, 'w', encoding='utf-8') as file:
        json.dump(comments, file, ensure_ascii=False)  # keep Korean text readable in the file

def download_audio(video_url):
    try:
        yt = YouTube(video_url)
        audio = yt.streams.filter(only_audio=True).first()
        if audio is None:
            logging.error('No audio stream found.')
            return None
        audio_path = audio.download(output_path=".")

        file_stats = os.stat(audio_path)
        logging.info(f'Audio file size (bytes): {file_stats.st_size}')

        if file_stats.st_size <= 30_000_000:  # ~30 MB cap, roughly a 1.5-hour video
            # Renaming to .mp3 is cosmetic; ffmpeg (used by Whisper) detects the
            # actual codec from the file contents, so no re-encoding happens here.
            base, ext = os.path.splitext(audio_path)
            new_file = base + '.mp3'
            os.rename(audio_path, new_file)
            return new_file
        else:
            logging.error('File too large. Only videos up to about 1.5 hours are supported.')
            return None
    except Exception as e:
        logging.error(f"Error while downloading audio: {str(e)}")
        return None

def generate_transcript(audio_path):
    try:
        if not audio_path or not os.path.exists(audio_path):
            raise ValueError("Not a valid audio file path.")

        result = model.transcribe(audio_path)
        return result['text'].strip()
    except Exception as e:
        logging.error(f"Error during transcription: {str(e)}")
        return f"An error occurred during transcription: {str(e)}"

def generate_reply(comment_text, system_prompt):
    prompt = f"{system_prompt}\n\nComment: {comment_text}\n\nReply:"
    response = client.text_generation(
        prompt=prompt,
        max_new_tokens=250,
        temperature=0.7,
        top_p=0.9
    )
    # text_generation normally returns a plain string; handle a dict payload defensively.
    if isinstance(response, dict) and 'generated_text' in response:
        return response['generated_text']
    return response

def send_webhook(data):
    response = requests.post(WEBHOOK_URL, json=data)
    return response.status_code, response.text

def get_video_comments(video_id):
    try:
        comments = []
        request = youtube.commentThreads().list(
            part='snippet',
            videoId=video_id,
            maxResults=100,   # Page size (API maximum per request)
            textFormat='plainText'
        )
        response = request.execute()
        while request is not None:
            for item in response['items']:
                snippet = item['snippet']['topLevelComment']['snippet']
                comment = {
                    'comment_id': item['snippet']['topLevelComment']['id'],
                    'author': snippet['authorDisplayName'],
                    'published_at': snippet['publishedAt'],
                    'text': snippet['textDisplay'],
                    'reply_count': item['snippet']['totalReplyCount']
                }
                comments.append(comment)
            if 'nextPageToken' in response:
                request = youtube.commentThreads().list(
                    part='snippet',
                    videoId=video_id,
                    pageToken=response['nextPageToken'],
                    maxResults=100,  # Page size (API maximum per request)
                    textFormat='plainText'
                )
                response = request.execute()
            else:
                break
        return comments
    except Exception as e:
        return [{'error': str(e)}]

def fetch_comments(video_url, system_prompt):
    log_entries = []
    video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url)
    if video_id_match:
        video_id = video_id_match.group(1)
        audio_path = download_audio(video_url)
        if not audio_path:
            return "μ˜€λ””μ˜€λ₯Ό λ‹€μš΄λ‘œλ“œν•  수 μ—†μŠ΅λ‹ˆλ‹€."
        
        transcript = generate_transcript(audio_path)
        
        existing_comments = load_existing_comments()
        new_comments = get_video_comments(video_id)
        
        if not new_comments or 'error' in new_comments[0]:
            return "λŒ“κΈ€μ„ 찾을 수 μ—†κ±°λ‚˜ 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€."
        
        existing_ids = {c['comment_id'] for c in existing_comments}  # build once, not per comment
        recent_new_comments = [c for c in new_comments if c['comment_id'] not in existing_ids and c['reply_count'] == 0]
        
        if recent_new_comments:
            # Prepend the transcript so replies are grounded in the video's content.
            combined_prompt = f"{transcript}\n\n{system_prompt}"
            for comment in recent_new_comments:
                reply_text = generate_reply(comment['text'], combined_prompt)
                webhook_data = {
                    "comment_id": comment['comment_id'],
                    "author": comment['author'],
                    "published_at": comment['published_at'],
                    "text": comment['text'],
                    "reply_text": reply_text
                }
                webhook_status, webhook_response = send_webhook(webhook_data)
                log_entries.append(f"Recent comment: {comment['text']}\n\nGenerated reply: {reply_text}\n\nWebhook response: {webhook_status} - {webhook_response}")
                existing_comments.append(comment)
            save_comments(existing_comments)
        else:
            log_entries.append("No new comments.")
    else:
        log_entries.append("μœ νš¨ν•˜μ§€ μ•Šμ€ YouTube URLμž…λ‹ˆλ‹€.")
    return "\n\n".join(log_entries)

def background_fetch_comments():
    # Poll for new comments every 10 seconds until stop_event is set.
    while not stop_event.is_set():
        result = fetch_comments("https://www.youtube.com/watch?v=dQw4w9WgXcQ", DEFAULT_SYSTEM_PROMPT)  # example URL and default prompt
        print(result)
        time.sleep(10)

def start_background_fetch():
    # Daemon thread so the process can still exit while polling is running.
    threading.Thread(target=background_fetch_comments, daemon=True).start()

def stop_background_fetch():
    stop_event.set()
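
# Note: the polling helpers above are not wired into the Gradio UI below; call
# start_background_fetch() manually (e.g. before demo.launch()) to enable
# periodic polling, and stop_background_fetch() to end it.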

def get_text(video_url):
    audio_path = download_audio(video_url)
    if not audio_path:
        return "μ˜€λ””μ˜€λ₯Ό λ‹€μš΄λ‘œλ“œν•  수 μ—†μŠ΅λ‹ˆλ‹€."
    
    transcript = generate_transcript(audio_path)
    return transcript

# Gradio μΈν„°νŽ˜μ΄μŠ€ μ •μ˜
demo = gr.Blocks()

with demo:
    gr.Markdown("<h1><center>GPTube</center></h1>")
   
    with gr.Row():
        input_text_url = gr.Textbox(placeholder='YouTube video URL', label='YouTube URL')
        input_text_prompt = gr.Textbox(placeholder='System prompt', label='System prompt', value=DEFAULT_SYSTEM_PROMPT, lines=5)
    
    with gr.Row():
        result_button_transcribe = gr.Button('Transcribe')
        result_button_comments = gr.Button('Fetch Comments and Generate Reply')
    
    with gr.Row():
        output_text_transcribe = gr.Textbox(placeholder='Transcript of the YouTube video.', label='Transcript', lines=20)
        output_text_prompt = gr.Textbox(placeholder='Generated reply text', label='Reply', lines=20)
    
    result_button_transcribe.click(get_text, inputs=input_text_url, outputs=output_text_transcribe, api_name="transcribe_api")
    result_button_comments.click(fetch_comments, inputs=[input_text_url, input_text_prompt], outputs=output_text_prompt, api_name="fetch_comments_api")

# μΈν„°νŽ˜μ΄μŠ€ μ‹€ν–‰
demo.launch()
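
# Local run sketch (env var names as assumed above):
#   export HF_TOKEN=...          # Hugging Face Inference API token
#   export YOUTUBE_API_KEY=...   # YouTube Data API v3 key
#   python app.py                # or whatever this file is saved as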