Spaces:
Runtime error
Runtime error
import gradio as gr | |
import os | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import gensim.downloader as api | |
import numpy as np | |
import nltk, spacy, gensim | |
from sklearn.decomposition import LatentDirichletAllocation | |
from sklearn.feature_extraction.text import CountVectorizer | |
from pprint import pprint | |
import matplotlib | |
matplotlib.use('agg') | |
print("[x] Downloading word 2 vec") | |
model_w2v = api.load("word2vec-google-news-300") | |
def average_word2vec(word_list: list[str]): | |
# model_w2v = api.load("word2vec-google-news-300") | |
word_vectors = [] | |
for word in word_list: | |
if word in model_w2v: | |
word_vectors.append(model_w2v[word]) | |
if word_vectors: | |
average_vector = np.mean(word_vectors, axis=0) | |
else: | |
return None | |
most_similar_word = model_w2v.similar_by_vector(average_vector, topn=1) | |
word, similarity = most_similar_word[0] | |
return word, similarity | |
def concat_comments(*kwargs): | |
return ['\n'.join(ele) for ele in zip(*kwargs)] | |
def sent_to_words(sentences): | |
for sentence in sentences: | |
yield(gensim.utils.simple_preprocess(str(sentence), deacc=True)) # deacc=True removes punctuations | |
def lemmatization(texts, allowed_postags=['NOUN', 'ADJ', 'VERB', 'ADV'], nlp=None): #'NOUN', 'ADJ', 'VERB', 'ADV' | |
texts_out = [] | |
for sent in texts: | |
doc = nlp(" ".join(sent)) | |
texts_out.append(" ".join([ | |
token.lemma_ if token.lemma_ not in ['-PRON-'] else '' for token in doc if token.pos_ in allowed_postags | |
])) | |
return texts_out | |
def get_lda(n_components, n_top_subreddit_to_analyse, what_label_to_use): | |
df = pd.read_csv('./data/results.csv', index_col=0) | |
data = concat_comments(df.subreddit, df.sup_comment, df.comment) | |
data_words = list(sent_to_words(data)) | |
if what_label_to_use == 'Use True label': | |
label = 'label' | |
else: | |
label = 'prediction' | |
if not spacy.util.is_package("en_core_web_sm"): | |
print('[x] en_core_web_sm not found, downloading...') | |
os.system("python -m spacy download en_core_web_sm") | |
print('[x] en_core_web_sm downloaded') | |
print('[x] Lemmatization begins') | |
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"]) | |
data_lemmatized = lemmatization(data_words, allowed_postags=["NOUN", "ADJ"], nlp=nlp) #select noun and verb | |
print('[x] Vectorizing') | |
vectorizer = CountVectorizer( | |
analyzer='word', | |
min_df=10, | |
stop_words='english', | |
lowercase=True, | |
token_pattern='[a-zA-Z0-9]{3,}' | |
) | |
print('[x] Fitting vectorized data on lemmatization') | |
data_vectorized = vectorizer.fit_transform(data_lemmatized) | |
print('[x] Init LDA model') | |
lda_model = LatentDirichletAllocation( | |
n_components=n_components, | |
max_iter=10, | |
learning_method='online', | |
random_state=100, | |
batch_size=128, | |
evaluate_every = -1, | |
n_jobs = -1, | |
verbose=1, | |
) | |
print('[x] Fitting LDA model') | |
lda_output = lda_model.fit_transform(data_vectorized) | |
print(lda_model) # Model attributes | |
print('[x] Getting performances') | |
performances = lda_model.score(data_vectorized), lda_model.perplexity(data_vectorized) | |
# Log Likelyhood: Higher the better | |
print("Log Likelihood: ", performances[0]) | |
# Perplexity: Lower the better. Perplexity = exp(-1. * log-likelihood per word) | |
print("Perplexity: ", performances[1]) | |
print('[x] Check parameters if they look correct') | |
# See model parameters | |
pprint(lda_model.get_params()) | |
# switching to the best model | |
best_lda_model = lda_model | |
print('[x] Getting LDA output') | |
lda_output = best_lda_model.transform(data_vectorized) | |
print('[x] Assigning topics') | |
topicnames = ["Topic" + str(i) for i in range(best_lda_model.n_components)] | |
docnames = ["Doc" + str(i) for i in range(len(data))] | |
df_document_topic = pd.DataFrame(np.round(lda_output, 2), columns=topicnames, index=docnames) | |
print('[x] Checking dominant topics') | |
dominant_topic = np.argmax(df_document_topic.values, axis=1) | |
df_document_topic["dominant_topic"] = dominant_topic | |
# Topic-Keyword Matrix | |
df_topic_keywords = pd.DataFrame(best_lda_model.components_) | |
df_topic_keywords | |
# Assign Column and Index | |
df_topic_keywords.columns = vectorizer.get_feature_names_out() | |
df_topic_keywords.index = topicnames | |
print('[x] Computing word-topic association') | |
# Show top n keywords for each topic | |
def show_topics(vectorizer=vectorizer, lda_model=lda_model, n_words=20): | |
keywords = np.array(vectorizer.get_feature_names_out()) | |
topic_keywords = [] | |
for topic_weights in lda_model.components_: | |
top_keyword_locs = (-topic_weights).argsort()[:n_words] | |
topic_keywords.append(keywords.take(top_keyword_locs)) | |
return topic_keywords | |
topic_keywords = show_topics(vectorizer=vectorizer, lda_model=best_lda_model, n_words=15) | |
# Topic - Keywords Dataframe | |
df_topic_keywords = pd.DataFrame(topic_keywords) | |
df_topic_keywords.columns = ['Word '+str(i) for i in range(df_topic_keywords.shape[1])] | |
df_topic_keywords.index = ['Topic '+str(i) for i in range(df_topic_keywords.shape[0])] | |
df_topic_keywords | |
# topics = [ | |
# f'Topic {i}' for i in range(len(df_topic_keywords)) | |
# ] | |
topics = [] | |
for i, row in df_topic_keywords.iterrows(): | |
topics.append( | |
average_word2vec(row.to_list()[:5])[0] | |
) | |
df_topic_keywords["Topics"] = topics | |
df_topic_keywords | |
print('[x] Predicting dominant topic for each document') | |
# Define function to predict topic for a given text document. | |
def predict_topic(text, nlp=nlp): | |
global sent_to_words | |
global lemmatization | |
# Step 1: Clean with simple_preprocess | |
mytext_2 = list(sent_to_words(text)) | |
# Step 2: Lemmatize | |
mytext_3 = lemmatization(mytext_2, allowed_postags=['NOUN', 'ADJ', 'VERB', 'ADV'], nlp=nlp) | |
# Step 3: Vectorize transform | |
mytext_4 = vectorizer.transform(mytext_3) | |
# Step 4: LDA Transform | |
topic_probability_scores = best_lda_model.transform(mytext_4) | |
topic = df_topic_keywords.iloc[np.argmax(topic_probability_scores), 1:14].values.tolist() | |
# Step 5: Infer Topic | |
infer_topic = df_topic_keywords.iloc[np.argmax(topic_probability_scores), -1] | |
#topic_guess = df_topic_keywords.iloc[np.argmax(topic_probability_scores), Topics] | |
return infer_topic, topic, topic_probability_scores | |
# # Predict the topic | |
# mytext = ["This is a test of a random topic where I talk about politics"] | |
# infer_topic, topic, prob_scores = predict_topic(text = mytext, nlp=nlp) | |
def apply_predict_topic(text): | |
text = [text] | |
infer_topic, topic, prob_scores = predict_topic(text = text, nlp=nlp) | |
return(infer_topic) | |
df["Topic_key_word"] = df['comment'].apply(apply_predict_topic) | |
print('[x] Generating plot [1]') | |
print('Percentuale di commenti ironici per ogni topic') | |
perc_topic_irony = {} | |
for t in topics: | |
total_0label = sum((df[label] == 1) & (df.Topic_key_word == t)) | |
if total_0label != 0: | |
total_X_topic = df.Topic_key_word.value_counts()[t] | |
else: | |
total_0label, total_X_topic = 0, 0.001 # Non ci cono topic nel dataset | |
perc_topic_irony[t] = total_0label / total_X_topic | |
print(f'{t} w/ label 1: {total_0label}/{total_X_topic} ({total_0label / total_X_topic * 100 :.2f}%)') | |
fig1, ax = plt.subplots(figsize = (10, 7)) | |
bottom = np.zeros(len(perc_topic_irony)) | |
width = 0.9 | |
ax.bar(perc_topic_irony.keys(), perc_topic_irony.values(), width, label = 'sarcastic') | |
comp = list(map(lambda x: 1 - x if x > 0 else 0, perc_topic_irony.values())) | |
ax.bar(perc_topic_irony.keys(), comp, width, bottom=list(perc_topic_irony.values()), label = 'not sarcastic') | |
ax.set_title("% of sarcastic comments for each topic") | |
plt.xticks(rotation=70) | |
ax.set_ylim(bottom = 0, top = 1.02) | |
plt.legend() | |
plt.axhline(0.5, color = 'red', ls=":") | |
# probably not necessary (?) To drop eventually if log are to much cluttered! | |
print('Percentage of each topic for each subreddit') | |
weight_counts = {} | |
for t in topics: | |
weight_counts[t] = [] | |
for subreddit in df['subreddit'].value_counts().index[:n_top_subreddit_to_analyse]: # first 10 big subreddits | |
if sum(df[df.Topic_key_word == t].subreddit == subreddit) > 0: # se ci sono subreddit per il topic t (almeno una riga nel df) | |
perc_sub = df[df.Topic_key_word == t]['subreddit'].value_counts()[subreddit] / df['subreddit'].value_counts()[subreddit] | |
else: | |
perc_sub = 0 | |
weight_counts[t].append(perc_sub) | |
print(f'Perc of topic {t} in subreddit {subreddit}: {perc_sub * 100:.2f}') | |
print() | |
print('[x] Generating plot [2]') | |
# plot | |
subreddits = list(df.subreddit.value_counts().index)[:n_top_subreddit_to_analyse] | |
irony_percs = { | |
t: [ | |
len( | |
df[df.subreddit == subreddit][(df[df.subreddit == subreddit].Topic_key_word == t) & (df[df.subreddit == subreddit][label] == 1)] | |
) / | |
len( | |
df[df.subreddit == subreddit] | |
) for subreddit in subreddits | |
] for t in topics | |
} | |
width = 0.9 | |
fig2, ax = plt.subplots(figsize = (10, 7)) | |
plt.axhline(0.5, color = 'red', ls=":", alpha = .3) | |
bottom = np.zeros(len(subreddits)) | |
for k, v in weight_counts.items(): | |
p = ax.bar(subreddits, v, width, label=k, bottom=bottom) | |
ax.bar(subreddits, irony_percs[k], width - 0.01, bottom=bottom, color = 'black', edgecolor = 'white', alpha = .2, hatch = '\\') | |
bottom += v | |
ax.set_title("% of topics for each subreddit") | |
ax.legend(loc="upper right") | |
plt.xticks(rotation=50) | |
ax.set_ylim(bottom = 0, top = 1.02) | |
print('[v] All looking good!') | |
return df_topic_keywords, fig1, fig2 | |
# def main(): | |
with gr.Blocks() as demo: | |
gr.Markdown("# Dashboard per l'analisi con LDA") | |
gr.Markdown("### La dashboard permette l'addestramento di un modello LDA per controllare se e quali (dominant) topic sono pi霉 propensi a commenti di tipo sarcastico") | |
# gradio.Dataframe(路路路) | |
inputs = [] | |
with gr.Row(): | |
inputs.append(gr.Slider(2, 25, value=5, step = 1, label="LDA N components", info="Scegli il numero di componenti per LDA")) | |
inputs.append(gr.Slider(2, 20, value=5, step = 1, label="Subreddit dal dataset", info="Numero di subreddit da analizzare")) | |
inputs.append(gr.Radio( | |
choices = ['Use True label', 'Use BERT prediction'], | |
value = 'Use True label', | |
label = "Scegliere quali label sull'ironia utilizzare:", | |
) | |
) | |
btn = gr.Button(value="Submit") | |
gr.Markdown("## Risulati ottenuti") | |
gr.Markdown("#### Top 15 parole che pi霉 contribuiscono al topic di riferimento (utlima colonna):") | |
btn.click( | |
get_lda, | |
inputs=inputs, | |
outputs=[ | |
gr.DataFrame(), | |
gr.Plot(label="Quanto i topic trovati portano ironia?"), | |
gr.Plot(label="Come i topic sono correlati ai diversi subreddit del dataset?"), | |
] | |
) | |
# iface = gr.Interface(fn=greet, inputs="text", outputs="text") | |
if __name__ == "__main__": | |
demo.launch() | |