"""Text classification helpers for AI-generated-text detection.

Importing this module reads ``config.yaml``, downloads the NLTK ``punkt`` and
``stopwords`` resources, and loads every configured tokenizer/model onto
``device``. The functions below then score text with those models.
"""

import asyncio
import os
import re
import time
from datetime import date

import httpx
import nltk
import numpy as np
import plotly.graph_objects as go
import requests
import torch
import torch.nn.functional as F
import yaml
from bs4 import BeautifulSoup
from evaluate import load
from scipy.special import softmax
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    GPT2LMHeadModel,
    GPT2TokenizerFast,
)
from unidecode import unidecode

# Kept last so that any name it exports still wins over the imports above.
# ``remove_special_characters`` (used below) is presumably supplied by this
# star import - it is not defined anywhere else in this module.
from utils import *

with open("config.yaml", "r") as file:
    params = yaml.safe_load(file)

nltk.download("punkt")
nltk.download("stopwords")

device = "cuda" if torch.cuda.is_available() else "cpu"

text_bc_model_path = params["TEXT_BC_MODEL_PATH"]
text_mc_model_path = params["TEXT_MC_MODEL_PATH"]
text_quillbot_model_path = params["TEXT_QUILLBOT_MODEL_PATH"]
text_1on1_models = params["TEXT_1ON1_MODEL"]
quillbot_labels = params["QUILLBOT_LABELS"]
mc_label_map = params["MC_OUTPUT_LABELS"]
text_1on1_label_map = params["1ON1_OUTPUT_LABELS"]
mc_token_size = int(params["MC_TOKEN_SIZE"])
bc_token_size = int(params["BC_TOKEN_SIZE"])

text_bc_tokenizer = AutoTokenizer.from_pretrained(text_bc_model_path)
text_bc_model = AutoModelForSequenceClassification.from_pretrained(
    text_bc_model_path
).to(device)

text_mc_tokenizer = AutoTokenizer.from_pretrained(text_mc_model_path)
text_mc_model = AutoModelForSequenceClassification.from_pretrained(
    text_mc_model_path
).to(device)

quillbot_tokenizer = AutoTokenizer.from_pretrained(text_quillbot_model_path)
quillbot_model = AutoModelForSequenceClassification.from_pretrained(
    text_quillbot_model_path
).to(device)

# One tokenizer/model pair per 1-on-1 detector, keyed by the label taken from
# ``mc_label_map``. ``zip`` pairs labels with model paths positionally and
# stops at the shorter of the two sequences.
tokenizers_1on1 = {}
models_1on1 = {}
for model_name, model in zip(mc_label_map, text_1on1_models):
    tokenizers_1on1[model_name] = AutoTokenizer.from_pretrained(model)
    models_1on1[model_name] = (
        AutoModelForSequenceClassification.from_pretrained(model).to(device)
    )


def split_text_allow_complete_sentences_nltk(
    text,
    max_length=256,
    tolerance=30,
    min_last_segment_length=100,
    type_det="bc",
):
    """Split ``text`` into segments made of whole sentences.

    Sentences are packed greedily until adding the next one would exceed the
    token budget. A trailing segment shorter than ``min_last_segment_length``
    tokens is merged into the previous segment when the merged text fits.

    Args:
        text: The text to split.
        max_length: Accepted for backward compatibility. It is always
            replaced by ``bc_token_size`` or ``mc_token_size`` depending on
            ``type_det``.
        tolerance: Extra tokens allowed on top of the token size.
        min_last_segment_length: Encoded length below which the last segment
            is considered too short to stand alone.
        type_det: ``"bc"`` or ``"mc"``; selects the tokenizer and token size.

    Returns:
        A list of strings, each the tokenizer's decoding of one encoded
        segment. The list is empty when ``text`` contains no sentences.

    Raises:
        ValueError: If ``type_det`` is neither ``"bc"`` nor ``"mc"``.
    """
    if type_det == "bc":
        tokenizer = text_bc_tokenizer
        max_length = bc_token_size
    elif type_det == "mc":
        tokenizer = text_mc_tokenizer
        max_length = mc_token_size
    else:
        raise ValueError(
            f"type_det must be 'bc' or 'mc', got {type_det!r}"
        )

    limit = max_length + tolerance

    def encode(sentence_group, truncate=True):
        """Encode the space-joined sentences, optionally cut to ``limit``."""
        joined = " ".join(sentence_group)
        if truncate:
            return tokenizer.encode(
                joined,
                add_special_tokens=True,
                max_length=limit,
                truncation=True,
            )
        return tokenizer.encode(joined, add_special_tokens=True)

    # Greedy packing on raw token counts. The "- 2" presumably leaves room
    # for the special tokens that encode() adds - TODO confirm per tokenizer.
    segments = []
    current_segment = []
    current_length = 0
    for sentence in nltk.sent_tokenize(text):
        sentence_length = len(tokenizer.tokenize(sentence))
        if current_length + sentence_length <= limit - 2:
            current_segment.append(sentence)
            current_length += sentence_length
        else:
            if current_segment:
                segments.append(current_segment)
            current_segment = [sentence]
            current_length = sentence_length
    if current_segment:
        segments.append(current_segment)

    # Fold a short trailing segment into its predecessor, but only when the
    # merged text really fits. The fit is measured on the untruncated
    # encoding: a truncated encoding can never exceed ``limit``, which would
    # make the check always true and silently cut the merged text.
    if len(segments) > 1:
        last_length = len(encode(segments[-1]))
        if last_length < min_last_segment_length:
            combined = segments[-2] + segments[-1]
            if len(encode(combined, truncate=False)) <= limit:
                segments[-2:] = [combined]

    # NOTE(review): decode() is called with its defaults, so the returned
    # strings may contain the tokenizer's special-token text - confirm that
    # downstream cleaning expects this.
    return [tokenizer.decode(encode(segment)) for segment in segments]


def _class_probabilities(model, tokenizer, text, max_length):
    """Return softmax class probabilities of ``model`` for ``text``.

    The text is padded/truncated to ``max_length`` tokens. Only the first
    row of the model output is returned, as a 1-D NumPy array.
    """
    model.eval()
    tokens = tokenizer(
        text,
        padding="max_length",
        truncation=True,
        max_length=max_length,
        return_tensors="pt",
    ).to(device)
    with torch.no_grad():
        output = model(**tokens)
    return softmax(output.logits.detach().cpu().numpy(), 1)[0]


def predict_quillbot(text):
    """Score ``text`` with the quillbot classifier.

    Returns:
        A dict with ``"Humanized"`` (class index 1) and ``"Original"``
        (class index 0) probabilities as Python floats.
    """
    output_norm = _class_probabilities(
        quillbot_model, quillbot_tokenizer, text, 256
    )
    return {
        "Humanized": output_norm[1].item(),
        "Original": output_norm[0].item(),
    }


def predict_proba_quillbot(text):
    """Return quillbot class probabilities for every input in ``text``.

    Unlike ``predict_quillbot`` this pads to the longest input, applies no
    truncation, and returns the full probability matrix as a NumPy array.
    """
    with torch.no_grad():
        tokenized_text = quillbot_tokenizer(
            text, return_tensors="pt", padding=True
        ).to(device)
        outputs = quillbot_model(**tokenized_text)
        tensor_logits = outputs[0]
        # Explicit class axis: calling softmax without ``dim`` is deprecated.
        probas = F.softmax(tensor_logits, dim=-1).detach().cpu().numpy()
    return probas


def predict_bc(model, tokenizer, text):
    """Return binary-classifier probabilities for ``text``.

    Args:
        model: Sequence-classification model to run.
        tokenizer: Tokenizer matching ``model``.
        text: Text to classify; truncated/padded to ``bc_token_size`` tokens.

    Returns:
        1-D NumPy array of class probabilities.
    """
    return _class_probabilities(model, tokenizer, text, bc_token_size)


def predict_mc(model, tokenizer, text):
    """Return multi-class-classifier probabilities for ``text``.

    Args:
        model: Sequence-classification model to run.
        tokenizer: Tokenizer matching ``model``.
        text: Text to classify; truncated/padded to ``mc_token_size`` tokens.

    Returns:
        1-D NumPy array of class probabilities.
    """
    return _class_probabilities(model, tokenizer, text, mc_token_size)


def _average_segment_scores(text, type_det, score_segment):
    """Split ``text``, score each cleaned segment, and average the scores.

    Raises:
        ValueError: If ``text`` yields no segments, since there is nothing
            to average.
    """
    segments = split_text_allow_complete_sentences_nltk(
        text, type_det=type_det
    )
    if not segments:
        raise ValueError("input text contains no sentences to score")
    scores = [
        score_segment(remove_special_characters(segment))
        for segment in segments
    ]
    return np.mean(np.array(scores), axis=0)


def _multiclass_label_scores(text):
    """Return averaged multi-class scores keyed by upper-cased label."""
    averaged = _average_segment_scores(
        text,
        "mc",
        lambda segment: predict_mc(text_mc_model, text_mc_tokenizer, segment),
    ).tolist()
    return {
        label.upper(): score for score, label in zip(averaged, mc_label_map)
    }


def _scale_by_ai_probability(scores, bc_score):
    """Scale ``scores`` by ``1 - HUMAN``; return ``{}`` if that is < 0.01."""
    sum_prob = 1 - bc_score["HUMAN"]
    if sum_prob < 0.01:
        return {}
    return {key: value * sum_prob for key, value in scores.items()}


def predict_bc_scores(input):
    """Return the segment-averaged binary score for ``input``.

    Returns:
        ``{"AI": p1, "HUMAN": p0}`` where ``p1``/``p0`` are the averaged
        probabilities of class index 1 and 0.

    Raises:
        ValueError: If ``input`` contains no sentences.
    """
    averaged = _average_segment_scores(
        input,
        "bc",
        lambda segment: predict_bc(text_bc_model, text_bc_tokenizer, segment),
    ).tolist()
    return {"AI": averaged[1], "HUMAN": averaged[0]}


def predict_mc_scores(input):
    """Return per-label multi-class scores scaled by the non-human share.

    Each averaged multi-class probability is multiplied by ``1 - HUMAN``
    from the binary classifier. An empty dict is returned when that factor
    is below 0.01.

    Raises:
        ValueError: If ``input`` contains no sentences.
    """
    bc_score = predict_bc_scores(input)
    mc_score = _multiclass_label_scores(input)
    return _scale_by_ai_probability(mc_score, bc_score)


def predict_1on1(model, tokenizer, text):
    """Return class probabilities of a 1-on-1 detector for ``text``.

    The text is truncated/padded to ``mc_token_size`` tokens.
    """
    return _class_probabilities(model, tokenizer, text, mc_token_size)


def predict_1on1_combined(input):
    """Return the class-1 probability of every registered 1-on-1 detector.

    Results follow the registration order of ``models_1on1``. The registries
    are keyed by detector label, so they are iterated directly rather than
    indexed with the configured model paths.
    """
    return [
        predict_1on1(models_1on1[name], tokenizers_1on1[name], input)[1]
        for name in models_1on1
    ]


def predict_1on1_single(input, model):
    """Return the class-1 probability of the 1-on-1 detector ``model``.

    ``model`` is a key of ``models_1on1`` / ``tokenizers_1on1``.
    """
    predictions = predict_1on1(
        models_1on1[model], tokenizers_1on1[model], input
    )[1]
    return predictions


def predict_1on1_scores(input, models):
    """Score ``input`` against a caller-selected subset of detectors.

    With several entries in ``models`` the multi-class classifier is used,
    restricted to the requested labels and renormalised to sum to 1. With a
    single entry the matching 1-on-1 detector is used and the remainder is
    reported under ``"OTHER"``. In both cases the scores are multiplied by
    ``1 - HUMAN`` from the binary classifier.

    Args:
        input: Text to score.
        models: Sequence of detector labels to test.

    Returns:
        A dict of scaled scores. It is empty when ``models`` is empty or
        when the non-human share is below 0.01.

    Raises:
        ValueError: If ``models`` is non-empty and ``input`` contains no
            sentences.
    """
    if not models:
        return {}
    print(f"Models to Test: {models}")

    # BC SCORE
    bc_score = predict_bc_scores(input)

    if len(models) > 1:
        # MC SCORE
        print("Starting MC")
        label_scores = _multiclass_label_scores(input)
        mc_score = {
            key: label_scores[key.upper()]
            for key in models
            if key.upper() in label_scores
        }
        total = sum(mc_score.values())
        # Normalise over the selected labels. Skipped when they carry no
        # probability mass, to avoid dividing by zero.
        if total > 0:
            mc_score = {key: value / total for key, value in mc_score.items()}
    else:
        print("Starting 1on1")
        average_mc_scores = _average_segment_scores(
            input,
            "mc",
            lambda segment: predict_1on1_single(segment, models[0]),
        )
        print(average_mc_scores)
        detector_score = average_mc_scores.tolist()
        mc_score = {
            models[0].upper(): detector_score,
            "OTHER": 1 - detector_score,
        }

    return _scale_by_ai_probability(mc_score, bc_score)