"""Streamlit app: a shared real-time chat plus GPT-4o text/image/audio/video tools.

The script is executed top to bottom on every Streamlit rerun:

1. load the shared chat history and user list from ``CHAT_FILE``;
2. render the sidebar and the chat;
3. render the GPT-4o tools (``main``);
4. sleep and trigger a rerun so other users' messages show up.

The auto-refresh MUST stay the last statement: a rerun aborts the current
script run, so anything placed after it never executes.
"""

import base64
import json
import os
import platform
import random
import tempfile
import time
import uuid
from collections import deque  # noqa: F401  (kept: may be used elsewhere)
from datetime import datetime
from io import BytesIO  # noqa: F401

import cv2
import extra_streamlit_components as stx
import openai
import pytz
import requests  # noqa: F401
import streamlit as st
from gradio_client import Client  # noqa: F401
from moviepy.editor import VideoFileClip  # noqa: F401
from openai import OpenAI
from PIL import Image  # noqa: F401

# Set page config (must be the first Streamlit command of the script).
st.set_page_config(page_title="Personalized Real-Time Chat", page_icon="💬", layout="wide")

# Initialize cookie manager
cookie_manager = stx.CookieManager()

# File to store chat history and user data
CHAT_FILE = "chat_history.txt"

# Chat model used by every GPT-4o tool below.
MODEL = "gpt-4o-2024-05-13"

# Seconds to wait before the page re-runs itself to pick up new messages.
AUTO_REFRESH_SECONDS = 5


def _rerun():
    """Restart the script run.

    Uses ``st.rerun`` when available and falls back to the older
    ``st.experimental_rerun`` so the app works on both old and new Streamlit.
    """
    (getattr(st, "rerun", None) or st.experimental_rerun)()


# Function to save chat history and user data to file
def save_data():
    """Persist the chat messages and the user list to ``CHAT_FILE`` as JSON.

    The data is written to a sibling temp file and then moved into place, so a
    session that polls the file at the same moment never reads a half-written
    document.
    """
    tmp_path = CHAT_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "messages": st.session_state.messages,
                "users": st.session_state.users,
            },
            f,
        )
    os.replace(tmp_path, CHAT_FILE)


# Function to load chat history and user data from file
def load_data():
    """Load chat messages and users from ``CHAT_FILE`` into the session state.

    A missing file yields empty lists. An unreadable file (invalid JSON or
    missing keys) keeps whatever the session already holds instead of crashing
    the app or wiping the history on the next save.
    """
    try:
        with open(CHAT_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        messages, users = data["messages"], data["users"]
    except FileNotFoundError:
        messages, users = [], []
    except (json.JSONDecodeError, KeyError, TypeError):
        messages = st.session_state.get("messages", [])
        users = st.session_state.get("users", [])
    st.session_state.messages = messages
    st.session_state.users = users


# Load data at the start
load_data()


# Function to get or create user
def get_or_create_user():
    """Return the user record for this browser, creating one if needed.

    The user is identified by the ``user_id`` cookie; a fresh UUID is stored
    in the cookie when none is set. New users get a random display name and
    are appended to the shared user list, which is saved immediately.
    """
    user_id = cookie_manager.get(cookie="user_id")
    if not user_id:
        user_id = str(uuid.uuid4())
        cookie_manager.set("user_id", user_id)

    user = next((u for u in st.session_state.users if u["id"] == user_id), None)
    if not user:
        user = {
            "id": user_id,
            "name": random.choice(
                ["Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Henry"]
            ),
            # NOTE: platform.system() reports the OS of the machine running
            # this script, not the visitor's browser.
            "browser": f"{platform.system()} - {st.session_state.get('browser_info', 'Unknown')}",
        }
        st.session_state.users.append(user)
        save_data()
    return user


# Initialize session state
if "messages" not in st.session_state:
    st.session_state.messages = []
if "users" not in st.session_state:
    st.session_state.users = []
if "current_user" not in st.session_state:
    st.session_state.current_user = get_or_create_user()
# GPT conversation turns ({"role", "content"}) are kept apart from the chat
# messages ({"user", "message", "timestamp"}): the two schemas are not
# interchangeable and only the chat is persisted to CHAT_FILE.
if "ai_messages" not in st.session_state:
    st.session_state.ai_messages = []

# Sidebar for user information and settings
with st.sidebar:
    st.title("User Info")
    st.write(f"Current User: {st.session_state.current_user['name']}")
    st.write(f"Browser: {st.session_state.current_user['browser']}")

    new_name = st.text_input("Change your name:")
    if st.button("Update Name"):
        if new_name:
            for user in st.session_state.users:
                if user["id"] == st.session_state.current_user["id"]:
                    user["name"] = new_name
                    st.session_state.current_user["name"] = new_name
                    save_data()
                    st.success(f"Name updated to {new_name}")
                    break

    st.title("Active Users")
    for user in st.session_state.users:
        st.write(f"{user['name']} ({user['browser']})")

# Main chat area
st.title("Personalized Real-Time Chat")

# Display chat messages (filled by display_messages below, so the history
# appears above the input box even though it is rendered after it).
chat_container = st.container()

# Input for new message
new_message = st.text_input("Type your message:")
if st.button("Send"):
    if new_message:
        timestamp = datetime.now(pytz.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
        st.session_state.messages.append(
            {
                "user": st.session_state.current_user["name"],
                "message": new_message,
                "timestamp": timestamp,
            }
        )
        save_data()
        _rerun()


# Function to display chat messages
def display_messages():
    """Render every chat message into ``chat_container``."""
    for msg in st.session_state.messages:
        with chat_container.container():
            st.write(f"**{msg['user']}** ({msg['timestamp']}): {msg['message']}")


# Display messages
display_messages()

# Polling for updates
if st.button("Refresh Chat"):
    load_data()
    _rerun()


# Additional functionalities for text, image, audio, and video processing

# Function to generate filenames
def generate_filename(prompt, file_type):
    """Build a file name from the current US/Central time and the prompt.

    Spaces and newlines become underscores, every other non-alphanumeric
    character is dropped, and the prompt part is capped at 90 characters.
    Returns ``"<MMDD_HHMM>_<prompt>.<file_type>"``.
    """
    central = pytz.timezone("US/Central")
    safe_date_time = datetime.now(central).strftime("%m%d_%H%M")
    replaced_prompt = prompt.replace(" ", "_").replace("\n", "_")
    safe_prompt = "".join(x for x in replaced_prompt if x.isalnum() or x == "_")[:90]
    return f"{safe_date_time}_{safe_prompt}.{file_type}"


# Function to create files
def create_file(filename, prompt, response, is_image=False):
    """Write ``prompt``, a blank line and ``response`` to ``filename`` (UTF-8).

    ``is_image`` is accepted for backward compatibility and is not used.
    """
    with open(filename, "w", encoding="utf-8") as f:
        f.write(prompt + "\n\n" + response)


# Function to save uploaded audio
def create_audio_file(filename, audio_data, should_save):
    """Write the uploaded audio to ``filename`` when ``should_save`` is true.

    ``audio_data`` may be raw bytes or a file-like object exposing
    ``getvalue()`` (such as a Streamlit upload).
    """
    if not should_save:
        return
    payload = audio_data.getvalue() if hasattr(audio_data, "getvalue") else audio_data
    with open(filename, "wb") as f:
        f.write(payload)


# Function to process text
def process_text(text_input):
    """Send ``text_input`` to the chat model together with the prior GPT turns.

    Renders the user turn and the assistant reply, stores both in
    ``st.session_state.ai_messages`` and, when the "Save" box is ticked,
    writes the exchange to a Markdown file.

    Returns the assistant reply, or ``None`` when ``text_input`` is empty.
    """
    if not text_input:
        return None

    history = st.session_state.setdefault("ai_messages", [])
    history.append({"role": "user", "content": text_input})
    with st.chat_message("user"):
        st.markdown(text_input)

    with st.chat_message("assistant"):
        try:
            completion = client.chat.completions.create(
                model=MODEL,
                messages=[{"role": m["role"], "content": m["content"]} for m in history],
                stream=False,
            )
        except openai.OpenAIError:
            # Drop the unanswered user turn so the history stays alternating.
            history.pop()
            raise
        return_text = completion.choices[0].message.content or ""
        st.write("Assistant: " + return_text)

    if should_save:
        create_file(generate_filename(text_input, "md"), text_input, return_text)
    history.append({"role": "assistant", "content": return_text})
    return return_text


# Function to process image
def process_image(image_input, user_prompt):
    """Ask the chat model about an image and render its Markdown answer.

    ``image_input`` may be a file path, raw bytes, or a file-like upload. The
    answer is also written to a Markdown file and returned.
    """
    if isinstance(image_input, str):
        with open(image_input, "rb") as image_file:
            image_input = image_file.read()
    elif hasattr(image_input, "getvalue"):
        # Uploaded files are file objects; b64encode needs the bytes.
        image_input = image_input.getvalue()

    base64_image = base64.b64encode(image_input).decode("utf-8")
    response = client.chat.completions.create(
        model=MODEL,
        messages=[
            {"role": "system", "content": "You are a helpful assistant that responds in Markdown."},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_prompt},
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
                ],
            },
        ],
        temperature=0.0,
    )
    image_response = response.choices[0].message.content or ""
    st.markdown(image_response)
    filename_md = generate_filename(user_prompt, "md")
    create_file(filename_md, image_response, "", True)
    return image_response


# Function to process audio
def process_audio(audio_input, text_input):
    """Transcribe an uploaded audio file with Whisper and render the text.

    The transcript is added to the GPT history as a user turn and, when the
    "Save" box is ticked, the audio and the transcript are written to disk.
    ``text_input`` is accepted for interface compatibility and is not used.

    Returns the transcript, or ``None`` when ``audio_input`` is empty.
    """
    if not audio_input:
        return None

    transcription = client.audio.transcriptions.create(
        model="whisper-1",
        file=audio_input,
    )
    transcript = transcription.text
    st.session_state.setdefault("ai_messages", []).append(
        {"role": "user", "content": transcript}
    )
    with st.chat_message("assistant"):
        st.markdown(transcript)

    # NOTE(review): SpeechSynthesis is not defined in this file. It is called
    # only when some other module provides it, instead of raising NameError.
    speak = globals().get("SpeechSynthesis")
    if callable(speak):
        speak(transcript)

    # Keep the upload's own extension (mp3 uploads are not WAV data).
    extension = os.path.splitext(getattr(audio_input, "name", ""))[1].lstrip(".") or "wav"
    create_audio_file(generate_filename(transcript, extension), audio_input, should_save)
    if should_save:
        create_file(generate_filename(transcript, "md"), transcript, transcript)
    return transcript


# Function to extract video frames
def extract_video_frames(video_path, seconds_per_frame=2):
    """Sample one frame every ``seconds_per_frame`` seconds from a video file.

    Returns ``(frames, None)`` where ``frames`` is a list of base64-encoded
    JPEG strings; the second element is a placeholder kept for compatibility.
    """
    base64Frames = []
    video = cv2.VideoCapture(video_path)
    try:
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = video.get(cv2.CAP_PROP_FPS)
        # A step of 0 (unknown fps or a tiny interval) would never advance.
        frames_to_skip = max(1, int(fps * seconds_per_frame))
        curr_frame = 0
        while curr_frame < total_frames - 1:
            video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame)
            success, frame = video.read()
            if not success:
                break
            _, buffer = cv2.imencode(".jpg", frame)
            base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
            curr_frame += frames_to_skip
    finally:
        video.release()
    return base64Frames, None


# Function to process audio for video
def process_audio_for_video(video_input):
    """Transcribe a video's audio track with Whisper (best effort).

    Returns the transcript, or ``''`` when the API call fails; the failure is
    shown as a warning instead of being silently discarded.
    """
    try:
        transcription = client.audio.transcriptions.create(
            model="whisper-1",
            file=video_input,
        )
    except openai.OpenAIError as exc:
        st.warning(f"Audio transcription failed: {exc}")
        return ""
    return transcription.text


# Function to process video
def process_video(video_input, user_prompt):
    """Summarise a video from sampled frames plus its audio transcript.

    ``video_input`` may be a file path, raw bytes, or a file-like upload. The
    Markdown summary is rendered, written to a file and returned.
    """
    temp_path = None
    if isinstance(video_input, str):
        frame_source = video_input
        video_name = os.path.basename(video_input)
        with open(video_input, "rb") as video_file:
            video_bytes = video_file.read()
    else:
        video_name = getattr(video_input, "name", "video.mp4")
        video_bytes = video_input.getvalue() if hasattr(video_input, "getvalue") else bytes(video_input)
        # OpenCV can only open a path, so uploads go through a temp file.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            tmp.write(video_bytes)
            temp_path = tmp.name
        frame_source = temp_path

    try:
        base64Frames, _ = extract_video_frames(frame_source)
    finally:
        if temp_path is not None:
            os.remove(temp_path)

    # A (filename, bytes) pair lets the API infer the media format.
    transcript = process_audio_for_video((video_name, video_bytes))

    response = client.chat.completions.create(
        model=MODEL,
        messages=[
            {
                "role": "system",
                "content": "You are generating a video summary. Create a summary of the provided video and its transcript. Respond in Markdown",
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "These are the frames from the video."},
                    *(
                        {"type": "image_url", "image_url": {"url": f"data:image/jpg;base64,{frame}", "detail": "low"}}
                        for frame in base64Frames
                    ),
                    {"type": "text", "text": f"The audio transcription is: {transcript}"},
                    {"type": "text", "text": user_prompt},
                ],
            },
        ],
        temperature=0,
    )
    video_response = response.choices[0].message.content or ""
    st.markdown(video_response)
    filename_md = generate_filename(user_prompt, "md")
    create_file(filename_md, video_response, "", True)
    return video_response


# Initialize OpenAI client
openai.api_key = os.getenv("OPENAI_API_KEY")
openai.organization = os.getenv("OPENAI_ORG_ID")
# Without a key the client constructor raises; keep the chat usable and let
# main() explain why the GPT-4o tools are unavailable.
client = (
    OpenAI(api_key=openai.api_key, organization=openai.organization)
    if openai.api_key
    else None
)

should_save = st.sidebar.checkbox("💾 Save", value=True, help="Save your session data.")


def _upload_signature(uploaded_file):
    """Return a cheap (name, size) identity for an uploaded file."""
    return (getattr(uploaded_file, "name", ""), getattr(uploaded_file, "size", None))


def _run_once(slot, signature, producer):
    """Run ``producer()`` once per distinct request and replay it on reruns.

    The page re-runs itself every few seconds while the input widgets keep
    their values; without this guard every rerun would re-send (and re-bill)
    the same API request. ``slot`` names the tool, ``signature`` identifies
    the request, and ``producer`` performs it and returns the reply text.
    """
    results = st.session_state.setdefault("ai_results", {})
    cached = results.get(slot)
    if cached is not None and cached[0] == signature:
        if cached[1]:
            st.markdown(cached[1])
        return cached[1]

    try:
        reply = producer()
    except openai.OpenAIError as exc:
        # Remember the failure too, so a broken request is not retried on
        # every automatic refresh.
        reply = f"Request failed: {exc}"
        st.error(reply)
    results[slot] = (signature, reply)
    return reply


# Main function
def main():
    """Render the GPT-4o tool picker and dispatch to the selected tool."""
    st.markdown("##### GPT-4o Omni Model: Text, Audio, Image, & Video")
    if client is None:
        st.warning("Set the OPENAI_API_KEY environment variable to enable the GPT-4o tools.")
        return

    option = st.selectbox("Select an option", ("Text", "Image", "Audio", "Video"))
    if option == "Text":
        text_input = st.text_input("Enter your text:")
        if text_input:
            _run_once("text", text_input, lambda: process_text(text_input))
    elif option == "Image":
        text_input = st.text_input("Enter text prompt to use with Image context:")
        image_input = st.file_uploader("Upload an image", type=["png"])
        if image_input:
            _run_once(
                "image",
                (_upload_signature(image_input), text_input),
                lambda: process_image(image_input, text_input),
            )
    elif option == "Audio":
        text_input = st.text_input("Enter text prompt to use with Audio context:")
        uploaded_files = st.file_uploader(
            "Upload an audio file", type=["mp3", "wav"], accept_multiple_files=True
        )
        for audio_input in uploaded_files or []:
            signature = _upload_signature(audio_input)
            _run_once(
                f"audio:{signature[0]}",
                signature,
                lambda audio_input=audio_input: process_audio(audio_input, text_input),
            )
    elif option == "Video":
        video_input = st.file_uploader("Upload a video file", type=["mp4"])
        text_input = st.text_input("Enter text prompt to use with Video context:")
        if video_input and text_input:
            _run_once(
                "video",
                (_upload_signature(video_input), text_input),
                lambda: process_video(video_input, text_input),
            )


main()

# Auto-refresh (note: this will refresh the entire app). Keep this last: the
# rerun aborts the current run, so nothing after it would ever execute.
time.sleep(AUTO_REFRESH_SECONDS)
_rerun()