import pandas as pd
import numpy as np
import snscrape.modules.twitter as sntwitter
import streamlit as st
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import pipeline
import plotly.express as px
import plotly.io as pio
import plotly.graph_objects as go
import matplotlib as mpl
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from PIL import Image
import requests
from itertools import islice
from youtube_comment_downloader import YoutubeCommentDownloader, SORT_BY_POPULAR
def get_nltk():
    import nltk
    nltk.download(
        ["punkt", "wordnet", "omw-1.4", "averaged_perceptron_tagger", "universal_tagset"]
    )
    return

get_nltk()
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from nltk.tokenize import word_tokenize
import re
from sklearn.feature_extraction.text import CountVectorizer
# Create a custom plotly theme and set it as default
pio.templates["custom"] = pio.templates["plotly_white"]
pio.templates["custom"].layout.margin = {"b": 25, "l": 25, "r": 25, "t": 50}
pio.templates["custom"].layout.width = 600
pio.templates["custom"].layout.height = 450
pio.templates["custom"].layout.autosize = False
pio.templates["custom"].layout.font.update(
    {"family": "Arial", "size": 12, "color": "#707070"}
)
pio.templates["custom"].layout.title.update(
    {
        "xref": "container",
        "yref": "container",
        "x": 0.5,
        "yanchor": "top",
        "font_size": 16,
        "y": 0.95,
        "font_color": "#353535",
    }
)
pio.templates["custom"].layout.xaxis.update(
    {"showline": True, "linecolor": "lightgray", "title_font_size": 14}
)
pio.templates["custom"].layout.yaxis.update(
    {"showline": True, "linecolor": "lightgray", "title_font_size": 14}
)
pio.templates["custom"].layout.colorway = [
    "#1F77B4",
    "#FF7F0E",
    "#54A24B",
    "#D62728",
    "#C355FA",
    "#8C564B",
    "#E377C2",
    "#7F7F7F",
    "#FFE323",
    "#17BECF",
]
pio.templates.default = "custom"
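# The lines above register "custom" as the default Plotly template, so every figure
# created later in this file inherits these margins, fonts, axis styles, and colors.
# A minimal illustrative sketch (not part of the app itself):
#
#   demo_fig = px.bar(x=["a", "b"], y=[1, 3], title="Template demo")
#   demo_fig.show()  # rendered with the Arial font, light gray axes, and colorway set above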
def get_sentiment_model():
    tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
    model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")
    return tokenizer, model

tokenizer_sentiment, model_sentiment = get_sentiment_model()
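# Note (assumption, not something the original code does): in a Streamlit app these setup
# functions are typically wrapped in a caching decorator so the NLTK data and FinBERT weights
# are not re-downloaded on every rerun, e.g.:
#
#   @st.cache_resource
#   def get_sentiment_model():
#       ...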
def get_tweets(query, max_tweets):
    tweets_list = []
    if query.startswith('@'):
        # Scrape tweets posted by a specific user account
        username = query[1:]
        scraper = sntwitter.TwitterSearchScraper('from:' + username)
    else:
        # Scrape tweets matching a keyword search; a date filter such as
        # 'until:YYYY-MM-DD' can be appended to the query string if needed
        scraper = sntwitter.TwitterSearchScraper(query)
    for i, tweet in enumerate(scraper.get_items()):
        if i >= max_tweets:
            break
        tweets_list.append([tweet.date, tweet.user.username, tweet.content])
    # Creating a dataframe from the tweets list above
    tweets_df = pd.DataFrame(tweets_list, columns=['Datetime', 'Username', 'Tweet'])
    tweets_df['Datetime'] = pd.to_datetime(tweets_df['Datetime'])
    tweets_df['Date'] = tweets_df['Datetime'].dt.date
    tweets_df['Time'] = tweets_df['Datetime'].dt.strftime('%H:%M')
    tweets_df.drop('Datetime', axis=1, inplace=True)
    return tweets_df
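# Illustrative usage of get_tweets (hypothetical queries, not executed by the app):
#
#   user_df = get_tweets("@nasa", max_tweets=50)       # tweets from a single account
#   keyword_df = get_tweets("bitcoin", max_tweets=50)  # tweets matching a keyword
#   # Both return a DataFrame with 'Username', 'Tweet', 'Date', and 'Time' columns.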
def get_youtube_comments(url, num_comments):
    # YouTube embeds this playability status in the page source when a video does not exist
    pattern = '"playabilityStatus":{"status":"ERROR","reason":"Video unavailable"'
    def try_site(url):
        request = requests.get(url)
        return False if pattern in request.text else True
    video_exists = try_site(url)
    if video_exists:
        comment_list = []
        downloader = YoutubeCommentDownloader()
        comments = downloader.get_comments_from_url(url, sort_by=SORT_BY_POPULAR)
        # Keep only the first num_comments comments
        for comment in islice(comments, num_comments):
            comment_list.append(comment['text'])
        return comment_list
    else:
        raise Exception('Video does not exist')
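# Illustrative usage of get_youtube_comments (the URL below is a placeholder):
#
#   comments = get_youtube_comments("https://www.youtube.com/watch?v=VIDEO_ID", num_comments=100)
#   # Returns a list of up to num_comments comment strings, most popular first,
#   # or raises an exception if the video does not exist.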
def get_sentiment_youtube(useful_sentence):
    # Classify each comment with the FinBERT sentiment pipeline, reusing the cached model and tokenizer
    classifier = pipeline("text-classification", model=model_sentiment, tokenizer=tokenizer_sentiment)
    output = []
    for temp in useful_sentence:
        output.extend(classifier(temp))
    df = pd.DataFrame(useful_sentence)
    df_temp = pd.DataFrame.from_dict(output)
    df = pd.concat([df, df_temp], axis=1)
    df = df.rename(columns={'label': 'Sentiment', 0: 'Comment'})
    # Capitalize the sentiment labels for display
    df['Sentiment'] = df['Sentiment'].replace(
        {'positive': 'Positive', 'negative': 'Negative', 'neutral': 'Neutral'}
    )
    return df
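# Illustrative usage of get_sentiment_youtube (the comments list is hypothetical):
#
#   yt_df = get_sentiment_youtube(["Great breakdown of the earnings call!", "This aged poorly."])
#   # yt_df has a 'Comment' column plus FinBERT's 'Sentiment' and 'score' columns.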
def text_preprocessing(text):
    stopwords = set()
    with open("static/en_stopwords.txt", "r") as file:
        for word in file:
            stopwords.add(word.rstrip("\n"))
    lemmatizer = WordNetLemmatizer()
    try:
        url_pattern = r"((http://)[^ ]*|(https://)[^ ]*|(www\.)[^ ]*)"
        user_pattern = r"@[^\s]+"
        entity_pattern = r"&.*;"
        neg_contraction = r"n't\W"
        non_alpha = "[^a-z]"
        cleaned_text = text.lower()
        cleaned_text = re.sub(neg_contraction, " not ", cleaned_text)
        cleaned_text = re.sub(url_pattern, " ", cleaned_text)
        cleaned_text = re.sub(user_pattern, " ", cleaned_text)
        cleaned_text = re.sub(entity_pattern, " ", cleaned_text)
        cleaned_text = re.sub(non_alpha, " ", cleaned_text)
        tokens = word_tokenize(cleaned_text)
        # provide POS tag for lemmatization to yield better result
        word_tag_tuples = pos_tag(tokens, tagset="universal")
        tag_dict = {"NOUN": "n", "VERB": "v", "ADJ": "a", "ADV": "r"}
        final_tokens = []
        for word, tag in word_tag_tuples:
            if len(word) > 1 and word not in stopwords:
                if tag in tag_dict:
                    final_tokens.append(lemmatizer.lemmatize(word, tag_dict[tag]))
                else:
                    final_tokens.append(lemmatizer.lemmatize(word))
        return " ".join(final_tokens)
    except Exception:
        return np.nan
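# Example of what text_preprocessing does (output is approximate and depends on the
# stopword list in static/en_stopwords.txt):
#
#   text_preprocessing("I can't believe @user shared https://t.co/xyz, it's amazing!!!")
#   # -> something like "not believe share amazing" (lowercased, URL and mention removed,
#   #    stopwords and single characters dropped, remaining words lemmatized)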
def get_sentiment(df):
    useful_sentence = df['Tweet'].tolist()
    # Classify each tweet with the FinBERT sentiment pipeline, reusing the cached model and tokenizer
    classifier = pipeline("text-classification", model=model_sentiment, tokenizer=tokenizer_sentiment)
    output = []
    for temp in useful_sentence:
        output.extend(classifier(temp))
    df_temp = pd.DataFrame.from_dict(output)
    df = pd.concat([df, df_temp], axis=1)
    df = df.rename(columns={'label': 'Sentiment'})
    # Capitalize the sentiment labels for display
    df['Sentiment'] = df['Sentiment'].replace(
        {'positive': 'Positive', 'negative': 'Negative', 'neutral': 'Neutral'}
    )
    return df
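# Illustrative usage of get_sentiment on a small DataFrame (hypothetical data):
#
#   sample_df = pd.DataFrame({"Tweet": ["Stocks rallied today.", "The company missed earnings."]})
#   scored_df = get_sentiment(sample_df)
#   # scored_df gains 'Sentiment' (Positive/Negative/Neutral) and 'score' columns from FinBERT.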
def plot_sentiment(tweet_df):
    sentiment_count = tweet_df["Sentiment"].value_counts()
    fig = px.pie(
        values=sentiment_count.values,
        names=sentiment_count.index,
        hole=0.3,
        title="<b>Sentiment Distribution</b>",
        color=sentiment_count.index,
        color_discrete_map={"Positive": "#54A24B", "Negative": "#FF7F0E", "Neutral": "#1F77B4"},
    )
    fig.update_traces(
        textposition="inside",
        texttemplate="%{label}<br>%{value} (%{percent})",
        hovertemplate="<b>%{label}</b><br>Percentage=%{percent}<br>Count=%{value}",
    )
    fig.update_layout(showlegend=False)
    return fig
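# Illustrative usage of plot_sentiment (assumes a DataFrame already scored by get_sentiment):
#
#   pie_fig = plot_sentiment(scored_df)
#   # pie_fig is a Plotly donut chart; in Streamlit it would typically be shown with st.plotly_chart(pie_fig)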
def get_top_n_gram(tweet_df, ngram_range, n=10):
    try:
        stopwords = set()
        with open("static/en_stopwords_ngram.txt", "r") as file:
            for word in file:
                stopwords.add(word.rstrip("\n"))
        stopwords = list(stopwords)
        corpus = tweet_df["Tweet"]
        vectorizer = CountVectorizer(
            analyzer="word", ngram_range=ngram_range, stop_words=stopwords
        )
        X = vectorizer.fit_transform(corpus.astype(str).values)
        words = vectorizer.get_feature_names_out()
        words_count = np.ravel(X.sum(axis=0))
        df = pd.DataFrame(zip(words, words_count))
        df.columns = ["words", "counts"]
        df = df.sort_values(by="counts", ascending=False).head(n)
        df["words"] = df["words"].str.title()
        return df
    except Exception:
        # Fall back to None if vectorization fails (e.g. empty corpus); plot_n_gram handles this case
        return None
def plot_n_gram(n_gram_df, title, color="#54A24B"):
    try:
        fig = px.bar(
            x=n_gram_df.counts,
            y=n_gram_df.words,
            title="<b>{}</b>".format(title),
            text_auto=True,
        )
        fig.update_layout(plot_bgcolor="white")
        fig.update_xaxes(title=None)
        fig.update_yaxes(autorange="reversed", title=None)
        fig.update_traces(hovertemplate="<b>%{y}</b><br>Count=%{x}", marker_color=color)
        return fig
    except Exception:
        # Return an empty figure if the n-gram dataframe is missing or malformed
        fig = go.Figure()
        return fig
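# Illustrative chaining of the two n-gram helpers (parameter values are examples only):
#
#   unigram_df = get_top_n_gram(tweet_df, ngram_range=(1, 1), n=10)
#   bigram_df = get_top_n_gram(tweet_df, ngram_range=(2, 2), n=10)
#   fig = plot_n_gram(unigram_df, title="Top 10 Occurring Words")
#   # fig can then be rendered in Streamlit with st.plotly_chart(fig)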
def plot_wordcloud(tweet_df, colormap="Greens", mask_url="static/twitter_mask.png"):
    try:
        stopwords = set()
        with open("static/en_stopwords_ngram.txt", "r") as file:
            for word in file:
                stopwords.add(word.rstrip("\n"))
        # Take a mid-range slice of the colormap so the words stay readable on a white background
        cmap = mpl.cm.get_cmap(colormap)(np.linspace(0, 1, 20))
        cmap = mpl.colors.ListedColormap(cmap[10:15])
        mask = np.array(Image.open(mask_url))
        font = "static/quartzo.ttf"
        tweet_df["Cleaned_Tweet"] = tweet_df["Tweet"].apply(text_preprocessing)
        # Drop rows where preprocessing failed (returned NaN) before joining into one string
        text = " ".join(tweet_df["Cleaned_Tweet"].dropna())
        wc = WordCloud(
            background_color="white",
            font_path=font,
            stopwords=stopwords,
            max_words=90,
            colormap=cmap,
            mask=mask,
            random_state=42,
            collocations=False,
            min_word_length=2,
            max_font_size=200,
        )
        wc.generate(text)
        fig = plt.figure(figsize=(8, 8))
        ax = fig.add_subplot(1, 1, 1)
        plt.imshow(wc, interpolation="bilinear")
        plt.axis("off")
        plt.title("Wordcloud", fontdict={"fontsize": 16}, fontweight="heavy", pad=20, y=1.0)
        return fig
    except Exception:
        # Return an empty figure if the word cloud cannot be generated
        fig = go.Figure()
        return fig
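# Illustrative usage of plot_wordcloud (mask and font paths come from the static/ folder referenced above):
#
#   wc_fig = plot_wordcloud(tweet_df, colormap="Greens")
#   # wc_fig is a matplotlib Figure; in Streamlit it would typically be shown with st.pyplot(wc_fig)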