copyright_checker / plagiarism.py
minko186's picture
increase workers in parallel
cf5bf4c
raw
history blame
11.3 kB
import time
from nltk.tokenize import sent_tokenize
from googleapiclient.discovery import build
from collections import Counter
import re, math
from sentence_transformers import SentenceTransformer, util
import asyncio
import httpx
from bs4 import BeautifulSoup
import numpy as np
import concurrent
from multiprocessing import Pool
# Pre-compiled pattern matching runs of word characters; text_to_vector uses it
# to extract the words of a text.
WORD = re.compile(r"\w+")
# Sentence-embedding model, instantiated once at import time and shared by
# embed_text and sentence_similarity.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
def get_cosine(vec1, vec2):
    """Compute the cosine similarity of two sparse term-weight vectors.

    Args:
        vec1: Mapping from term to weight (for example a ``Counter``).
        vec2: Mapping from term to weight.

    Returns:
        The cosine similarity as a float, or ``0.0`` when either vector has
        zero magnitude.
    """
    # Only terms present in both vectors contribute to the dot product.
    shared_terms = set(vec1.keys()) & set(vec2.keys())
    dot_product = sum([vec1[term] * vec2[term] for term in shared_terms])

    norm1 = math.sqrt(sum([vec1[term] ** 2 for term in vec1.keys()]))
    norm2 = math.sqrt(sum([vec2[term] ** 2 for term in vec2.keys()]))
    magnitude = norm1 * norm2

    # Empty or all-zero vectors would otherwise divide by zero.
    if magnitude == 0:
        return 0.0
    return float(dot_product) / magnitude
def text_to_vector(text):
    """Turn a text into a bag-of-words vector.

    Returns a ``Counter`` mapping every word matched by the module-level
    ``WORD`` pattern to the number of times it occurs in ``text``.
    """
    return Counter(WORD.findall(text))
def cosineSim(text1, text2):
    """Return the bag-of-words cosine similarity of two texts.

    Each text is converted with ``text_to_vector`` and the two resulting
    vectors are compared with ``get_cosine``.
    """
    return get_cosine(text_to_vector(text1), text_to_vector(text2))
def cos_sim_torch(embedding_1, embedding_2):
    """Return the cosine similarity of two embeddings as a Python scalar."""
    similarity = util.pytorch_cos_sim(embedding_1, embedding_2)
    return similarity.item()
def embed_text(text):
    """Encode ``text`` with the module-level model, requesting a tensor result."""
    embedding = model.encode(text, convert_to_tensor=True)
    return embedding
def sentence_similarity(text1, text2):
    """Return the embedding-based cosine similarity of two texts.

    Both texts are encoded with ``embed_text`` and the embeddings are compared
    with ``cos_sim_torch``, which yields a Python scalar.
    """
    return cos_sim_torch(embed_text(text1), embed_text(text2))
def _clean_snippet(snippet):
    """Trim a leading and a trailing "..." fragment from a search snippet."""
    # Drop a short leading fragment that ends in "... " (ellipsis at offset 10-19).
    ind = snippet.find("...")
    if 9 < ind < 20:
        snippet = snippet[ind + len("... ") :]
    # Drop an ellipsis sitting in the last few characters. The explicit -1
    # check matters: without it a snippet shorter than four characters and
    # containing no ellipsis would lose its last character.
    ind = snippet.find("...")
    if ind != -1 and ind > len(snippet) - 5:
        snippet = snippet[:ind]
    return snippet


def google_search(
    plag_option,
    sentences,
    url_count,
    score_array,
    url_list,
    sorted_date,
    domains_to_skip,
    api_key,
    cse_id,
    **kwargs,
):
    """Search each sentence with Google Custom Search and score the snippets.

    For every sentence, the first three results are examined. Results whose
    URL contains ``"." + domain`` for any entry of ``domains_to_skip`` are
    ignored. For each remaining result the similarity between the sentence
    and the cleaned-up snippet is stored in ``score_array``.

    Args:
        plag_option: ``"Standard"`` scores with ``cosineSim``; any other
            value scores with ``sentence_similarity``.
        sentences: Texts to search for, one query per entry.
        url_count: Dict mapping URL to the number of times it was returned;
            updated in place.
        score_array: List of per-URL score rows (one column per sentence),
            parallel to ``url_list``; updated in place.
        url_list: List of URLs seen so far; new URLs are appended in place.
        sorted_date: Value passed as the ``sort`` parameter of the query.
        domains_to_skip: Domain suffixes whose results are ignored.
        api_key: Developer key for the Custom Search service.
        cse_id: Custom search engine id (``cx``).
        **kwargs: Extra parameters forwarded to ``cse().list``.

    Returns:
        The ``(url_count, score_array)`` pair (the same objects passed in).
    """
    max_results_per_query = 3
    service = build("customsearch", "v1", developerKey=api_key)
    score_fn = cosineSim if plag_option == "Standard" else sentence_similarity

    for i, sentence in enumerate(sentences):
        results = (
            service.cse()
            .list(q=sentence, cx=cse_id, sort=sorted_date, **kwargs)
            .execute()
        )
        # A response without matches has no "items" key at all.
        for link in (results.get("items") or [])[:max_results_per_query]:
            url = link["link"]
            # skip user selected domains
            if any(("." + domain) in url for domain in domains_to_skip):
                continue
            # Some results carry no snippet; treat that as empty text
            # instead of raising KeyError.
            snippet = _clean_snippet(link.get("snippet", ""))
            if url not in url_list:
                url_list.append(url)
                score_array.append([0] * len(sentences))
            url_count[url] = url_count.get(url, 0) + 1
            score_array[url_list.index(url)][i] = score_fn(sentence, snippet)
    return url_count, score_array
def split_sentence_blocks(text):
    """Split text into blocks of up to two consecutive sentences.

    The text is first split into paragraphs on blank lines (``"\\n\\n"``).
    Within each paragraph, sentences found by ``sent_tokenize`` are joined in
    pairs with a single space; a trailing odd sentence forms its own block.
    Blocks never span two paragraphs.
    """
    blocks = []
    for paragraph in text.split("\n\n"):
        paragraph_sentences = sent_tokenize(paragraph)
        for start in range(0, len(paragraph_sentences), 2):
            blocks.append(" ".join(paragraph_sentences[start : start + 2]))
    return blocks
# Maps English month names to their zero-padded two-digit month numbers.
months = {
    "January": "01",
    "February": "02",
    "March": "03",
    "April": "04",
    "May": "05",
    "June": "06",
    "July": "07",
    "August": "08",
    "September": "09",
    "October": "10",
    "November": "11",
    "December": "12",
}


def build_date(year=2024, month="March", day=1):
    """Return the given date as a ``YYYYMMDD`` string.

    Args:
        year: Year, inserted as-is.
        month: English month name; must be a key of ``months``.
        day: Day of the month, as an int or a numeric string.

    Returns:
        The concatenated date string, e.g. ``"20240301"``.

    Raises:
        KeyError: If ``month`` is not a known month name.
    """
    # The day must be zero-padded to two digits; otherwise single-digit days
    # produce a seven-character string such as "2024031".
    return f"{year}{months[month]}{int(day):02d}"
async def get_url_data(url, client):
    """Fetch ``url`` with ``client`` and parse the response body as HTML.

    Returns:
        A ``BeautifulSoup`` object when the response status code is 200;
        ``None`` for any other status code or when any ``Exception`` is
        raised while fetching or parsing (best-effort scraping).
    """
    try:
        response = await client.get(url)
        if response.status_code != 200:
            return None
        return BeautifulSoup(response.content, "html.parser")
    except Exception:
        return None
def remove_punc(text):
    """Remove every character that is neither a word character nor whitespace."""
    return re.sub(r"[^\w\s]", "", text)
def split_ngrams(text, n):
    """Return all runs of ``n`` consecutive whitespace-separated words.

    Each n-gram is a list of words; the result is empty when ``text`` has
    fewer than ``n`` words.
    """
    tokens = text.split()
    grams = []
    for start in range(len(tokens) - n + 1):
        grams.append(tokens[start : start + n])
    return grams
async def parallel_scrap(urls):
    """Fetch all ``urls`` concurrently through one shared HTTP client.

    Every URL is handed to ``get_url_data``; the results come back in the
    same order as ``urls``. Because ``gather`` is called with
    ``return_exceptions=True``, an entry may be an exception object instead
    of a parsed page.
    """
    async with httpx.AsyncClient(timeout=30) as client:
        fetches = [get_url_data(url=url, client=client) for url in urls]
        return await asyncio.gather(*fetches, return_exceptions=True)
def matching_score(sentence_content_tuple):
    """Score how much of a sentence appears verbatim in a page's text.

    Args:
        sentence_content_tuple: ``(sentence, content)`` pair, packed into one
            argument so the function can be used with ``Pool.map``.

    Returns:
        ``1`` when the whole sentence is a substring of ``content``;
        otherwise the fraction of the sentence's word 5-grams that occur in
        ``content`` (``0`` when the sentence has fewer than five words).
    """
    sentence, content = sentence_content_tuple
    if sentence in content:
        return 1

    ngram_size = 5
    ngrams = split_ngrams(sentence, ngram_size)
    if not ngrams:
        return 0
    hits = sum(1 for gram in ngrams if " ".join(gram) in content)
    return hits / len(ngrams)
def process_with_multiprocessing(input_data, processes=4):
    """Compute ``matching_score`` for every item using a process pool.

    Args:
        input_data: Iterable of ``(sentence, content)`` tuples.
        processes: Number of worker processes to start (defaults to 4).

    Returns:
        List of scores in the same order as ``input_data``.
    """
    items = list(input_data)
    # Nothing to score: avoid the cost of spawning worker processes.
    if not items:
        return []
    with Pool(processes=processes) as pool:
        return pool.map(matching_score, items)
def print2d(array):
    """Print every row of ``array`` on its own line."""
    rows = list(array)
    # print() with no arguments would emit a blank line, so skip empty input.
    if rows:
        print(*rows, sep="\n")
def map_sentence_url(sentences, score_array):
    """Choose, for every sentence, the row index of its best-scoring URL.

    ``score_array[i][j]`` is the score of URL ``i`` for sentence ``j``. Each
    sentence after the first starts out assigned to the previous sentence's
    URL; while it is still assigned to that URL, another URL only takes over
    when its score is more than 0.05 higher, which keeps neighbouring
    sentences on the same source unless another is clearly better.

    Args:
        sentences: The sentences (only their count is used).
        score_array: One row of per-sentence scores for each URL.

    Returns:
        A list with one URL row index per sentence; every entry is ``-1``
        when ``score_array`` has no rows.
    """
    sentence_count = len(sentences)
    # No candidate URLs at all: nothing can be assigned. Without this guard
    # the lookup of the previous sentence's row fails on the empty array.
    if len(score_array) == 0:
        return [-1] * sentence_count

    best_url = [-1] * sentence_count
    for j in range(sentence_count):
        if j > 0:
            # Start from the previous sentence's source.
            best_url[j] = best_url[j - 1]
            best_score = score_array[best_url[j]][j]
        else:
            best_score = -1
        for i, row in enumerate(score_array):
            # The margin applies only while the sentence is still assigned
            # to the same URL as its predecessor.
            margin = 0.05 if (j > 0 and best_url[j] == best_url[j - 1]) else 0
            if row[j] - best_score > margin:
                best_score = row[j]
                best_url[j] = i
    return best_url
def html_highlight(
    plag_option,
    input,
    year_from,
    month_from,
    day_from,
    year_to,
    month_to,
    day_to,
    domains_to_skip,
):
    """Run the plagiarism check and render its result as an HTML fragment.

    Consecutive sentences attributed to the same source index are merged into
    one paragraph followed by a coloured ``[index]`` badge. After a rule, each
    reported source URL is listed with its index and matching score.

    All arguments are forwarded unchanged to ``plagiarism_check``.

    Returns:
        The HTML fragment as a string.
    """
    # Function-scope import keeps this block self-contained (stdlib only).
    from html import escape

    sentence_scores, url_scores = plagiarism_check(
        plag_option,
        input,
        year_from,
        month_from,
        day_from,
        year_to,
        month_to,
        day_to,
        domains_to_skip,
    )
    color_map = [
        "#cf2323",
        "#eb9d59",
        "#c2ad36",
        "#e1ed72",
        "#c2db76",
        "#a2db76",
    ]
    font = "Roboto"

    def color_for(idx):
        # Wrap around the palette so that more sources than colours cannot
        # raise IndexError; indices 1..6 (and the -1 "no source" marker) map
        # to the same colours as plain ``color_map[idx - 1]``.
        return color_map[(idx - 1) % len(color_map)]

    def source_paragraph(text, idx):
        badge = (
            f'<span style="background-color: {color_for(idx)}; '
            f'padding: 2px;">[{idx}]</span>'
        )
        return f"<p>{text} {badge}</p>"

    parts = [
        "<link href='https://fonts.googleapis.com/css?family=Roboto' rel='stylesheet'>\n"
        # This must be an f-string so the font name is actually substituted.
        f"<div style='font-family: {font}; border: 2px solid black; background-color: #333333; padding: 10px; color: #FFFFFF;'>"
    ]

    prev_idx = None
    combined_sentence = ""
    for sentence, _, _, idx in sentence_scores:
        # The source changed: flush the sentences collected so far.
        if idx != prev_idx and prev_idx is not None:
            parts.append(source_paragraph(combined_sentence, prev_idx))
            combined_sentence = ""
        # The sentence is user-supplied text; escape it before embedding.
        combined_sentence += " " + escape(sentence)
        prev_idx = idx
    if combined_sentence:
        parts.append(source_paragraph(combined_sentence, prev_idx))

    parts.append("<hr>")
    for url, score, idx in url_scores:
        parts.append(
            f'<p style="background-color: {color_for(idx)}; padding: 5px;">'
            f"({idx}) <b>{escape(url)}</b></p>"
            f"<p> --- Matching Score: {score}%</p>"
        )
    parts.append("</div>")
    return "".join(parts)
def plagiarism_check(
    plag_option,
    input,
    year_from,
    month_from,
    day_from,
    year_to,
    month_to,
    day_to,
    domains_to_skip,
):
    """Find likely web sources for the sentences of ``input``.

    The text is split into sentence blocks, each block is searched with
    ``google_search`` restricted to the given date range, the resulting pages
    are scraped, and every (page, sentence) pair is scored with
    ``matching_score``. Each sentence is then attributed to one URL via
    ``map_sentence_url``.

    Args:
        plag_option: Scoring mode forwarded to ``google_search``.
        input: The text to check.
        year_from, month_from, day_from: Start of the date range.
        year_to, month_to, day_to: End of the date range.
        domains_to_skip: Domain suffixes to ignore in search results.

    Returns:
        A ``(sentence_scores, url_scores)`` pair.
        ``sentence_scores`` has one ``[sentence, score, url, rank]`` entry per
        sentence block; ``score`` is ``None`` and ``rank`` is ``-1`` when the
        attributed source scores 0.1 or less (``url`` is also ``None`` when
        the search returned no URLs at all).
        ``url_scores`` has one ``[url, percent, rank]`` entry per source whose
        average score exceeds 0.1, best first.
    """
    # NOTE(review): these credentials are hard-coded in source control; they
    # should be rotated and loaded from configuration instead.
    api_key = "AIzaSyCB61O70B8AC3l5Kk3KMoLb6DN37B7nqIk"
    cse_id = "851813e81162b4ed4"
    min_source_score = 0.1

    sentences = split_sentence_blocks(input)
    url_count = {}
    score_array = []
    url_list = []
    date_from = build_date(year_from, month_from, day_from)
    date_to = build_date(year_to, month_to, day_to)
    sort_date = f"date:r:{date_from}:{date_to}"

    # get list of URLS to check
    url_count, score_array = google_search(
        plag_option,
        sentences,
        url_count,
        score_array,
        url_list,
        sort_date,
        domains_to_skip,
        api_key,
        cse_id,
    )

    # No search hits: there is nothing to scrape or attribute, and the
    # attribution code below cannot index into empty tables.
    if not url_list:
        return [[sent, None, None, -1] for sent in sentences], []

    # Scrape URLs in list
    soups = asyncio.run(parallel_scrap(url_list))
    # Keep only pages that were actually parsed. gather(return_exceptions=True)
    # may hand back exception objects, which have no ``.text``.
    pages = [
        (i, soup.text)
        for i, soup in enumerate(soups)
        if soup is not None and not isinstance(soup, BaseException)
    ]
    input_data = [(sent, text) for _, text in pages for sent in sentences]
    scores = process_with_multiprocessing(input_data)

    # Update score array for each (page, sentence); scores are laid out page
    # by page, one value per sentence.
    per_page = len(sentences)
    for page_pos, (i, _) in enumerate(pages):
        offset = page_pos * per_page
        score_array[i][:] = scores[offset : offset + per_page]

    sentence_to_url = map_sentence_url(sentences, score_array)

    # Average score of each URL over the sentences attributed to it.
    url_source = {}
    for url_idx in sorted(set(sentence_to_url)):
        own_scores = [
            score_array[url_idx][j]
            for j, chosen in enumerate(sentence_to_url)
            if chosen == url_idx
        ]
        url_source[url_idx] = sum(own_scores) / len(own_scores)

    # Rank the sources 1, 2, ... by descending average score.
    index_descending = sorted(url_source, key=url_source.get, reverse=True)
    url_rank = {
        url_idx: rank for rank, url_idx in enumerate(index_descending, start=1)
    }

    # build results
    sentence_scores = []
    for sent, url_idx in zip(sentences, sentence_to_url):
        if url_source[url_idx] > min_source_score:
            sentence_scores.append(
                [sent, url_source[url_idx], url_list[url_idx], url_rank[url_idx]]
            )
        else:
            sentence_scores.append([sent, None, url_list[url_idx], -1])

    url_scores = [
        [url_list[url_idx], round(url_source[url_idx] * 100, 2), url_rank[url_idx]]
        for url_idx in index_descending
        if url_source[url_idx] > min_source_score
    ]
    return sentence_scores, url_scores