Build error
Build error
import base64 | |
import re | |
from collections import OrderedDict | |
from typing import Callable, Dict, List | |
import altair as alt | |
import numpy as np | |
import pandas as pd | |
import spacy | |
import streamlit as st | |
from pandas.core.series import Series | |
from PIL import Image | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.linear_model import LogisticRegression | |
from sklearn.preprocessing import LabelEncoder | |
from sklearn.utils import resample | |
from stqdm import stqdm | |
from textacy.preprocessing import make_pipeline, normalize, remove, replace | |
from .configs import Languages, ModelConfigs, SupportedFiles | |
import string | |
stqdm.pandas() | |
def get_logo(path): | |
return | |
# @st.cache(suppress_st_warning=True) | |
def read_file(uploaded_file) -> pd.DataFrame: | |
file_type =".")[-1] | |
if file_type in set( for i in SupportedFiles): | |
read_f = SupportedFiles[file_type].value[0] | |
df = read_f(uploaded_file) | |
# remove any NA | |
df = df.dropna() | |
return df | |
else: | |
st.error("File type not supported") | |
def download_button(dataframe: pd.DataFrame, name: str): | |
csv = dataframe.to_csv(index=False) | |
# some strings <-> bytes conversions necessary here | |
b64 = base64.b64encode(csv.encode()).decode() | |
href = f'<a href="data:file/csv;base64,{b64}" download="{name}.csv">Download</a>' | |
st.write(href, unsafe_allow_html=True) | |
def encode(text: pd.Series, labels: pd.Series): | |
tfidf_vectorizer = TfidfVectorizer( | |
input="content", # default: file already in memory | |
encoding="utf-8", # default | |
decode_error="strict", # default | |
strip_accents=None, # do nothing | |
lowercase=False, # do nothing | |
preprocessor=None, # do nothing - default | |
tokenizer=None, # default | |
stop_words=None, # do nothing | |
analyzer="word", | |
ngram_range=(1, 3), # maximum 3-ngrams | |
min_df=0.001, | |
max_df=0.75, | |
sublinear_tf=True, | |
) | |
label_encoder = LabelEncoder() | |
with st.spinner("Encoding text using TF-IDF and Encoding labels"): | |
X = tfidf_vectorizer.fit_transform(text.values) | |
y = label_encoder.fit_transform(labels.values) | |
return { | |
"X": X, | |
"y": y, | |
"X_names": np.array(tfidf_vectorizer.get_feature_names()), | |
"y_names": label_encoder.classes_, | |
} | |
def wordifier(X, y, X_names: List[str], y_names: List[str], configs=ModelConfigs): | |
n_instances, n_features = X.shape | |
n_classes = len(y_names) | |
# NOTE: the * 10 / 10 trick is to have "nice" round-ups | |
sample_fraction = np.ceil((n_features / n_instances) * 10) / 10 | |
sample_size = min( | |
# this is the maximum supported | |
configs.MAX_SELECTION.value, | |
# at minimum you want MIN_SELECTION but in general you want | |
# n_instances * sample_fraction | |
max(configs.MIN_SELECTION.value, int(n_instances * sample_fraction)), | |
# however if previous one is bigger the the available instances take | |
# the number of available instances | |
n_instances, | |
) | |
# TODO: might want to try out something to subsample features at each iteration | |
# initialize coefficient matrices | |
pos_scores = np.zeros((n_classes, n_features), dtype=int) | |
neg_scores = np.zeros((n_classes, n_features), dtype=int) | |
with st.spinner("Wordifying!"): | |
for _ in stqdm(range(configs.NUM_ITERS.value)): | |
# run randomized regression | |
clf = LogisticRegression( | |
penalty="l1", | |
C=configs.PENALTIES.value[np.random.randint(len(configs.PENALTIES.value))], | |
solver="liblinear", | |
multi_class="auto", | |
max_iter=500, | |
class_weight="balanced", | |
) | |
# sample indices to subsample matrix | |
selection = resample(np.arange(n_instances), replace=True, stratify=y, n_samples=sample_size) | |
# fit | |
try: | |[selection], y[selection]) | |
except ValueError: | |
continue | |
# record coefficients | |
if n_classes == 2: | |
pos_scores[1] = pos_scores[1] + (clf.coef_ > 0.0) | |
neg_scores[1] = neg_scores[1] + (clf.coef_ < 0.0) | |
pos_scores[0] = pos_scores[0] + (clf.coef_ < 0.0) | |
neg_scores[0] = neg_scores[0] + (clf.coef_ > 0.0) | |
else: | |
pos_scores += clf.coef_ > 0 | |
neg_scores += clf.coef_ < 0 | |
# normalize | |
pos_scores = pos_scores / configs.NUM_ITERS.value | |
neg_scores = neg_scores / configs.NUM_ITERS.value | |
# get only active features | |
pos_positions = np.where(pos_scores >= configs.SELECTION_THRESHOLD.value, pos_scores, 0) | |
neg_positions = np.where(neg_scores >= configs.SELECTION_THRESHOLD.value, neg_scores, 0) | |
# prepare DataFrame | |
pos = [(X_names[i], pos_scores[c, i], y_names[c]) for c, i in zip(*pos_positions.nonzero())] | |
neg = [(X_names[i], neg_scores[c, i], y_names[c]) for c, i in zip(*neg_positions.nonzero())] | |
posdf = pd.DataFrame(pos, columns="word score label".split()).sort_values(["label", "score"], ascending=False) | |
negdf = pd.DataFrame(neg, columns="word score label".split()).sort_values(["label", "score"], ascending=False) | |
return posdf, negdf | |
# more [here]( | |
# and [here]( | |
_re_normalize_acronyms = re.compile("(?:[a-zA-Z]\.){2,}") | |
def normalize_acronyms(t): | |
return _re_normalize_acronyms.sub(t.translate(str.maketrans("", "", string.punctuation)).upper(), t) | |
_re_non_word = re.compile("\W") | |
def remove_non_word(t): | |
return _re_non_word.sub(" ", t) | |
_re_space = re.compile(" {2,}") | |
def normalize_useless_spaces(t): | |
return _re_space.sub(" ", t) | |
_re_rep = re.compile(r"(\S)(\1{2,})") | |
def normalize_repeating_chars(t): | |
def _replace_rep(m): | |
c, cc = m.groups() | |
return c | |
return _re_rep.sub(_replace_rep, t) | |
_re_wrep = re.compile(r"(?:\s|^)(\w+)\s+((?:\1\s+)+)\1(\s|\W|$)") | |
def normalize_repeating_words(t): | |
def _replace_wrep(m): | |
c, cc, e = m.groups() | |
return c | |
return _re_wrep.sub(_replace_wrep, t) | |
class TextPreprocessor: | |
def __init__( | |
self, language: str, cleaning_steps: List[str], lemmatizer_when: str = "last", remove_stop: bool = True | |
) -> None: | |
# prepare lemmatizer | |
self.language = language | |
self.nlp = spacy.load(Languages[language].value, exclude=["parser", "ner", "pos", "tok2vec"]) | |
self.lemmatizer_when = self._lemmatization_options().get(lemmatizer_when, None) | |
self.remove_stop = remove_stop | |
self._lemmatize = self._get_lemmatizer() | |
# prepare cleaning | |
self.cleaning_steps = [ | |
self._cleaning_options()[step] for step in cleaning_steps if step in self._cleaning_options() | |
] | |
self.cleaning_pipeline = make_pipeline(*self.cleaning_steps) if self.cleaning_steps else lambda x: x | |
def _get_lemmatizer(self) -> Callable: | |
"""Return the correct spacy Doc-level lemmatizer""" | |
if self.remove_stop: | |
def lemmatizer(doc: spacy.tokens.doc.Doc) -> str: | |
"""Lemmatizes spacy Doc and removes stopwords""" | |
return " ".join([t.lemma_ for t in doc if t.lemma_ != "-PRON-" and not t.is_stop]) | |
else: | |
def lemmatizer(doc: spacy.tokens.doc.Doc) -> str: | |
"""Lemmatizes spacy Doc""" | |
return " ".join([t.lemma_ for t in doc if t.lemma_ != "-PRON-"]) | |
return lemmatizer | |
def _lemmatization_options() -> Dict[str, str]: | |
return { | |
"Before preprocessing": "first", | |
"After preprocessing": "last", | |
"Never! Let's do it quick and dirty": None, | |
} | |
def lemmatizer(self, series: pd.Series) -> pd.Series: | |
""" | |
Apply spacy pipeline to transform string to spacy Doc and applies lemmatization | |
""" | |
res = [] | |
pbar = stqdm(total=len(series)) | |
for doc in self.nlp.pipe(series, batch_size=500): | |
res.append(self._lemmatize(doc)) | |
pbar.update(1) | |
pbar.close() | |
return pd.Series(res) | |
def _cleaning_options(): | |
"""Returns available cleaning steps in order""" | |
return OrderedDict( | |
[ | |
("lower", lambda x: x.lower()), | |
("normalize_unicode", normalize.unicode), | |
("normalize_bullet_points", normalize.bullet_points), | |
("normalize_hyphenated_words", normalize.hyphenated_words), | |
("normalize_quotation_marks", normalize.quotation_marks), | |
("normalize_whitespace", normalize.whitespace), | |
("replace_urls", replace.urls), | |
("replace_currency_symbols", replace.currency_symbols), | |
("replace_emails", replace.emails), | |
("replace_emojis", replace.emojis), | |
("replace_hashtags", replace.hashtags), | |
("replace_numbers", replace.numbers), | |
("replace_phone_numbers", replace.phone_numbers), | |
("replace_user_handles", replace.user_handles), | |
("normalize_acronyms", normalize_acronyms), | |
("remove_accents", remove.accents), | |
("remove_brackets", remove.brackets), | |
("remove_html_tags", remove.html_tags), | |
("remove_punctuation", remove.punctuation), | |
("remove_non_words", remove_non_word), | |
("normalize_useless_spaces", normalize_useless_spaces), | |
("normalize_repeating_chars", normalize_repeating_chars), | |
("normalize_repeating_words", normalize_repeating_words), | |
("strip", lambda x: x.strip()), | |
] | |
) | |
def fit_transform(self, series: pd.Series) -> Series: | |
"""Applies text preprocessing""" | |
if self.lemmatizer_when == "first": | |
with st.spinner("Lemmatizing"): | |
series = self.lemmatizer(series) | |
with st.spinner("Cleaning"): | |
series = series.progress_map(self.cleaning_pipeline) | |
if self.lemmatizer_when == "last": | |
with st.spinner("Lemmatizing"): | |
series = self.lemmatizer(series) | |
return series | |
def plot_labels_prop(data: pd.DataFrame, label_column: str): | |
unique_value_limit = 100 | |
if data[label_column].nunique() > unique_value_limit: | |
st.warning(f""" | |
The column you selected has more than {unique_value_limit}. | |
Are you sure it's the right column? If it is, please note that | |
this will impact __Wordify__ performance. | |
""") | |
return | |
source = data[label_column].value_counts().reset_index().rename(columns={"index": "Labels", label_column: "Counts"}) | |
source["Props"] = source["Counts"] / source["Counts"].sum() | |
source["Proportions"] = (source["Props"].round(3) * 100).map("{:,.2f}".format) + "%" | |
bars = ( | |
alt.Chart(source) | |
.mark_bar() | |
.encode( | |
x=alt.X("Labels:O", sort="-y"), | |
y="Counts:Q", | |
) | |
) | |
text = bars.mark_text(align="center", baseline="middle", dy=15).encode(text="Proportions:O") | |
return (bars + text).properties(height=300) | |
def plot_nchars(data: pd.DataFrame, text_column: str): | |
source = data[text_column].str.len().to_frame() | |
plot = ( | |
alt.Chart(source) | |
.mark_bar() | |
.encode( | |
alt.X(f"{text_column}:Q", bin=True, axis=alt.Axis(title="# chars per text")), | |
alt.Y("count()", axis=alt.Axis(title="")), | |
) | |
) | |
return | |
def plot_score(data: pd.DataFrame, label_col: str, label: str): | |
source = data.loc[data[label_col] == label].sort_values("score", ascending=False).head(100) | |
plot = ( | |
alt.Chart(source) | |
.mark_bar() | |
.encode( | |
y=alt.Y("word:O", sort="-x"), | |
x="score:Q", | |
) | |
) | |
return * source.shape[0], 50)) | |