Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
from openai import OpenAI | |
from datetime import datetime, timezone, timedelta | |
import hashlib | |
import hmac | |
# ---------------------------------------------------------------------------
# Persona-specific values (all read from environment variables)
# ---------------------------------------------------------------------------
coach_code = os.environ.get("COACH_CODE")
coach_name_short = os.environ.get("COACH_NAME_SHORT")
coach_name_upper = os.environ.get("COACH_NAME_UPPER")
sys_prompt_new = os.environ.get("PROMPT_NEW")
theme = os.environ.get("THEME")

# ---------------------------------------------------------------------------
# OpenAI-specific values
# ---------------------------------------------------------------------------
# API client, authenticated with the key taken from the environment.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
# Name of the chat model passed to every completion request.
openai_model = os.environ.get("OPENAI_MODEL")

# ---------------------------------------------------------------------------
# Transcript settings
# ---------------------------------------------------------------------------
# Combined with coach_code to form the chat message that returns the transcript.
tx = os.environ.get("TX")
# Directory prefix for the transcript file: "/data/" if in HF or "data/" if local.
prefix = os.environ.get("PREFIX")
# Base name of the transcript file (prefixed with "<coach_code>-" when used).
file_name = os.environ.get("FILE_NAME")
############### VERIFY USER ################### | |
def generate_access_code(time):
    """Derive the 4-character access code for the 10-minute block containing ``time``.

    The timestamp is rounded down to a multiple of ten minutes, formatted as
    ``YYYYMMDDHHMM`` and signed with HMAC-SHA256 using the ``SHARED_SECRET_KEY``
    environment variable. The first two hex digits of the digest are XOR-ed
    with the digits four and three places from the end, and each result is
    rendered as a two-character lowercase hex byte.

    Args:
        time: A ``datetime``; only its calendar fields are used, no timezone
            conversion is performed here.

    Returns:
        A 4-character lowercase hex string. Because each byte is the XOR of
        two single hex digits, the 1st and 3rd characters are always ``"0"``.

    Raises:
        RuntimeError: If ``SHARED_SECRET_KEY`` is not set.
    """
    secret = os.getenv("SHARED_SECRET_KEY")
    if secret is None:
        # Fail with an explicit message instead of an AttributeError on None.encode().
        raise RuntimeError("SHARED_SECRET_KEY is not set; cannot generate access codes.")

    # Round down to the start of the current 10-minute window.
    time_block = time.replace(minute=(time.minute // 10) * 10, second=0, microsecond=0)
    time_string = time_block.strftime('%Y%m%d%H%M')

    hmac_digest = hmac.new(secret.encode(), time_string.encode(), hashlib.sha256).hexdigest()

    # Only the first two XOR results ever reach the output, so compute just those.
    # The format must not change: whatever issues the codes has to produce the same string.
    folded = bytes(int(hmac_digest[i], 16) ^ int(hmac_digest[i - 4], 16) for i in range(2))
    return folded.hex()
def verify_code(code, access_granted):
    """Check a user-supplied access code against the currently valid codes.

    A code is accepted if it matches the code generated for the current UTC
    time or for any of the offsets -20, -10, +10 and +20 minutes.

    Args:
        code: The text the user entered. Surrounding whitespace and letter
            case are ignored; ``None`` is treated as an empty string.
        access_granted: Current access flag. Unused here; kept so the
            signature stays compatible with existing callers.

    Returns:
        A 4-tuple ``(granted, update, update, message)`` where ``granted`` is
        a bool, the two updates set ``interactive`` to match it, and
        ``message`` is the status text to display.
    """
    now = datetime.now(timezone.utc)
    valid_codes = [
        generate_access_code(now + timedelta(minutes=offset))
        for offset in (-20, -10, 0, 10, 20)
    ]

    # Generated codes are lowercase hex, so normalising the input only makes
    # matching tolerant of stray whitespace and uppercase typing.
    candidate = (code or "").strip().lower().encode("utf-8")

    # Compare as bytes (compare_digest rejects non-ASCII str) and evaluate every
    # candidate so the check does not short-circuit on the first match.
    matches = [
        hmac.compare_digest(candidate, valid.encode("utf-8"))
        for valid in valid_codes
    ]

    if any(matches):
        return True, gr.update(interactive=True), gr.update(interactive=True), "Access granted. Please proceed to the Chat tab."
    return False, gr.update(interactive=False), gr.update(interactive=False), "Incorrect code. Please try again."
############### CHAT ################### | |
def predict(user_input, history, access_granted):
    """Handle one chat submission and return the updated chat state.

    Args:
        user_input: Text typed into the message box.
        history: List of ``(user, assistant)`` message pairs so far; the new
            exchange is appended to it in place.
        access_granted: Truthy once the access code has been verified.

    Returns:
        A ``(history, text)`` pair. ``text`` is ``""`` after a normal chat
        turn; otherwise it is a status message or, for the transcript
        command, the contents of the transcript file.
    """
    if not access_granted:
        return history, "Access not granted. Please enter the correct code in the Access tab."

    max_length = 1000
    if len(user_input) > max_length:
        # Existing behaviour: over-long input is blanked, not rejected, so the
        # turn still goes to the model with an empty user message.
        user_input = ""

    transcript_file_path = f"{prefix}{coach_code}-{file_name}"

    # Transcript command: return the stored transcript instead of chatting.
    if user_input == tx + coach_code:
        # Open directly and catch the error. The previous exists() pre-check
        # made the not-found branch unreachable and let the command fall
        # through to the model (and into the transcript) when the file was missing.
        try:
            with open(transcript_file_path, "r", encoding="UTF-8") as file:
                return history, file.read()
        except FileNotFoundError:
            return history, "File '" + file_name + "' not found."

    # Build the message list: system prompt, prior turns, then the new input.
    history_openai_format = [
        {"role": "system", "content": "IDENTITY: " + sys_prompt_new}
    ]
    for human, assistant in history:
        history_openai_format.append({"role": "user", "content": human})
        history_openai_format.append({"role": "assistant", "content": assistant})
    history_openai_format.append({"role": "user", "content": user_input})

    completion = client.chat.completions.create(
        model=openai_model,
        messages=history_openai_format,
        temperature=0.8,
        frequency_penalty=0.4,
        presence_penalty=0.1,
        stream=True
    )

    # Collect the streamed pieces and join once at the end.
    parts = []
    for chunk in completion:
        if not chunk.choices:
            # Skip chunks that carry no choices rather than raising IndexError.
            continue
        piece = chunk.choices[0].delta.content
        if piece is not None:
            parts.append(piece)
    message_content = "".join(parts)

    # Append latest user and assistant messages to the transcript
    transcript = f"Date/Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
    transcript += f"YOU: {user_input}\n\n"
    transcript += f"{coach_name_upper}: {message_content}\n\n\n"

    # Write the updated transcript to the file
    with open(transcript_file_path, "a", encoding="UTF-8") as file:
        file.write(transcript)

    history.append((user_input, message_content))
    return history, ""
# Stylesheet handed to gr.Blocks.
# NOTE(review): "#component-0" and "#component-3" look like auto-assigned element
# ids; if so they depend on the order components are created below. Confirm
# before adding, removing or reordering components.
_APP_CSS = """
#chatbot { flex-grow: 1; height: 340px; overflow-y: auto; }
.gradio-container { height: 680px; max-width: 100% !important; padding: 0 !important; }
#component-0 { height: 95%; }
#component-3 { height: calc(95% - 250px); }
footer { display: none !important; }
#submit-btn { margin-top: 10px; }
#code_submit {
height: 50px !important;
font-size: 1.2em !important;
}
.message-wrap { max-height: none !important; overflow-y: auto !important; }
.chat-wrap { max-height: none !important; overflow-y: auto !important; }
@media (max-width: 600px) {
#code_submit {
height: 60px !important;
font-size: 1.3em !important;
}
#code_message {
font-size: 1.2em !important;
padding: 10px !important;
}
}
"""

with gr.Blocks(theme, css=_APP_CSS) as demo:
    # Whether the access code has been accepted; starts out False.
    access_granted = gr.State(False)

    # --- Access tab: code entry and status display ---
    with gr.Tab("Access"):
        # NOTE(review): an identically labelled tab is nested inside this one.
        # It is kept as found; confirm whether the duplication is intentional.
        with gr.Tab("Access"):
            gr.Markdown("Enter the Access Code displayed in the upper-left corner.")
            code_input = gr.Textbox(label="Access Code", type="text", placeholder="Enter CODE here...")
            code_submit = gr.Button("Submit Code", elem_id="code_submit")
            code_message = gr.Label(label="Status", elem_id="code_message")

    # --- Chat tab: inputs stay disabled until a valid code is submitted ---
    with gr.Tab("Chat"):
        chatbot = gr.Chatbot(label="Conversation", elem_id="chatbot", height=340)
        msg = gr.Textbox(
            label=f"Chat with {coach_name_short}",
            placeholder="Type your message here... (MAX: 1000 characters)",
            autofocus=True,
            interactive=False
        )
        submit = gr.Button("Submit Message", interactive=False)

    def submit_code(code, access_granted):
        """Verify the entered code and return updates for the access flag,
        the chat textbox, the chat button and the coloured status label."""
        granted, _, _, status_text = verify_code(code, access_granted)
        # Green for success, red for error.
        if granted:
            status_color = "#388e3c"
        else:
            status_color = "#d32f2f"
        return (
            granted,
            gr.update(interactive=granted),
            gr.update(interactive=granted),
            gr.update(value=status_text, color=status_color),
        )

    # Pressing Enter in the code box and clicking the button do the same thing.
    for register_code_event in (code_input.submit, code_submit.click):
        register_code_event(
            submit_code,
            inputs=[code_input, access_granted],
            outputs=[access_granted, msg, submit, code_message],
        )

    # Likewise for sending a chat message.
    for register_chat_event in (msg.submit, submit.click):
        register_chat_event(predict, [msg, chatbot, access_granted], [chatbot, msg])

demo.launch(show_api=False)