Spaces:
Sleeping
Sleeping
import os | |
import openai | |
import datetime | |
import gradio as gr | |
import json | |
from jinja2 import Template | |
import requests | |
# Configure the OpenAI client; the key is read from the environment and is
# None when OPENAI_API_KEY is not set.
openai.api_key = os.getenv('OPENAI_API_KEY')
# --- Airtable connection -----------------------------------------------------
# API key comes from the environment (None when AIRTABLE_API_KEY is unset).
airtable_api_key = os.getenv('AIRTABLE_API_KEY')
base_id = 'appcUK3hUWC7GM2Kb'

# Airtable table IDs, one per logical table used by the app.
schools_table_name = 'tblFh7tguTjbjWms7'
prompts_table_name = 'tblYIZEB8m6JkGDEP'
qalog_table_name = 'tbl4oNgFPWM5xH1XO'
examples_table_name = 'tblu7sraOEmRgEGkp'
users_table_name = 'tblLNe5ZL47SvrAEk'
user_log_table_name = 'tblrlTsRrkl6BqMAJ'
students_table_name = 'tbl9h106z0Jg34DQX'
scoutlog_table_name = 'tblF30p0krA1FwOiD'
sports_table_name = 'tbler8HTzD0Jm8Woi'

# HTTP headers sent with every Airtable request.
headers = {
    "Authorization": f"Bearer {airtable_api_key}",
    "Content-Type": "application/json",
    "Accept": "application/json",
}

# --- Response heading --------------------------------------------------------
# Pieces of the inline-styled <h3> heading shown above the model's answer.
label_text = "Scout Response"
color = "#6562F4"
background_color = "white"
border_radius = "10px"
response_label = f'<h3 style="color: {color}; background-color: {background_color}; border-radius: {border_radius}; padding: 10px;display: inline-block;">{label_text}</h3>'

# App name written to the user login log.
app = "Scout"
def prompt_trim(prompt: str) -> str:
    """Strip leading and trailing whitespace from every line of *prompt*.

    The text is split on newline characters only, so the number of lines is
    preserved; a line holding nothing but whitespace becomes an empty line.
    """
    return '\n'.join(map(str.strip, prompt.split('\n')))
def get_students(school_selection):
    """Fetch the athletes of the selected schools from Airtable as JSON text.

    Args:
        school_selection: A school name or an iterable of school names. Each
            name is matched against the ``School`` field of the students table.

    Returns:
        A JSON string (``indent=4``) containing a list of
        ``{'fields': {...}}`` objects, one per returned record, or ``''`` when
        nothing is selected, no record matches, or the request fails.
        Failures are printed, not raised.
    """
    # With nothing selected the formula would be the invalid "OR()", and
    # iterating None would raise; there is nothing to look up either way.
    if not school_selection:
        return ''
    if isinstance(school_selection, str):
        school_selection = [school_selection]

    def _formula_literal(value):
        # Escape backslashes and single quotes so a name such as "St. John's"
        # cannot terminate the quoted literal and change the formula.
        escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
        return f"'{escaped}'"

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{students_table_name}'
    conditions = ','.join(
        f"School={_formula_literal(school)}" for school in school_selection
    )
    params = {
        'filterByFormula': f"OR({conditions})",
        'fields[]': ['Athlete_Name', 'Primary_Sport', 'Position', 'Facebook_Fans', 'Twitter_Fans', 'Instagram_Fans', 'Interests', 'Ethnicity', 'Gender'],
    }

    students = ''
    try:
        # NOTE(review): only a single response page is read here; if the
        # table can exceed one Airtable page, follow the 'offset' cursor.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code == 200:
            records = response.json().get('records')
            if records:
                # Keep only the field payload; record ids and timestamps are dropped.
                cleaned_data = [{'fields': entry['fields']} for entry in records]
                students = json.dumps(cleaned_data, indent=4)
            else:
                print("No records found in the 'students' table for the selected schools.")
        else:
            print(f"Failed to retrieve students data. Status code: {response.status_code}")
    except Exception as e:
        # Best-effort lookup: report the problem and fall back to ''.
        print(f"An error occurred: {str(e)}")
    return students
def get_schools():
    """Return the school names stored in the Airtable schools table.

    Only the ``school`` field is requested and the records are sorted by it
    in ascending order.

    Returns:
        A list of school names. The list is empty when the table has no
        records or the request fails; failures are printed, not raised.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{schools_table_name}'
    params = {
        'fields[]': 'school',
        'sort[0][field]': 'school',
        'sort[0][direction]': 'asc',
    }
    # Always a list, so callers get the same type on success and on failure.
    schools = []
    try:
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code == 200:
            records = response.json().get('records')
            if records:
                # Skip records without a 'school' value instead of letting a
                # single KeyError discard the whole list.
                schools = [
                    record['fields']['school']
                    for record in records
                    if 'school' in record.get('fields', {})
                ]
            else:
                print("No records found in the 'schools' table.")
        else:
            print(f"Failed to retrieve school data. Status code: {response.status_code}")
    except Exception as e:
        # Best-effort lookup: report the problem and fall back to [].
        print(f"An error occurred: {str(e)}")
    return schools
def get_sports():
    """Return the sport names stored in the Airtable sports table.

    Only the ``sport`` field is requested and the records are sorted by it
    in ascending order.

    Returns:
        A list of sport names. The list is empty when the table has no
        records or the request fails; failures are printed, not raised.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{sports_table_name}'
    params = {
        'fields[]': 'sport',
        'sort[0][field]': 'sport',
        'sort[0][direction]': 'asc',
    }
    # Bind the result before the request so every failure path has a value
    # to return instead of raising UnboundLocalError.
    sports = []
    try:
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code == 200:
            records = response.json().get('records')
            if records:
                # Skip records without a 'sport' value instead of letting a
                # single KeyError discard the whole list.
                sports = [
                    record['fields']['sport']
                    for record in records
                    if 'sport' in record.get('fields', {})
                ]
            else:
                print("No records found in the 'sports' table.")
        else:
            print(f"Failed to retrieve sports data. Status code: {response.status_code}")
    except Exception as e:
        # Best-effort lookup: report the problem and fall back to [].
        print(f"An error occurred: {str(e)}")
    return sports
def get_prompt(header, template_content):
    """Load the 'Scout_v1' system and user prompts from the prompts table.

    Args:
        header: Fallback system prompt, returned when no record matches.
        template_content: Fallback user prompt template, returned when no
            record matches.

    Returns:
        A ``(system_prompt, user_prompt)`` tuple taken from the first matching
        record; a field missing from that record yields ``''``. When no record
        matches, the two arguments are returned unchanged.

    Raises:
        requests.exceptions.HTTPError: if Airtable answers with an error status.
    """
    endpoint = f'https://api.airtable.com/v0/{base_id}/{prompts_table_name}'
    query = {'filterByFormula': "prompt_name='Scout_v1'"}

    reply = requests.get(endpoint, headers=headers, params=query)
    reply.raise_for_status()

    matches = reply.json().get('records')
    if not matches:
        return header, template_content

    # Only the first match is used.
    prompt_fields = matches[0]['fields']
    return prompt_fields.get('system_prompt', ''), prompt_fields.get('user_prompt', '')
def get_examples():
    """Append the example questions from Airtable to the global ``ui_examples``.

    Each record's ``nil_question`` value is appended as
    ``[None, None, None, nil_question]``. Records without that field are
    skipped.

    Raises:
        requests.exceptions.HTTPError: if Airtable answers with an error status.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{examples_table_name}'
    response = requests.get(airtable_endpoint, headers=headers, timeout=30)
    # Check the status before decoding the body, so an error page that is not
    # JSON surfaces as the HTTP error rather than as a decode error.
    response.raise_for_status()
    for record in response.json().get('records', []):
        nil_question = record.get('fields', {}).get('nil_question')
        if nil_question is None:
            # The record has no 'nil_question' value; nothing to show for it.
            continue
        ui_examples.append([None, None, None, nil_question])
def append_to_at_scountlog(school_selection, output_format, gender, interests, sports, gpt_response, number_of_athletes, social_media_channel, response_time, question_cost, prompt_tokens, completion_tokens):
    """Write one row describing a Scout request to the Airtable scout log.

    Most values are stored as their ``str()`` form, ``number_of_athletes`` is
    converted with ``int()``, and the cost and token counts are passed through
    unchanged. The current value of the global ``logged_in_user`` is stored as
    ``user_name``.

    Errors raised while posting are printed, not raised. The row is built
    before the guarded section, so a failing ``int(number_of_athletes)``
    conversion propagates to the caller.
    """
    endpoint = f'https://api.airtable.com/v0/{base_id}/{scoutlog_table_name}'
    payload = {
        'fields': {
            'school_selection': str(school_selection),
            'gender': str(gender),
            'social_media_channel': str(social_media_channel),
            'sports': str(sports),
            'interests': str(interests),
            'number_of_athletes': int(number_of_athletes),
            'output_format': str(output_format),
            'gpt_response': str(gpt_response),
            'response_time': str(response_time),
            'question_cost': question_cost,
            'user_name': str(logged_in_user),
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens
        }
    }
    try:
        # Post the row, then turn an error status into an exception.
        requests.post(endpoint, headers=headers, json=payload).raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
#Chatbot Function
def chatbot(school_selection,gender,social_media_channel,sports,interests,output_format,number_of_athletes):
    """Answer a Scout request with GPT-4 and log the exchange to Airtable.

    The athletes of the selected schools and the 'Scout_v1' prompts are loaded
    from Airtable, the user prompt is rendered as a Jinja2 template with the
    selections, and the result is sent to the chat completion API together
    with the system prompt.

    Args:
        school_selection: Schools whose athletes are offered to the model.
        gender, social_media_channel, sports, interests: Selection criteria
            passed to the prompt template.
        output_format: Desired answer format, passed to the template as ``format``.
        number_of_athletes: How many athletes to ask for.

    Returns:
        A ``(response_label, gpt_response)`` tuple: the styled heading and the
        model's answer text.

    Side effects:
        Stores the answer in the global ``scout_response`` and appends a row
        to the scout log.
    """
    global scout_response
    start_time = datetime.datetime.now()

    students = get_students(school_selection)
    # Empty strings are the fallbacks used when no prompt record is found.
    header, template_content = get_prompt('', '')

    # Render the user prompt template with the inputs.
    template = Template(template_content)
    analysis_input = template.render(students=students, gender=gender,social_media_channel=social_media_channel,sports=sports,interests=interests,format=output_format,number_of_athletes=number_of_athletes)
    # Send the trimmed prompt: stripping per-line whitespace (including the
    # JSON indentation of the student data) keeps the content and shortens
    # the request.
    trimmed_input = prompt_trim(analysis_input)

    response = openai.ChatCompletion.create(
        model="gpt-4",
        temperature=0,
        messages=[
            {
                "role": "system",
                "content": header
            },
            {
                "role": "user",
                "content": trimmed_input
            }
        ]
    )
    gpt_response = response.choices[0].message["content"]

    tokens_used = response.usage
    # NOTE(review): one flat rate is applied to prompt and completion tokens
    # alike - confirm against the model's current pricing.
    question_cost = (tokens_used.get('total_tokens', 0) / 1000) * .03
    # Default to 0, like completion_tokens, so the log never receives None.
    prompt_tokens = tokens_used.get('prompt_tokens', 0)
    completion_tokens = tokens_used.get('completion_tokens', 0)

    response_time = datetime.datetime.now() - start_time
    scout_response = gpt_response
    append_to_at_scountlog(school_selection, output_format, gender,interests,sports,gpt_response,number_of_athletes,social_media_channel,response_time,question_cost,prompt_tokens,completion_tokens)
    return response_label,gpt_response
def log_login(username):
    """Record a login of *username* for this app in the Airtable user log.

    The row holds the user name and the module-level ``app`` name, both as
    strings. Errors raised while posting are printed, not raised.
    """
    endpoint = f'https://api.airtable.com/v0/{base_id}/{user_log_table_name}'
    payload = {
        'fields': {
            'user_name': str(username),
            'app': str(app)
        }
    }
    try:
        # Post the row, then turn an error status into an exception.
        requests.post(endpoint, headers=headers, json=payload).raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
def login_auth(username, password):
    """Check a username/password pair against the Airtable users table.

    Args:
        username: Name entered in the login form.
        password: Password entered in the login form.

    Returns:
        True when a record with that user name and password exists, otherwise
        False. On success the login is logged and the global
        ``logged_in_user`` is set to *username*.
    """
    # NOTE(review): logged_in_user is a single module-level value; with
    # several users signed in at once it presumably holds only the latest
    # login - confirm whether per-session tracking is needed.
    global logged_in_user

    # Blank credentials never authenticate; an empty password could otherwise
    # match a user record whose password field is empty.
    if not username or not password:
        print("Invalid user/password combination")
        return False

    def _formula_literal(value):
        # Escape backslashes and double quotes so user input cannot close the
        # quoted literal and rewrite the formula (for example a password of
        # '" & password & "' would otherwise match every user).
        escaped = str(value).replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{users_table_name}'
    params = {
        'filterByFormula': f'AND(user_name = {_formula_literal(username)}, password = {_formula_literal(password)})'
    }
    response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
    if response.status_code == 200 and response.json().get('records'):
        log_login(username)
        logged_in_user = username
        return True
    print("Invalid user/password combination")
    return False
#Gradio UI
CIMStheme = gr.themes.Soft().set(button_primary_background_fill='#6562F4')

# Module-level state. ui_examples must exist before get_examples() runs,
# because that function appends to it.
ui_examples = []
school_selection = []
schools = get_schools()
sports = get_sports()
get_examples()
# Name written to the scout log until a login replaces it.
logged_in_user = 'admin'

with gr.Blocks(CIMStheme) as iface:
    # Header row: logo, title, and an empty spacer column.
    with gr.Row():
        with gr.Column(scale=2):
            gr.Image(
                label="Logo",
                value="CIMS Logo Purple.png",
                width=10,
                show_download_button=False,
                interactive=False,
                show_label=False,
                elem_id="logo",
                container=False,
            )
        with gr.Column(scale=2):
            gr.Markdown(value="# Scout - Powered by CIMS.AI")
        with gr.Column(scale=2):
            gr.Markdown("")

    # Main row: the request form wired to chatbot(); the inputs are listed in
    # the order of chatbot's parameters.
    with gr.Row():
        with gr.Column():
            gr.Interface(
                fn=chatbot,
                inputs=[
                    gr.components.Dropdown(
                        schools,
                        multiselect=True,
                        info="Select one or more schools. This will select avaliable Student Athletes from the school(s)",
                        label="School Selection",
                    ),
                    gr.components.Dropdown(
                        ["Male", "Female"],
                        multiselect=True,
                        info="Select one or more.",
                        label="Gender",
                    ),
                    gr.components.Dropdown(
                        ["Facebook", "Instagram", "TikTok", "X (Twitter)", "YouTube", "SnapChat", "Twitch"],
                        multiselect=True,
                        info="Select one or more.",
                        label="Social Media Channel",
                    ),
                    gr.components.Dropdown(
                        sports,
                        multiselect=True,
                        info="Select one or more.",
                        label="Sports",
                    ),
                    gr.components.Dropdown(
                        ["Fashion", "Sports", "Food", "Cooking", "Travel", "Arts", "Music"],
                        multiselect=True,
                        info="Select one or more.",
                        label="Interests",
                    ),
                    gr.components.Dropdown(
                        ["Summary", "Detailed Analysis", "Table"],
                        multiselect=False,
                        info="Select the desired output format.",
                        label="Output Format",
                    ),
                    gr.components.Dropdown(
                        [str(count) for count in range(1, 11)],
                        multiselect=False,
                        info="How many athletes are you looking for?",
                        label="Number of Athletes",
                    ),
                ],
                outputs=[
                    gr.components.Markdown(response_label),
                    gr.components.HTML(label="Scout Response"),
                ],
                description="Use Scout to find and rank Athletes that meet your criteria",
                allow_flagging="never",
                cache_examples=False,
            )

    # Footer row.
    with gr.Row():
        with gr.Column():
            gr.HTML('<center><i>CIMS.AI Confidential 2023</i></center>')

# Start the app behind the Airtable-backed login, with a public share link.
iface.launch(
    auth=login_auth,
    auth_message="Enter your username and password that you received from CIMS.AI. To request a login, please email 'info@cims.ai'",
    share=True,
)
#iface.launch()