PhoBERTa / app.py
minhdang14902's picture
Upload 3 files
d8314fb verified
raw
history blame
12 kB
# import streamlit as st
# import torch
# # Tiêu đề của ứng dụng
# st.title('Hiển thị hình ảnh từ URL')
# # URL hình ảnh mẫu
# image_url = "https://upload.wikimedia.org/wikipedia/commons/4/47/PNG_transparency_demonstration_1.png"
# # Hiển thị hình ảnh
# st.image(image_url, caption='Hình ảnh từ URL', use_column_width=True)
import streamlit as st
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
import spacy
import json
import pandas as pd
import re
from transformers.models.roberta.modeling_roberta import *
class MRCQuestionAnswering(RobertaPreTrainedModel):
config_class = RobertaConfig
def _reorder_cache(self, past, beam_idx):
pass
_keys_to_ignore_on_load_unexpected = [r"pooler"]
_keys_to_ignore_on_load_missing = [r"position_ids"]
def __init__(self, config):
super().__init__(config)
self.num_labels = config.num_labels
self.roberta = RobertaModel(config, add_pooling_layer=False)
self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)
self.init_weights()
def forward(
self,
input_ids=None,
words_lengths=None,
start_idx=None,
end_idx=None,
attention_mask=None,
token_type_ids=None,
position_ids=None,
head_mask=None,
inputs_embeds=None,
start_positions=None,
end_positions=None,
span_answer_ids=None,
output_attentions=None,
output_hidden_states=None,
return_dict=None,
):
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
outputs = self.roberta(
input_ids,
attention_mask=attention_mask,
token_type_ids=None,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
sequence_output = outputs[0]
context_embedding = sequence_output
batch_size = input_ids.shape[0]
max_sub_word = input_ids.shape[1]
max_word = words_lengths.shape[1]
align_matrix = torch.zeros((batch_size, max_word, max_sub_word))
for i, sample_length in enumerate(words_lengths):
for j in range(len(sample_length)):
start_idx = torch.sum(sample_length[:j])
align_matrix[i][j][start_idx: start_idx + sample_length[j]] = 1 if sample_length[j] > 0 else 0
align_matrix = align_matrix.to(context_embedding.device)
context_embedding_align = torch.bmm(align_matrix, context_embedding)
logits = self.qa_outputs(context_embedding_align)
start_logits, end_logits = logits.split(1, dim=-1)
start_logits = start_logits.squeeze(-1).contiguous()
end_logits = end_logits.squeeze(-1).contiguous()
total_loss = None
if start_positions is not None and end_positions is not None:
if len(start_positions.size()) > 1:
start_positions = start_positions.squeeze(-1)
if len(end_positions.size()) > 1:
end_positions = end_positions.squeeze(-1)
ignored_index = start_logits.size(1)
start_positions = start_positions.clamp(0, ignored_index)
end_positions = end_positions.clamp(0, ignored_index)
loss_fct = CrossEntropyLoss(ignore_index=ignored_index)
start_loss = loss_fct(start_logits, start_positions)
end_loss = loss_fct(end_logits, end_positions)
total_loss = (start_loss + end_loss) / 2
if not return_dict:
output = (start_logits, end_logits) + outputs[2:]
return ((total_loss,) + output) if total_loss is not None else output
return QuestionAnsweringModelOutput(
loss=total_loss,
start_logits=start_logits,
end_logits=end_logits,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
from nltk import word_tokenize
from transformers.models.auto import MODEL_FOR_QUESTION_ANSWERING_MAPPING
def tokenize_function(example, tokenizer):
question_word = word_tokenize(example["question"])
context_word = word_tokenize(example["context"])
question_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in question_word]
context_sub_words_ids = [tokenizer.convert_tokens_to_ids(tokenizer.tokenize(w)) for w in context_word]
valid = True
if len([j for i in question_sub_words_ids + context_sub_words_ids for j in
i]) > tokenizer.model_max_length - 1:
valid = False
question_sub_words_ids = [[tokenizer.bos_token_id]] + question_sub_words_ids + [[tokenizer.eos_token_id]]
context_sub_words_ids = context_sub_words_ids + [[tokenizer.eos_token_id]]
input_ids = [j for i in question_sub_words_ids + context_sub_words_ids for j in i]
if len(input_ids) > tokenizer.model_max_length:
valid = False
words_lengths = [len(item) for item in question_sub_words_ids + context_sub_words_ids]
return {
"input_ids": input_ids,
"words_lengths": words_lengths,
"valid": valid
}
def data_collator(samples, tokenizer):
if len(samples) == 0:
return {}
def collate_tokens(values, pad_idx, eos_idx=None, left_pad=False, move_eos_to_beginning=False):
size = max(v.size(0) for v in values)
res = values[0].new(len(values), size).fill_(pad_idx)
def copy_tensor(src, dst):
assert dst.numel() == src.numel()
if move_eos_to_beginning:
assert src[-1] == eos_idx
dst[0] = eos_idx
dst[1:] = src[:-1]
else:
dst.copy_(src)
for i, v in enumerate(values):
copy_tensor(v, res[i][size - len(v):] if left_pad else res[i][:len(v)])
return res
input_ids = collate_tokens([torch.tensor(item['input_ids']) for item in samples], pad_idx=tokenizer.pad_token_id)
attention_mask = torch.zeros_like(input_ids)
for i in range(len(samples)):
attention_mask[i][:len(samples[i]['input_ids'])] = 1
words_lengths = collate_tokens([torch.tensor(item['words_lengths']) for item in samples], pad_idx=0)
batch_samples = {
'input_ids': input_ids,
'attention_mask': attention_mask,
'words_lengths': words_lengths,
}
return batch_samples
def extract_answer(inputs, outputs, tokenizer):
plain_result = []
for sample_input, start_logit, end_logit in zip(inputs, outputs.start_logits, outputs.end_logits):
sample_words_length = sample_input['words_lengths']
input_ids = sample_input['input_ids']
answer_start = sum(sample_words_length[:torch.argmax(start_logit)])
answer_end = sum(sample_words_length[:torch.argmax(end_logit) + 1])
if answer_start <= answer_end:
answer = tokenizer.convert_tokens_to_string(
tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))
if answer == tokenizer.bos_token:
answer = ''
else:
answer = ''
score_start = torch.max(torch.softmax(start_logit, dim=-1)).cpu().detach().numpy().tolist()
score_end = torch.max(torch.softmax(end_logit, dim=-1)).cpu().detach().numpy().tolist()
plain_result.append({
"answer": answer,
"score_start": score_start,
"score_end": score_end
})
return plain_result
# Load mô hình Phobert
model_checkpoint = "minhdang14902/Roberta_edu"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
model = MRCQuestionAnswering.from_pretrained(model_checkpoint)
# Load mô hình Roberta
from transformers import AutoModelForSequenceClassification
model_sentiment = AutoModelForSequenceClassification.from_pretrained('minhdang14902/PhoBert_Edu')
tokenizer_sentiment = AutoTokenizer.from_pretrained('minhdang14902/PhoBert_Edu')
chatbot_sentiment = pipeline("sentiment-analysis", model=model_sentiment, tokenizer=tokenizer_sentiment)
import spacy
import json
# Khởi tạo mô hình spaCy tiếng Việt
nlp = spacy.load('vi_core_news_lg')
import pandas as pd
def load_json_file(filename):
with open(filename) as f:
file = json.load(f)
return file
filename = './data/QA_Legal_converted_merged.json'
intents = load_json_file(filename)
def create_df():
df = pd.DataFrame({
'Pattern' : [],
'Tag' : []
})
return df
df = create_df()
def extract_json_info(json_file, df):
for intent in json_file['intents']:
for pattern in intent['patterns']:
sentence_tag = [pattern, intent['tag']]
df.loc[len(df.index)] = sentence_tag
return df
df = extract_json_info(intents, df)
df2 = df.copy()
labels = df2['Tag'].unique().tolist()
labels = [s.strip() for s in labels]
num_labels = len(labels)
id2label = {i: label for i, label in enumerate(labels)}
label2id = {v: k for k, v in id2label.items()}
def preprocess(text, df):
def remove_numbers_and_special_chars(text):
text = re.sub(r'\d+', '', text)
text = re.sub(r'[^\w\s]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
text = text.lower()
text = remove_numbers_and_special_chars(text)
text_nlp = nlp(text)
filtered_sentence = [token.text for token in text_nlp if not token.is_stop]
text = ' '.join(filtered_sentence)
return text
def predict(text):
new_text = preprocess(text, df2)
probs = chatbot_sentiment(new_text)
predicted_label = max(probs, key=lambda x: x['score'])['label']
return predicted_label
# Thiết lập giao diện người dùng bằng Streamlit
st.title("Vietnamese Legal Q&A Chatbot")
st.write("Nhập câu hỏi của bạn về các vấn đề pháp lý:")
user_question = st.text_input("Câu hỏi:")
if st.button("Gửi câu hỏi"):
if user_question:
st.write("Câu hỏi của bạn:", user_question)
# Tìm câu trả lời từ tập dữ liệu intents
found_intent = None
for intent in intents['intents']:
if user_question.lower() in [pattern.lower() for pattern in intent['patterns']]:
found_intent = intent
break
if found_intent:
answer = found_intent['responses'][0]
st.write("Câu trả lời:", answer)
else:
result = predict(user_question)
if result:
st.write("Thẻ dự đoán:", result)
# Tạo đầu vào cho mô hình QA
qa_inputs = [{
'context': found_intent['responses'][0] if found_intent else 'Tôi không có thông tin phù hợp.',
'question': user_question
}]
qa_features = []
for qa_input in qa_inputs:
feature = tokenize_function(qa_input, tokenizer)
if feature["valid"]:
qa_features.append(feature)
qa_batch = data_collator(qa_features, tokenizer)
with torch.no_grad():
outputs = model(**qa_batch)
answers = extract_answer(qa_features, outputs, tokenizer)
best_answer = max(answers, key=lambda x: (x['score_start'] + x['score_end']) / 2)
st.write("Câu trả lời:", best_answer['answer'])