import time
from nltk.tokenize import sent_tokenize
from googleapiclient.discovery import build
from collections import Counter
import re, math
from sentence_transformers import SentenceTransformer, util
import asyncio
import httpx
from bs4 import BeautifulSoup
import numpy as np
import concurrent
from multiprocessing import Pool

WORD = re.compile(r"\w+")
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
# Returns the cosine similarity of two bag-of-words vectors.
# input: two Counter vectors
# output: float between 0 and 1.
def get_cosine(vec1, vec2):
    intersection = set(vec1.keys()) & set(vec2.keys())
    # numerator: dot product over the shared words
    numerator = sum([vec1[x] * vec2[x] for x in intersection])
    # denominator: product of the two vector norms
    sum1 = sum([vec1[x] ** 2 for x in vec1.keys()])
    sum2 = sum([vec2[x] ** 2 for x in vec2.keys()])
    denominator = math.sqrt(sum1) * math.sqrt(sum2)
    # guard against division by zero
    if denominator == 0:
        return 0.0
    else:
        return float(numerator) / denominator
# Converts the given text into a bag-of-words vector.
def text_to_vector(text):
    # extract all words with the WORD regular expression above
    words = WORD.findall(text)
    # return a Counter mapping each word to its number of occurrences
    return Counter(words)
# Returns the cosine similarity of two texts.
# uses: text_to_vector(text) and get_cosine(v1, v2)
def cosineSim(text1, text2):
    vector1 = text_to_vector(text1)
    vector2 = text_to_vector(text2)
    # print(vector1, vector2)
    cosine = get_cosine(vector1, vector2)
    return cosine
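# Illustrative check of the bag-of-words cosine above (assumed inputs, shown
# as doctest-style comments so nothing runs at import time):
#   >>> cosineSim("the cat sat on the mat", "the cat sat on the mat")
#   1.0
#   >>> cosineSim("the cat sat on the mat", "a dog ran in the park")
#   # roughly 0.29, since only "the" is shared between the two texts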
# Cosine similarity between two precomputed sentence embeddings.
def cos_sim_torch(embedding_1, embedding_2):
    return util.pytorch_cos_sim(embedding_1, embedding_2).item()


# Encodes text into a sentence-transformer embedding tensor.
def embed_text(text):
    return model.encode(text, convert_to_tensor=True)


# Semantic similarity of two texts via sentence-transformer embeddings.
def sentence_similarity(text1, text2):
    embedding_1 = model.encode(text1, convert_to_tensor=True)
    embedding_2 = model.encode(text2, convert_to_tensor=True)
    o = util.pytorch_cos_sim(embedding_1, embedding_2)
    return o.item()
def google_search(
    plag_option,
    sentences,
    urlCount,
    scoreArray,
    urlList,
    sorted_date,
    domains_to_skip,
    api_key,
    cse_id,
    **kwargs,
):
    service = build("customsearch", "v1", developerKey=api_key)
    for i, sentence in enumerate(sentences):
        results = (
            service.cse()
            .list(q=sentence, cx=cse_id, sort=sorted_date, **kwargs)
            .execute()
        )
        if "items" in results and len(results["items"]) > 0:
            for count, link in enumerate(results["items"]):
                # only consider the top 4 results for each sentence block
                if count >= 4:
                    break
                # skip user-selected domains
                if any(
                    ("." + domain) in link["link"] for domain in domains_to_skip
                ):
                    continue
                # clean up '...' at the start and end of the snippet
                snippet = link["snippet"]
                ind = snippet.find("...")
                if ind < 20 and ind > 9:
                    snippet = snippet[ind + len("... ") :]
                ind = snippet.find("...")
                if ind > len(snippet) - 5:
                    snippet = snippet[:ind]
                # update the similarity between the snippet and the sentence block
                url = link["link"]
                if url not in urlList:
                    urlList.append(url)
                    scoreArray.append([0] * len(sentences))
                urlCount[url] = urlCount[url] + 1 if url in urlCount else 1
                if plag_option == "Standard":
                    scoreArray[urlList.index(url)][i] = cosineSim(
                        sentence, snippet
                    )
                else:
                    scoreArray[urlList.index(url)][i] = sentence_similarity(
                        sentence, snippet
                    )
    return urlCount, scoreArray
# Splits the text into blocks of four consecutive sentences.
def split_sentence_blocks(text):
    sents = sent_tokenize(text)
    two_sents = []
    for i in range(len(sents)):
        if (i % 4) == 0:
            two_sents.append(sents[i])
        else:
            two_sents[len(two_sents) - 1] += " " + sents[i]
    return two_sents
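# Illustrative behaviour (assumed input): a text that sent_tokenize splits
# into six sentences yields two blocks here, the first four sentences joined
# with spaces and then the remaining two.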
months = {
    "January": "01",
    "February": "02",
    "March": "03",
    "April": "04",
    "May": "05",
    "June": "06",
    "July": "07",
    "August": "08",
    "September": "09",
    "October": "10",
    "November": "11",
    "December": "12",
}
# Builds a YYYYMMDD date string for the Google CSE date-range sort.
def build_date(year=2024, month="March", day=1):
    return f"{year}{months[month]}{day:02d}"
# Fetches a URL and returns its parsed HTML, or None on failure.
async def get_url_data(url, client):
    try:
        r = await client.get(url)
        if r.status_code == 200:
            soup = BeautifulSoup(r.content, "html.parser")
            return soup
    except Exception:
        return None
# Strips all punctuation from the text.
def remove_punc(text):
    res = re.sub(r"[^\w\s]", "", text)
    return res
# Returns all word n-grams of size n, each as a list of words.
def split_ngrams(text, n):
    words = text.split()
    return [words[i : i + n] for i in range(len(words) - n + 1)]
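# Illustrative example (assumed input): trigrams of a seven-word string.
#   >>> split_ngrams("a b c d e f g", 3)
#   [['a', 'b', 'c'], ['b', 'c', 'd'], ['c', 'd', 'e'], ['d', 'e', 'f'], ['e', 'f', 'g']]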
# Fetches all URLs concurrently; each result is a BeautifulSoup object or None.
async def parallel_scrap(urls):
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = []
        for url in urls:
            tasks.append(get_url_data(url=url, client=client))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
# Scores how much of a sentence block appears in the page content:
# 1 for an exact substring match, otherwise the fraction of its
# word 5-grams that occur verbatim in the content.
def matching_score(sentence_content_tuple):
    sentence, content = sentence_content_tuple
    if sentence in content:
        return 1
    else:
        n = 5
        ngrams = split_ngrams(sentence, n)
        if len(ngrams) == 0:
            return 0
        matched = [x for x in ngrams if " ".join(x) in content]
        return len(matched) / len(ngrams)
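# Illustrative example (assumed inputs): a six-word sentence has two 5-grams;
# if only one of them occurs in the content, the score is 0.5.
#   >>> matching_score(("one two three four five six", "... one two three four five ..."))
#   0.5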
# Computes matching scores for (sentence, page_content) pairs in parallel.
def process_with_multiprocessing(input_data):
    with Pool(processes=4) as pool:
        scores = pool.map(matching_score, input_data)
    return scores
def plagiarism_check(
    plag_option,
    input,
    year_from,
    month_from,
    day_from,
    year_to,
    month_to,
    day_to,
    domains_to_skip,
):
    # NOTE: only the last api_key assignment below takes effect.
    api_key = "AIzaSyCLyCCpOPLZWuptuPAPSg8cUIZhdEMVf6g"
    api_key = "AIzaSyCS1WQDMl1IMjaXtwSd_2rA195-Yc4psQE"
    api_key = "AIzaSyCB61O70B8AC3l5Kk3KMoLb6DN37B7nqIk"
    # api_key = "AIzaSyCg1IbevcTAXAPYeYreps6wYWDbU0Kz8tg"
    api_key = "AIzaSyA5VVwY1eEoIoflejObrxFDI0DJvtbmgW8"
    cse_id = "851813e81162b4ed4"

    sentences = split_sentence_blocks(input)
    urlCount = {}
    ScoreArray = []
    urlList = []
    date_from = build_date(year_from, month_from, day_from)
    date_to = build_date(year_to, month_to, day_to)
    sort_date = f"date:r:{date_from}:{date_to}"
    # get the list of URLs to check against each sentence block
    urlCount, ScoreArray = google_search(
        plag_option,
        sentences,
        urlCount,
        ScoreArray,
        urlList,
        sort_date,
        domains_to_skip,
        api_key,
        cse_id,
    )
    # scrape the URLs in the list concurrently
    formatted_tokens = []
    soups = asyncio.run(parallel_scrap(urlList))
    # # Populate matching scores for scraped pages (sequential version, kept
    # # for reference; replaced by the multiprocessing path below)
    # for i, soup in enumerate(soups):
    #     print(f"Analyzing {i+1} of {len(soups)} soups........................")
    #     if soup:
    #         page_content = soup.text
    #         for j, sent in enumerate(sentences):
    #             args_list = (sent, page_content)
    #             score = matching_score(args_list)
    #             # score = cos_sim_torch(embed_text(sent), source_embeddings[i])
    #             ScoreArray[i][j] = score
    # Populate matching scores for the scraped pages using multiprocessing
    input_data = []
    for i, soup in enumerate(soups):
        if soup:
            page_content = soup.text
            for j, sent in enumerate(sentences):
                input_data.append((sent, page_content))
    scores = process_with_multiprocessing(input_data)
    k = 0
    for i, soup in enumerate(soups):
        if soup:
            for j, _ in enumerate(sentences):
                ScoreArray[i][j] = scores[k]
                k += 1
    # For each sentence block, pick the URL with the highest score, preferring
    # the previous block's URL unless another URL beats it by a 0.1 margin.
    sentenceToMaxURL = [-1] * len(sentences)
    for j in range(len(sentences)):
        if j > 0:
            maxScore = ScoreArray[sentenceToMaxURL[j - 1]][j]
            sentenceToMaxURL[j] = sentenceToMaxURL[j - 1]
        else:
            maxScore = -1
        for i in range(len(ScoreArray)):
            margin = (
                0.1
                if (j > 0 and sentenceToMaxURL[j] == sentenceToMaxURL[j - 1])
                else 0
            )
            if ScoreArray[i][j] - maxScore > margin:
                maxScore = ScoreArray[i][j]
                sentenceToMaxURL[j] = i
    # Average the scores of the sentence blocks assigned to each URL, then
    # rank the URLs by that average in descending order.
    index = np.unique(sentenceToMaxURL)
    urlScore = {}
    for url in index:
        s = [
            ScoreArray[url][sen]
            for sen in range(len(sentences))
            if sentenceToMaxURL[sen] == url
        ]
        urlScore[url] = sum(s) / len(s)
    index_descending = sorted(urlScore, key=urlScore.get, reverse=True)
    urlMap = {}
    for count, i in enumerate(index_descending):
        urlMap[i] = count + 1
    # Emit each sentence block tagged with the rank of its best-matching URL,
    # followed by the ranked URLs and their average matching scores.
    for i, sent in enumerate(sentences):
        formatted_tokens.append(
            (sent, "[" + str(urlMap[sentenceToMaxURL[i]]) + "]")
        )
    formatted_tokens.append("\n\n\n")
    for ind in index_descending:
        formatted_tokens.append(
            (
                urlList[ind]
                + " --- Matching Score: "
                + f"{str(round(urlScore[ind] * 100, 2))}%",
                "[" + str(urlMap[ind]) + "]",
            )
        )
        formatted_tokens.append(("\n", None))
    return formatted_tokens
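# Minimal usage sketch (illustrative only, not part of the Space's UI code):
# it assumes the API key / CSE id defined above are valid and that network
# access is available, and it only runs when this file is executed directly.
# The sample text and date range below are made-up placeholder values.
if __name__ == "__main__":
    sample_text = (
        "The quick brown fox jumps over the lazy dog. "
        "This sentence is only here to form a short block of text."
    )
    tokens = plagiarism_check(
        "Standard",          # plag_option: "Standard" uses bag-of-words cosine, anything else uses embeddings
        sample_text,         # input text to check
        2023, "January", 1,  # year_from, month_from, day_from
        2024, "March", 1,    # year_to, month_to, day_to
        [],                  # domains_to_skip
    )
    for token in tokens:
        print(token)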