# app.py -- last change "Update app.py" (commit 21051e8, uploader chap0lin), 6.58 kB.
import ast
import json
import os
import re
import shutil
import unicodedata

import contractions
import gradio as gr
import keras
import nltk
import numpy as np
import pandas as pd
import requests
import spacy
from gensim.models import Word2Vec
from gensim.models.callbacks import CallbackAny2Vec
from keras import backend as K
from keras.utils.data_utils import pad_sequences
from PIL import Image
# Fetch the NLTK resources used further down: 'punkt' backs
# nltk.word_tokenize and 'stopwords' backs nltk.corpus.stopwords.
nltk.download('punkt')
nltk.download('stopwords')
# Install the small English spaCy pipeline at start-up. This runs before the
# import below, presumably because the package is not pre-installed in the
# deployment environment.
os.system('python -m spacy download en_core_web_sm')
import en_core_web_sm
# Shared spaCy pipeline, used by spacy_lemmatize_text().
nlp = en_core_web_sm.load()
def recall_m(y_true, y_pred):
    """Recall metric written with Keras backend ops.

    The element-wise product ``y_true * y_pred`` and ``y_true`` itself are
    clipped to [0, 1] and rounded before being summed, so every entry counts
    as 0 or 1. The result is hits / (positives in ``y_true`` + epsilon); the
    epsilon keeps the division defined when ``y_true`` has no positives.
    """
    hits = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    actual_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    return hits / (actual_positives + K.epsilon())
def precision_m(y_true, y_pred):
    """Precision metric written with Keras backend ops.

    The element-wise product ``y_true * y_pred`` and ``y_pred`` itself are
    clipped to [0, 1] and rounded before being summed, so every entry counts
    as 0 or 1. The result is hits / (positives in ``y_pred`` + epsilon); the
    epsilon keeps the division defined when nothing is predicted positive.
    """
    hits = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    flagged_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    return hits / (flagged_positives + K.epsilon())
def f1_m(y_true, y_pred):
    """F1 metric: harmonic mean of ``precision_m`` and ``recall_m``.

    ``K.epsilon()`` is added to the denominator so the score stays defined
    when both precision and recall are zero.
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    harmonic_half = (p * r) / (p + r + K.epsilon())
    return 2 * harmonic_half
# Training-progress callback for gensim Word2Vec.
# NOTE(review): the lower-case class name is kept as-is; a saved Word2Vec model
# may reference it by name when it is loaded -- confirm before renaming.
class callback(CallbackAny2Vec):
    """Track the Word2Vec training loss and print it on every 100th epoch."""

    def __init__(self):
        # Number of epochs completed so far.
        self.epoch = 0
        # gensim reports a cumulative loss, so the previous total is stored
        # to be able to print the loss of a single epoch.
        self.loss_previous_step = 0

    def on_epoch_end(self, model):
        """Print the epoch's loss (every 100th epoch) and update the counters."""
        cumulative_loss = model.get_latest_training_loss()
        if self.epoch % 100 == 0:
            epoch_loss = cumulative_loss - self.loss_previous_step
            print('Loss after epoch {}: {}'.format(self.epoch, epoch_loss))
        # The bookkeeping runs on every epoch, not only on the printed ones.
        self.epoch += 1
        self.loss_previous_step = cumulative_loss
def spacy_lemmatize_text(text):
    """Return ``text`` with each token replaced by its spaCy lemma.

    The text is run through the module-level ``nlp`` pipeline and the
    resulting lemmas are joined with single spaces. Tokens whose lemma is the
    placeholder '-PRON-' keep their original text instead.
    """
    pieces = []
    for token in nlp(text):
        if token.lemma_ == '-PRON-':
            pieces.append(token.text)
        else:
            pieces.append(token.lemma_)
    return ' '.join(pieces)
def remove_accented_chars(text):
    """Strip diacritics from ``text`` and drop any remaining non-ASCII characters.

    The text is decomposed with NFKD so that an accented letter becomes its
    base letter followed by combining marks; encoding to ASCII with
    ``errors='ignore'`` then discards only the marks ("café" -> "cafe").
    With a composed form such as NFC the whole accented letter would be
    discarded instead ("café" -> "caf").

    NOTE(review): if the saved models were trained on text cleaned with the
    previous NFC behaviour, confirm that accented input is rare enough for
    this correction not to matter.

    Args:
        text: Input string.

    Returns:
        An ASCII-only string.
    """
    decomposed = unicodedata.normalize('NFKD', text)
    return decomposed.encode('ascii', 'ignore').decode('utf-8', 'ignore')
def remove_special_characters(text, remove_digits=False):
    """Delete every character that is not an ASCII letter, digit or whitespace.

    Args:
        text: Input string.
        remove_digits: When true, digits are deleted as well.

    Returns:
        The filtered string; whitespace is left untouched.
    """
    if remove_digits:
        unwanted = r'[^a-zA-Z\s]'
    else:
        unwanted = r'[^a-zA-Z0-9\s]'
    return re.sub(unwanted, '', text)
def remove_stopwords(text, is_lower_case=False, stopwords=None):
    """Return ``text`` with stopwords removed.

    The text is tokenised with ``nltk.word_tokenize`` and the surviving
    tokens are joined with single spaces.

    Args:
        text: Input string.
        is_lower_case: When true, tokens are compared to the stopwords as
            they are; otherwise each token is lower-cased for the comparison
            (the token itself is kept unchanged in the output).
        stopwords: Iterable of stopwords. When empty or ``None`` the NLTK
            English stopword list is used.

    Returns:
        The filtered text.
    """
    if not stopwords:
        stopwords = nltk.corpus.stopwords.words('english')
    # Build a set once: membership tests against the list would rescan it
    # for every token.
    stopword_set = set(stopwords)

    tokens = [token.strip() for token in nltk.word_tokenize(text)]
    if is_lower_case:
        kept = [token for token in tokens if token not in stopword_set]
    else:
        kept = [token for token in tokens if token.lower() not in stopword_set]
    return ' '.join(kept)
def pre_process(df):
    """Add the tokenised text column ``opo_pre_tkn`` to ``df``.

    For every row the text of ``opo_texto`` is taken, and ``opo_texto_ele``
    is appended to it (separated by ". ") when it is present, differs from
    ``opo_texto`` and ``opo_texto`` has fewer than 4000 tokens. Each
    resulting text is then cleaned step by step (accents, contractions,
    non-letters, lower-casing, lemmatisation, stopwords) and tokenised.

    Rows are walked by value rather than with ``series[i]``, so a DataFrame
    whose index is not 0..n-1 is handled. A missing ``opo_texto`` falls back
    to ``opo_texto_ele`` (or to an empty string when both are missing)
    instead of crashing the tokeniser.

    Args:
        df: DataFrame with the columns ``opo_texto`` and ``opo_texto_ele``.

    Returns:
        The same DataFrame object, modified in place, with the extra column
        ``opo_pre_tkn`` holding one list of tokens per row.
    """
    # Above this many tokens the main text is used on its own.
    max_tokens_for_merge = 4000

    merged_texts = []
    for main_text, extra_text in zip(df['opo_texto'], df['opo_texto_ele']):
        extra_missing = pd.isna(extra_text)
        if pd.isna(main_text):
            merged = '' if extra_missing else extra_text
        elif extra_missing or main_text == extra_text:
            merged = main_text
        elif len(nltk.word_tokenize(main_text)) < max_tokens_for_merge:
            merged = main_text + ". " + extra_text
        else:
            merged = main_text
        merged_texts.append(merged)

    token_lists = []
    for text in merged_texts:
        text = remove_accented_chars(text)
        text = contractions.fix(text)
        text = remove_special_characters(text, remove_digits=True)
        text = text.lower()
        text = spacy_lemmatize_text(text)
        text = remove_stopwords(text, is_lower_case=False)
        token_lists.append(nltk.word_tokenize(text))

    df['opo_pre_tkn'] = token_lists
    return df
def classify(df, new_column=True, pre_processed=False):
    """Classify every row of ``df`` with the loaded Word2Vec + CNN models.

    Each token of ``opo_pre_tkn`` is replaced by its Word2Vec vector (a zero
    vector for out-of-vocabulary tokens), the sequences are padded at the
    front to a fixed length, and the CNN output is thresholded at 0.5.

    Args:
        df: DataFrame with the column ``opo_pre_tkn``.
        new_column: When true, all rows are returned with the added
            ``classification`` column. When false, only the rows classified
            as 1 are returned (the column is still present).
        pre_processed: When true, ``opo_pre_tkn`` already holds token lists.
            When false, each cell is expected to be the string form of a
            token list and is parsed first.

    Returns:
        The DataFrame with the ``classification`` column (0 or 1), filtered
        as described under ``new_column``.
    """
    # Sequence length passed to pad_sequences for the CNN input.
    sequence_length = 2726

    token_column = df['opo_pre_tkn']
    if pre_processed:
        sentences = token_column
    else:
        # literal_eval parses the list literal directly, without rewriting
        # quotes to make it look like JSON.
        sentences = [ast.literal_eval(cell) for cell in token_column]
    del token_column

    # ``word in keyed_vectors`` and ``vector_size`` are available in both
    # gensim 3.x and 4.x, unlike the ``vocab`` attribute.
    keyed_vectors = reloaded_w2v_model.wv
    unknown_word_vector = np.zeros((keyed_vectors.vector_size,))

    embedded_sentences = []
    for sentence in sentences:
        embedded_sentences.append([
            keyed_vectors[word] if word in keyed_vectors else unknown_word_vector
            for word in sentence
        ])
    del sentences

    # NOTE(review): pad_sequences defaults to dtype='int32', which truncates
    # float embeddings. Left unchanged because the CNN was presumably trained
    # on input prepared the same way -- confirm against the training code.
    padded = pad_sequences(embedded_sentences, maxlen=sequence_length, padding='pre')
    del embedded_sentences  # release the large intermediate before predicting

    predictions = reconstructed_model_CNN.predict(padded)
    del padded
    print(predictions)

    df['classification'] = [1 if score >= 0.5 else 0 for score in predictions]
    del predictions

    if not new_column:
        df = df.loc[df['classification'] == 1]
    return df
def gen_output(data):
    """Write ``data`` to ``output.xlsx`` without its index and return that file name."""
    output_path = "output.xlsx"
    data.to_excel(output_path, index=False)
    return output_path
# Word2Vec embeddings, read by classify() to turn tokens into vectors.
reloaded_w2v_model = Word2Vec.load('word2vec_xp8.model')

# The metric functions are handed to Keras as custom objects so the saved
# CNN can be loaded.
custom_metric_functions = {
    'f1_m': f1_m,
    "precision_m": precision_m,
    "recall_m": recall_m,
}
reconstructed_model_CNN = keras.models.load_model(
    "best weights CNN.h5",
    custom_objects=custom_metric_functions,
)
def app(operacao, resultado, dados):
    """Gradio handler: run the selected operation on an uploaded spreadsheet.

    Args:
        operacao: Selected operation label (pre-processing, classification,
            or both).
        resultado: Selected result mode; "Nova Coluna" keeps every row and
            adds the classification column, anything else filters the rows.
        dados: The uploaded spreadsheet, passed to ``pd.read_excel``.

    Returns:
        The name of the generated Excel file, or ``None`` when ``operacao``
        matches none of the known labels.
    """
    data = pd.read_excel(dados)
    print("Dados Carregados!")

    keep_all_rows = resultado == "Nova Coluna"

    if operacao == "Pré-processamento + Classificação":
        result = classify(pre_process(data), keep_all_rows, pre_processed=True)
    elif operacao == "Apenas Pré-processamento":
        result = pre_process(data)
    elif operacao == "Apenas Classificação":
        result = classify(data, keep_all_rows)
    else:
        # Unknown or unset operation: no file is produced.
        return None

    return gen_output(result)
# Gradio UI: two radio selectors (operation, result mode) plus a file upload,
# wired to app(); the output is a downloadable file.
operation_choices = [
    "Pré-processamento + Classificação",
    "Apenas Pré-processamento",
    "Apenas Classificação",
]
result_mode_choices = ["Nova Coluna", "Filtrar planilha"]

iface = gr.Interface(
    fn=app,
    inputs=[gr.Radio(operation_choices), gr.Radio(result_mode_choices), "file"],
    outputs="file",
)
iface.launch()