import discord import logging import os import re import asyncio import subprocess import aiohttp import time from huggingface_hub import InferenceClient from googleapiclient.discovery import build from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound from youtube_transcript_api.formatters import TextFormatter from dotenv import load_dotenv from pytube import YouTube import whisper import torch from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq import librosa # 환경 변수 로드 load_dotenv() # 로깅 설정 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s:%(levelname)s:%(name)s:%(message)s', handlers=[logging.StreamHandler()]) # 인텐트 설정 intents = discord.Intents.default() intents.message_content = True intents.messages = True intents.guilds = True intents.guild_messages = True # 추론 API 클라이언트 설정 hf_client = InferenceClient("CohereForAI/c4ai-command-r-plus-08-2024", token=os.getenv("HF_TOKEN")) whisper_client = InferenceClient("openai/whisper-large-v3", token=os.getenv("HF_TOKEN")) # YouTube API 설정 API_KEY = os.getenv("YOUTUBE_API_KEY") youtube_service = build('youtube', 'v3', developerKey=API_KEY) # 특정 채널 ID SPECIFIC_CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID")) # 전송 실패 시 재시도 횟수 MAX_RETRIES = 3 class MyClient(discord.Client): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.is_processing = False self.session = None async def on_ready(self): logging.info(f'{self.user}로 로그인되었습니다!') # web.py 파일 실행 subprocess.Popen(["python", "web.py"]) logging.info("Web.py 서버가 시작되었습니다.") # aiohttp 클라이언트 세션 생성 self.session = aiohttp.ClientSession() # 봇이 시작될 때 안내 메시지를 전송 channel = self.get_channel(SPECIFIC_CHANNEL_ID) if channel: await channel.send("유튜브 비디오 URL을 입력하면, 자막과 댓글을 기반으로 답글을 작성합니다.") async def on_message(self, message): if message.author == self.user: return if not self.is_message_in_specific_channel(message): return if self.is_processing: return self.is_processing = True try: video_id = extract_video_id(message.content) if video_id: transcript, language = await get_best_available_transcript(video_id) comments = await get_video_comments(video_id) if comments: if transcript: replies = await generate_replies(comments, transcript) await create_thread_and_send_replies(message, video_id, comments, replies, self.session) else: await message.channel.send("자막을 가져올 수 없습니다. Whisper 모델을 사용하여 자막을 생성합니다.") transcript = await generate_whisper_transcript(video_id) if transcript: replies = await generate_replies(comments, transcript) await create_thread_and_send_replies(message, video_id, comments, replies, self.session) else: await message.channel.send("Whisper 모델로도 자막을 생성할 수 없습니다. 댓글만을 기반으로 답변을 생성합니다.") replies = await generate_replies(comments, "") await create_thread_and_send_replies(message, video_id, comments, replies, self.session) else: await message.channel.send("댓글을 가져올 수 없습니다.") else: await message.channel.send("유효한 유튜브 비디오 URL을 제공해 주세요.") finally: self.is_processing = False def is_message_in_specific_channel(self, message): return message.channel.id == SPECIFIC_CHANNEL_ID or ( isinstance(message.channel, discord.Thread) and message.channel.parent_id == SPECIFIC_CHANNEL_ID ) async def close(self): # aiohttp 클라이언트 세션 종료 if self.session: await self.session.close() await super().close() def extract_video_id(url): video_id = None youtube_regex = ( r'(https?://)?(www\.)?' '(youtube|youtu|youtube-nocookie)\.(com|be)/' '(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})') match = re.match(youtube_regex, url) if match: video_id = match.group(6) logging.debug(f'추출된 비디오 ID: {video_id}') return video_id async def get_best_available_transcript(video_id, max_retries=5, delay=10): async def fetch_transcript(language): try: transcript = await asyncio.to_thread(YouTubeTranscriptApi.get_transcript, video_id, languages=[language]) return transcript, language except (NoTranscriptFound, TranscriptsDisabled): logging.warning(f'{language} 자막이 제공되지 않음.') return None, None except Exception as e: logging.warning(f'{language} 자막 가져오기 오류: {e}') return None, None for attempt in range(max_retries): # 우선 한국어 자막을 시도 ko_transcript, ko_lang = await fetch_transcript('ko') if ko_transcript: return ko_transcript, ko_lang # 영어 자막 시도 en_transcript, en_lang = await fetch_transcript('en') if en_transcript: return en_transcript, en_lang try: # 비디오에 자막 목록이 있는지 확인 transcripts = await asyncio.to_thread(YouTubeTranscriptApi.list_transcripts, video_id) # 수동으로 생성된 자막이 있는지 확인 manual_transcript = transcripts.find_manually_created_transcript(['ko', 'en']) transcript = await asyncio.to_thread(manual_transcript.fetch) return transcript, manual_transcript.language_code except (NoTranscriptFound, TranscriptsDisabled) as e: logging.warning(f'수동 자막을 찾을 수 없음: {e}') except Exception as e: if attempt < max_retries - 1: logging.error(f'자막 가져오기 실패 (시도 {attempt + 1}/{max_retries}): {e}') await asyncio.sleep(delay) else: logging.error(f'최종 자막 가져오기 실패: {e}') return None, None return None, None async def generate_whisper_transcript(video_id): try: # YouTube 비디오 다운로드 yt = YouTube(f'https://www.youtube.com/watch?v={video_id}') audio_stream = yt.streams.filter(only_audio=True).first() audio_file = audio_stream.download(output_path='temp', filename=f'{video_id}.mp3') # 오디오 파일 로드 audio, sr = librosa.load(audio_file, sr=16000) # Whisper 모델 및 프로세서 로드 device = "cuda" if torch.cuda.is_available() else "cpu" processor = AutoProcessor.from_pretrained("openai/whisper-large-v3") model = AutoModelForSpeechSeq2Seq.from_pretrained("openai/whisper-large-v3").to(device) # 오디오 처리 input_features = processor(audio, sampling_rate=sr, return_tensors="pt").input_features.to(device) # 생성 predicted_ids = model.generate(input_features) transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True) # 임시 파일 삭제 os.remove(audio_file) return transcription[0] except Exception as e: logging.error(f'Whisper 자막 생성 실패: {e}') return None async def get_video_comments(video_id): comments = [] response = youtube_service.commentThreads().list( part='snippet', videoId=video_id, maxResults=100 # 최대 100개의 댓글 가져오기 ).execute() for item in response.get('items', []): comment = item['snippet']['topLevelComment']['snippet']['textOriginal'] comment_id = item['snippet']['topLevelComment']['id'] comments.append((comment, comment_id)) logging.debug(f'가져온 댓글: {comments}') return comments async def generate_replies(comments, transcript): replies = [] for comment, _ in comments: messages = [ {"role": "system", "content": f"""너의 이름은 OpenFreeAI이다. 답글 생성후 가장 마지막에 너의 이름을 밝히고 공손하게 인사하라. 비디오 자막: {transcript}"""}, {"role": "user", "content": comment} ] loop = asyncio.get_event_loop() response = await loop.run_in_executor(None, lambda: hf_client.chat_completion( messages, max_tokens=250, temperature=0.7, top_p=0.85)) if response.choices and response.choices[0].message: reply = response.choices[0].message['content'].strip() else: reply = "답글을 생성할 수 없습니다." replies.append(reply) logging.debug(f'생성된 답글: {replies}') return replies async def create_thread_and_send_replies(message, video_id, comments, replies, session): thread = await message.channel.create_thread(name=f"{message.author.name}의 댓글 답글", message=message) for (comment, comment_id), reply in zip(comments, replies): embed = discord.Embed(description=f"**댓글**: {comment}\n**답글**: {reply}") await thread.send(embed=embed) if __name__ == "__main__": discord_client = MyClient(intents=intents) discord_client.run(os.getenv('DISCORD_TOKEN'))