import os
import threading
import time
import wave

import cv2
import keyboard
import numpy as np
import pyaudio
import pyautogui
import streamlit as st
from dotenv import load_dotenv

from logic import analyze_with_audio_video

load_dotenv()
# --- Audio capture parameters ---
FORMAT = pyaudio.paInt16  # 16-bit PCM samples
CHANNELS = 1              # mono
RATE = 44100              # sample rate in Hz
CHUNK = 1024              # frames per buffer read

# --- Output artifact paths (overwritten on every recording) ---
audio_filename = "output.wav"
video_filename = "output.mp4"

# --- Streamlit page setup ---
st.set_page_config(page_title="T.A.P.A.S", page_icon=":camera:", layout="wide")
st.title("T.A.P.A.S - Technical Assistance Platform for Advanced Solution")

# Per-session transcript storage: {session_name: [output, ...]}
if 'outputs' not in st.session_state or not isinstance(st.session_state.outputs, dict):
    st.session_state.outputs = {}
if 'current_session' not in st.session_state:
    st.session_state.current_session = 'Session 1'
def cleanup_files():
    """Remove leftover recording artifacts so a new session starts clean."""
    for stale in (audio_filename, video_filename):
        if os.path.exists(stale):
            os.remove(stale)
            print(f"Deleted old file: {stale}")
def record_audio(filename, stop_event):
    """Capture microphone audio until *stop_event* is set, then write a WAV file.

    Runs on a worker thread started by ``main``.

    Args:
        filename: Path of the WAV file to write.
        stop_event: ``threading.Event`` that ends the capture loop.
    """
    audio = pyaudio.PyAudio()
    stream = audio.open(format=FORMAT, channels=CHANNELS,
                        rate=RATE, input=True,
                        frames_per_buffer=CHUNK)
    frames = []
    try:
        while not stop_event.is_set():
            # exception_on_overflow=False: a transient buffer overrun should
            # drop samples, not raise and kill this thread mid-recording.
            frames.append(stream.read(CHUNK, exception_on_overflow=False))
    finally:
        stream.stop_stream()
        stream.close()  # was missing: releases the underlying device handle
        # Query the sample width before tearing down the PyAudio instance.
        sample_width = audio.get_sample_size(FORMAT)
        audio.terminate()
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
def record_screen(filename, stop_event, mouse_positions):
    """Record the screen, with a drawn cursor marker, until *stop_event* is set.

    Runs on a worker thread started by ``main``.

    Args:
        filename: Output video path (mp4).
        stop_event: ``threading.Event`` that ends the capture loop.
        mouse_positions: list that accumulates one (x, y) cursor sample per frame.
    """
    screen_size = pyautogui.size()
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    # NOTE(review): fps is hard-coded to 8 but the real capture rate depends on
    # screenshot latency, so playback speed may drift — confirm acceptable.
    out = cv2.VideoWriter(filename, fourcc, 8, (screen_size.width, screen_size.height))
    try:
        while not stop_event.is_set():
            frame = np.array(pyautogui.screenshot())
            # Screenshots arrive as RGB; OpenCV's writer expects BGR. The
            # original used COLOR_BGR2RGB — numerically the same channel swap,
            # but semantically mislabeled; corrected to COLOR_RGB2BGR.
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            x, y = pyautogui.position()
            # Burn a filled circle into the frame where the cursor is.
            cv2.circle(frame, (x, y), 10, (0, 255, 0), -1)
            out.write(frame)
            mouse_positions.append((x, y))
    finally:
        # Guarantee the container is finalized even if a capture call raises.
        out.release()
# def minimize_browser():
# browser_window = None
# for window in gw.getAllTitles():
# if "chrome" in window.lower() or "firefox" in window.lower() or "edge" in window.lower():
# browser_window = window
# break
# if browser_window:
# app = Application().connect(title_re=browser_window)
# app.window(title_re=browser_window).minimize()
# else:
# print("Browser window not found.")
def main():
    """Streamlit entry point: session management, recording control, analysis."""
    stop_event = threading.Event()

    # --- Sidebar: create or select a session ---
    with st.sidebar:
        st.title("Sessions")
        session_name = st.text_input("New Session Name", "")
        if st.button("Start New Session") and session_name:
            st.session_state.current_session = session_name
            st.session_state.outputs[session_name] = []
        session_names = list(st.session_state.outputs.keys())
        if session_names:
            session_selection = st.selectbox("Choose a session", session_names)
            if session_selection:
                st.session_state.current_session = session_selection

    st.header(f"Current Session: {st.session_state.current_session}")
    # Ensure the current session has an output list (e.g. the default session).
    if st.session_state.current_session not in st.session_state.outputs:
        st.session_state.outputs[st.session_state.current_session] = []

    col1, col2 = st.columns(2)
    with col1:
        start_button = st.button("Start")
    with col2:
        stop_button = st.button("Stop")

    if start_button:
        # BUG FIX: the original called minimize_browser() here, but that
        # function is commented out at module level, so clicking Start
        # raised NameError. The call is removed until it is reinstated.
        cleanup_files()
        audio_thread = threading.Thread(target=record_audio, args=(audio_filename, stop_event))
        mouse_positions = []
        screen_thread = threading.Thread(target=record_screen, args=(video_filename, stop_event, mouse_positions))
        audio_thread.start()
        screen_thread.start()
        st.write("Recording started. Press 'q' or click 'Stop' to stop.")
        # NOTE(review): stop_button is a snapshot from this script run; under
        # Streamlit's rerun model only the 'q' hotkey can end this loop.
        while not (keyboard.is_pressed('q') or stop_button):
            time.sleep(0.1)  # was a 100%-CPU busy-wait; yield between polls
        stop_event.set()
        audio_thread.join()
        screen_thread.join()
        if not os.path.exists(audio_filename):
            st.error("Audio file was not created!")
            return
        if not os.path.exists(video_filename):
            st.error("Video file was not created!")
            return
        # Analyze the video and audio files together.
        result = analyze_with_audio_video(video_filename, audio_filename)
        st.session_state.outputs[st.session_state.current_session].append(result)

    # Follow-up query: re-runs the analysis on the most recent recordings.
    additional_query = st.text_input("Type your query here if you're not satisfied with the solution:")
    if st.button("Submit Query") and additional_query:
        result = analyze_with_audio_video(video_filename, audio_filename)
        st.session_state.outputs[st.session_state.current_session].append(f"Query: {additional_query}\n{result}")

    # Render the current session's transcript as styled cards.
    for output in st.session_state.outputs[st.session_state.current_session]:
        st.markdown(f"""
<div style="background-color: darkgray; border-radius: 10px; padding: 10px; margin-bottom: 10px; color: black;">
<i class="fas fa-check-circle"></i> {output}
</div>
""", unsafe_allow_html=True)


if __name__ == "__main__":
    main()
|