Spaces:
Sleeping
Sleeping
import pandas as pd | |
import streamlit as st | |
import plotly.graph_objects as go | |
from pre import preprocess_uploaded_file | |
from datetime import datetime | |
import re | |
def extract_date_from_filename(filename):
    """Extract a timestamp embedded in ``filename``.

    Three layouts are tried, most specific first:

    1. ``_YYYYMMDD_HHMMSS`` -- date and time, each preceded by an underscore.
    2. ``_YYYYMMDD`` -- date preceded by an underscore.
    3. ``YYYYMMDD`` -- any run of eight digits.

    Every occurrence of a layout is considered, so a digit run that has the
    right shape but is not a real calendar date (a build number, say) does
    not hide a valid date that appears later in the name.

    Args:
        filename: File name to inspect.

    Returns:
        A naive ``datetime``. Layouts 2 and 3 yield midnight of the parsed
        day. If nothing parses, a Streamlit warning is shown and
        ``datetime.now()`` is returned.
    """
    # (regex, strptime format). The captured groups are joined with "_" so
    # the text handed to strptime lines up with the format string.
    layouts = (
        (r'_(\d{8})_(\d{6})', '%Y%m%d_%H%M%S'),
        (r'_(\d{8})', '%Y%m%d'),
        (r'(\d{8})', '%Y%m%d'),
    )

    for pattern, date_format in layouts:
        for match in re.finditer(pattern, filename):
            try:
                return datetime.strptime('_'.join(match.groups()), date_format)
            except ValueError:
                # Right shape, but not a valid date/time: try the next
                # occurrence (or the next, looser layout).
                continue

    # If no patterns match, return current date with a warning
    st.warning(f"Could not extract date from filename: {filename}. Using current date instead.")
    return datetime.now()
def _combine_uploaded_files(uploaded_files):
    """Preprocess each upload, tag its rows with the file's date, and stack them."""
    frames = []
    for uploaded_file in uploaded_files:
        # NOTE(review): preprocess_uploaded_file is defined elsewhere; it is
        # used here as if it returns a pandas DataFrame -- confirm.
        data = preprocess_uploaded_file(uploaded_file)
        # Extract date from filename
        data['File Date'] = extract_date_from_filename(uploaded_file.name).date()
        frames.append(data)
    if not frames:
        return pd.DataFrame()
    # One concat at the end instead of re-copying the accumulated rows on
    # every iteration.
    return pd.concat(frames, ignore_index=True)


def generate_weekly_report(uploaded_files):
    """Render an interactive Streamlit report of failed scenarios over time.

    The uploaded files are preprocessed and combined, rows whose ``Status``
    is ``'FAILED'`` are dated with the date parsed from their file name, and
    the user can then filter by environment, functional area and date range.
    The function draws a Plotly line chart of failures per day, a detail
    table for one chosen day/environment, per-environment summary statistics
    and, optionally, the underlying daily-count table.

    Columns read from the combined data: ``Status``, ``Environment``,
    ``Functional area``, ``Scenario Name``, ``Error Message`` and
    ``Time spent(m:s)``.

    Args:
        uploaded_files: Uploaded file objects; each must expose ``.name``
            and be accepted by ``preprocess_uploaded_file``.

    Returns:
        None. All output goes to the Streamlit page. The function returns
        early (after showing a message) when there is nothing to report or
        a selection is invalid.
    """
    if not uploaded_files:
        st.error("No files uploaded. Please upload files for analysis.")
        return

    # Set pandas option to use Copy-on-Write (process-wide setting).
    pd.options.mode.copy_on_write = True

    combined_data = _combine_uploaded_files(uploaded_files)
    if combined_data.empty:
        st.error("No data found in the uploaded files. Please check the file contents.")
        return

    # Failed rows take their 'Date' from the file they came from.
    failed_mask = combined_data['Status'] == 'FAILED'
    combined_data.loc[failed_mask, 'Date'] = combined_data.loc[failed_mask, 'File Date']
    failed_data = combined_data[failed_mask]
    if failed_data.empty:
        st.warning("No failed scenarios found in the uploaded data.")
        return

    # UI for selecting environments and functional areas
    environments = combined_data['Environment'].unique()
    selected_environments = st.multiselect("Select Environments", options=environments, default=environments)
    all_functional_areas = failed_data['Functional area'].unique()
    area_choice = st.radio("Choose Functional Areas to Display", ['All', 'Select Functional Areas'])
    if area_choice == 'Select Functional Areas':
        selected_functional_areas = st.multiselect("Select Functional Areas", options=all_functional_areas)
        if not selected_functional_areas:
            st.error("Please select at least one functional area.")
            return
    else:
        selected_functional_areas = all_functional_areas

    # Date range selection, bounded by the dates present in the failed rows.
    min_date = failed_data['Date'].min()
    max_date = failed_data['Date'].max()
    col1, col2 = st.columns(2)
    with col1:
        start_date = st.date_input("Start Date", min_value=min_date, max_value=max_date, value=min_date)
    with col2:
        end_date = st.date_input("End Date", min_value=min_date, max_value=max_date, value=max_date)

    # An inverted range would give an empty date index and an invalid
    # min/max pair for the detail date picker further down.
    if start_date > end_date:
        st.error("Start Date must not be after End Date.")
        return

    # Filter data based on selections and date range
    filtered_data = failed_data[
        (failed_data['Environment'].isin(selected_environments)) &
        (failed_data['Date'] >= start_date) &
        (failed_data['Date'] <= end_date)
    ]
    if area_choice == 'Select Functional Areas':
        filtered_data = filtered_data[filtered_data['Functional area'].isin(selected_functional_areas)]

    # Rows: dates. Columns: (Environment, Functional area) pairs.
    daily_failures = (
        filtered_data
        .groupby(['Date', 'Environment', 'Functional area'])
        .size()
        .unstack(level=[1, 2], fill_value=0)
    )
    # Ensure we have a continuous date range (days without failures become 0).
    date_range = pd.date_range(start=start_date, end=end_date)
    daily_failures = daily_failures.reindex(date_range, fill_value=0)
    # Convert all columns to int64 to avoid Arrow serialization issues
    daily_failures = daily_failures.astype('int64')

    # Environments that actually have a column in the table.
    # get_level_values works whether or not the columns are a MultiIndex.
    envs_in_table = daily_failures.columns.get_level_values(0)

    # Y-axis scaling option
    y_axis_scale = st.radio("Y-axis Scaling", ["Fixed", "Dynamic"])

    # Create an interactive plot using Plotly
    fig = go.Figure()
    for env in selected_environments:
        if env not in envs_in_table:
            continue
        env_data = daily_failures[env]
        if area_choice == 'All':
            total_failures = env_data.sum(axis=1)
            fig.add_trace(go.Scatter(x=daily_failures.index, y=total_failures,
                                     mode='lines+markers', name=f'{env} - All Areas'))
        else:
            for area in selected_functional_areas:
                if area in env_data.columns:
                    fig.add_trace(go.Scatter(x=daily_failures.index, y=env_data[area],
                                             mode='lines+markers', name=f'{env} - {area}'))
    fig.update_layout(
        title='Failure Rates Comparison Across Environments Over Time',
        xaxis_title='Date',
        yaxis_title='Number of Failures',
        legend_title='Environment - Functional Area',
        hovermode='closest'
    )
    # "Dynamic" leaves Plotly's default autoscaling untouched.
    if y_axis_scale == "Fixed":
        fig.update_yaxes(rangemode="tozero")
    # Use st.plotly_chart to display the interactive chart
    st.plotly_chart(fig, use_container_width=True)

    # Add interactivity for scenario details
    st.write("Select a date and environment to see detailed scenario information:")
    selected_date = st.date_input("Select a date", min_value=start_date, max_value=end_date, value=start_date)
    selected_env = st.selectbox("Select an environment", options=selected_environments)
    if selected_date and selected_env:
        st.write(f"### Detailed Scenarios for {selected_date} - {selected_env}")
        day_scenarios = filtered_data[(filtered_data['Date'] == selected_date) &
                                      (filtered_data['Environment'] == selected_env)]
        if not day_scenarios.empty:
            st.dataframe(day_scenarios[['Functional area', 'Scenario Name', 'Error Message', 'Time spent(m:s)']])
        else:
            st.write("No failing scenarios found for the selected date and environment.")

    # Summary Statistics
    st.write("### Summary Statistics")
    day_count = len(daily_failures)
    for env in selected_environments:
        env_data = filtered_data[filtered_data['Environment'] == env]
        total_failures = len(env_data)
        avg_daily_failures = max_daily_failures = min_daily_failures = 0
        if day_count > 0:
            avg_daily_failures = total_failures / day_count
            if env in envs_in_table:
                # Sum across functional areas once, then take both extremes.
                daily_totals = daily_failures[env].sum(axis=1)
                max_daily_failures = daily_totals.max()
                min_daily_failures = daily_totals.min()
        st.write(f"**{env}**:")
        st.write(f" - Total Failures: {total_failures}")
        st.write(f" - Average Daily Failures: {avg_daily_failures:.2f}")
        st.write(f" - Max Daily Failures: {max_daily_failures}")
        st.write(f" - Min Daily Failures: {min_daily_failures}")
        if area_choice == 'Select Functional Areas':
            st.write("\n **Failures by Functional Area:**")
            for area in selected_functional_areas:
                area_total = len(env_data[env_data['Functional area'] == area])
                st.write(f" - {area}: {area_total}")
        st.write("---")

    # Display raw data for verification
    if st.checkbox("Show Raw Data"):
        st.write(daily_failures)