Spaces:
Sleeping
Sleeping
import os | |
import openai | |
import datetime | |
import gradio as gr | |
import json | |
from jinja2 import Template | |
import requests | |
from dotenv import load_dotenv | |
load_dotenv() | |
# Initialize OpenAI | |
openai.api_key = os.environ.get('OPENAI_API_KEY') | |
# Configuration variables | |
airtable_api_key = os.environ.get('AIRTABLE_API_KEY') | |
# Airtable table names | |
policies_table_name = 'tbla6PC65qZfqdJhE' | |
prompts_table_name = 'tblYIZEB8m6JkGDEP' | |
qalog_table_name = 'tbl4oNgFPWM5xH1XO' | |
examples_table_name = 'tblu7sraOEmRgEGkp' | |
users_table_name = 'tblLNe5ZL47SvrAEk' | |
user_log_table_name = 'tblrlTsRrkl6BqMAJ' | |
# Define the style and content for the response field | |
label_text = "NILI Response" | |
color = "#6562F4" | |
background_color = "white" | |
border_radius = "10px" | |
response_label = f'<h3 style="color: {color}; background-color: {background_color}; border-radius: {border_radius}; padding: 10px;display: inline-block;">{label_text}</h3>' | |
#Airtable Base ID | |
base_id = 'appcUK3hUWC7GM2Kb' | |
#Name of the prompt temlate record | |
prompt_name = "NILI_v1" | |
#Header for the Airtable requests | |
headers = { | |
"Authorization": f"Bearer {airtable_api_key}", | |
"Content-Type": "application/json", | |
"Accept": "application/json", | |
} | |
#Function to trim prompts....not used | |
def prompt_trim(prompt: str) -> str: | |
lines = prompt.split('\n') | |
trimmed = '\n'.join([l.strip() for l in lines]) | |
return trimmed | |
#Get the policies for the selected schools and concatenate them. | |
def get_policies(school_selection): | |
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}' | |
# Parameters for the API request to filter by 'school' field and retrieve 'policy_text' | |
params = { | |
'filterByFormula': "OR({})".format(','.join(["school='{}'".format(school) for school in school_selection])), | |
'fields[]': 'policy_text', # Replace with the name of your field | |
} | |
# Initialize an empty string to store concatenated policies | |
concatenated_policies = '' | |
#print(params) | |
try: | |
# Send a GET request to the Airtable API | |
response = requests.get(airtable_endpoint, headers=headers, params=params) | |
# Check if the request was successful (status code 200) | |
if response.status_code == 200: | |
# Parse the JSON response | |
data = response.json() | |
# Check if there are records in the response | |
if data.get('records'): | |
# Extract the 'policy_text' values from each record and concatenate them | |
for record in data['records']: | |
policy_text = record['fields']['policy_text'] | |
if concatenated_policies: | |
concatenated_policies += "\n----------\n" | |
concatenated_policies += policy_text | |
else: | |
print("No records found in the 'policies' table for the selected schools.") | |
else: | |
print(f"Failed to retrieve data. Status code: {response.status_code}") | |
except Exception as e: | |
print(f"An error occurred: {str(e)}") | |
#print(concatenated_policies) | |
return concatenated_policies | |
#Get a list of School Name from the policies for the UI dropdown | |
def get_schools(): | |
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}' | |
# Parameters for the API request to select only the 'school' field | |
params = { | |
'fields[]': 'school', # Replace with the name of your field | |
'sort[0][field]': 'school', # Sort by the 'school' field | |
'sort[0][direction]': 'asc', # Sort in ascending order | |
} | |
schools = '' | |
try: | |
# Send a GET request to the Airtable API | |
response = requests.get(airtable_endpoint, headers=headers, params=params) | |
# Check if the request was successful (status code 200) | |
if response.status_code == 200: | |
# Parse the JSON response | |
data = response.json() | |
# Check if there are records in the response | |
if data.get('records'): | |
# Extract the 'school' values from each record | |
schools = [record['fields']['school'] for record in data['records']] | |
else: | |
print("No records found in the 'policies' table.") | |
else: | |
print(f"Failed to retrieve data. Status code: {response.status_code}") | |
except Exception as e: | |
print(f"An error occurred: {str(e)}") | |
return schools | |
#Get the designated prompt template record | |
def get_prompt(header, template_content): | |
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{prompts_table_name}' | |
params = { | |
'filterByFormula': "prompt_name='NILI_v1'" | |
} | |
response = requests.get(airtable_endpoint, headers=headers, params=params) | |
# Check for errors | |
response.raise_for_status() | |
data = response.json() | |
# Check if there is at least one record matching the condition | |
if data.get('records'): | |
# Get the first record (there should be only one) | |
record = data['records'][0]['fields'] | |
# Assign system_prompt and user_prompt to variables | |
header = record.get('system_prompt', '') | |
template_content = record.get('user_prompt', '') | |
return header, template_content | |
def get_examples(): | |
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{examples_table_name}' | |
# Send your request and parse the response | |
response = requests.get(airtable_endpoint, headers=headers) | |
data = json.loads(response.text) | |
# Check for errors | |
response.raise_for_status() | |
for record in data['records']: | |
nil_question = record['fields']['nil_question'] | |
ui_examples.append([None, None, None, nil_question]) | |
#print(ui_examples) | |
def append_to_at_qalog(your_role, school_selection, output_format, input_text, gpt_response,response_time,question_cost,prompt_tokens,completion_tokens): | |
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{qalog_table_name}' | |
# Organize data for Airtable | |
new_fields = { | |
'your_role': str(your_role), | |
'school_selection': str(school_selection), | |
'output_format': str(output_format), | |
'input_text': str(input_text), | |
'gpt_response': str(gpt_response), | |
'response_time': str(response_time), | |
'question_cost': question_cost, | |
'user_name': str(logged_in_user), | |
'prompt_tokens': prompt_tokens, | |
'completion_tokens': completion_tokens | |
} | |
data = { | |
'fields': new_fields | |
} | |
try: | |
# Post data to Airtable | |
response = requests.post(airtable_endpoint, headers=headers, json=data) | |
# Check for errors | |
response.raise_for_status() | |
except requests.exceptions.HTTPError as http_error: | |
# Handle the HTTP error (e.g., log it or display an error message) | |
print(f"HTTP error occurred: {http_error}") | |
except Exception as e: | |
# Handle exceptions, log errors, or raise them as needed | |
print(f"An error occurred: {str(e)}") | |
#Chatbot Function | |
def chatbot(your_role,school_selection,output_format,input_text): | |
start_time = datetime.datetime.now() | |
# school_selection holds an array of one or more schools | |
#print(school_selection) | |
# Read the Hydrated policies | |
policies = get_policies(school_selection) | |
template_content = '' | |
header = '' | |
header, template_content = get_prompt(header, template_content) | |
#print(header) | |
#print(template_content) | |
header_template = Template(header) | |
merged_header = header_template.render(your_role=your_role) | |
# Create a Jinja2 template from the content | |
template = Template(template_content) | |
# Render the template with the policy JSON | |
analysis_input = template.render(policies=policies, question=input_text,format=output_format,your_role=your_role) | |
trimmed_input = prompt_trim(analysis_input) | |
with open('analysis_input.txt', 'w', encoding='utf-8') as out_file: | |
out_file.write(trimmed_input) | |
response = openai.ChatCompletion.create( | |
model="gpt-4", | |
#model="gpt-3.5-turbo", | |
temperature=0, | |
messages=[ | |
{ | |
"role": "system", | |
"content": merged_header | |
}, | |
{ | |
"role": "user", | |
"content": analysis_input | |
} | |
] | |
) | |
gpt_response = response.choices[0].message["content"] | |
tokens_used = response.usage | |
question_cost = (tokens_used.get('total_tokens', 0) / 1000) * .03 | |
prompt_tokens = tokens_used.get('prompt_tokens',) | |
completion_tokens = tokens_used.get('completion_tokens', 0) | |
""" | |
with open('response.txt', 'w', encoding='utf-8') as out_file: | |
out_file.write(gpt_response) | |
""" | |
end_time = datetime.datetime.now() | |
response_time = end_time - start_time | |
append_to_at_qalog(your_role, school_selection, output_format, input_text, gpt_response,response_time,question_cost,prompt_tokens,completion_tokens) | |
return response_label,gpt_response | |
def log_login(username): | |
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{user_log_table_name}' | |
# Organize data for Airtable | |
new_fields = { | |
'user_name': str(username), | |
} | |
data = { | |
'fields': new_fields | |
} | |
try: | |
# Post data to Airtable | |
response = requests.post(airtable_endpoint, headers=headers, json=data) | |
# Check for errors | |
response.raise_for_status() | |
except requests.exceptions.HTTPError as http_error: | |
# Handle the HTTP error (e.g., log it or display an error message) | |
print(f"HTTP error occurred: {http_error}") | |
except Exception as e: | |
# Handle exceptions, log errors, or raise them as needed | |
print(f"An error occurred: {str(e)}") | |
def login_auth(username, password): | |
airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{users_table_name}' | |
# Query the 'users' table to check for a match with the provided username and password | |
params = { | |
'filterByFormula': f'AND(user_name = "{username}", password = "{password}")' | |
} | |
response = requests.get(airtable_endpoint, headers=headers, params=params) | |
if response.status_code == 200: | |
data = response.json() | |
#If the matching user/password record is found: | |
if data.get('records'): | |
#Log that the user logged in | |
log_login(username) | |
#Set the global logged_in_user variable. This used in the append_to_at_qalog function to track what user asked the question | |
global logged_in_user | |
logged_in_user = username | |
return True | |
print(f"Invalid user/password combination") | |
return False | |
#Gradio UI | |
CIMStheme = gr.themes.Soft().set(button_primary_background_fill='#6562F4') | |
# Initialize an empty list to store the examples | |
ui_examples = [] | |
school_selection = [] | |
schools = get_schools() | |
get_examples() | |
logged_in_user = 'admin' | |
with gr.Blocks(CIMStheme) as iface: | |
with gr.Row(): | |
with gr.Column(scale=2): | |
gr.Image(label="Logo",value="CIMS Logo Purple.png",width=10,show_download_button=False,interactive=False,show_label=False,elem_id="logo",container=False) | |
with gr.Column(scale=2): | |
#gr.Textbox(value="# NILI - Powered by CIMS.AI",show_label=False,interactive=False,text_align="center",elem_id="CIMSTitle") | |
gr.Markdown(value="# NILI - Powered by CIMS.AI") | |
with gr.Column(scale=2): | |
gr.Markdown("") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Interface(fn=chatbot, | |
inputs=[ | |
gr.components.Dropdown(["Student Athlete","Parent","Athletic Director"],multiselect=False,info="Select a role.",label="User Role", ), | |
gr.components.Dropdown(schools,multiselect=True,info="Select one or more schools. This will help set the context of your question.",label="School Context"), | |
gr.components.Dropdown(["Summary","Detailed Analysis","Table"],multiselect=False,info="Select the desired output format.",label="Output Format"), | |
gr.components.Textbox(lines=5, placeholder="Enter your question here", label="NIL Question")], | |
outputs=[ | |
gr.components.Markdown(response_label), | |
gr.components.HTML(label="NILI Response") | |
], | |
description="Ask any question about Name, Image, Likeness (NIL)", | |
allow_flagging="manual", | |
examples=ui_examples, | |
cache_examples=False, | |
flagging_options=["The response is incorrect","The response is inappropriate","The response doesn't make sense"] | |
) | |
with gr.Row(): | |
with gr.Column(): | |
gr.HTML('<center><i>CIMS.AI Confidential 2023</i></center>') | |
iface.launch(auth=login_auth, auth_message= "Enter your username and password that you received from CIMS.AI. To request a login, please email 'info@cims.ai'") | |
#iface.launch(share=True) | |