import os
import io
import openai
import datetime
import time
import gradio as gr
import json
from jinja2 import Template
import requests
import fitz
from xhtml2pdf import pisa
from io import BytesIO
#from dotenv import load_dotenv
# NOTE: this rebinds the name ``datetime`` from the module imported above to
# the class, so ``datetime.now()`` / ``datetime.strptime()`` below are class
# calls. Keep this import after ``import datetime``.
from datetime import datetime
#load_dotenv()

# Initialize OpenAI
openai.api_key = os.environ.get('OPENAI_API_KEY')

# Configuration variables
airtable_api_key = os.environ.get('AIRTABLE_API_KEY')

# Airtable table names
prompts_table_name = 'tblYIZEB8m6JkGDEP'
users_table_name = 'tblLNe5ZL47SvrAEk'
user_log_table_name = 'tblrlTsRrkl6BqMAJ'
compliancelog_table_name = 'tblQMXWKGlOonkIw2'
policies_table_name = 'tbla6PC65qZfqdJhE'
compliance_history_table_name = 'tbltA3vCb2upKVeFT'

# Define the style and content for the response field
label_text = "Contract Redline"
color = "#6562F4"
background_color = "white"
border_radius = "10px"
# NOTE(review): color / background_color / border_radius are not referenced by
# the label below as it stands; presumably HTML markup that used them was
# stripped from these two literals -- confirm against the original file.
# response_label = f'{label_text}'
response_label = f'{label_text}'

base_id = 'appcUK3hUWC7GM2Kb'

# App name for user login logging
app = "Compliance"

headers = {
    "Authorization": f"Bearer {airtable_api_key}",
    "Content-Type": "application/json",
    "Accept": "application/json",
}

# Seconds to wait on any single Airtable HTTP call, so a stalled connection
# cannot hang a UI callback indefinitely.
_REQUEST_TIMEOUT = 30

# Contract header values extracted by chatbot() and read by update_tboxes().
# Defined up front so update_tboxes() does not raise NameError when it runs
# before chatbot() has completed successfully.
# NOTE: these are process-wide globals, shared by every browser session.
student_name = ''
student_email = ''
sponsor_name = ''
start_date = ''
end_date = ''


def _airtable_url(table_name):
    """Return the Airtable REST endpoint for *table_name* in this base."""
    return f'https://api.airtable.com/v0/{base_id}/{table_name}'


def _formula_string(value):
    """Return *value* as a double-quoted Airtable formula string literal.

    Backslashes and double quotes are backslash-escaped so that user-supplied
    text cannot terminate the literal and change the surrounding formula.
    """
    escaped = str(value).replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'


def _fetch_records(table_name, params, table_label):
    """GET one page of records from *table_name*.

    Returns the ``records`` list of the JSON response. On a non-200 status,
    an empty result, or any exception, the reason is printed and an empty
    list is returned. *table_label* is only used in the "no records" message.

    NOTE: only the first response page is read; the ``offset`` continuation
    token is not followed.
    """
    try:
        response = requests.get(
            _airtable_url(table_name),
            headers=headers,
            params=params,
            timeout=_REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
            return []
        records = response.json().get('records')
        if not records:
            print(f"No records found in the '{table_label}' table.")
            return []
        return records
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return []


def _post_airtable_record(table_name, fields):
    """Create one record in *table_name*; return True on success.

    Logging is best-effort: failures are printed, never raised, and reported
    to the caller as False.
    """
    try:
        response = requests.post(
            _airtable_url(table_name),
            headers=headers,
            json={'fields': fields},
            timeout=_REQUEST_TIMEOUT,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
        return False
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return False
    return True


def prompt_trim(prompt: str) -> str:
    """Strip leading and trailing whitespace from every line of *prompt*."""
    return '\n'.join(line.strip() for line in prompt.split('\n'))


def get_policy_text(school):
    """Fetch the policy text stored for *school* in the policies table.

    Sets the module-level ``policy_text`` and returns it: a list with one
    ``policy_text`` value per matching record, or ``''`` when nothing was
    found or the request failed.
    """
    global policy_text
    params = {
        # The school name is escaped so it cannot alter the formula.
        'filterByFormula': f'school={_formula_string(school)}',
        'fields[]': 'policy_text',
    }
    records = _fetch_records(policies_table_name, params, 'policies')
    # Records without the field are skipped rather than failing the lookup.
    values = [
        record['fields']['policy_text']
        for record in records
        if 'policy_text' in record.get('fields', {})
    ]
    policy_text = values or ''
    return policy_text


def get_schools():
    """Return the ``school`` values of the policies table in ascending order.

    Returns an empty list when the table is empty or the request fails.
    """
    params = {
        'fields[]': 'school',
        'sort[0][field]': 'school',
        'sort[0][direction]': 'asc',
    }
    records = _fetch_records(policies_table_name, params, 'policies')
    return [
        record['fields']['school']
        for record in records
        if 'school' in record.get('fields', {})
    ]


def get_prompt(header, template_content):
    """Load the ``Compliance_v1`` system and user prompts from Airtable.

    Returns ``(system_prompt, user_prompt)`` from the first matching record.
    When no record matches, the *header* and *template_content* arguments are
    returned unchanged, so they act as fallbacks.

    Raises:
        requests.HTTPError: on a non-success response.
        requests.RequestException: on connection failure or timeout.
    """
    params = {
        'filterByFormula': "prompt_name='Compliance_v1'",
    }
    response = requests.get(
        _airtable_url(prompts_table_name),
        headers=headers,
        params=params,
        timeout=_REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    data = response.json()
    if data.get('records'):
        # Only the first record is used (there should be only one).
        record = data['records'][0]['fields']
        header = record.get('system_prompt', '')
        template_content = record.get('user_prompt', '')
    return header, template_content


def append_to_at_compliancelog(policy_name_dd, contract_text, gpt_response,
                               response_time, question_cost, prompt_tokens,
                               completion_tokens):
    """Write one redline request/response entry to the compliance log table.

    The entry is attributed to the module-level ``logged_in_user``. Failures
    are printed and otherwise ignored.
    """
    new_fields = {
        'policy_name': str(policy_name_dd),
        'contract_text': str(contract_text),
        'gpt_response': str(gpt_response),
        'response_time': str(response_time),
        'question_cost': question_cost,
        'user_name': str(logged_in_user),
        'prompt_tokens': prompt_tokens,
        'completion_tokens': completion_tokens,
    }
    _post_airtable_record(compliancelog_table_name, new_fields)


def format_date(date_str):
    """Reformat an ISO-style date string for display.

    ``YYYY-MM-DD`` becomes ``MM/DD/YYYY``; ``YYYY-MM-DDTHH:MM:SS.fffZ``
    becomes ``MM/DD/YYYY HH:MM``. Any other value, including non-strings,
    is returned unchanged.
    """
    if not isinstance(date_str, str):
        return date_str
    try:
        return datetime.strptime(date_str, "%Y-%m-%d").strftime("%m/%d/%Y")
    except ValueError:
        pass
    try:
        parsed = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ")
    except ValueError:
        return date_str
    return parsed.strftime("%m/%d/%Y %H:%M")


def get_compliance_history():
    """Return the saved contracts as rows for the history Dataframe.

    Each row lists, in order: sponsor, contract_value, student_name,
    student_email, start_date, end_date, status, Created. Rows are requested
    newest first, and the three date columns go through ``format_date``.
    Returns an empty list when nothing is found or the request fails.
    """
    compliance_fields = [
        'sponsor', 'contract_value', 'student_name', 'student_email',
        'start_date', 'end_date', 'status', 'Created',
    ]
    date_fields = ('start_date', 'end_date', 'Created')
    params = {
        #'filterByFormula': f'school="{policy_name_dd}"',
        'fields[]': compliance_fields,
        'sort[0][field]': 'Created',
        'sort[0][direction]': 'desc',
    }
    records = _fetch_records(
        compliance_history_table_name, params, 'compliance history')

    compliance_records = []
    for record in records:
        fields = record.get('fields', {})
        # A field absent from a record becomes '' so that one incomplete
        # record cannot discard the rest of the history.
        compliance_records.append([
            format_date(fields.get(name, '')) if name in date_fields
            else fields.get(name, '')
            for name in compliance_fields
        ])
    return compliance_records


def append_to_at_compliance_history(policy_name_dd, contract_redline_html,
                                    compliance_comments_tbox, sponsor_tbox,
                                    compensation_num, status_ddss, name_tbox,
                                    email_tbox, start_date_tbox, end_date_tbox):
    """Save the reviewed contract and refresh the history table.

    Every argument must be truthy; a contract value of 0 therefore counts as
    blank. On success an update for ``compliance_history_df`` with the
    reloaded history is returned. On validation or HTTP failure a no-op
    ``gr.update()`` is returned so the history table keeps its contents.
    """
    required = (
        policy_name_dd, contract_redline_html, compliance_comments_tbox,
        sponsor_tbox, compensation_num, status_ddss, name_tbox, email_tbox,
        start_date_tbox, end_date_tbox,
    )
    if not all(required):
        gr.Warning("One or more fields are blank. Contract cannot be saved")
        return gr.update()

    new_fields = {
        'school': str(policy_name_dd),
        'sponsor': str(sponsor_tbox),
        'contract_redline': str(contract_redline_html),
        'compliance_comments': str(compliance_comments_tbox),
        'contract_value': compensation_num,
        'status': str(status_ddss),
        'student_name': str(name_tbox),
        'student_email': str(email_tbox),
        'start_date': str(start_date_tbox),
        'end_date': str(end_date_tbox),
    }
    if not _post_airtable_record(compliance_history_table_name, new_fields):
        return gr.update()

    gr.Info("Contract Saved")
    return {compliance_history_df: get_compliance_history()}


# Chatbot Function
def chatbot(policy_name_dd, contract_text, progress=gr.Progress()):
    """Redline *contract_text* against the selected policy using the model.

    Renders the Airtable prompt template with the contract and policy text,
    sends it to the chat model, and parses the reply as JSON with a
    ``header`` object and a ``body`` HTML string. The header values are
    stored in module-level globals for ``update_tboxes``. The exchange is
    logged to Airtable.

    Side effects: writes ``analysis_input.txt`` and ``response.txt`` to the
    working directory.

    Returns:
        Updates that put the redline HTML into ``contract_redline_html`` and
        make ``download_row`` visible.

    Raises:
        json.JSONDecodeError: if the model reply is not valid JSON.
        KeyError: if the reply has no ``body`` entry.
    """
    global student_name, student_email, sponsor_name, start_date, end_date

    start_time = datetime.now()
    progress(progress=None)

    policy = get_policy_text(policy_name_dd)
    header, template_content = get_prompt('', '')

    # Render the Jinja2 template with the inputs
    analysis_input = Template(template_content).render(
        contract_text=contract_text, policy_text=policy)
    trimmed_input = prompt_trim(analysis_input)
    with open('analysis_input.txt', 'w', encoding='utf-8') as out_file:
        out_file.write(trimmed_input)

    gpt_model = "gpt-4-1106-preview"
    # NOTE(review): the untrimmed analysis_input is what is sent; the trimmed
    # version is only written to disk -- confirm this is intended.
    response = openai.ChatCompletion.create(
        model=gpt_model,
        temperature=0,
        messages=[
            {"role": "system", "content": header},
            {"role": "user", "content": analysis_input},
        ],
    )
    gpt_response = response.choices[0].message["content"]

    tokens_used = response.usage
    # Default to 0 so a missing count cannot break the arithmetic below.
    prompt_tokens = tokens_used.get('prompt_tokens', 0)
    completion_tokens = tokens_used.get('completion_tokens', 0)
    if gpt_model == "gpt-4":
        question_cost = (tokens_used.get('total_tokens', 0) / 1000) * .03
    else:
        question_cost = (((prompt_tokens / 1000) * .01)
                         + ((completion_tokens / 1000) * .03))

    with open('response.txt', 'w', encoding='utf-8') as out_file:
        out_file.write(gpt_response)

    response_time = datetime.now() - start_time

    # The model may wrap its JSON in a Markdown code fence; strip it first.
    data = json.loads(
        gpt_response.replace("```json", "").replace("```", "").strip())

    contract_header = data.get('header', {})
    student_name = contract_header.get('Student Name', '')
    student_email = contract_header.get('Student Email', '')
    sponsor_name = contract_header.get('Sponsor Name', '')
    start_date = contract_header.get('Start Date', '')
    end_date = contract_header.get('End Date', '')

    # Extracting data from the 'body' node
    html_content = data['body']

    append_to_at_compliancelog(
        policy_name_dd, contract_text, html_content, response_time,
        question_cost, prompt_tokens, completion_tokens)

    return {contract_redline_html: html_content,
            download_row: gr.Row(visible=True)}


def log_login(username):
    """Record a successful login of *username* for this app in Airtable.

    Failures are printed and otherwise ignored.
    """
    new_fields = {
        'user_name': str(username),
        'app': str(app),
    }
    _post_airtable_record(user_log_table_name, new_fields)


def login_auth(username, password):
    """Check *username* and *password* against the Airtable users table.

    On a match the login is logged, ``logged_in_user`` is set, and updates
    that hide ``login_row`` and show ``app_row`` are returned. Otherwise the
    login row stays visible.

    NOTE: the password is compared as stored text through an Airtable filter
    formula. Both values are escaped before interpolation.
    """
    global logged_in_user

    denied = {login_row: gr.Row(visible=True), app_row: gr.Row(visible=False)}

    # Blank credentials are refused without querying Airtable.
    if not username or not password:
        print(f"Invalid user/password combination")
        return denied

    params = {
        'filterByFormula': (
            f'AND(user_name = {_formula_string(username)}, '
            f'password = {_formula_string(password)})'
        )
    }
    try:
        response = requests.get(
            _airtable_url(users_table_name),
            headers=headers,
            params=params,
            timeout=_REQUEST_TIMEOUT,
        )
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {str(e)}")
        return denied

    if response.status_code == 200 and response.json().get('records'):
        log_login(username)
        logged_in_user = username
        return {login_row: gr.Row(visible=False),
                app_row: gr.Row(visible=True)}

    print(f"Invalid user/password combination")
    return denied


def extract_text_with_spacing(pdf_path):
    """Return the text of the PDF at *pdf_path*.

    Spans within a text line are joined with spaces. Each text block is
    followed by an extra newline entry (presuming a block is a paragraph),
    and all entries are joined with newlines.
    """
    document = fitz.open(pdf_path)
    all_text = []
    try:
        for page in document:
            # Extract text in a dict structure
            for block in page.get_text("dict")["blocks"]:
                if "lines" not in block:
                    # Skip blocks that carry no lines of text
                    continue
                for line in block["lines"]:
                    all_text.append(
                        " ".join(span["text"] for span in line["spans"]))
                all_text.append("\n")
    finally:
        # Close the document even if extraction fails part-way.
        document.close()
    return "\n".join(all_text)


def pdf_to_text(contract_file_cmpt, contract_text_tbox, file_name_tbox):
    """Load the uploaded PDF into the contract text box.

    Returns a 5-tuple for the outputs wired to the upload button: the
    extracted text, the redline file name ("<original stem> Redline.pdf"),
    and three ``None`` values that clear the remaining outputs. When no file
    is selected a warning is shown and all five outputs are left unchanged.

    *contract_text_tbox* and *file_name_tbox* are accepted because the event
    passes them, but are not read.
    """
    if contract_file_cmpt is None:
        gr.Warning("Select a contract file before uploading.")
        return (gr.update(),) * 5

    file_text = extract_text_with_spacing(contract_file_cmpt.name)
    original_file_name = os.path.basename(contract_file_cmpt.name)
    # splitext keeps dots inside the stem ("a.b.pdf" -> "a.b").
    stem = os.path.splitext(original_file_name)[0]
    redline_file_name = stem + " Redline.pdf"
    return file_text, redline_file_name, None, None, None


def convert_html_to_pdf(source_html, output_filename):
    """Render *source_html* to a PDF file at *output_filename*.

    Returns ``pisa_status.err``, which is falsy on success and truthy when
    the conversion reported errors.
    """
    with open(output_filename, "w+b") as result_file:
        pisa_status = pisa.CreatePDF(
            BytesIO(source_html.encode("UTF-8")),  # HTML content
            dest=result_file)                      # File handle for the result
    return pisa_status.err


def download_pdf(compliance_comments, contract_redline_html, file_name_tbox):
    """Create the redline PDF and return it for the download component.

    The redline HTML and the compliance comments are placed under their
    headings and rendered to *file_name_tbox*, or to ``redline.pdf`` when no
    name is set. Missing redline or comments are treated as empty text.
    """
    # NOTE(review): the heading literals look like they lost their HTML
    # markup; the line breaks are kept as they appear in the source.
    contract_redline_comments = (
        "\n\nContract Redline:\n\n\n" + (contract_redline_html or "")
        + "\n\nCompliance Comments:\n\n\n" + (compliance_comments or "")
    )
    output_name = file_name_tbox or "redline.pdf"
    convert_html_to_pdf(contract_redline_comments, output_name)
    return {pdf_download_file: output_name}


def change_tab(contract_text):
    """Select the redline tab, or warn when *contract_text* is blank.

    NOTE(review): the blank case returns False without raising, so an event
    chained with ``.success`` presumably still runs -- confirm whether this
    should raise ``gr.Error`` instead.
    """
    if not contract_text:
        gr.Warning("Contract Text is blank.")
        return False
    return gr.Tabs(selected=1)


def update_tboxes():
    """Copy the header values stored by ``chatbot`` into the form fields."""
    return {
        sponsor_tbox: sponsor_name,
        name_tbox: student_name,
        email_tbox: student_email,
        start_date_tbox: start_date,
        end_date_tbox: end_date,
    }


def policy_name_change(policy_name_dd):
    """Show a notice that the selected policy changed."""
    gr.Info(f'Policy Name Changed to {policy_name_dd}')


# Gradio UI
CIMStheme = gr.themes.Soft().set(
    button_primary_background_fill='#6562F4',
)

schools = get_schools()
#compliance_history = get_compliance_history(["Wake Forest University"])
#compliance_history = get_compliance_history()
#policy_text = get_policy_text("LSU") #for testing the function call
#contract_redline = "

# Campaign:
#
# Engagement Name: Applebee's Fall Burger Promo
#
# Engagement Id: 7015j000001DvCAAA0
#
# Sponsor: Applebee's
#
# Start Date: 2023-10-28
#
# End Date: 2023-11-11
#
# Engagement Description
#
# The goal of the engagement is to Increase Sales by having the Student-Athlete Social Media Post. For the Social Media Post, the sponsor is requesting the student athlete take a photo in front of the football stadium in eating a Applebee's burger in your team jersey. The Media rights for the content will be 90 Days.
#
# Engagement Compensation.
#
# For successful completion of the engagement the student-athlete will receive payment in the form of Cash.
#
# Part or all of the payment will be in cash, paid via PayPal.
#
# The total value of compensation will be 250.
#
# "
# (End of the commented-out sample contract used for manual PDF testing.)
#pdf_download = pdfkit.from_string(contract_redline, False)
#print(pdf_download)

# Module-level state. These names are plain globals, so they are shared by
# every session served by this process.
logged_in_user = 'admin'      # overwritten by login_auth() on a successful login
file_text = ''
contract_text = ''
policy_text = ''              # overwritten by get_policy_text()
compliance_comments = ''
file_name = 'redline.pdf'
pdf_download = ''

# NOTE(review): the source reached this review with its indentation lost. The
# nesting of the layout blocks below is reconstructed from the order of the
# statements and should be confirmed against the running UI. Event wiring is
# unaffected by nesting as long as it stays inside the Blocks context.
# NOTE(review): several Markdown/HTML literals appear to have lost their
# markup; their line breaks are written as "\n" escapes.
with gr.Blocks(CIMStheme) as iface:
    # Main application; hidden until login_auth() makes it visible.
    with gr.Row(visible=False) as app_row:
        with gr.Column():
            # Page header: logo, title, spacer.
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Image(label="Logo", value="Nili_v2_Character.png", width=100, height=100,
                             show_download_button=False, interactive=False, show_label=False,
                             elem_id="logo", container=False)
                with gr.Column(scale=2):
                    gr.Markdown(value="\n\nNILI Compliance Desktop\n\n")
                with gr.Column(scale=2):
                    gr.Markdown("")
            with gr.Tabs() as tabs:
                # Tab 0: upload a PDF or paste the contract text.
                with gr.Tab(label="Contract Upload", id=0) as upload_tab:
                    with gr.Row():
                        with gr.Column(variant='panel',scale=1):
                            contract_file_cmpt = gr.File(label="Select Contract File",file_count="single",file_types=[".pdf"],height=150)
                        with gr.Column(variant='panel',scale=4):
                            with gr.Row():
                                with gr.Column(variant='panel'):
                                    contract_text_tbox = gr.Textbox(label="Contract Text",interactive=True,info="Upload .pdf or paste in text. Shift-Enter to add a line")
                            with gr.Row():
                                with gr.Column(scale=1):
                                    upload_btn = gr.components.Button(value="Upload Contract", size='sm', variant="primary")
                                with gr.Column(scale=2):
                                    gr.Markdown("")
                                with gr.Column(scale=1):
                                    redline_btn = gr.components.Button(value="Redline Contract", size='sm', variant="primary")
                                with gr.Column(scale=2):
                                    gr.Markdown("")
                # Tab 1: contract details, redline output, save and PDF export.
                # change_tab() selects this tab by its id.
                with gr.Tab(label="Contract Redline", id=1) as redline_tab:
                    with gr.Row(variant='panel'):
                        with gr.Column():
                            sponsor_tbox = gr.Textbox(label="Sponsor:", interactive=True)
                            compensation_num = gr.Number(label="Contract Value",value=0)
                            status_ddss = gr.Dropdown(["Pending","Approved","Rejected"],multiselect=False,label="Status", value="Pending")
                        with gr.Column():
                            name_tbox = gr.Textbox(label="Name:", interactive=True)
                            email_tbox = gr.Textbox(label="Email:", interactive=True, type='email',placeholder='xxxxxxx@xxxxxx.xxx')
                        with gr.Column():
                            start_date_tbox = gr.Textbox(label="Start Date:", interactive=True, placeholder='MM/DD/YYYY')
                            end_date_tbox = gr.Textbox(label="End Date:", interactive=True, placeholder='MM/DD/YYYY')
                    with gr.Row():
                        with gr.Column(variant='panel'):
                            gr.components.Markdown(response_label)
                            contract_redline_html = gr.HTML(label="Contract Redline")
                            compliance_comments_tbox = gr.Textbox(interactive=True,label='Compliance Comments')
                    # Save button followed by four empty spacer columns.
                    with gr.Row():
                        with gr.Column():
                            save_btn = gr.Button(value="Save Contract", size='sm', variant="primary")
                        with gr.Column():
                            gr.Markdown("")
                        with gr.Column():
                            gr.Markdown("")
                        with gr.Column():
                            gr.Markdown("")
                        with gr.Column():
                            gr.Markdown("")
                    # Hidden until chatbot() returns an update that shows it.
                    with gr.Row(visible=False) as download_row:
                        with gr.Column(variant='panel'):
                            file_name_tbox = gr.Textbox(interactive=False,label='File Name',visible=False)
                            pdf_download_file = gr.File()
                            download_btn = gr.Button(value="Create Redline PDF", size='sm', variant="primary")
                            # Upload: fills the contract text and file name and
                            # clears the redline, comments and download outputs.
                            upload_btn.click(pdf_to_text,inputs=[contract_file_cmpt,contract_text_tbox,file_name_tbox],outputs=[contract_text_tbox,file_name_tbox,contract_redline_html,compliance_comments_tbox,pdf_download_file])
                            download_btn.click(download_pdf,inputs=[compliance_comments_tbox,contract_redline_html,file_name_tbox],outputs=pdf_download_file)
                        with gr.Column():
                            gr.Markdown("")
                        with gr.Column():
                            gr.Markdown("")
                        with gr.Column():
                            gr.Markdown("")
                        with gr.Column():
                            gr.Markdown("")
                # History tab: the table value is the get_compliance_history
                # function itself, not a precomputed list.
                with gr.Tab(label="History", id=3) as history_tab:
                    with gr.Row():
                        with gr.Column(variant='panel'):
                            compliance_history_df = gr.Dataframe(get_compliance_history,
                                headers=["Sponsor", "Contract Value", "Student Name", "Student Email", "Start Date", "End Date", "Status","Created"],
                                datatype=["str", "number", "str", "str", "date", "date", "str","date"],
                                label="Contract History",
                                interactive=False
                            )
                # Settings tab: choose which school's NIL policy is applied.
                with gr.Tab(label="Settings", id=4) as settings_tab:
                    with gr.Row():
                        with gr.Column(variant='panel',scale=1):
                            policy_name_dd = gr.Dropdown(schools, multiselect=False,label="NIL Policy Selection", value="Wake Forest University")
                            policy_name_dd.change(policy_name_change,inputs=policy_name_dd,outputs=None)
                            # Redline: change_tab, then chatbot via .success,
                            # then update_tboxes via .then.
                            redline_btn.click(change_tab, inputs=contract_text_tbox, outputs=tabs).success(chatbot,inputs=[policy_name_dd,contract_text_tbox], outputs=[contract_redline_html,download_row]).then(update_tboxes,inputs=None, outputs=[sponsor_tbox,name_tbox,email_tbox,start_date_tbox,end_date_tbox])
                            save_btn.click(append_to_at_compliance_history,inputs=[policy_name_dd,contract_redline_html,compliance_comments_tbox,sponsor_tbox,compensation_num,status_ddss,name_tbox,email_tbox,start_date_tbox,end_date_tbox],outputs=compliance_history_df)
                        with gr.Column():
                            gr.Markdown("")
                        with gr.Column():
                            gr.Markdown("")
                        with gr.Column():
                            gr.Markdown("")
    # Login form; visible at start and hidden by login_auth() on success.
    with gr.Row(visible=True) as login_row:
        with gr.Column():
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("")
                with gr.Column(scale=1):
                    with gr.Row():
                        gr.Image(label="Logo", value="Nili_v2_Character.png", width=200, height=200,
                                 show_download_button=False, interactive=False, show_label=False,
                                 elem_id="logo", container=False)
                    with gr.Row():
                        gr.Markdown(value="\n\nNILI Login\n\n")
                with gr.Column(scale=2):
                    gr.Markdown("")
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("")
                with gr.Column(scale=1, variant='panel'):
                    username_tbox = gr.Textbox(label="User Name", interactive=True)
                    password_tbox = gr.Textbox(label="Password", interactive=True, type='password')
                    submit_btn = gr.Button(value='Submit', variant='primary', size='sm')
                    submit_btn.click(login_auth, inputs=[username_tbox, password_tbox], outputs=[login_row,app_row])
                with gr.Column(scale=2):
                    gr.Markdown("")
    # Footer. NOTE(review): placed at the top level of the Blocks here; it may
    # originally have been nested inside login_row -- confirm.
    with gr.Row():
        with gr.Column(scale=4):
            gr.HTML('\n© 2023 Collegiate Influencer Marketing Systems, Inc.\nCIMS.AI, CIMS.AI logo, NILI, NILI logo, and EzNIL are trademarks of Collegiate Influencer Marketing Systems, Inc.\n')

iface.queue()
#iface.queue(concurrency_count=20).launch(auth=login_auth, auth_message= "Enter your username and password that you received from CIMS.AI. To request a login, please email 'info@cims.ai'")
iface.launch()