batch-run-csv-analyser / multiple.py
BananaSauce's picture
jira implemented
69a44c9
raw
history blame
26.7 kB
import pandas as pd
import streamlit as st
import matplotlib.pyplot as plt
import numpy as np
from pre import preprocess_uploaded_file
from jira_integration import (
render_jira_login,
get_current_sprint,
get_regression_board,
get_sprint_issues,
calculate_points,
create_regression_task,
generate_task_content,
calculate_story_points,
get_project_metadata,
get_field_dependencies,
get_dependent_field_value,
get_boards,
get_functional_area_values
)
from datetime import datetime, timedelta
import plotly.express as px
import plotly.graph_objects as go
import os
from dotenv import load_dotenv
import json
import logging
load_dotenv()

# Base URL of the Jira instance, taken from the environment (None when unset).
JIRA_SERVER = os.getenv("JIRA_SERVER")

# Session-state keys this page relies on, with the value each one starts from.
# A key is only seeded when it is absent, so values written during earlier
# reruns of the same session are left alone.
_SESSION_DEFAULTS = {
    'filtered_scenarios_df': None,
    'task_content': None,
    'total_story_points': 0,
    'completed_points': 0,
    'current_page': "analysis",
    'task_df': None,
    'task_environment': None,
    'last_task_key': None,
    'last_task_url': None,
    'show_success': False,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Module-level logger, registered under the name "multiple".
logger = logging.getLogger("multiple")
def _reset_task_state():
    """Clear every session-state entry tied to the most recently created task.

    Used as a button ``on_click`` callback: Streamlit runs callbacks before the
    script body of the rerun, so the reset happens even though the code that
    drew the button is not executed again on that rerun.
    """
    st.session_state.task_content = None
    st.session_state.last_task_key = None
    st.session_state.last_task_url = None
    st.session_state.show_success = False


# Creates the Jira task for the current selection and reports the outcome.
def handle_task_button_click(summary, description, formatted_env, filtered_df):
    """Create a regression task in Jira and render the result in the page.

    Args:
        summary: Task summary; logged and passed to ``create_regression_task``.
        description: Task description; its length is logged.
        formatted_env: Environment value forwarded as ``environment``.
        filtered_df: DataFrame of scenarios forwarded as
            ``filtered_scenarios_df``; its ``shape`` is logged.

    Returns:
        True when ``create_regression_task`` returned a truthy task, False when
        it returned a falsy value or when any exception was raised.

    Side effects:
        On success stores ``last_task_key``, ``last_task_url`` and
        ``show_success`` in ``st.session_state`` and clears ``task_content``.
    """
    logger.info("=== Task button clicked - Starting callback function ===")
    try:
        logger.info(f"Summary: {summary}")
        logger.info(f"Description length: {len(description)}")
        logger.info(f"Environment: {formatted_env}")
        logger.info(f"DataFrame shape: {filtered_df.shape}")

        # Import here to avoid circular imports
        from jira_integration import create_regression_task
        logger.info("Imported create_regression_task function")

        # Only the remote call sits under the spinner; the result is rendered
        # once the spinner has gone.
        with st.spinner("Creating task in Jira..."):
            logger.info("About to call create_regression_task function")
            task = create_regression_task(
                project_key="RS",
                summary=summary,
                description=description,
                environment=formatted_env,
                filtered_scenarios_df=filtered_df
            )
        logger.info(f"create_regression_task returned: {task}")

        if not task:
            logger.error("Task creation failed (returned None)")
            st.error("❌ Task creation failed. Please check the error messages and try again.")
            return False

        logger.info(f"Task created successfully: {task.key}")
        task_url = f"{JIRA_SERVER}/browse/{task.key}"

        # Persist the result so later reruns can keep showing it.
        st.session_state.last_task_key = task.key
        st.session_state.last_task_url = task_url
        st.session_state.show_success = True

        st.success("✅ Task created successfully!")
        st.markdown(
            "<div style='padding: 10px; border-radius: 5px; border: 1px solid #90EE90; margin: 10px 0;'>"
            "<h3 style='margin: 0; color: #90EE90;'>Task Details</h3>"
            f"<p style='margin: 10px 0;'>Task Key: {task.key}</p>"
            f"<a href='{task_url}' target='_blank' "
            "style='background-color: #90EE90; color: black; padding: 5px 10px; "
            "border-radius: 3px; text-decoration: none; display: inline-block;'>"
            "View Task in Jira"
            "</a>"
            "</div>",
            unsafe_allow_html=True
        )

        # The content has been submitted; drop it so it is not reused.
        st.session_state.task_content = None

        # This button is drawn only on the rerun in which the caller's button
        # was clicked. An `if st.button(...)` body here would never execute,
        # because the click starts a new rerun where this function is not
        # reached. The reset is therefore attached as a callback.
        st.button("Create Another Task", key="create_another", on_click=_reset_task_state)

        logger.info("Task creation process completed successfully")
        return True
    except Exception as e:
        # logger.exception already records the full traceback.
        logger.exception(f"Error in handle_task_button_click: {str(e)}")
        st.error(f"❌ Error creating task: {str(e)}")
        import traceback
        st.error(traceback.format_exc())
        return False
    finally:
        logger.info("=== Ending handle_task_button_click function ===")
# Columns a task table must contain before a Jira task can be generated from it.
_TASK_REQUIRED_COLUMNS = (
    'Environment',
    'Functional area',
    'Scenario Name',
    'Error Message',
    'Failed Step',
    'Time spent(m:s)',
    'Start datetime',
)


def _render_task_success_banner():
    """Show the stored task key/link and a button that resets the task state."""
    st.success("✅ Task created successfully!")
    st.markdown(
        "<div style='padding: 10px; border-radius: 5px; border: 1px solid #90EE90; margin: 10px 0;'>"
        "<h3 style='margin: 0; color: #90EE90;'>Task Details</h3>"
        f"<p style='margin: 10px 0;'>Task Key: {st.session_state.last_task_key}</p>"
        f"<a href='{st.session_state.last_task_url}' target='_blank' "
        "style='background-color: #90EE90; color: black; padding: 5px 10px; "
        "border-radius: 3px; text-decoration: none; display: inline-block;'>"
        "View Task in Jira"
        "</a>"
        "</div>",
        unsafe_allow_html=True
    )
    _, centre, _ = st.columns([1, 2, 1])
    with centre:
        if st.button("Create Another Task", key="create_another", use_container_width=True):
            # Clear all task-related state, then redraw without the banner.
            st.session_state.task_content = None
            st.session_state.last_task_key = None
            st.session_state.last_task_url = None
            st.session_state.show_success = False
            st.rerun()


def _render_log_task_button(task_table, environment):
    """Draw the "Log Jira Task" button and create the task when it is clicked."""
    _, centre, _ = st.columns([1, 2, 1])
    with centre:
        if not st.button("📝 Log Jira Task", use_container_width=True):
            return
        task_df = task_table.copy()
        missing_columns = [col for col in _TASK_REQUIRED_COLUMNS if col not in task_df.columns]
        if missing_columns:
            st.error(f"Missing required columns: {', '.join(missing_columns)}")
            st.error("Please ensure your data includes all required columns")
            return
        summary, description = generate_task_content(task_df)
        if summary and description:
            handle_task_button_click(summary, description, environment, task_df)


def _render_status_bar_chart(scenario_table, selected_status):
    """Plot the number of rows per functional area as a bar chart."""
    st.write(f"### Bar graph showing number of '{selected_status}' scenarios in each functional area:")
    error_counts = scenario_table['Functional area'].value_counts()
    if error_counts.empty:
        st.info(f"No '{selected_status}' scenarios found to display in the graph.")
        return

    # An explicit Figure is used (rather than pyplot's implicit global one) so
    # it can be closed after rendering; otherwise every rerun leaves a figure
    # open.
    fig, ax = plt.subplots(figsize=(12, 10))
    bars = ax.bar(error_counts.index, error_counts.values)
    ax.set_xlabel('Functional Area')
    ax.set_ylabel('Number of Failures')
    ax.set_title(f"Number of '{selected_status}' scenarios by Functional Area")
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right', fontsize=10)

    # One tick per integer count, with one unit of headroom above the tallest bar.
    y_max = int(error_counts.max()) + 1
    ax.set_ylim(0, y_max)
    ax.set_yticks(list(range(0, y_max)))
    ax.tick_params(axis='y', labelsize=10)

    # Write each bar's count just above it.
    for bar in bars:
        height = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2, height, str(int(height)),
                ha='center', va='bottom')

    fig.tight_layout()
    st.pyplot(fig)
    plt.close(fig)


# Entry point of the analysis view.
def perform_analysis(uploaded_dataframes):
    """Render the pass/fail analysis for the uploaded result files.

    Concatenates the frames, shows the FAILED/PASSED counts, lets the user pick
    a status and functional areas, and displays the matching scenarios. For the
    'Failed' status it also draws a bar chart per functional area and, when a
    Jira client is present in the session and exactly one functional area
    (not "All") is selected, offers to log a Jira task.

    Args:
        uploaded_dataframes: Iterable of DataFrames to concatenate. The code
            reads the columns 'Status', 'Functional area', 'Environment',
            'Scenario Name' and 'Time spent(m:s)', plus 'Error Message',
            'Start datetime' and (when present) 'Failed Step' for failures.
    """
    combined_data = pd.concat(uploaded_dataframes, ignore_index=True)

    failed_scenarios = combined_data[combined_data['Status'] == 'FAILED']
    passed_scenarios = combined_data[combined_data['Status'] == 'PASSED']
    st.markdown(f"Failing scenarios Count: {len(failed_scenarios)}")
    st.markdown(f"Passing scenarios Count: {len(passed_scenarios)}")

    # The radio offers exactly these two values, so a two-way choice suffices.
    selected_status = st.radio("Select a status", ['Failed', 'Passed'])
    selected_scenarios = failed_scenarios if selected_status == 'Failed' else passed_scenarios
    unique_areas = np.append(selected_scenarios['Functional area'].unique(), "All")

    st.markdown(f"### Scenarios with status '{selected_status}' grouped by functional area:")
    selected_functional_areas = st.multiselect("Select functional areas", unique_areas, ["All"])
    if not selected_functional_areas:
        st.error("Please select at least one functional area.")
        return

    if "All" in selected_functional_areas:
        filtered_scenarios = selected_scenarios
    else:
        filtered_scenarios = selected_scenarios[
            selected_scenarios['Functional area'].isin(selected_functional_areas)
        ]
    st.write(f"Number of filtered scenarios: {len(filtered_scenarios)}")

    # Columns shown in the table, in display order.
    if selected_status == 'Failed':
        columns_to_keep = [
            'Environment',
            'Functional area',
            'Scenario Name',
            'Error Message',
            'Failed Step',
            'Time spent(m:s)',
            'Start datetime'
        ]
        # 'Failed Step' is optional in the input.
        if 'Failed Step' not in filtered_scenarios.columns:
            columns_to_keep.remove('Failed Step')
    else:
        columns_to_keep = [
            'Environment',
            'Functional area',
            'Scenario Name',
            'Time spent(m:s)'
        ]
    grouped_filtered_scenarios = filtered_scenarios[columns_to_keep].copy()

    # Number the rows from 1 for display.
    grouped_filtered_scenarios.index = range(1, len(grouped_filtered_scenarios) + 1)
    st.dataframe(grouped_filtered_scenarios)

    # Task creation is offered only for a single, explicitly chosen functional
    # area of failed scenarios, and only when a Jira client is in the session.
    # Session values are read with .get so a session in which these keys were
    # never seeded does not raise.
    can_log_task = (
        bool(st.session_state.get('jira_client'))
        and selected_status == 'Failed'
        and len(selected_functional_areas) == 1
        and "All" not in selected_functional_areas
    )
    if can_log_task:
        if st.session_state.get('show_success') and st.session_state.get('last_task_key'):
            _render_task_success_banner()
        elif not filtered_scenarios.empty:
            environment = filtered_scenarios['Environment'].iloc[0]
            _render_log_task_button(grouped_filtered_scenarios, environment)

    if selected_status != 'Passed':
        _render_status_bar_chart(grouped_filtered_scenarios, selected_status)
def display_story_points_stats(force_refresh=False):
    """Display story points statistics from current sprint.

    The figures are fetched from Jira when ``force_refresh`` is true or when no
    figures are cached in ``st.session_state.sprint_stats``; otherwise the
    cached figures are rendered. The Refresh button clears the cache so the
    following rerun fetches again.

    Args:
        force_refresh: When true, ignore any cached figures and query Jira.

    Nothing is rendered when the session has no Jira client, or when the board,
    the current sprint or its issues cannot be obtained.
    """
    if not st.session_state.get('jira_client'):
        return
    try:
        stats = None if force_refresh else st.session_state.get('sprint_stats')
        if stats is None:
            with st.spinner("Fetching sprint data..."):
                stats = _fetch_sprint_stats()
            if stats is None:
                return
            st.session_state.sprint_stats = stats
            st.session_state.total_story_points = stats['total']
            st.session_state.completed_points = stats['completed']

        total_points = stats['total']
        completed_points = stats['completed']
        in_progress_points = stats['in_progress']
        fraction_done = completed_points / total_points if total_points > 0 else 0

        with st.container():
            st.info(f"Current Sprint: {stats['sprint_name']}")

            # Four compact metrics side by side.
            cols = st.columns(4)
            with cols[0]:
                st.metric("Total", f"{total_points:.1f}")
            with cols[1]:
                st.metric("Done", f"{completed_points:.1f}")
            with cols[2]:
                st.metric("In Progress", f"{in_progress_points:.1f}")
            with cols[3]:
                st.metric("Complete", f"{fraction_done * 100:.1f}%")

            # st.progress only accepts values within [0.0, 1.0].
            st.progress(min(max(fraction_done, 0.0), 1.0))

            st.button("🔄 Refresh", key="refresh_stats", use_container_width=True,
                      on_click=_invalidate_sprint_stats)
    except Exception as e:
        st.error(f"Error updating story points: {str(e)}")


def _fetch_sprint_stats():
    """Query Jira for the current sprint's point totals; None if unavailable."""
    board = get_regression_board("RS")
    if not board:
        return None
    sprint = get_current_sprint(board['id'])
    if not sprint:
        return None
    issues = get_sprint_issues(board['id'], sprint.id, board['estimation_field'])
    if not issues:
        return None
    _, total_points, completed_points, in_progress_points = calculate_points(
        issues, board['estimation_field']
    )
    return {
        'sprint_name': sprint.name,
        'total': total_points,
        'completed': completed_points,
        'in_progress': in_progress_points,
    }


def _invalidate_sprint_stats():
    """Refresh-button callback: drop cached figures and record the refresh time."""
    st.session_state.sprint_stats = None
    st.session_state.last_refresh = datetime.now()
def show_task_creation_section(filtered_df, environment):
    """Display the task creation section with detailed functional area mapping information.

    Args:
        filtered_df: DataFrame of scenarios; its first distinct
            'Functional area' value is the one inspected and mapped.
        environment: Environment value forwarded to the task creation call.

    When the "Create Task" button is clicked, the summary and description are
    produced by ``generate_task_content(filtered_df)`` and passed on to
    ``handle_task_button_click``.
    """
    if "Functional area" in filtered_df.columns and len(filtered_df) > 0:
        functional_areas = filtered_df["Functional area"].unique().tolist()
        functional_area = functional_areas[0] if functional_areas else None
        logger.debug(f"Found functional areas: {functional_areas}")
        # String form for display and splitting; the raw value may not be a str.
        area_text = str(functional_area)

        # Get project metadata to access allowed values
        metadata = get_project_metadata("RS")
        if metadata:
            with st.expander("Functional Area Field Structure", expanded=False):
                func_field = metadata['all_fields'].get('customfield_13100', {})
                if func_field and 'allowedValues' in func_field:
                    st.write("Available parent-child mappings:")
                    for option in func_field['allowedValues']:
                        if not isinstance(option, dict):
                            continue
                        parent_value = option.get('value', 'Unknown')
                        st.markdown(f"**Parent: {parent_value}**")
                        if 'cascadingOptions' in option:
                            child_values = [
                                child.get('value')
                                for child in option['cascadingOptions']
                                if child.get('value')
                            ]
                            st.write("Child options:")
                            for child in sorted(child_values):
                                st.write(f" • {child}")
                        st.write("")

            st.subheader("Functional Area Mapping")

            # NOTE(review): this module neither defines nor imports
            # map_functional_area at top level. It is assumed to live in
            # jira_integration - confirm. The guarded import keeps the page
            # usable if it does not.
            try:
                from jira_integration import map_functional_area
            except ImportError:
                map_functional_area = None

            parent = child = None
            col1, col2 = st.columns(2)
            with col1:
                st.markdown("**Input Functional Area:**")
                st.info(area_text)
                st.markdown("**Split Parts:**")
                for i, part in enumerate(area_text.split(' - '), 1):
                    st.write(f"{i}. {part}")
            with col2:
                if map_functional_area is None:
                    st.warning("Functional area mapping is not available.")
                else:
                    parent, child = map_functional_area(functional_area, metadata)
                    st.markdown("**Mapped Values:**")
                    st.success(f"Parent: {parent}")
                    st.success(f"Child: {child}")
                st.markdown("**Normalized Form:**")
                st.info(area_text.lower().replace(' ', '-'))

            # Warn when the mapping equals the R&I / Data Exchange pair
            # although the input itself is not "data exchange".
            if parent == "R&I" and child == "Data Exchange" and area_text.lower() != "data exchange":
                st.warning(
                    "⚠️ Using default mapping (R&I/Data Exchange). This might not be the best match.\n"
                    "Please check the 'Functional Area Field Structure' above for available values."
                )
    else:
        logger.warning("No functional area found in data")
        st.warning("No functional area information found in the data")

    # Create task button
    if st.button("Create Task", key="create_task_button"):
        # handle_task_button_click takes (summary, description, env, df), so
        # the task text has to be generated first.
        summary, description = generate_task_content(filtered_df)
        if summary and description:
            handle_task_button_click(summary, description, environment, filtered_df)
        else:
            st.error("Could not generate task content from the selected scenarios.")
def multiple_main():
    """Render the "Multiple File Analysis" page.

    Seeds the session-state keys the page reads, draws the optional Jira login
    and sprint progress in the sidebar, then shows either the task creation
    section (when ``current_page`` is "create_task" and ``task_df`` is set) or
    the upload-and-analyse view.
    """
    # Seed every key this page reads. This is done here, per call, because
    # module-level initialisation runs only when the module is first imported,
    # whereas session state is separate for each browser session.
    session_defaults = {
        'current_page': "upload",
        'task_df': None,
        'selected_files': [],
        'uploaded_files': [],
        'filtered_scenarios_df': None,
        'jira_server': JIRA_SERVER,
        'sprint_data_initialized': False,
        'uploaded_data': None,
        'last_refresh': None,
        'task_content': None,
        'task_environment': None,
        'last_task_key': None,
        'last_task_url': None,
        'show_success': False,
        'total_story_points': 0,
        'completed_points': 0,
    }
    for key, default in session_defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default

    with st.sidebar:
        st.subheader("Jira Integration (Optional)")
        # The login form is rendered until authentication succeeds; after that
        # only the status is shown.
        if st.session_state.get('is_authenticated'):
            st.success("Connected to Jira")
        else:
            st.session_state.is_authenticated = render_jira_login()

        # Sprint progress needs both a successful login and a client object.
        if st.session_state.is_authenticated and st.session_state.get('jira_client'):
            st.markdown("---")
            st.subheader("Sprint Progress")
            # Force a fetch on the first render of the session only.
            first_render = not st.session_state.sprint_data_initialized
            display_story_points_stats(force_refresh=first_render)
            st.session_state.sprint_data_initialized = True

    st.title("Multiple File Analysis")

    # Task creation mode
    if st.session_state.current_page == "create_task" and st.session_state.task_df is not None:
        if st.button("⬅️ Back to Analysis"):
            st.session_state.current_page = "analysis"
            st.rerun()
        show_task_creation_section(st.session_state.task_df, st.session_state.task_environment)
        return

    # Main analysis page
    uploaded_files = st.file_uploader("Upload CSV or Excel files",
                                      type=['csv', 'xlsx'],
                                      accept_multiple_files=True)

    if uploaded_files:
        all_data = []
        for file in uploaded_files:
            try:
                all_data.append(preprocess_uploaded_file(file))
            except Exception as e:
                # One unreadable file must not block the others.
                st.error(f"Error processing {file.name}: {str(e)}")
        if all_data:
            # Kept in session state so the analysis survives reruns.
            st.session_state.uploaded_data = all_data

    if st.session_state.uploaded_data:
        perform_analysis(st.session_state.uploaded_data)
    else:
        st.write("Please upload at least one file.")
if __name__ == "__main__":
    # Running this file directly: configure a wide page, then render it.
    st.set_page_config(layout="wide")
    multiple_main()