"""Compliance Desktop: a Gradio app that redlines NIL contracts against a policy.

The app loads policy text and prompt templates from Airtable, sends the
contract plus policy to an OpenAI chat model, shows the returned redline, and
can export the redline plus reviewer comments as a PDF.

NOTE(review): this file was recovered from a whitespace-mangled copy in which
HTML markup inside string literals had been stripped. Literals that were
broken by that are reconstructed with minimal markup and marked below; the
nesting of the Gradio layout is likewise a best-effort reconstruction.
Confirm both against the deployed version.
"""
import datetime
import io
import json
import os
import time
from io import BytesIO

import fitz
import gradio as gr
import openai
import requests
from jinja2 import Template
from xhtml2pdf import pisa

# Initialize OpenAI
openai.api_key = os.environ.get('OPENAI_API_KEY')

# Configuration variables
airtable_api_key = os.environ.get('AIRTABLE_API_KEY')

# Airtable table names
prompts_table_name = 'tblYIZEB8m6JkGDEP'
users_table_name = 'tblLNe5ZL47SvrAEk'
user_log_table_name = 'tblrlTsRrkl6BqMAJ'
compliancelog_table_name = 'tblQMXWKGlOonkIw2'
policies_table_name = 'tbla6PC65qZfqdJhE'

# Define the style and content for the response field
label_text = "Contract Redline"
color = "#6562F4"
background_color = "white"
border_radius = "10px"
# NOTE(review): the style variables above were presumably interpolated into
# HTML markup around the label; that markup is not present in the recovered
# source, so only the label text itself is rendered.
response_label = f'{label_text}'

base_id = 'appcUK3hUWC7GM2Kb'

# App name for user login logging
app = "Compliance"

headers = {
    "Authorization": f"Bearer {airtable_api_key}",
    "Content-Type": "application/json",
    "Accept": "application/json",
}

# Seconds to wait for Airtable before giving up; without a timeout a stalled
# connection would block the calling Gradio worker indefinitely.
REQUEST_TIMEOUT = 30


def _escape_formula_value(value):
    """Escape *value* for use inside a double-quoted Airtable formula string.

    Backslashes and double quotes are backslash-escaped so that user-supplied
    text cannot terminate the string literal and alter the formula.
    """
    return str(value).replace('\\', '\\\\').replace('"', '\\"')


def prompt_trim(prompt: str) -> str:
    """Return *prompt* with leading/trailing whitespace stripped from each line."""
    return '\n'.join(line.strip() for line in prompt.split('\n'))


def get_policy_text(school):
    """Fetch the policy text for *school* from the Airtable policies table.

    Returns a list with the ``policy_text`` value of every matching record,
    or ``''`` when nothing was found or the request failed. The result is
    also stored in the module-level ``policy_text`` for backward
    compatibility; callers should prefer the return value.
    """
    global policy_text
    policy_text = ''
    if not school:
        # No policy selected: nothing to look up.
        return policy_text

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
    params = {
        'filterByFormula': f'school="{_escape_formula_value(school)}"',
        'fields[]': 'policy_text',
    }

    try:
        response = requests.get(
            airtable_endpoint, headers=headers, params=params, timeout=REQUEST_TIMEOUT
        )
        if response.status_code == 200:
            records = response.json().get('records')
            if records:
                # Airtable omits empty fields from a record, so skip records
                # without the field instead of failing the whole lookup.
                policy_text = [
                    record['fields']['policy_text']
                    for record in records
                    if 'policy_text' in record.get('fields', {})
                ]
            else:
                print("No records found in the 'policies' table.")
        else:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
    except Exception as e:
        # Best effort: the UI continues with an empty policy.
        print(f"An error occurred: {str(e)}")

    return policy_text


def get_schools():
    """Return the ``school`` values from the policies table, sorted ascending.

    Returns an empty list when there are no records or the request fails.

    NOTE(review): only the first page of results is read; confirm the table
    stays within Airtable's page size or add ``offset`` pagination.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{policies_table_name}'
    params = {
        'fields[]': 'school',
        'sort[0][field]': 'school',
        'sort[0][direction]': 'asc',
    }

    schools = []
    try:
        response = requests.get(
            airtable_endpoint, headers=headers, params=params, timeout=REQUEST_TIMEOUT
        )
        if response.status_code == 200:
            records = response.json().get('records')
            if records:
                # Skip records whose 'school' field is empty (omitted by Airtable).
                schools = [
                    record['fields']['school']
                    for record in records
                    if 'school' in record.get('fields', {})
                ]
            else:
                print("No records found in the 'policies' table.")
        else:
            print(f"Failed to retrieve data. Status code: {response.status_code}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

    return schools


def get_prompt(header, template_content):
    """Load the ``Compliance_v1`` system and user prompts from Airtable.

    *header* and *template_content* are the fallback values returned when no
    matching prompt record exists.

    Returns:
        A ``(system_prompt, user_prompt)`` tuple.

    Raises:
        requests.exceptions.RequestException: on HTTP errors or timeouts.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{prompts_table_name}'
    params = {
        'filterByFormula': "prompt_name='Compliance_v1'",
    }

    response = requests.get(
        airtable_endpoint, headers=headers, params=params, timeout=REQUEST_TIMEOUT
    )
    response.raise_for_status()

    data = response.json()
    if data.get('records'):
        # Use the first record (there should be only one).
        record = data['records'][0]['fields']
        header = record.get('system_prompt', '')
        template_content = record.get('user_prompt', '')

    return header, template_content


def append_to_at_compliancelog(policy_name_dd, contract_text, gpt_response, response_time, question_cost, prompt_tokens, completion_tokens):
    """Append one redline request/response record to the compliance log table.

    Failures are printed and swallowed so that logging problems never block
    the user from seeing the redline. The user name is read from the
    module-level ``logged_in_user``.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{compliancelog_table_name}'

    data = {
        'fields': {
            'policy_name': str(policy_name_dd),
            'contract_text': str(contract_text),
            'gpt_response': str(gpt_response),
            'response_time': str(response_time),
            'question_cost': question_cost,
            'user_name': str(logged_in_user),
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens,
        }
    }

    try:
        response = requests.post(
            airtable_endpoint, headers=headers, json=data, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


# Chatbot Function
def chatbot(policy_name_dd, contract_text, progress=gr.Progress()):
    """Redline *contract_text* against the policy selected in *policy_name_dd*.

    Renders the Airtable prompt template with the contract and policy text,
    sends it to the OpenAI chat model, logs the exchange to Airtable, and
    returns a Gradio update dict that fills the redline HTML component and
    reveals the download row.
    """
    start_time = datetime.datetime.now()
    progress(progress=None)

    # Use the returned value rather than the module-level ``policy_text``:
    # the global can be overwritten by a concurrent request between the
    # lookup and the template rendering.
    selected_policy_text = get_policy_text(policy_name_dd)

    header, template_content = get_prompt('', '')

    # Render the Jinja2 user-prompt template with the inputs.
    # NOTE(review): ``selected_policy_text`` is a list when records were
    # found, so the template receives a list, not a plain string. The prompt
    # is also sent untrimmed (``prompt_trim`` is not applied). Both match the
    # previous behavior; confirm they are intended.
    template = Template(template_content)
    analysis_input = template.render(contract_text=contract_text, policy_text=selected_policy_text)

    gpt_model = "gpt-4-1106-preview"

    response = openai.ChatCompletion.create(
        model=gpt_model,
        temperature=0,
        messages=[
            {"role": "system", "content": header},
            {"role": "user", "content": analysis_input},
        ],
    )

    gpt_response = response.choices[0].message["content"]

    tokens_used = response.usage
    prompt_tokens = tokens_used.get('prompt_tokens', 0)
    completion_tokens = tokens_used.get('completion_tokens', 0)
    if gpt_model == "gpt-4":
        question_cost = (tokens_used.get('total_tokens', 0) / 1000) * .03
    else:
        question_cost = ((prompt_tokens / 1000) * .01) + ((completion_tokens / 1000) * .03)

    response_time = datetime.datetime.now() - start_time

    contract_redline = gpt_response
    append_to_at_compliancelog(policy_name_dd, contract_text, contract_redline, response_time, question_cost, prompt_tokens, completion_tokens)

    return {contract_redline_html: contract_redline, download_row: gr.Row(visible=True)}


def log_login(username):
    """Record a successful login of *username* in the Airtable user log table.

    Failures are printed and swallowed.
    """
    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{user_log_table_name}'

    data = {
        'fields': {
            'user_name': str(username),
            'app': str(app),
        }
    }

    try:
        response = requests.post(
            airtable_endpoint, headers=headers, json=data, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_error:
        print(f"HTTP error occurred: {http_error}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


def login_auth(username, password):
    """Return True when *username*/*password* match a record in the users table.

    On success the login is logged and the module-level ``logged_in_user`` is
    updated.

    NOTE(review): the password is compared as stored in Airtable (apparently
    plain text), and ``logged_in_user`` is a single process-wide value shared
    by all sessions. Both are pre-existing design limitations.
    """
    global logged_in_user

    airtable_endpoint = f'https://api.airtable.com/v0/{base_id}/{users_table_name}'

    # Credentials are untrusted input: escape them so they cannot break out
    # of the quoted strings and rewrite the formula (e.g. to match any user).
    safe_username = _escape_formula_value(username)
    safe_password = _escape_formula_value(password)
    params = {
        'filterByFormula': f'AND(user_name = "{safe_username}", password = "{safe_password}")'
    }

    response = requests.get(
        airtable_endpoint, headers=headers, params=params, timeout=REQUEST_TIMEOUT
    )

    if response.status_code == 200:
        data = response.json()
        if data.get('records'):
            log_login(username)
            logged_in_user = username
            return True

    print("Invalid user/password combination")
    return False


def extract_text_with_spacing(pdf_path):
    """Extract the text of the PDF at *pdf_path*, one entry per text line.

    Spans within a line are joined with spaces, a ``"\\n"`` marker is added
    after every text block, and all entries are joined with newlines.
    """
    all_text = []
    document = fitz.open(pdf_path)
    try:
        for page in document:
            # Extract text in a dict structure
            for block in page.get_text("dict")["blocks"]:
                if "lines" not in block:
                    # Block contains no lines of text.
                    continue
                for line in block["lines"]:
                    all_text.append(" ".join(span["text"] for span in line["spans"]))
                # Presume a new block is a new paragraph
                all_text.append("\n")
    finally:
        # Release the file even if a page fails to parse.
        document.close()
    return "\n".join(all_text)


def pdf_to_text(contract_file_cmpt, contract_text_tbox, file_name_tbox):
    """Read the uploaded contract PDF and derive the redline PDF file name.

    Returns:
        ``(file_text, redline_file_name, None, None, None)``; the three
        ``None`` values clear the redline, comments and download components.

    Raises:
        gr.Error: when no file has been selected.
    """
    if contract_file_cmpt is None:
        raise gr.Error("Please select a contract PDF before uploading.")

    file_text = extract_text_with_spacing(contract_file_cmpt.name)

    # splitext keeps dots that are part of the name ("Deal v1.2.pdf").
    original_file_name = os.path.basename(contract_file_cmpt.name)
    redline_file_name = os.path.splitext(original_file_name)[0] + " Redline.pdf"

    return file_text, redline_file_name, None, None, None


def convert_html_to_pdf(source_html, output_filename):
    """Render *source_html* to a PDF written at *output_filename*.

    Returns:
        The ``err`` attribute of the pisa status, i.e. a falsy value on
        success and a truthy value when errors occurred.
    """
    # The context manager closes the file even if the conversion raises.
    with open(output_filename, "w+b") as result_file:
        pisa_status = pisa.CreatePDF(
            BytesIO(source_html.encode("UTF-8")),  # HTML content
            dest=result_file)                      # File handle to receive the result

    return pisa_status.err


def download_pdf(compliance_comments, contract_redline_html, file_name_tbox):
    """Build the redline PDF and return a Gradio update for the download file.

    The PDF contains the redline HTML followed by the reviewer's compliance
    comments. When *file_name_tbox* is empty (contract text was pasted rather
    than uploaded) the module default ``file_name`` is used.
    """
    output_name = file_name_tbox or file_name

    # NOTE(review): the original markup around these two headings was lost;
    # <h3> is a reconstruction. The heading text itself is unchanged.
    contract_redline_comments = (
        "<h3>Contract Redline:</h3>"
        + (contract_redline_html or "")
        + "<h3>Compliance Comments:</h3>"
        + (compliance_comments or "")
    )

    convert_html_to_pdf(contract_redline_comments, output_name)

    return {pdf_download_file: output_name}


# Gradio UI
CIMStheme = gr.themes.Soft().set(
    button_primary_background_fill='#6562F4',
)

schools = get_schools()

# NOTE(review): this literal originally held HTML whose tags were stripped in
# the recovered source; only the remaining text is kept. It is not referenced
# anywhere in this file.
scout_response = (
    "HTMLTableGenerator\n"
    "RankCIMS.AI\n"
    "Scout\n"
    "Ranking\n"
    "Athlete NameGenderPrimary SportSocial Media\n"
    "Fan Total\n"
    "InterestsCommentary on Ranking\n"
)

# Default user name used for logging while login (auth) is disabled at launch.
logged_in_user = 'admin'
file_text = ''
contract_text = ''
policy_text = ''
compliance_comments = ''
# Fallback PDF name when no contract file was uploaded.
file_name = 'redline.pdf'
pdf_download = ''

with gr.Blocks(CIMStheme) as iface:
    # Header: logo, title, spacer.
    with gr.Row():
        with gr.Column(scale=2):
            gr.Image(label="Logo", value="CIMS Logo Purple.png", width=10, show_download_button=False, interactive=False, show_label=False, elem_id="logo", container=False)
        with gr.Column(scale=2):
            # NOTE(review): heading markup reconstructed; text unchanged.
            gr.Markdown(value="<h2>Compliance Desktop - Powered by CIMS.AI</h2>")
        with gr.Column(scale=2):
            gr.Markdown("")

    # Inputs: policy selection and contract file on the left, text on the right.
    with gr.Row():
        with gr.Column(variant='panel', scale=1):
            policy_name_dd = gr.components.Dropdown(schools, multiselect=False, label="NIL Policy Selection")
            contract_file_cmpt = gr.File(label="Select Contract File", file_count="single", file_types=[".pdf"], height=150)
        with gr.Column(variant='panel', scale=3):
            contract_text_tbox = gr.Textbox(label="Contract Text", interactive=True, info="Upload .pdf or paste in text")

    # Action buttons, separated by empty spacer columns.
    with gr.Row():
        with gr.Column():
            upload_btn = gr.components.Button(value="Upload Contract", size='sm', variant="primary")
        with gr.Column():
            gr.Markdown("")
        with gr.Column():
            redline_btn = gr.components.Button(value="Redline Contract", size='sm', variant="primary")
        with gr.Column():
            gr.Markdown("")

    # Redline output and reviewer comments.
    with gr.Row():
        with gr.Column(variant='panel'):
            gr.components.Markdown(response_label)
            contract_redline_html = gr.components.HTML(label="Contract Redline")
            compliance_comments_tbox = gr.Textbox(interactive=True, label='Compliance Comments')

    # PDF export; hidden until chatbot() reveals it after a redline.
    with gr.Row(visible=False) as download_row:
        with gr.Column(variant='panel'):
            file_name_tbox = gr.Textbox(interactive=False, label='File Name', visible=False)
            pdf_download_file = gr.File()
            download_btn = gr.components.Button(value="Create Redline PDF", size='sm', variant="primary")
        with gr.Column():
            gr.Markdown("")
        with gr.Column():
            gr.Markdown("")
        with gr.Column():
            gr.Markdown("")

    # Footer.
    with gr.Row():
        with gr.Column():
            # NOTE(review): original footer markup was lost; a line break is
            # reconstructed between the two lines of text.
            gr.HTML(
                '© 2023 Collegiate Influencer Marketing Systems, Inc.<br>'
                'CIMS.AI, CIMS.AI logo, NILI, NILI logo, and EzNIL are trademarks of Collegiate Influencer Marketing Systems, Inc.'
            )

    # Event wiring (registered once every component exists).
    upload_btn.click(pdf_to_text, inputs=[contract_file_cmpt, contract_text_tbox, file_name_tbox], outputs=[contract_text_tbox, file_name_tbox, contract_redline_html, compliance_comments_tbox, pdf_download_file])
    redline_btn.click(chatbot, inputs=[policy_name_dd, contract_text_tbox], outputs=[contract_redline_html, download_row])
    download_btn.click(download_pdf, inputs=[compliance_comments_tbox, contract_redline_html, file_name_tbox], outputs=pdf_download_file)

# Login is currently disabled. To require it, launch with auth instead:
#   iface.queue(concurrency_count=20).launch(show_api=False, auth=login_auth, auth_message="Enter your username and password that you received from CIMS.AI. To request a login, please email 'info@cims.ai'")
iface.queue().launch(show_api=False)