import gradio as gr import os import pandas as pd import matplotlib.pyplot as plt import gensim.downloader as api import numpy as np import nltk, spacy, gensim from sklearn.decomposition import LatentDirichletAllocation from sklearn.feature_extraction.text import CountVectorizer from pprint import pprint import matplotlib matplotlib.use('agg') print("[x] Downloading word 2 vec") model_w2v = api.load("word2vec-google-news-300") def average_word2vec(word_list: list[str]): # model_w2v = api.load("word2vec-google-news-300") word_vectors = [] for word in word_list: if word in model_w2v: word_vectors.append(model_w2v[word]) if word_vectors: average_vector = np.mean(word_vectors, axis=0) else: return None most_similar_word = model_w2v.similar_by_vector(average_vector, topn=1) word, similarity = most_similar_word[0] return word, similarity def concat_comments(*kwargs): return ['\n'.join(ele) for ele in zip(*kwargs)] def sent_to_words(sentences): for sentence in sentences: yield(gensim.utils.simple_preprocess(str(sentence), deacc=True)) # deacc=True removes punctuations def lemmatization(texts, allowed_postags=['NOUN', 'ADJ', 'VERB', 'ADV'], nlp=None): #'NOUN', 'ADJ', 'VERB', 'ADV' texts_out = [] for sent in texts: doc = nlp(" ".join(sent)) texts_out.append(" ".join([ token.lemma_ if token.lemma_ not in ['-PRON-'] else '' for token in doc if token.pos_ in allowed_postags ])) return texts_out def get_lda(n_components, n_top_subreddit_to_analyse, what_label_to_use): df = pd.read_csv('./data/results.csv', index_col=0) data = concat_comments(df.subreddit, df.sup_comment, df.comment) data_words = list(sent_to_words(data)) if what_label_to_use == 'Use True label': label = 'label' else: label = 'prediction' if not spacy.util.is_package("en_core_web_sm"): print('[x] en_core_web_sm not found, downloading...') os.system("python -m spacy download en_core_web_sm") print('[x] en_core_web_sm downloaded') print('[x] Lemmatization begins') nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"]) data_lemmatized = lemmatization(data_words, allowed_postags=["NOUN", "ADJ"], nlp=nlp) #select noun and verb print('[x] Vectorizing') vectorizer = CountVectorizer( analyzer='word', min_df=10, stop_words='english', lowercase=True, token_pattern='[a-zA-Z0-9]{3,}' ) print('[x] Fitting vectorized data on lemmatization') data_vectorized = vectorizer.fit_transform(data_lemmatized) print('[x] Init LDA model') lda_model = LatentDirichletAllocation( n_components=n_components, max_iter=10, learning_method='online', random_state=100, batch_size=128, evaluate_every = -1, n_jobs = -1, verbose=1, ) print('[x] Fitting LDA model') lda_output = lda_model.fit_transform(data_vectorized) print(lda_model) # Model attributes print('[x] Getting performances') performances = lda_model.score(data_vectorized), lda_model.perplexity(data_vectorized) # Log Likelyhood: Higher the better print("Log Likelihood: ", performances[0]) # Perplexity: Lower the better. Perplexity = exp(-1. * log-likelihood per word) print("Perplexity: ", performances[1]) print('[x] Check parameters if they look correct') # See model parameters pprint(lda_model.get_params()) # switching to the best model best_lda_model = lda_model print('[x] Getting LDA output') lda_output = best_lda_model.transform(data_vectorized) print('[x] Assigning topics') topicnames = ["Topic" + str(i) for i in range(best_lda_model.n_components)] docnames = ["Doc" + str(i) for i in range(len(data))] df_document_topic = pd.DataFrame(np.round(lda_output, 2), columns=topicnames, index=docnames) print('[x] Checking dominant topics') dominant_topic = np.argmax(df_document_topic.values, axis=1) df_document_topic["dominant_topic"] = dominant_topic # Topic-Keyword Matrix df_topic_keywords = pd.DataFrame(best_lda_model.components_) df_topic_keywords # Assign Column and Index df_topic_keywords.columns = vectorizer.get_feature_names_out() df_topic_keywords.index = topicnames print('[x] Computing word-topic association') # Show top n keywords for each topic def show_topics(vectorizer=vectorizer, lda_model=lda_model, n_words=20): keywords = np.array(vectorizer.get_feature_names_out()) topic_keywords = [] for topic_weights in lda_model.components_: top_keyword_locs = (-topic_weights).argsort()[:n_words] topic_keywords.append(keywords.take(top_keyword_locs)) return topic_keywords topic_keywords = show_topics(vectorizer=vectorizer, lda_model=best_lda_model, n_words=15) # Topic - Keywords Dataframe df_topic_keywords = pd.DataFrame(topic_keywords) df_topic_keywords.columns = ['Word '+str(i) for i in range(df_topic_keywords.shape[1])] df_topic_keywords.index = ['Topic '+str(i) for i in range(df_topic_keywords.shape[0])] df_topic_keywords # topics = [ # f'Topic {i}' for i in range(len(df_topic_keywords)) # ] topics = [] for i, row in df_topic_keywords.iterrows(): topics.append( average_word2vec(row.to_list()[:5])[0] ) df_topic_keywords["Topics"] = topics df_topic_keywords print('[x] Predicting dominant topic for each document') # Define function to predict topic for a given text document. def predict_topic(text, nlp=nlp): global sent_to_words global lemmatization # Step 1: Clean with simple_preprocess mytext_2 = list(sent_to_words(text)) # Step 2: Lemmatize mytext_3 = lemmatization(mytext_2, allowed_postags=['NOUN', 'ADJ', 'VERB', 'ADV'], nlp=nlp) # Step 3: Vectorize transform mytext_4 = vectorizer.transform(mytext_3) # Step 4: LDA Transform topic_probability_scores = best_lda_model.transform(mytext_4) topic = df_topic_keywords.iloc[np.argmax(topic_probability_scores), 1:14].values.tolist() # Step 5: Infer Topic infer_topic = df_topic_keywords.iloc[np.argmax(topic_probability_scores), -1] #topic_guess = df_topic_keywords.iloc[np.argmax(topic_probability_scores), Topics] return infer_topic, topic, topic_probability_scores # # Predict the topic # mytext = ["This is a test of a random topic where I talk about politics"] # infer_topic, topic, prob_scores = predict_topic(text = mytext, nlp=nlp) def apply_predict_topic(text): text = [text] infer_topic, topic, prob_scores = predict_topic(text = text, nlp=nlp) return(infer_topic) df["Topic_key_word"] = df['comment'].apply(apply_predict_topic) print('[x] Generating plot [1]') print('Percentuale di commenti ironici per ogni topic') perc_topic_irony = {} for t in topics: total_0label = sum((df[label] == 1) & (df.Topic_key_word == t)) if total_0label != 0: total_X_topic = df.Topic_key_word.value_counts()[t] else: total_0label, total_X_topic = 0, 0.001 # Non ci cono topic nel dataset perc_topic_irony[t] = total_0label / total_X_topic print(f'{t} w/ label 1: {total_0label}/{total_X_topic} ({total_0label / total_X_topic * 100 :.2f}%)') fig1, ax = plt.subplots(figsize = (10, 7)) bottom = np.zeros(len(perc_topic_irony)) width = 0.9 ax.bar(perc_topic_irony.keys(), perc_topic_irony.values(), width, label = 'sarcastic') comp = list(map(lambda x: 1 - x if x > 0 else 0, perc_topic_irony.values())) ax.bar(perc_topic_irony.keys(), comp, width, bottom=list(perc_topic_irony.values()), label = 'not sarcastic') ax.set_title("% of sarcastic comments for each topic") plt.xticks(rotation=70) ax.set_ylim(bottom = 0, top = 1.02) plt.legend() plt.axhline(0.5, color = 'red', ls=":") # probably not necessary (?) To drop eventually if log are to much cluttered! print('Percentage of each topic for each subreddit') weight_counts = {} for t in topics: weight_counts[t] = [] for subreddit in df['subreddit'].value_counts().index[:n_top_subreddit_to_analyse]: # first 10 big subreddits if sum(df[df.Topic_key_word == t].subreddit == subreddit) > 0: # se ci sono subreddit per il topic t (almeno una riga nel df) perc_sub = df[df.Topic_key_word == t]['subreddit'].value_counts()[subreddit] / df['subreddit'].value_counts()[subreddit] else: perc_sub = 0 weight_counts[t].append(perc_sub) print(f'Perc of topic {t} in subreddit {subreddit}: {perc_sub * 100:.2f}') print() print('[x] Generating plot [2]') # plot subreddits = list(df.subreddit.value_counts().index)[:n_top_subreddit_to_analyse] irony_percs = { t: [ len( df[df.subreddit == subreddit][(df[df.subreddit == subreddit].Topic_key_word == t) & (df[df.subreddit == subreddit][label] == 1)] ) / len( df[df.subreddit == subreddit] ) for subreddit in subreddits ] for t in topics } width = 0.9 fig2, ax = plt.subplots(figsize = (10, 7)) plt.axhline(0.5, color = 'red', ls=":", alpha = .3) bottom = np.zeros(len(subreddits)) for k, v in weight_counts.items(): p = ax.bar(subreddits, v, width, label=k, bottom=bottom) ax.bar(subreddits, irony_percs[k], width - 0.01, bottom=bottom, color = 'black', edgecolor = 'white', alpha = .2, hatch = '\\') bottom += v ax.set_title("% of topics for each subreddit") ax.legend(loc="upper right") plt.xticks(rotation=50) ax.set_ylim(bottom = 0, top = 1.02) print('[v] All looking good!') return df_topic_keywords, fig1, fig2 # def main(): with gr.Blocks() as demo: gr.Markdown("# Dashboard per l'analisi con LDA") gr.Markdown("### La dashboard permette l'addestramento di un modello LDA per controllare se e quali (dominant) topic sono più propensi a commenti di tipo sarcastico") # gradio.Dataframe(···) inputs = [] with gr.Row(): inputs.append(gr.Slider(2, 25, value=5, step = 1, label="LDA N components", info="Scegli il numero di componenti per LDA")) inputs.append(gr.Slider(2, 20, value=5, step = 1, label="Subreddit dal dataset", info="Numero di subreddit da analizzare")) inputs.append(gr.Radio( choices = ['Use True label', 'Use BERT prediction'], value = 'Use True label', label = "Scegliere quali label sull'ironia utilizzare:", ) ) btn = gr.Button(value="Submit") gr.Markdown("## Risulati ottenuti") gr.Markdown("#### Top 15 parole che più contribuiscono al topic di riferimento (utlima colonna):") btn.click( get_lda, inputs=inputs, outputs=[ gr.DataFrame(), gr.Plot(label="Quanto i topic trovati portano ironia?"), gr.Plot(label="Come i topic sono correlati ai diversi subreddit del dataset?"), ] ) # iface = gr.Interface(fn=greet, inputs="text", outputs="text") if __name__ == "__main__": demo.launch()