# batch-run-csv-analyser / multiple.py
import pandas as pd
import streamlit as st
import matplotlib.pyplot as plt
import numpy as np
from pre import preprocess_uploaded_file
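
# Assumed input contract (inferred from the column accesses below, not from pre.py):
# each dataframe returned by preprocess_uploaded_file carries the columns
# 'Status', 'Functional area', 'Scenario name', 'Error message',
# 'Time spent' (numeric seconds), 'Time spent(m:s)', 'Environment',
# and datetime-typed 'Start datetime' / 'End datetime'.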
# Define the function to perform analysis
def perform_analysis(uploaded_dataframes):
    # Concatenate all dataframes into a single dataframe
    combined_data = pd.concat(uploaded_dataframes, ignore_index=True)

    # Split scenarios by status ('FAILED' / 'PASSED' as produced by preprocessing)
    failed_scenarios = combined_data[combined_data['Status'] == 'FAILED']
    passed_scenarios = combined_data[combined_data['Status'] == 'PASSED']

    # Display total counts of failing and passing scenarios
    fail_count = len(failed_scenarios)
    st.markdown(f"Failing scenarios count: {fail_count}")
    pass_count = len(passed_scenarios)
    st.markdown(f"Passing scenarios count: {pass_count}")

    # Use radio buttons for selecting status
    selected_status = st.radio("Select a status", ['Failed', 'Passed'])

    # Determine which scenarios to display based on the selected status
    if selected_status == 'Failed':
        unique_areas = np.append(failed_scenarios['Functional area'].unique(), "All")
        selected_scenarios = failed_scenarios
    elif selected_status == 'Passed':
        unique_areas = np.append(passed_scenarios['Functional area'].unique(), "All")
        selected_scenarios = passed_scenarios
    else:
        selected_scenarios = None
    if selected_scenarios is not None:
        st.markdown(f"### Scenarios with status '{selected_status}' grouped by functional area:")

        # Select one or more functional areas to filter scenarios
        selected_functional_areas = st.multiselect("Select functional areas", unique_areas, ["All"])
        if "All" in selected_functional_areas:
            filtered_scenarios = selected_scenarios
        else:
            filtered_scenarios = selected_scenarios[selected_scenarios['Functional area'].isin(selected_functional_areas)]

        if not selected_functional_areas:  # Check if the selection is empty
            st.error("Please select at least one functional area.")
        else:
            # Calculate the average time spent for each functional area
            average_time_spent_seconds = filtered_scenarios.groupby('Functional area')['Time spent'].mean().reset_index()
            # Convert average time spent from seconds to MM:SS. Note that strftime('%M:%S')
            # wraps past 59:59 for durations of an hour or more (see the divmod-based
            # alternative sketched after this function).
            average_time_spent_seconds['Time spent'] = pd.to_datetime(average_time_spent_seconds['Time spent'], unit='s').dt.strftime('%M:%S')

            # Group by functional area and take the earliest start / latest end datetimes
            start_datetime_group = filtered_scenarios.groupby('Functional area')['Start datetime'].min().reset_index()
            end_datetime_group = filtered_scenarios.groupby('Functional area')['End datetime'].max().reset_index()

            # Calculate the total time spent per functional area (end minus start datetime)
            total_time_spent_seconds = (end_datetime_group['End datetime'] - start_datetime_group['Start datetime']).dt.total_seconds()
            # Convert total time spent from seconds to MM:SS format
            total_time_spent_seconds = pd.to_datetime(total_time_spent_seconds, unit='s').dt.strftime('%M:%S')

            # Merge the start/end datetimes and the total into the summary table
            average_time_spent_seconds = average_time_spent_seconds.merge(start_datetime_group, on='Functional area')
            average_time_spent_seconds = average_time_spent_seconds.merge(end_datetime_group, on='Functional area')
            average_time_spent_seconds['Total Time Spent'] = total_time_spent_seconds
            # Group the filtered scenarios for display: by environment for failures,
            # by functional area for passes
            if selected_status == 'Failed':
                grouped_filtered_scenarios = filtered_scenarios.groupby('Environment')[['Functional area', 'Scenario name', 'Error message', 'Time spent(m:s)', 'Start datetime']].apply(lambda x: x.reset_index(drop=True))
            elif selected_status == 'Passed':
                grouped_filtered_scenarios = filtered_scenarios.groupby('Functional area')[['Scenario name', 'Time spent(m:s)']].apply(lambda x: x.reset_index(drop=True))
            else:
                grouped_filtered_scenarios = None

            # Flatten the group index and number the rows from 1 for display
            grouped_filtered_scenarios.reset_index(inplace=True)
            grouped_filtered_scenarios.drop(columns=['level_1'], inplace=True)
            grouped_filtered_scenarios.index = grouped_filtered_scenarios.index + 1
            st.dataframe(grouped_filtered_scenarios)
            # Sort the summary table by start datetime
            average_time_spent_seconds = average_time_spent_seconds.sort_values(by='Start datetime')

            # Display total and average time spent on each functional area in a table
            st.markdown("### Total and Average Time Spent on Each Functional Area")
            average_time_spent_seconds.index = average_time_spent_seconds.index + 1
            # Rename the columns for clarity
            average_time_spent_seconds.rename(columns={'Start datetime': 'Start Datetime', 'End datetime': 'End Datetime', 'Time spent': 'Average Time Spent'}, inplace=True)
            # Rearrange the columns
            average_time_spent_seconds = average_time_spent_seconds[['Functional area', 'Total Time Spent', 'Start Datetime', 'End Datetime', 'Average Time Spent']]
            st.dataframe(average_time_spent_seconds)
            # For failed scenarios, create and display a bar graph of counts by functional area
            if selected_status != 'Passed':
                st.write(f"### Bar graph showing number of '{selected_status}' scenarios in each functional area:")
                error_counts = grouped_filtered_scenarios['Functional area'].value_counts()
                plt.figure(figsize=(12, 10))
                bars = plt.bar(error_counts.index, error_counts.values)
                plt.xlabel('Functional Area')
                plt.ylabel('Number of Failures')
                plt.title(f"Number of '{selected_status}' scenarios by Functional Area")
                plt.xticks(rotation=45, ha='right', fontsize=10)

                # Set y-axis limits and ticks at a consistent interval of 1
                y_max = max(error_counts.values) + 1
                plt.ylim(0, y_max)
                plt.yticks(range(0, y_max, 1), fontsize=10)

                # Annotate each bar with its count
                for bar in bars:
                    height = bar.get_height()
                    plt.text(bar.get_x() + bar.get_width() / 2, height, str(int(height)),
                             ha='center', va='bottom')

                plt.tight_layout()  # Adjust layout so the rotated labels fit
                st.pyplot(plt)
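
# Alternative sketch for the MM:SS conversion used above: divmod-based formatting
# avoids the wrap-around past 59:59 that strftime('%M:%S') exhibits for durations
# of an hour or more. The helper name format_seconds is illustrative only; it is
# not used by this module.
def format_seconds(seconds: float) -> str:
    # Split whole seconds into minutes and a seconds remainder
    minutes, secs = divmod(int(round(seconds)), 60)
    return f"{minutes:02d}:{secs:02d}"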
def multiple_main():
    # Get the number of environments from the user
    num_environments = st.number_input("Enter the number of environments", min_value=1, value=1, step=1)

    # Initialize a list to store the uploaded dataframes
    uploaded_dataframes = []

    # Create a multi-file uploader for each environment
    for i in range(num_environments):
        uploaded_files = st.file_uploader(f"Upload CSV files for Environment {i + 1}", type="csv", accept_multiple_files=True)
        for uploaded_file in uploaded_files:
            # Preprocess the uploaded CSV file and collect the result
            data = preprocess_uploaded_file(uploaded_file)
            uploaded_dataframes.append(data)

    # Perform the analysis if any files were uploaded
    if uploaded_dataframes:
        perform_analysis(uploaded_dataframes)
    else:
        st.write("Please upload at least one CSV file.")
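
# Minimal entry-point sketch, assuming this module is launched directly with
# `streamlit run multiple.py`; in the original Space, multiple_main() may
# instead be called from a separate router script.
if __name__ == "__main__":
    multiple_main()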