import os
import glob
import json
import math
import time
import pickle
import openai
import requests
import pandas as pd
import gradio as gr
import plotly.graph_objects as go
from googlesearch import search
from scrapingbee import ScrapingBeeClient


# helper to create a folder if it does not exist yet
def create_folder(_path):
    os.makedirs(_path, exist_ok=True)
    return _path


data_folder = 'data_folder'
scraped_folder = 'scrapped_files'
category_folder = 'categories'
excel_file = 'Scorecard_Final.xlsx'
log_folder = 'log_files'

create_folder(data_folder)
scraped_folder_path = create_folder(f'{data_folder}/{scraped_folder}')
category_folder_path = create_folder(f'{data_folder}/{category_folder}')
log_file_path = create_folder(f'{data_folder}/{log_folder}')

categories_found = [item.split('/')[-1] for item in glob.glob(f'{category_folder_path}/*')]
json_found = [item.split('/')[-1] for item in glob.glob(f'{scraped_folder_path}/*.json')]


# refresh the dropdown with the json files scraped so far
def update_json_(x):
    new_options = []
    for count_item, item in enumerate(sorted(glob.glob(f'{scraped_folder_path}/*'))):
        new_options.append(item.split('/')[-1])
    return gr.Dropdown.update(choices=new_options, interactive=True)


# compute the per-topic raw score of a scraped json file
def get_raw_score(data):
    score_dict = {}
    for key in data.keys():
        score = 0
        if "Questions" in data[key].keys():
            for question in data[key]['Questions'].keys():
                if len(data[key]['Questions'][question]) > 0:
                    score += 1
        if data[key]['Topic'] not in score_dict:
            score_dict[data[key]['Topic']] = 0
        score_dict[data[key]['Topic']] += score
    df = pd.DataFrame(score_dict, index=[0]).T.reset_index()
    df.columns = ['theta', 'r']
    return df, min(score_dict.values()), sum(score_dict.values())


# getting the overall score of a single scraped entity as a polar plot
def get_overall_score(name):
    # get the raw score
    try:
        data = json.load(open(f'{scraped_folder_path}/{name}'))
    except FileNotFoundError:
        data = json.load(open(name))
    score, level, experience = get_raw_score(data)

    # adding the polar plot
    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=score.r,
        theta=score.theta,
        marker=dict(size=10, color="magenta"),
        fill='toself',
    ))
    file_type = name.replace('.json', '')
    fig.update_traces(mode="markers", marker=dict(line_color='white', opacity=0.7))
    fig.update_layout(title_text=f'{file_type} >> level:{level}, exp:{experience}/100',
                      polar=dict(radialaxis=dict(range=[0, 20], visible=True)),
                      showlegend=False)
    return fig


# plot the score of one entity against all entities of a category group
def get_comparative_score(file, group=''):
    if group != '':
        json_files = glob.glob(f'{category_folder_path}/{group}/*.json')
        if len(json_files) > 0:
            for enum_jf, jf in enumerate(json_files):
                print(enum_jf)
                data = json.load(open(jf))
                if enum_jf == 0:
                    df, _, _ = get_raw_score(data)
                    continue
                temp_df, _, _ = get_raw_score(data)
                df = pd.concat([df, temp_df])
        else:
            df = None

    fig = get_overall_score(file)
    if group != '':
        if df is not None:
            # adding the polar plot for the comparison group
            fig.add_trace(go.Scatterpolar(
                r=df.r,
                theta=df.theta,
                marker=dict(size=7, color="limegreen"),
            ))
            fig.update_traces(mode="markers", marker=dict(line_color='white', opacity=0.7))
            fig.update_layout(polar=dict(radialaxis=dict(range=[0, 20], visible=True)),
                              showlegend=False)
    return fig


# ask GPT whether a page is relevant and, if so, answer the detailed scoring questions
def create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt,
                               useless_urls, completely_useless_urls):
    tries = 0
    gpt_filter_answer = ''
    while tries < 3 and gpt_filter_answer == '':
        try:
            tries += 1
            gpt_filter_response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content":
"You are an assistant who helps to extract information of a startup from its homepage. You should answer if the text is about a specific topic. You should only answer with either yes or no as the first word and explain why you made your choice"}, {"role": "user", "content": f"{gpt_filter_prompt} \n {text}" }, ]) for choice in gpt_filter_response.choices: gpt_filter_answer += choice.message.content except Exception as e: print("filter tries: ", tries) print(e) # if the website is about the general question, then proceed to ask the scoring questions if gpt_filter_answer == '': print("Error The gpt filter responded with an empty string") completely_useless_urls.append([url, gpt_filter_answer]) return question_dictionary, useless_urls, completely_useless_urls if gpt_filter_answer[:3].lower() == "yes" or gpt_filter_answer[:2].lower() == "ja": for question in questions: if type(question) != str: continue if question not in question_dictionary.keys(): question_dictionary[question] = [] tries = 0 question_answer = "" while tries < 3 and question_answer == '': try: tries += 1 question_response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ {"role": "system", "content": "You are an assistant who tries to answer always with yes or no in the first place. When yes, you explain the reason for it in 80 words by using a list of short descriptions"}, {"role": "user", "content": f"{question} \n {text}" }, ]) for choice in question_response.choices: question_answer += choice.message.content except Exception as e: print("question tries: ", tries) print(e) # If the question is answered yes, save the reason and website if question_answer[:3].lower() == "yes" or question_answer[:2].lower() == "ja": # save question, url and answer? question_dictionary[question].append([url, question_answer]) else: useless_urls.append([url, question_answer]) else: # safe url that didnt pass the filter completely_useless_urls.append([url, gpt_filter_answer]) return question_dictionary, useless_urls, completely_useless_urls def log_to_file(f, general_question, question_dictionary, useless_urls, completely_useless_urls): f.write("Frage: " + general_question + "\n") print("Frage: ", general_question) print("\tAuf den Folgenden Websiten wurden Informationen zu dieser Frage gefunden mit Punktevergabe:") f.write("\tAuf den Folgenden Websiten wurden Informationen zu dieser Frage gefunden mit Punktevergabe:\n") for question in question_dictionary.keys(): question_list = question_dictionary[question] if len(question_list) > 0: for url, answer in question_list: print("\t\tQuelle: ", url) f.write("\t\t--Quelle: " + url + " | ") print("\t\tAntwort: ", answer) f.write("Antwort: " + answer.replace("\n", " ") + "\n") print("\n\tAuf den Folgenden Websiten wurden Informationen zu dieser Frage gefunden, aber keine Punktevergabe:\n") f.write("\n\tAuf den Folgenden Websiten wurden Informationen zu dieser Frage gefunden, aber keine Punktevergabe:\n") for u_url in useless_urls: print("\t\t", u_url) f.write("\t\t--" + u_url[0] + " | " + u_url[1].replace("\n", " ") + "\n") print("\n\t Auf den Folgenden Websiten wurden keine Informationen zu dieser Frage gefunden:\n") f.write("\n\tAuf den Folgenden Websiten wurden keine Informationen zu dieser Frage gefunden:\n") for cu_url in completely_useless_urls: print("\t\t", cu_url) f.write("\t\t--" + cu_url[0]+ " | " + cu_url[1].replace("\n", " ") + "\n") f.write("---------------------------------------------------------------------------------------------------------------------------------\n") 
print("----------------------------------------------------------------------\n") # constants = pickle.load(open('aux_files/aux_file.exii', 'rb')) scarpingbeekey = os.environ['getkey'] openai.api_key = os.environ['chatkey'] # openapikey = constants['openapi_key'] # scarpingbeekey = constants['scrapingbee_key'] # os.environ['OPEN_API_KEY'] = openapikey # openai.api_key = openapikey def send_request(google_prompt): response = requests.get( url="https://app.scrapingbee.com/api/v1/store/google", params={ "api_key": scarpingbeekey, "search": google_prompt, "add_html": True, "nb_results": 1 }, ) return response.json() def get_json_dict(df, web_list, progress, name): filename="/" + name + "_log.txt" with open(log_file_path+filename, 'w+', encoding='utf-8') as f: output_dictionary = {} topics = df['Topic'].unique() for topic in progress.tqdm(topics, desc='Topic'): ########## time.sleep(0.2) print(topic) df_topic = df[df['Topic'] == topic] general_questions = df_topic['Questions'].unique() for general_question in progress.tqdm(general_questions, desc='GenQ'): ########### time.sleep(0.3) output_dictionary[general_question] = {"Topic": topic} df_question = df_topic[df_topic['Questions']==general_question].reset_index() google_prompt = df_question['Google Prompts'].values[0] gpt_filter_prompt = df_question['GPT Filter Prompt'].values[0] questions = df_question.iloc[0, 5:].values.tolist() question_dictionary = {} useless_urls = [] # a list of urls that have the information that we are looking for but are answered not with yes completely_useless_urls = [] # a list of urls that dont have the information that we are looking # scrape google with google_prompt request_json = send_request(google_prompt) search_results = [] num_urls = 1 if 'organic_results' in request_json.keys(): if len(request_json['organic_results']) == 0: print("organic_results are empty") else: for i in range(num_urls): search_results.append(request_json['organic_results'][i]['url']) else: print("organic_results not in request_json") # adding the extra user defined prompts search_results = list(set(search_results + web_list)) if len(search_results) == 0: print("Didnt have any search results for googleprompt:", google_prompt) continue # print the first 10 URLs for url in progress.tqdm(search_results, desc='url'): time.sleep(0.4) # scrape the text of the website client = ScrapingBeeClient(api_key=scarpingbeekey) url_text = client.get(url, params = { 'json_response': 'True', 'extract_rules': {"text": "body",}, } ) json_object = json.loads(url_text.text) if 'body' not in json_object.keys(): print("json_object has no key: body") continue if "text" in json_object['body'].keys(): text_content = json_object['body']['text'] else: print("json_object['body'] has no key: text") continue if len(text_content) == 0: continue splitsize = 10000 if len(text_content) > splitsize: num_splits = math.ceil(len(text_content) / splitsize) #for i in range(num_splits-1): for i in range(1): text = text_content[i*splitsize:(i+1)*splitsize] question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls) text = text_content[(i+1)*splitsize:] question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls) else: question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text_content, 
                            questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls)

                log_to_file(f, general_question, question_dictionary, useless_urls, completely_useless_urls)
                output_dictionary[general_question]['Questions'] = question_dictionary
    return output_dictionary


# scrape every entity named in the input string and save the results as json files
def scrape_me(name, progress=gr.Progress()):
    if name == '':
        return "Scraping not possible, empty entries found!"

    # load the excel file with the scorecard questions
    df_reference = pd.read_excel(excel_file, sheet_name="Sheet1")

    # parse optional custom urls given in brackets behind each entity name
    if ',' in name:
        entity_names = name.split(',')
    else:
        entity_names = [name]
    entity_websites = []
    for en_num, en in enumerate(entity_names):
        if "(" in en:
            start = en.find("(")
            end = en.find(")")
            websites = en[start + 1:end].split(";")
            entity_names[en_num] = en[:start].strip()
        else:
            entity_names[en_num] = en.strip()
            websites = []
        entity_websites.append(websites)

    # looping through the entities and their prompts
    count_web = 0
    for en in progress.tqdm(entity_names, desc='iterating through searchable entities'):
        time.sleep(0.1)
        # replacing the corporate name in the question strings
        enum_df = df_reference.replace({"": en}, regex=True)
        # retrieving the scraped data dictionary
        json_dict = get_json_dict(enum_df.head(25), entity_websites[count_web], progress, en)
        # converting and saving the json file
        json_object = json.dumps(json_dict, indent=4)
        json_file = f'{scraped_folder_path}/{en}.json'
        with open(json_file, "w") as outfile:
            outfile.write(json_object)
        count_web += 1
    return f"Scraped results for the following entities: {name}!"


with gr.Blocks(title='EXii Startup Scraper') as demo:
    with gr.Tab("Scraping Toolbox"):
        result_text = gr.Textbox(label='Debug Information', placeholder='Debug Information')
        with gr.Row():
            scrapin_it_digga = gr.Text(
                label="Startup to scrape",
                info='Separate two startups with a ",", force a search in custom URLs within "()" and separate the URLs with ";"',
                placeholder='saturn (https://www.saturn.de/; https://www.mediamarkt.de/), cyberport (https://www.cyberport.de/)')
        with gr.Row():
            scrape_button = gr.Button("Start scraping")
        with gr.Column():
            with gr.Row():
                scrapes_found = gr.Dropdown(json_found, label="Scraped startups", info="Select a scraped json file")
            with gr.Row():
                json_update_button = gr.Button("Update scraped data")
        with gr.Column():
            # with gr.Row():
            #     show_the_score = gr.Button('Plot score')
            sexy_plot = gr.Plot(label='Exponential Growth Score')

    json_update_button.click(update_json_, inputs=scrapes_found, outputs=scrapes_found)
    scrape_button.click(scrape_me, inputs=scrapin_it_digga, outputs=result_text)
    # show_the_score.click(get_comparative_score, inputs=[scrapes_found], outputs=sexy_plot)
    scrapes_found.change(get_comparative_score, inputs=[scrapes_found], outputs=sexy_plot)

demo.queue(concurrency_count=4).launch(debug=True)
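
# ----------------------------------------------------------------------------------
# Note on the expected input workbook (a sketch inferred from the column accesses
# above, not something shipped with this script): "Sheet1" of Scorecard_Final.xlsx
# appears to hold one row per general question, with 'Topic', 'Questions',
# 'Google Prompts' and 'GPT Filter Prompt' as the leading columns and every further
# column holding one detailed scoring question (picked up via df_question.iloc[0, 5:]
# after reset_index()). Assuming that layout, a minimal placeholder workbook for
# local testing could be created like this:
#
#   import pandas as pd
#   placeholder = pd.DataFrame({
#       'Topic': ['Team'],
#       'Questions': ['Does the startup have an experienced founding team?'],
#       'Google Prompts': ['example-startup founding team'],
#       'GPT Filter Prompt': ['Is this text about the founding team of the startup?'],
#       'Q1': ['Do the founders have prior startup experience?'],
#   })
#   placeholder.to_excel('Scorecard_Final.xlsx', sheet_name='Sheet1', index=False)
# ----------------------------------------------------------------------------------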