import os
import re
import ast
import json
import unicodedata

import contractions
import spacy
import keras
import gradio as gr
import pandas as pd
import numpy as np
from keras import backend as K
from keras.utils.data_utils import pad_sequences
from gensim.models import Word2Vec
from gensim.models.callbacks import CallbackAny2Vec
import nltk

# NLTK resources used by word_tokenize and the stopword filter below
nltk.download('punkt')
nltk.download('stopwords')

# Download and load the small English spaCy model used for lemmatization
os.system('python -m spacy download en_core_web_sm')
import en_core_web_sm
nlp = en_core_web_sm.load()
def recall_m(y_true, y_pred):
    # recall = TP / (TP + FN), with epsilon to avoid division by zero
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    return true_positives / (possible_positives + K.epsilon())

def precision_m(y_true, y_pred):
    # precision = TP / (TP + FP)
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    return true_positives / (predicted_positives + K.epsilon())

def f1_m(y_true, y_pred):
    # harmonic mean of precision and recall
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2 * ((precision * recall) / (precision + recall + K.epsilon()))
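
# Quick sanity check of the metrics (a sketch, not run by the app): for
# y_true = [1, 0, 1] and y_pred = [1, 0, 0] there is 1 true positive,
# 1 predicted positive and 2 actual positives, so precision_m gives 1.0,
# recall_m gives 0.5 and f1_m gives 2 * (1.0 * 0.5) / 1.5 ≈ 0.667.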
# Callback that reports Word2Vec training loss during training
class callback(CallbackAny2Vec):
    """Report the per-epoch (non-cumulative) training loss every 100 epochs."""

    def __init__(self):
        self.epoch = 0
        # gensim reports cumulative loss, so keep the previous value
        # to print the difference for the current epoch
        self.loss_previous_step = 0

    def on_epoch_end(self, model):
        loss = model.get_latest_training_loss()
        if self.epoch % 100 == 0:
            print('Loss after epoch {}: {}'.format(self.epoch, loss - self.loss_previous_step))
        self.epoch += 1
        self.loss_previous_step = loss
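
# Usage sketch for the callback (training happens outside this app; the
# `sentences` name below is a hypothetical stand-in for the training corpus):
#   model = Word2Vec(sentences, compute_loss=True, callbacks=[callback()])
# compute_loss=True is required for get_latest_training_loss() to report
# anything other than zero.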
def spacy_lemmatize_text(text):
    # spaCy 2.x lemmatizes pronouns to the placeholder '-PRON-'; keep the
    # original token text in that case
    doc = nlp(text)
    return ' '.join([word.lemma_ if word.lemma_ != '-PRON-' else word.text for word in doc])
def remove_accented_chars(text):
    # NFKD splits accented characters into base letter + combining mark, so
    # the ASCII encode drops only the accent, not the whole letter (with the
    # original NFC form, 'café' would become 'caf' instead of 'cafe')
    text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('utf-8', 'ignore')
    return text
def remove_special_characters(text, remove_digits=False):
    pattern = r'[^a-zA-Z0-9\s]' if not remove_digits else r'[^a-zA-Z\s]'
    return re.sub(pattern, '', text)
def remove_stopwords(text, is_lower_case=False, stopwords=None):
    if not stopwords:
        stopwords = nltk.corpus.stopwords.words('english')
    tokens = nltk.word_tokenize(text)
    tokens = [token.strip() for token in tokens]
    if is_lower_case:
        filtered_tokens = [token for token in tokens if token not in stopwords]
    else:
        filtered_tokens = [token for token in tokens if token.lower() not in stopwords]
    return ' '.join(filtered_tokens)
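
# Behaviour of the helpers above on toy inputs (illustrative values only):
#   remove_accented_chars("ação")                             -> "acao"
#   remove_special_characters("R$ 100!", remove_digits=True)  -> "R "
#   remove_stopwords("this is a test")                        -> "test"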
def pre_process(df):
    # Merge 'opo_texto' and 'opo_texto_ele' into one text per row: keep
    # 'opo_texto' when the two fields match, when 'opo_texto_ele' is missing,
    # or when 'opo_texto' is already very long (>= 4000 tokens); otherwise
    # concatenate the two fields
    opo_texto_data = df['opo_texto']
    opo_texto_ele_data = df['opo_texto_ele']
    opo_texto_final = []
    for i in range(len(opo_texto_data)):
        if opo_texto_data.iloc[i] == opo_texto_ele_data.iloc[i]:
            opo_texto_final.append(opo_texto_data.iloc[i])
        elif pd.isna(opo_texto_ele_data.iloc[i]):
            opo_texto_final.append(opo_texto_data.iloc[i])
        elif len(nltk.word_tokenize(opo_texto_data.iloc[i])) < 4000:
            opo_texto_final.append(opo_texto_data.iloc[i] + ". " + opo_texto_ele_data.iloc[i])
        else:
            opo_texto_final.append(opo_texto_data.iloc[i])

    # Cleaning pipeline: strip accents, expand contractions, drop special
    # characters and digits, lowercase, lemmatize, remove stopwords, tokenize
    pre_processed_data = []
    for opo in opo_texto_final:
        opo_texto_sem_caracteres_especiais = remove_accented_chars(opo)
        sentenceExpanded = contractions.fix(opo_texto_sem_caracteres_especiais)
        sentenceWithoutPunctuation = remove_special_characters(sentenceExpanded, remove_digits=True)
        sentenceLowered = sentenceWithoutPunctuation.lower()
        sentenceLemmatized = spacy_lemmatize_text(sentenceLowered)
        sentenceLemStopped = remove_stopwords(sentenceLemmatized, is_lower_case=False)
        sentenceTokenized = nltk.word_tokenize(sentenceLemStopped)
        pre_processed_data.append(sentenceTokenized)

    df['opo_pre_tkn'] = pre_processed_data
    return df
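
# pre_process expects 'opo_texto' and 'opo_texto_ele' columns and adds the
# token lists as 'opo_pre_tkn'. A minimal sketch (the exact tokens depend on
# the spaCy model's lemmas):
#   df = pd.DataFrame({'opo_texto': ['Research projects approved'],
#                      'opo_texto_ele': [np.nan]})
#   pre_process(df)['opo_pre_tkn'][0]  # -> ['research', 'project', 'approve']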
def classify(df, new_column=True, pre_processed=False):
    sentencesMCTIList_xp8 = df['opo_pre_tkn']
    if not pre_processed:
        # Token lists loaded back from a spreadsheet arrive as the string
        # form of a Python list; parse them back into lists of tokens
        formatted_sentences = [ast.literal_eval(sentence) for sentence in sentencesMCTIList_xp8]
    else:
        formatted_sentences = sentencesMCTIList_xp8

    # Map each token to its Word2Vec embedding; unknown tokens become zero
    # vectors of the same shape (gensim >= 4 API: wv.vocab was replaced by
    # wv.index_to_key / wv.key_to_index)
    words = reloaded_w2v_model.wv.index_to_key
    item_shape = np.shape(reloaded_w2v_model.wv[words[0]])
    MCTIinput_vector = []
    for sentence in formatted_sentences:
        aux_vector = []
        for word in sentence:
            if word in reloaded_w2v_model.wv.key_to_index:
                aux_vector.append(reloaded_w2v_model.wv[word])
            else:
                aux_vector.append(np.zeros(item_shape))
        MCTIinput_vector.append(aux_vector)
    del formatted_sentences

    # 2726 is the sequence length the CNN was trained with; dtype='float32'
    # keeps the embeddings from being truncated to the int32 default
    MCTIinput_padded = pad_sequences(MCTIinput_vector, maxlen=2726, padding='pre', dtype='float32')
    del MCTIinput_vector

    predictions = reconstructed_model_CNN.predict(MCTIinput_padded)
    del MCTIinput_padded

    # Threshold the sigmoid output at 0.5 to obtain binary labels
    df['classification'] = [1 if prediction[0] >= 0.5 else 0 for prediction in predictions]
    if not new_column:
        df = df.loc[df['classification'] == 1]
    return df
def gen_output(data):
    # Write the resulting DataFrame to an Excel file served back to the user
    data.to_excel("output.xlsx", index=False)
    return "output.xlsx"
# Load the trained Word2Vec embeddings and the CNN classifier; the custom
# metrics defined above must be registered via custom_objects so keras can
# deserialize the saved model
reloaded_w2v_model = Word2Vec.load('word2vec_xp8.model')
reconstructed_model_CNN = keras.models.load_model("best weights CNN.h5",
                                                  custom_objects={'f1_m': f1_m,
                                                                  'precision_m': precision_m,
                                                                  'recall_m': recall_m})
def app(operacao, resultado, dados):
    # operacao selects the pipeline: "Pré-processamento + Classificação"
    # (pre-processing + classification), "Apenas Pré-processamento"
    # (pre-processing only) or "Apenas Classificação" (classification only).
    # resultado: "Nova Coluna" adds a classification column; "Filtrar
    # planilha" keeps only the rows classified as positive.
    data = pd.read_excel(dados)
    print("Data loaded!")
    if operacao == "Pré-processamento + Classificação":
        preprocessed = pre_process(data)
        df = classify(preprocessed, resultado == "Nova Coluna", pre_processed=True)
        return gen_output(df)
    elif operacao == "Apenas Pré-processamento":
        df = pre_process(data)
        return gen_output(df)
    elif operacao == "Apenas Classificação":
        df = classify(data, resultado == "Nova Coluna")
        return gen_output(df)
iface = gr.Interface(
    fn=app,
    inputs=[
        gr.Radio(["Pré-processamento + Classificação", "Apenas Pré-processamento", "Apenas Classificação"]),
        gr.Radio(["Nova Coluna", "Filtrar planilha"]),
        "file"
    ],
    outputs="file"
)

iface.launch()