Spaces:
Running
Running
# -*- coding: utf-8 -*- | |
"""text-paraphraser.ipynb | |
Automatically generated by Colab. | |
Original file is located at | |
https://colab.research.google.com/drive/1pFGR4uvXMMWVJFQeFmn--arumSxqa5Yy | |
""" | |
from transformers import AutoTokenizer | |
from transformers import AutoModelForSeq2SeqLM | |
import plotly.graph_objs as go | |
import textwrap | |
from transformers import pipeline | |
import re | |
import time | |
import requests | |
from PIL import Image | |
import itertools | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import matplotlib | |
from matplotlib.colors import ListedColormap, rgb2hex | |
import ipywidgets as widgets | |
from IPython.display import display, HTML | |
import pandas as pd | |
from pprint import pprint | |
from tenacity import retry | |
from tqdm import tqdm | |
# import tiktoken | |
import scipy.stats | |
import torch | |
from transformers import GPT2LMHeadModel | |
import seaborn as sns | |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForMaskedLM | |
# from colorama import Fore, Style | |
# import openai | |
import random | |
from nltk.corpus import stopwords | |
from termcolor import colored | |
import nltk | |
from nltk.translate.bleu_score import sentence_bleu | |
from transformers import BertTokenizer, BertModel | |
import graphviz | |
import gradio as gr | |
nltk.download('stopwords') | |
# Function to Initialize the Model
_PARAPHRASER_CHECKPOINT = "humarin/chatgpt_paraphraser_on_T5_base"
_PARAPHRASER_CACHE = {}


def init_model():
    """Return the paraphrasing tokenizer and seq2seq model.

    Both objects are loaded from the
    ``humarin/chatgpt_paraphraser_on_T5_base`` checkpoint on the first call
    and reused on later calls, so callers that invoke this once per request
    do not reload the weights every time.

    Returns:
        A ``(para_tokenizer, para_model)`` tuple.
    """
    if not _PARAPHRASER_CACHE:
        # Load both objects before storing either, so a failed load leaves
        # the cache empty and the next call retries from scratch.
        para_tokenizer = AutoTokenizer.from_pretrained(_PARAPHRASER_CHECKPOINT)
        para_model = AutoModelForSeq2SeqLM.from_pretrained(_PARAPHRASER_CHECKPOINT)
        _PARAPHRASER_CACHE["tokenizer"] = para_tokenizer
        _PARAPHRASER_CACHE["model"] = para_model
    return _PARAPHRASER_CACHE["tokenizer"], _PARAPHRASER_CACHE["model"]
# Function to Paraphrase the Text
def paraphrase(question, para_tokenizer, para_model, num_beams=5, num_beam_groups=5, num_return_sequences=5, repetition_penalty=10.0, diversity_penalty=3.0, no_repeat_ngram_size=2, temperature=0.7, max_length=64):
    """Generate paraphrases of *question* with the given tokenizer and model.

    The text is prefixed with ``'paraphrase: '``, tokenized (truncated to
    ``max_length``), passed to ``para_model.generate`` and the generated
    sequences are decoded with special tokens skipped.

    Args:
        question: Text to paraphrase.
        para_tokenizer: Tokenizer; called on the prompt and used for decoding.
        para_model: Model exposing ``generate``.
        num_beams, num_beam_groups, num_return_sequences, repetition_penalty,
        diversity_penalty, no_repeat_ngram_size, temperature: Forwarded
            unchanged to ``para_model.generate``.
        max_length: Used both as the tokenizer truncation length and as the
            ``max_length`` passed to ``generate``.

    Returns:
        The result of ``para_tokenizer.batch_decode`` on the generated ids.
    """
    encoded_prompt = para_tokenizer(
        f'paraphrase: {question}',
        return_tensors="pt",
        padding="longest",
        max_length=max_length,
        truncation=True,
    )
    generation_options = {
        "temperature": temperature,
        "repetition_penalty": repetition_penalty,
        "num_return_sequences": num_return_sequences,
        "no_repeat_ngram_size": no_repeat_ngram_size,
        "num_beams": num_beams,
        "num_beam_groups": num_beam_groups,
        "max_length": max_length,
        "diversity_penalty": diversity_penalty,
    }
    generated_ids = para_model.generate(encoded_prompt.input_ids, **generation_options)
    return para_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)
# Shared helpers for the common-word highlighting / sampling functions below.
_WORD_PATTERN = re.compile(r'\b\w+\b')


def _content_tokens(sentence, stop_words):
    """Return the lower-cased word tokens of *sentence* minus *stop_words*."""
    return [tok for tok in _WORD_PATTERN.findall(sentence.lower()) if tok not in stop_words]


def _colorize_tokens(tokens, color_by_word):
    """Join *tokens* with spaces, wrapping mapped words with ``colored``.

    Working token by token (instead of ``str.replace`` on the joined string)
    guarantees that only whole words are colored and that escape sequences
    inserted for one word are never touched while coloring another.
    """
    return ' '.join(
        colored(tok, color_by_word[tok]) if tok in color_by_word else tok
        for tok in tokens
    )


def _annotate_paraphrases(original_sentence, paraphrased_sentences, pick_word=None, mark_color='red'):
    """Color shared words green and, optionally, one sampled word.

    Args:
        original_sentence: Reference sentence.
        paraphrased_sentences: Iterable of paraphrase strings.
        pick_word: Optional callable receiving the list of non-shared tokens
            (in sentence order, duplicates kept) and returning the token to
            mark. It is only called when that list is non-empty.
        mark_color: Color name used for the sampled word.

    Returns:
        A list of ``(colored_sentence, common_words)`` tuples, one per
        paraphrase, where ``common_words`` is a set.
    """
    stop_words = set(stopwords.words('english'))
    # Tokenize the original with the same regex as the paraphrases so that
    # words followed by punctuation ("1948,") can still match.
    original_words = set(_WORD_PATTERN.findall(original_sentence.lower()))
    annotated = []
    for sentence in paraphrased_sentences:
        tokens = _content_tokens(sentence, stop_words)
        common_words = original_words & set(tokens)
        color_by_word = dict.fromkeys(common_words, 'green')
        if pick_word is not None:
            candidates = [tok for tok in tokens if tok not in common_words]
            if candidates:
                color_by_word[pick_word(candidates)] = mark_color
        annotated.append((_colorize_tokens(tokens, color_by_word), common_words))
    return annotated


def _numbered_results(annotated):
    """Convert ``_annotate_paraphrases`` output to the numbered result dicts."""
    return [
        {
            f"Paraphrased Sentence {number}": sentence,
            "Common Substrings": ', '.join(sorted(common_words)),
        }
        for number, (sentence, common_words) in enumerate(annotated, start=1)
    ]


# Function to find the words each paraphrase shares with the original sentence
def longest_common_subss(original_sentence, paraphrased_sentences):
    """Highlight, per paraphrase, the words shared with the original.

    Paraphrases are lower-cased and stripped of English stop words; shared
    words are wrapped with ``colored(word, 'green')``.

    Returns:
        A list of dicts with keys ``"Original Sentence"`` (lower-cased
        original), ``"Paraphrased Sentence"`` (colored, stop words removed)
        and ``"Substrings Word Pair"`` (set of shared words).
    """
    original_sentence_lower = original_sentence.lower()
    return [
        {
            "Original Sentence": original_sentence_lower,
            "Paraphrased Sentence": sentence,
            "Substrings Word Pair": common_words,
        }
        for sentence, common_words in _annotate_paraphrases(original_sentence, paraphrased_sentences)
    ]


# Function to Find Common Substring Word between each paraphrase sentences
def common_substring_word(original_sentence, paraphrased_sentences):
    """Return numbered paraphrases with shared words colored green.

    Returns:
        A list of dicts with keys ``"Paraphrased Sentence <n>"`` and
        ``"Common Substrings"`` (sorted shared words joined by ``', '``).
    """
    return _numbered_results(_annotate_paraphrases(original_sentence, paraphrased_sentences))


# Function to Watermark a Word Take Randomly Between Each lcs Point (Random Sampling)
def random_sampling(original_sentence, paraphrased_sentences):
    """Like ``common_substring_word`` but also mark one random word in red.

    The marked word is drawn with ``random.choice`` from the tokens that are
    not shared with the original sentence.
    """
    annotated = _annotate_paraphrases(
        original_sentence, paraphrased_sentences, pick_word=random.choice, mark_color='red'
    )
    return _numbered_results(annotated)


# Function for Inverse Transform Sampling
def inverse_transform_sampling(original_sentence, paraphrased_sentences):
    """Mark one non-shared word in magenta, drawn with uniform weights.

    The draw uses ``random.choices`` with equal weights for every candidate.
    """
    def pick_uniform(candidates):
        weights = [1 / len(candidates)] * len(candidates)
        return random.choices(candidates, weights=weights)[0]

    annotated = _annotate_paraphrases(
        original_sentence, paraphrased_sentences, pick_word=pick_uniform, mark_color='magenta'
    )
    return _numbered_results(annotated)


# Function for Contextual Sampling
def contextual_sampling(original_sentence, paraphrased_sentences):
    """Mark one non-shared word in red.

    NOTE: despite the name, no context is used for the draw; the word is
    picked with ``random.choice`` exactly as in ``random_sampling``.
    """
    annotated = _annotate_paraphrases(
        original_sentence, paraphrased_sentences, pick_word=random.choice, mark_color='red'
    )
    return _numbered_results(annotated)


# Function for Exponential Minimum Sampling
def exponential_minimum_sampling(original_sentence, paraphrased_sentences):
    """Mark one non-shared word in red, favoring earlier words.

    The i-th candidate (0-based, sentence order) is weighted ``2 ** (-i)``
    in a ``random.choices`` draw.
    """
    def pick_front_loaded(candidates):
        weights = [2 ** (-position) for position in range(len(candidates))]
        return random.choices(candidates, weights=weights)[0]

    annotated = _annotate_paraphrases(
        original_sentence, paraphrased_sentences, pick_word=pick_front_loaded, mark_color='red'
    )
    return _numbered_results(annotated)
# Function to Calculate the BLEU score
def calculate_bleu(reference, candidate):
    """Return the sentence-level BLEU score of *candidate* against *reference*.

    ``sentence_bleu`` works on token sequences. A plain string passed
    straight through would be iterated character by character, so string
    arguments are split on whitespace first; token lists are used as given.

    Args:
        reference: Reference sentence, as a string or a list of tokens.
        candidate: Candidate sentence, as a string or a list of tokens.

    Returns:
        The value returned by ``sentence_bleu`` for the single reference.
    """
    if isinstance(reference, str):
        reference = reference.split()
    if isinstance(candidate, str):
        candidate = candidate.split()
    return sentence_bleu([reference], candidate)
# Function to calculate BERT score
_BERT_CHECKPOINT = 'bert-base-uncased'
_BERT_CACHE = {}


def _load_bert():
    """Return the cached ``(tokenizer, model)`` pair, loading it on first use."""
    if not _BERT_CACHE:
        bert_tokenizer = BertTokenizer.from_pretrained(_BERT_CHECKPOINT)
        bert_model = BertModel.from_pretrained(_BERT_CHECKPOINT)
        _BERT_CACHE['tokenizer'] = bert_tokenizer
        _BERT_CACHE['model'] = bert_model
    return _BERT_CACHE['tokenizer'], _BERT_CACHE['model']


def calculate_bert(reference, candidate):
    """Return the cosine similarity of two sentences' BERT embeddings.

    Each sentence is encoded (special tokens added, truncated to 512 tokens)
    and run through ``bert-base-uncased``; the vector at sequence position 0
    of the model's first output is used as the sentence embedding.

    NOTE: this is a single-vector cosine similarity, not the token-matching
    metric published as "BERTScore".

    Args:
        reference: Reference sentence (string).
        candidate: Candidate sentence (string).

    Returns:
        The cosine similarity as returned by ``np.mean`` over the 1x1
        similarity matrix.
    """
    bert_tokenizer, bert_model = _load_bert()

    def first_position_embedding(text):
        input_ids = bert_tokenizer.encode(
            text, add_special_tokens=True, max_length=512, truncation=True, return_tensors="pt"
        )
        with torch.no_grad():
            outputs = bert_model(input_ids)
        return outputs[0][:, 0, :].numpy()

    reference_embeddings = first_position_embedding(reference)
    candidate_embeddings = first_position_embedding(candidate)
    cosine_similarity = np.dot(reference_embeddings, candidate_embeddings.T) / (
        np.linalg.norm(reference_embeddings) * np.linalg.norm(candidate_embeddings)
    )
    return np.mean(cosine_similarity)
# Function to calculate minimum edit distance
def min_edit_distance(reference, candidate):
    """Return the edit distance between two sequences.

    Counts the minimum number of single-item insertions, removals and
    replacements (each costing 1) needed to turn *reference* into
    *candidate*. Items are compared with ``==``, so both strings and token
    lists work.
    """
    # Row for an empty reference prefix: j insertions build candidate[:j].
    previous_row = list(range(len(candidate) + 1))
    for row_number, ref_item in enumerate(reference, start=1):
        # Column 0: remove every item of reference[:row_number].
        current_row = [row_number]
        for column, cand_item in enumerate(candidate, start=1):
            if ref_item == cand_item:
                cost = previous_row[column - 1]
            else:
                cost = 1 + min(
                    current_row[column - 1],   # Insert
                    previous_row[column],      # Remove
                    previous_row[column - 1],  # Replace
                )
            current_row.append(cost)
        previous_row = current_row
    return previous_row[-1]
def generate_paraphrase(question):
    """Paraphrase *question* using the model returned by ``init_model``.

    Returns whatever ``paraphrase`` returns when called with its default
    generation settings.
    """
    tokenizer_and_model = init_model()
    return paraphrase(question, *tokenizer_and_model)
# Example input text kept at module level.
question = (
    "Following the declaration of the State of Israel in 1948, "
    "neighboring Arab states invaded. "
    "The war ended with Israel controlling a significant portion of the territory. "
    "Many Palestinians became refugees."
)
import re | |
from nltk.corpus import stopwords | |
def find_common_subsequences(sentence, str_list, max_n=5):
    """Find word n-grams of *sentence* that occur in every string of *str_list*.

    All texts are lower-cased, stripped of punctuation and of English stop
    words before comparison. N-grams are searched from ``max_n`` words down
    to single words; an n-gram is skipped when it is already contained in a
    longer (or identical) n-gram that was kept.

    Args:
        sentence: Reference sentence.
        str_list: Strings that must all contain an n-gram for it to count.
            With an empty list every n-gram trivially qualifies.
        max_n: Largest n-gram size to consider (default 5).

    Returns:
        A list of space-joined n-grams, longest sizes first and, within one
        size, in order of appearance in *sentence*. Empty when the
        normalized sentence has no words.
    """
    stop_words = set(stopwords.words('english'))

    def normalize(text):
        text = re.sub(r'[^\w\s]', '', text.lower())
        return " ".join(word for word in text.split() if word not in stop_words)

    # split() (not split(" ")) so an empty sentence yields no words instead
    # of a single empty "word" that would match everywhere.
    words = normalize(sentence).split()
    # Normalized texts are single-space separated; padding both sides with a
    # space turns the substring test into a whole-word test, so "art" does
    # not count as present because of "party".
    padded_targets = [f" {normalize(text)} " for text in str_list]

    common_grams = []
    for size in range(min(max_n, len(words)), 0, -1):
        for start in range(len(words) - size + 1):
            gram = " ".join(words[start:start + size])
            padded_gram = f" {gram} "
            if any(padded_gram in f" {kept} " for kept in common_grams):
                continue  # already covered by a kept n-gram
            if all(padded_gram in target for target in padded_targets):
                common_grams.append(gram)
    return common_grams
def llm_output(prompt):
    """Return *prompt* twice, as a ``(generated, sentence)`` pair.

    No model is called here; both elements are the input itself.
    """
    generated_text = prompt
    selected_sentence = prompt
    return generated_text, selected_sentence
def highlight_phrases_with_colors(sentences, phrases):
    """Render *sentences* as an HTML card with *phrases* highlighted.

    Each sentence is numbered ("1. ", "2. ", ...). Every whole-word,
    case-insensitive occurrence of a phrase is wrapped in a colored span
    preceded by a small numbered badge. Within one sentence the badge number
    starts at 1 and grows by one for each phrase (in *phrases* order) that
    was found at least once. Each distinct phrase gets one pastel HSL color
    shared across all sentences.

    Phrases are matched only against the sentence text itself, never against
    markup inserted for an earlier phrase or against the "N. " prefix, and
    sentence text is HTML-escaped so it cannot inject markup. Empty or
    whitespace-only phrases are ignored.

    Args:
        sentences: Iterable of sentence strings.
        phrases: Iterable of phrase strings to highlight.

    Returns:
        An HTML string containing the highlighted sentences separated by
        ``<br><br>`` inside a styled container.
    """
    # Local import: html is not among this module's file-level imports.
    from html import escape

    # Assign colors and compile patterns once, in phrase order.
    color_map = {}
    styled_patterns = []
    for phrase in phrases:
        if not phrase.strip():
            continue  # would otherwise match at every word boundary
        if phrase not in color_map:
            color_map[phrase] = f'hsl({len(color_map) * 60 % 360}, 70%, 80%)'
        pattern = re.compile(rf'\b{re.escape(phrase)}\b', flags=re.IGNORECASE)
        styled_patterns.append((pattern, color_map[phrase]))

    def render_span(color, index, matched_text):
        return (
            f'<span style="background-color: {color}; font-weight: bold;'
            f' padding: 2px 4px; border-radius: 2px; position: relative;">'
            f'<span style="background-color: black; color: white; border-radius: 50%;'
            f' padding: 2px 5px; margin-right: 5px;">{index}</span>'
            f'{matched_text}'
            f'</span>'
        )

    highlighted_html = []
    for idx, sentence in enumerate(sentences, start=1):
        # (text, is_markup) pairs; finished markup is never searched again.
        segments = [(sentence, False)]
        word_index = 1
        for pattern, color in styled_patterns:
            next_segments = []
            found = False
            for text, is_markup in segments:
                if is_markup:
                    next_segments.append((text, True))
                    continue
                cursor = 0
                for match in pattern.finditer(text):
                    if match.start() > cursor:
                        next_segments.append((text[cursor:match.start()], False))
                    matched_text = escape(match.group(0), quote=False)
                    next_segments.append((render_span(color, word_index, matched_text), True))
                    cursor = match.end()
                    found = True
                if cursor < len(text):
                    next_segments.append((text[cursor:], False))
            segments = next_segments
            if found:
                word_index += 1
        body = "".join(
            text if is_markup else escape(text, quote=False)
            for text, is_markup in segments
        )
        highlighted_html.append(f"{idx}. {body}")

    final_html = "<br><br>".join(highlighted_html)
    return (
        '\n'
        '<div style="border: solid 1px #; padding: 16px; background-color: #FFFFFF; color: #374151; box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1); border-radius: 2px;">\n'
        '<h3 style="margin-top: 0; font-size: 1em; color: #111827;">Paraphrased And Highlighted Text</h3>\n'
        f'<div style="background-color: #F5F5F5; line-height: 1.6; padding: 15px; border-radius: 2px;">{final_html}</div>\n'
        '</div>\n'
    )
# Masking Model
def mask_non_stopword(sentence):
    """Replace one randomly chosen non-stop-word of *sentence* with ``[MASK]``.

    The sentence is split on whitespace. For each token, leading and
    trailing non-word characters (punctuation) are set aside; the remaining
    core is a candidate unless it is an English stop word. Exactly the
    chosen occurrence is replaced, and punctuation attached to it is kept
    (``"territory."`` becomes ``"[MASK]."``).

    Args:
        sentence: Input text.

    Returns:
        The sentence with one word masked, or the sentence unchanged when it
        contains no candidate word.
    """
    stop_words = set(stopwords.words('english'))
    candidate_spans = []
    for token in re.finditer(r'\S+', sentence):
        # Core = first word character through last word character.
        core = re.search(r'\w(?:.*\w)?', token.group(0))
        if core is None:
            continue  # token is punctuation only
        if core.group(0).lower() in stop_words:
            continue
        candidate_spans.append((token.start() + core.start(), token.start() + core.end()))
    if not candidate_spans:
        return sentence
    # Replace by position so an identical substring earlier in the sentence
    # is never masked by mistake.
    start, end = random.choice(candidate_spans)
    return sentence[:start] + '[MASK]' + sentence[end:]
# Load tokenizer and model for masked language model
# NOTE: the name `model` bound here is rebound further down by
# `def model(prompt)`; the pipeline keeps its own reference to the loaded
# masked-LM object.
_MASKED_LM_CHECKPOINT = "bert-large-cased-whole-word-masking"
tokenizer = AutoTokenizer.from_pretrained(_MASKED_LM_CHECKPOINT)
model = AutoModelForMaskedLM.from_pretrained(_MASKED_LM_CHECKPOINT)
fill_mask = pipeline(
    "fill-mask",
    model=model,
    tokenizer=tokenizer,
)
def mask(sentence):
    """Return the fill-mask pipeline's completed sentences for *sentence*.

    Args:
        sentence: Text expected to contain a ``[MASK]`` placeholder, as
            produced by ``mask_non_stopword``.

    Returns:
        The ``'sequence'`` field of each prediction, in the order returned
        by ``fill_mask``. An empty list when *sentence* has no ``[MASK]``
        (``mask_non_stopword`` returns its input unchanged when there is
        nothing to mask), since there is nothing for the pipeline to fill.
    """
    if '[MASK]' not in sentence:
        return []
    predictions = fill_mask(sentence)
    return [prediction['sequence'] for prediction in predictions]
#plotly tree | |
import plotly.graph_objs as go | |
import textwrap | |
import re | |
from collections import defaultdict | |
def generate_plot(original_sentence):
    """Build a three-level tree figure for *original_sentence*.

    Level 0 is the original sentence, level 1 holds its paraphrases (from
    ``generate_paraphrase``) and level 2 holds the fill-mask completions of
    the first paraphrase after one of its words was masked. The root is
    linked to every level-1 node and the first level-1 node to every
    level-2 node.

    Returns:
        ``(masked_sentence, masked_versions, fig)``: the masked first
        paraphrase, the list of completions and the plotly figure.
    """
    paraphrased_sentences = generate_paraphrase(original_sentence)
    first_paraphrased_sentence = paraphrased_sentences[0]
    masked_sentence = mask_non_stopword(first_paraphrased_sentence)
    masked_versions = mask(masked_sentence)

    # Node order: root, then paraphrases, then masked completions.
    node_texts = [original_sentence, *paraphrased_sentences, *masked_versions]
    paraphrase_count = len(paraphrased_sentences)
    node_levels = [0] + [1] * paraphrase_count + [2] * len(masked_versions)
    wrapped_labels = ['<br>'.join(textwrap.wrap(text, width=30)) for text in node_texts]

    # Root -> every paraphrase; first paraphrase -> every masked completion.
    root_index = 0
    first_paraphrase_index = 1
    edges = [(root_index, child) for child in range(1, paraphrase_count + 1)]
    edges += [
        (first_paraphrase_index, child)
        for child in range(paraphrase_count + 1, len(node_texts))
    ]

    # Center each level horizontally around x = 0, one unit between nodes.
    y_gap = 4
    placed_per_level = defaultdict(int)
    positions = []
    for level in node_levels:
        level_width = node_levels.count(level)
        x_coord = -(level_width - 1) / 2 + placed_per_level[level]
        positions.append((x_coord, -level * y_gap))
        placed_per_level[level] += 1

    fig = go.Figure()

    # One marker plus one boxed text annotation per node.
    for (x_coord, y_coord), label in zip(positions, wrapped_labels):
        fig.add_trace(go.Scatter(
            x=[x_coord],
            y=[y_coord],
            mode='markers',
            marker=dict(size=10, color='blue'),
            hoverinfo='none',
        ))
        fig.add_annotation(
            x=x_coord,
            y=y_coord,
            text=label,
            showarrow=False,
            yshift=20,  # lift the label off the marker
            align="center",
            font=dict(size=10),
            bordercolor='black',
            borderwidth=1,
            borderpad=4,
            bgcolor='white',
            width=200,
        )

    # One line trace per edge.
    for parent, child in edges:
        parent_x, parent_y = positions[parent]
        child_x, child_y = positions[child]
        fig.add_trace(go.Scatter(
            x=[parent_x, child_x],
            y=[parent_y, child_y],
            mode='lines',
            line=dict(color='black', width=2),
        ))

    hidden_axis = dict(showgrid=False, zeroline=False, showticklabels=False)
    fig.update_layout(
        showlegend=False,
        margin=dict(t=50, b=50, l=50, r=50),
        xaxis=dict(hidden_axis),
        yaxis=dict(hidden_axis),
        width=1470,
        height=800,
    )
    return masked_sentence, masked_versions, fig
# Function for the Gradio interface
def model(prompt):
    """Run the full demo pipeline for one user prompt.

    Steps: obtain the generated text and the selected sentence from
    ``llm_output``, paraphrase the sentence, find the n-grams common to the
    sentence and all paraphrases, render the paraphrases with those n-grams
    highlighted, and build the masking tree plot.

    Args:
        prompt: Text entered by the user.

    Returns:
        A 6-tuple matching the UI outputs: generated text, selected
        sentence, highlighted HTML, masked sentence, masked completions and
        the plotly figure.
    """
    generated, sentence = llm_output(prompt)
    paraphrases = generate_paraphrase(sentence)
    # The per-paraphrase word-overlap report was previously computed here
    # and then discarded; only the common n-grams feed the highlighted view.
    common_grams = find_common_subsequences(sentence, paraphrases)
    result = highlight_phrases_with_colors(paraphrases, common_grams)
    masked_sentence, masked_versions, tree = generate_plot(sentence)
    # The second output feeds the "Selected Sentence" box, so return the
    # sentence itself (identical to `generated` while llm_output echoes).
    return generated, sentence, result, masked_sentence, masked_versions, tree
# Gradio UI: one prompt box, submit/clear buttons and the result widgets.
with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
    gr.Markdown("# Paraphrases the Text and Highlights the Non-melting Points")
    with gr.Row():
        user_input = gr.Textbox(label="User Prompt")
    with gr.Row():
        submit_button = gr.Button("Submit")
        clear_button = gr.Button("Clear")
    with gr.Row():
        ai_output = gr.Textbox(label="AI-generated Text (Llama3)")
    with gr.Row():
        selected_sentence = gr.Textbox(label="Selected Sentence")
    with gr.Row():
        html_output = gr.HTML()
    with gr.Row():
        masked_sentence = gr.Textbox(label="Masked Sentence")
    with gr.Row():
        masked_versions = gr.Textbox(label="Sentence Generated by Masking Model")
    with gr.Row():
        tree = gr.Plot()

    result_outputs = [ai_output, selected_sentence, html_output, masked_sentence, masked_versions, tree]
    submit_button.click(model, inputs=user_input, outputs=result_outputs)
    # A handler must return one value per output component. Clear the prompt
    # and the five text/HTML widgets with "" and the plot with None.
    clear_button.click(
        lambda: ("", "", "", "", "", "", None),
        inputs=None,
        outputs=[user_input, *result_outputs],
    )
# Launch the demo
demo.launch(share=True)