Spaces:
Running
Running
#Import the libraries we know we'll need for the Generator. | |
import pandas as pd, spacy, nltk, numpy as np, re | |
from spacy.matcher import Matcher | |
nlp = spacy.load("en_core_web_lg") | |
import altair as alt | |
import streamlit as st | |
from annotated_text import annotated_text as ant | |
#Import the libraries to support the model and predictions. | |
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TextClassificationPipeline | |
import lime | |
import torch | |
import torch.nn.functional as F | |
from lime.lime_text import LimeTextExplainer | |
#Import WNgen.py | |
from WNgen import * | |
class_names = ['negative', 'positive'] | |
explainer = LimeTextExplainer(class_names=class_names) | |
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english") | |
model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english") | |
pipe = TextClassificationPipeline(model=model, tokenizer=tokenizer, return_all_scores=True) | |
def predictor(texts): | |
outputs = model(**tokenizer(texts, return_tensors="pt", padding=True)) | |
probas = F.softmax(outputs.logits, dim=1).detach().numpy() | |
return probas | |
def critical_words(document, options=False): | |
'''This function is meant to select the critical part of a sentence. Critical, in this context means | |
the part of the sentence that is either: A) a NOUN or PROPN from the correct entity group, B) a NOUN, | |
C) a NOUN + ADJ combination, or D) ADJ and PROPN used to modify other NOUN tokens. | |
It also checks this against what the model thinks is important if the user defines "options" as "LIME" or True.''' | |
if type(document) is not spacy.tokens.doc.Doc: | |
document = nlp(document) | |
chunks = list(document.noun_chunks) | |
pos_options = [] | |
lime_options = [] | |
#Identify what the model cares about. | |
if options: | |
#Run Lime Setup code | |
exp = explainer.explain_instance(document.text, predictor, num_features=15, num_samples=2000) | |
lime_results = exp.as_list() | |
for feature in lime_results: | |
lime_options.append(feature[0]) | |
lime_results = pd.DataFrame(lime_results, columns=["Word","Weight"]) | |
#Identify what we care about "parts of speech" | |
# Here I am going to try to pick up pronouns, which are people, and Adjectival Compliments. | |
for token in document: | |
if (token.text not in pos_options) and ((token.text in lime_options) or (options == False)): | |
#print(f"executed {token.text} with {token.pos_} and {token.dep_}") #QA | |
if (token.pos_ in ["ADJ","PROPN"]) and (token.dep_ in ["compound", "amod"]) and (document[token.i - 1].dep_ in ["compound", "amod"]): | |
compound = document[token.i - 1: token.i +1].text | |
pos_options.append(compound) | |
print(f'Added {compound} based on "amod" and "compound" adjectives.') | |
elif (token.pos_ in ["NOUN"]) and (token.dep_ in ["compound", "amod", "conj"]) and (document[token.i - 1].dep_ in ["compound"]): | |
compound = document[token.i - 1: token.i +1].text | |
pos_options.append(compound) | |
print(f'Added {compound} based on "amod" and "compound" and "conj" nouns.') | |
elif (token.pos_ == "PROPN") and (token.dep_ in ["prep","amod"]): | |
pos_options.append(token.text) | |
print(f"Added '{token.text}' based on their adjectival state.") | |
elif (token.pos_ == "ADJ") and (token.dep_ in ["acomp","conj","amod"]): | |
pos_options.append(token.text) | |
print(f"Added '{token.text}' based on their adjectival state.") | |
elif (token.pos_ == "PRON") and (len(token.morph) !=0): | |
if (token.morph.get("PronType") == "Prs"): | |
pos_options.append(token.text) | |
print(f"Added '{token.text}' because it's a human pronoun.") | |
#Noun Chunks parsing | |
for chunk in chunks: | |
#The use of chunk[-1] is due to testing that it appears to always match the root | |
root = chunk[-1] | |
#This currently matches to a list I've created. I don't know the best way to deal with this so I'm leaving it as is for the moment. | |
if root.ent_type_: | |
cur_values = [] | |
if (len(chunk) > 1) and (chunk[-2].dep_ == "compound"): | |
#creates the compound element of the noun | |
compound = [x.text for x in chunk if x.dep_ == "compound"] | |
print(f"This is the contents of {compound} and it is {all(elem in lime_options for elem in compound)} that all elements are present in {lime_options}.") #for QA | |
#checks to see all elements in the compound are important to the model or use the compound if not checking importance. | |
if (all(elem in lime_options for elem in cur_values) and (options is True)) or ((options is False)): | |
#creates a span for the entirety of the compound noun and adds it to the list. | |
span = -1 * (1 + len(compound)) | |
pos_options.append(chunk[span:].text) | |
cur_values + [token.text for token in chunk if token.pos_ in ["ADJ","NOUN","PROPN"]] | |
else: | |
print(f"The elmenents in {compound} could not be added to the final list because they are not all relevant to the model.") | |
else: | |
cur_values = [token.text for token in chunk if (token.ent_type_) or (token.pos_ == "ADJ")] | |
if (all(elem in lime_options for elem in cur_values) and (options is True)) or ((options is False)): | |
pos_options.extend(cur_values) | |
print(f"From {chunk.text}, {cur_values} added to pos_options due to entity recognition.") #for QA | |
elif len(chunk) >= 1: | |
cur_values = [token.text for token in chunk if token.pos_ in ["NOUN","ADJ","PROPN"]] | |
if (all(elem in lime_options for elem in cur_values) and (options is True)) or ((options is False)): | |
pos_options.extend(cur_values) | |
print(f"From {chunk.text}, {cur_values} added to pos_options due to wildcard.") #for QA | |
else: | |
print(f"No options added for \'{chunk.text}\' ") | |
pos_options = list(set(pos_options)) | |
if options: | |
return pos_options, lime_results | |
else: | |
return pos_options | |
# Return the Viz of elements critical to LIME. | |
def lime_viz(df): | |
if not isinstance(df, pd.DataFrame): | |
df = pd.DataFrame(df, columns=["Word","Weight"]) | |
single_nearest = alt.selection_single(on='mouseover', nearest=True) | |
viz = alt.Chart(df).encode( | |
alt.X('Weight:Q', scale=alt.Scale(domain=(-1, 1))), | |
alt.Y('Word:N', sort='x', axis=None), | |
color=alt.Color("Weight", scale=alt.Scale(scheme='blueorange', domain=[0], type="threshold", range='diverging'), legend=None), | |
tooltip = ("Word","Weight") | |
).mark_bar().properties(title ="Importance of individual words") | |
text = viz.mark_text( | |
fill="black", | |
align='right', | |
baseline='middle' | |
).encode( | |
text='Word:N' | |
) | |
limeplot = alt.LayerChart(layer=[viz,text], width = 300).configure_axis(grid=False).configure_view(strokeWidth=0) | |
return limeplot | |
# Evaluate Predictions using the model and pipe. | |
def eval_pred(text, return_all = False): | |
'''A basic function for evaluating the prediction from the model and turning it into a visualization friendly number.''' | |
preds = pipe(text) | |
neg_score = -1 * preds[0][0]['score'] | |
sent_neg = preds[0][0]['label'] | |
pos_score = preds[0][1]['score'] | |
sent_pos = preds[0][1]['label'] | |
prediction = 0 | |
sentiment = '' | |
if pos_score > abs(neg_score): | |
prediction = pos_score | |
sentiment = sent_pos | |
elif abs(neg_score) > pos_score: | |
prediction = neg_score | |
sentiment = sent_neg | |
if return_all: | |
return prediction, sentiment | |
else: | |
return prediction | |
def construct_nlexp(text,sentiment,probability): | |
prob = str(np.round(100 * abs(probability),2)) | |
if sentiment == "NEGATIVE": | |
color_sent = ant('The model predicts the sentiment of the sentence you provided is ', (sentiment, "-", "#FFA44F"), ' with a probability of ', (prob, "neg", "#FFA44F"),"%.") | |
elif sentiment == "POSITIVE": | |
color_sent = ant('The model predicts the sentiment of the sentence you provided is ', (sentiment, "+", "#50A9FF"), ' with a probability of ', (prob, "pos", "#50A9FF"),"%.") | |
return color_sent | |
def get_min_max(df, seed): | |
'''This function provides the alternatives with the highest spaCy similarity scores and the lowest similarity scores. As similarity is based on vectorization of words and documents this may not be the best way to identify bias. | |
text2 = Most Similar | |
text3 = Least Similar''' | |
maximum = df[df['similarity'] < .9999].similarity.max() | |
text2 = df.loc[df['similarity'] == maximum, 'text'].iloc[0] | |
minimum = df[df['similarity'] > .0001].similarity.min() | |
text3 = df.loc[df['similarity'] == minimum, 'text'].iloc[0] | |
return text2, text3 | |
# Inspired by https://stackoverflow.com/questions/17758023/return-rows-in-a-dataframe-closest-to-a-user-defined-number/17758115#17758115 | |
def abs_dif(df,seed): | |
'''This function enables a user to identify the alternative that is closest to the seed and farthest from the seed should that be the what they wish to display. | |
text2 = Nearest Prediction | |
text3 = Farthest Prediction''' | |
#seed = process_text(seed) | |
target = df[df['Words'] == seed].pred.iloc[0] | |
sub_df = df[df['Words'] != seed].reset_index() | |
nearest_prediction = sub_df.pred[(sub_df.pred-target).abs().argsort()[:1]] | |
farthest_prediction = sub_df.pred[(sub_df.pred-target).abs().argsort()[-1:]] | |
text2 = sub_df.text.iloc[nearest_prediction.index[0]] | |
text3 = sub_df.text.iloc[farthest_prediction.index[0]] | |
return text2, text3 | |
#@st.experimental_singleton #I've enabled this to prevent it from triggering every time the code runs... which could get very messy | |
def sampled_alts(df, seed, fixed=False): | |
'''This function enables a user to select an alternate way of choosing which counterfactuals are shown for MultiNLC, MultiNLC + Lime, and VizNLC. If you use this then you are enabling random sampling over other options (ex. spaCy similarity scores, or absolute difference). | |
Both samples are random.''' | |
sub_df = df[df['Words'] != seed] | |
if fixed: | |
sample = sub_df.sample(n=2, random_state = 2052) | |
else: | |
sample = sub_df.sample(n=2) | |
text2 = sample.text.iloc[0] | |
text3 = sample.text.iloc[1] | |
return text2, text3 | |
def gen_cf_country(df,_document,selection): | |
df['text'] = df.Words.apply(lambda x: re.sub(r'\b'+selection+r'\b',x,_document.text)) | |
df['pred'] = df.text.apply(eval_pred) | |
df['seed'] = df.Words.apply(lambda x: 'seed' if x == selection else 'alternative') | |
df['similarity'] = df.Words.apply(lambda x: nlp(selection).similarity(nlp(x))) | |
return df | |
def gen_cf_profession(df,_document,selection): | |
category = df.loc[df['Words'] == selection, 'Major'].iloc[0] | |
df = df[df.Major == category] | |
df['text'] = df.Words.apply(lambda x: re.sub(r'\b'+selection+r'\b',x,_document.text)) | |
df['pred'] = df.text.apply(eval_pred) | |
df['seed'] = df.Words.apply(lambda x: 'seed' if x == selection else 'alternative') | |
df['similarity'] = df.Words.apply(lambda x: nlp(selection).similarity(nlp(x))) | |
return df |