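# Gradio app that scrapes public web pages about startups, has GPT (gpt-3.5-turbo) score the
# findings against an Excel scorecard, and visualises the per-topic scores as polar (radar) plots.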
import os
import glob
import json
import math
import time
import pickle
import openai
import requests
import pandas as pd
import gradio as gr
import plotly.graph_objects as go
from googlesearch import search
from scrapingbee import ScrapingBeeClient
# create a folder (if it does not already exist) and return its path
def create_folder(_path):
    os.makedirs(_path, exist_ok=True)
    return _path
data_folder = 'data_folder'
scraped_folder = 'scrapped_files'
category_folder = 'categories'
excel_file = 'Scorecard_Final.xlsx'
log_folder = 'log_files'
create_folder(data_folder)
scraped_folder_path = create_folder(f'{data_folder}/{scraped_folder}')
category_folder_path = create_folder(f'{data_folder}/{category_folder}')
log_file_path = create_folder(f'{data_folder}/{log_folder}')
categories_found = [item.split('/')[-1] for item in glob.glob(f'{category_folder_path}/*')]
json_found = [item.split('/')[-1] for item in glob.glob(f'{scraped_folder_path}/*.json')]
# refresh the dropdown choices with the files currently present in the scraped folder
def update_json_(x):
    new_options = [item.split('/')[-1] for item in sorted(glob.glob(f'{scraped_folder_path}/*'))]
    return gr.Dropdown.update(choices=new_options, interactive=True)
# compute per-topic raw scores from a scraped result dict: each scoring question with at least one
# yes-answer counts one point; returns a (theta, r) dataframe of topic scores plus the lowest
# topic score (used as "level") and the total score (used as "experience")
def get_raw_score(data):
score_dict = {}
for key in data.keys():
score = 0
if "Questions" in data[key].keys():
for question in data[key]['Questions'].keys():
if len(data[key]['Questions'][question]) > 0:
score += 1
if data[key]['Topic'] not in score_dict:
score_dict[data[key]['Topic']] = 0
score_dict[data[key]['Topic']] += score
df = pd.DataFrame(score_dict, index=[0]).T.reset_index()
df.columns = ['theta', 'r']
return df, min(score_dict.values()), sum(score_dict.values())
# plot one startup's per-topic scores as a polar (radar) chart
def get_overall_score(name):
# get the raw score
    try:
        data = json.load(open(f'{scraped_folder_path}/{name}'))
    except OSError:
        # fall back to treating `name` as a full path
        data = json.load(open(name))
score, level, experience = get_raw_score(data)
# adding the polar plots
fig = go.Figure()
fig.add_trace(go.Scatterpolar(
r = score.r,
theta = score.theta,
marker=dict(size=10, color = "magenta"),
fill='toself',
))
file_type = name.replace('.json', '')
fig.update_traces(mode="markers", marker=dict(line_color='white', opacity=0.7))
fig.update_layout(title_text=f'{file_type} >> level:{level}, exp:{experience}/100',
polar=dict(radialaxis=dict(range = [0, 20], visible=True,)),
showlegend=False)
return fig
# plot the selected startup's score and optionally overlay the scores of a category group
def get_comparative_score(file, group=''):
    df = None
    if group != '':
        json_files = glob.glob(f'{category_folder_path}/{group}/*.json')
        score_frames = []
        for jf in json_files:
            data = json.load(open(jf))
            temp_df, _, _ = get_raw_score(data)
            score_frames.append(temp_df)
        if len(score_frames) > 0:
            df = pd.concat(score_frames)
    fig = get_overall_score(file)
    if df is not None:
        # overlay the comparison group as a second polar trace
        fig.add_trace(go.Scatterpolar(
            r=df.r,
            theta=df.theta,
            marker=dict(size=7, color="limegreen"),
        ))
        fig.update_traces(mode="markers", marker=dict(line_color='white', opacity=0.7))
        fig.update_layout(polar=dict(radialaxis=dict(range=[0, 20], visible=True)),
                          showlegend=False)
    return fig
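# Ask GPT about one scraped text chunk: the filter prompt first decides whether the text is on topic;
# only then is each scoring question sent to the model. Yes-answers are stored per question in
# question_dictionary, while useless_urls and completely_useless_urls collect pages without usable answers.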
def create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls):
tries = 0
gpt_filter_answer = ''
while tries < 3 and gpt_filter_answer == '':
try:
tries += 1
gpt_filter_response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an assistant who helps to extract information of a startup from its homepage. You should answer if the text is about a specific topic. You should only answer with either yes or no as the first word and explain why you made your choice"},
{"role": "user", "content": f"{gpt_filter_prompt} \n {text}" },
])
for choice in gpt_filter_response.choices:
gpt_filter_answer += choice.message.content
except Exception as e:
print("filter tries: ", tries)
print(e)
# if the website is about the general question, then proceed to ask the scoring questions
if gpt_filter_answer == '':
print("Error The gpt filter responded with an empty string")
completely_useless_urls.append([url, gpt_filter_answer])
return question_dictionary, useless_urls, completely_useless_urls
if gpt_filter_answer[:3].lower() == "yes" or gpt_filter_answer[:2].lower() == "ja":
for question in questions:
if type(question) != str:
continue
if question not in question_dictionary.keys():
question_dictionary[question] = []
tries = 0
question_answer = ""
while tries < 3 and question_answer == '':
try:
tries += 1
question_response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an assistant who tries to answer always with yes or no in the first place. When yes, you explain the reason for it in 80 words by using a list of short descriptions"},
{"role": "user", "content": f"{question} \n {text}" },
])
for choice in question_response.choices:
question_answer += choice.message.content
except Exception as e:
print("question tries: ", tries)
print(e)
# If the question is answered yes, save the reason and website
if question_answer[:3].lower() == "yes" or question_answer[:2].lower() == "ja":
# save question, url and answer?
question_dictionary[question].append([url, question_answer])
else:
useless_urls.append([url, question_answer])
else:
        # save the url that did not pass the filter
completely_useless_urls.append([url, gpt_filter_answer])
return question_dictionary, useless_urls, completely_useless_urls
# write the results for one general question to the log file and mirror them on stdout
def log_to_file(f, general_question, question_dictionary, useless_urls, completely_useless_urls):
    f.write("Question: " + general_question + "\n")
    print("Question: ", general_question)
    print("\tInformation on this question was found on the following websites and points were awarded:")
    f.write("\tInformation on this question was found on the following websites and points were awarded:\n")
    for question in question_dictionary.keys():
        question_list = question_dictionary[question]
        if len(question_list) > 0:
            for url, answer in question_list:
                print("\t\tSource: ", url)
                f.write("\t\t--Source: " + url + " | ")
                print("\t\tAnswer: ", answer)
                f.write("Answer: " + answer.replace("\n", " ") + "\n")
    print("\n\tInformation on this question was found on the following websites, but no points were awarded:\n")
    f.write("\n\tInformation on this question was found on the following websites, but no points were awarded:\n")
    for u_url in useless_urls:
        print("\t\t", u_url)
        f.write("\t\t--" + u_url[0] + " | " + u_url[1].replace("\n", " ") + "\n")
    print("\n\tNo information on this question was found on the following websites:\n")
    f.write("\n\tNo information on this question was found on the following websites:\n")
    for cu_url in completely_useless_urls:
        print("\t\t", cu_url)
        f.write("\t\t--" + cu_url[0] + " | " + cu_url[1].replace("\n", " ") + "\n")
    f.write("---------------------------------------------------------------------------------------------------------------------------------\n")
    print("----------------------------------------------------------------------\n")
# API keys are read from environment variables (configured as secrets in the deployment environment)
scarpingbeekey = os.environ['getkey']   # ScrapingBee API key
openai.api_key = os.environ['chatkey']  # OpenAI API key
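# run a Google search through ScrapingBee's Google endpoint and return the parsed JSON response
# (nb_results is 1, so only the top organic result is requested)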
def send_request(google_prompt):
response = requests.get(
url="https://app.scrapingbee.com/api/v1/store/google",
params={
"api_key": scarpingbeekey,
"search": google_prompt,
"add_html": True,
"nb_results": 1
},
)
return response.json()
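# core scraping loop: for every topic and general question in the scorecard dataframe, run the
# Google prompt, scrape each result page with ScrapingBee, let GPT filter and score the text,
# append a human-readable summary to the per-startup log file and return the nested answer dictionary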
def get_json_dict(df, web_list, progress, name):
filename="/" + name + "_log.txt"
with open(log_file_path+filename, 'w+', encoding='utf-8') as f:
output_dictionary = {}
topics = df['Topic'].unique()
for topic in progress.tqdm(topics, desc='Topic'): ##########
time.sleep(0.2)
print(topic)
df_topic = df[df['Topic'] == topic]
general_questions = df_topic['Questions'].unique()
for general_question in progress.tqdm(general_questions, desc='GenQ'): ###########
time.sleep(0.3)
output_dictionary[general_question] = {"Topic": topic}
df_question = df_topic[df_topic['Questions']==general_question].reset_index()
google_prompt = df_question['Google Prompts'].values[0]
gpt_filter_prompt = df_question['GPT Filter Prompt'].values[0]
questions = df_question.iloc[0, 5:].values.tolist()
question_dictionary = {}
                useless_urls = []  # urls that passed the topic filter but whose question answers did not start with yes
                completely_useless_urls = []  # urls that did not pass the topic filter or returned an empty filter answer
# scrape google with google_prompt
request_json = send_request(google_prompt)
search_results = []
num_urls = 1
if 'organic_results' in request_json.keys():
if len(request_json['organic_results']) == 0:
print("organic_results are empty")
else:
for i in range(num_urls):
search_results.append(request_json['organic_results'][i]['url'])
else:
print("organic_results not in request_json")
# adding the extra user defined prompts
search_results = list(set(search_results + web_list))
if len(search_results) == 0:
print("Didnt have any search results for googleprompt:", google_prompt)
continue
                # scrape each result url and let GPT score its text
for url in progress.tqdm(search_results, desc='url'):
time.sleep(0.4)
# scrape the text of the website
client = ScrapingBeeClient(api_key=scarpingbeekey)
url_text = client.get(url,
params = {
'json_response': 'True',
'extract_rules': {"text": "body",},
}
)
json_object = json.loads(url_text.text)
if 'body' not in json_object.keys():
print("json_object has no key: body")
continue
if "text" in json_object['body'].keys():
text_content = json_object['body']['text']
else:
print("json_object['body'] has no key: text")
continue
if len(text_content) == 0:
continue
splitsize = 10000
if len(text_content) > splitsize:
num_splits = math.ceil(len(text_content) / splitsize)
#for i in range(num_splits-1):
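                        # note: only the first chunk and then the whole remainder of the text are analysed;
                        # the chunk-by-chunk loop above is left commented out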
for i in range(1):
text = text_content[i*splitsize:(i+1)*splitsize]
question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls)
text = text_content[(i+1)*splitsize:]
question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls)
else:
question_dictionary, useless_urls, completely_useless_urls = create_question_dictionary(url, text_content, questions, question_dictionary, gpt_filter_prompt, useless_urls, completely_useless_urls)
log_to_file(f, general_question, question_dictionary, useless_urls, completely_useless_urls)
output_dictionary[general_question]['Questions'] = question_dictionary
return output_dictionary
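# Gradio callback: parse the user input (comma-separated startup names with optional custom URLs in
# parentheses), scrape each startup against the scorecard and save the result as <name>.json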
def scrape_me(name, progress=gr.Progress()):
if name == '':
return f"Scraping not possible, empty entries found !"
#load the excel file
df_reference = pd.read_excel(excel_file, sheet_name="Sheet1")
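    # the scorecard sheet is expected to provide the columns 'Topic', 'Questions', 'Google Prompts'
    # and 'GPT Filter Prompt', followed by one column per scoring question (see get_json_dict)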
    # parse optional custom URLs given in parentheses after each startup name
if ',' in name:
entity_names = name.split(',')
else:
entity_names = [name]
entity_websites = []
for en_num, en in enumerate(entity_names):
if "(" in en:
start = en.find("(")
end = en.find(")")
websites = en[start+1:end].split(";")
entity_names[en_num] = en[:start].strip()
else:
entity_names[en_num] = en.strip()
websites = []
entity_websites.append(websites)
    # loop through the entities and their optional custom URLs
count_web = 0
for en in progress.tqdm(entity_names, desc='iterating through searchable entities'):
time.sleep(0.1)
# replacing the corporate name in the question string
enum_df = df_reference.replace({"<corporate>": en}, regex=True)
# retrieving the scraped data dictionary
json_dict = get_json_dict(enum_df.head(25), entity_websites[count_web], progress, en)
# converting and saving the json file
json_object = json.dumps(json_dict, indent = 4)
json_file = f'{scraped_folder_path}/{en}.json'
with open(json_file, "w") as outfile:
outfile.write(json_object)
count_web += 1
return f"Scraped results for the following entities: {name} !"
with gr.Blocks(title='EXii Startup Scraper') as demo:
with gr.Tab("Scraping Toolbox"):
result_text = gr.Textbox(label='Debug Information', placeholder='Debug Information')
with gr.Row():
            scrapin_it_digga = gr.Text(label="Startup to scrape",
                                       info='Separate multiple startups with "," and force a search on custom URLs by listing them within "()", separated by ";"',
                                       placeholder='saturn (https://www.saturn.de/; https://www.mediamarkt.de/), cyberport (https://www.cyberport.de/)')
with gr.Row():
scrape_button = gr.Button("Start scraping")
with gr.Column():
with gr.Row():
                scrapes_found = gr.Dropdown(json_found, label="Scraped startups", info="Select a scraped json file")
with gr.Row():
                json_update_button = gr.Button("Update scraped data")
with gr.Column():
# with gr.Row():
# show_the_score = gr.Button('Plot score')
sexy_plot = gr.Plot(label='Exponential Growth Score')
json_update_button.click(update_json_, inputs=scrapes_found, outputs=scrapes_found)
scrape_button.click(scrape_me, inputs=scrapin_it_digga, outputs=result_text)
# show_the_score.click(get_comparative_score, inputs=[scrapes_found], outputs=sexy_plot)
scrapes_found.change(get_comparative_score, inputs=[scrapes_found], outputs=sexy_plot)
demo.queue(concurrency_count=4).launch(debug=True)