import os
import io
import openai
import datetime
import time
import gradio as gr
import json
from jinja2 import Template
import requests
import fitz
from xhtml2pdf import pisa
from io import BytesIO
#from dotenv import load_dotenv
from datetime import datetime
#load_dotenv()
# Configure the OpenAI client with the API key taken from the environment.
openai.api_key = os.getenv('OPENAI_API_KEY')
# Configuration variables
# Airtable API key, read from the environment (None when the variable is unset).
airtable_api_key = os.environ.get('AIRTABLE_API_KEY')

# Airtable table names (interpolated into the REST endpoint path)
prompts_table_name = 'tblYIZEB8m6JkGDEP'
users_table_name = 'tblLNe5ZL47SvrAEk'
user_log_table_name = 'tblrlTsRrkl6BqMAJ'
compliancelog_table_name = 'tblQMXWKGlOonkIw2'
policies_table_name = 'tbla6PC65qZfqdJhE'
compliance_history_table_name = 'tbltA3vCb2upKVeFT'

# Define the style and content for the response field
label_text = "Contract Redline"
color = "#6562F4"
background_color = "white"
border_radius = "10px"

# The next three lines are one commented-out, multi-line f-string. Its
# continuation lines had lost their leading '#', which left an unterminated
# string literal at module level; they are kept here as comments.
# NOTE(review): any markup that once wrapped {label_text} (and used the style
# variables above) is not present in this copy - confirm against the original.
# response_label = f'
# {label_text}
# '
response_label = f'{label_text}'

# Airtable base that holds all of the tables above
base_id = 'appcUK3hUWC7GM2Kb'

# App name for user login logging
app = "Compliance"

# Headers sent with every Airtable REST request
headers = {
    "Authorization": f"Bearer {airtable_api_key}",
    "Content-Type": "application/json",
    "Accept": "application/json",
}
def prompt_trim(prompt: str) -> str:
    """Strip leading and trailing whitespace from every line of *prompt*.

    The text is split on newline characters, each piece is stripped, and the
    pieces are joined back with newlines, so the line count is unchanged.
    """
    stripped_lines = []
    for raw_line in prompt.split('\n'):
        stripped_lines.append(raw_line.strip())
    return '\n'.join(stripped_lines)
def get_policy_text(school):
    """Fetch the policy text stored in Airtable for *school*.

    Queries the policies table for records whose ``school`` field equals
    *school* and collects their ``policy_text`` values. The result is also
    written to the module-level ``policy_text`` global.

    Args:
        school: Value to match against the ``school`` field.

    Returns:
        A list of policy-text strings when at least one matching record has a
        ``policy_text`` value, otherwise an empty string. Request and parsing
        errors are printed rather than raised.
    """
    global policy_text
    policy_text = ''

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
    # Escape backslashes and double quotes so a school name containing them
    # cannot terminate the quoted string inside the formula.
    safe_school = str(school).replace('\\', '\\\\').replace('"', '\\"')
    # Filter on the school and request only the 'policy_text' field.
    params = {
        'filterByFormula': f'school="{safe_school}"',
        'fields[]': 'policy_text',
    }
    try:
        # A timeout keeps a stalled connection from hanging the request forever.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code != 200:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
            return policy_text
        records = response.json().get('records')
        if not records:
            print("No records found in the 'policies' table.")
            return policy_text
        # Airtable leaves empty fields out of a record, so skip records that
        # carry no 'policy_text' instead of failing the whole lookup.
        texts = [
            record['fields']['policy_text']
            for record in records
            if 'policy_text' in record.get('fields', {})
        ]
        if texts:
            policy_text = texts
    except Exception as e:
        print(f"An error occurred: {str(e)}")
    return policy_text
def get_schools():
    """Return the ``school`` values from the Airtable policies table.

    Records are requested sorted ascending by ``school``.

    Returns:
        A list of school names when any are found, otherwise an empty string
        (the original "nothing found" value, kept for compatibility). Request
        and parsing errors are printed rather than raised.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
    # Request only the 'school' field, sorted ascending.
    params = {
        'fields[]': 'school',
        'sort[0][field]': 'school',
        'sort[0][direction]': 'asc',
    }
    schools = ''
    try:
        # A timeout keeps a stalled connection from hanging startup forever.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code != 200:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
            return schools
        records = response.json().get('records')
        if not records:
            print("No records found in the 'policies' table.")
            return schools
        # Airtable leaves empty fields out of a record; skip rows with no
        # school name so one blank row cannot discard the whole list.
        names = [
            record['fields']['school']
            for record in records
            if record.get('fields', {}).get('school')
        ]
        if names:
            schools = names
    except Exception as e:
        print(f"An error occurred: {str(e)}")
    return schools
def get_prompt(header, template_content):
    """Load the 'Compliance_v1' system and user prompts from Airtable.

    Args:
        header: Value returned as the system prompt when no record matches.
        template_content: Value returned as the user prompt when no record
            matches.

    Returns:
        A ``(header, template_content)`` tuple. When a matching record exists
        its ``system_prompt`` and ``user_prompt`` fields are returned (empty
        string for a field the record lacks).

    Raises:
        requests.exceptions.HTTPError: if Airtable answers with an error status.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{prompts_table_name}'
    query = {
        'filterByFormula': "prompt_name='Compliance_v1'",
    }
    response = requests.get(url, headers=headers, params=query)
    response.raise_for_status()

    matches = response.json().get('records')
    if not matches:
        # Nothing stored under that prompt name: hand back the caller's values.
        return header, template_content

    # Only the first match is used.
    prompt_fields = matches[0]['fields']
    return prompt_fields.get('system_prompt', ''), prompt_fields.get('user_prompt', '')
def append_to_at_compliancelog(policy_name_dd,contract_text,gpt_response, response_time, question_cost, prompt_tokens, completion_tokens):
    """Write one row describing a model call to the Airtable compliance log.

    The row also records the module-level ``logged_in_user``. HTTP and other
    errors raised while posting are printed, not propagated.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{compliancelog_table_name}'
    payload = {
        'fields': {
            'policy_name': str(policy_name_dd),
            'contract_text': str(contract_text),
            'gpt_response': str(gpt_response),
            'response_time': str(response_time),
            'question_cost': question_cost,
            'user_name': str(logged_in_user),
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens
        }
    }
    try:
        # Send the row and turn an error status into an exception.
        requests.post(url, headers=headers, json=payload).raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
def format_date(date_str):
    """Reformat an ISO-style date or timestamp string for display.

    Supported inputs and their outputs:

    * ``YYYY-MM-DD``                 -> ``MM/DD/YYYY``
    * ``YYYY-MM-DDTHH:MM:SS.fffZ``   -> ``MM/DD/YYYY HH:MM``
    * ``YYYY-MM-DDTHH:MM:SSZ``       -> ``MM/DD/YYYY HH:MM``

    Args:
        date_str: The value to reformat.

    Returns:
        The reformatted string, or *date_str* unchanged when it is not a
        string or matches none of the supported formats.
    """
    # Non-string values (e.g. None for a missing field) cannot be parsed by
    # strptime, which would raise TypeError; pass them through untouched.
    if not isinstance(date_str, str):
        return date_str

    known_formats = (
        ("%Y-%m-%d", "%m/%d/%Y"),
        ("%Y-%m-%dT%H:%M:%S.%fZ", "%m/%d/%Y %H:%M"),
        ("%Y-%m-%dT%H:%M:%SZ", "%m/%d/%Y %H:%M"),
    )
    for input_format, output_format in known_formats:
        try:
            return datetime.strptime(date_str, input_format).strftime(output_format)
        except ValueError:
            continue
    return date_str
def get_compliance_history():
    """Return the saved contracts from Airtable as rows for the History table.

    Records are requested newest first (sorted by ``Created`` descending).
    Each row lists, in order: sponsor, contract_value, student_name,
    student_email, start_date, end_date, status, Created. The three date
    fields are passed through ``format_date``.

    Returns:
        A list of rows (lists). It is empty when nothing is found or the
        request fails; errors are printed rather than raised.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{compliance_history_table_name}'
    # Single source of truth for both the requested fields and the row order.
    compliance_fields = [
        'sponsor', 'contract_value', 'student_name', 'student_email',
        'start_date', 'end_date', 'status', 'Created',
    ]
    date_fields = {'start_date', 'end_date', 'Created'}
    params = {
        # A per-school filter was previously sketched here but left disabled:
        # 'filterByFormula': f'school="{policy_name_dd}"',
        'fields[]': compliance_fields,
        'sort[0][field]': 'Created',
        'sort[0][direction]': 'desc',
    }
    compliance_records = []
    try:
        # A timeout keeps a stalled connection from hanging the UI forever.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code != 200:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
            return compliance_records
        records = response.json().get('records')
        if not records:
            print("No records found in the 'compliance history' table.")
            return compliance_records
        for record in records:
            fields = record.get('fields', {})
            row = []
            for field in compliance_fields:
                # Airtable leaves empty fields out of a record; use '' so one
                # incomplete record cannot truncate the rest of the history.
                value = fields.get(field, '')
                if field in date_fields and value:
                    value = format_date(value)
                row.append(value)
            compliance_records.append(row)
    except Exception as e:
        print(f"An error occurred: {str(e)}")
    return compliance_records
def append_to_at_compliance_history(policy_name_dd,contract_redline_html,compliance_comments_tbox,sponsor_tbox,compensation_num,status_ddss,name_tbox,email_tbox,start_date_tbox,end_date_tbox):
    """Save the reviewed contract to Airtable and refresh the History table.

    All ten inputs are required. A falsy value (including a contract value of
    0) counts as blank, as in the original check.

    Returns:
        ``{compliance_history_df: rows}`` with the re-fetched history after a
        successful save. On a validation failure or a failed save, a no-op
        ``gr.update()`` is returned so the History table keeps its rows.
        HTTP and other errors are printed rather than raised.
    """
    # Validate before building the payload or touching the network.
    required_values = (
        policy_name_dd, contract_redline_html, compliance_comments_tbox,
        sponsor_tbox, compensation_num, status_ddss, name_tbox, email_tbox,
        start_date_tbox, end_date_tbox,
    )
    if not all(required_values):
        gr.Warning("One or more fields are blank. Contract cannot be saved")
        # NOTE(review): a bare `return` hands None to the Dataframe output as
        # its new value; a no-op update leaves the table as it is.
        return gr.update()

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{compliance_history_table_name}'
    # Organize data for Airtable
    data = {
        'fields': {
            'school': str(policy_name_dd),
            'sponsor': str(sponsor_tbox),
            'contract_redline': str(contract_redline_html),
            'compliance_comments': str(compliance_comments_tbox),
            'contract_value': compensation_num,
            'status': str(status_ddss),
            'student_name': str(name_tbox),
            'student_email': str(email_tbox),
            'start_date': str(start_date_tbox),
            'end_date': str(end_date_tbox)
        }
    }
    try:
        # Post data to Airtable and raise on an error status.
        response = requests.post(airtable_endpoint, headers=headers, json=data)
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
        return gr.update()
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return gr.update()

    gr.Info("Contract Saved")
    return {compliance_history_df: get_compliance_history()}
# Chatbot Function
def chatbot(policy_name_dd,contract_text,progress=gr.Progress()):
    """Redline *contract_text* against the selected policy using the chat model.

    Steps: fetch the policy text and the prompt pair from Airtable, render the
    user prompt (a Jinja2 template) with the contract and policy text, call
    the chat-completion API, parse the JSON reply, log the call, and return
    the redline HTML.

    Side effects:
        * writes ``analysis_input.txt`` and ``response.txt`` in the working
          directory;
        * sets the module-level ``student_name``, ``student_email``,
          ``sponsor_name``, ``start_date`` and ``end_date``, which
          ``update_tboxes`` reads;
        * appends a row to the compliance log.

    Returns:
        A dict that puts the redline HTML into ``contract_redline_html`` and
        makes ``download_row`` visible.

    Raises:
        gr.Error: if the model reply is not valid JSON.
    """
    global student_name, student_email, sponsor_name, start_date, end_date

    start_time = datetime.now()
    progress(progress=None)

    # get_policy_text returns the same value it stores in the policy_text global.
    selected_policy_text = get_policy_text(policy_name_dd)
    header, template_content = get_prompt('', '')

    # Render the user prompt template with the inputs.
    analysis_input = Template(template_content).render(
        contract_text=contract_text, policy_text=selected_policy_text)
    trimmed_input = prompt_trim(analysis_input)
    # NOTE(review): the trimmed prompt is only written to disk; the untrimmed
    # analysis_input is what is sent to the model. Kept as in the original -
    # confirm which one is intended.
    with open('analysis_input.txt', 'w', encoding='utf-8') as out_file:
        out_file.write(trimmed_input)

    gpt_model = "gpt-4-1106-preview"
    response = openai.ChatCompletion.create(
        model=gpt_model,
        temperature=0,
        messages=[
            {"role": "system", "content": header},
            {"role": "user", "content": analysis_input},
        ],
    )
    gpt_response = response.choices[0].message["content"]

    # Default both counts to 0: the original passed no default for
    # 'prompt_tokens', so a missing value became None and the cost arithmetic
    # below would raise TypeError.
    tokens_used = response.usage
    prompt_tokens = tokens_used.get('prompt_tokens', 0)
    completion_tokens = tokens_used.get('completion_tokens', 0)
    if gpt_model == "gpt-4":
        question_cost = (tokens_used.get('total_tokens', 0) / 1000) * .03
    else:
        question_cost = ((prompt_tokens / 1000) * .01) + ((completion_tokens / 1000) * .03)

    with open('response.txt', 'w', encoding='utf-8') as out_file:
        out_file.write(gpt_response)

    response_time = datetime.now() - start_time

    # The reply may be wrapped in a ```json fence; remove it before parsing.
    cleaned_response = gpt_response.replace("```json", "").replace("```", "").strip()
    try:
        data = json.loads(cleaned_response)
    except json.JSONDecodeError as exc:
        raise gr.Error("The model response could not be parsed as JSON. Please try again.") from exc

    # A header key the model left out becomes an empty text box rather than
    # aborting the whole redline with KeyError.
    header_fields = data.get('header') or {}
    student_name = header_fields.get('Student Name', '')
    student_email = header_fields.get('Student Email', '')
    sponsor_name = header_fields.get('Sponsor Name', '')
    start_date = header_fields.get('Start Date', '')
    end_date = header_fields.get('End Date', '')

    # Extracting data from the 'body' node
    html_content = data['body']

    append_to_at_compliancelog(policy_name_dd,contract_text,html_content, response_time, question_cost, prompt_tokens, completion_tokens)
    return {contract_redline_html: html_content, download_row: gr.Row(visible=True)}
def log_login(username):
    """Append a login record for *username* to the Airtable user-log table.

    The record also carries the module-level ``app`` name. HTTP and other
    errors raised while posting are printed, not propagated.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{user_log_table_name}'
    payload = {
        'fields': {
            'user_name': str(username),
            'app': str(app)
        }
    }
    try:
        # Send the record and turn an error status into an exception.
        requests.post(url, headers=headers, json=payload).raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
def login_auth(username, password):
    """Check *username* / *password* against the Airtable users table.

    On a match the login is logged, the module-level ``logged_in_user`` is
    set, the login row is hidden and the application row is shown. Otherwise
    the login row stays visible.

    NOTE(review): the lookup compares the submitted password with a stored
    ``password`` field, and ``logged_in_user`` is one module-level value for
    the whole process - both are worth revisiting.

    Returns:
        A dict of visibility updates for ``login_row`` and ``app_row``.
    """
    global logged_in_user

    def _quote(value):
        # Escape backslashes and double quotes so user input cannot terminate
        # the quoted string and alter the formula (e.g. to match every row).
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    login_failed = {login_row: gr.Row(visible=True), app_row: gr.Row(visible=False)}

    # Blank credentials must never reach the formula.
    if not username or not password:
        print("Invalid user/password combination")
        return login_failed

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{users_table_name}'
    safe_username = _quote(username)
    safe_password = _quote(password)
    # Query the 'users' table for a row matching both values.
    params = {
        'filterByFormula': f'AND(user_name = "{safe_username}", password = "{safe_password}")'
    }
    try:
        # A timeout keeps a stalled connection from hanging the login forever.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
    except requests.exceptions.RequestException as request_error:
        print(f"An error occurred: {str(request_error)}")
        return login_failed

    if response.status_code == 200 and response.json().get('records'):
        log_login(username)
        logged_in_user = username
        return {login_row: gr.Row(visible=False), app_row: gr.Row(visible=True)}

    print("Invalid user/password combination")
    return login_failed
def extract_text_with_spacing(pdf_path):
    """Extract the text of the PDF at *pdf_path*, keeping word and block spacing.

    For each text block on each page, the spans of a line are joined with a
    single space and every line becomes one entry. A lone newline entry is
    added after each text block, and all entries are joined with newlines.

    NOTE(review): indentation was lost in the reviewed copy; the paragraph
    marker is assumed to be added once per text block (inside the
    ``"lines" in b`` check) - confirm against the original.

    Returns:
        The extracted text as one string.
    """
    document = fitz.open(pdf_path)
    all_text = []
    try:
        for page in document:
            # Extract text in a dict structure
            blocks = page.get_text("dict")["blocks"]
            for b in blocks:
                if "lines" in b:  # Only blocks that contain lines of text
                    for line in b["lines"]:
                        span_texts = [span["text"] for span in line["spans"]]
                        all_text.append(" ".join(span_texts))
                    all_text.append("\n")  # Presume a new block is a new paragraph
    finally:
        # Close the document even when extraction fails part-way through.
        document.close()
    return "\n".join(all_text)
def pdf_to_text(contract_file_cmpt, contract_text_tbox, file_name_tbox):
    """Extract the uploaded PDF's text and derive the redline file name.

    Args:
        contract_file_cmpt: Uploaded file object; its ``name`` attribute is
            used as the path of the PDF.
        contract_text_tbox: Unused; present because the event passes it in.
        file_name_tbox: Unused; present because the event passes it in.

    Returns:
        ``(file_text, redline_file_name, None, None, None)``. The three
        ``None`` values are sent to the remaining outputs of the event.

    Raises:
        gr.Error: if no file has been selected.
    """
    if contract_file_cmpt is None:
        # Without this check the attribute access below raises AttributeError.
        raise gr.Error("Please select a contract PDF before uploading.")

    file_text = extract_text_with_spacing(contract_file_cmpt.name)
    # basename/splitext handle any path separator and keep dots that are part
    # of the name ("a.b.pdf" -> "a.b"), where split(".")[0] cut at the first.
    original_file_name = os.path.basename(contract_file_cmpt.name)
    file_stem = os.path.splitext(original_file_name)[0]
    redline_file_name = file_stem + " Redline.pdf"
    return file_text, redline_file_name, None, None, None
def convert_html_to_pdf(source_html, output_filename):
    """Render *source_html* to a PDF file at *output_filename* with xhtml2pdf.

    Args:
        source_html: HTML markup as a string; it is encoded as UTF-8.
        output_filename: Path of the PDF to create (opened with ``"w+b"``,
            so an existing file is overwritten).

    Returns:
        ``pisa_status.err`` from xhtml2pdf. The previous comment described
        this as "True on success", but the value returned is the error
        indicator, so a falsy result means the conversion reported no errors.
    """
    # The context manager closes the file even if the conversion raises.
    with open(output_filename, "w+b") as result_file:
        pisa_status = pisa.CreatePDF(
            BytesIO(source_html.encode("UTF-8")),  # HTML content
            dest=result_file)  # File handle to receive the result
    return pisa_status.err
def download_pdf(compliance_comments,contract_redline_html,file_name_tbox):
    """Create a PDF containing the redline and the compliance comments.

    Args:
        compliance_comments: Reviewer comments (plain text).
        contract_redline_html: Redline markup produced by ``chatbot``.
        file_name_tbox: Output file name; blank when the contract text was
            pasted in and no PDF was uploaded.

    Returns:
        ``{pdf_download_file: <file name>}`` so the file can be downloaded.
    """
    import html  # local import: used only to escape the free-text comments

    # Pasted-in contracts never set a file name, and open("") would fail.
    output_name = file_name_tbox or "redline.pdf"

    # NOTE(review): in the reviewed copy these literals were split across
    # physical lines (a syntax error). The line breaks are kept as "\n"; any
    # markup that originally separated the sections is not visible here -
    # confirm against the original.
    contract_redline_comments = (
        "Contract Redline:\n"
        + (contract_redline_html or "")
        + "\nCompliance Comments:\n"
        # The comments are free text placed into HTML, so escape <, > and &.
        + html.escape(compliance_comments or "")
    )
    convert_html_to_pdf(contract_redline_comments, output_name)
    return {pdf_download_file: output_name}
def change_tab(contract_text):
    """Switch to the redline tab, refusing to continue when there is no text.

    This is the first step of the redline button's event chain; the model
    call is attached with ``.success()``. Raising stops that chain, whereas
    the original returned ``False`` to the ``tabs`` output after a warning.

    Returns:
        ``gr.Tabs(selected=1)`` to select the tab with id 1.

    Raises:
        gr.Error: if *contract_text* is empty or only whitespace.
    """
    if not contract_text or not contract_text.strip():
        raise gr.Error("Contract Text is blank.")
    return gr.Tabs(selected=1)
def update_tboxes():
    """Copy the header values extracted by ``chatbot`` into the text boxes.

    The values are read from module-level globals that ``chatbot`` sets. They
    are looked up with a default of ``''`` so this step does not raise
    NameError when ``chatbot`` has not set them yet (for example because it
    failed on its first run).

    Returns:
        A dict mapping each text box component to its new value.
    """
    extracted = globals()
    return {
        sponsor_tbox: extracted.get('sponsor_name', ''),
        name_tbox: extracted.get('student_name', ''),
        email_tbox: extracted.get('student_email', ''),
        start_date_tbox: extracted.get('start_date', ''),
        end_date_tbox: extracted.get('end_date', ''),
    }
def policy_name_change(policy_name_dd):
    """Show an informational toast naming the newly selected policy."""
    message = f'Policy Name Changed to {policy_name_dd}'
    gr.Info(message)
# Gradio UI
# Soft theme with the primary button colour overridden.
CIMStheme = gr.themes.Soft().set(
    button_primary_background_fill='#6562F4',
)
# School names offered in the Settings dropdown (fetched once at import time).
schools = get_schools()
#compliance_history = get_compliance_history(["Wake Forest University"])
#compliance_history = get_compliance_history()
#policy_text = get_policy_text("LSU") #for testing the function call
# The lines from "#contract_redline" to the closing quote are one
# commented-out, multi-line sample string. Its continuation lines had lost
# their leading '#', which made the module unparseable; they are kept here as
# comments.
#contract_redline = "Campaign:
#Engagement Name: Applebee's Fall Burger Promo
#Engagement Id: 7015j000001DvCAAA0
#Sponsor: Applebee's
#Start Date: 2023-10-28
#End Date: 2023-11-11
#Engagement Description
#The goal of the engagement is to Increase Sales by having the Student-Athlete Social Media Post. For the Social Media Post, the sponsor is requesting the student athlete take a photo in front of the football stadium in eating a Applebee's burger in your team jersey. The Media rights for the content will be 90 Days.
#Engagement Compensation.
#For successful completion of the engagement the student-athlete will receive payment in the form of Cash.
#Part or all of the payment will be in cash, paid via PayPal.
#The total value of compensation will be 250.
#"
#pdf_download = pdfkit.from_string(contract_redline, False)
#print(pdf_download)

# Module-level state shared by the event handlers.
logged_in_user = 'admin'
file_text = ''
contract_text = ''
policy_text = ''
compliance_comments = ''
file_name = 'redline.pdf'
pdf_download = ''
# Header values that chatbot() fills in and update_tboxes() reads; initialised
# here so reading them before the first successful redline cannot raise
# NameError.
student_name = ''
student_email = ''
sponsor_name = ''
start_date = ''
end_date = ''
# Gradio Blocks application: builds the layout, wires the event handlers, and
# launches the app.
# NOTE(review): indentation is absent in this copy, so the nesting of the
# `with` contexts below cannot be read from the layout. The code is left
# exactly as found; restore the indentation from the original file.
with gr.Blocks(CIMStheme) as iface:
# Main application row; created hidden (visible=False). login_auth returns an
# update that makes it visible after a successful login.
with gr.Row(visible=False) as app_row:
with gr.Column():
# Page header: logo and title.
with gr.Row():
with gr.Column(scale=2):
gr.Image(label="Logo", value="Nili_v2_Character.png", width=100, height=100, show_download_button=False,
interactive=False, show_label=False, elem_id="logo", container=False)
with gr.Column(scale=2):
# NOTE(review): the title string below spans two physical lines inside
# ordinary double quotes in this copy; confirm the literal against the
# original file.
gr.Markdown(value="NILI Compliance Desktop
")
with gr.Column(scale=2):
gr.Markdown("")
# Tabbed workspace. Tab ids used below: 0 upload, 1 redline, 3 history,
# 4 settings.
with gr.Tabs() as tabs:
# Tab 0: select a PDF or paste the contract text.
with gr.Tab(label="Contract Upload", id=0) as upload_tab:
with gr.Row():
with gr.Column(variant='panel',scale=1):
contract_file_cmpt = gr.File(label="Select Contract File",file_count="single",file_types=[".pdf"],height=150)
with gr.Column(variant='panel',scale=4):
with gr.Row():
with gr.Column(variant='panel'):
contract_text_tbox = gr.Textbox(label="Contract Text",interactive=True,info="Upload .pdf or paste in text. Shift-Enter to add a line")
with gr.Row():
with gr.Column(scale=1):
upload_btn = gr.components.Button(value="Upload Contract", size='sm', variant="primary")
with gr.Column(scale=2):
gr.Markdown("")
with gr.Column(scale=1):
redline_btn = gr.components.Button(value="Redline Contract", size='sm', variant="primary")
with gr.Column(scale=2):
gr.Markdown("")
# Tab 1: contract details, the redline HTML and the reviewer's comments.
with gr.Tab(label="Contract Redline", id=1) as redline_tab:
with gr.Row(variant='panel'):
with gr.Column():
sponsor_tbox = gr.Textbox(label="Sponsor:", interactive=True)
compensation_num = gr.Number(label="Contract Value",value=0)
status_ddss = gr.Dropdown(["Pending","Approved","Rejected"],multiselect=False,label="Status", value="Pending")
with gr.Column():
name_tbox = gr.Textbox(label="Name:", interactive=True)
email_tbox = gr.Textbox(label="Email:", interactive=True, type='email',placeholder='xxxxxxx@xxxxxx.xxx')
with gr.Column():
start_date_tbox = gr.Textbox(label="Start Date:", interactive=True, placeholder='MM/DD/YYYY')
end_date_tbox = gr.Textbox(label="End Date:", interactive=True, placeholder='MM/DD/YYYY')
with gr.Row():
with gr.Column(variant='panel'):
gr.components.Markdown(response_label)
contract_redline_html = gr.HTML(label="Contract Redline")
compliance_comments_tbox = gr.Textbox(interactive=True,label='Compliance Comments')
with gr.Row():
with gr.Column():
save_btn = gr.Button(value="Save Contract", size='sm', variant="primary")
with gr.Column():
gr.Markdown("")
with gr.Column():
gr.Markdown("")
with gr.Column():
gr.Markdown("")
with gr.Column():
gr.Markdown("")
# Download controls; created hidden. chatbot returns an update that makes
# this row visible once a redline has been produced.
with gr.Row(visible=False) as download_row:
with gr.Column(variant='panel'):
file_name_tbox = gr.Textbox(interactive=False,label='File Name',visible=False)
pdf_download_file = gr.File()
download_btn = gr.Button(value="Create Redline PDF", size='sm', variant="primary")
# Upload: pdf_to_text fills the contract text and the redline file name and
# returns None for the redline, comments and download-file outputs.
upload_btn.click(pdf_to_text,inputs=[contract_file_cmpt,contract_text_tbox,file_name_tbox],outputs=[contract_text_tbox,file_name_tbox,contract_redline_html,compliance_comments_tbox,pdf_download_file])
# Create the PDF from the comments and redline and hand it to the file output.
download_btn.click(download_pdf,inputs=[compliance_comments_tbox,contract_redline_html,file_name_tbox],outputs=pdf_download_file)
with gr.Column():
gr.Markdown("")
with gr.Column():
gr.Markdown("")
with gr.Column():
gr.Markdown("")
with gr.Column():
gr.Markdown("")
# History tab: the get_compliance_history function itself (not its result) is
# passed as the table's value.
with gr.Tab(label="History", id=3) as history_tab:
with gr.Row():
with gr.Column(variant='panel'):
compliance_history_df = gr.Dataframe(get_compliance_history,
headers=["Sponsor", "Contract Value", "Student Name", "Student Email", "Start Date", "End Date", "Status","Created"],
datatype=["str", "number", "str", "str", "date", "date", "str","date"],
label="Contract History",
interactive=False
)
# Settings tab: choose which policy the redline is checked against.
with gr.Tab(label="Settings", id=4) as settings_tab:
with gr.Row():
with gr.Column(variant='panel',scale=1):
policy_name_dd = gr.Dropdown(schools, multiselect=False,label="NIL Policy Selection", value="Wake Forest University")
# Show a toast whenever the selected policy changes.
policy_name_dd.change(policy_name_change,inputs=policy_name_dd,outputs=None)
# Redline flow: change_tab runs on click, chatbot is chained with .success(),
# and update_tboxes is chained with .then() to fill the detail text boxes.
redline_btn.click(change_tab, inputs=contract_text_tbox, outputs=tabs).success(chatbot,inputs=[policy_name_dd,contract_text_tbox],
outputs=[contract_redline_html,download_row]).then(update_tboxes,inputs=None,
outputs=[sponsor_tbox,name_tbox,email_tbox,start_date_tbox,end_date_tbox])
# Save: store the reviewed contract; the handler's result goes to the History table.
save_btn.click(append_to_at_compliance_history,inputs=[policy_name_dd,contract_redline_html,compliance_comments_tbox,sponsor_tbox,compensation_num,status_ddss,name_tbox,email_tbox,start_date_tbox,end_date_tbox],outputs=compliance_history_df)
with gr.Column():
gr.Markdown("")
with gr.Column():
gr.Markdown("")
with gr.Column():
gr.Markdown("")
# Login row; created visible. login_auth returns an update that hides it after
# a successful login.
with gr.Row(visible=True) as login_row:
with gr.Column():
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("")
with gr.Column(scale=1):
with gr.Row():
gr.Image(label="Logo", value="Nili_v2_Character.png", width=200, height=200,
show_download_button=False,
interactive=False, show_label=False, elem_id="logo", container=False)
with gr.Row():
# NOTE(review): this string also spans two physical lines in this copy;
# confirm the literal against the original file.
gr.Markdown(value="NILI Login
")
with gr.Column(scale=2):
gr.Markdown("")
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("")
with gr.Column(scale=1, variant='panel'):
username_tbox = gr.Textbox(label="User Name", interactive=True)
password_tbox = gr.Textbox(label="Password", interactive=True, type='password')
submit_btn = gr.Button(value='Submit', variant='primary', size='sm')
# Submit: login_auth returns visibility updates for the login and app rows.
submit_btn.click(login_auth, inputs=[username_tbox, password_tbox], outputs=[login_row,app_row])
with gr.Column(scale=2):
gr.Markdown("")
# Footer with the copyright and trademark notice.
# NOTE(review): the single-quoted string below spans two physical lines in
# this copy; confirm the literal against the original file.
with gr.Row():
with gr.Column(scale=4):
gr.HTML('© 2023 Collegiate Influencer Marketing Systems, Inc.
CIMS.AI, CIMS.AI logo, NILI, NILI logo, and EzNIL are trademarks of Collegiate Influencer Marketing Systems, Inc.')
# Enable the request queue, then start the app. The commented-out line is an
# earlier variant that passed login_auth to launch() as the auth callback.
iface.queue()
#iface.queue(concurrency_count=20).launch(auth=login_auth, auth_message= "Enter your username and password that you received from CIMS.AI. To request a login, please email 'info@cims.ai'")
iface.launch()