import os
import threading
import wave

import cv2
import keyboard
import numpy as np
import pyaudio
import pyautogui
import streamlit as st
from dotenv import load_dotenv

from logic import analyze_with_audio_video

load_dotenv()

# Audio settings
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 1024

# File paths
audio_filename = "output.wav"
video_filename = "output.mp4"

# Initialize Streamlit
st.set_page_config(page_title="T.A.P.A.S", page_icon=":camera:", layout="wide")
st.title("T.A.P.A.S - Technical Assistance Platform for Advanced Solution")

# Initialize session state for outputs
if 'outputs' not in st.session_state or not isinstance(st.session_state.outputs, dict):
    st.session_state.outputs = {}
if 'current_session' not in st.session_state:
    st.session_state.current_session = 'Session 1'


def cleanup_files():
    """Deletes old recordings before a new recording session starts."""
    files_to_delete = [audio_filename, video_filename]
    for file in files_to_delete:
        if os.path.exists(file):
            os.remove(file)
            print(f"Deleted old file: {file}")


def record_audio(filename, stop_event):
    """Captures microphone input until stop_event is set, then writes a WAV file."""
    audio = pyaudio.PyAudio()
    stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE,
                        input=True, frames_per_buffer=CHUNK)
    frames = []
    while not stop_event.is_set():
        data = stream.read(CHUNK)
        frames.append(data)
    stream.stop_stream()
    stream.close()
    audio.terminate()
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(audio.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))


def record_screen(filename, stop_event, mouse_positions):
    """Records the screen at ~8 FPS until stop_event is set, drawing the cursor on each frame."""
    screen_size = pyautogui.size()
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(filename, fourcc, 8, (screen_size.width, screen_size.height))
    while not stop_event.is_set():
        img = pyautogui.screenshot()
        frame = np.array(img)
        # pyautogui screenshots are RGB; OpenCV expects BGR
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        # Draw the mouse cursor, which screenshots do not capture
        x, y = pyautogui.position()
        cv2.circle(frame, (x, y), 10, (0, 255, 0), -1)
        out.write(frame)
        mouse_positions.append((x, y))  # Track mouse positions
    out.release()


# def minimize_browser():
#     browser_window = None
#     for window in gw.getAllTitles():
#         if "chrome" in window.lower() or "firefox" in window.lower() or "edge" in window.lower():
#             browser_window = window
#             break
#     if browser_window:
#         app = Application().connect(title_re=browser_window)
#         app.window(title_re=browser_window).minimize()
#     else:
#         print("Browser window not found.")
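# A minimal sketch of the disabled helper above, assuming pygetwindow is
# installed (the `gw` alias in the comments suggests it). On Windows its window
# objects expose minimize() directly, so the pywinauto `Application` step can
# be dropped. Nothing calls this by default.
def minimize_browser():
    import pygetwindow as gw  # assumed dependency, not listed in the imports above
    for title in gw.getAllTitles():
        if any(name in title.lower() for name in ("chrome", "firefox", "edge")):
            gw.getWindowsWithTitle(title)[0].minimize()
            return
    print("Browser window not found.")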
def main():
    stop_event = threading.Event()

    # Sidebar for session selection
    with st.sidebar:
        st.title("Sessions")
        session_name = st.text_input("New Session Name", "")
        if st.button("Start New Session") and session_name:
            st.session_state.current_session = session_name
            st.session_state.outputs[session_name] = []
        session_names = list(st.session_state.outputs.keys())
        if session_names:
            session_selection = st.selectbox("Choose a session", session_names)
            if session_selection:
                st.session_state.current_session = session_selection

    st.header(f"Current Session: {st.session_state.current_session}")

    # Initialize the current session's outputs if they don't exist yet
    if st.session_state.current_session not in st.session_state.outputs:
        st.session_state.outputs[st.session_state.current_session] = []

    col1, col2 = st.columns(2)
    with col1:
        start_button = st.button("Start")
    with col2:
        stop_button = st.button("Stop")

    if start_button:
        # minimize_browser()  # left disabled; re-enable with the sketch above if needed
        cleanup_files()
        audio_thread = threading.Thread(target=record_audio,
                                        args=(audio_filename, stop_event))
        mouse_positions = []
        screen_thread = threading.Thread(target=record_screen,
                                         args=(video_filename, stop_event, mouse_positions))
        audio_thread.start()
        screen_thread.start()
        st.write("Recording started. Press 'q' to stop.")
        # Streamlit evaluates stop_button once per script run, so its value
        # cannot change inside this loop; the 'q' hotkey is the reliable stop.
        while True:
            if keyboard.is_pressed('q') or stop_button:
                stop_event.set()
                break
        audio_thread.join()
        screen_thread.join()

        if not os.path.exists(audio_filename):
            st.error("Audio file was not created!")
            return
        if not os.path.exists(video_filename):
            st.error("Video file was not created!")
            return

        # Analyze the video and audio files together
        result = analyze_with_audio_video(video_filename, audio_filename)
        st.session_state.outputs[st.session_state.current_session].append(result)

    # Text input for additional queries
    additional_query = st.text_input("Type your query here if you're not satisfied with the solution:")
    if st.button("Submit Query") and additional_query:
        # Re-analyze the same recordings; forwarding the query text to the model
        # is still a placeholder step
        result = analyze_with_audio_video(video_filename, audio_filename)
        st.session_state.outputs[st.session_state.current_session].append(
            f"Query: {additional_query}\n{result}")

    # Display all outputs for the current session
    for output in st.session_state.outputs[st.session_state.current_session]:
        st.markdown(f"""{output}""")
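# Assumed entry point: main() is otherwise never invoked, so call it when the
# script is executed (Streamlit runs the file with __name__ == "__main__" on
# each rerun).
if __name__ == "__main__":
    main()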