"""Streamlit page that renders an overview of a recorded demonstration."""

from datetime import datetime
import json
import os
import time
import random
import string
import shutil
import traceback
import sys
from pathlib import Path

import streamlit as st

from utils import (
    load_json,
    load_json_no_cache,
    parse_arguments,
    format_chat_message,
    find_screenshot,
    gather_chat_history,
    get_screenshot,
    load_page,
)


def show_selectbox(demonstration_dir):
    """Offer a sidebar dropdown of the recordings found in *demonstration_dir*.

    Only sub-directories are listed, most recently modified first.

    Returns:
        The selected directory name, or ``None`` (after showing a
        "No recordings found." title) when there are no sub-directories.
    """
    dirs = [
        d
        for d in os.listdir(demonstration_dir)
        if os.path.isdir(f"{demonstration_dir}/{d}")
    ]

    if not dirs:
        st.title("No recordings found.")
        return None

    # Newest recording first, so that index 0 is the default selection.
    dirs.sort(key=lambda x: os.path.getmtime(f"{demonstration_dir}/{x}"), reverse=True)

    return st.sidebar.selectbox("Recording", dirs, index=0)


def show_overview(data, recording_name, basedir):
    """Render one row per replay event: elapsed time, index, action, screenshot.

    Args:
        data: Sequence of replay events (dicts). Each event is read for
            ``"timestamp"`` and ``"type"``; non-chat events are also read for
            ``"action"`` and ``"state"``.
        recording_name: Name shown in the title and used as the prefix of
            downloaded page file names.
        basedir: Directory of the recording; pages are read from
            ``{basedir}/pages``.

    The layout is driven by the ``screenshot_size_view_mode``,
    ``show_advanced_information`` and ``enable_html_download`` session-state
    keys.
    """
    st.title(f"Recording {recording_name}")

    screenshot_size = st.session_state.get("screenshot_size_view_mode", "regular")
    show_advanced_info = st.session_state.get("show_advanced_information", False)

    if screenshot_size == "regular":
        col_layout = [1.5, 1.5, 7, 3.5]
    elif screenshot_size == "small":
        col_layout = [1.5, 1.5, 7, 2]
    else:  # screenshot_size == 'large'
        # No dedicated screenshot column: the image goes under the action text.
        col_layout = [1.5, 1.5, 11]

    for i, d in enumerate(data):
        if i > 0 and show_advanced_info:
            # Use html to add a horizontal line with minimal gap.
            # NOTE(review): the markup inside this literal was missing from the
            # source; the <hr> below is reconstructed from the comment's
            # intent - confirm the exact styling.
            st.markdown(
                "<hr style='margin-top: 0.1rem; margin-bottom: 0.1rem;'/>",
                unsafe_allow_html=True,
            )

        if screenshot_size == "large":
            col_time, col_i, col_act = st.columns(col_layout)
            col_actvis = col_act
        else:
            col_time, col_i, col_act, col_actvis = st.columns(col_layout)

        # Elapsed time since the first event, displayed as MM:SS.
        secs_from_start = d["timestamp"] - data[0]["timestamp"]
        elapsed = time.strftime("%M:%S", time.gmtime(secs_from_start))
        col_time.markdown(f"**{elapsed}**")

        if not st.session_state.get("enable_html_download", True):
            col_i.markdown(f"**#{i}**")
        elif d["type"] == "browser" and (page_filename := d["state"]["page"]):
            page_path = f"{basedir}/pages/{page_filename}"
            col_i.download_button(
                label="#" + str(i),
                data=load_page(page_path),
                file_name=recording_name + "-" + page_filename,
                mime="multipart/related",
                key=f"page{i}",
            )
        else:
            col_i.button(f"#{i}", type='secondary')

        if d["type"] == "chat":
            # Chat events have no action or screenshot to show.
            col_act.markdown(format_chat_message(d), unsafe_allow_html=True)
            continue

        img = get_screenshot(d, basedir)
        arguments = parse_arguments(d["action"])
        event_type = d["action"]["intent"]
        action_str = f"**{event_type}**({arguments})"

        if img:
            col_actvis.image(img)
        col_act.markdown(action_str)

        if show_advanced_info:
            colors = {
                "good": "green",
                "broken": "red",
                "delayed": "orange",
            }
            status = d["state"].get("screenshot_status", "unknown")
            color = colors.get(status, "blue")

            text = ""
            text += f'Status: **:{color}[{status.upper()}]**\n\n'
            text += f'Screenshot: `{d["state"]["screenshot"]}`\\\n'
            text += f'Page: `{d["state"]["page"]}`\n'
            col_act.markdown(text)


def show_slide(data, example_index, recording_name, basedir):
    """Render a single event as a slide: chat on the left, screenshot on the right.

    Args:
        data: Sequence of replay events (dicts).
        example_index: Index into *data* of the event to display.
        recording_name: Unused here; kept for a uniform call signature.
        basedir: Directory of the recording, passed to ``find_screenshot``.
    """
    columns = st.columns([1, 5], gap="large")

    chat = gather_chat_history(data, example_index)
    example = data[example_index]

    # Display the current message.
    if example["type"] == "chat":
        columns[0].markdown(format_chat_message(example), unsafe_allow_html=True)

    # Display the chat history.
    with columns[0]:
        for d in chat:
            columns[0].markdown(format_chat_message(d), unsafe_allow_html=True)

    img = find_screenshot(data, example_index, basedir)

    if example["type"] == "browser":
        event_type = example["action"]["intent"]
        arguments = parse_arguments(example["action"])
        action = f"**{event_type}**({arguments})"

        with columns[1]:
            st.write(action)

    with columns[1]:
        st.image(img)


def show_presentation(data, recording_name, basedir):
    """Show a slider over the events of *data* and render the selected one."""
    example_index = st.slider(
        "example",
        min_value=0,
        max_value=len(data) - 1,
        step=1,
        label_visibility="hidden",
    )

    show_slide(data, example_index, recording_name=recording_name, basedir=basedir)


def show_video(basedir):
    """Play the ``.mp4`` file found in *basedir*, or show an error if none exists.

    When several ``.mp4`` files are present, the last one returned by
    ``os.listdir`` is used.
    """
    video_filename = None
    for filename in os.listdir(basedir):
        if filename.endswith(".mp4"):
            video_filename = filename

    # Check before building the path: otherwise the player would be handed
    # the non-existent "<basedir>/None".
    if video_filename is None:
        st.error("No video found")
        return

    st.video(f"{basedir}/{video_filename}")


def load_recording(basedir):
    """Load a replay from *basedir* and fill the sidebar with its metadata.

    The sidebar gets a replay-file selector (any ``replay*.json`` in
    *basedir*), the two display checkboxes, and whatever metadata is
    available (extension version, start date, duration, form fields,
    validation status, start frame).

    Returns:
        The ``"data"`` list of the selected replay, or ``None`` when there is
        no replay file or the replay could not be loaded.
    """
    # The user chooses between e.g. replay.json and replay_orig.json.
    replay_files = sorted(
        f
        for f in os.listdir(basedir)
        if f.startswith("replay") and f.endswith(".json")
    )
    replay_file = st.sidebar.selectbox("Select replay", replay_files, index=0)
    st.sidebar.checkbox(
        "Advanced Screenshot Info", False, key="show_advanced_information"
    )
    st.sidebar.checkbox("Enable HTML download", True, key="enable_html_download")

    if replay_file is None:
        # No replay*.json in this directory: nothing to load.
        st.error("No replay file found")
        return None

    # The loaders take the file name without its ".json" extension.
    replay_file = os.path.splitext(replay_file)[0]

    metadata = load_json(basedir, "metadata")
    extension_version = metadata["version"]
    st.sidebar.markdown(f"**extension version**: {extension_version}")

    # recordingStart is divided by 1000 (milliseconds -> seconds) before
    # being formatted as a local date string.
    recording_start_timestamp = metadata["recordingStart"]
    recording_start_date = datetime.fromtimestamp(
        int(recording_start_timestamp) / 1000
    ).strftime("%Y-%m-%d %H:%M:%S")
    st.sidebar.markdown(f"**started**: {recording_start_date}")

    # Read in the JSON data.
    replay_dict = load_json_no_cache(basedir, replay_file)
    form = load_json(basedir, "form")

    # Bail out before touching replay_dict["data"].
    if not replay_dict:
        return None

    data = replay_dict["data"]

    if data:
        duration = data[-1]["timestamp"] - data[0]["timestamp"]
        duration = time.strftime("%M:%S", time.gmtime(duration))
        st.sidebar.markdown(f"**duration**: {duration}")

    for key in [
        "annotator",
        "description",
        "tasks",
        "upload_date",
        "instructor_sees_screen",
        "uses_ai_generated_output",
    ]:
        if form and key in form:
            value = form[key]
            if isinstance(value, list):
                value = ", ".join(map(str, value))
            st.sidebar.markdown(f"**{key}**: {value}")

    st.sidebar.markdown("---")
    if "status" in replay_dict:
        st.sidebar.markdown(f"**Validation status**: {replay_dict['status']}")

    processed_meta_path = Path(basedir) / "processed_metadata.json"
    start_frame = 'file not found'
    if processed_meta_path.exists():
        with open(processed_meta_path, encoding="utf-8") as f:
            processed_meta = json.load(f)
        start_frame = processed_meta.get('start_frame', 'info not in file')
    st.sidebar.markdown(f"**Recording start frame**: {start_frame}")

    return data


def run():
    """Entry point of the page: pick a recording and render the chosen mode.

    The selected recording is mirrored in the ``recording`` query parameter
    so that a URL points at a specific recording.
    """
    mode = st.sidebar.radio("Mode", ["Overview"])

    demonstration_dir = "./demonstrations"

    params = st.experimental_get_query_params()

    # List demonstrations/ (treated as empty when the directory is missing).
    demo_names = (
        os.listdir(demonstration_dir) if os.path.isdir(demonstration_dir) else []
    )
    if not demo_names:
        st.title("No recordings found.")
        st.stop()

    # Fall back to the first recording when the query parameter is absent or
    # names a recording that does not exist (list.index would raise).
    requested = (params.get("recording") or [None])[0]
    recording_name = requested if requested in demo_names else demo_names[0]

    recording_name = st.sidebar.selectbox(
        "Recordings",
        demo_names,
        index=demo_names.index(recording_name),
    )

    if recording_name != requested:
        st.experimental_set_query_params(recording=recording_name)

    if mode == "Overview":
        with st.sidebar:
            st.selectbox(
                "Screenshot size",
                ["small", "regular", "large"],
                index=1,
                key="screenshot_size_view_mode",
            )

    if recording_name is not None:
        basedir = f"{demonstration_dir}/{recording_name}"
        data = load_recording(basedir=basedir)

        if not data:
            st.stop()

        if mode == "Overview":
            show_overview(data, recording_name=recording_name, basedir=basedir)
        elif mode == "Presentation":
            show_presentation(data, recording_name=recording_name, basedir=basedir)
        elif mode == "Video":
            show_video(basedir=basedir)


if __name__ == "__main__":
    st.set_page_config(layout="wide")
    run()