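# NOTE: "Spaces: Sleeping / Sleeping" - page-status text that appears to have been captured along with the source; it is not part of the program.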
import os | |
import io | |
import openai | |
import datetime | |
import time | |
import gradio as gr | |
import json | |
from jinja2 import Template | |
import requests | |
import fitz | |
from xhtml2pdf import pisa | |
from io import BytesIO | |
# Hand the OpenAI API key to the client library; the value is None when the
# OPENAI_API_KEY environment variable is not set.
openai.api_key = os.getenv('OPENAI_API_KEY')
# ---------------------------------------------------------------------------
# Airtable configuration
# ---------------------------------------------------------------------------
airtable_api_key = os.getenv('AIRTABLE_API_KEY')
base_id = 'appcUK3hUWC7GM2Kb'

# Airtable table identifiers used by the request helpers in this module.
prompts_table_name = 'tblYIZEB8m6JkGDEP'
users_table_name = 'tblLNe5ZL47SvrAEk'
user_log_table_name = 'tblrlTsRrkl6BqMAJ'
compliancelog_table_name = 'tblQMXWKGlOonkIw2'
policies_table_name = 'tbla6PC65qZfqdJhE'

# HTTP headers sent with every Airtable request.
headers = {
    "Authorization": f"Bearer {airtable_api_key}",
    "Content-Type": "application/json",
    "Accept": "application/json",
}

# ---------------------------------------------------------------------------
# Presentation of the response field
# ---------------------------------------------------------------------------
label_text = "Contract Redline"
# The three values below were only used by an earlier, now-disabled <h3>
# variant of the label; they are kept as module-level names.
color = "#6562F4"
background_color = "white"
border_radius = "10px"

# Label markup styled through CSS custom properties (var(--block-title-...)).
response_label = (
    '<span style="display: inline-block; position: relative; z-index: var(--layer-4); '
    'border: solid var(--block-title-border-width) var(--block-title-border-color); '
    'border-radius: var(--block-title-radius); '
    'background: var(--block-title-background-fill); '
    'padding: var(--block-title-padding); '
    'color: var(--block-title-text-color); '
    'font-weight: var(--block-title-text-weight); '
    'font-size: var(--block-title-text-size); '
    'line-height: var(--line-sm); '
    f'margin-bottom: var(--spacing-lg);">{label_text}</span>'
)

# App name recorded with each user-login log entry.
app = "Compliance"
def prompt_trim(prompt: str) -> str:
    """Return *prompt* with leading/trailing whitespace removed from each line.

    The text is split on '\\n' only, each piece is stripped, and the pieces
    are re-joined with '\\n', so the number of lines is preserved.
    """
    return '\n'.join(map(str.strip, prompt.split('\n')))
def get_policy_text(school):
    """Fetch the `policy_text` values stored in the Airtable policies table for *school*.

    Args:
        school: Value matched against the table's `school` field.

    Returns:
        A list with the `policy_text` of every matching record, or '' when the
        request fails, no record matches, or no matching record has a
        `policy_text` value.

    Side effects:
        Stores the same result in the module-level `policy_text` variable.
        That variable is shared by the whole process, so callers should prefer
        the return value.
    """
    global policy_text
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
    # Escape backslashes and double quotes so the value cannot terminate the
    # quoted string inside the formula and inject formula syntax.
    safe_school = str(school).replace('\\', '\\\\').replace('"', '\\"')
    # Filter on the 'school' field and request only the 'policy_text' field.
    params = {
        'filterByFormula': f'school="{safe_school}"',
        'fields[]': 'policy_text',
    }
    result = ''
    try:
        # A timeout keeps a stalled connection from hanging the UI request forever.
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
        if response.status_code == 200:
            records = response.json().get('records')
            if records:
                # Records without a 'policy_text' entry are skipped instead of
                # raising KeyError and discarding every other result.
                texts = [
                    record['fields']['policy_text']
                    for record in records
                    if 'policy_text' in record.get('fields', {})
                ]
                result = texts or ''
            else:
                print("No records found in the 'policies' table.")
        else:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
    except Exception as e:
        # Deliberate best-effort: any failure is reported and '' is returned.
        print(f"An error occurred: {str(e)}")
    policy_text = result
    return result
def get_schools():
    """Return the `school` value of every record in the Airtable policies table.

    Records are requested sorted ascending by `school`. Result pages are
    followed through the `offset` token returned by the API, so the list is
    not limited to the first page.

    Returns:
        A list of school names. The list is empty when the request fails or
        the table has no records; if a later page fails, the names collected
        so far are returned.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
    # Request only the 'school' field, sorted ascending by that field.
    params = {
        'fields[]': 'school',
        'sort[0][field]': 'school',
        'sort[0][direction]': 'asc',
    }
    schools = []
    try:
        while True:
            # A timeout keeps a stalled connection from blocking start-up forever.
            response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
            if response.status_code != 200:
                print(f"Failed to retrieve data. Status code: {response.status_code}")
                break
            data = response.json()
            records = data.get('records')
            if not records:
                print("No records found in the 'policies' table.")
                break
            # Skip records that have no 'school' value rather than raising
            # KeyError and losing the whole list.
            schools.extend(
                record['fields']['school']
                for record in records
                if 'school' in record.get('fields', {})
            )
            next_offset = data.get('offset')
            if not next_offset:
                break
            params['offset'] = next_offset
    except Exception as e:
        # Deliberate best-effort: report the failure and return what we have.
        print(f"An error occurred: {str(e)}")
    return schools
def get_prompt(header, template_content):
    """Load the 'Compliance_v1' system and user prompts from the Airtable prompts table.

    Args:
        header: Value returned as the system prompt when no record matches.
        template_content: Value returned as the user prompt when no record matches.

    Returns:
        A `(system_prompt, user_prompt)` tuple taken from the first matching
        record (a missing field yields ''), or the two arguments unchanged
        when no record matches.

    Raises:
        requests.exceptions.HTTPError: if Airtable answers with an error status.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{prompts_table_name}'
    query = {'filterByFormula': "prompt_name='Compliance_v1'"}
    response = requests.get(url, headers=headers, params=query)
    response.raise_for_status()

    matches = response.json().get('records')
    if not matches:
        # Nothing stored under that prompt name: hand the inputs back.
        return header, template_content

    # Only the first record is used (there should be only one).
    fields = matches[0]['fields']
    return fields.get('system_prompt', ''), fields.get('user_prompt', '')
def append_to_at_compliancelog(policy_name_dd,contract_text,gpt_response, response_time, question_cost, prompt_tokens, completion_tokens):
    """Append one row describing a redline request to the Airtable compliance log.

    The policy name, contract text, GPT response and response time are stored
    as strings; cost and token counts are passed through unchanged. The user
    name comes from the module-level `logged_in_user`. Failures are printed,
    not raised.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{compliancelog_table_name}'
    payload = {
        'fields': {
            'policy_name': str(policy_name_dd),
            'contract_text': str(contract_text),
            'gpt_response': str(gpt_response),
            'response_time': str(response_time),
            'question_cost': question_cost,
            'user_name': str(logged_in_user),
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens,
        }
    }
    try:
        # POST the row and turn an error status into an exception.
        requests.post(url, headers=headers, json=payload).raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
# Chatbot Function
def chatbot(policy_name_dd,contract_text,progress=gr.Progress()):
    """Redline *contract_text* against the selected NIL policy with the OpenAI chat API.

    Steps: fetch the policy text for *policy_name_dd*, fetch the system/user
    prompts, render the user prompt (a Jinja2 template) with the contract and
    policy text, send it to the model, log the exchange to Airtable and return
    the model's answer for display.

    Args:
        policy_name_dd: School / policy name chosen in the dropdown.
        contract_text: Contract text to be redlined.
        progress: Gradio progress tracker.

    Returns:
        A dict mapping `contract_redline_html` to the model response and
        `download_row` to a visible row, as expected by the click handler.

    Raises:
        gr.Error: if no policy is selected or the contract text is blank.
    """
    start_time = datetime.datetime.now()
    progress(progress=None)

    # Refuse to spend an API call on input that cannot produce a meaningful redline.
    if not policy_name_dd:
        raise gr.Error("Please select an NIL policy before redlining the contract.")
    if not contract_text or not str(contract_text).strip():
        raise gr.Error("Please upload a contract or paste its text before redlining.")

    # Use the return value instead of the module-level `policy_text` global:
    # the global is shared by every session, so a concurrent request could
    # replace it between the lookup and the template rendering.
    selected_policy_text = get_policy_text(policy_name_dd)

    header, template_content = get_prompt('', '')

    # Render the user prompt template with the inputs.
    template = Template(template_content)
    analysis_input = template.render(contract_text=contract_text, policy_text=selected_policy_text)
    # The trimmed prompt was previously computed and then discarded; send it
    # so per-line padding does not inflate the prompt.
    trimmed_input = prompt_trim(analysis_input)

    gpt_model = "gpt-4-1106-preview"
    response = openai.ChatCompletion.create(
        model=gpt_model,
        temperature=0,
        messages=[
            {"role": "system", "content": header},
            {"role": "user", "content": trimmed_input},
        ],
    )
    gpt_response = response.choices[0].message["content"]

    # Cost estimate from the per-1K-token rates this code uses for the model
    # above. The former `gpt_model == "gpt-4"` branch could never run because
    # the model name is fixed, so it was removed.
    tokens_used = response.usage
    prompt_tokens = tokens_used.get('prompt_tokens', 0)
    completion_tokens = tokens_used.get('completion_tokens', 0)
    question_cost = ((prompt_tokens / 1000) * .01) + ((completion_tokens / 1000) * .03)

    response_time = datetime.datetime.now() - start_time
    contract_redline = gpt_response
    append_to_at_compliancelog(policy_name_dd, contract_text, contract_redline, response_time, question_cost, prompt_tokens, completion_tokens)
    return {contract_redline_html: contract_redline, download_row: gr.Row(visible=True)}
def log_login(username):
    """Record a login of *username* for this app in the Airtable user-log table.

    The row holds the user name and the module-level `app` name, both as
    strings. Failures are printed, not raised.
    """
    url = f'https://api.airtable.com/v0/{base_id}/{user_log_table_name}'
    payload = {'fields': {'user_name': str(username), 'app': str(app)}}
    try:
        # POST the row and turn an error status into an exception.
        requests.post(url, headers=headers, json=payload).raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
def login_auth(username, password):
    """Check *username* / *password* against the Airtable users table.

    On success the login is logged via `log_login` and the module-level
    `logged_in_user` is set to *username*.

    Args:
        username: User name entered on the login form.
        password: Password entered on the login form.

    Returns:
        True when a record with that user name and password exists, otherwise
        False (including when a credential is blank or the lookup fails).

    NOTE(review): the formula compares the stored `password` field with the
    submitted value, so passwords are stored unhashed and travel in the
    request's query string. `logged_in_user` is a single process-wide value
    shared by all sessions.
    """
    global logged_in_user

    def _quote(value):
        # Escape backslashes and double quotes so a credential cannot close
        # the quoted string in the formula and inject extra formula syntax
        # (for example to make the AND() condition always true).
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    # Blank credentials must never match, whatever the table contains.
    if not username or not password:
        print("Invalid user/password combination")
        return False

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{users_table_name}'
    params = {
        'filterByFormula': f'AND(user_name = "{_quote(username)}", password = "{_quote(password)}")'
    }
    try:
        response = requests.get(airtable_endpoint, headers=headers, params=params, timeout=30)
    except requests.exceptions.RequestException as e:
        # Fail closed: a network problem must not crash the auth callback.
        print(f"An error occurred: {str(e)}")
        return False

    if response.status_code == 200 and response.json().get('records'):
        log_login(username)
        logged_in_user = username
        return True

    print("Invalid user/password combination")
    return False
def extract_text_with_spacing(pdf_path):
    """Extract the text of the PDF at *pdf_path*, one text line per output line.

    For every page the "dict" text structure is read. Within each block that
    has lines, the spans of a line are joined with single spaces, and a "\\n"
    entry is appended after the block so blocks read as separate paragraphs.
    All entries are finally joined with "\\n".

    Args:
        pdf_path: Path of the PDF file to open.

    Returns:
        The extracted text as a single string.
    """
    all_text = []
    # The context manager closes the document even if extraction raises.
    with fitz.open(pdf_path) as document:
        for page in document:
            for block in page.get_text("dict")["blocks"]:
                if "lines" not in block:
                    # Block without text lines: nothing to extract.
                    continue
                for line in block["lines"]:
                    all_text.append(" ".join(span["text"] for span in line["spans"]))
                # Presume a new block is a new paragraph.
                all_text.append("\n")
    return "\n".join(all_text)
def pdf_to_text(contract_file_cmpt, contract_text_tbox, file_name_tbox):
    """Read the uploaded contract PDF and derive the redline file name.

    Args:
        contract_file_cmpt: Uploaded file; either an object with a `.name`
            path attribute or the path itself.
        contract_text_tbox: Current contract text (unused; part of the click wiring).
        file_name_tbox: Current file name (unused; part of the click wiring).

    Returns:
        A 5-tuple `(file_text, redline_file_name, None, None, None)`; the three
        `None` values are sent to the remaining outputs wired to the upload button.

    Raises:
        gr.Error: if no file has been selected.
    """
    if contract_file_cmpt is None:
        # Without this guard the `.name` access fails with AttributeError.
        raise gr.Error("Please select a contract PDF before uploading.")

    # Accept both a file wrapper exposing `.name` and a plain path string.
    pdf_path = getattr(contract_file_cmpt, "name", contract_file_cmpt)
    file_text = extract_text_with_spacing(pdf_path)

    # splitext removes only the final extension, so "contract.v2.pdf" becomes
    # "contract.v2 Redline.pdf" rather than being cut at the first dot.
    stem = os.path.splitext(os.path.basename(pdf_path))[0]
    redline_file_name = stem + " Redline.pdf"
    return file_text, redline_file_name, None, None, None
def convert_html_to_pdf(source_html, output_filename):
    """Render *source_html* to a PDF file at *output_filename* using xhtml2pdf.

    Args:
        source_html: HTML markup to convert (encoded as UTF-8 for the converter).
        output_filename: Path of the PDF file to create or overwrite.

    Returns:
        `pisa_status.err` as reported by the converter. NOTE(review): this is
        presumably falsy on success and truthy on errors, the opposite of what
        the previous comment here claimed; confirm against the xhtml2pdf docs.
    """
    # `with` guarantees the output file is closed even if the conversion raises.
    with open(output_filename, "w+b") as result_file:
        pisa_status = pisa.CreatePDF(
            BytesIO(source_html.encode("UTF-8")),  # HTML content
            dest=result_file,  # file handle to receive the result
        )
    return pisa_status.err
def download_pdf(compliance_comments,contract_redline_html,file_name_tbox):
    """Build the redline PDF (redline followed by compliance comments).

    Args:
        compliance_comments: Reviewer comments appended under their own heading.
        contract_redline_html: HTML of the redlined contract.
        file_name_tbox: Output file name; falls back to 'redline.pdf' when
            empty, which happens when the contract text was pasted instead of
            uploaded.

    Returns:
        A dict mapping `pdf_download_file` to the path of the generated PDF.

    Raises:
        gr.Error: if the converter reports errors.
    """
    # An empty name would make open() fail inside the converter helper.
    output_name = file_name_tbox or 'redline.pdf'
    # Treat missing inputs as empty text so the concatenation cannot fail on None.
    contract_redline_comments = (
        "<h2><u>Contract Redline:</u></h2><br>" + (contract_redline_html or '')
        + "<br><h2><u>Compliance Comments:</u></h2><br>" + (compliance_comments or '')
    )
    # The helper returns the converter's error indicator; surface a failure
    # instead of offering a possibly broken file for download.
    if convert_html_to_pdf(contract_redline_comments, output_name):
        raise gr.Error("The redline PDF could not be generated.")
    return {pdf_download_file: output_name}
# Gradio UI
# Soft theme with the primary button fill overridden.
CIMStheme = gr.themes.Soft().set(button_primary_background_fill='#6562F4')

# Dropdown choices are loaded from Airtable once, at import time.
schools = get_schools()

# Empty HTML table skeleton (header row plus one blank row).
scout_response = (
    "<!DOCTYPEhtml><html><head><title>HTMLTableGenerator</title>"
    "<style>table{width:100%;border:2pxsolid#000000;padding:5px;background:#ffffff}"
    "tableth{border:2pxsolid#000000;padding:5px;background:#f0f0f0;color:#000000;}"
    "tabletd{border:2pxsolid#000000;text-align:left;padding:5px;background:#ffffff;color:#000000;}"
    "</style></head><body><table><thead><tr>"
    "<th>Rank</th><th>CIMS.AI<br>Scout<br>Ranking</th><th>Athlete Name</th><th>Gender</th>"
    "<th>Primary Sport</th><th>Social Media<br>Fan Total</th><th>Interests</th>"
    "<th>Commentary on Ranking</th></tr></thead>"
    "<tbody><tr><td></td><td></td><td></td><td></td><td></td><td></td><td></td><td></td></tr></tbody>"
    "</table></body></html>"
)

# Module-level defaults. `logged_in_user` is overwritten by login_auth and
# `policy_text` by get_policy_text.
logged_in_user = 'admin'
file_text = contract_text = policy_text = compliance_comments = pdf_download = ''
file_name = 'redline.pdf'
with gr.Blocks(CIMStheme) as iface:
    # Header row: logo, centered title, empty spacer column.
    with gr.Row():
        with gr.Column(scale=2):
            gr.Image(
                label="Logo",
                value="CIMS Logo Purple.png",
                width=10,
                show_download_button=False,
                interactive=False,
                show_label=False,
                elem_id="logo",
                container=False,
            )
        with gr.Column(scale=2):
            gr.Markdown(value="<H1 style='text-align: center;'>Compliance Desktop - Powered by CIMS.AI</h1>")
        with gr.Column(scale=2):
            gr.Markdown("")

    # Inputs: policy selection and contract file on the left, contract text on the right.
    with gr.Row():
        with gr.Column(variant='panel', scale=1):
            policy_name_dd = gr.Dropdown(schools, multiselect=False, label="NIL Policy Selection")
            contract_file_cmpt = gr.File(
                label="Select Contract File",
                file_count="single",
                file_types=[".pdf"],
                height=150,
            )
        with gr.Column(variant='panel', scale=3):
            contract_text_tbox = gr.Textbox(
                label="Contract Text",
                interactive=True,
                info="Upload .pdf or paste in text",
            )

    # Action buttons, separated by empty spacer columns.
    with gr.Row():
        with gr.Column():
            upload_btn = gr.Button(value="Upload Contract", size='sm', variant="primary")
        with gr.Column():
            gr.Markdown("")
        with gr.Column():
            redline_btn = gr.Button(value="Redline Contract", size='sm', variant="primary")
        with gr.Column():
            gr.Markdown("")

    # Results: redline HTML plus an editable comments box.
    with gr.Row():
        with gr.Column(variant='panel'):
            gr.Markdown(response_label)
            contract_redline_html = gr.HTML(label="Contract Redline")
            compliance_comments_tbox = gr.Textbox(interactive=True, label='Compliance Comments')

    # Download area: hidden until `chatbot` returns a visible row for it.
    with gr.Row(visible=False) as download_row:
        with gr.Column(variant='panel'):
            file_name_tbox = gr.Textbox(interactive=False, label='File Name', visible=False)
            pdf_download_file = gr.File()
            download_btn = gr.Button(value="Create Redline PDF", size='sm', variant="primary")
        with gr.Column():
            gr.Markdown("")
        with gr.Column():
            gr.Markdown("")
        with gr.Column():
            gr.Markdown("")

    # Footer.
    with gr.Row():
        with gr.Column():
            gr.HTML('<center><i>© 2023 Collegiate Influencer Marketing Systems, Inc.</i><br>CIMS.AI, CIMS.AI logo, NILI, NILI logo, and EzNIL are trademarks of Collegiate Influencer Marketing Systems, Inc.</center>')

    # Event wiring (must be registered inside the Blocks context).
    upload_btn.click(
        pdf_to_text,
        inputs=[contract_file_cmpt, contract_text_tbox, file_name_tbox],
        outputs=[contract_text_tbox, file_name_tbox, contract_redline_html, compliance_comments_tbox, pdf_download_file],
    )
    redline_btn.click(
        chatbot,
        inputs=[policy_name_dd, contract_text_tbox],
        outputs=[contract_redline_html, download_row],
    )
    download_btn.click(
        download_pdf,
        inputs=[compliance_comments_tbox, contract_redline_html, file_name_tbox],
        outputs=pdf_download_file,
    )

# Variant with login enabled, kept for reference:
#iface.queue(concurrency_count=20).launch(show_api=False, auth=login_auth, auth_message= "Enter your username and password that you received from CIMS.AI. To request a login, please email 'info@cims.ai'")
iface.queue().launch(show_api=False)