Spaces:
Sleeping
Sleeping
File size: 7,407 Bytes
60cb352 66e9d7c 8467cc8 60cb352 66e9d7c 186a961 60cb352 66e9d7c 60cb352 186a961 60cb352 66e9d7c 60cb352 66e9d7c 60cb352 66e9d7c 60cb352 66e9d7c 60cb352 8467cc8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
import streamlit as st
import time
import os
import logging
import torch
import json
import string
import re
import string
import nltk
import numpy as np
import torch.nn as nn
import transformers
import lightgbm as lgb
import pickle
# Fetch the NLTK corpora this script relies on: WordNet backs
# WordNetLemmatizer, and 'stopwords' backs stopwords.words('english').
for _corpus_name in ('wordnet', 'stopwords'):
    nltk.download(_corpus_name)
from collections import Counter
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import RegexpTokenizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
# English stop words, stored as a set for constant-time membership checks.
stop_words = set(stopwords.words('english'))

# Pre-trained artifacts for the classical pipeline: `logreg` is used through
# .predict() and `tf` through .transform() further down (presumably a fitted
# LogisticRegression and TfidfVectorizer -- confirm against the training code).
# NOTE(review): pickle.load executes code embedded in the file; only load
# artifacts that were produced by a trusted training run.
with open('logreg.pkl', 'rb') as logreg_file:
    logreg = pickle.load(logreg_file)
with open('tf.pkl', 'rb') as vectorizer_file:
    tf = pickle.load(vectorizer_file)
def classical_pipeline(text):
    """Convert a raw review into the feature matrix consumed by ``logreg``.

    Steps, in order: lower-case the text, replace every run of digits with
    a space, strip ASCII punctuation, delete newline characters, lemmatize
    each whitespace-separated word, keep only word-character tokens that
    are not English stop words, and vectorize the result with the
    module-level ``tf`` transformer.

    Args:
        text: Raw review text.

    Returns:
        Whatever ``tf.transform`` returns for a one-document batch
        (presumably a one-row sparse TF-IDF matrix -- confirm).
    """
    text = text.lower()
    text = re.sub(r'\d+', ' ', text)
    text = text.translate(str.maketrans('', '', string.punctuation))
    # NOTE(review): newlines are deleted, not replaced by a space, so words
    # separated only by a line break get fused. Kept as-is because the
    # fitted vectorizer presumably saw the same preprocessing -- confirm.
    text = text.replace('\n', '')

    lemmatizer = WordNetLemmatizer()
    lemmatized = ' '.join(lemmatizer.lemmatize(word) for word in text.split())

    # Raw string: '\w' in a normal literal is an invalid escape sequence.
    tokens = RegexpTokenizer(r'\w+').tokenize(lemmatized)

    # Reuse the module-level stop-word set instead of rebuilding (and
    # linearly scanning) the stop-word list on every call.
    filtered = ' '.join(token for token in tokens if token not in stop_words)
    return tf.transform([filtered])
def preprocess_single_string(input_string: str, seq_len: int, vocab_to_int: dict):
    """Encode one review as a fixed-length tensor of vocabulary ids.

    The text is cleaned with ``data_preprocessing``, every word found in
    ``vocab_to_int`` is replaced by its id (unknown words are dropped), and
    the id list is padded/truncated to ``seq_len`` with ``padding``.

    Args:
        input_string: Raw review text.
        seq_len: Length of the returned id sequence.
        vocab_to_int: Mapping from word to integer id.

    Returns:
        A 1-D ``torch.Tensor`` holding ``seq_len`` ids.
    """
    cleaned = data_preprocessing(input_string)
    # Out-of-vocabulary words are silently skipped.
    token_ids = [vocab_to_int[word] for word in cleaned.split() if word in vocab_to_int]
    padded_batch = padding([token_ids], seq_len)
    return torch.tensor(padded_batch[0])
def padding(reviews_int: list, seq_len: int):
    """Left-pad or truncate id sequences to a common length.

    Sequences shorter than ``seq_len`` are padded with zeros on the left;
    longer ones keep only their first ``seq_len`` ids.

    Args:
        reviews_int: Sequences of integer ids, one per review. Each review
            may be any sliceable sequence (list, tuple, 1-D array).
        seq_len: Target length of every row.

    Returns:
        An integer ``numpy.ndarray`` of shape ``(len(reviews_int), seq_len)``.
    """
    features = np.zeros((len(reviews_int), seq_len), dtype=int)
    for i, review in enumerate(reviews_int):
        kept = review[:seq_len]
        # Rows start as zeros, so only the right-aligned ids need writing.
        # Slice assignment avoids `list + review`, which fails for tuples
        # and turns into element-wise addition for arrays.
        if len(kept) > 0:
            features[i, seq_len - len(kept):] = kept
    return features
def data_preprocessing(text: str):
    """Clean a review for the LSTM pipeline.

    Lower-cases the text, removes ``<...>`` tag-like spans, strips ASCII
    punctuation, drops English stop words and lemmatizes what is left.

    Args:
        text: Raw review text.

    Returns:
        The surviving lemmas joined by single spaces.
    """
    lowered = text.lower()
    without_tags = re.sub('<.*?>', '', lowered)
    without_punct = without_tags.translate(str.maketrans('', '', string.punctuation))

    lemmatizer = WordNetLemmatizer()
    lemmas = []
    for word in without_punct.split():
        # Stop words are filtered before lemmatization.
        if word in stop_words:
            continue
        lemmas.append(lemmatizer.lemmatize(word))
    return ' '.join(lemmas)
# Word -> integer id vocabulary used by the LSTM pipeline. JSON is UTF-8 by
# specification, so decode it explicitly instead of relying on the platform
# default encoding.
with open('lstm_vocab_to_int.json', encoding='utf-8') as json_file:
    vocab_to_int = json.load(json_file)

# Pre-trained embedding matrix saved as a NumPy array.
with open('lstm_embedding_matrix.npy', 'rb') as f:
    embedding_matrix = np.load(f)

# Embedding layer initialised from the saved matrix (converted to float32);
# LSTMClassifier picks this module-level object up in its constructor.
embedding_layer = torch.nn.Embedding.from_pretrained(torch.FloatTensor(embedding_matrix))
class LSTMClassifier(nn.Module):
    """Bidirectional LSTM review classifier that emits one logit per review.

    Token ids are embedded with the module-level ``embedding_layer``, run
    through a bidirectional ``nn.LSTM`` and the outputs of every time step
    are flattened and passed through two linear layers.

    Args:
        embedding_dim: Size of each embedding vector (LSTM input size).
        seq_len: Fixed number of tokens per review. It sizes the first
            linear layer, so inputs must be exactly this long.
        hidden_size: LSTM hidden size per direction.
        dropout: Dropout probability forwarded to ``nn.LSTM``.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, embedding_dim: int, seq_len: int, hidden_size: int = 32,
                 dropout: float = 0, num_layers: int = 1) -> None:
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_size = hidden_size
        # Shared, module-level pre-trained embedding layer.
        self.embedding = embedding_layer
        self.dropout = dropout
        self.num_layers = num_layers
        self.seq_len = seq_len
        self.lstm = nn.LSTM(
            input_size=self.embedding_dim,
            hidden_size=self.hidden_size,
            batch_first=True,
            bidirectional=True,
            dropout=self.dropout,
            num_layers=self.num_layers,
        )
        # Input width = seq_len time steps * 2 directions * hidden_size.
        self.linear = nn.Sequential(
            nn.Linear(self.hidden_size * self.seq_len * 2, 128),
            nn.Linear(128, 1),
        )

    def forward(self, x):
        """Return the raw logit(s) for a batch of id sequences.

        Args:
            x: Integer tensor of token ids, batch first, ``seq_len`` wide.

        Returns:
            Logits of shape ``(batch, 1)``; for a batch of one the batch
            axis is squeezed away and the shape is ``(1,)``.
        """
        embeddings = self.embedding(x)
        output, _ = self.lstm(embeddings)
        # Flatten all time steps of each review into one feature vector.
        output = output.reshape(output.size(0), -1)
        # squeeze(0) only has an effect when the batch holds a single review.
        return self.linear(output.squeeze(0))
# DistilBERT building blocks from Hugging Face transformers.
bert_model_class = transformers.DistilBertModel
bert_tokenizer_class = transformers.DistilBertTokenizer

# NOTE(review): these weights are loaded but not referenced again in the
# visible code -- confirm whether the load is still needed.
bert_pretrained_weights = torch.load('basic_bert_weights.pt', map_location='cpu')

# Tokenizer and backbone both come from the same pretrained checkpoint.
bert_tokenizer = bert_tokenizer_class.from_pretrained('distilbert-base-uncased')
bert_basic_model = bert_model_class.from_pretrained('distilbert-base-uncased')
class BertReviews(nn.Module):
    """Binary review classifier: a mostly frozen backbone plus a linear head.

    Every backbone parameter is frozen, then the ``output_layer_norm``
    weight and bias of the first six transformer layers are made trainable
    again. A 768 -> 1 linear head is applied to the hidden state of the
    first token.
    """

    def __init__(self, model):
        super().__init__()
        self.bert = model

        # Freeze the whole backbone first ...
        for weight in self.bert.parameters():
            weight.requires_grad = False

        # ... then re-enable gradients for the output layer norms only.
        for idx in range(6):
            norm = self.bert.transformer.layer[idx].output_layer_norm
            norm.weight.requires_grad = True
            norm.bias.requires_grad = True

        self.fc = nn.Linear(768, 1)

    def forward(self, samples, att_masks):
        """Return ``(backbone_output, logits)`` for a batch of token ids."""
        backbone_output = self.bert(samples, attention_mask=att_masks)
        # Element 0 is the hidden-state tensor; position 0 is the first token.
        first_token_state = backbone_output[0][:, 0, :]
        return backbone_output, self.fc(first_token_state)
# Fine-tuned DistilBERT classifier: restore the saved weights on CPU and
# switch to evaluation mode.
bert_model = BertReviews(bert_basic_model)
bert_model.load_state_dict(torch.load('bert_weights.pt', map_location='cpu'))
bert_model.to('cpu')
bert_model.eval()

# LSTM classifier: the hyper-parameters must match the saved checkpoint.
model_lstm = LSTMClassifier(embedding_dim=64, seq_len=150, hidden_size=64, dropout=0.5, num_layers=4)
model_lstm.load_state_dict(torch.load('lstm_model_weights.pt', map_location='cpu'))
model_lstm.to('cpu')
model_lstm.eval()
def predict_sentence_classical(text: str):
    """Classify a review with the classical (vectorizer + ``logreg``) model.

    Args:
        text: Raw review text.

    Returns:
        ``(label, seconds)``: the first element of ``logreg.predict`` for
        the review and the time spent on preprocessing plus prediction.
    """
    # perf_counter is the monotonic, high-resolution clock meant for
    # measuring intervals; time.time() can jump with wall-clock changes.
    start_time = time.perf_counter()
    features = classical_pipeline(text)
    res = logreg.predict(features)[0]
    execution_time = time.perf_counter() - start_time
    return res, execution_time
def predict_sentence_lstm(text: str):
    """Classify a review with the LSTM model.

    Args:
        text: Raw review text.

    Returns:
        ``(label, seconds)``: ``label`` is the sigmoid of the model logit
        rounded to 0 or 1, ``seconds`` the time spent on preprocessing
        plus inference.
    """
    # perf_counter is the monotonic clock meant for measuring intervals.
    start_time = time.perf_counter()
    # The model's first linear layer is sized by its seq_len, so take the
    # length from the model instead of repeating the number here.
    token_ids = preprocess_single_string(text, model_lstm.seq_len, vocab_to_int)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        probability = torch.sigmoid(model_lstm(token_ids.unsqueeze(0)))
    # .item() extracts the single value; int() on a non-0-d NumPy array is
    # deprecated.
    res = int(probability.round().item())
    execution_time = time.perf_counter() - start_time
    return res, execution_time
def predict_sentence_bert(text: str, max_length: int = 200):
    """Classify a review with the fine-tuned DistilBERT model.

    Args:
        text: Raw review text.
        max_length: Number of token ids fed to the model; longer inputs are
            truncated by the tokenizer, shorter ones are right-padded.

    Returns:
        ``(label, seconds)``: ``label`` is the sigmoid of the model logit
        rounded to 0 or 1, ``seconds`` the time spent on tokenization plus
        inference.
    """
    # perf_counter is the monotonic clock meant for measuring intervals.
    start_time = time.perf_counter()
    token_ids = bert_tokenizer.encode(
        text, add_special_tokens=True, truncation=True, max_length=max_length
    )
    # Right-pad with id 0 and mask out every position holding id 0
    # (presumably the [PAD] id of this tokenizer -- confirm if it changes).
    token_ids = token_ids + [0] * (max_length - len(token_ids))
    # Build integer tensors directly instead of round-tripping via float32.
    input_ids = torch.tensor([token_ids], dtype=torch.int64)
    attention_mask = (input_ids != 0).to(torch.int64)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        logit = bert_model(input_ids, attention_mask)[1]
    # .item() extracts the single value; int() on a non-0-d NumPy array is
    # deprecated.
    res = int(torch.sigmoid(logit).round().item())
    execution_time = time.perf_counter() - start_time
    return res, execution_time
# Display label for each integer class id returned by the models.
reses = {0: 'negative', 1: 'positive'}


def process_text(input_text):
    """Run all three models on a review and write their verdicts to the page.

    Args:
        input_text: Raw review text entered by the user.

    Returns:
        None. The results are emitted through ``st.write``.
    """
    res_classical, time_classical = predict_sentence_classical(input_text)
    res_lstm, time_lstm = predict_sentence_lstm(input_text)
    res_bert, time_bert = predict_sentence_bert(input_text)

    # The classical model returns whatever label type it was trained with;
    # fall back to the raw label if it is not one of the 0/1 class ids.
    label_classical = reses.get(res_classical, res_classical)

    st.write('Results:')
    # Report the logistic-regression result and timing here (this line used
    # to repeat the LSTM values).
    st.write(f'Logistic regression: {label_classical}, execution time: {time_classical:.2f} seconds.')
    st.write(f'LSTM: {reses[res_lstm]}, execution time: {time_lstm:.2f} seconds.')
    st.write(f'Upgraded Bert: {reses[res_bert]}, execution time: {time_bert:.2f} seconds.')
# --- Streamlit page -------------------------------------------------------
st.title('Film reviews classifier')
st.write('Write a film review in a box below, and the application, powered by three NLP models (logistic regression, LSTM and upgraded Bert), will tell if it is a positive or a negative review.')
user_input = st.text_area("Enter your text:")
if st.button("Send a review for processing"):
    # Treat whitespace-only input like empty input: there is nothing for the
    # models to classify.
    if user_input and user_input.strip():
        # process_text writes its results to the page and returns None.
        process_text(user_input)
    else:
        st.warning("Please enter some text before processing.")
|