|
import streamlit as st |
|
import time |
|
import os |
|
import logging |
|
import torch |
|
import json |
|
import string |
|
import re |
|
import string |
|
import nltk |
|
import numpy as np |
|
import torch.nn as nn |
|
import transformers |
|
import lightgbm as lgb |
|
import pickle |
|
# Fetch the NLTK corpora needed below: 'wordnet' for lemmatization and
# 'stopwords' for stopword removal (no-op if already present locally).
nltk.download('wordnet')

nltk.download('stopwords')
|
from collections import Counter |
|
from nltk.corpus import stopwords |
|
from nltk.stem import WordNetLemmatizer |
|
from nltk.tokenize import RegexpTokenizer |
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
from sklearn.linear_model import LogisticRegression |
|
|
|
# English stopword set used by data_preprocessing (requires the NLTK
# 'stopwords' corpus downloaded above).
stop_words = set(stopwords.words('english'))




# Pre-fitted logistic-regression classifier for the "classical" pipeline.
# NOTE(review): pickle.load can execute arbitrary code from the file —
# only load artifacts from a trusted source.
with open('logreg.pkl', 'rb') as f:

    logreg = pickle.load(f)



# Pre-fitted TF-IDF vectorizer matching the classifier above.
with open('tf.pkl', 'rb') as f:

    tf = pickle.load(f)
|
|
|
def classical_pipeline(text):
    """Preprocess raw review text and vectorize it for the classical model.

    Lowercases, replaces digits with spaces, strips punctuation and newlines,
    lemmatizes, removes English stopwords, then applies the pre-fitted TF-IDF
    vectorizer ``tf``.

    Args:
        text: Raw review string.

    Returns:
        A one-row sparse TF-IDF feature matrix (output of ``tf.transform``).
    """
    text = text.lower()
    text = re.sub(r'\d+', ' ', text)  # digits -> spaces
    text = text.translate(str.maketrans('', '', string.punctuation))
    text = re.sub(r'\n', '', text)
    wn_lemmatizer = WordNetLemmatizer()
    text = ' '.join(wn_lemmatizer.lemmatize(word) for word in text.split())
    # Raw string: '\w' in a plain literal is a deprecated invalid escape
    # sequence in modern Python.
    reg_tokenizer = RegexpTokenizer(r'\w+')
    # tokenize() on the single string replaces the original
    # tokenize_sents([text])[0] round-trip; the tokens are identical.
    tokens = reg_tokenizer.tokenize(text)
    sw = set(stopwords.words('english'))  # set membership is O(1) per word
    text = ' '.join(word for word in tokens if word not in sw)
    return tf.transform([text])
|
|
|
def preprocess_single_string(input_string: str, seq_len: int, vocab_to_int: dict) -> torch.Tensor:
    """Encode a raw string as a fixed-length tensor of vocabulary ids.

    Args:
        input_string: Raw review text.
        seq_len: Target sequence length (left-padded with zeros / truncated).
        vocab_to_int: Token -> integer-id mapping; out-of-vocabulary words
            are silently dropped.

    Returns:
        A 1-D ``torch.Tensor`` of length ``seq_len``.
    """
    preprocessed_string = data_preprocessing(input_string)
    # Membership filter instead of a per-word try/except KeyError: same
    # behavior (unknown words dropped) without raising an exception per miss.
    result_list = [
        vocab_to_int[word]
        for word in preprocessed_string.split()
        if word in vocab_to_int
    ]
    result_padded = padding([result_list], seq_len)[0]
    return torch.tensor(result_padded)
|
|
|
|
|
|
|
def padding(reviews_int: list, seq_len: int) -> np.ndarray:
    """Left-pad (or truncate) integer-encoded reviews to a fixed length.

    Reviews shorter than ``seq_len`` are left-padded with zeros; longer ones
    keep only their first ``seq_len`` tokens.

    Args:
        reviews_int: List of reviews, each a list of integer token ids.
        seq_len: Target sequence length.

    Returns:
        An int ndarray of shape ``(len(reviews_int), seq_len)``.
    """
    features = np.zeros((len(reviews_int), seq_len), dtype=int)
    for i, review in enumerate(reviews_int):
        trimmed = review[:seq_len]
        # Assign directly into the tail of the pre-zeroed row instead of
        # building an intermediate Python list of zeros per review.
        if trimmed:
            features[i, seq_len - len(trimmed):] = trimmed
    return features
|
|
|
|
|
def data_preprocessing(text: str):
    """Clean review text for the LSTM pipeline.

    Lowercases, strips HTML-like tags and punctuation, drops English
    stopwords (module-level ``stop_words``), and lemmatizes what remains.
    Returns the cleaned text as a single space-joined string.
    """
    lemmatizer = WordNetLemmatizer()
    lowered = text.lower()
    no_tags = re.sub('<.*?>', '', lowered)
    no_punct = ''.join(ch for ch in no_tags if ch not in string.punctuation)
    kept = (
        lemmatizer.lemmatize(token)
        for token in no_punct.split()
        if token not in stop_words
    )
    return ' '.join(kept)
|
|
|
# Token -> integer-id vocabulary used to encode input for the LSTM model.
with open('lstm_vocab_to_int.json') as json_file:

    vocab_to_int = json.load(json_file)



# Pre-trained word-embedding matrix, rows indexed by the vocabulary above.
with open('lstm_embedding_matrix.npy', 'rb') as f:

    embedding_matrix = np.load(f)



# Frozen embedding layer shared by LSTMClassifier instances.
embedding_layer = torch.nn.Embedding.from_pretrained(torch.FloatTensor(embedding_matrix))
|
|
|
class LSTMClassifier(nn.Module):
    """Bidirectional LSTM sentiment classifier over pre-trained embeddings.

    Uses the module-level frozen ``embedding_layer``; the flattened LSTM
    output across all time steps feeds a two-layer linear head that emits
    a single raw logit (apply sigmoid outside).
    """

    def __init__(self, embedding_dim: int, seq_len:int, hidden_size:int = 32, dropout:float = 0, num_layers:int = 1) -> None:
        """Args:
            embedding_dim: Size of each word-embedding vector.
            seq_len: Fixed input sequence length (the linear head's input
                size depends on it).
            hidden_size: LSTM hidden-state size per direction.
            dropout: Inter-layer LSTM dropout probability — only effective
                when num_layers > 1 (PyTorch warns otherwise).
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()



        self.embedding_dim = embedding_dim

        self.hidden_size = hidden_size

        # Shared pre-trained (frozen) embedding defined at module level.
        self.embedding = embedding_layer

        self.dropout = dropout

        self.num_layers = num_layers

        self.seq_len = seq_len

        self.lstm = nn.LSTM(

            input_size=self.embedding_dim,

            hidden_size=self.hidden_size,

            batch_first=True,

            bidirectional=True,

            dropout=self.dropout,

            num_layers=self.num_layers

        )

        # Head input: hidden_size per direction * 2 directions * seq_len steps.
        self.linear = nn.Sequential(

            nn.Linear(self.hidden_size * self.seq_len * 2, 128),

            nn.Linear(128, 1)

        )



    def forward(self, x):
        """Forward pass.

        Args:
            x: LongTensor of token ids, shape (batch, seq_len).

        Returns:
            Raw logits. NOTE(review): squeeze(0) drops the batch dimension
            when batch size is 1, so the output is then 1-D — the caller's
            sigmoid + round tolerates either shape, but confirm before
            feeding larger batches.
        """
        embeddings = self.embedding(x)

        output, _ = self.lstm(embeddings)

        # Flatten all time steps into one feature vector per sample.
        output = output.contiguous().view(output.size(0), -1)

        out = self.linear(output.squeeze(0))

        return out
|
|
|
# DistilBERT tokenizer + base model used as the backbone for BertReviews.
bert_model_class = transformers.DistilBertModel

bert_tokenizer_class = transformers.DistilBertTokenizer

# NOTE(review): these weights are loaded but never referenced anywhere
# below — apparently leftover; confirm before removing the file dependency.
bert_pretrained_weights = torch.load('basic_bert_weights.pt', map_location=torch.device('cpu'))

bert_tokenizer = bert_tokenizer_class.from_pretrained('distilbert-base-uncased')

bert_basic_model = bert_model_class.from_pretrained('distilbert-base-uncased')
|
|
|
class BertReviews(nn.Module):
    """DistilBERT backbone with a single-logit classification head.

    The backbone is frozen except for each transformer layer's output
    LayerNorm parameters, which remain trainable.
    """

    def __init__(self, model):
        super().__init__()
        self.bert = model
        # Freeze the entire backbone first...
        for param in self.bert.parameters():
            param.requires_grad = False
        # ...then re-enable gradients on the output LayerNorm of each of
        # the six DistilBERT transformer layers.
        for idx in range(6):
            norm = self.bert.transformer.layer[idx].output_layer_norm
            norm.weight.requires_grad = True
            norm.bias.requires_grad = True
        self.fc = nn.Linear(768, 1)

    def forward(self, samples, att_masks):
        """Return (backbone outputs, logits from the first-token state)."""
        embeddings = self.bert(samples, attention_mask=att_masks)
        # Classify from the hidden state of the first ([CLS]) token.
        model_out = self.fc(embeddings[0][:, 0, :])
        return embeddings, model_out
|
|
|
# Instantiate the fine-tuned BERT classifier and load its trained weights
# onto CPU; eval() disables dropout for inference.
bert_model = BertReviews(bert_basic_model)

bert_model.load_state_dict(torch.load('bert_weights.pt', map_location=torch.device('cpu')))

bert_model.to('cpu').eval()



# LSTM model; hyperparameters must match those used when the checkpoint
# 'lstm_model_weights.pt' was trained.
model_lstm = LSTMClassifier(embedding_dim=64, hidden_size=64, seq_len = 150, dropout=0.5, num_layers=4)

model_lstm.load_state_dict(torch.load('lstm_model_weights.pt', map_location=torch.device('cpu')))

model_lstm.to('cpu').eval()
|
|
|
|
|
def predict_sentence_classical(text: str):
    """Classify a review with the TF-IDF + logistic-regression pipeline.

    Args:
        text: Raw review string.

    Returns:
        Tuple of (predicted label 0/1, elapsed seconds).
    """
    # perf_counter is monotonic, so the measured duration cannot be
    # distorted by system clock adjustments (unlike time.time()).
    start_time = time.perf_counter()
    features = classical_pipeline(text)
    res = logreg.predict(features)[0]
    execution_time = time.perf_counter() - start_time
    return res, execution_time
|
|
|
|
|
def predict_sentence_lstm(text: str):
    """Classify a review with the LSTM model.

    Args:
        text: Raw review string.

    Returns:
        Tuple of (predicted label 0/1, elapsed seconds).
    """
    start_time = time.perf_counter()  # monotonic clock for durations
    encoded = preprocess_single_string(text, 150, vocab_to_int)
    # no_grad: pure inference, skip building the autograd graph.
    with torch.no_grad():
        logit = model_lstm(encoded.unsqueeze(0))
    res = int(torch.sigmoid(logit).cpu().numpy().round())
    execution_time = time.perf_counter() - start_time
    return res, execution_time
|
|
|
def predict_sentence_bert(text: str):
    """Classify a review with the fine-tuned DistilBERT model.

    Args:
        text: Raw review string.

    Returns:
        Tuple of (predicted label 0/1, elapsed seconds).
    """
    start_time = time.perf_counter()  # monotonic clock for durations
    token_ids = bert_tokenizer.encode(text, add_special_tokens=True, truncation=True, max_length=200)
    # Right-pad the token ids with zeros up to the fixed length of 200.
    batch = np.array([token_ids + [0] * (200 - len(token_ids))])
    # Attention mask: 1 for real tokens, 0 for padding.
    attention_mask = torch.Tensor(np.where(batch != 0, 1, 0)).to(torch.int64)
    inputs = torch.Tensor(batch).to(torch.int64)
    # no_grad: pure inference, skip building the autograd graph.
    with torch.no_grad():
        _, logit = bert_model(inputs, attention_mask)
    res = int(torch.sigmoid(logit).cpu().numpy().round())
    execution_time = time.perf_counter() - start_time
    return res, execution_time
|
|
|
# Map model output label -> human-readable sentiment.
reses = {0: 'negative', 1: 'positive'}
|
|
|
def process_text(input_text):
    """Run all three models on the input and write their results to the page.

    Args:
        input_text: Review text entered in the UI.
    """
    res_classical, time_classical = predict_sentence_classical(input_text)
    res_lstm, time_lstm = predict_sentence_lstm(input_text)
    res_bert, time_bert = predict_sentence_bert(input_text)
    st.write('Results:')
    # BUG FIX: this line previously reported the LSTM result/time under the
    # "Logistic regression" label instead of the classical model's own.
    st.write(f'Logistic regression: {reses[res_classical]}, execution time: {time_classical:.2f} seconds.')
    st.write(f'LSTM: {reses[res_lstm]}, execution time: {time_lstm:.2f} seconds.')
    st.write(f'Upgraded Bert: {reses[res_bert]}, execution time: {time_bert:.2f} seconds.')
|
|
|
# --- Streamlit UI ---
st.title('Film reviews classifier')

st.write('Write a film review in a box below, and the application, powered by three NLP models (logistic regression, LSTM and upgraded Bert), will tell if it is a positive or a negative review.')



user_input = st.text_area("Enter your text:")

if st.button("Send a review for processing"):

    if user_input:

        # process_text renders results directly and returns None, so the
        # previous dead assignment (processed_text = ...) was removed.
        process_text(user_input)

    else:

        st.warning("Please enter some text before processing.")
|
|