File size: 8,504 Bytes
bc736f8 94559cb b640e62 94559cb 49d7053 bc736f8 94559cb 49d7053 94559cb a63bdf7 bc736f8 94559cb 9ec122f 94559cb 9e07c40 94559cb 356c2dc 94559cb e3b3ac8 94559cb 356c2dc 94559cb 356c2dc 94559cb 356c2dc 94559cb 238c2d8 94559cb e5c3484 94559cb 238c2d8 94559cb e3b3ac8 94559cb 238c2d8 94559cb 00318bd e15181e 94559cb 5188c19 94559cb bb98d15 94559cb 238c2d8 5188c19 94559cb f220324 b30ba39 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
import gradio as gr
import requests
import re
import os
import json
import time
import threading
from googleapiclient.discovery import build
from huggingface_hub import InferenceClient
from pytube import YouTube
import whisper
import logging
# ---------------------------------------------------------------------------
# Module-level configuration and shared clients (created once at import time).
# ---------------------------------------------------------------------------

# Logging setup
logging.basicConfig(level=logging.INFO)

# Load the Whisper speech-to-text model once; reused by every transcription.
model = whisper.load_model("base")

# YouTube Data API key.
# SECURITY FIX: the key was previously hard-coded in this file. A key committed
# to source control must be treated as leaked and rotated; read it from the
# environment instead.
API_KEY = os.getenv("YOUTUBE_API_KEY", "")

# YouTube Data API v3 service client.
youtube = build('youtube', 'v3', developerKey=API_KEY)

# Hugging Face inference client (token supplied via the HF_TOKEN env var).
client = InferenceClient(model="meta-llama/Meta-Llama-3-70B-Instruct", token=os.getenv("HF_TOKEN"))

# Webhook endpoint that receives each generated reply.
WEBHOOK_URL = "https://connect.pabbly.com/workflow/sendwebhookdata/IjU3NjUwNTZhMDYzMDA0MzA1MjZhNTUzMzUxM2Ii_pc"

# Local JSON file remembering which comments were already answered.
COMMENTS_FILE = 'comments.json'

# Default system prompt (Korean) prepended to every reply-generation request.
DEFAULT_SYSTEM_PROMPT = "λνμ λ°λμ λμ μ΄λ¦ 'GPTube'λ₯Ό λ°νλ©° νκΈλ‘ μΈμ¬λ₯ΌνλΌ. λ°λμ 'νκΈ'(νκ΅μ΄)λ‘ 250 ν ν° μ΄λ΄λ‘ λ΅λ³μ μμ±νκ³ μΆλ ₯νλΌ. Respond to the following YouTube comment in a friendly and helpful manner:"

# Event used to signal the background polling thread to stop.
stop_event = threading.Event()
def load_existing_comments():
    """Return the list of previously answered comments from COMMENTS_FILE.

    Returns an empty list when the file does not exist, cannot be read, or
    contains invalid JSON (FIX: a corrupt or empty file previously crashed
    the whole fetch pipeline with json.JSONDecodeError).
    """
    if os.path.exists(COMMENTS_FILE):
        try:
            with open(COMMENTS_FILE, 'r', encoding='utf-8') as file:
                return json.load(file)
        except (json.JSONDecodeError, OSError):
            logging.error("Could not read %s; starting with an empty comment list", COMMENTS_FILE)
            return []
    return []
def save_comments(comments):
    """Persist the answered-comment list to COMMENTS_FILE as JSON.

    FIX: write with an explicit UTF-8 encoding and ensure_ascii=False so the
    (Korean) comment text is stored readably instead of as \\uXXXX escapes.
    json.load reads both forms back identically, so this is load-compatible.
    """
    with open(COMMENTS_FILE, 'w', encoding='utf-8') as file:
        json.dump(comments, file, ensure_ascii=False)
def download_audio(video_url):
    """Download a YouTube video's audio stream into the current directory.

    Note: the file is only *renamed* to .mp3, not transcoded; Whisper/ffmpeg
    handle the actual container format regardless of the extension.

    Returns the path of the renamed file, or None on any failure (no audio
    stream, file above the ~30 MB cap, or a pytube/network error).
    """
    try:
        yt = YouTube(video_url)
        audio = yt.streams.filter(only_audio=True).first()
        if audio is None:
            logging.error('μ€λμ€ μ€νΈλ¦Όμ μ°Ύμ μ μμ΅λλ€.')
            return None
        audio_path = audio.download(output_path=".")
        file_stats = os.stat(audio_path)
        logging.info(f'μ€λμ€ νμΌ ν¬κΈ°(Bytes): {file_stats.st_size}')
        if file_stats.st_size <= 30000000:  # ~30 MB size cap
            base = os.path.splitext(audio_path)[0]
            new_file = base + '.mp3'
            # FIX: os.replace overwrites an existing target; os.rename raises
            # on Windows when new_file is left over from a previous run.
            os.replace(audio_path, new_file)
            return new_file
        else:
            # FIX: delete the oversized download instead of leaking it on disk.
            os.remove(audio_path)
            logging.error('νμΌ ν¬κΈ°κ° λ무 ν½λλ€. 1.5μκ° μ΄νμ λΉλμ€λ§ μ§μλ©λλ€.')
            return None
    except Exception as e:
        logging.error(f"μ€λμ€ λ€μ΄λ‘λ μ€ μ€λ₯ λ°μ: {str(e)}")
        return None
def generate_transcript(audio_path):
    """Transcribe the audio file with Whisper and return the text.

    On failure (missing/invalid path or a transcription error) a Korean
    error string is returned instead of raising, so the Gradio UI can show
    it directly.
    """
    try:
        if not audio_path or not os.path.exists(audio_path):
            raise ValueError("μ ν¨ν μ€λμ€ νμΌ κ²½λ‘κ° μλλλ€.")
        return model.transcribe(audio_path)['text'].strip()
    except Exception as err:
        logging.error(f"μ μ¬ μ€ μ€λ₯ λ°μ: {str(err)}")
        return f"μ μ¬ μ€ μ€λ₯κ° λ°μνμ΅λλ€: {str(err)}"
def generate_reply(comment_text, system_prompt):
    """Ask the LLM for a reply to one comment, guided by system_prompt.

    Returns the generated reply text (capped at 250 new tokens).
    """
    full_prompt = f"{system_prompt}\n\nComment: {comment_text}\n\nReply:"
    result = client.text_generation(
        prompt=full_prompt,
        max_new_tokens=250,
        temperature=0.7,
        top_p=0.9,
    )
    # Some client versions return a dict payload, others a bare string.
    if isinstance(result, dict) and 'generated_text' in result:
        return result['generated_text']
    return result
def send_webhook(data):
    """POST *data* as JSON to the configured webhook.

    Returns a (status_code, response_body) tuple for logging.
    """
    resp = requests.post(WEBHOOK_URL, json=data)
    return resp.status_code, resp.text
def get_video_comments(video_id):
    """Fetch all top-level comments of a video, following API pagination.

    Returns a list of dicts with keys comment_id, author, published_at,
    text, reply_count. On any API error, returns a single-element list
    holding {'error': message} -- callers inspect element 0 for 'error'.

    FIX: the original duplicated the whole request construction for the
    first page and looped on `while request is not None`, a condition that
    could never become false (exit was only via break). One loop driven by
    the page token covers both cases; googleapiclient omits parameters
    whose value is None, so pageToken=None is equivalent to leaving it out.
    """
    try:
        comments = []
        page_token = None
        while True:
            response = youtube.commentThreads().list(
                part='snippet',
                videoId=video_id,
                pageToken=page_token,
                maxResults=100,  # API maximum per page
                textFormat='plainText'
            ).execute()
            for item in response['items']:
                snippet = item['snippet']['topLevelComment']['snippet']
                comments.append({
                    'comment_id': item['snippet']['topLevelComment']['id'],
                    'author': snippet['authorDisplayName'],
                    'published_at': snippet['publishedAt'],
                    'text': snippet['textDisplay'],
                    'reply_count': item['snippet']['totalReplyCount'],
                })
            page_token = response.get('nextPageToken')
            if not page_token:
                break
        return comments
    except Exception as e:
        # Deliberate: fold API errors into a sentinel the caller checks.
        return [{'error': str(e)}]
def fetch_comments(video_url, system_prompt):
    """Answer every new, un-replied comment on the given video.

    Pipeline: extract the 11-char video id from the URL, download and
    transcribe the audio, fetch all comments, generate an LLM reply for
    each comment not answered before (and with no existing replies), post
    each reply to the webhook, and record the comment as handled.

    Returns a human-readable log string for the Gradio UI.
    """
    log_entries = []
    # YouTube video ids are exactly 11 characters of [0-9A-Za-z_-].
    video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url)
    if video_id_match:
        video_id = video_id_match.group(1)
        audio_path = download_audio(video_url)
        if not audio_path:
            return "μ€λμ€λ₯Ό λ€μ΄λ‘λν μ μμ΅λλ€."
        transcript = generate_transcript(audio_path)
        existing_comments = load_existing_comments()
        new_comments = get_video_comments(video_id)
        if not new_comments or 'error' in new_comments[0]:
            return "λκΈμ μ°Ύμ μ μκ±°λ μ€λ₯κ° λ°μνμ΅λλ€."
        # PERF FIX: build the already-answered id set ONCE. The original
        # rebuilt the set inside the filter for every new comment (O(n*m)).
        answered_ids = {c['comment_id'] for c in existing_comments}
        recent_new_comments = [
            c for c in new_comments
            if c['comment_id'] not in answered_ids and c['reply_count'] == 0
        ]
        if recent_new_comments:
            for most_recent_comment in recent_new_comments:
                # Prepend the transcript so the model can ground its reply
                # in the video's actual content.
                combined_prompt = f"{transcript}\n\n{system_prompt}"
                reply_text = generate_reply(most_recent_comment['text'], combined_prompt)
                webhook_data = {
                    "comment_id": most_recent_comment['comment_id'],
                    "author": most_recent_comment['author'],
                    "published_at": most_recent_comment['published_at'],
                    "text": most_recent_comment['text'],
                    "reply_text": reply_text
                }
                webhook_status, webhook_response = send_webhook(webhook_data)
                log_entries.append(f"μ΅κ·Ό λκΈ: {most_recent_comment['text']}\n\nλ΅λ³ μμ±: {reply_text}\n\nμΉν μλ΅: {webhook_status} - {webhook_response}")
                # Persist after each reply so progress survives a crash.
                existing_comments.append(most_recent_comment)
                save_comments(existing_comments)
        else:
            log_entries.append("μλ‘μ΄ λκΈμ΄ μμ΅λλ€.")
    else:
        log_entries.append("μ ν¨νμ§ μμ YouTube URLμλλ€.")
    return "\n\n".join(log_entries)
def background_fetch_comments():
    """Poll loop: answer new comments every 10 s until stop_event is set.

    NOTE(review): the video URL and prompt are hard-coded example values,
    as the original comment indicated.
    """
    while not stop_event.is_set():
        result = fetch_comments("https://www.youtube.com/watch?v=dQw4w9WgXcQ", DEFAULT_SYSTEM_PROMPT)
        # CONSISTENCY FIX: the rest of the file logs via logging, not print().
        logging.info(result)
        # FIX: wait on the event rather than time.sleep(10), so a stop
        # request takes effect immediately instead of after up to 10 s.
        stop_event.wait(10)
def start_background_fetch():
    """Start the comment-polling loop in a background thread.

    FIX: clear stop_event first so polling can be restarted after
    stop_background_fetch() (the original left the event set forever, making
    any restarted loop exit immediately); mark the thread as a daemon so it
    cannot keep the interpreter alive on shutdown.
    """
    stop_event.clear()
    threading.Thread(target=background_fetch_comments, daemon=True).start()
def stop_background_fetch():
    """Signal the background polling loop (if running) to exit.

    The loop checks this event once per iteration, so shutdown may take up
    to one full sleep interval to complete.
    """
    stop_event.set()
def get_text(video_url):
    """Download a video's audio track and return its Whisper transcript.

    Returns a Korean error message string when the download fails.
    """
    path = download_audio(video_url)
    if path:
        return generate_transcript(path)
    return "μ€λμ€λ₯Ό λ€μ΄λ‘λν μ μμ΅λλ€."
# ---------------------------------------------------------------------------
# Gradio interface definition.
# Layout: URL + system-prompt inputs, two action buttons, and two output
# boxes (transcript / generated replies). Both buttons are also exposed as
# named API endpoints.
# ---------------------------------------------------------------------------
demo = gr.Blocks()
with demo:
    gr.Markdown("<h1><center>GPTube</center></h1>")
    with gr.Row():
        # Video URL and the (editable) system prompt used for replies.
        input_text_url = gr.Textbox(placeholder='YouTube video URL', label='YouTube URL')
        input_text_prompt = gr.Textbox(placeholder='μμ€ν ν둬ννΈ', label='μμ€ν ν둬ννΈ', value=DEFAULT_SYSTEM_PROMPT, lines=5)
    with gr.Row():
        result_button_transcribe = gr.Button('Transcribe')
        result_button_comments = gr.Button('Fetch Comments and Generate Reply')
    with gr.Row():
        output_text_transcribe = gr.Textbox(placeholder='Transcript of the YouTube video.', label='Transcript', lines=20)
        output_text_prompt = gr.Textbox(placeholder='μλ΅ ν μ€νΈ', label='μλ΅ ν μ€νΈ', lines=20)
    # Wire the buttons; api_name exposes each as a callable API endpoint.
    result_button_transcribe.click(get_text, inputs=input_text_url, outputs=output_text_transcribe, api_name="transcribe_api")
    result_button_comments.click(fetch_comments, inputs=[input_text_url, input_text_prompt], outputs=output_text_prompt, api_name="fetch_comments_api")
# Launch the interface
demo.launch()
|