import logging
import os
import sys
import traceback
from datetime import datetime, timedelta

import streamlit as st
from jira import JIRA
from dotenv import load_dotenv
import pandas as pd
import requests
import json
from groq import Groq
from difflib import SequenceMatcher
import time
# Configure logging based on environment
try:
    # Try to create a logs directory and a timestamped log file
    log_dir = "logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    log_file = os.path.join(log_dir, f"jira_debug_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

    # Configure the root logger with a file handler
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file)
        ]
    )
except (OSError, IOError):
    # If file logging fails (e.g., in Hugging Face Spaces), fall back to a no-op handler
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.NullHandler()
        ]
    )

logger = logging.getLogger("jira_integration")
logger.info("Jira integration module loaded")
# Load environment variables
load_dotenv()

# Get API keys and server configuration
JIRA_SERVER = os.getenv("JIRA_SERVER")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

# Validate required environment variables
if not JIRA_SERVER:
    st.error("JIRA_SERVER not found in environment variables. Please check your .env file.")
if not GROQ_API_KEY:
    st.error("GROQ_API_KEY not found in environment variables. Please check your .env file.")
def init_jira_session():
    """Initialize Jira session state variables"""
    if 'jira_client' not in st.session_state:
        st.session_state.jira_client = None
    if 'projects' not in st.session_state:
        st.session_state.projects = None


def get_projects():
    """Fetch all accessible projects"""
    if not st.session_state.jira_client:
        return None
    try:
        projects = st.session_state.jira_client.projects()
        # Sort projects by key
        return sorted(projects, key=lambda x: x.key)
    except Exception as e:
        st.error(f"Error fetching projects: {str(e)}")
        return None
def get_board_configuration(board_id):
    """Fetch board configuration including estimation field"""
    if not st.session_state.jira_client:
        return None
    try:
        url = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/configuration"
        response = st.session_state.jira_client._session.get(url)
        if response.status_code == 200:
            config = response.json()
            return config
        return None
    except Exception as e:
        st.error(f"Error fetching board configuration: {str(e)}")
        return None


def get_boards(project_key):
    """Fetch all boards for a project"""
    if not st.session_state.jira_client:
        return None
    try:
        boards = st.session_state.jira_client.boards(projectKeyOrID=project_key)
        board_details = []
        for board in boards:
            config = get_board_configuration(board.id)
            board_type = config.get('type', 'Unknown') if config else 'Unknown'
            estimation_field = None
            if config and 'estimation' in config:
                estimation_field = config['estimation'].get('field', {}).get('fieldId')
            board_details.append({
                'id': board.id,
                'name': board.name,
                'type': board_type,
                'estimation_field': estimation_field
            })
        return board_details
    except Exception as e:
        st.error(f"Error fetching boards: {str(e)}")
        return None
def get_current_sprint(board_id):
    """Fetch the current sprint for a board"""
    if not st.session_state.jira_client:
        return None
    try:
        # Get all active and future sprints
        sprints = st.session_state.jira_client.sprints(board_id, state='active,future')
        if sprints:
            # Look for sprints whose names start with 'RS'
            rs_sprints = [sprint for sprint in sprints if sprint.name.startswith('RS')]
            if rs_sprints:
                # Sort sprints by name to get the latest one
                latest_sprint = sorted(rs_sprints, key=lambda x: x.name, reverse=True)[0]
                return latest_sprint
            else:
                st.warning("No RS sprints found. Available sprints: " + ", ".join([s.name for s in sprints]))
        return None
    except Exception as e:
        if "board does not support sprints" in str(e).lower():
            return None
        st.error(f"Error fetching current sprint: {str(e)}")
        return None
def get_board_issues(board_id, estimation_field=None):
    """Fetch all issues on the board using the Agile REST API"""
    if not st.session_state.jira_client:
        return None
    try:
        url = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/issue"
        fields = ['summary', 'status', 'created', 'description', 'issuetype', 'assignee']
        if estimation_field:
            fields.append(estimation_field)
        params = {
            'maxResults': 200,
            'fields': fields,
            'jql': 'assignee = currentUser()'
        }
        response = st.session_state.jira_client._session.get(url, params=params)
        if response.status_code != 200:
            st.error(f"Error fetching board issues: {response.text}")
            return None
        data = response.json()
        issues = []
        for issue_data in data['issues']:
            issue = st.session_state.jira_client.issue(issue_data['key'])
            issues.append(issue)
        return issues
    except Exception as e:
        st.error(f"Error fetching board issues: {str(e)}")
        return None
def get_sprint_issues(board_id, sprint_id, estimation_field=None):
    """Fetch all issues in the given sprint using the Agile REST API"""
    if not st.session_state.jira_client:
        return None
    try:
        # Cache key for sprint issues; reuse cached results for up to 60 seconds
        cache_key = f"sprint_issues_{sprint_id}"
        if cache_key in st.session_state and (datetime.now() - st.session_state.get('last_refresh', datetime.min)).total_seconds() < 60:
            return st.session_state[cache_key]

        # Use the Agile REST API endpoint to get issues from the sprint
        url = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/sprint/{sprint_id}/issue"
        params = {
            'maxResults': 200,
            'fields': [
                'summary',
                'status',
                'created',
                'description',
                'issuetype',
                'assignee'
            ],
            'jql': 'assignee = currentUser()'  # Filter for the current user's issues
        }
        # Add estimation field if provided
        if estimation_field:
            params['fields'].append(estimation_field)

        # Make a single API call with all required fields
        response = st.session_state.jira_client._session.get(url, params=params)
        if response.status_code != 200:
            st.error(f"Error fetching sprint issues: {response.text}")
            return None

        # Process all issues in one go
        data = response.json()
        issues = [st.session_state.jira_client.issue(issue['key']) for issue in data['issues']]

        # Cache the results
        st.session_state[cache_key] = issues
        st.session_state['last_refresh'] = datetime.now()
        return issues
    except Exception as e:
        st.error(f"Error fetching sprint issues: {str(e)}")
        return None
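
# Note on get_sprint_issues (above): results are cached in st.session_state under
# "sprint_issues_<sprint_id>" and reused while the shared 'last_refresh' timestamp is
# less than 60 seconds old, so repeated Streamlit reruns avoid extra Agile API calls.
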
def calculate_points(issues, estimation_field):
    """Calculate story points from issues"""
    if not estimation_field:
        return [], 0, 0, 0
    try:
        # Process all issues at once
        field_id = estimation_field.replace('customfield_', '')
        issues_data = []
        total_points = completed_points = in_progress_points = 0

        # Create status sets for faster lookup
        done_statuses = {'done', 'completed', 'closed'}
        progress_statuses = {'in progress', 'development', 'in development'}

        for issue in issues:
            try:
                # Get the points value, falling back to the raw custom field name
                points = getattr(issue.fields, field_id, None) or getattr(issue.fields, estimation_field, 0)
                points = float(points) if points is not None else 0

                # Get the status name once
                status_name = issue.fields.status.name.lower()

                # Update point totals
                total_points += points
                if status_name in done_statuses:
                    completed_points += points
                elif status_name in progress_statuses:
                    in_progress_points += points

                # Build issue data
                issues_data.append({
                    "Key": issue.key,
                    "Type": issue.fields.issuetype.name,
                    "Summary": issue.fields.summary,
                    "Status": issue.fields.status.name,
                    "Story Points": points,
                    "Assignee": issue.fields.assignee.displayName if issue.fields.assignee else "Unassigned"
                })
            except Exception as e:
                st.error(f"Error processing points for issue {issue.key}: {str(e)}")
                continue

        return issues_data, total_points, completed_points, in_progress_points
    except Exception as e:
        st.error(f"Error calculating points: {str(e)}")
        return [], 0, 0, 0
def create_maintenance_task(project_key, summary, description, issue_type='Task'):
    """Create a task in Jira"""
    if not st.session_state.jira_client:
        st.error("Not authenticated with Jira. Please log in first.")
        return None
    try:
        issue_dict = {
            'project': {'key': project_key},
            'summary': summary,
            'description': description,
            'issuetype': {'name': issue_type}
        }
        new_issue = st.session_state.jira_client.create_issue(fields=issue_dict)
        return new_issue
    except Exception as e:
        st.error(f"Error creating task: {str(e)}")
        return None
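
# Illustrative usage of create_maintenance_task (above); the project key, summary and
# description below are assumed example values, not taken from a real board:
#   task = create_maintenance_task("RS", "Maintenance: RI2008 - Data Exchange",
#                                  "Investigate failing regression scenarios")
#   if task:
#       st.write(f"Created {task.key}")
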
def render_jira_login(): | |
"""Render the Jira login form and handle authentication""" | |
# If already authenticated, just return True | |
if 'jira_client' in st.session_state and st.session_state.jira_client: | |
st.success("Connected to Jira") | |
return True | |
# Initialize session state for login form and attempts tracking | |
if 'jira_username' not in st.session_state: | |
st.session_state.jira_username = "" | |
if 'jira_password' not in st.session_state: | |
st.session_state.jira_password = "" | |
if 'login_blocked_until' not in st.session_state: | |
st.session_state.login_blocked_until = None | |
# Check if login is temporarily blocked | |
if st.session_state.login_blocked_until: | |
if datetime.now() < st.session_state.login_blocked_until: | |
wait_time = (st.session_state.login_blocked_until - datetime.now()).seconds | |
st.error(f"Login temporarily blocked due to too many failed attempts. Please wait {wait_time} seconds before trying again.") | |
st.info("If you need immediate access, please try logging in directly to Jira in your browser first, complete the CAPTCHA there, then return here.") | |
return False | |
else: | |
st.session_state.login_blocked_until = None | |
# Create login form | |
with st.form(key="jira_login_form"): | |
username = st.text_input("Jira Username", value=st.session_state.jira_username, key="username_input") | |
password = st.text_input("Password", value=st.session_state.jira_password, type="password", key="password_input") | |
submit_button = st.form_submit_button(label="Login") | |
if submit_button: | |
# Store credentials in session state | |
st.session_state.jira_username = username | |
st.session_state.jira_password = password | |
# Try to authenticate | |
try: | |
jira_client = JIRA(server=JIRA_SERVER, basic_auth=(username, password)) | |
st.session_state.jira_client = jira_client | |
# Get projects | |
projects = get_projects() | |
if projects: | |
st.session_state.projects = projects | |
st.success("Connected to Jira") | |
return True | |
else: | |
st.error("Failed to fetch projects") | |
return False | |
except Exception as e: | |
error_message = str(e).lower() | |
if "captcha_challenge" in error_message: | |
# Set a 5-minute block on login attempts | |
st.session_state.login_blocked_until = datetime.now() + timedelta(minutes=5) | |
st.error("Too many failed login attempts. Please try one of the following:") | |
st.info(""" | |
1. Wait 5 minutes before trying again | |
2. Log in to Jira in your browser first, complete the CAPTCHA there, then return here | |
3. Clear your browser cookies and try again | |
""") | |
else: | |
st.error(f"Authentication failed: {str(e)}") | |
return False | |
return False | |
def get_cached_metadata(project_key): | |
"""Get cached metadata or fetch new if cache is expired or doesn't exist""" | |
# Check if metadata cache exists in session state | |
if 'metadata_cache' not in st.session_state: | |
st.session_state.metadata_cache = {} | |
if 'metadata_cache_timestamp' not in st.session_state: | |
st.session_state.metadata_cache_timestamp = {} | |
current_time = datetime.now() | |
cache_expiry = timedelta(minutes=30) # Cache expires after 30 minutes | |
# Check if we have valid cached metadata | |
if (project_key in st.session_state.metadata_cache and | |
project_key in st.session_state.metadata_cache_timestamp and | |
current_time - st.session_state.metadata_cache_timestamp[project_key] < cache_expiry): | |
logger.info(f"Using cached metadata for project {project_key}") | |
return st.session_state.metadata_cache[project_key] | |
# If no valid cache, fetch new metadata | |
logger.info(f"Fetching fresh metadata for project {project_key}") | |
metadata = get_project_metadata_fresh(project_key) | |
if metadata: | |
# Update cache | |
st.session_state.metadata_cache[project_key] = metadata | |
st.session_state.metadata_cache_timestamp[project_key] = current_time | |
logger.info(f"Updated metadata cache for project {project_key}") | |
return metadata | |
def get_project_metadata_fresh(project_key): | |
"""Fetch fresh metadata from Jira without using cache""" | |
logger.info(f"=== Getting fresh metadata for project {project_key} ===") | |
if not st.session_state.jira_client: | |
logger.error("Not authenticated with Jira. Please log in first.") | |
st.error("Not authenticated with Jira. Please log in first.") | |
return None | |
try: | |
# Get project | |
logger.info("Getting project...") | |
project = st.session_state.jira_client.project(project_key) | |
logger.info(f"Got project: {project.name}") | |
logger.info("Getting createmeta with expanded fields...") | |
# Get create metadata for the project with expanded field info | |
metadata = st.session_state.jira_client.createmeta( | |
projectKeys=project_key, | |
expand='projects.issuetypes.fields', | |
issuetypeNames='Story' # Specifically get Story type fields | |
) | |
logger.info("Got createmeta response") | |
if not metadata.get('projects'): | |
logger.error(f"No metadata found for project {project_key}") | |
st.error(f"No metadata found for project {project_key}") | |
return None | |
project_meta = metadata['projects'][0] | |
issue_types = project_meta.get('issuetypes', []) | |
# Log available issue types | |
logger.info(f"Available issue types: {[t.get('name') for t in issue_types]}") | |
# Try to get Story issue type first | |
story_type = next((t for t in issue_types if t['name'] == 'Story'), None) | |
if not story_type: | |
logger.error("Story issue type not found in project") | |
st.error("Story issue type not found in project") | |
return None | |
logger.info("Processing fields...") | |
# Get required fields and all fields | |
required_fields = {} | |
all_fields = {} | |
# Log all available fields before processing | |
logger.info("Available fields in Story type:") | |
for field_id, field in story_type['fields'].items(): | |
field_name = field.get('name', 'Unknown') | |
field_type = field.get('schema', {}).get('type', 'Unknown') | |
logger.info(f"Field: {field_name} (ID: {field_id}, Type: {field_type})") | |
# Store complete field information including schema and allowed values | |
all_fields[field_id] = { | |
'name': field['name'], | |
'required': field.get('required', False), | |
'schema': field.get('schema', {}), | |
'allowedValues': field.get('allowedValues', []), | |
'hasDefaultValue': field.get('hasDefaultValue', False), | |
'defaultValue': field.get('defaultValue'), | |
'operations': field.get('operations', []), | |
'configuration': field.get('configuration', {}) | |
} | |
# If this is a cascading select field, log its structure | |
if field.get('schema', {}).get('type') == 'option-with-child': | |
logger.info(f"Found cascading select field: {field_name}") | |
if 'allowedValues' in field: | |
for parent in field['allowedValues']: | |
parent_value = parent.get('value', 'Unknown') | |
logger.info(f" Parent value: {parent_value}") | |
if 'cascadingOptions' in parent: | |
child_values = [child.get('value') for child in parent['cascadingOptions']] | |
logger.info(f" Child values: {child_values}") | |
# Store required fields separately | |
if field.get('required', False): | |
required_fields[field_id] = all_fields[field_id] | |
logger.info(f"Required field: {field_name}") | |
logger.info(f"Found {len(all_fields)} total fields, {len(required_fields)} required fields") | |
metadata_result = { | |
'project_name': project.name, | |
'issue_type': 'Story', | |
'required_fields': required_fields, | |
'all_fields': all_fields | |
} | |
logger.info("Successfully processed project metadata") | |
return metadata_result | |
except Exception as e: | |
logger.exception(f"Error getting project metadata: {str(e)}") | |
st.error(f"Error getting project metadata: {str(e)}") | |
st.error("Full error details:") | |
st.error(str(e)) | |
st.code(traceback.format_exc(), language="python") | |
return None | |
def get_project_metadata(project_key): | |
"""Get project metadata, using cache if available""" | |
return get_cached_metadata(project_key) | |
def generate_task_content(filtered_scenarios_df):
    """Generate the task summary and description using a template-based approach"""
    try:
        # Extract key information
        environment = filtered_scenarios_df['Environment'].iloc[0]
        functional_area = filtered_scenarios_df['Functional area'].iloc[0]
        scenario_count = len(filtered_scenarios_df)

        # Generate summary
        summary = f"Maintenance: {environment} - {functional_area}"

        # Generate description
        description = "Performing maintenance on the following failing scenarios:\n\n"

        # Add each scenario and its error, numbering from 1
        for i, (_, row) in enumerate(filtered_scenarios_df.iterrows(), 1):
            description += f"{i}. {row['Scenario Name']}\n"
            description += f"   Error: {row['Error Message']}\n\n"

        return summary, description
    except Exception as e:
        st.error(f"Error generating task content: {str(e)}")
        return None, None
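
# Illustrative output of generate_task_content (above), assuming one failing scenario
# in environment RI2008 / functional area "Data Exchange - Enquiries - Reports":
#   summary     -> "Maintenance: RI2008 - Data Exchange - Enquiries - Reports"
#   description -> "Performing maintenance on the following failing scenarios:
#
#                   1. Add Missions Success - Individual
#                      Error: AssertionError [ERR_ASSERTION]: Link validation failed"
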
def get_regression_board(project_key): | |
"""Find the regression sprint board for the project""" | |
boards = get_boards(project_key) | |
if not boards: | |
return None | |
# Look specifically for the "Regression Sprints" board | |
regression_board = next((b for b in boards if b['name'].lower() == 'regression sprints' and b['type'].lower() == 'scrum'), None) | |
if not regression_board: | |
st.error("Could not find the 'Regression Sprints' board. Available boards: " + | |
", ".join([f"{b['name']} ({b['type']})" for b in boards])) | |
return regression_board | |
def get_field_dependencies(): | |
"""Cache and return field dependencies and their allowed values""" | |
if 'field_dependencies' not in st.session_state: | |
try: | |
# Get project metadata for RS project | |
metadata = get_project_metadata("RS") | |
if not metadata: | |
return None | |
# Initialize dependencies dictionary with correct field IDs | |
dependencies = { | |
'Customer': { | |
'field_id': 'customfield_10427', | |
'values': [], | |
'dependencies': {} | |
}, | |
'Environment': { | |
'field_id': 'customfield_14157', # Updated field ID | |
'values': [], | |
'dependencies': {} | |
}, | |
'Functional Areas': { | |
'field_id': 'customfield_15303', # Updated field ID | |
'values': [], | |
'dependencies': {} | |
} | |
} | |
# Get field values and their dependencies | |
for field_name, field_info in dependencies.items(): | |
field_id = field_info['field_id'] | |
if field_id in metadata['all_fields']: | |
field_data = metadata['all_fields'][field_id] | |
if 'allowedValues' in field_data: | |
# Store allowed values | |
dependencies[field_name]['values'] = [ | |
value.get('value', value.get('name', '')) | |
for value in field_data['allowedValues'] | |
if isinstance(value, dict) | |
] | |
# Store dependencies (if any) | |
if 'dependency' in field_data: | |
dep_field = field_data['dependency'] | |
dependencies[field_name]['dependencies'] = { | |
'field': dep_field['field']['name'], | |
'field_id': dep_field['field']['id'], | |
'values': dep_field.get('values', []) | |
} | |
# Cache the dependencies | |
st.session_state.field_dependencies = dependencies | |
return dependencies | |
except Exception as e: | |
st.error(f"Error fetching field dependencies: {str(e)}") | |
return None | |
return st.session_state.field_dependencies | |
def get_dependent_field_value(field_name, parent_value=None): | |
"""Get the appropriate field value based on dependencies""" | |
dependencies = get_field_dependencies() | |
if not dependencies or field_name not in dependencies: | |
return None | |
field_info = dependencies[field_name] | |
# If this field depends on another field | |
if parent_value and field_info['dependencies']: | |
dep_info = field_info['dependencies'] | |
# Find values that match the parent value | |
for value_mapping in dep_info.get('values', []): | |
if value_mapping.get('parent') == parent_value: | |
return value_mapping.get('value') | |
# If no dependency or no match, return first available value | |
return field_info['values'][0] if field_info['values'] else None | |
def display_project_fields(): | |
"""Display available fields for issue creation""" | |
project_key = "RS" # Using the fixed project key | |
metadata = get_project_metadata(project_key) | |
if metadata: | |
st.subheader("Project Fields") | |
# Display required fields | |
st.write("### Required Fields") | |
for field_id, field in metadata['required_fields'].items(): | |
st.write(f"- {field['name']} ({field_id})") | |
if field.get('allowedValues'): | |
st.write(" Allowed values:") | |
for value in field['allowedValues']: | |
if isinstance(value, dict): | |
# Handle cascading select fields | |
if 'cascadingOptions' in value: | |
parent_value = value.get('value', 'Unknown') | |
st.write(f" - {parent_value}") | |
st.write(" Child options:") | |
for child in value['cascadingOptions']: | |
st.write(f" - {child.get('value', 'Unknown')}") | |
else: | |
st.write(f" - {value.get('value', value.get('name', 'Unknown'))}") | |
else: | |
st.write(f" - {value}") | |
# Display custom fields with dependencies | |
st.write("### Custom Fields and Dependencies") | |
# Customer field (customfield_10427) - Cascading Select | |
st.write("\n#### Customer (customfield_10427)") | |
cust_field = metadata['all_fields'].get('customfield_10427', {}) | |
if cust_field.get('allowedValues'): | |
st.write("Cascading options:") | |
for value in cust_field['allowedValues']: | |
if isinstance(value, dict): | |
parent_value = value.get('value', 'Unknown') | |
st.write(f"- {parent_value}") | |
if 'cascadingOptions' in value: | |
st.write(" Child options:") | |
for child in value['cascadingOptions']: | |
st.write(f" - {child.get('value', 'Unknown')}") | |
# Functional Areas field (customfield_13100) - Cascading Select | |
st.write("\n#### Functional Areas (customfield_13100)") | |
func_field = metadata['all_fields'].get('customfield_13100', {}) | |
if func_field.get('allowedValues'): | |
st.write("Cascading options:") | |
for value in func_field['allowedValues']: | |
if isinstance(value, dict): | |
parent_value = value.get('value', 'Unknown') | |
st.write(f"- {parent_value}") | |
if 'cascadingOptions' in value: | |
st.write(" Child options:") | |
for child in value['cascadingOptions']: | |
st.write(f" - {child.get('value', 'Unknown')}") | |
# Environment field (customfield_14157) | |
st.write("\n#### Environment (customfield_14157)") | |
env_field = metadata['all_fields'].get('customfield_14157', {}) | |
if env_field.get('allowedValues'): | |
st.write("Allowed values:") | |
for value in env_field['allowedValues']: | |
if isinstance(value, dict): | |
st.write(f" - {value.get('value', value.get('name', 'Unknown'))}") | |
# Display dependencies | |
if any(field.get('dependency') for field in [env_field, func_field, cust_field]): | |
st.write("\n### Field Dependencies") | |
for field_name, field in [ | |
('Environment', env_field), | |
('Functional Areas', func_field), | |
('Customer', cust_field) | |
]: | |
if field.get('dependency'): | |
st.write(f"\n{field_name} depends on:") | |
dep = field['dependency'] | |
st.write(f" Field: {dep['field']['name']} ({dep['field']['id']})") | |
if dep.get('values'): | |
st.write(" Value mappings:") | |
for mapping in dep['values']: | |
parent_value = mapping.get('parent', 'Unknown') | |
child_value = mapping.get('value', 'Unknown') | |
st.write(f" - When parent is '{parent_value}' β '{child_value}'") | |
# Display other custom fields | |
st.write("\n### Other Custom Fields") | |
excluded_fields = ['customfield_14157', 'customfield_13100', 'customfield_10427'] | |
custom_fields = {k: v for k, v in metadata['all_fields'].items() | |
if k.startswith('customfield_') and k not in excluded_fields} | |
for field_id, field in custom_fields.items(): | |
st.write(f"- {field['name']} ({field_id})") | |
if field.get('allowedValues'): | |
st.write(" Allowed values:") | |
for value in field.get('allowedValues', []): | |
if isinstance(value, dict): | |
if 'cascadingOptions' in value: | |
parent_value = value.get('value', 'Unknown') | |
st.write(f" - {parent_value}") | |
st.write(" Child options:") | |
for child in value['cascadingOptions']: | |
st.write(f" - {child.get('value', 'Unknown')}") | |
else: | |
st.write(f" - {value.get('value', value.get('name', 'Unknown'))}") | |
else: | |
st.write(f" - {value}") | |
def get_closest_match(target, choices, threshold=60):
    """
    Find the closest matching string from choices using fuzzy matching.
    Returns the best match if similarity is above threshold, otherwise None.
    """
    if not choices:
        return None
    try:
        def similarity(a, b):
            # Normalize strings for comparison: lowercase, treat hyphens as spaces,
            # and collapse runs of whitespace
            a = ' '.join(a.lower().replace('-', ' ').split())
            b = ' '.join(b.lower().replace('-', ' ').split())
            return SequenceMatcher(None, a, b).ratio() * 100

        # Calculate similarities for every choice
        similarities = [(choice, similarity(target, choice)) for choice in choices]

        # Pick the choice with the highest similarity score
        best_match = max(similarities, key=lambda x: x[1])
        if best_match[1] >= threshold:
            return best_match[0]
        return None
    except Exception as e:
        st.error(f"Error in fuzzy matching: {str(e)}")
        return None
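
# Illustrative behaviour of get_closest_match (above); the candidate lists are assumed
# example values. Scores come from difflib.SequenceMatcher.ratio() scaled to 0-100:
#   get_closest_match("data exchange", ["Data Exchange", "FIN-Parameters"])    -> "Data Exchange"
#   get_closest_match("completely unrelated", ["Data Exchange"], threshold=60) -> None
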
def get_functional_area_values(metadata): | |
"""Extract all available functional area values from metadata""" | |
logger.info("=== Starting get_functional_area_values ===") | |
if not metadata: | |
logger.error("No metadata provided") | |
return [] | |
if 'all_fields' not in metadata: | |
logger.error("No 'all_fields' in metadata") | |
logger.debug(f"Available metadata keys: {list(metadata.keys())}") | |
return [] | |
# Log all available field IDs for debugging | |
logger.info("Available fields:") | |
for field_id, field in metadata['all_fields'].items(): | |
field_name = field.get('name', 'Unknown') | |
field_type = field.get('schema', {}).get('type', 'Unknown') | |
logger.info(f" {field_name} (ID: {field_id}, Type: {field_type})") | |
# List of possible field IDs for functional areas | |
functional_area_field_ids = [ | |
'customfield_15303', # New field ID | |
'customfield_13100', # Old field ID | |
'customfield_13101' # Another possible variation | |
] | |
# Try to find the functional area field by name or ID | |
func_field = None | |
for field_id, field in metadata['all_fields'].items(): | |
field_name = field.get('name', '').lower() | |
if field_id in functional_area_field_ids or 'functional area' in field_name: | |
func_field = field | |
logger.info(f"Found functional area field: {field.get('name')} (ID: {field_id})") | |
break | |
if not func_field: | |
logger.error("Could not find functional area field in metadata") | |
logger.info("Available field names:") | |
for field_id, field in metadata['all_fields'].items(): | |
logger.info(f" {field.get('name', 'Unknown')} ({field_id})") | |
return [] | |
# Check field type | |
field_type = func_field.get('schema', {}).get('type') | |
logger.info(f"Functional area field type: {field_type}") | |
allowed_values = [] | |
if 'allowedValues' in func_field: | |
logger.info("Processing allowed values...") | |
for parent in func_field['allowedValues']: | |
if isinstance(parent, dict): | |
parent_value = parent.get('value', 'Unknown') | |
logger.info(f"Processing parent value: {parent_value}") | |
if 'cascadingOptions' in parent: | |
for child in parent['cascadingOptions']: | |
if isinstance(child, dict) and 'value' in child: | |
allowed_values.append(child['value']) | |
logger.debug(f"Added child value: {child['value']}") | |
elif 'value' in parent: | |
allowed_values.append(parent['value']) | |
logger.debug(f"Added value: {parent['value']}") | |
logger.info(f"Found {len(allowed_values)} allowed values") | |
if allowed_values: | |
logger.info(f"Sample of allowed values: {allowed_values[:5]}") | |
else: | |
logger.warning("No allowed values found in the field") | |
return allowed_values | |
def calculate_story_points(scenario_count):
    """Calculate story points based on the number of scenarios"""
    if scenario_count <= 3:
        return 1
    elif scenario_count <= 5:
        return 2
    elif scenario_count <= 9:
        return 3
    elif scenario_count <= 15:
        return 5
    else:
        return 8
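
# Bucketing used by calculate_story_points (above):
#   1-3 scenarios -> 1 point, 4-5 -> 2, 6-9 -> 3, 10-15 -> 5, 16+ -> 8
# e.g. calculate_story_points(7) returns 3.
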
def map_functional_area(functional_area, metadata):
    """Map a functional area to its closest Jira allowed parent and child values using structured mapping."""
    if not metadata or not functional_area:
        logger.error("No metadata or functional area provided")
        raise ValueError("Metadata and functional area are required")

    # Get the functional area field from metadata
    func_field = metadata['all_fields'].get('customfield_13100', {})
    if not func_field or 'allowedValues' not in func_field:
        logger.error("Could not find functional area field in metadata")
        raise ValueError("Functional area field not found in metadata")

    # Build a mapping of allowed child value -> parent value for faster lookup
    allowed_values = {}
    for parent in func_field['allowedValues']:
        if isinstance(parent, dict):
            parent_value = parent.get('value')
            # Cascading select children are exposed under 'cascadingOptions'
            children = parent.get('cascadingOptions') or parent.get('children') or []
            if parent_value:
                for child in children:
                    if isinstance(child, dict) and 'value' in child:
                        allowed_values[child['value']] = parent_value

    logger.info(f"Input functional area: {functional_area}")

    # Split the functional area into parts
    parts = [p.strip() for p in functional_area.split(' - ')]
    logger.info(f"Split into parts: {parts}")

    # Try different combinations of parts joined with '-'
    for i in range(len(parts)):
        for j in range(i + 1, len(parts) + 1):
            # Try joining parts with '-'
            test_value = '-'.join(parts[i:j])
            # Also try without spaces
            test_value_no_spaces = test_value.replace(' ', '')
            logger.info(f"Trying combination: {test_value}")

            # Check both versions (with and without spaces)
            if test_value in allowed_values:
                logger.info(f"Found exact match: {test_value}")
                return allowed_values[test_value], test_value
            elif test_value_no_spaces in allowed_values:
                logger.info(f"Found match without spaces: {test_value_no_spaces}")
                return allowed_values[test_value_no_spaces], test_value_no_spaces

            # Try category-specific matches
            categories = ['Services', 'FIN', 'WARPSPEED']
            for category in categories:
                category_value = f"{category}-{test_value}"
                category_value_no_spaces = category_value.replace(' ', '')
                if category_value in allowed_values:
                    logger.info(f"Found category match: {category_value}")
                    return allowed_values[category_value], category_value
                elif category_value_no_spaces in allowed_values:
                    logger.info(f"Found category match without spaces: {category_value_no_spaces}")
                    return allowed_values[category_value_no_spaces], category_value_no_spaces

    # If no match found, try to find a suitable default based on the first part
    first_part = parts[0].upper()
    if 'SERVICE' in first_part or 'SERVICES' in first_part:
        logger.info("No exact match found, defaulting to Services-Platform")
        return "R&I", "Services-Platform"
    elif 'FIN' in first_part:
        logger.info("No exact match found, defaulting to FIN-Parameters")
        return "R&I", "FIN-Parameters"
    elif 'WARPSPEED' in first_part:
        logger.info("No exact match found, defaulting to WARPSPEED-Parameters")
        return "R&I", "WARPSPEED-Parameters"

    # Final fallback to Data Exchange
    logger.warning(f"No suitable match found for '{functional_area}', defaulting to Data Exchange")
    return "R&I", "Data Exchange"
def get_customer_field_values(metadata): | |
"""Extract all available customer field values and their child options from metadata""" | |
if not metadata or 'all_fields' not in metadata: | |
return {} | |
customer_field = metadata['all_fields'].get('customfield_10427', {}) | |
customer_values = {} | |
if 'allowedValues' in customer_field: | |
for parent in customer_field['allowedValues']: | |
if isinstance(parent, dict): | |
parent_value = parent.get('value') | |
if parent_value: | |
child_values = [] | |
if 'cascadingOptions' in parent: | |
child_values = [child.get('value') for child in parent['cascadingOptions'] if child.get('value')] | |
customer_values[parent_value] = child_values | |
return customer_values | |
def map_customer_value(environment_value, customer_values): | |
"""Map environment value to appropriate customer field values""" | |
if not environment_value or not customer_values: | |
return "MIP Research and Innovation", "R&I General" | |
# Clean up environment value | |
env_value = environment_value.strip() | |
# Special case handling for specific environments | |
if any(env in env_value.lower() for env in ['legalwise', 'scorpion', 'lifewise', 'talksure']): | |
parent_value = "ILR" | |
child_value = env_value # Use the original environment value as child | |
logger.info(f"Mapped {env_value} to ILR parent with child {child_value}") | |
return parent_value, child_value | |
# Handle RI environments | |
if env_value.startswith('RI'): | |
parent_value = "MIP Research and Innovation" | |
# Remove 'RI' prefix and clean up | |
child_value = env_value[2:].strip() | |
if child_value: | |
child_value = f"R&I {child_value}" | |
else: | |
child_value = "R&I General" | |
logger.info(f"Mapped RI environment {env_value} to {parent_value} parent with child {child_value}") | |
return parent_value, child_value | |
# Default case - try to find matching values | |
for parent, children in customer_values.items(): | |
if parent == "MIP Research and Innovation": # Default parent | |
# Look for exact match in child values | |
if env_value in children: | |
return parent, env_value | |
# Look for partial matches | |
for child in children: | |
if env_value in child or child in env_value: | |
return parent, child | |
# If no match found, return defaults | |
logger.warning(f"No specific mapping found for {env_value}, using defaults") | |
return "MIP Research and Innovation", "R&I General" | |
def create_regression_task(project_key, summary, description, environment, filtered_scenarios_df): | |
logger.debug(f"Entering create_regression_task with project_key={project_key}, summary={summary}, environment={environment}, DF_shape={filtered_scenarios_df.shape}") | |
logger.info("=== Starting create_regression_task function ===") | |
logger.info(f"Project: {project_key}, Summary: {summary}, Environment: {environment}") | |
logger.info(f"Filtered DF shape: {filtered_scenarios_df.shape if filtered_scenarios_df is not None else 'None'}") | |
try: | |
# Get metadata first to access field values | |
metadata = get_project_metadata(project_key) | |
if not metadata: | |
error_msg = "Could not get project metadata" | |
logger.error(error_msg) | |
st.error(error_msg) | |
return None | |
# Get customer field values and map environment | |
customer_values = get_customer_field_values(metadata) | |
parent_value, child_value = map_customer_value(environment, customer_values) | |
logger.info(f"Mapped customer values - Parent: {parent_value}, Child: {child_value}") | |
# Get Jira client | |
if "jira_client" not in st.session_state: | |
error_msg = "No Jira client available. Please connect to Jira first." | |
logger.error(error_msg) | |
return None | |
jira_client = st.session_state.jira_client | |
logger.info("Got Jira client from session state") | |
# Get active sprint | |
active_sprint = get_current_sprint(get_regression_board(project_key)['id']) | |
if not active_sprint: | |
error_msg = "No active sprint found" | |
logger.error(error_msg) | |
return None | |
logger.info(f"Found active sprint: {active_sprint.name} (ID: {active_sprint.id})") | |
# Extract functional area from filtered scenarios | |
functional_areas = [] | |
try: | |
if "Functional area" in filtered_scenarios_df.columns: | |
functional_areas = filtered_scenarios_df["Functional area"].unique().tolist() | |
logger.info(f"Extracted functional areas: {functional_areas}") | |
except Exception as e: | |
logger.exception(f"Error extracting functional areas: {str(e)}") | |
st.error(f"Error extracting functional areas: {str(e)}") | |
return None | |
# Calculate story points based on number of scenarios | |
story_points = calculate_story_points(len(filtered_scenarios_df)) | |
logger.info(f"Calculated story points: {story_points}") | |
# Map functional area using metadata | |
functional_area_parent, functional_area_child = map_functional_area( | |
functional_areas[0] if functional_areas else "Data Exchange", | |
metadata | |
) | |
logger.info(f"Mapped functional area to parent: {functional_area_parent}, child: {functional_area_child}") | |
# Prepare issue dictionary with all required fields | |
issue_dict = { | |
"project": {"key": project_key}, | |
"summary": summary, | |
"description": description, | |
"issuetype": {"name": "Story"}, | |
"components": [{"name": "Maintenance (Regression)"}], | |
"customfield_10427": { | |
"value": parent_value, | |
"child": { | |
"value": child_value | |
} | |
}, | |
"customfield_12730": {"value": "Non-Business Critical"}, # Regression Type field | |
"customfield_13430": {"value": str(len(filtered_scenarios_df))}, # Number of Scenarios | |
"customfield_13100": { | |
"value": functional_area_parent, | |
"child": { | |
"value": functional_area_child | |
} | |
}, | |
"assignee": {"name": st.session_state.jira_username}, | |
"customfield_10002": story_points # Story Points field | |
} | |
# Log the complete issue dictionary | |
logger.info(f"Issue dictionary prepared: {issue_dict}") | |
# Create the issue | |
logger.info("Attempting to create issue in Jira...") | |
try: | |
# Create the issue with all fields | |
new_issue = jira_client.create_issue(fields=issue_dict) | |
logger.info(f"Issue created successfully: {new_issue.key}") | |
# Add issue to sprint | |
try: | |
logger.info(f"Attempting to add issue {new_issue.key} to sprint {active_sprint.id}...") | |
jira_client.add_issues_to_sprint(active_sprint.id, [new_issue.key]) | |
logger.info(f"Added issue {new_issue.key} to sprint {active_sprint.name}") | |
except Exception as sprint_error: | |
logger.exception(f"Failed to add issue to sprint: {str(sprint_error)}") | |
st.warning(f"β οΈ Could not add task to sprint. Error: {str(sprint_error)}") | |
# Display success message | |
st.success(f"β Task created successfully: {new_issue.key}") | |
return new_issue | |
except Exception as create_error: | |
error_message = str(create_error) | |
logger.exception(f"Failed to create issue: {error_message}") | |
# Try to extract the response content if it's a JIRA Error | |
try: | |
if hasattr(create_error, 'response'): | |
status_code = getattr(create_error.response, 'status_code', 'N/A') | |
logger.error(f"Response status code: {status_code}") | |
if hasattr(create_error.response, 'text'): | |
response_text = create_error.response.text | |
logger.error(f"Response text: {response_text}") | |
# Display the error to the user | |
st.error(f"β Error creating task in Jira (Status: {status_code}):") | |
st.error(response_text) | |
except Exception as extract_error: | |
logger.exception(f"Error extracting response details: {str(extract_error)}") | |
return None | |
except Exception as e: | |
error_message = f"β Unexpected error in create_regression_task: {str(e)}" | |
logger.exception(error_message) | |
st.error(error_message) | |
logger.error(f"Traceback: {''.join(traceback.format_exception(type(e), e, e.__traceback__))}") | |
return None | |
def create_test_data(): | |
"""Create test data for development/testing that matches the filtered scenarios from multiple.py""" | |
test_data = { | |
'Environment': ['RI2008'] * 7, # Same environment for all scenarios | |
'Functional area': ['Data Exchange - Enquiries - Reports'] * 7, | |
'Scenario Name': [ | |
'Add Missions Error Handling - Existing Code', | |
'Add Missions Error Handling - Incorrect Max Iterations', | |
'Add Missions Error Handling - No Badges', | |
'Add Missions Success - Individual', | |
'Add Missions Success - Team', | |
'Add Missions Success - Individual Iteration', | |
'Add Missions Success - Team Max Iterations' | |
], | |
'Error Message': [ | |
'AssertionError [ERR_ASSERTION]: Error handling for existing code failed', | |
'AssertionError [ERR_ASSERTION]: Error handling for max iterations failed', | |
'AssertionError [ERR_ASSERTION]: Error handling for missing badges failed', | |
'AssertionError [ERR_ASSERTION]: Link validation failed', | |
'AssertionError [ERR_ASSERTION]: Link validation failed', | |
'AssertionError [ERR_ASSERTION]: Link validation failed', | |
'AssertionError [ERR_ASSERTION]: Link validation failed' | |
], | |
'Status': ['FAILED'] * 7, | |
'Time spent(m:s)': ['02:30'] * 7, # Example time spent | |
'Start datetime': [datetime.now()] * 7 # Current time as example | |
} | |
# Create DataFrame | |
df = pd.DataFrame(test_data) | |
# Add metadata that will be used for Jira task creation | |
df.attrs['metadata'] = { | |
'Customer': 'MIP Research and Innovation - R&I 2008', | |
'Sprint': 'RS Sprint 195', | |
'Story Points': 5, | |
'Regression Type': 'Non-Business Critical', | |
'Component': 'Maintenance (Regression)', | |
'Priority': 'Lowest', | |
'Type': 'Story', | |
'Labels': 'None', | |
'Assignee': 'Daniel Akinsola', | |
'Reporter': 'Daniel Akinsola' | |
} | |
return df | |
def process_failures_button(filtered_scenarios_df, environment=None): | |
"""Process failures and create Jira task""" | |
# Use RS project key since we can see it's a Regression Sprint board | |
project_key = "RS" | |
project_name = "RS - Regression" | |
# Get environment from DataFrame if not provided | |
if environment is None and 'Environment' in filtered_scenarios_df.columns: | |
environment = filtered_scenarios_df['Environment'].iloc[0] | |
# Get unique functional areas from the DataFrame | |
functional_areas = filtered_scenarios_df['Functional area'].unique() | |
functional_area = functional_areas[0] if len(functional_areas) > 0 else 'R&I' | |
# Extract the main service category (first two parts of the functional area, joined with '-')
area_parts = functional_area.split(' - ')
service_category = '-'.join(area_parts[:2]) if len(area_parts) > 1 else area_parts[0]
service_category = service_category.replace(' ', '')
# Format environment value | |
env_number = environment[2:] if environment.startswith('RI') else environment | |
env_value = env_number if env_number.startswith('R&I') else f"R&I {env_number}" | |
# Get the current sprint from the regression board | |
board = get_regression_board(project_key) | |
sprint = None | |
sprint_name = "No Active Sprint" | |
if board: | |
sprint = get_current_sprint(board['id']) | |
if sprint: | |
sprint_name = sprint.name | |
st.write(f"Found active sprint: {sprint_name}") | |
else: | |
st.warning("No active sprint found") | |
# Create metadata dictionary with all required fields | |
metadata = { | |
'Project Key': project_key, | |
'Project': project_name, | |
'Issue Type': 'Story', | |
'Customer': 'MIP Research and Innovation', | |
'Environment': env_value, | |
'Functional Areas': service_category, | |
'Sprint': sprint_name, | |
'Story Points': calculate_story_points(len(filtered_scenarios_df)), | |
'Regression Type': 'Non-Business Critical', | |
'Number of Scenarios': len(filtered_scenarios_df) if len(filtered_scenarios_df) <= 50 else 50 | |
} | |
# Initialize session states if not exists | |
if 'task_content' not in st.session_state: | |
st.session_state.task_content = None | |
if 'task_created' not in st.session_state: | |
st.session_state.task_created = False | |
if 'created_task' not in st.session_state: | |
st.session_state.created_task = None | |
if 'show_success' not in st.session_state: | |
st.session_state.show_success = False | |
if 'last_task_key' not in st.session_state: | |
st.session_state.last_task_key = None | |
if 'last_task_url' not in st.session_state: | |
st.session_state.last_task_url = None | |
# Store sprint information in session state for task creation | |
if sprint: | |
st.session_state.current_sprint = sprint | |
# If we have a recently created task, show the success message first | |
if st.session_state.show_success and st.session_state.last_task_key: | |
st.success(f"β Task created successfully!") | |
# Display task link in a more prominent way | |
st.markdown( | |
f""" | |
<div style='padding: 10px; border-radius: 5px; border: 1px solid #90EE90; margin: 10px 0;'> | |
<h3 style='margin: 0; color: #90EE90;'>Task Details</h3> | |
<p style='margin: 10px 0;'>Task Key: {st.session_state.last_task_key}</p> | |
<a href='{st.session_state.last_task_url}' target='_blank' | |
style='background-color: #90EE90; color: black; padding: 5px 10px; | |
border-radius: 3px; text-decoration: none; display: inline-block;'> | |
View Task in Jira | |
</a> | |
</div> | |
""", | |
unsafe_allow_html=True | |
) | |
# Add a button to create another task | |
if st.button("Create Another Task", key="create_another"): | |
# Clear all task-related state | |
st.session_state.task_content = None | |
st.session_state.task_created = False | |
st.session_state.created_task = None | |
st.session_state.show_success = False | |
st.session_state.last_task_key = None | |
st.session_state.last_task_url = None | |
st.rerun() | |
return | |
# Button to generate content | |
if st.button("Generate Task Content"): | |
with st.spinner("Generating task content..."): | |
summary, description = generate_task_content(filtered_scenarios_df) | |
if summary and description: | |
st.session_state.task_content = { | |
'summary': summary, | |
'description': description, | |
'environment': environment, | |
'metadata': metadata | |
} | |
else: | |
st.error("Failed to generate task content. Please try again.") | |
return | |
# Display content and create task button if content exists | |
if st.session_state.task_content: | |
with st.expander("Generated Task Content", expanded=True): | |
# Summary section with styling | |
st.markdown("### Summary") | |
st.markdown(f""" | |
<div style='background-color: #f0f2f6; padding: 10px; border-radius: 5px; border: 1px solid #e0e0e0; color: #0f1629;'> | |
{st.session_state.task_content['summary']} | |
</div> | |
""", unsafe_allow_html=True) | |
# Description section with styling | |
st.markdown("### Description") | |
st.markdown(f""" | |
<div style='background-color: #f0f2f6; padding: 10px; border-radius: 5px; border: 1px solid #e0e0e0; color: #0f1629; white-space: pre-wrap;'> | |
{st.session_state.task_content['description']} | |
</div> | |
""", unsafe_allow_html=True) | |
# Get and display available functional area values | |
display_functional_areas(st.session_state.task_content['metadata']) | |
# Display metadata with actual field values | |
st.markdown("### Fields to be Set") | |
metadata = st.session_state.task_content['metadata'] | |
metadata_html = f""" | |
<div style='background-color: #f0f2f6; padding: 10px; border-radius: 5px; border: 1px solid #e0e0e0; color: #0f1629;'> | |
<p><strong>Project:</strong> {metadata['Project']}</p> | |
<p><strong>Issue Type:</strong> {metadata['Issue Type']}</p> | |
<p><strong>Customer:</strong> {metadata['Customer']}</p> | |
<p><strong>Environment:</strong> {metadata['Environment']}</p> | |
<p><strong>Functional Areas:</strong> {metadata['Functional Areas']}</p> | |
<p><strong>Sprint:</strong> {metadata['Sprint']}</p> | |
<p><strong>Story Points:</strong> {metadata['Story Points']}</p> | |
<p><strong>Regression Type:</strong> {metadata['Regression Type']}</p> | |
<p><strong>Number of Scenarios:</strong> {metadata['Number of Scenarios']}</p> | |
</div> | |
""" | |
st.markdown(metadata_html, unsafe_allow_html=True) | |
# Add buttons in columns for better layout | |
col1, col2 = st.columns(2) | |
with col1: | |
if st.button("π Regenerate Content", key="regenerate"): | |
st.session_state.task_content = None | |
st.rerun() | |
with col2: | |
if st.button("π Create Jira Task", key="create"): | |
task = create_regression_task( | |
metadata['Project Key'], | |
st.session_state.task_content['summary'], | |
st.session_state.task_content['description'], | |
st.session_state.task_content['environment'], | |
filtered_scenarios_df | |
) | |
if task: | |
# Store task information in session state | |
st.session_state.last_task_key = task.key | |
st.session_state.last_task_url = f"{JIRA_SERVER}/browse/{task.key}" | |
st.session_state.show_success = True | |
# Clear the content | |
st.session_state.task_content = None | |
# Force refresh of sprint stats on next load | |
st.session_state.force_sprint_refresh = True | |
st.rerun() | |
else: | |
st.error("Failed to create task. Please try again.") | |
def display_functional_areas(metadata): | |
"""Display functional areas and customer fields in a tree-like structure with styling""" | |
if not metadata: | |
st.error("No metadata available") | |
return | |
# If this is task metadata (not project metadata), get project metadata first | |
if 'all_fields' not in metadata: | |
project_metadata = get_project_metadata("RS") # RS is the fixed project key | |
if not project_metadata: | |
st.error("Could not fetch project metadata") | |
return | |
metadata = project_metadata | |
# Display Functional Areas | |
func_field = metadata['all_fields'].get('customfield_13100', {}) | |
if func_field and 'allowedValues' in func_field: | |
st.markdown("### Available Functional Areas") | |
# Log the raw allowedValues for debugging | |
logger.info("=== Raw Functional Area Values ===") | |
for value in func_field['allowedValues']: | |
logger.info(f"Raw value: {value}") | |
# Create a dictionary to store parent-child relationships | |
parent_child_map = {} | |
# First pass: collect all parent-child relationships | |
for value in func_field['allowedValues']: | |
if isinstance(value, dict): | |
parent_value = value.get('value', 'Unknown') | |
if parent_value: | |
child_values = [] | |
logger.info(f"\nProcessing parent: {parent_value}") | |
if 'cascadingOptions' in value: | |
logger.info(f"Found cascading options for {parent_value}:") | |
for child in value['cascadingOptions']: | |
logger.info(f"Raw child value: {child}") | |
if isinstance(child, dict) and child.get('value'): | |
child_value = child.get('value') | |
child_values.append(child_value) | |
logger.info(f" - Added child: {child_value}") | |
parent_child_map[parent_value] = sorted(child_values) if child_values else [] | |
logger.info(f"Final children for {parent_value}: {parent_child_map[parent_value]}") | |
# Second pass: display the relationships | |
for parent_value in sorted(parent_child_map.keys()): | |
child_values = parent_child_map[parent_value] | |
# Create a styled box for each parent and its children | |
st.markdown(f""" | |
<div style='background-color: #f0f2f6; padding: 10px; border-radius: 5px; border: 1px solid #e0e0e0; color: #0f1629; margin-bottom: 10px;'> | |
<strong>{parent_value}</strong> | |
{"<ul style='margin-bottom: 0; margin-top: 5px;'>" if child_values else ""} | |
""", unsafe_allow_html=True) | |
# Display child values if they exist | |
for child in child_values: | |
st.markdown(f"<li>{child}</li>", unsafe_allow_html=True) | |
if child_values: | |
st.markdown("</ul>", unsafe_allow_html=True) | |
st.markdown("</div>", unsafe_allow_html=True) | |
# Log the parent-child relationship for debugging | |
logger.info(f"Displaying Parent: {parent_value}") | |
if child_values: | |
logger.info(f" With Children: {', '.join(child_values)}") | |
else: | |
logger.info(" No children found") | |
else: | |
st.warning("No functional area values found in metadata") | |
logger.warning("No functional area values found in metadata") | |
if func_field: | |
logger.info("Available func_field keys: " + str(list(func_field.keys()))) | |
# Display Customer Field | |
cust_field = metadata['all_fields'].get('customfield_10427', {}) | |
if cust_field and 'allowedValues' in cust_field: | |
st.markdown("### Available Customer Values") | |
# Log the raw allowedValues for debugging | |
logger.info("=== Raw Customer Field Values ===") | |
for value in cust_field['allowedValues']: | |
logger.info(f"Raw value: {value}") | |
# Create a dictionary to store parent-child relationships for customer field | |
customer_parent_child_map = {} | |
# First pass: collect all parent-child relationships | |
for value in cust_field['allowedValues']: | |
if isinstance(value, dict): | |
parent_value = value.get('value', 'Unknown') | |
if parent_value: | |
child_values = [] | |
logger.info(f"\nProcessing customer parent: {parent_value}") | |
if 'cascadingOptions' in value: | |
logger.info(f"Found customer cascading options for {parent_value}:") | |
for child in value['cascadingOptions']: | |
logger.info(f"Raw child value: {child}") | |
if isinstance(child, dict) and child.get('value'): | |
child_value = child.get('value') | |
child_values.append(child_value) | |
logger.info(f" - Added child: {child_value}") | |
customer_parent_child_map[parent_value] = sorted(child_values) if child_values else [] | |
logger.info(f"Final customer children for {parent_value}: {customer_parent_child_map[parent_value]}") | |
# Second pass: display the relationships | |
for parent_value in sorted(customer_parent_child_map.keys()): | |
child_values = customer_parent_child_map[parent_value] | |
# Create a styled box for each parent and its children | |
st.markdown(f""" | |
<div style='background-color: #f0f2f6; padding: 10px; border-radius: 5px; border: 1px solid #e0e0e0; color: #0f1629; margin-bottom: 10px;'> | |
<strong>{parent_value}</strong> | |
{"<ul style='margin-bottom: 0; margin-top: 5px;'>" if child_values else ""} | |
""", unsafe_allow_html=True) | |
# Display child values if they exist | |
for child in child_values: | |
st.markdown(f"<li>{child}</li>", unsafe_allow_html=True) | |
if child_values: | |
st.markdown("</ul>", unsafe_allow_html=True) | |
st.markdown("</div>", unsafe_allow_html=True) | |
# Log the parent-child relationship for debugging | |
logger.info(f"Displaying Customer Parent: {parent_value}") | |
if child_values: | |
logger.info(f" With Children: {', '.join(child_values)}") | |
else: | |
logger.info(" No children found") | |
else: | |
st.warning("No customer field values found in metadata") | |
logger.warning("No customer field values found in metadata") | |
if cust_field: | |
logger.info("Available cust_field keys: " + str(list(cust_field.keys()))) | |
def display_story_points_stats(force_refresh=False): | |
"""Display story points statistics from current sprint""" | |
if not st.session_state.jira_client: | |
return | |
# Initialize session state for sprint data if not exists | |
if 'sprint_data' not in st.session_state: | |
st.session_state.sprint_data = None | |
# Initialize refresh timestamp if not exists | |
if 'last_sprint_refresh' not in st.session_state: | |
st.session_state.last_sprint_refresh = None | |
try: | |
# Only fetch data if forced refresh, no data exists, or refresh timestamp is old | |
current_time = datetime.now() | |
refresh_needed = ( | |
force_refresh or | |
st.session_state.sprint_data is None or | |
(st.session_state.last_sprint_refresh and | |
(current_time - st.session_state.last_sprint_refresh).total_seconds() > 300) # 5 minutes cache | |
) | |
if refresh_needed: | |
with st.spinner("Fetching sprint data..."): | |
# Get regression board | |
board = get_regression_board("RS") | |
if not board: | |
return | |
# Get current sprint | |
sprint = get_current_sprint(board['id']) | |
if not sprint: | |
return | |
# Get sprint issues | |
issues = get_sprint_issues(board['id'], sprint.id, board['estimation_field']) | |
if not issues: | |
return | |
# Calculate points | |
issues_data, total_points, completed_points, in_progress_points = calculate_points(issues, board['estimation_field']) | |
# Store in session state | |
st.session_state.sprint_data = { | |
'sprint_name': sprint.name, | |
'total_points': total_points, | |
'completed_points': completed_points, | |
'in_progress_points': in_progress_points, | |
'timestamp': current_time | |
} | |
st.session_state.last_sprint_refresh = current_time | |
# Display data from session state | |
if st.session_state.sprint_data: | |
sprint_data = st.session_state.sprint_data | |
# Create compact metrics display using custom HTML/CSS | |
st.markdown(f""" | |
<div style='background-color: #1E1E1E; padding: 10px; border-radius: 5px; margin-bottom: 10px;'> | |
<div style='font-size: 0.8em; color: #E0E0E0; margin-bottom: 8px;'>Current Sprint: {sprint_data['sprint_name']}</div> | |
<div style='display: grid; grid-template-columns: repeat(4, 1fr); gap: 5px; font-size: 0.9em;'> | |
<div style='text-align: center;'> | |
<div style='color: #E0E0E0;'>Total</div> | |
<div style='font-size: 1.2em; font-weight: bold;'>{sprint_data['total_points']:.1f}</div> | |
</div> | |
<div style='text-align: center;'> | |
<div style='color: #E0E0E0;'>Done</div> | |
<div style='font-size: 1.2em; font-weight: bold;'>{sprint_data['completed_points']:.1f}</div> | |
</div> | |
<div style='text-align: center;'> | |
<div style='color: #E0E0E0;'>In Progress</div> | |
<div style='font-size: 1.2em; font-weight: bold;'>{sprint_data['in_progress_points']:.1f}</div> | |
</div> | |
<div style='text-align: center;'> | |
<div style='color: #E0E0E0;'>Complete</div> | |
<div style='font-size: 1.2em; font-weight: bold;'>{(sprint_data['completed_points'] / sprint_data['total_points'] * 100) if sprint_data['total_points'] > 0 else 0:.1f}%</div> | |
</div> | |
</div> | |
</div> | |
""", unsafe_allow_html=True) | |
# Show progress bar | |
progress = sprint_data['completed_points'] / sprint_data['total_points'] if sprint_data['total_points'] > 0 else 0 | |
st.progress(progress) | |
# Add refresh button with key based on timestamp to prevent rerendering | |
refresh_key = f"refresh_stats_{datetime.now().strftime('%Y%m%d%H%M%S')}" | |
if st.button("π Refresh", key=refresh_key, use_container_width=True): | |
# Use a session state flag to trigger refresh on next rerun | |
st.session_state.force_sprint_refresh = True | |
st.rerun() | |
except Exception as e: | |
st.error(f"Error updating story points: {str(e)}") | |
# Check if we need to force refresh (from button click) | |
if 'force_sprint_refresh' in st.session_state and st.session_state.force_sprint_refresh: | |
st.session_state.force_sprint_refresh = False | |
return display_story_points_stats(force_refresh=True) | |
def main(): | |
st.title("Jira Integration Test") | |
# Add test data button | |
if st.button("Load Test Data"): | |
st.session_state.filtered_scenarios_df = create_test_data() | |
st.success("Test data loaded!") | |
is_authenticated = render_jira_login() | |
if is_authenticated and st.session_state.projects: | |
# Fixed project and board selection | |
project_key = "RS" | |
board_type = "scrum" | |
board_name = "Regression Sprints" | |
# Display fixed selections in a more compact way | |
st.markdown(""" | |
<div style='display: flex; gap: 10px; margin-bottom: 15px; font-size: 0.9em;'> | |
<div style='flex: 1;'> | |
<div style='color: #E0E0E0; margin-bottom: 4px;'>Project</div> | |
<div style='background-color: #262730; padding: 5px 8px; border-radius: 4px; font-size: 0.9em;'>RS - Regression</div> | |
</div> | |
<div style='flex: 1;'> | |
<div style='color: #E0E0E0; margin-bottom: 4px;'>Board</div> | |
<div style='background-color: #262730; padding: 5px 8px; border-radius: 4px; font-size: 0.9em;'>Regression Sprints (scrum)</div> | |
</div> | |
</div> | |
""", unsafe_allow_html=True) | |
# Display sprint stats (only fetch if no data exists) | |
display_story_points_stats(force_refresh=False) | |
# Show test data if loaded | |
if 'filtered_scenarios_df' in st.session_state: | |
st.subheader("Failed Scenarios") | |
st.dataframe(st.session_state.filtered_scenarios_df) | |
# Get environment directly from the DataFrame | |
if 'Environment' in st.session_state.filtered_scenarios_df.columns: | |
environment = st.session_state.filtered_scenarios_df['Environment'].iloc[0] | |
st.info(f"Using environment from data: {environment}") | |
process_failures_button(st.session_state.filtered_scenarios_df, environment) | |
else: | |
st.error("No environment information found in the data") | |
# Add project fields button at the bottom | |
if st.button("Show Project Fields"): | |
display_project_fields() | |
if __name__ == "__main__":
    main()