# (page header captured along with the file) Spaces: Running
import time | |
from nltk.tokenize import sent_tokenize | |
from googleapiclient.discovery import build | |
from collections import Counter | |
import re, math | |
from sentence_transformers import SentenceTransformer, util | |
import asyncio | |
import httpx | |
from bs4 import BeautifulSoup | |
import numpy as np | |
import concurrent | |
from multiprocessing import Pool | |
from const import url_types | |
from collections import defaultdict | |
# Word tokenizer for the bag-of-words vectors built in text_to_vector():
# matches maximal runs of word characters.
WORD = re.compile(r"\w+")
# Sentence-embedding model, loaded once at import time and shared by
# sentence_similarity().
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
# English month name -> zero-padded two-digit month number ("January" -> "01"),
# consumed by build_date().
months = {
    month_name: f"{month_number:02d}"
    for month_number, month_name in enumerate(
        (
            "January",
            "February",
            "March",
            "April",
            "May",
            "June",
            "July",
            "August",
            "September",
            "October",
            "November",
            "December",
        ),
        start=1,
    )
}
# Highlight palette, indexed by source rank minus one in html_highlight();
# the hex values run from red through yellow to green.
color_map = [
    "#cf2323",
    "#d65129",
    "#d66329",
    "#d67129",
    "#eb9d59",
    "#c2ad36",
    "#d6ae29",
    "#d6b929",
    "#e1ed72",
    "#c2db76",
    "#a2db76",
]
def text_to_vector(text):
    """Return a Counter mapping each word token of *text* to its frequency."""
    return Counter(WORD.findall(text))
def cosineSim(text1, text2):
    """Return the bag-of-words cosine similarity between two texts.

    Both texts are turned into word-count vectors with text_to_vector() and
    compared with get_cosine().
    """
    return get_cosine(text_to_vector(text1), text_to_vector(text2))
def get_cosine(vec1, vec2):
    """Return the cosine similarity of two sparse count vectors.

    Args:
        vec1: Mapping from term to count.
        vec2: Mapping from term to count.

    Returns:
        The dot product over the shared terms divided by the product of the
        two Euclidean norms, or 0.0 when either vector has zero norm.
    """
    shared_terms = set(vec1) & set(vec2)
    dot_product = sum(vec1[term] * vec2[term] for term in shared_terms)
    squares1 = sum(count ** 2 for count in vec1.values())
    squares2 = sum(count ** 2 for count in vec2.values())
    denominator = math.sqrt(squares1) * math.sqrt(squares2)
    if denominator == 0:
        return 0.0
    return float(dot_product) / denominator
def split_sentence_blocks(text, size):
    """Split *text* into the blocks that are checked individually.

    Args:
        text: The text to split; surrounding whitespace is ignored.
        size: "Paragraph" splits on newlines; any other value splits into
            sentences with nltk's sent_tokenize.

    Returns:
        A list of non-blank blocks. Blank lines (e.g. the empty line between
        two paragraphs) are dropped: an empty block would otherwise be sent
        as an empty search query and would match every page as a substring.
    """
    stripped = text.strip()
    if size == "Paragraph":
        blocks = stripped.split("\n")
    else:
        blocks = sent_tokenize(stripped)
    return [block for block in blocks if block.strip()]
def build_date(year=2024, month="March", day=1):
    """Return the date as a compact ``YYYYMMDD`` string.

    Args:
        year: Four-digit year.
        month: English month name; must be a key of ``months``.
        day: Day of the month, as an int or a numeric string.

    Returns:
        The concatenated year, two-digit month and two-digit day. The day is
        zero-padded so single-digit days still yield eight characters
        (day 1 of March 2024 is "20240301", not "2024031").

    Raises:
        KeyError: If *month* is not a known month name.
        ValueError: If *day* cannot be converted to an integer.
    """
    return f"{year}{months[month]}{int(day):02d}"
def split_ngrams(text, n):
    """Return every run of *n* consecutive whitespace-separated words.

    Each n-gram is a list of words; the result is empty when *text* has
    fewer than *n* words.
    """
    tokens = text.split()
    window_count = len(tokens) - n + 1
    return [tokens[start : start + n] for start in range(window_count)]
def sentence_similarity(text1, text2):
    """Return the cosine similarity between the embeddings of two texts.

    Both texts are encoded with the module-level ``model`` and compared with
    ``util.pytorch_cos_sim``; the single resulting value is returned via
    ``.item()``.
    """
    first_embedding, second_embedding = (
        model.encode(text, convert_to_tensor=True) for text in (text1, text2)
    )
    return util.pytorch_cos_sim(first_embedding, second_embedding).item()
async def get_url_data(url, client):
    """Fetch *url* with *client* and return the parsed page.

    Returns:
        A BeautifulSoup tree built with "html.parser" when the response
        status is 200; None for any other status or if any exception is
        raised while fetching or parsing (best effort, never raises).
    """
    try:
        response = await client.get(url)
        if response.status_code != 200:
            return None
        return BeautifulSoup(response.content, "html.parser")
    except Exception:
        return None
async def parallel_scrap(urls):
    """Fetch all *urls* concurrently through one shared httpx client.

    Returns:
        The results of get_url_data() in the same order as *urls*. Because
        gather() is called with return_exceptions=True, an entry may also be
        an exception object instead of a page or None.
    """
    async with httpx.AsyncClient(timeout=30) as client:
        pending = [get_url_data(url=url, client=client) for url in urls]
        return await asyncio.gather(*pending, return_exceptions=True)
def merge_ngrams_into_sentence(ngrams, max_ngrams=20):
    """Stitch a sequence of overlapping n-grams back into one string.

    Args:
        ngrams: Sequence of n-grams (each a sequence of words), or None.
        max_ngrams: Only the first *max_ngrams* n-grams are merged
            (defaults to 20, the previously hard-coded limit).

    Returns:
        The merged words joined by single spaces; "" when *ngrams* is None
        or empty.
    """
    if ngrams is None:
        return ""
    merged_sentence = []
    for ngram in ngrams[:max_ngrams]:
        # Overlap is estimated as the number of distinct words the n-gram
        # shares with the tail of what has been merged so far.
        overlap = len(set(ngram) & set(merged_sentence[-len(ngram) :]))
        if overlap == 0:
            merged_sentence.extend(ngram)
        elif overlap < len(ngram):
            # Append only the words past the overlapping prefix.
            merged_sentence.extend(ngram[overlap:])
        # A fully overlapping n-gram contributes nothing new.
    return " ".join(merged_sentence)
def remove_ngrams_after(ngrams, target_ngram):
    """Return *ngrams* truncated just after the first *target_ngram*.

    Returns:
        The prefix of *ngrams* up to and including the first occurrence of
        *target_ngram*, or None when it does not occur.
    """
    try:
        cut = ngrams.index(target_ngram) + 1
    except ValueError:
        return None
    return ngrams[:cut]
def matching_score(sentence_content_tuple):
    """Score how much of a sentence appears verbatim in a page's text.

    Args:
        sentence_content_tuple: ``(sentence, content, score)``. The third
            element (the earlier snippet-based score) is accepted for
            interface compatibility but not used.

    Returns:
        A ``(score, matched_text)`` tuple:
        * ``(0, "")`` for a blank sentence or one shorter than five words
          that is not a substring of the content;
        * ``(1, sentence)`` when the sentence is a substring of the content;
        * otherwise the fraction of the sentence's word 5-grams that occur
          in the content, plus text rebuilt from the matching region.
    """
    sentence, content, _snippet_score = sentence_content_tuple
    # "" is a substring of every string, so a blank sentence would otherwise
    # be reported as a perfect match.
    if not sentence.strip():
        return 0, ""
    if sentence in content:
        return 1, sentence

    n = 5
    ngrams_sentence = [tuple(ngram) for ngram in split_ngrams(sentence, n)]
    if not ngrams_sentence:
        return 0, ""
    ngrams_content = [tuple(ngram) for ngram in split_ngrams(content, n)]

    # Set membership keeps the counting pass linear instead of scanning the
    # whole content list for every sentence n-gram.
    content_ngram_set = set(ngrams_content)
    matched_count = sum(
        1 for ngram in ngrams_sentence if ngram in content_ngram_set
    )
    if matched_count == 0:
        # Nothing matches, so the reconstruction below would produce "";
        # skip its nested scan over the page.
        return 0.0, ""

    matched_content_ngrams = []
    found = False
    last_found = None
    for ngram in ngrams_sentence:
        for ngram_content in ngrams_content:
            if ngram == ngram_content:
                found = True
                last_found = ngram_content
            # From the first match onwards every content n-gram is kept;
            # the tail past the last match is trimmed afterwards.
            if found:
                matched_content_ngrams.append(ngram_content)
    matched_content_ngrams = remove_ngrams_after(
        matched_content_ngrams, last_found
    )
    matched_content = merge_ngrams_into_sentence(matched_content_ngrams)
    return matched_count / len(ngrams_sentence), matched_content
def process_with_multiprocessing(input_data, processes=8):
    """Run matching_score() over *input_data* in a pool of worker processes.

    Args:
        input_data: Iterable of ``(sentence, content, score)`` tuples.
        processes: Upper bound on the number of workers (defaults to 8, the
            previously hard-coded value).

    Returns:
        A list with one matching_score() result per input item, in input
        order. Empty input returns ``[]`` without starting any process.
    """
    input_data = list(input_data)
    if not input_data:
        return []
    # Never start more workers than there are items to process.
    worker_count = max(1, min(processes, len(input_data)))
    with Pool(processes=worker_count) as pool:
        return pool.map(matching_score, input_data)
def map_sentence_url(sentences, score_array):
    """Pick, for every sentence, the index of its best-matching URL.

    Args:
        sentences: The sentence blocks; only the count is used.
        score_array: ``score_array[i][j]`` is the score of URL *i* for
            sentence *j*.

    Returns:
        A list with one URL index per sentence. A sentence starts out with
        the URL chosen for the previous sentence, and a different URL only
        replaces that inherited choice when it scores more than 0.05 higher,
        so consecutive sentences tend to stay on the same source. Every
        entry is -1 when *score_array* is empty.
    """
    sentenceToMaxURL = [-1] * len(sentences)
    # Without any candidate URL there is nothing to index; the previous-URL
    # lookup below would raise IndexError from the second sentence on.
    if not score_array:
        return sentenceToMaxURL
    for j in range(len(sentences)):
        if j > 0:
            previous_url = sentenceToMaxURL[j - 1]
            maxScore = score_array[previous_url][j]
            sentenceToMaxURL[j] = previous_url
        else:
            maxScore = -1
        for i, url_scores in enumerate(score_array):
            # The margin only applies while the sentence still carries the
            # choice inherited from its predecessor.
            margin = (
                0.05
                if (j > 0 and sentenceToMaxURL[j] == sentenceToMaxURL[j - 1])
                else 0
            )
            if url_scores[j] - maxScore > margin:
                maxScore = url_scores[j]
                sentenceToMaxURL[j] = i
    return sentenceToMaxURL
def check_url_category(url):
    """Return the first ``url_types`` category with a pattern found in *url*.

    Categories are checked in ``url_types`` iteration order; a pattern
    matches when it is a substring of *url*. "Internet Source" is returned
    when nothing matches.
    """
    for category, patterns in url_types.items():
        if any(pattern in url for pattern in patterns):
            return category
    return "Internet Source"
def _clean_snippet(snippet):
    """Trim the '...' elision markers from the ends of a search snippet."""
    ind = snippet.find("...")
    # A marker near the start (position 10-19): drop everything through it.
    if 9 < ind < 20:
        snippet = snippet[ind + len("... ") :]
    ind = snippet.find("...")
    # A marker within the last few characters: cut it off. The explicit -1
    # check stops short snippets without any marker from losing a character.
    if ind != -1 and ind > len(snippet) - 5:
        snippet = snippet[:ind]
    return snippet


def google_search(
    plag_option,
    sentences,
    url_count,
    score_array,
    url_list,
    snippets,
    sorted_date,
    domains_to_skip,
    api_key,
    cse_id,
    **kwargs,
):
    """Search each sentence and score the returned snippets against it.

    The accumulator arguments are updated in place; *url_list*,
    *score_array* and *snippets* stay index-aligned (one row per URL, one
    column per sentence).

    Args:
        plag_option: "Standard" scores with cosineSim(); any other value
            scores with sentence_similarity().
        sentences: The blocks to search for; blank ones are skipped.
        url_count: Dict mapping URL to the number of times it was returned.
        score_array: Per-URL rows of per-sentence scores.
        url_list: URLs seen so far, in first-seen order.
        snippets: Per-URL rows of per-sentence snippets.
        sorted_date: Value passed as the ``sort`` parameter of the search.
        domains_to_skip: Iterable of domains; a result is ignored when
            ``"." + domain`` occurs in its link. May be None.
        api_key: Google API key.
        cse_id: Custom search engine id.
        **kwargs: Extra parameters forwarded to ``cse().list``.

    Returns:
        ``(url_count, score_array)``.
    """
    service = build("customsearch", "v1", developerKey=api_key)
    num_pages = 3
    for i, sentence in enumerate(sentences):
        # An empty query carries no information and is not worth a request.
        if not sentence.strip():
            continue
        results = (
            service.cse()
            .list(q=sentence, cx=cse_id, sort=sorted_date, **kwargs)
            .execute()
        )
        # Only the first num_pages results are considered (skipped domains
        # still count toward that limit).
        for link in results.get("items", [])[:num_pages]:
            url = link["link"]
            # skip user selected domains
            if (domains_to_skip is not None) and any(
                ("." + domain) in url for domain in domains_to_skip
            ):
                continue
            # Not every result carries a snippet; treat a missing one as "".
            snippet = _clean_snippet(link.get("snippet", ""))
            if url not in url_list:
                url_list.append(url)
                score_array.append([0] * len(sentences))
                snippets.append([""] * len(sentences))
            url_count[url] = url_count.get(url, 0) + 1
            row = url_list.index(url)
            snippets[row][i] = snippet
            # update similarity between snippet and given text
            if plag_option == "Standard":
                score_array[row][i] = cosineSim(sentence, snippet)
            else:
                score_array[row][i] = sentence_similarity(sentence, snippet)
    return url_count, score_array
def plagiarism_check(
    plag_option,
    input,
    year_from,
    month_from,
    day_from,
    year_to,
    month_to,
    day_to,
    domains_to_skip,
    source_block_size,
):
    """Find web sources for each block of *input* and score the matches.

    The text is split into blocks, each block is searched within the given
    date range, the result pages are scraped, and every block is scored
    against every scraped page. Each block is then attributed to one URL.

    Args:
        plag_option: Snippet scoring mode forwarded to google_search().
        input: The text to check.
        year_from, month_from, day_from: Start of the date range.
        year_to, month_to, day_to: End of the date range.
        domains_to_skip: Domains whose results are ignored (may be None).
        source_block_size: "Paragraph" or anything else for sentences.

    Returns:
        ``(sentence_scores, url_scores)`` where
        * sentence_scores has one ``[block, score_percent, url, rank]`` entry
          per block; score_percent is None and rank is -1 when the attributed
          source's average score is not above 0.1. When the search returned
          no URL at all, url is "" for every block;
        * url_scores has one ``[url, score_percent, rank, matched_text]``
          entry per source above the threshold, best first.
    """
    import os

    # NOTE(security): credentials are read from the environment when set.
    # The literal fallbacks only keep existing deployments working; they are
    # exposed in source and should be rotated and then removed.
    api_key = os.environ.get(
        "GOOGLE_CSE_API_KEY", "AIzaSyCg1IbevcTAXAPYeYreps6wYWDbU0Kz8tg"
    )
    cse_id = os.environ.get("GOOGLE_CSE_ID", "851813e81162b4ed4")
    threshold = 0.1

    url_scores = []
    sentence_scores = []
    sentences = split_sentence_blocks(input, source_block_size)
    url_count = {}
    score_array = []
    url_list = []
    snippets = []
    date_from = build_date(year_from, month_from, day_from)
    date_to = build_date(year_to, month_to, day_to)
    sort_date = f"date:r:{date_from}:{date_to}"

    # get list of URLS to check
    url_count, score_array = google_search(
        plag_option,
        sentences,
        url_count,
        score_array,
        url_list,
        snippets,
        sort_date,
        domains_to_skip,
        api_key,
        cse_id,
    )

    # No search result at all: nothing can be attributed, and indexing the
    # empty score table below would raise IndexError.
    if not url_list:
        return [[sent, None, "", -1] for sent in sentences], []

    # Scrape URLs in list
    soups = asyncio.run(parallel_scrap(url_list))
    input_data = []
    cells = []  # (url index, sentence index) for each input_data entry
    for i, soup in enumerate(soups):
        # gather(return_exceptions=True) may hand back exception objects,
        # which are truthy but are not pages.
        if soup is None or isinstance(soup, BaseException):
            continue
        page_content = soup.text
        for j, sent in enumerate(sentences):
            input_data.append((sent, page_content, score_array[i][j]))
            cells.append((i, j))
    scores = process_with_multiprocessing(input_data)

    # Update score array for each (soup, sentence); URLs that could not be
    # scraped keep their snippet-based score.
    matched_sentence_array = [[""] * len(sentences) for _ in score_array]
    for (i, j), (score, matched) in zip(cells, scores):
        score_array[i][j] = score
        matched_sentence_array[i][j] = matched

    sentenceToMaxURL = map_sentence_url(sentences, score_array)

    # Average score of each attributed URL over the sentences assigned to it.
    url_source = {}
    for url_idx in sorted(set(sentenceToMaxURL)):
        assigned = [
            score_array[url_idx][sen]
            for sen, owner in enumerate(sentenceToMaxURL)
            if owner == url_idx
        ]
        url_source[url_idx] = sum(assigned) / len(assigned)
    index_descending = sorted(url_source, key=url_source.get, reverse=True)
    # Rank 1 is the best-scoring source.
    urlMap = {url_idx: rank + 1 for rank, url_idx in enumerate(index_descending)}

    # build results
    for i, sent in enumerate(sentences):
        ind = sentenceToMaxURL[i]
        if url_source[ind] > threshold:
            sentence_scores.append(
                [
                    sent,
                    round(url_source[ind] * 100, 2),
                    url_list[ind],
                    urlMap[ind],
                ]
            )
        else:
            sentence_scores.append([sent, None, url_list[ind], -1])

    print("SNIPPETS: ", snippets)
    # Drop the empty placeholders once, before the loop that joins the rows.
    snippets = [[item for item in sublist if item] for sublist in snippets]
    matched_sentence_array = [
        [item for item in sublist if item] for sublist in matched_sentence_array
    ]
    for ind in index_descending:
        if url_source[ind] > threshold:
            matched_sentence = "...".join(matched_sentence_array[ind])
            # Fall back to the search snippets when scraping found no text.
            if matched_sentence == "":
                matched_sentence = "...".join(snippets[ind])
            url_scores.append(
                [
                    url_list[ind],
                    round(url_source[ind] * 100, 2),
                    urlMap[ind],
                    matched_sentence,
                ]
            )
    return sentence_scores, url_scores
def _highlight_color(idx):
    """Return the highlight color for source rank *idx*.

    Ranks past the end of ``color_map`` reuse its last color instead of
    raising IndexError; -1 (no source) still maps to ``color_map[-2]``, the
    color the original indexing produced.
    """
    return color_map[min(idx, len(color_map)) - 1]


def html_highlight(
    plag_option,
    input,
    year_from,
    month_from,
    day_from,
    year_to,
    month_to,
    day_to,
    domains_to_skip,
    source_block_size,
):
    """Run plagiarism_check() and render its result as an HTML report.

    All arguments are forwarded unchanged to plagiarism_check().

    Returns:
        An HTML string with the input text, where consecutive blocks
        attributed to the same source share one highlighted paragraph,
        followed by overall and per-category similarity scores and the list
        of matched sources. Text from the input and from web pages is
        HTML-escaped before being inserted.
    """
    from html import escape

    start_time = time.perf_counter()
    sentence_scores, url_scores = plagiarism_check(
        plag_option,
        input,
        year_from,
        month_from,
        day_from,
        year_to,
        month_to,
        day_to,
        domains_to_skip,
        source_block_size,
    )
    # Plain (non-f) string because of the CSS braces; the font is spelled
    # out since a "{font}" placeholder here would never be substituted.
    parts = [
        """
<link href='https://fonts.googleapis.com/css?family=Roboto' rel='stylesheet'>
<div style='font-family: Roboto; border: 2px solid black; padding: 10px; color: #FFFFFF;'>
<html>
<head>
<title>Toggle Details</title>
<style>
.score-container {
display: flex;
justify-content: space-around;
align-items: left;
padding: 20px;
}
.score-item {
text-align: center;
padding: 10px;
background-color: #636362;
border-radius: 5px;
flex-grow: 1;
margin: 0 5px;
}
.details {
display: none;
padding: 10px;
}
.url-link {
font-size: 1.2em;
}
.url-link span {
margin-right: 10px;
}
.toggle-button {
color: #333;
border: none;
padding: 5px 10px;
text-align: center;
text-decoration: none;
display: inline-block;
cursor: pointer;
}
</style>
</head>
"""
    ]

    def flush(text, idx):
        """Append one highlighted paragraph for a run of same-source blocks."""
        color = _highlight_color(idx)
        # Runs without a source (-1) get no "[n]" marker.
        index_part = f"<span>[{idx}]</span>" if idx != -1 else ""
        parts.append(
            f'<p style="background-color: {color}; padding: 2px;">'
            f"{text} {index_part}</p>"
        )

    prev_idx = None
    combined_sentence = ""
    total_score = 0
    total_count = 0
    category_scores = defaultdict(set)
    for sentence, score, url, idx in sentence_scores:
        if score is not None:
            total_score += score
            category_scores[check_url_category(url)].add(score)
        total_count += 1
        # The source changed: emit the run collected so far.
        if prev_idx is not None and idx != prev_idx:
            flush(combined_sentence, prev_idx)
            combined_sentence = ""
        combined_sentence += " " + escape(sentence)
        prev_idx = idx
    print(category_scores)

    # Empty input yields no blocks; report 0 instead of dividing by zero.
    total_average_score = (
        round(total_score / total_count, 2) if total_count else 0
    )
    category_averages = {
        category: round((sum(scores) / len(scores)), 2)
        for category, scores in category_scores.items()
    }
    if combined_sentence:
        flush(combined_sentence, prev_idx)
    parts.append("<hr>")
    parts.append(
        f"""
<div class="score-container">
<div class="score-item">
<h3>Overall Similarity</h3>
<p>{total_average_score}%</p>
</div>
"""
    )
    for category, score in category_averages.items():
        parts.append(
            f"""
<div class="score-item"><h3>{category}</h3><p>{score}%</p></div>
"""
        )
    parts.append("</div>")
    for url, score, idx, sentence in url_scores:
        url_category = check_url_category(url)
        color = _highlight_color(idx)
        parts.append(
            f"""
<p style="background-color: {color}; padding: 5px; font-size: 1.2em">[{idx}] <b>{escape(url)}</b></p><p><i>{url_category}</i></p>
<p> --- <b>Matching Score: </b>{score}%</p>
<p> --- <b>Original Source Content: </b>{escape(sentence)}</p>
"""
        )
    # Close the wrapper <div> opened in the header as well.
    parts.append("</html></div>")
    print("PLAGIARISM PROCESSING TIME: ", time.perf_counter() - start_time)
    return "".join(parts)