# batch-run-csv-analyser / jira_integration.py
# Author avatar caption: BananaSauce's picture
# Commit message: jira implemented
# Commit: 69a44c9
import logging
import os
import sys
import traceback
from datetime import datetime
import streamlit as st
from jira import JIRA
from dotenv import load_dotenv
from datetime import datetime, timedelta
import pandas as pd
import requests
import json
from groq import Groq
from difflib import SequenceMatcher
import time
# Logging setup: send DEBUG output to a timestamped file under ./logs when the
# filesystem allows it; otherwise install a handler that discards everything
# (e.g. on hosts such as Hugging Face Spaces where file logging fails).
try:
    log_dir = "logs"
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(
        log_dir,
        "jira_debug_" + datetime.now().strftime('%Y%m%d_%H%M%S') + ".log",
    )
    # The root logger gets a single file handler.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[logging.FileHandler(log_file)],
    )
except (OSError, IOError):
    # Directory or file could not be created: keep the same level/format but
    # attach only a NullHandler.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[logging.NullHandler()],
    )

# Module-level logger used by the functions below.
logger = logging.getLogger("jira_integration")
logger.info("Jira integration module loaded")
# Read a local .env file (if present) into the process environment.
load_dotenv()

# Jira base URL and Groq API key. No fallback values are applied: each name is
# None when the variable is not set.
JIRA_SERVER = os.environ.get("JIRA_SERVER")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")

# Report missing configuration in the Streamlit UI as soon as the module loads.
if not JIRA_SERVER:
    st.error("JIRA_SERVER not found in environment variables. Please check your .env file.")
if not GROQ_API_KEY:
    st.error("GROQ_API_KEY not found in environment variables. Please check your .env file.")
def init_jira_session():
    """Make sure the Jira-related session-state slots exist.

    ``jira_client`` and ``projects`` are created with the value ``None`` when
    they are missing from ``st.session_state``; existing values are kept.
    """
    for slot in ('jira_client', 'projects'):
        if slot not in st.session_state:
            st.session_state[slot] = None
def get_projects():
    """Return the projects reported by the session's Jira client, ordered by key.

    Returns:
        A list sorted by each project's ``key`` attribute, or ``None`` when no
        client is stored in the session or the lookup raises (the error is
        shown through ``st.error``).
    """
    client = st.session_state.jira_client
    if not client:
        return None

    try:
        visible_projects = list(client.projects())
        visible_projects.sort(key=lambda project: project.key)
        return visible_projects
    except Exception as e:
        st.error(f"Error fetching projects: {str(e)}")
        return None
def get_board_configuration(board_id):
    """Fetch the Agile configuration document for one board.

    Issues a GET on ``/rest/agile/1.0/board/<board_id>/configuration`` through
    the Jira client's underlying session.

    Returns:
        The decoded JSON body on HTTP 200; ``None`` for any other status, when
        no client is stored in the session, or when the request raises.
    """
    client = st.session_state.jira_client
    if not client:
        return None

    try:
        endpoint = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/configuration"
        reply = client._session.get(endpoint)
        return reply.json() if reply.status_code == 200 else None
    except Exception as e:
        st.error(f"Error fetching board configuration: {str(e)}")
        return None
def get_boards(project_key):
    """List the boards of a project together with type and estimation field.

    Each board's configuration is fetched with ``get_board_configuration``.

    Returns:
        A list of dicts with keys ``id``, ``name``, ``type`` (``'Unknown'``
        when the configuration is unavailable or has no type) and
        ``estimation_field`` (``None`` when the configuration has no
        ``estimation`` entry). ``None`` when no client is stored in the
        session or the lookup raises.
    """
    client = st.session_state.jira_client
    if not client:
        return None

    try:
        summaries = []
        for board in client.boards(projectKeyOrID=project_key):
            config = get_board_configuration(board.id)
            if config:
                kind = config.get('type', 'Unknown')
            else:
                kind = 'Unknown'

            estimation = None
            if config and 'estimation' in config:
                estimation = config['estimation'].get('field', {}).get('fieldId')

            summaries.append({
                'id': board.id,
                'name': board.name,
                'type': kind,
                'estimation_field': estimation,
            })
        return summaries
    except Exception as e:
        st.error(f"Error fetching boards: {str(e)}")
        return None
def get_current_sprint(board_id):
    """Pick the sprint to use for a board.

    Requests the board's active and future sprints, keeps those whose name
    starts with ``'RS'`` and returns the one with the greatest name.

    Returns:
        The selected sprint object, or ``None`` when no client is stored in
        the session, the board has no such sprint (a warning listing the
        available sprint names is shown), or the request raises. A "board does
        not support sprints" error is swallowed silently; other errors are
        shown through ``st.error``.
    """
    client = st.session_state.jira_client
    if not client:
        return None

    try:
        candidates = client.sprints(board_id, state='active,future')
        if not candidates:
            return None

        rs_sprints = [sprint for sprint in candidates if sprint.name.startswith('RS')]
        if not rs_sprints:
            st.warning("No RS sprints found. Available sprints: " + ", ".join([s.name for s in candidates]))
            return None

        # NOTE(review): names are compared as plain strings, so e.g. 'RS9'
        # ranks above 'RS10' - confirm the naming scheme makes this safe.
        return max(rs_sprints, key=lambda sprint: sprint.name)
    except Exception as e:
        if "board does not support sprints" in str(e).lower():
            return None
        st.error(f"Error fetching current sprint: {str(e)}")
        return None
def get_board_issues(board_id, estimation_field=None):
    """Fetch the current user's issues on a board via the Agile REST API.

    A single GET on ``/rest/agile/1.0/board/<board_id>/issue`` (``maxResults``
    200, JQL ``assignee = currentUser()``) yields the issue keys; every key is
    then loaded again through ``jira_client.issue`` so callers receive full
    client-side issue objects.

    Args:
        board_id: Board identifier used in the URL.
        estimation_field: Optional extra field id appended to the requested
            field list.

    Returns:
        A list of issue objects, or ``None`` when no client is stored in the
        session, the response status is not 200, or the request raises.
    """
    client = st.session_state.jira_client
    if not client:
        return None

    try:
        requested_fields = ['summary', 'status', 'created', 'description', 'issuetype', 'assignee']
        if estimation_field:
            requested_fields.append(estimation_field)

        response = client._session.get(
            f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/issue",
            params={
                'maxResults': 200,
                'fields': requested_fields,
                'jql': 'assignee = currentUser()',
            },
        )
        if response.status_code != 200:
            st.error(f"Error fetching board issues: {response.text}")
            return None

        return [client.issue(entry['key']) for entry in response.json()['issues']]
    except Exception as e:
        st.error(f"Error fetching board issues: {str(e)}")
        return None
def get_sprint_issues(board_id, sprint_id, estimation_field=None):
    """Fetch the current user's issues in a sprint, with a 60-second cache.

    Results are cached in ``st.session_state`` under
    ``sprint_issues_<sprint_id>``. A cached list is reused only when BOTH the
    per-sprint timestamp (``sprint_issues_<sprint_id>_fetched_at``) and the
    shared ``last_refresh`` timestamp are younger than 60 seconds. The
    per-sprint timestamp prevents a refresh of one sprint from keeping another
    sprint's stale list alive; honouring ``last_refresh`` keeps any code that
    resets it to force a reload working.

    Args:
        board_id: Board identifier used in the URL.
        sprint_id: Sprint identifier used in the URL and the cache key.
        estimation_field: Optional extra field id appended to the requested
            field list.

    Returns:
        A list of issue objects (each key from the Agile response is loaded
        again through ``jira_client.issue``), or ``None`` when no client is
        stored in the session, the response status is not 200, or the request
        raises.
    """
    client = st.session_state.jira_client
    if not client:
        return None

    cache_ttl_seconds = 60

    try:
        cache_key = f"sprint_issues_{sprint_id}"
        stamp_key = f"{cache_key}_fetched_at"

        if cache_key in st.session_state:
            # The entry is only as fresh as the older of the two timestamps.
            fetched_at = st.session_state.get(stamp_key, datetime.min)
            shared_refresh = st.session_state.get('last_refresh', datetime.min)
            oldest = min(fetched_at, shared_refresh)
            if (datetime.now() - oldest).total_seconds() < cache_ttl_seconds:
                return st.session_state[cache_key]

        requested_fields = [
            'summary',
            'status',
            'created',
            'description',
            'issuetype',
            'assignee',
        ]
        if estimation_field:
            requested_fields.append(estimation_field)

        url = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/sprint/{sprint_id}/issue"
        params = {
            'maxResults': 200,
            'fields': requested_fields,
            'jql': 'assignee = currentUser()',  # only the logged-in user's issues
        }
        response = client._session.get(url, params=params)
        if response.status_code != 200:
            st.error(f"Error fetching sprint issues: {response.text}")
            return None

        issues = [client.issue(entry['key']) for entry in response.json()['issues']]

        # Record the result and when it was fetched (per sprint and shared).
        fetched_now = datetime.now()
        st.session_state[cache_key] = issues
        st.session_state[stamp_key] = fetched_now
        st.session_state['last_refresh'] = fetched_now
        return issues
    except Exception as e:
        st.error(f"Error fetching sprint issues: {str(e)}")
        return None
def calculate_points(issues, estimation_field):
    """Summarise story points across a collection of issues.

    For every issue the estimate is read from ``issue.fields``: first under the
    field id with the ``customfield_`` prefix removed, then (when that is
    missing or falsy) under the full ``estimation_field`` name. A ``None``
    estimate counts as 0. Status names are compared case-insensitively against
    fixed "done" and "in progress" sets.

    Args:
        issues: Iterable of issue objects exposing ``key`` and ``fields``.
        estimation_field: Name of the estimate field; falsy disables the
            calculation.

    Returns:
        ``(rows, total, completed, in_progress)`` where ``rows`` is a list of
        dicts with keys ``Key``, ``Type``, ``Summary``, ``Status``,
        ``Story Points`` and ``Assignee``. ``([], 0, 0, 0)`` when
        ``estimation_field`` is falsy or an unexpected error occurs. An issue
        that fails while being processed is reported through ``st.error`` and
        skipped.
    """
    if not estimation_field:
        return [], 0, 0, 0

    try:
        bare_id = estimation_field.replace('customfield_', '')
        finished = {'done', 'completed', 'closed'}
        underway = {'in progress', 'development', 'in development'}

        rows = []
        total_points = 0
        completed_points = 0
        in_progress_points = 0

        for issue in issues:
            try:
                raw = getattr(issue.fields, bare_id, None) or getattr(issue.fields, estimation_field, 0)
                points = 0 if raw is None else float(raw)

                normalized_status = issue.fields.status.name.lower()

                # Totals are updated before the row is built, so an issue whose
                # row cannot be built still contributes its points.
                total_points += points
                if normalized_status in finished:
                    completed_points += points
                elif normalized_status in underway:
                    in_progress_points += points

                assignee = issue.fields.assignee
                rows.append({
                    "Key": issue.key,
                    "Type": issue.fields.issuetype.name,
                    "Summary": issue.fields.summary,
                    "Status": issue.fields.status.name,
                    "Story Points": points,
                    "Assignee": assignee.displayName if assignee else "Unassigned",
                })
            except Exception as e:
                st.error(f"Error processing points for issue {issue.key}: {str(e)}")
                continue

        return rows, total_points, completed_points, in_progress_points
    except Exception as e:
        st.error(f"Error calculating points: {str(e)}")
        return [], 0, 0, 0
def create_maintenance_task(project_key, summary, description, issue_type='Task'):
    """Create a Jira issue in the given project.

    Args:
        project_key: Key of the target project.
        summary: Issue summary.
        description: Issue description.
        issue_type: Issue type name (defaults to ``'Task'``).

    Returns:
        The object returned by ``jira_client.create_issue``, or ``None`` when
        no client is stored in the session or creation raises (both cases are
        reported through ``st.error``).
    """
    client = st.session_state.jira_client
    if not client:
        st.error("Not authenticated with Jira. Please log in first.")
        return None

    try:
        return client.create_issue(fields={
            'project': {'key': project_key},
            'summary': summary,
            'description': description,
            'issuetype': {'name': issue_type},
        })
    except Exception as e:
        st.error(f"Error creating task: {str(e)}")
        return None
def render_jira_login():
    """Render the Jira login form and try to authenticate on submit.

    Session-state keys used: ``jira_client``, ``projects``, ``jira_username``,
    ``jira_password`` and ``login_blocked_until``. When Jira answers with a
    CAPTCHA challenge, further attempts are blocked for five minutes.

    Returns:
        ``True`` when a client is already stored in the session, or when a
        submitted login succeeds and projects could be fetched; ``False`` in
        every other case (form not submitted, login blocked, authentication
        failure, or no projects returned).
    """
    state = st.session_state

    # Nothing to do when a client is already stored for this session.
    if 'jira_client' in state and state.jira_client:
        st.success("Connected to Jira")
        return True

    # Seed the form values and the lock-out marker on first render.
    for slot, initial in (
        ('jira_username', ""),
        ('jira_password', ""),
        ('login_blocked_until', None),
    ):
        if slot not in state:
            state[slot] = initial

    # Refuse to show the form while a lock-out is still running.
    if state.login_blocked_until:
        if datetime.now() < state.login_blocked_until:
            wait_time = (state.login_blocked_until - datetime.now()).seconds
            st.error(f"Login temporarily blocked due to too many failed attempts. Please wait {wait_time} seconds before trying again.")
            st.info("If you need immediate access, please try logging in directly to Jira in your browser first, complete the CAPTCHA there, then return here.")
            return False
        state.login_blocked_until = None

    with st.form(key="jira_login_form"):
        username = st.text_input("Jira Username", value=state.jira_username, key="username_input")
        password = st.text_input("Password", value=state.jira_password, type="password", key="password_input")
        submit_button = st.form_submit_button(label="Login")

    if not submit_button:
        return False

    # Remember what was typed so the form is pre-filled on the next render.
    state.jira_username = username
    state.jira_password = password

    try:
        state.jira_client = JIRA(server=JIRA_SERVER, basic_auth=(username, password))

        projects = get_projects()
        if not projects:
            st.error("Failed to fetch projects")
            return False

        state.projects = projects
        st.success("Connected to Jira")
        return True
    except Exception as e:
        if "captcha_challenge" in str(e).lower():
            # Block further attempts for five minutes.
            state.login_blocked_until = datetime.now() + timedelta(minutes=5)
            st.error("Too many failed login attempts. Please try one of the following:")
            st.info(
                "\n"
                "1. Wait 5 minutes before trying again\n"
                "2. Log in to Jira in your browser first, complete the CAPTCHA there, then return here\n"
                "3. Clear your browser cookies and try again\n"
            )
        else:
            st.error(f"Authentication failed: {str(e)}")
        return False
def get_cached_metadata(project_key):
    """Return project metadata, served from a 30-minute session cache.

    The cache lives in ``st.session_state.metadata_cache`` with matching
    timestamps in ``st.session_state.metadata_cache_timestamp``. On a miss or
    an expired entry the metadata is fetched with
    ``get_project_metadata_fresh``; only truthy results are cached.

    Returns:
        The cached or freshly fetched metadata (whatever
        ``get_project_metadata_fresh`` returned, including ``None``).
    """
    for slot in ('metadata_cache', 'metadata_cache_timestamp'):
        if slot not in st.session_state:
            st.session_state[slot] = {}

    current_time = datetime.now()
    cache_expiry = timedelta(minutes=30)

    cache = st.session_state.metadata_cache
    stamps = st.session_state.metadata_cache_timestamp

    is_fresh = (
        project_key in cache
        and project_key in stamps
        and current_time - stamps[project_key] < cache_expiry
    )
    if is_fresh:
        logger.info(f"Using cached metadata for project {project_key}")
        return cache[project_key]

    logger.info(f"Fetching fresh metadata for project {project_key}")
    metadata = get_project_metadata_fresh(project_key)
    if metadata:
        cache[project_key] = metadata
        stamps[project_key] = current_time
        logger.info(f"Updated metadata cache for project {project_key}")
    return metadata
def get_project_metadata_fresh(project_key):
    """Fetch Story create-metadata for a project directly from Jira.

    Calls ``jira_client.createmeta`` restricted to the ``Story`` issue type
    with field information expanded, then records every field of that type.

    Returns:
        A dict with keys ``project_name``, ``issue_type`` (always
        ``'Story'``), ``required_fields`` and ``all_fields``. Both field maps
        are keyed by field id and hold ``name``, ``required``, ``schema``,
        ``allowedValues``, ``hasDefaultValue``, ``defaultValue``,
        ``operations`` and ``configuration``. ``None`` when no client is
        stored in the session, Jira returns no project metadata, the Story
        type is absent, or any call raises (the error and its traceback are
        shown in the UI).
    """
    logger.info(f"=== Getting fresh metadata for project {project_key} ===")

    client = st.session_state.jira_client
    if not client:
        logger.error("Not authenticated with Jira. Please log in first.")
        st.error("Not authenticated with Jira. Please log in first.")
        return None

    try:
        logger.info("Getting project...")
        project = client.project(project_key)
        logger.info(f"Got project: {project.name}")

        logger.info("Getting createmeta with expanded fields...")
        metadata = client.createmeta(
            projectKeys=project_key,
            expand='projects.issuetypes.fields',
            issuetypeNames='Story',  # only the Story type's fields are requested
        )
        logger.info("Got createmeta response")

        if not metadata.get('projects'):
            logger.error(f"No metadata found for project {project_key}")
            st.error(f"No metadata found for project {project_key}")
            return None

        issue_types = metadata['projects'][0].get('issuetypes', [])
        logger.info(f"Available issue types: {[t.get('name') for t in issue_types]}")

        # Locate the Story entry among the returned issue types.
        story_type = None
        for candidate in issue_types:
            if candidate['name'] == 'Story':
                story_type = candidate
                break
        if not story_type:
            logger.error("Story issue type not found in project")
            st.error("Story issue type not found in project")
            return None

        logger.info("Processing fields...")
        required_fields = {}
        all_fields = {}

        logger.info("Available fields in Story type:")
        for field_id, field in story_type['fields'].items():
            field_name = field.get('name', 'Unknown')
            schema = field.get('schema', {})
            field_type = schema.get('type', 'Unknown')
            logger.info(f"Field: {field_name} (ID: {field_id}, Type: {field_type})")

            # Keep the complete description, including schema and allowed values.
            field_record = {
                'name': field['name'],
                'required': field.get('required', False),
                'schema': schema,
                'allowedValues': field.get('allowedValues', []),
                'hasDefaultValue': field.get('hasDefaultValue', False),
                'defaultValue': field.get('defaultValue'),
                'operations': field.get('operations', []),
                'configuration': field.get('configuration', {}),
            }
            all_fields[field_id] = field_record

            # Cascading selects: log the parent/child structure for debugging.
            if schema.get('type') == 'option-with-child':
                logger.info(f"Found cascading select field: {field_name}")
                if 'allowedValues' in field:
                    for parent in field['allowedValues']:
                        parent_value = parent.get('value', 'Unknown')
                        logger.info(f" Parent value: {parent_value}")
                        if 'cascadingOptions' in parent:
                            child_values = [child.get('value') for child in parent['cascadingOptions']]
                            logger.info(f" Child values: {child_values}")

            if field_record['required']:
                required_fields[field_id] = field_record
                logger.info(f"Required field: {field_name}")

        logger.info(f"Found {len(all_fields)} total fields, {len(required_fields)} required fields")

        metadata_result = {
            'project_name': project.name,
            'issue_type': 'Story',
            'required_fields': required_fields,
            'all_fields': all_fields,
        }
        logger.info("Successfully processed project metadata")
        return metadata_result
    except Exception as e:
        logger.exception(f"Error getting project metadata: {str(e)}")
        st.error(f"Error getting project metadata: {str(e)}")
        st.error("Full error details:")
        st.error(str(e))
        st.code(traceback.format_exc(), language="python")
        return None
def get_project_metadata(project_key):
    """Return metadata for ``project_key`` by delegating to ``get_cached_metadata``."""
    metadata = get_cached_metadata(project_key)
    return metadata
def generate_task_content(filtered_scenarios_df):
    """Build a task summary and description from failing scenarios.

    The summary is ``"Maintenance: <Environment> - <Functional area>"`` taken
    from the first row. The description is a fixed heading followed by a
    numbered list (starting at 1) of each row's ``Scenario Name`` and
    ``Error Message``.

    Args:
        filtered_scenarios_df: DataFrame with the columns ``Environment``,
            ``Functional area``, ``Scenario Name`` and ``Error Message``.

    Returns:
        ``(summary, description)``; ``(None, None)`` when the frame is empty,
        a column is missing, or any other error occurs (reported through
        ``st.error``).
    """
    try:
        environment = filtered_scenarios_df['Environment'].iloc[0]
        functional_area = filtered_scenarios_df['Functional area'].iloc[0]
        summary = f"Maintenance: {environment} - {functional_area}"

        # Collect the pieces and join once instead of growing a string with
        # ``+=``; reading the two columns directly avoids building a Series
        # per row.
        pieces = ["Performing maintenance on the following scenarios failing :\n\n"]
        scenario_rows = zip(
            filtered_scenarios_df['Scenario Name'],
            filtered_scenarios_df['Error Message'],
        )
        for position, (scenario_name, error_message) in enumerate(scenario_rows, 1):
            pieces.append(f"{position}. {scenario_name}\n")
            pieces.append(f" Error: {error_message}\n\n")

        return summary, "".join(pieces)
    except Exception as e:
        st.error(f"Error generating task content: {str(e)}")
        return None, None
def get_regression_board(project_key):
    """Locate the project's "Regression Sprints" scrum board.

    Returns:
        The first board dict from ``get_boards`` whose name equals
        ``'regression sprints'`` and whose type equals ``'scrum'`` (both
        compared case-insensitively). ``None`` when ``get_boards`` returns
        nothing or no board matches; in the latter case the available boards
        are listed through ``st.error``.
    """
    boards = get_boards(project_key)
    if not boards:
        return None

    for board in boards:
        if board['name'].lower() == 'regression sprints' and board['type'].lower() == 'scrum':
            return board

    st.error("Could not find the 'Regression Sprints' board. Available boards: " +
             ", ".join([f"{b['name']} ({b['type']})" for b in boards]))
    return None
def get_field_dependencies():
    """Return allowed values and dependency info for three custom fields.

    The result is computed once from the ``RS`` project's metadata and kept in
    ``st.session_state.field_dependencies``.

    Returns:
        A dict keyed by ``'Customer'``, ``'Environment'`` and
        ``'Functional Areas'``; each entry holds ``field_id``, ``values`` (the
        ``value`` or, failing that, ``name`` of every dict in the field's
        allowed values) and ``dependencies`` (filled only when the field's
        metadata has a ``dependency`` entry). ``None`` when metadata is
        unavailable or an error occurs.
    """
    if 'field_dependencies' in st.session_state:
        return st.session_state.field_dependencies

    try:
        metadata = get_project_metadata("RS")
        if not metadata:
            return None

        tracked_fields = (
            ('Customer', 'customfield_10427'),
            ('Environment', 'customfield_14157'),
            ('Functional Areas', 'customfield_15303'),
        )
        dependencies = {
            label: {'field_id': custom_id, 'values': [], 'dependencies': {}}
            for label, custom_id in tracked_fields
        }

        known_fields = metadata['all_fields']
        for field_info in dependencies.values():
            field_id = field_info['field_id']
            if field_id not in known_fields:
                continue
            field_data = known_fields[field_id]

            if 'allowedValues' in field_data:
                field_info['values'] = [
                    value.get('value', value.get('name', ''))
                    for value in field_data['allowedValues']
                    if isinstance(value, dict)
                ]

            if 'dependency' in field_data:
                dep_field = field_data['dependency']
                field_info['dependencies'] = {
                    'field': dep_field['field']['name'],
                    'field_id': dep_field['field']['id'],
                    'values': dep_field.get('values', []),
                }

        # Remember the result for the rest of the session.
        st.session_state.field_dependencies = dependencies
        return dependencies
    except Exception as e:
        st.error(f"Error fetching field dependencies: {str(e)}")
        return None
def get_dependent_field_value(field_name, parent_value=None):
    """Choose a value for a field, honouring a parent-field dependency.

    Args:
        field_name: Key into the dict returned by ``get_field_dependencies``.
        parent_value: Value of the field this one depends on, if any.

    Returns:
        The mapped value whose ``parent`` equals ``parent_value`` when such a
        mapping exists; otherwise the field's first allowed value. ``None``
        when dependency data is unavailable, the field is unknown, or the
        field has no allowed values.
    """
    dependencies = get_field_dependencies()
    if not dependencies or field_name not in dependencies:
        return None

    field_info = dependencies[field_name]

    if parent_value and field_info['dependencies']:
        for mapping in field_info['dependencies'].get('values', []):
            if mapping.get('parent') == parent_value:
                return mapping.get('value')

    # No dependency, or nothing matched: fall back to the first allowed value.
    candidates = field_info['values']
    if candidates:
        return candidates[0]
    return None
def _write_field_with_values(field_id, field):
    """Write one field's name/id line followed by its allowed values, if any."""
    st.write(f"- {field['name']} ({field_id})")
    if not field.get('allowedValues'):
        return

    st.write(" Allowed values:")
    for value in field['allowedValues']:
        if not isinstance(value, dict):
            st.write(f" - {value}")
        elif 'cascadingOptions' in value:
            # Cascading select: parent line, then its child options.
            parent_value = value.get('value', 'Unknown')
            st.write(f" - {parent_value}")
            st.write(" Child options:")
            for child in value['cascadingOptions']:
                st.write(f" - {child.get('value', 'Unknown')}")
        else:
            st.write(f" - {value.get('value', value.get('name', 'Unknown'))}")


def _write_cascading_options(field):
    """Write the parent values of a cascading field with their child options."""
    if not field.get('allowedValues'):
        return

    st.write("Cascading options:")
    for value in field['allowedValues']:
        if not isinstance(value, dict):
            continue
        parent_value = value.get('value', 'Unknown')
        st.write(f"- {parent_value}")
        if 'cascadingOptions' in value:
            st.write(" Child options:")
            for child in value['cascadingOptions']:
                st.write(f" - {child.get('value', 'Unknown')}")


def display_project_fields():
    """Render the ``RS`` project's issue-creation fields in the Streamlit UI.

    Sections written, in order: required fields, the Customer
    (customfield_10427), Functional Areas (customfield_13100) and Environment
    (customfield_14157) custom fields, any dependency mappings present on
    those three fields, and finally all remaining ``customfield_*`` entries.
    Nothing is rendered when no metadata is available.
    """
    project_key = "RS"  # fixed project key
    metadata = get_project_metadata(project_key)
    if not metadata:
        return

    st.subheader("Project Fields")

    st.write("### Required Fields")
    for field_id, field in metadata['required_fields'].items():
        _write_field_with_values(field_id, field)

    st.write("### Custom Fields and Dependencies")
    all_fields = metadata['all_fields']

    # Customer - cascading select
    st.write("\n#### Customer (customfield_10427)")
    cust_field = all_fields.get('customfield_10427', {})
    _write_cascading_options(cust_field)

    # Functional Areas - cascading select
    # NOTE(review): get_field_dependencies uses customfield_15303 for this
    # field; confirm which id is current.
    st.write("\n#### Functional Areas (customfield_13100)")
    func_field = all_fields.get('customfield_13100', {})
    _write_cascading_options(func_field)

    # Environment - flat list of values
    st.write("\n#### Environment (customfield_14157)")
    env_field = all_fields.get('customfield_14157', {})
    if env_field.get('allowedValues'):
        st.write("Allowed values:")
        for value in env_field['allowedValues']:
            if isinstance(value, dict):
                st.write(f" - {value.get('value', value.get('name', 'Unknown'))}")

    # Dependency mappings, shown only when at least one field declares one.
    labelled_fields = [
        ('Environment', env_field),
        ('Functional Areas', func_field),
        ('Customer', cust_field),
    ]
    if any(field.get('dependency') for _, field in labelled_fields):
        st.write("\n### Field Dependencies")
        for field_name, field in labelled_fields:
            if not field.get('dependency'):
                continue
            st.write(f"\n{field_name} depends on:")
            dep = field['dependency']
            st.write(f" Field: {dep['field']['name']} ({dep['field']['id']})")
            if dep.get('values'):
                st.write(" Value mappings:")
                for mapping in dep['values']:
                    parent_value = mapping.get('parent', 'Unknown')
                    child_value = mapping.get('value', 'Unknown')
                    # The arrow was previously stored as mis-decoded bytes.
                    st.write(f" - When parent is '{parent_value}' → '{child_value}'")

    st.write("\n### Other Custom Fields")
    excluded_fields = ['customfield_14157', 'customfield_13100', 'customfield_10427']
    for field_id, field in all_fields.items():
        if field_id.startswith('customfield_') and field_id not in excluded_fields:
            _write_field_with_values(field_id, field)
def get_closest_match(target, choices, threshold=60):
    """Return the choice most similar to ``target`` using fuzzy matching.

    Both strings are normalised before comparison: lower-cased, hyphens turned
    into spaces, and every run of whitespace collapsed to a single space (so
    ``"a - b"`` and ``"a b"`` compare equal). Similarity is
    ``difflib.SequenceMatcher(...).ratio() * 100``.

    Args:
        target: String to look up.
        choices: Candidate strings.
        threshold: Minimum similarity (0-100) required for a match.

    Returns:
        The best-scoring element of ``choices`` (the earliest one on ties)
        when its score reaches ``threshold``; otherwise ``None``. ``None`` is
        also returned for empty ``choices`` or when comparison fails (the
        failure is reported through ``st.error``).
    """
    if not choices:
        return None

    try:
        def normalize(text):
            # split()/join collapses any whitespace run, including the ones
            # produced by replacing '-' inside " - ".
            return ' '.join(text.lower().replace('-', ' ').split())

        wanted = normalize(target)
        scored = [
            (choice, SequenceMatcher(None, wanted, normalize(choice)).ratio() * 100)
            for choice in choices
        ]

        best_choice, best_score = max(scored, key=lambda pair: pair[1])
        return best_choice if best_score >= threshold else None
    except Exception as e:
        st.error(f"Error in fuzzy matching: {str(e)}")
        return None
def get_functional_area_values(metadata):
    """Collect the selectable functional-area values from project metadata.

    The functional-area field is the first entry of ``metadata['all_fields']``
    whose id is one of the known ids (``customfield_15303``,
    ``customfield_13100``, ``customfield_13101``) or whose name contains
    ``'functional area'``. For parents with ``cascadingOptions`` the child
    values are collected; for other entries the parent ``value`` itself.

    Returns:
        A list of value strings; empty when metadata is missing, has no
        ``all_fields`` entry, or the field cannot be found.
    """
    logger.info("=== Starting get_functional_area_values ===")

    if not metadata:
        logger.error("No metadata provided")
        return []

    if 'all_fields' not in metadata:
        logger.error("No 'all_fields' in metadata")
        logger.debug(f"Available metadata keys: {list(metadata.keys())}")
        return []

    all_fields = metadata['all_fields']

    # Dump every field to the log to help diagnose id changes.
    logger.info("Available fields:")
    for field_id, field in all_fields.items():
        field_name = field.get('name', 'Unknown')
        field_type = field.get('schema', {}).get('type', 'Unknown')
        logger.info(f" {field_name} (ID: {field_id}, Type: {field_type})")

    known_ids = (
        'customfield_15303',  # new field id
        'customfield_13100',  # old field id
        'customfield_13101',  # another possible variation
    )

    func_field = None
    for field_id, field in all_fields.items():
        lowered_name = field.get('name', '').lower()
        if field_id in known_ids or 'functional area' in lowered_name:
            func_field = field
            logger.info(f"Found functional area field: {field.get('name')} (ID: {field_id})")
            break

    if not func_field:
        logger.error("Could not find functional area field in metadata")
        logger.info("Available field names:")
        for field_id, field in all_fields.items():
            logger.info(f" {field.get('name', 'Unknown')} ({field_id})")
        return []

    field_type = func_field.get('schema', {}).get('type')
    logger.info(f"Functional area field type: {field_type}")

    allowed_values = []
    if 'allowedValues' in func_field:
        logger.info("Processing allowed values...")
        for parent in func_field['allowedValues']:
            if not isinstance(parent, dict):
                continue
            parent_value = parent.get('value', 'Unknown')
            logger.info(f"Processing parent value: {parent_value}")

            if 'cascadingOptions' in parent:
                for child in parent['cascadingOptions']:
                    if isinstance(child, dict) and 'value' in child:
                        allowed_values.append(child['value'])
                        logger.debug(f"Added child value: {child['value']}")
            elif 'value' in parent:
                allowed_values.append(parent['value'])
                logger.debug(f"Added value: {parent['value']}")

    logger.info(f"Found {len(allowed_values)} allowed values")
    if allowed_values:
        logger.info(f"Sample of allowed values: {allowed_values[:5]}")
    else:
        logger.warning("No allowed values found in the field")

    return allowed_values
def calculate_story_points(scenario_count):
    """Translate a scenario count into story points.

    Bands: up to 3 -> 1, up to 5 -> 2, up to 9 -> 3, up to 15 -> 5,
    anything larger -> 8.
    """
    bands = ((3, 1), (5, 2), (9, 3), (15, 5))
    for upper_bound, story_points in bands:
        if scenario_count <= upper_bound:
            return story_points
    return 8
def map_functional_area(functional_area, metadata):
    """Map a functional-area label to a Jira (parent, child) value pair.

    The functional-area field is looked up under ``customfield_13100`` first
    and then under the alternative ids ``customfield_15303`` and
    ``customfield_13101``, preferring the first candidate that actually lists
    allowed values. Child options are read from each parent's ``children``
    entry, falling back to ``cascadingOptions``.

    ``functional_area`` is split on ``' - '``; every contiguous run of parts,
    joined with ``'-'``, is tried as-is, with spaces removed, and with each of
    the prefixes ``Services-``, ``FIN-`` and ``WARPSPEED-``. When nothing
    matches, a default pair is chosen from the first part, and finally
    ``("R&I", "Data Exchange")``.

    Args:
        functional_area: Label such as ``"FIN - Ledger"``.
        metadata: Project metadata containing an ``all_fields`` mapping.

    Returns:
        ``(parent_value, child_value)``.

    Raises:
        ValueError: When either argument is empty, or no functional-area field
            with an ``allowedValues`` entry exists in the metadata.
    """
    if not metadata or not functional_area:
        logger.error("No metadata or functional area provided")
        raise ValueError("Metadata and functional area are required")

    all_fields = metadata['all_fields']
    candidate_ids = ('customfield_13100', 'customfield_15303', 'customfield_13101')
    usable_fields = [
        field
        for field in (all_fields.get(field_id, {}) for field_id in candidate_ids)
        if field and 'allowedValues' in field
    ]
    if not usable_fields:
        logger.error("Could not find functional area field in metadata")
        raise ValueError("Functional area field not found in metadata")
    # Prefer a candidate that really has values; otherwise keep the first one.
    func_field = next(
        (field for field in usable_fields if field['allowedValues']),
        usable_fields[0],
    )

    # child value -> parent value, for constant-time lookups below
    allowed_values = {}
    for parent in func_field['allowedValues'] or []:
        if not isinstance(parent, dict):
            continue
        parent_value = parent.get('value')
        if not parent_value:
            continue
        children = parent.get('children') or parent.get('cascadingOptions') or []
        for child in children:
            if isinstance(child, dict) and 'value' in child:
                allowed_values[child['value']] = parent_value

    logger.info(f"Input functional area: {functional_area}")

    parts = [p.strip() for p in functional_area.split(' - ')]
    logger.info(f"Split into parts: {parts}")

    categories = ('Services', 'FIN', 'WARPSPEED')

    # Try every contiguous run of parts, joined with '-'.
    for i in range(len(parts)):
        for j in range(i + 1, len(parts) + 1):
            test_value = '-'.join(parts[i:j])
            test_value_no_spaces = test_value.replace(' ', '')

            logger.info(f"Trying combination: {test_value}")

            if test_value in allowed_values:
                logger.info(f"Found exact match: {test_value}")
                return allowed_values[test_value], test_value
            if test_value_no_spaces in allowed_values:
                logger.info(f"Found match without spaces: {test_value_no_spaces}")
                return allowed_values[test_value_no_spaces], test_value_no_spaces

            # Same candidate again, prefixed with each known category.
            for category in categories:
                category_value = f"{category}-{test_value}"
                category_value_no_spaces = category_value.replace(' ', '')

                if category_value in allowed_values:
                    logger.info(f"Found category match: {category_value}")
                    return allowed_values[category_value], category_value
                if category_value_no_spaces in allowed_values:
                    logger.info(f"Found category match without spaces: {category_value_no_spaces}")
                    return allowed_values[category_value_no_spaces], category_value_no_spaces

    # No match: choose a default from the first part of the label.
    first_part = parts[0].upper()
    if 'SERVICE' in first_part:  # also covers 'SERVICES'
        logger.info("No exact match found, defaulting to Services-Platform")
        return "R&I", "Services-Platform"
    if 'FIN' in first_part:
        logger.info("No exact match found, defaulting to FIN-Parameters")
        return "R&I", "FIN-Parameters"
    if 'WARPSPEED' in first_part:
        logger.info("No exact match found, defaulting to WARPSPEED-Parameters")
        return "R&I", "WARPSPEED-Parameters"

    logger.warning(f"No suitable match found for '{functional_area}', defaulting to Data Exchange")
    return "R&I", "Data Exchange"
def get_customer_field_values(metadata):
    """Map each Customer parent value to its list of child values.

    Reads ``customfield_10427`` from ``metadata['all_fields']``. Parents that
    are not dicts or have no truthy ``value`` are skipped; children without a
    truthy ``value`` are dropped; parents without ``cascadingOptions`` map to
    an empty list.

    Returns:
        ``{parent_value: [child_value, ...]}``; empty when metadata is
        missing, has no ``all_fields`` entry, or the field has no
        ``allowedValues``.
    """
    if not metadata or 'all_fields' not in metadata:
        return {}

    customer_field = metadata['all_fields'].get('customfield_10427', {})

    children_by_parent = {}
    for parent in customer_field.get('allowedValues', []):
        if not isinstance(parent, dict):
            continue
        parent_value = parent.get('value')
        if not parent_value:
            continue
        children_by_parent[parent_value] = [
            child.get('value')
            for child in parent.get('cascadingOptions', [])
            if child.get('value')
        ]
    return children_by_parent
def map_customer_value(environment_value, customer_values):
    """Derive the Customer (parent, child) pair for an environment name.

    Rules, applied in order to the stripped environment name:

    1. Contains ``legalwise``, ``scorpion``, ``lifewise`` or ``talksure``
       (case-insensitive): parent ``"ILR"``, child is the environment name.
    2. Starts with ``"RI"``: parent ``"MIP Research and Innovation"``, child
       ``"R&I <rest>"`` or ``"R&I General"`` when nothing follows the prefix.
    3. Otherwise the children listed under ``"MIP Research and Innovation"``
       in ``customer_values`` are searched for an exact match, then for the
       first child that contains, or is contained in, the environment name.

    Returns:
        ``(parent_value, child_value)``; the default pair
        ``("MIP Research and Innovation", "R&I General")`` when either
        argument is empty or no rule matches.
    """
    default_parent = "MIP Research and Innovation"
    default_child = "R&I General"

    if not (environment_value and customer_values):
        return default_parent, default_child

    env_value = environment_value.strip()
    lowered = env_value.lower()

    # Rule 1: environments that belong to the ILR customer.
    ilr_markers = ('legalwise', 'scorpion', 'lifewise', 'talksure')
    if any(marker in lowered for marker in ilr_markers):
        parent_value = "ILR"
        child_value = env_value
        logger.info(f"Mapped {env_value} to ILR parent with child {child_value}")
        return parent_value, child_value

    # Rule 2: RI-prefixed environments.
    if env_value.startswith('RI'):
        parent_value = default_parent
        remainder = env_value[2:].strip()
        child_value = f"R&I {remainder}" if remainder else default_child
        logger.info(f"Mapped RI environment {env_value} to {parent_value} parent with child {child_value}")
        return parent_value, child_value

    # Rule 3: look among the default parent's children.
    if default_parent in customer_values:
        children = customer_values[default_parent]
        if env_value in children:
            return default_parent, env_value
        for child in children:
            if env_value in child or child in env_value:
                return default_parent, child

    logger.warning(f"No specific mapping found for {env_value}, using defaults")
    return default_parent, default_child
def create_regression_task(project_key, summary, description, environment, filtered_scenarios_df):
    """Create a regression Story in Jira and add it to the board's active sprint.

    The customer and functional-area cascading fields are derived from the
    project metadata, story points from the number of scenarios, and the
    assignee from ``st.session_state.jira_username``.

    Args:
        project_key: Jira project key used for metadata, board lookup and
            the new issue.
        summary: Issue summary text.
        description: Issue description text.
        environment: Environment name handed to map_customer_value.
        filtered_scenarios_df: DataFrame of failed scenarios. Its row count
            drives the story points and the scenario-count field; its
            "Functional area" column (when present) selects the functional
            area.

    Returns:
        The issue object returned by the Jira client, or None when any step
        fails. Failures are logged and reported with st.error.
    """
    # Compute the shape defensively: the frame may be None and must not
    # crash the logging calls before the guarded section starts.
    df_shape = filtered_scenarios_df.shape if filtered_scenarios_df is not None else 'None'
    logger.debug(f"Entering create_regression_task with project_key={project_key}, summary={summary}, environment={environment}, DF_shape={df_shape}")
    logger.info("=== Starting create_regression_task function ===")
    logger.info(f"Project: {project_key}, Summary: {summary}, Environment: {environment}")
    logger.info(f"Filtered DF shape: {df_shape}")

    try:
        if filtered_scenarios_df is None:
            error_msg = "No filtered scenarios provided"
            logger.error(error_msg)
            st.error(error_msg)
            return None

        # Get metadata first to access field values
        metadata = get_project_metadata(project_key)
        if not metadata:
            error_msg = "Could not get project metadata"
            logger.error(error_msg)
            st.error(error_msg)
            return None

        # Get customer field values and map environment
        customer_values = get_customer_field_values(metadata)
        parent_value, child_value = map_customer_value(environment, customer_values)
        logger.info(f"Mapped customer values - Parent: {parent_value}, Child: {child_value}")

        # Get Jira client. The session key can exist with a None value
        # before login, so test the value rather than key presence.
        jira_client = st.session_state.get("jira_client")
        if not jira_client:
            error_msg = "No Jira client available. Please connect to Jira first."
            logger.error(error_msg)
            st.error(error_msg)
            return None
        logger.info("Got Jira client from session state")

        # Get the regression board, then its active sprint
        board = get_regression_board(project_key)
        if not board:
            error_msg = "No regression board found"
            logger.error(error_msg)
            st.error(error_msg)
            return None

        active_sprint = get_current_sprint(board['id'])
        if not active_sprint:
            error_msg = "No active sprint found"
            logger.error(error_msg)
            st.error(error_msg)
            return None
        logger.info(f"Found active sprint: {active_sprint.name} (ID: {active_sprint.id})")

        # Extract functional area from filtered scenarios
        functional_areas = []
        try:
            if "Functional area" in filtered_scenarios_df.columns:
                functional_areas = filtered_scenarios_df["Functional area"].unique().tolist()
                logger.info(f"Extracted functional areas: {functional_areas}")
        except Exception as e:
            logger.exception(f"Error extracting functional areas: {str(e)}")
            st.error(f"Error extracting functional areas: {str(e)}")
            return None

        # Calculate story points based on number of scenarios
        scenario_count = len(filtered_scenarios_df)
        story_points = calculate_story_points(scenario_count)
        logger.info(f"Calculated story points: {story_points}")

        # Map functional area using metadata (first area wins; fixed fallback)
        functional_area_parent, functional_area_child = map_functional_area(
            functional_areas[0] if functional_areas else "Data Exchange",
            metadata
        )
        logger.info(f"Mapped functional area to parent: {functional_area_parent}, child: {functional_area_child}")

        # Prepare issue dictionary with all required fields
        issue_dict = {
            "project": {"key": project_key},
            "summary": summary,
            "description": description,
            "issuetype": {"name": "Story"},
            "components": [{"name": "Maintenance (Regression)"}],
            "customfield_10427": {
                "value": parent_value,
                "child": {
                    "value": child_value
                }
            },
            "customfield_12730": {"value": "Non-Business Critical"},  # Regression Type field
            # NOTE(review): sent uncapped, while process_failures_button
            # displays this count capped at 50 - confirm which the field expects.
            "customfield_13430": {"value": str(scenario_count)},  # Number of Scenarios
            "customfield_13100": {
                "value": functional_area_parent,
                "child": {
                    "value": functional_area_child
                }
            },
            "assignee": {"name": st.session_state.jira_username},
            "customfield_10002": story_points  # Story Points field
        }

        # Log the complete issue dictionary
        logger.info(f"Issue dictionary prepared: {issue_dict}")

        # Create the issue
        logger.info("Attempting to create issue in Jira...")
        try:
            # Create the issue with all fields
            new_issue = jira_client.create_issue(fields=issue_dict)
            logger.info(f"Issue created successfully: {new_issue.key}")

            # Add issue to sprint; a failure here is reported but does not
            # undo the already-created issue.
            try:
                logger.info(f"Attempting to add issue {new_issue.key} to sprint {active_sprint.id}...")
                jira_client.add_issues_to_sprint(active_sprint.id, [new_issue.key])
                logger.info(f"Added issue {new_issue.key} to sprint {active_sprint.name}")
            except Exception as sprint_error:
                logger.exception(f"Failed to add issue to sprint: {str(sprint_error)}")
                st.warning(f"⚠️ Could not add task to sprint. Error: {str(sprint_error)}")

            # Display success message
            st.success(f"✅ Task created successfully: {new_issue.key}")
            return new_issue

        except Exception as create_error:
            error_message = str(create_error)
            logger.exception(f"Failed to create issue: {error_message}")

            # Try to extract the response content if the error carries one
            shown_to_user = False
            try:
                response = getattr(create_error, 'response', None)
                if response is not None:
                    status_code = getattr(response, 'status_code', 'N/A')
                    logger.error(f"Response status code: {status_code}")
                    if hasattr(response, 'text'):
                        response_text = response.text
                        logger.error(f"Response text: {response_text}")
                        # Display the error to the user
                        st.error(f"❌ Error creating task in Jira (Status: {status_code}):")
                        st.error(response_text)
                        shown_to_user = True
            except Exception as extract_error:
                logger.exception(f"Error extracting response details: {str(extract_error)}")

            if not shown_to_user:
                # No response body was available; still tell the user why.
                st.error(f"❌ Error creating task in Jira: {error_message}")
            return None

    except Exception as e:
        error_message = f"❌ Unexpected error in create_regression_task: {str(e)}"
        logger.exception(error_message)
        st.error(error_message)
        logger.error(f"Traceback: {''.join(traceback.format_exception(type(e), e, e.__traceback__))}")
        return None
def create_test_data():
    """Build a sample DataFrame of failed scenarios for development/testing.

    Per the original author's note, the rows are meant to mirror the
    filtered scenarios produced by multiple.py. Every row shares the same
    environment, functional area, status and duration; only the scenario
    name and error message vary.

    Returns:
        A 7-row pandas DataFrame whose ``attrs['metadata']`` holds the
        example field values used for Jira task creation.
    """
    scenario_names = [
        'Add Missions Error Handling - Existing Code',
        'Add Missions Error Handling - Incorrect Max Iterations',
        'Add Missions Error Handling - No Badges',
        'Add Missions Success - Individual',
        'Add Missions Success - Team',
        'Add Missions Success - Individual Iteration',
        'Add Missions Success - Team Max Iterations'
    ]
    row_count = len(scenario_names)

    # Three distinct error-handling failures followed by a repeated one
    link_failure = 'AssertionError [ERR_ASSERTION]: Link validation failed'
    error_messages = [
        'AssertionError [ERR_ASSERTION]: Error handling for existing code failed',
        'AssertionError [ERR_ASSERTION]: Error handling for max iterations failed',
        'AssertionError [ERR_ASSERTION]: Error handling for missing badges failed',
    ]
    error_messages += [link_failure] * (row_count - len(error_messages))

    # One timestamp shared by every row (current time as example)
    started_at = datetime.now()

    frame = pd.DataFrame({
        'Environment': ['RI2008'] * row_count,
        'Functional area': ['Data Exchange - Enquiries - Reports'] * row_count,
        'Scenario Name': scenario_names,
        'Error Message': error_messages,
        'Status': ['FAILED'] * row_count,
        'Time spent(m:s)': ['02:30'] * row_count,
        'Start datetime': [started_at] * row_count,
    })

    # Metadata that will be used for Jira task creation
    frame.attrs['metadata'] = {
        'Customer': 'MIP Research and Innovation - R&I 2008',
        'Sprint': 'RS Sprint 195',
        'Story Points': 5,
        'Regression Type': 'Non-Business Critical',
        'Component': 'Maintenance (Regression)',
        'Priority': 'Lowest',
        'Type': 'Story',
        'Labels': 'None',
        'Assignee': 'Daniel Akinsola',
        'Reporter': 'Daniel Akinsola'
    }
    return frame
def _derive_service_category(functional_area):
    """Collapse 'A - B - C' to 'A-B' with spaces removed; one segment is kept as-is."""
    segments = str(functional_area).split(' - ')
    return '-'.join(segments[:2]).replace(' ', '')


def _format_environment_label(environment):
    """Turn an environment name such as 'RI2008' into the 'R&I 2008' display label."""
    text = '' if environment is None else str(environment).strip()
    if text.startswith('RI'):
        text = text[2:].strip()
    if not text:
        # Same fallback child that map_customer_value uses for a blank name
        return 'R&I General'
    return text if text.startswith('R&I') else f"R&I {text}"


def process_failures_button(filtered_scenarios_df, environment=None):
    """Process failures and create Jira task.

    Renders the Streamlit workflow for one regression task: a button that
    generates the summary/description, a preview of the generated content
    and of the fields to be set, and buttons to regenerate the content or
    create the Jira task. After a successful creation the next run shows
    only the success panel with a link to the task.

    Args:
        filtered_scenarios_df: DataFrame of failed scenarios. The
            'Environment' and 'Functional area' columns are used when present.
        environment: Optional environment name; when None it is taken from
            the first row of the 'Environment' column if available.

    Returns:
        None. All results are communicated through st.session_state and
        the rendered widgets.
    """
    import html  # function-scope import: only needed for escaping below

    # Use RS project key since we can see it's a Regression Sprint board
    project_key = "RS"
    project_name = "RS - Regression"

    # Get environment from DataFrame if not provided (guard empty frames,
    # where .iloc[0] would raise IndexError)
    if (environment is None
            and 'Environment' in filtered_scenarios_df.columns
            and not filtered_scenarios_df.empty):
        environment = filtered_scenarios_df['Environment'].iloc[0]

    # Get unique functional areas from the DataFrame; tolerate a missing
    # column and missing values
    if 'Functional area' in filtered_scenarios_df.columns:
        functional_areas = filtered_scenarios_df['Functional area'].dropna().unique()
    else:
        functional_areas = []
    functional_area = functional_areas[0] if len(functional_areas) > 0 else 'R&I'

    # Extract the main service category. Areas without a ' - ' separator
    # (including the 'R&I' fallback) are used as a single segment.
    service_category = _derive_service_category(functional_area)

    # Format environment value
    env_value = _format_environment_label(environment)

    # Get the current sprint from the regression board
    board = get_regression_board(project_key)
    sprint = None
    sprint_name = "No Active Sprint"
    if board:
        sprint = get_current_sprint(board['id'])
        if sprint:
            sprint_name = sprint.name
            st.write(f"Found active sprint: {sprint_name}")
        else:
            st.warning("No active sprint found")

    # Create metadata dictionary with all required fields
    scenario_count = len(filtered_scenarios_df)
    metadata = {
        'Project Key': project_key,
        'Project': project_name,
        'Issue Type': 'Story',
        'Customer': 'MIP Research and Innovation',
        'Environment': env_value,
        'Functional Areas': service_category,
        'Sprint': sprint_name,
        'Story Points': calculate_story_points(scenario_count),
        'Regression Type': 'Non-Business Critical',
        'Number of Scenarios': min(scenario_count, 50)  # displayed value is capped at 50
    }

    # Initialize session states if not exists; the same defaults are
    # re-applied when the user starts another task.
    task_state_defaults = {
        'task_content': None,
        'task_created': False,
        'created_task': None,
        'show_success': False,
        'last_task_key': None,
        'last_task_url': None,
    }
    for state_key, default in task_state_defaults.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = default

    # Store sprint information in session state for task creation
    if sprint:
        st.session_state.current_sprint = sprint

    # If we have a recently created task, show the success message first
    if st.session_state.show_success and st.session_state.last_task_key:
        st.success("✅ Task created successfully!")
        # Display task link in a more prominent way
        st.markdown(
            f"""
<div style='padding: 10px; border-radius: 5px; border: 1px solid #90EE90; margin: 10px 0;'>
<h3 style='margin: 0; color: #90EE90;'>Task Details</h3>
<p style='margin: 10px 0;'>Task Key: {st.session_state.last_task_key}</p>
<a href='{st.session_state.last_task_url}' target='_blank'
style='background-color: #90EE90; color: black; padding: 5px 10px;
border-radius: 3px; text-decoration: none; display: inline-block;'>
View Task in Jira
</a>
</div>
""",
            unsafe_allow_html=True
        )

        # Add a button to create another task
        if st.button("Create Another Task", key="create_another"):
            # Clear all task-related state
            for state_key, default in task_state_defaults.items():
                st.session_state[state_key] = default
            st.rerun()
        # Only the success panel is shown while a created task is pending
        return

    # Button to generate content
    if st.button("Generate Task Content"):
        with st.spinner("Generating task content..."):
            summary, description = generate_task_content(filtered_scenarios_df)
            if summary and description:
                st.session_state.task_content = {
                    'summary': summary,
                    'description': description,
                    'environment': environment,
                    'metadata': metadata
                }
            else:
                st.error("Failed to generate task content. Please try again.")
                return

    # Display content and create task button if content exists
    task_content = st.session_state.task_content
    if not task_content:
        return

    stored_metadata = task_content['metadata']
    with st.expander("Generated Task Content", expanded=True):
        # Generated text and data-derived values are escaped because they
        # are embedded in markup rendered with unsafe_allow_html.
        # Summary section with styling
        st.markdown("### Summary")
        st.markdown(f"""
<div style='background-color: #f0f2f6; padding: 10px; border-radius: 5px; border: 1px solid #e0e0e0; color: #0f1629;'>
{html.escape(str(task_content['summary']))}
</div>
""", unsafe_allow_html=True)

        # Description section with styling
        st.markdown("### Description")
        st.markdown(f"""
<div style='background-color: #f0f2f6; padding: 10px; border-radius: 5px; border: 1px solid #e0e0e0; color: #0f1629; white-space: pre-wrap;'>
{html.escape(str(task_content['description']))}
</div>
""", unsafe_allow_html=True)

        # Get and display available functional area values
        display_functional_areas(stored_metadata)

        # Display metadata with actual field values (label == metadata key)
        st.markdown("### Fields to be Set")
        field_labels = (
            'Project', 'Issue Type', 'Customer', 'Environment',
            'Functional Areas', 'Sprint', 'Story Points',
            'Regression Type', 'Number of Scenarios',
        )
        field_rows = "\n".join(
            f"<p><strong>{label}:</strong> {html.escape(str(stored_metadata[label]))}</p>"
            for label in field_labels
        )
        metadata_html = f"""
<div style='background-color: #f0f2f6; padding: 10px; border-radius: 5px; border: 1px solid #e0e0e0; color: #0f1629;'>
{field_rows}
</div>
"""
        st.markdown(metadata_html, unsafe_allow_html=True)

    # Add buttons in columns for better layout
    col1, col2 = st.columns(2)
    with col1:
        if st.button("🔄 Regenerate Content", key="regenerate"):
            st.session_state.task_content = None
            st.rerun()
    with col2:
        if st.button("📝 Create Jira Task", key="create"):
            task = create_regression_task(
                stored_metadata['Project Key'],
                task_content['summary'],
                task_content['description'],
                task_content['environment'],
                filtered_scenarios_df
            )
            if task:
                # Store task information in session state
                st.session_state.last_task_key = task.key
                st.session_state.last_task_url = f"{JIRA_SERVER}/browse/{task.key}"
                st.session_state.show_success = True
                # Clear the content
                st.session_state.task_content = None
                # Force refresh of sprint stats on next load
                st.session_state.force_sprint_refresh = True
                st.rerun()
            else:
                st.error("Failed to create task. Please try again.")
def _collect_cascading_options(allowed_values, noun):
    """Return {parent value: sorted child values} for a cascading field's allowedValues."""
    option_map = {}
    for option in allowed_values:
        if not isinstance(option, dict):
            continue
        parent_value = option.get('value', 'Unknown')
        if not parent_value:
            continue
        logger.info(f"\nProcessing {noun}parent: {parent_value}")
        child_values = []
        if 'cascadingOptions' in option:
            logger.info(f"Found {noun}cascading options for {parent_value}:")
            for child in option['cascadingOptions']:
                logger.info(f"Raw child value: {child}")
                if isinstance(child, dict) and child.get('value'):
                    child_values.append(child['value'])
                    logger.info(f" - Added child: {child['value']}")
        option_map[parent_value] = sorted(child_values)
        logger.info(f"Final {noun}children for {parent_value}: {option_map[parent_value]}")
    return option_map


def _render_cascading_field(field, heading, raw_log_title, noun, parent_label, missing_message, field_name):
    """Render one cascading field as a styled box per parent, or warn when it has no values."""
    if not (field and 'allowedValues' in field):
        st.warning(missing_message)
        logger.warning(missing_message)
        if field:
            logger.info(f"Available {field_name} keys: " + str(list(field.keys())))
        return

    st.markdown(heading)

    # Log the raw allowedValues for debugging
    logger.info(raw_log_title)
    for option in field['allowedValues']:
        logger.info(f"Raw value: {option}")

    option_map = _collect_cascading_options(field['allowedValues'], noun)

    for parent_value in sorted(option_map):
        child_values = option_map[parent_value]
        child_list = ""
        if child_values:
            items = "".join(f"<li>{child}</li>" for child in child_values)
            child_list = f"<ul style='margin-bottom: 0; margin-top: 5px;'>{items}</ul>"

        # The whole box is emitted in ONE st.markdown call: each call is
        # rendered as its own element, so tags opened in one call cannot be
        # closed by a later call and the children would fall outside the box.
        st.markdown(
            "<div style='background-color: #f0f2f6; padding: 10px; border-radius: 5px; "
            "border: 1px solid #e0e0e0; color: #0f1629; margin-bottom: 10px;'>"
            f"<strong>{parent_value}</strong>{child_list}</div>",
            unsafe_allow_html=True
        )

        # Log the parent-child relationship for debugging
        logger.info(f"Displaying {parent_label}: {parent_value}")
        if child_values:
            logger.info(f" With Children: {', '.join(child_values)}")
        else:
            logger.info(" No children found")


def display_functional_areas(metadata):
    """Display functional areas and customer fields in a tree-like structure with styling.

    Args:
        metadata: Either project metadata (a dict with an 'all_fields'
            entry) or task metadata. When 'all_fields' is absent, the
            project metadata for the fixed "RS" project is fetched instead.

    Returns:
        None. Shows the allowed parent/child values of customfield_13100
        (functional areas) and customfield_10427 (customer), or a warning
        for a field that has no allowed values.
    """
    if not metadata:
        st.error("No metadata available")
        return

    # If this is task metadata (not project metadata), get project metadata first
    if 'all_fields' not in metadata:
        project_metadata = get_project_metadata("RS")  # RS is the fixed project key
        if not project_metadata:
            st.error("Could not fetch project metadata")
            return
        metadata = project_metadata

    all_fields = metadata['all_fields']

    # Display Functional Areas
    _render_cascading_field(
        all_fields.get('customfield_13100', {}),
        heading="### Available Functional Areas",
        raw_log_title="=== Raw Functional Area Values ===",
        noun="",
        parent_label="Parent",
        missing_message="No functional area values found in metadata",
        field_name="func_field",
    )

    # Display Customer Field
    _render_cascading_field(
        all_fields.get('customfield_10427', {}),
        heading="### Available Customer Values",
        raw_log_title="=== Raw Customer Field Values ===",
        noun="customer ",
        parent_label="Customer Parent",
        missing_message="No customer field values found in metadata",
        field_name="cust_field",
    )
def display_story_points_stats(force_refresh=False):
    """Display story points statistics from current sprint.

    Sprint totals are cached in ``st.session_state.sprint_data`` and
    re-fetched when a refresh is forced, when no data is cached yet, or
    when the cached data is older than five minutes. A refresh can be
    forced through the ``force_refresh`` argument or by setting
    ``st.session_state.force_sprint_refresh`` before the next run.

    Args:
        force_refresh: When True, fetch fresh sprint data regardless of the
            cache age.

    Returns:
        None. Does nothing when no Jira client is connected.
    """
    if not st.session_state.jira_client:
        return

    # Initialize session state for sprint data if not exists
    if 'sprint_data' not in st.session_state:
        st.session_state.sprint_data = None
    # Initialize refresh timestamp if not exists
    if 'last_sprint_refresh' not in st.session_state:
        st.session_state.last_sprint_refresh = None

    # Consume a pending refresh request BEFORE rendering, so the panel (and
    # its button) is drawn exactly once per run with fresh data.
    if st.session_state.get('force_sprint_refresh'):
        st.session_state.force_sprint_refresh = False
        force_refresh = True

    try:
        # Only fetch data if forced refresh, no data exists, or refresh timestamp is old
        current_time = datetime.now()
        last_refresh = st.session_state.last_sprint_refresh
        cache_expired = bool(last_refresh) and (current_time - last_refresh).total_seconds() > 300  # 5 minutes cache
        refresh_needed = force_refresh or st.session_state.sprint_data is None or cache_expired

        if refresh_needed:
            with st.spinner("Fetching sprint data..."):
                # Get regression board
                board = get_regression_board("RS")
                if not board:
                    return
                # Get current sprint
                sprint = get_current_sprint(board['id'])
                if not sprint:
                    return
                # Get sprint issues
                issues = get_sprint_issues(board['id'], sprint.id, board['estimation_field'])
                if not issues:
                    return
                # Calculate points (the per-issue breakdown is not needed here)
                _, total_points, completed_points, in_progress_points = calculate_points(issues, board['estimation_field'])
                # Store in session state
                st.session_state.sprint_data = {
                    'sprint_name': sprint.name,
                    'total_points': total_points,
                    'completed_points': completed_points,
                    'in_progress_points': in_progress_points,
                    'timestamp': current_time
                }
                st.session_state.last_sprint_refresh = current_time

        # Display data from session state
        sprint_data = st.session_state.sprint_data
        if sprint_data:
            total = sprint_data['total_points']
            completed = sprint_data['completed_points']
            in_progress = sprint_data['in_progress_points']
            completion_ratio = completed / total if total > 0 else 0
            completion_pct = (completed / total * 100) if total > 0 else 0

            # Create compact metrics display using custom HTML/CSS
            st.markdown(f"""
<div style='background-color: #1E1E1E; padding: 10px; border-radius: 5px; margin-bottom: 10px;'>
<div style='font-size: 0.8em; color: #E0E0E0; margin-bottom: 8px;'>Current Sprint: {sprint_data['sprint_name']}</div>
<div style='display: grid; grid-template-columns: repeat(4, 1fr); gap: 5px; font-size: 0.9em;'>
<div style='text-align: center;'>
<div style='color: #E0E0E0;'>Total</div>
<div style='font-size: 1.2em; font-weight: bold;'>{total:.1f}</div>
</div>
<div style='text-align: center;'>
<div style='color: #E0E0E0;'>Done</div>
<div style='font-size: 1.2em; font-weight: bold;'>{completed:.1f}</div>
</div>
<div style='text-align: center;'>
<div style='color: #E0E0E0;'>In Progress</div>
<div style='font-size: 1.2em; font-weight: bold;'>{in_progress:.1f}</div>
</div>
<div style='text-align: center;'>
<div style='color: #E0E0E0;'>Complete</div>
<div style='font-size: 1.2em; font-weight: bold;'>{completion_pct:.1f}%</div>
</div>
</div>
</div>
""", unsafe_allow_html=True)

            # Show progress bar; clamp because st.progress only accepts 0.0-1.0
            st.progress(min(max(float(completion_ratio), 0.0), 1.0))

            # The key must be stable across reruns: a click is matched to
            # the widget by its key on the following run, so a key that
            # changes every second would discard the click.
            if st.button("🔄 Refresh", key="refresh_stats", use_container_width=True):
                # Use a session state flag to trigger refresh on next rerun
                st.session_state.force_sprint_refresh = True
                st.rerun()
    except Exception as e:
        st.error(f"Error updating story points: {str(e)}")
def main():
    """Standalone Streamlit page for exercising the Jira integration.

    Offers a button that loads sample scenarios, renders the Jira login,
    and - once authenticated with projects available - shows the fixed
    project/board selection, the sprint statistics, the failed-scenario
    workflow and a button that lists the project fields.
    """
    st.title("Jira Integration Test")

    # Add test data button
    if st.button("Load Test Data"):
        st.session_state.filtered_scenarios_df = create_test_data()
        st.success("Test data loaded!")

    is_authenticated = render_jira_login()
    if not (is_authenticated and st.session_state.projects):
        return

    # Display the fixed project and board selection in a compact way
    st.markdown("""
<div style='display: flex; gap: 10px; margin-bottom: 15px; font-size: 0.9em;'>
<div style='flex: 1;'>
<div style='color: #E0E0E0; margin-bottom: 4px;'>Project</div>
<div style='background-color: #262730; padding: 5px 8px; border-radius: 4px; font-size: 0.9em;'>RS - Regression</div>
</div>
<div style='flex: 1;'>
<div style='color: #E0E0E0; margin-bottom: 4px;'>Board</div>
<div style='background-color: #262730; padding: 5px 8px; border-radius: 4px; font-size: 0.9em;'>Regression Sprints (scrum)</div>
</div>
</div>
""", unsafe_allow_html=True)

    # Display sprint stats (only fetch if no data exists)
    display_story_points_stats(force_refresh=False)

    # Show test data if loaded
    if 'filtered_scenarios_df' in st.session_state:
        scenarios_df = st.session_state.filtered_scenarios_df
        st.subheader("Failed Scenarios")
        st.dataframe(scenarios_df)

        # Get environment directly from the DataFrame; an empty frame has
        # no first row, so .iloc[0] would raise IndexError.
        if 'Environment' in scenarios_df.columns and not scenarios_df.empty:
            environment = scenarios_df['Environment'].iloc[0]
            st.info(f"Using environment from data: {environment}")
            process_failures_button(scenarios_df, environment)
        else:
            st.error("No environment information found in the data")

    # Add project fields button at the bottom
    if st.button("Show Project Fields"):
        display_project_fields()
# Run the standalone test page only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()