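# EXii Startup Scraper (Hugging Face Space, Gradio app).
# Workflow: read the question catalogue from Scorecard_Final.xlsx, search Google for each
# prompt via the ScrapingBee API, fetch the page text, let GPT-3.5 filter the pages and
# answer the scoring questions, persist the results as JSON, and visualise the score as a
# radar (polar) plot. Requires the environment variables 'getkey' (ScrapingBee API key)
# and 'chatkey' (OpenAI API key).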
import os
import glob
import json
import math
import time
import pickle
import openai
import requests
import pandas as pd
import gradio as gr
import plotly.graph_objects as go
from googlesearch import search
from scrapingbee import ScrapingBeeClient

global scraped_folder_path, category_folder_path, log_file_path, openapikey, scrapingbeekey, excel_file
# helper to create a folder; returns the path whether or not it already existed
def create_folder(_path):
    os.makedirs(_path, exist_ok=True)
    return _path
data_folder = 'data_folder'
scraped_folder = 'scrapped_files'
category_folder = 'categories'
excel_file = 'Scorecard_Final.xlsx'
log_folder = 'log_files'

create_folder(data_folder)
scraped_folder_path = create_folder(f'{data_folder}/{scraped_folder}')
category_folder_path = create_folder(f'{data_folder}/{category_folder}')
log_file_path = create_folder(f'{data_folder}/{log_folder}')

categories_found = [item.split('/')[-1] for item in glob.glob(f'{category_folder_path}/*')]
json_found = [item.split('/')[-1] for item in glob.glob(f'{scraped_folder_path}/*.json')]
# refresh the dropdown with the JSON files scraped so far
def update_json_(x):
    new_options = []
    for count_item, item in enumerate(sorted(glob.glob(f'{scraped_folder_path}/*'))):
        # print(count_item+1)
        new_options.append(item.split('/')[-1])
    return gr.Dropdown.update(choices=new_options, interactive=True)
# compute the raw score per topic: one point for every question with at least one positive answer
def get_raw_score(data):
    score_dict = {}
    for key in data.keys():
        score = 0
        if "Questions" in data[key].keys():
            for question in data[key]['Questions'].keys():
                if len(data[key]['Questions'][question]) > 0:
                    score += 1
        if data[key]['Topic'] not in score_dict:
            score_dict[data[key]['Topic']] = 0
        score_dict[data[key]['Topic']] += score
    df = pd.DataFrame(score_dict, index=[0]).T.reset_index()
    df.columns = ['theta', 'r']
    return df, min(score_dict.values()), sum(score_dict.values())
# build the radar-chart figure for a single scraped JSON file
def get_overall_score(name):
    # load the scraped data (name may be a bare file name or a full path)
    try:
        data = json.load(open(f'{scraped_folder_path}/{name}'))
    except FileNotFoundError:
        data = json.load(open(name))
    score, level, experience = get_raw_score(data)
    # add the polar plot
    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=score.r,
        theta=score.theta,
        marker=dict(size=10, color="magenta"),
        fill='toself',
    ))
    file_type = name.replace('.json', '')
    fig.update_traces(mode="markers", marker=dict(line_color='white', opacity=0.7))
    fig.update_layout(title_text=f'{file_type} >> level:{level}, exp:{experience}/100',
                      polar=dict(radialaxis=dict(range=[0, 20], visible=True)),
                      showlegend=False)
    return fig
# overlay the scores of all startups in the selected category on top of the file's own score
def get_comparative_score(file, group=''):
    df = None
    if group != '':
        json_files = glob.glob(f'{category_folder_path}/{group}/*.json')
        if len(json_files) > 0:
            for enum_jf, jf in enumerate(json_files):
                data = json.load(open(jf))
                if enum_jf == 0:
                    df, _, _ = get_raw_score(data)
                    continue
                temp_df, _, _ = get_raw_score(data)
                df = pd.concat([df, temp_df])
    fig = get_overall_score(file)
    if group != '' and df is not None:
        # add the comparison points to the polar plot
        fig.add_trace(go.Scatterpolar(
            r=df.r,
            theta=df.theta,
            marker=dict(size=7, color="limegreen"),
        ))
        fig.update_traces(mode="markers", marker=dict(line_color='white', opacity=0.7))
        fig.update_layout(polar=dict(radialaxis=dict(range=[0, 20], visible=True)),
                          showlegend=False)
    return fig
# ask GPT whether the page text is relevant (filter prompt) and, if so, answer each scoring question
def create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls):
    tries = 0
    gpt_filter_answer = ''
    while tries < 3 and gpt_filter_answer == '':
        try:
            tries += 1
            gpt_filter_response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are an assistant who helps to extract information of a startup from its homepage. You should answer if the text is about a specific topic. You should only answer with either yes or no as the first word and explain why you made your choice"},
                    {"role": "user", "content": f"{gpt_filter_prompt} \n {text}"},
                ])
            for choice in gpt_filter_response.choices:
                gpt_filter_answer += choice.message.content
        except Exception as e:
            print("filter tries: ", tries)
            print(e)
    if gpt_filter_answer == '':
        print("Error: the GPT filter responded with an empty string")
        completely_useless_urls.append([url, gpt_filter_answer])
        return question_dictionary, useless_urls, completely_useless_urls
    # if the website passes the general filter, proceed to ask the scoring questions
    if gpt_filter_answer[:3].lower() == "yes" or gpt_filter_answer[:2].lower() == "ja":
        for question in questions:
            if type(question) != str:
                continue
            if question not in question_dictionary.keys():
                question_dictionary[question] = []
            tries = 0
            question_answer = ""
            while tries < 3 and question_answer == '':
                try:
                    tries += 1
                    question_response = openai.ChatCompletion.create(
                        model="gpt-3.5-turbo",
                        messages=[
                            {"role": "system", "content": "You are an assistant who tries to answer always with yes or no in the first place. When yes, you explain the reason for it in 80 words by using a list of short descriptions"},
                            {"role": "user", "content": f"{question} \n {text}"},
                        ])
                    for choice in question_response.choices:
                        question_answer += choice.message.content
                except Exception as e:
                    print("question tries: ", tries)
                    print(e)
            # if the question is answered with yes, save the url and the reason
            if question_answer[:3].lower() == "yes" or question_answer[:2].lower() == "ja":
                question_dictionary[question].append([url, question_answer])
            else:
                useless_urls.append([url, question_answer])
    else:
        # save the url that didn't pass the filter
        completely_useless_urls.append([url, gpt_filter_answer])
    return question_dictionary, useless_urls, completely_useless_urls
# write the results for one general question to the log file
def log_to_file(f, general_question, question_dictionary, useless_urls, completely_useless_urls):
    f.write("Question: " + general_question + "\n")
    print("Question: ", general_question)
    print("\tInformation on this question was found (and scored) on the following websites:")
    f.write("\tInformation on this question was found (and scored) on the following websites:\n")
    for question in question_dictionary.keys():
        question_list = question_dictionary[question]
        if len(question_list) > 0:
            for url, answer in question_list:
                print("\t\tSource: ", url)
                f.write("\t\t--Source: " + url + " | ")
                print("\t\tAnswer: ", answer)
                f.write("Answer: " + answer.replace("\n", " ") + "\n")
    print("\n\tInformation on this question was found on the following websites, but no points were awarded:\n")
    f.write("\n\tInformation on this question was found on the following websites, but no points were awarded:\n")
    for u_url in useless_urls:
        print("\t\t", u_url)
        f.write("\t\t--" + u_url[0] + " | " + u_url[1].replace("\n", " ") + "\n")
    print("\n\tNo information on this question was found on the following websites:\n")
    f.write("\n\tNo information on this question was found on the following websites:\n")
    for cu_url in completely_useless_urls:
        print("\t\t", cu_url)
        f.write("\t\t--" + cu_url[0] + " | " + cu_url[1].replace("\n", " ") + "\n")
    f.write("---------------------------------------------------------------------------------------------------------------------------------\n")
    print("----------------------------------------------------------------------\n")
# API keys are read from the Space's environment variables
# constants = pickle.load(open('aux_files/aux_file.exii', 'rb'))
scrapingbeekey = os.environ['getkey']
openai.api_key = os.environ['chatkey']
# openapikey = constants['openapi_key']
# scrapingbeekey = constants['scrapingbee_key']
# os.environ['OPEN_API_KEY'] = openapikey
# openai.api_key = openapikey
# query Google through the ScrapingBee Google store endpoint
def send_request(google_prompt):
    response = requests.get(
        url="https://app.scrapingbee.com/api/v1/store/google",
        params={
            "api_key": scrapingbeekey,
            "search": google_prompt,
            "add_html": True,
            "nb_results": 1,
        },
    )
    return response.json()
# iterate over the question catalogue, scrape the web and build the result dictionary
def get_json_dict(df, web_list, progress, name):
    filename = "/" + name + "_log.txt"
    with open(log_file_path + filename, 'w+', encoding='utf-8') as f:
        output_dictionary = {}
        topics = df['Topic'].unique()
        for topic in progress.tqdm(topics, desc='Topic'):
            time.sleep(0.2)
            print(topic)
            df_topic = df[df['Topic'] == topic]
            general_questions = df_topic['Questions'].unique()
            for general_question in progress.tqdm(general_questions, desc='GenQ'):
                time.sleep(0.3)
                output_dictionary[general_question] = {"Topic": topic}
                df_question = df_topic[df_topic['Questions'] == general_question].reset_index()
                google_prompt = df_question['Google Prompts'].values[0]
                gpt_filter_prompt = df_question['GPT Filter Prompt'].values[0]
                questions = df_question.iloc[0, 5:].values.tolist()
                question_dictionary = {}
                useless_urls = []  # urls that passed the filter but whose answer was not "yes"
                completely_useless_urls = []  # urls that don't contain the information we are looking for
                # search Google for the google_prompt via ScrapingBee
                request_json = send_request(google_prompt)
                search_results = []
                num_urls = 1
                if 'organic_results' in request_json.keys():
                    if len(request_json['organic_results']) == 0:
                        print("organic_results are empty")
                    else:
                        for i in range(num_urls):
                            search_results.append(request_json['organic_results'][i]['url'])
                else:
                    print("organic_results not in request_json")
                # add the extra user-defined urls
                search_results = list(set(search_results + web_list))
                if len(search_results) == 0:
                    print("Didn't get any search results for google prompt:", google_prompt)
                    continue
                # iterate over the collected urls
                for url in progress.tqdm(search_results, desc='url'):
                    time.sleep(0.4)
                    # scrape the text of the website
                    client = ScrapingBeeClient(api_key=scrapingbeekey)
                    url_text = client.get(url,
                                          params={
                                              'json_response': 'True',
                                              'extract_rules': {"text": "body"},
                                          })
                    json_object = json.loads(url_text.text)
                    if 'body' not in json_object.keys():
                        print("json_object has no key: body")
                        continue
                    if "text" in json_object['body'].keys():
                        text_content = json_object['body']['text']
                    else:
                        print("json_object['body'] has no key: text")
                        continue
                    if len(text_content) == 0:
                        continue
                    # long pages are split; only the first chunk and the remainder are evaluated
                    splitsize = 10000
                    if len(text_content) > splitsize:
                        num_splits = math.ceil(len(text_content) / splitsize)
                        # for i in range(num_splits-1):
                        for i in range(1):
                            text = text_content[i*splitsize:(i+1)*splitsize]
                            question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls)
                        text = text_content[(i+1)*splitsize:]
                        question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls)
                    else:
                        question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text_content, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls)
                log_to_file(f, general_question, question_dictionary, useless_urls, completely_useless_urls)
                output_dictionary[general_question]['Questions'] = question_dictionary
    return output_dictionary
# parse the user input, scrape each entity and save the results as JSON
def scrape_me(name, progress=gr.Progress()):
    if name == '':
        return "Scraping not possible, empty entry found!"
    # load the excel file with the question catalogue
    df_reference = pd.read_excel(excel_file, sheet_name="Sheet1")
    # split the input into entity names; custom urls are given in parentheses
    if ',' in name:
        entity_names = name.split(',')
    else:
        entity_names = [name]
    entity_websites = []
    for en_num, en in enumerate(entity_names):
        if "(" in en:
            start = en.find("(")
            end = en.find(")")
            websites = en[start+1:end].split(";")
            entity_names[en_num] = en[:start].strip()
        else:
            entity_names[en_num] = en.strip()
            websites = []
        entity_websites.append(websites)
    # loop through the entities and their prompts
    count_web = 0
    for en in progress.tqdm(entity_names, desc='iterating through searchable entities'):
        time.sleep(0.1)
        # replace the <corporate> placeholder in the question strings with the entity name
        enum_df = df_reference.replace({"<corporate>": en}, regex=True)
        # retrieve the scraped data dictionary; the sheet is expected to provide the columns
        # 'Topic', 'Questions', 'Google Prompts', 'GPT Filter Prompt' and the scoring questions
        json_dict = get_json_dict(enum_df.head(25), entity_websites[count_web], progress, en)
        # convert and save the json file
        json_object = json.dumps(json_dict, indent=4)
        json_file = f'{scraped_folder_path}/{en}.json'
        with open(json_file, "w") as outfile:
            outfile.write(json_object)
        count_web += 1
    return f"Scraped results for the following entities: {name}!"
# Gradio UI: one tab for triggering scrapes, selecting scraped files and plotting the score
with gr.Blocks(title='EXii Startup Scraper') as demo:
    with gr.Tab("Scraping Toolbox"):
        result_text = gr.Textbox(label='Debug Information', placeholder='Debug Information')
        with gr.Row():
            scrapin_it_digga = gr.Text(label="Startup to scrape",
                                       info='Separate two startups with a ","; to force a search in custom URLs, list them within "()" separated by ";"',
                                       placeholder='saturn (https://www.saturn.de/; https://www.mediamarkt.de/), cyberport (https://www.cyberport.de/)')
        with gr.Row():
            scrape_button = gr.Button("Start scraping")
        with gr.Column():
            with gr.Row():
                scrapes_found = gr.Dropdown(json_found, label="Scraped startups", info="Select a scraped json file")
            with gr.Row():
                json_update_button = gr.Button("Update scraped data")
        with gr.Column():
            # with gr.Row():
            #     show_the_score = gr.Button('Plot score')
            sexy_plot = gr.Plot(label='Exponential Growth Score')
    json_update_button.click(update_json_, inputs=scrapes_found, outputs=scrapes_found)
    scrape_button.click(scrape_me, inputs=scrapin_it_digga, outputs=result_text)
    # show_the_score.click(get_comparative_score, inputs=[scrapes_found], outputs=sexy_plot)
    scrapes_found.change(get_comparative_score, inputs=[scrapes_found], outputs=sexy_plot)

demo.queue(concurrency_count=4).launch(debug=True)