import gradio as gr
from transformers import pipeline
from wordcloud import WordCloud, STOPWORDS
from youtubesearchpython import Comments
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import re
from io import BytesIO

# Sentiment model for scoring individual comments and a summarization
# model for condensing the blended comment text.
sentiment_task = pipeline(
    "sentiment-analysis",
    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
    tokenizer="cardiffnlp/twitter-roberta-base-sentiment-latest",
)
text_summarization_task = pipeline("summarization", model="facebook/bart-large-cnn")


def extract_youtube_video_id(url_or_id):
    """
    Extracts the YouTube video ID from a given URL, or returns the ID
    unchanged if a bare 11-character ID is provided.

    Args:
        url_or_id (str): A YouTube URL or a video ID.

    Returns:
        str: The extracted YouTube video ID, or an error message if no
        ID could be found.
    """
    # A bare video ID is exactly 11 characters from [0-9A-Za-z_-].
    if len(url_or_id) == 11 and not re.search(r'[^0-9A-Za-z_-]', url_or_id):
        return url_or_id

    # The "www." / "m." prefix is optional so that links without it
    # (e.g. https://youtube.com/watch?v=... or mobile links) also match.
    regex_patterns = [
        r'(?:https?://)?(?:www\.|m\.)?youtube\.com/watch\?v=([0-9A-Za-z_-]{11})',
        r'(?:https?://)?youtu\.be/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?(?:www\.|m\.)?youtube\.com/embed/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?(?:www\.|m\.)?youtube\.com/v/([0-9A-Za-z_-]{11})',
        r'(?:https?://)?(?:www\.|m\.)?youtube\.com/shorts/([0-9A-Za-z_-]{11})',
    ]

    for pattern in regex_patterns:
        match = re.search(pattern, url_or_id)
        if match:
            return match.group(1)

    return "Invalid YouTube URL or ID"


def comments_collector(video_link, max_comments=100):
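    """
    Collects up to `max_comments` comments from a YouTube video,
    excluding comments written by the channel owner.

    Args:
        video_link (str): A YouTube URL or video ID.
        max_comments (int): Maximum number of comments to retrieve.

    Returns:
        pd.DataFrame: One row per comment with columns comment_id, author,
        content, likes, and replyCount, or None if retrieval failed.
    """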
    video_id = extract_youtube_video_id(video_link)
    # The paging loop below keeps fetching while the running count is at or
    # below this threshold, so subtract one to cap the total at max_comments.
    max_comments -= 1

    try:
        comments = Comments(video_id)
        print('Comments retrieved and loading...')

        # Page through the comment threads until none are left or the
        # requested maximum is reached.
        while comments.hasMoreComments and (len(comments.comments["result"]) <= max_comments):
            comments.getNextComments()
        print(f'Found all the {len(comments.comments["result"])} comments.')

        comments = comments.comments
        data = []

        for comment in comments['result']:
            # Skip comments written by the channel owner.
            if comment['authorIsChannelOwner']:
                continue

            comment_dict = {
                'comment_id': comment['id'],
                'author': comment['author']['name'],
                'content': comment['content'],
            }

            # The vote label is None, a plain count ("12"), or an
            # abbreviated count ("1.2K"); normalize all three to an int.
            label = comment['votes']['label']
            if label is None:
                likes = 0
            else:
                likes = label.split(' ')[0]
                if 'K' in likes:
                    likes = int(float(likes.replace('K', '')) * 1000)
                else:
                    likes = int(likes)
            comment_dict['likes'] = likes

            reply_count = comment['replyCount']
            comment_dict['replyCount'] = 0 if reply_count is None else int(reply_count)

            data.append(comment_dict)

        print(f'Excluding author comments, we ended up with {len(data)} comments.')
        return pd.DataFrame(data)
    except Exception as e:
        print(e)
        return None


def comments_analyzer(comments_df):
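    """
    Scores each comment with the sentiment model and aggregates the results.

    Args:
        comments_df (pd.DataFrame): Output of comments_collector.

    Returns:
        dict: Comment counts per sentiment class, the percentage of positive
        comments, and all comment texts blended into one string, or None
        if comments_df is None.
    """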
    if comments_df is None:
        return None

    comments_df['sentiment'] = comments_df['content'].apply(lambda x: sentiment_task(x)[0]['label'])

    # Count comments per sentiment class; classes that never occur default to 0.
    sentiment_counts = comments_df['sentiment'].value_counts()

    data = {}
    data['total_comments'] = len(comments_df)
    data['num_positive'] = sentiment_counts.get('positive', 0)
    data['num_neutral'] = sentiment_counts.get('neutral', 0)
    data['num_negative'] = sentiment_counts.get('negative', 0)

    # Blend every comment into one long string for the word cloud and summary.
    data['blended_comments'] = comments_df['content'].str.cat(sep=' ')
    data['pct_positive'] = 100 * round(data['num_positive'] / data['total_comments'], 2)

    return data


def generate_wordcloud(long_text, additional_stopwords=['Timestamps', 'timestamps']):
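    """
    Generates a word cloud image from the blended comment text.

    Args:
        long_text (str): All comments concatenated into one string.
        additional_stopwords (list): Extra words to exclude from the cloud.

    Returns:
        PIL.Image.Image: The rendered word cloud.
    """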
    # Combine the library's default stopwords with the caller's extras.
    stopwords = set(STOPWORDS)
    all_stopwords = stopwords.union(additional_stopwords)

    wordcloud = WordCloud(
        max_font_size=50,
        max_words=20,
        background_color="black",
        stopwords=all_stopwords,
        colormap='plasma',
    ).generate(long_text)

    plt.figure(figsize=(10, 10), facecolor=None)
    plt.imshow(wordcloud, interpolation="bilinear")
    plt.axis("off")
    plt.tight_layout(pad=0)

    # Render the figure into an in-memory PNG buffer instead of a file.
    img_buf = BytesIO()
    plt.savefig(img_buf, format='png', bbox_inches='tight', pad_inches=0)
    img_buf.seek(0)
    plt.close()

    image = Image.open(img_buf)
    return image


def create_sentiment_analysis_chart(data):
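    """
    Draws a bar chart of the sentiment counts.

    Args:
        data (dict): Output of comments_analyzer.

    Returns:
        PIL.Image.Image: The rendered chart.
    """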
    # Wrap the three counts in a single-row DataFrame so pandas draws one
    # grouped set of bars.
    df = pd.DataFrame(
        {
            'num_positive': data['num_positive'],
            'num_negative': data['num_negative'],
            'num_neutral': data['num_neutral'],
        },
        index=[0],
    )

    # df.plot creates its own figure, so set the size here rather than with a
    # separate plt.figure() call, which would leave an unused figure open.
    bar_colors = ['green', 'red', 'blue']
    df.plot(kind='bar', figsize=(8, 6), color=bar_colors, legend=True)

    plt.title('Sentiment Analysis Results')
    plt.xlabel('Sentiment Types')
    plt.ylabel('Number of Comments')
    plt.xticks(ticks=[0], labels=['Sentiments'], rotation=0)
    plt.legend(['Positive', 'Negative', 'Neutral'])

    buf = BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    plt.close()

    image = Image.open(buf)
    return image


def process_youtube_comments(youtube_link, max_comments, stop_words):
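    """
    End-to-end pipeline behind the Gradio interface: collect comments, then
    produce a word cloud, a summary, and a sentiment chart.

    Args:
        youtube_link (str): A YouTube URL or video ID.
        max_comments (float): Maximum number of comments to analyze.
        stop_words (str): Comma-separated words to exclude from the word cloud.

    Returns:
        tuple: (word cloud image, summary text, sentiment chart image).
    """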
    comments_df = comments_collector(video_link=youtube_link, max_comments=int(max_comments))

    analysis_dict = comments_analyzer(comments_df)
    if analysis_dict is None:
        # Surface retrieval failures in the UI instead of crashing below.
        raise gr.Error("Could not retrieve comments for this video.")
    long_text = analysis_dict['blended_comments']

    # Merge the user-supplied excluded words with the default stopwords.
    additional_stopwords = ['Timestamps', 'timestamps']
    if stop_words:
        additional_stopwords += [word.strip() for word in stop_words.split(',')]
    word_cloud_img = generate_wordcloud(long_text, additional_stopwords=additional_stopwords)

    summarized_text = text_summarization_task(long_text, min_length=100, max_length=200, truncation=True)[0]['summary_text']

    sentiment_chart = create_sentiment_analysis_chart(analysis_dict)

    return word_cloud_img, summarized_text, sentiment_chart


interface = gr.Interface(
    fn=process_youtube_comments,
    inputs=[
        gr.Textbox(label="YouTube Video Link"),
        gr.Number(label="Maximum Comments", value=100),
        gr.Textbox(label="Excluded Words (comma-separated)"),
    ],
    outputs=[
        gr.Image(label="Word Cloud"),
        gr.Textbox(label="Summary of Comments"),
        gr.Image(label="Sentiment Analysis Chart"),
    ],
    title="YouTube Comments Analyzer",
    description="Enter a YouTube link to generate a word cloud, summary, and sentiment analysis of the comments.",
)

interface.launch()
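# Tip: launch(share=True) serves the app through a temporary public URL,
# which is useful when running this script on a remote machine.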