import time from nltk.tokenize import sent_tokenize from googleapiclient.discovery import build from collections import Counter import re, math from sentence_transformers import SentenceTransformer, util import asyncio import httpx from bs4 import BeautifulSoup import numpy as np import concurrent from multiprocessing import Pool WORD = re.compile(r"\w+") model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") # returns cosine similarity of two vectors # input: two vectors # output: integer between 0 and 1. def get_cosine(vec1, vec2): intersection = set(vec1.keys()) & set(vec2.keys()) # calculating numerator numerator = sum([vec1[x] * vec2[x] for x in intersection]) # calculating denominator sum1 = sum([vec1[x] ** 2 for x in vec1.keys()]) sum2 = sum([vec2[x] ** 2 for x in vec2.keys()]) denominator = math.sqrt(sum1) * math.sqrt(sum2) # checking for divide by zero if denominator == 0: return 0.0 else: return float(numerator) / denominator # converts given text into a vector def text_to_vector(text): # uses the Regular expression above and gets all words words = WORD.findall(text) # returns a counter of all the words (count of number of occurences) return Counter(words) # returns cosine similarity of two words # uses: text_to_vector(text) and get_cosine(v1,v2) def cosineSim(text1, text2): vector1 = text_to_vector(text1) vector2 = text_to_vector(text2) # print vector1,vector2 cosine = get_cosine(vector1, vector2) return cosine def cos_sim_torch(embedding_1, embedding_2): return util.pytorch_cos_sim(embedding_1, embedding_2).item() def embed_text(text): return model.encode(text, convert_to_tensor=True) def sentence_similarity(text1, text2): embedding_1 = model.encode(text1, convert_to_tensor=True) embedding_2 = model.encode(text2, convert_to_tensor=True) o = util.pytorch_cos_sim(embedding_1, embedding_2) return o.item() def google_search( plag_option, sentences, url_count, score_array, url_list, sorted_date, domains_to_skip, api_key, cse_id, **kwargs, ): service = build("customsearch", "v1", developerKey=api_key) for i, sentence in enumerate(sentences): results = ( service.cse() .list(q=sentence, cx=cse_id, sort=sorted_date, **kwargs) .execute() ) if "items" in results and len(results["items"]) > 0: for count, link in enumerate(results["items"]): # stop after 3 pages if count >= 3: break # skip user selected domains if any( ("." + domain) in link["link"] for domain in domains_to_skip ): continue # clean up snippet of '...' snippet = link["snippet"] ind = snippet.find("...") if ind < 20 and ind > 9: snippet = snippet[ind + len("... ") :] ind = snippet.find("...") if ind > len(snippet) - 5: snippet = snippet[:ind] # update cosine similarity between snippet and given text url = link["link"] if url not in url_list: url_list.append(url) score_array.append([0] * len(sentences)) url_count[url] = url_count[url] + 1 if url in url_count else 1 if plag_option == "Standard": score_array[url_list.index(url)][i] = cosineSim( sentence, snippet ) else: score_array[url_list.index(url)][i] = sentence_similarity( sentence, snippet ) return url_count, score_array def split_sentence_blocks(text): two_sents = [] for para in text.split("\n\n"): sents = sent_tokenize(para) for i in range(len(sents)): if (i % 2) == 0: two_sents.append(sents[i]) else: two_sents[len(two_sents) - 1] += " " + sents[i] return two_sents months = { "January": "01", "February": "02", "March": "03", "April": "04", "May": "05", "June": "06", "July": "07", "August": "08", "September": "09", "October": "10", "November": "11", "December": "12", } def build_date(year=2024, month="March", day=1): return f"{year}{months[month]}{day}" async def get_url_data(url, client): try: r = await client.get(url) # print(r.status_code) if r.status_code == 200: # print("in") soup = BeautifulSoup(r.content, "html.parser") return soup except Exception: return None def remove_punc(text): res = re.sub(r"[^\w\s]", "", text) return res def split_ngrams(text, n): # return n-grams of size n words = text.split() return [words[i : i + n] for i in range(len(words) - n + 1)] async def parallel_scrap(urls): async with httpx.AsyncClient(timeout=30) as client: tasks = [] for url in urls: tasks.append(get_url_data(url=url, client=client)) results = await asyncio.gather(*tasks, return_exceptions=True) return results def matching_score(sentence_content_tuple): sentence, content = sentence_content_tuple if sentence in content: return 1 else: n = 5 ngrams = split_ngrams(sentence, n) if len(ngrams) == 0: return 0 matched = [x for x in ngrams if " ".join(x) in content] return len(matched) / len(ngrams) def process_with_multiprocessing(input_data): with Pool(processes=4) as pool: scores = pool.map(matching_score, input_data) return scores def print2d(array): for row in array: print(row) def map_sentence_url(sentences, score_array): sentenceToMaxURL = [-1] * len(sentences) for j in range(len(sentences)): if j > 0: maxScore = score_array[sentenceToMaxURL[j - 1]][j] sentenceToMaxURL[j] = sentenceToMaxURL[j - 1] else: maxScore = -1 for i in range(len(score_array)): margin = ( 0.05 if (j > 0 and sentenceToMaxURL[j] == sentenceToMaxURL[j - 1]) else 0 ) if score_array[i][j] - maxScore > margin: maxScore = score_array[i][j] sentenceToMaxURL[j] = i return sentenceToMaxURL def html_highlight( plag_option, input, year_from, month_from, day_from, year_to, month_to, day_to, domains_to_skip, ): sentence_scores, url_scores = plagiarism_check( plag_option, input, year_from, month_from, day_from, year_to, month_to, day_to, domains_to_skip, ) color_map = [ "#cf2323", "#eb9d59", "#c2ad36", "#e1ed72", "#c2db76", "#a2db76", ] font = "Roboto" html_content = "\n
{combined_sentence} {index_part}
" html_content += formatted_sentence combined_sentence = "" combined_sentence += " " + sentence prev_idx = idx if combined_sentence: color = color_map[prev_idx - 1] index_part = f'[{prev_idx}]' formatted_sentence = f"{combined_sentence} {index_part}
" html_content += formatted_sentence html_content += "({idx}) {url}
--- Matching Score: {score}%
' html_content += formatted_url html_content += "