Spaces:
Sleeping
Sleeping
import pandas as pd | |
import streamlit as st | |
import csv | |
import io | |
import openpyxl # Add this import for Excel handling | |
from datetime import datetime | |
import re | |
def preprocess_csv(input_bytes): | |
# Keep this for backward compatibility with CSV files | |
text = input_bytes.decode() # Decode bytes to text | |
output = io.StringIO() | |
writer = csv.writer(output) | |
for row in csv.reader(io.StringIO(text)): # Read text as csv | |
if len(row) > 5: | |
row = row[0:5] + [','.join(row[5:])] # Combine extra fields into one | |
writer.writerow(row) | |
output.seek(0) # go to the start of the StringIO object | |
return output | |
def load_data(file): | |
column_names = [ | |
'Functional area', | |
'Scenario name', | |
'Start datetime', | |
'End datetime', | |
'Status', | |
'Error message' | |
] | |
data = pd.read_csv(file, header=None, names=column_names) | |
return data | |
def preprocess_xlsx(uploaded_file): | |
"""Process Excel file with step-level data and convert to scenario-level summary""" | |
# Define data types for columns | |
dtype_dict = { | |
'Feature Name': 'string', | |
'Scenario Name': 'string', | |
'Total Time Taken (ms)': 'float64', | |
'Failed Scenario': 'string' | |
} | |
# Read both the first sheet for error messages and "Time Taken" sheet | |
excel_file = pd.ExcelFile(uploaded_file, engine='openpyxl') | |
# Read detailed step data from first sheet (contains error messages) | |
error_df = pd.read_excel(excel_file, sheet_name=0) | |
# Read time taken data from the "Time Taken" sheet | |
df = pd.read_excel( | |
excel_file, | |
sheet_name='Time Taken', | |
dtype=dtype_dict | |
) | |
# Print column names and sample values for debugging | |
# st.write("Excel columns:", df.columns.tolist()) | |
# st.write("Sample data from Time Taken sheet:", df.head()) | |
# st.write("Unique Feature Names:", df['Feature Name'].unique()) | |
# st.write("Feature Name count:", df['Feature Name'].nunique()) | |
# # Check for any empty or NaN values in Feature Name | |
# empty_features = df['Feature Name'].isna().sum() | |
# st.write(f"Empty Feature Names: {empty_features}") | |
# Convert Failed Scenario column to boolean after reading | |
# Handle different possible values (TRUE/FALSE, True/False, etc.) | |
df['Failed Scenario'] = df['Failed Scenario'].astype(str).str.upper() | |
df['Status'] = df['Failed Scenario'].map( | |
lambda x: 'FAILED' if x in ['TRUE', 'YES', 'Y', '1'] else 'PASSED' | |
) | |
# Count failed and passed scenarios | |
failed_count = (df['Status'] == 'FAILED').sum() | |
passed_count = (df['Status'] == 'PASSED').sum() | |
# Extract error messages from the first sheet | |
# Find rows with FAILED result and group by Scenario Name to get the error message | |
if 'Result' in error_df.columns: | |
failed_steps = error_df[error_df['Result'] == 'FAILED'].copy() | |
# If there are failed steps, get the error messages | |
if not failed_steps.empty: | |
# Group by Scenario Name and get the first error message and step for each scenario | |
error_messages = failed_steps.groupby('Scenario Name').agg({ | |
'Error Message': 'first', | |
'Step': 'first' # Capture the step where it failed | |
}).reset_index() | |
else: | |
# Create empty DataFrame with required columns | |
error_messages = pd.DataFrame(columns=['Scenario Name', 'Error Message', 'Step']) | |
else: | |
# If Result column doesn't exist, create empty DataFrame | |
error_messages = pd.DataFrame(columns=['Scenario Name', 'Error Message', 'Step']) | |
# Extract date from filename (e.g., RI2211_batch_20250225_27031.xlsx) | |
filename = uploaded_file.name | |
date_match = re.search(r'_(\d{8})_', filename) | |
if date_match: | |
date_str = date_match.group(1) | |
file_date = datetime.strptime(date_str, '%Y%m%d').date() | |
else: | |
st.warning(f"Could not extract date from filename: {filename}. Using current date.") | |
file_date = datetime.now().date() | |
# Extract environment from filename | |
if any(pattern in filename for pattern in ['_batch_', '_fin_', '_priority_', '_Puppeteer_']): | |
environment = filename.split('_')[0] | |
else: | |
environment = filename.split('.')[0] | |
# Create result dataframe | |
result_df = pd.DataFrame({ | |
'Functional area': df['Feature Name'], | |
'Scenario Name': df['Scenario Name'], | |
'Status': df['Status'], | |
'Time spent': df['Total Time Taken (ms)'] / 1000 # Convert ms to seconds | |
}) | |
# Fill any NaN values in Functional area | |
result_df['Functional area'] = result_df['Functional area'].fillna('Unknown') | |
# Merge error messages with result dataframe | |
if not error_messages.empty: | |
result_df = result_df.merge(error_messages[['Scenario Name', 'Error Message', 'Step']], | |
on='Scenario Name', how='left') | |
# Add environment column | |
result_df['Environment'] = environment | |
# Calculate formatted time spent | |
result_df['Time spent(m:s)'] = pd.to_datetime(result_df['Time spent'], unit='s').dt.strftime('%M:%S') | |
result_df['Start datetime'] = pd.to_datetime(file_date) | |
result_df['End datetime'] = result_df['Start datetime'] + pd.to_timedelta(result_df['Time spent'], unit='s') | |
# Add failed step information if available | |
if 'Step' in result_df.columns: | |
result_df['Failed Step'] = result_df['Step'] | |
result_df.drop('Step', axis=1, inplace=True) | |
# Extract start time from the first sheet | |
before_steps = error_df[error_df['Step'].str.contains('before', case=False, na=False)] | |
if not before_steps.empty: | |
# Get the first 'before' step for each scenario | |
before_steps['Time Stamp'] = pd.to_datetime(before_steps['Time Stamp'], format='%H:%M:%S', errors='coerce') | |
start_times = before_steps.groupby('Scenario Name').agg({'Time Stamp': 'first'}).reset_index() | |
# Store the timestamps in a variable for efficient reuse | |
result_df = result_df.merge(start_times, on='Scenario Name', how='left') | |
result_df.rename(columns={'Time Stamp': 'Scenario Start Time'}, inplace=True) | |
scenario_start_times = result_df['Scenario Start Time'] | |
# Combine the date from the filename with the time stamp | |
result_df['Start datetime'] = pd.to_datetime(scenario_start_times.dt.strftime('%H:%M:%S') + ' ' + file_date.strftime('%Y-%m-%d')) | |
# Print counts for debugging | |
# st.write(f"Processed data - Failed: {len(result_df[result_df['Status'] == 'FAILED'])}, Passed: {len(result_df[result_df['Status'] == 'PASSED'])}") | |
# st.write(f"Unique functional areas in processed data: {result_df['Functional area'].nunique()}") | |
# st.write(f"Unique functional areas: {result_df['Functional area'].unique()}") | |
# Debugging: Print the columns of the first sheet | |
# st.write("Columns in the first sheet:", error_df.columns.tolist()) | |
# st.write("Sample data from the first sheet:", error_df.head()) | |
return result_df | |
def fill_missing_data(data, column_index, value): | |
data.iloc[:, column_index] = data.iloc[:, column_index].fillna(value) | |
return data | |
# Define a function to convert a string to camel case | |
def to_camel_case(s): | |
parts = s.split('_') | |
return ''.join([part.capitalize() for part in parts]) | |
# Define the function to preprocess a file (CSV or XLSX) | |
def preprocess_uploaded_file(uploaded_file): | |
# Commenting out the spinner to disable it | |
# with st.spinner(f'Processing {uploaded_file.name}...'): | |
# Determine file type based on extension | |
if uploaded_file.name.lower().endswith('.xlsx'): | |
data = preprocess_xlsx(uploaded_file) | |
else: | |
# Original CSV processing | |
file_content = uploaded_file.read() | |
processed_output = preprocess_csv(file_content) | |
processed_file = io.StringIO(processed_output.getvalue()) | |
data = load_data(processed_file) | |
data = fill_missing_data(data, 4, 0) | |
data['Start datetime'] = pd.to_datetime(data['Start datetime'], dayfirst=True, errors='coerce') | |
data['End datetime'] = pd.to_datetime(data['End datetime'], dayfirst=True, errors='coerce') | |
data['Time spent'] = (data['End datetime'] - data['Start datetime']).dt.total_seconds() | |
data['Time spent(m:s)'] = pd.to_datetime(data['Time spent'], unit='s').dt.strftime('%M:%S') | |
# Extract environment name from filename | |
filename = uploaded_file.name | |
environment = filename.split('_Puppeteer')[0] | |
# Add environment column to the dataframe | |
data['Environment'] = environment | |
return data | |
def add_app_description(): | |
app_title = '<p style="font-family:Roboto, sans-serif; color:#004E7C; font-size: 42px;">DataLink Compare</p>' | |
st.markdown(app_title, unsafe_allow_html=True) | |
is_selected = st.sidebar.checkbox('Show App Description', value=False) | |
if is_selected: | |
with st.expander('Show App Description'): | |
st.markdown("Welcome to DataLink Compare. This tool allows you to analyze batch run reports and provides insights into their statuses, processing times, and more. You can also compare two files to identify differences and similarities between them.") | |
st.markdown("### Instructions:") | |
st.write("1. Upload your CSV or XLSX file using the file uploader on the sidebar.") | |
st.write("2. Choose between 'Multi', 'Compare', 'Weekly', and 'Multi-Env Compare' mode using the dropdown on the sidebar.") | |
st.write("3. In 'Multi' mode, you can upload and analyze multiple files for individual environments.") | |
st.write("4. In 'Compare' mode, you can upload two files to compare them.") | |
st.markdown("### Features:") | |
st.write("- View statistics of passing and failing scenarios.") | |
st.write("- Filter scenarios by functional area and status.") | |
st.write("- Calculate average time spent for each functional area.") | |
st.write("- Display bar graphs showing the number of failed scenarios.") | |
st.write("- Identify consistent failures, new failures, and changes in passing scenarios.") | |
# Add the new link here | |
link_html = '<p style="font-size: 14px;"><a href="https://scenarioswitcher.negadan77.workers.dev/" target="_blank">Open Scenario Processor</a></p>' | |
st.markdown(link_html, unsafe_allow_html=True) | |