import logging import os import sys import traceback from datetime import datetime import streamlit as st from jira import JIRA from dotenv import load_dotenv from datetime import datetime, timedelta import pandas as pd import requests import json from groq import Groq from difflib import SequenceMatcher import time # Configure logging based on environment try: # Try to create logs directory and file log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) log_file = os.path.join(log_dir, f"jira_debug_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log") # Configure root logger with file handler logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file) ] ) except (OSError, IOError): # If file logging fails (e.g., in Hugging Face Spaces), configure logging without file handler logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.NullHandler() ] ) logger = logging.getLogger("jira_integration") logger.info("Jira integration module loaded") # Load environment variables load_dotenv() # Get API keys and configuration with default values for development JIRA_SERVER = os.getenv("JIRA_SERVER") GROQ_API_KEY = os.getenv("GROQ_API_KEY") # Validate required environment variables if not JIRA_SERVER: st.error("JIRA_SERVER not found in environment variables. Please check your .env file.") if not GROQ_API_KEY: st.error("GROQ_API_KEY not found in environment variables. 
Please check your .env file.") def init_jira_session(): """Initialize Jira session state variables""" if 'jira_client' not in st.session_state: st.session_state.jira_client = None if 'projects' not in st.session_state: st.session_state.projects = None def get_projects(): """Fetch all accessible projects""" if not st.session_state.jira_client: return None try: projects = st.session_state.jira_client.projects() # Sort projects by key return sorted(projects, key=lambda x: x.key) except Exception as e: st.error(f"Error fetching projects: {str(e)}") return None def get_board_configuration(board_id): """Fetch board configuration including estimation field""" if not st.session_state.jira_client: return None try: url = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/configuration" response = st.session_state.jira_client._session.get(url) if response.status_code == 200: config = response.json() return config return None except Exception as e: st.error(f"Error fetching board configuration: {str(e)}") return None def get_boards(project_key): """Fetch all boards for a project""" if not st.session_state.jira_client: return None try: boards = st.session_state.jira_client.boards(projectKeyOrID=project_key) board_details = [] for board in boards: config = get_board_configuration(board.id) board_type = config.get('type', 'Unknown') if config else 'Unknown' estimation_field = None if config and 'estimation' in config: estimation_field = config['estimation'].get('field', {}).get('fieldId') board_details.append({ 'id': board.id, 'name': board.name, 'type': board_type, 'estimation_field': estimation_field }) return board_details except Exception as e: st.error(f"Error fetching boards: {str(e)}") return None def get_current_sprint(board_id): """Fetch the current sprint for a board""" if not st.session_state.jira_client: return None try: # Get all active and future sprints sprints = st.session_state.jira_client.sprints(board_id, state='active,future') if sprints: # Look for sprints 
starting with 'RS' rs_sprints = [sprint for sprint in sprints if sprint.name.startswith('RS')] if rs_sprints: # Sort sprints by name to get the latest one latest_sprint = sorted(rs_sprints, key=lambda x: x.name, reverse=True)[0] return latest_sprint else: st.warning("No RS sprints found. Available sprints: " + ", ".join([s.name for s in sprints])) return None except Exception as e: if "board does not support sprints" in str(e).lower(): return None st.error(f"Error fetching current sprint: {str(e)}") return None def get_board_issues(board_id, estimation_field=None): """Fetch all issues on the board using Agile REST API""" if not st.session_state.jira_client: return None try: url = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/issue" fields = ['summary', 'status', 'created', 'description', 'issuetype', 'assignee'] if estimation_field: fields.append(estimation_field) params = { 'maxResults': 200, 'fields': fields, 'jql': f'assignee = currentUser()' } response = st.session_state.jira_client._session.get(url, params=params) if response.status_code != 200: st.error(f"Error fetching board issues: {response.text}") return None data = response.json() issues = [] for issue_data in data['issues']: issue = st.session_state.jira_client.issue(issue_data['key']) issues.append(issue) return issues except Exception as e: st.error(f"Error fetching board issues: {str(e)}") return None def get_sprint_issues(board_id, sprint_id, estimation_field=None): """Fetch all issues in the current sprint using Agile REST API""" if not st.session_state.jira_client: return None try: # Cache key for sprint issues cache_key = f"sprint_issues_{sprint_id}" if cache_key in st.session_state and (datetime.now() - st.session_state.get('last_refresh', datetime.min)).total_seconds() < 60: return st.session_state[cache_key] # Use the Agile REST API endpoint to get issues from the sprint url = f"{JIRA_SERVER}/rest/agile/1.0/board/{board_id}/sprint/{sprint_id}/issue" params = { 'maxResults': 200, 'fields': [ 
'summary', 'status', 'created', 'description', 'issuetype', 'assignee' ], 'jql': 'assignee = currentUser()' # Filter for current user's issues } # Add estimation field if provided if estimation_field: params['fields'].append(estimation_field) # Make a single API call with all required fields response = st.session_state.jira_client._session.get(url, params=params) if response.status_code != 200: st.error(f"Error fetching sprint issues: {response.text}") return None # Process all issues in one go data = response.json() issues = [st.session_state.jira_client.issue(issue['key']) for issue in data['issues']] # Cache the results st.session_state[cache_key] = issues st.session_state['last_refresh'] = datetime.now() return issues except Exception as e: st.error(f"Error fetching sprint issues: {str(e)}") return None def calculate_points(issues, estimation_field): """Calculate story points from issues""" if not estimation_field: return [], 0, 0, 0 try: # Process all issues at once field_id = estimation_field.replace('customfield_', '') issues_data = [] total_points = completed_points = in_progress_points = 0 # Create status mappings for faster lookup done_statuses = {'done', 'completed', 'closed'} progress_statuses = {'in progress', 'development', 'in development'} for issue in issues: try: # Get points value efficiently points = getattr(issue.fields, field_id, None) or getattr(issue.fields, estimation_field, 0) points = float(points) if points is not None else 0 # Get status efficiently status_name = issue.fields.status.name.lower() # Update points total_points += points if status_name in done_statuses: completed_points += points elif status_name in progress_statuses: in_progress_points += points # Build issue data issues_data.append({ "Key": issue.key, "Type": issue.fields.issuetype.name, "Summary": issue.fields.summary, "Status": issue.fields.status.name, "Story Points": points, "Assignee": issue.fields.assignee.displayName if issue.fields.assignee else "Unassigned" }) 
except Exception as e: st.error(f"Error processing points for issue {issue.key}: {str(e)}") continue return issues_data, total_points, completed_points, in_progress_points except Exception as e: st.error(f"Error calculating points: {str(e)}") return [], 0, 0, 0 def create_maintenance_task(project_key, summary, description, issue_type='Task'): """Create a task in Jira""" if not st.session_state.jira_client: st.error("Not authenticated with Jira. Please log in first.") return None try: issue_dict = { 'project': {'key': project_key}, 'summary': summary, 'description': description, 'issuetype': {'name': issue_type} } new_issue = st.session_state.jira_client.create_issue(fields=issue_dict) return new_issue except Exception as e: st.error(f"Error creating task: {str(e)}") return None def render_jira_login(): """Render the Jira login form and handle authentication""" # If already authenticated, just return True if 'jira_client' in st.session_state and st.session_state.jira_client: st.success("Connected to Jira") return True # Initialize session state for login form and attempts tracking if 'jira_username' not in st.session_state: st.session_state.jira_username = "" if 'jira_password' not in st.session_state: st.session_state.jira_password = "" if 'login_blocked_until' not in st.session_state: st.session_state.login_blocked_until = None # Check if login is temporarily blocked if st.session_state.login_blocked_until: if datetime.now() < st.session_state.login_blocked_until: wait_time = (st.session_state.login_blocked_until - datetime.now()).seconds st.error(f"Login temporarily blocked due to too many failed attempts. 
Please wait {wait_time} seconds before trying again.") st.info("If you need immediate access, please try logging in directly to Jira in your browser first, complete the CAPTCHA there, then return here.") return False else: st.session_state.login_blocked_until = None # Create login form with st.form(key="jira_login_form"): username = st.text_input("Jira Username", value=st.session_state.jira_username, key="username_input") password = st.text_input("Password", value=st.session_state.jira_password, type="password", key="password_input") submit_button = st.form_submit_button(label="Login") if submit_button: # Store credentials in session state st.session_state.jira_username = username st.session_state.jira_password = password # Try to authenticate try: jira_client = JIRA(server=JIRA_SERVER, basic_auth=(username, password)) st.session_state.jira_client = jira_client # Get projects projects = get_projects() if projects: st.session_state.projects = projects st.success("Connected to Jira") return True else: st.error("Failed to fetch projects") return False except Exception as e: error_message = str(e).lower() if "captcha_challenge" in error_message: # Set a 5-minute block on login attempts st.session_state.login_blocked_until = datetime.now() + timedelta(minutes=5) st.error("Too many failed login attempts. Please try one of the following:") st.info(""" 1. Wait 5 minutes before trying again 2. Log in to Jira in your browser first, complete the CAPTCHA there, then return here 3. 
Clear your browser cookies and try again """) else: st.error(f"Authentication failed: {str(e)}") return False return False def get_cached_metadata(project_key): """Get cached metadata or fetch new if cache is expired or doesn't exist""" # Check if metadata cache exists in session state if 'metadata_cache' not in st.session_state: st.session_state.metadata_cache = {} if 'metadata_cache_timestamp' not in st.session_state: st.session_state.metadata_cache_timestamp = {} current_time = datetime.now() cache_expiry = timedelta(minutes=30) # Cache expires after 30 minutes # Check if we have valid cached metadata if (project_key in st.session_state.metadata_cache and project_key in st.session_state.metadata_cache_timestamp and current_time - st.session_state.metadata_cache_timestamp[project_key] < cache_expiry): logger.info(f"Using cached metadata for project {project_key}") return st.session_state.metadata_cache[project_key] # If no valid cache, fetch new metadata logger.info(f"Fetching fresh metadata for project {project_key}") metadata = get_project_metadata_fresh(project_key) if metadata: # Update cache st.session_state.metadata_cache[project_key] = metadata st.session_state.metadata_cache_timestamp[project_key] = current_time logger.info(f"Updated metadata cache for project {project_key}") return metadata def get_project_metadata_fresh(project_key): """Fetch fresh metadata from Jira without using cache""" logger.info(f"=== Getting fresh metadata for project {project_key} ===") if not st.session_state.jira_client: logger.error("Not authenticated with Jira. Please log in first.") st.error("Not authenticated with Jira. 
Please log in first.") return None try: # Get project logger.info("Getting project...") project = st.session_state.jira_client.project(project_key) logger.info(f"Got project: {project.name}") logger.info("Getting createmeta with expanded fields...") # Get create metadata for the project with expanded field info metadata = st.session_state.jira_client.createmeta( projectKeys=project_key, expand='projects.issuetypes.fields', issuetypeNames='Story' # Specifically get Story type fields ) logger.info("Got createmeta response") if not metadata.get('projects'): logger.error(f"No metadata found for project {project_key}") st.error(f"No metadata found for project {project_key}") return None project_meta = metadata['projects'][0] issue_types = project_meta.get('issuetypes', []) # Log available issue types logger.info(f"Available issue types: {[t.get('name') for t in issue_types]}") # Try to get Story issue type first story_type = next((t for t in issue_types if t['name'] == 'Story'), None) if not story_type: logger.error("Story issue type not found in project") st.error("Story issue type not found in project") return None logger.info("Processing fields...") # Get required fields and all fields required_fields = {} all_fields = {} # Log all available fields before processing logger.info("Available fields in Story type:") for field_id, field in story_type['fields'].items(): field_name = field.get('name', 'Unknown') field_type = field.get('schema', {}).get('type', 'Unknown') logger.info(f"Field: {field_name} (ID: {field_id}, Type: {field_type})") # Store complete field information including schema and allowed values all_fields[field_id] = { 'name': field['name'], 'required': field.get('required', False), 'schema': field.get('schema', {}), 'allowedValues': field.get('allowedValues', []), 'hasDefaultValue': field.get('hasDefaultValue', False), 'defaultValue': field.get('defaultValue'), 'operations': field.get('operations', []), 'configuration': field.get('configuration', {}) } # 
If this is a cascading select field, log its structure if field.get('schema', {}).get('type') == 'option-with-child': logger.info(f"Found cascading select field: {field_name}") if 'allowedValues' in field: for parent in field['allowedValues']: parent_value = parent.get('value', 'Unknown') logger.info(f" Parent value: {parent_value}") if 'cascadingOptions' in parent: child_values = [child.get('value') for child in parent['cascadingOptions']] logger.info(f" Child values: {child_values}") # Store required fields separately if field.get('required', False): required_fields[field_id] = all_fields[field_id] logger.info(f"Required field: {field_name}") logger.info(f"Found {len(all_fields)} total fields, {len(required_fields)} required fields") metadata_result = { 'project_name': project.name, 'issue_type': 'Story', 'required_fields': required_fields, 'all_fields': all_fields } logger.info("Successfully processed project metadata") return metadata_result except Exception as e: logger.exception(f"Error getting project metadata: {str(e)}") st.error(f"Error getting project metadata: {str(e)}") st.error("Full error details:") st.error(str(e)) st.code(traceback.format_exc(), language="python") return None def get_project_metadata(project_key): """Get project metadata, using cache if available""" return get_cached_metadata(project_key) def generate_task_content(filtered_scenarios_df): """Generate task summary and description using template-based approach""" try: # Extract key information environment = filtered_scenarios_df['Environment'].iloc[0] functional_area = filtered_scenarios_df['Functional area'].iloc[0] scenario_count = len(filtered_scenarios_df) # Generate summary summary = f"Maintenance: {environment} - {functional_area}" # Generate description description = "Performing maintenance on the following scenarios failing :\n\n" # Add each scenario and its error, using enumerate to start from 1 for i, (_, row) in enumerate(filtered_scenarios_df.iterrows(), 1): description += 
f"{i}. {row['Scenario Name']}\n" description += f" Error: {row['Error Message']}\n\n" return summary, description except Exception as e: st.error(f"Error generating task content: {str(e)}") return None, None def get_regression_board(project_key): """Find the regression sprint board for the project""" boards = get_boards(project_key) if not boards: return None # Look specifically for the "Regression Sprints" board regression_board = next((b for b in boards if b['name'].lower() == 'regression sprints' and b['type'].lower() == 'scrum'), None) if not regression_board: st.error("Could not find the 'Regression Sprints' board. Available boards: " + ", ".join([f"{b['name']} ({b['type']})" for b in boards])) return regression_board def get_field_dependencies(): """Cache and return field dependencies and their allowed values""" if 'field_dependencies' not in st.session_state: try: # Get project metadata for RS project metadata = get_project_metadata("RS") if not metadata: return None # Initialize dependencies dictionary with correct field IDs dependencies = { 'Customer': { 'field_id': 'customfield_10427', 'values': [], 'dependencies': {} }, 'Environment': { 'field_id': 'customfield_14157', # Updated field ID 'values': [], 'dependencies': {} }, 'Functional Areas': { 'field_id': 'customfield_15303', # Updated field ID 'values': [], 'dependencies': {} } } # Get field values and their dependencies for field_name, field_info in dependencies.items(): field_id = field_info['field_id'] if field_id in metadata['all_fields']: field_data = metadata['all_fields'][field_id] if 'allowedValues' in field_data: # Store allowed values dependencies[field_name]['values'] = [ value.get('value', value.get('name', '')) for value in field_data['allowedValues'] if isinstance(value, dict) ] # Store dependencies (if any) if 'dependency' in field_data: dep_field = field_data['dependency'] dependencies[field_name]['dependencies'] = { 'field': dep_field['field']['name'], 'field_id': 
dep_field['field']['id'], 'values': dep_field.get('values', []) } # Cache the dependencies st.session_state.field_dependencies = dependencies return dependencies except Exception as e: st.error(f"Error fetching field dependencies: {str(e)}") return None return st.session_state.field_dependencies def get_dependent_field_value(field_name, parent_value=None): """Get the appropriate field value based on dependencies""" dependencies = get_field_dependencies() if not dependencies or field_name not in dependencies: return None field_info = dependencies[field_name] # If this field depends on another field if parent_value and field_info['dependencies']: dep_info = field_info['dependencies'] # Find values that match the parent value for value_mapping in dep_info.get('values', []): if value_mapping.get('parent') == parent_value: return value_mapping.get('value') # If no dependency or no match, return first available value return field_info['values'][0] if field_info['values'] else None def display_project_fields(): """Display available fields for issue creation""" project_key = "RS" # Using the fixed project key metadata = get_project_metadata(project_key) if metadata: st.subheader("Project Fields") # Display required fields st.write("### Required Fields") for field_id, field in metadata['required_fields'].items(): st.write(f"- {field['name']} ({field_id})") if field.get('allowedValues'): st.write(" Allowed values:") for value in field['allowedValues']: if isinstance(value, dict): # Handle cascading select fields if 'cascadingOptions' in value: parent_value = value.get('value', 'Unknown') st.write(f" - {parent_value}") st.write(" Child options:") for child in value['cascadingOptions']: st.write(f" - {child.get('value', 'Unknown')}") else: st.write(f" - {value.get('value', value.get('name', 'Unknown'))}") else: st.write(f" - {value}") # Display custom fields with dependencies st.write("### Custom Fields and Dependencies") # Customer field (customfield_10427) - Cascading Select 
st.write("\n#### Customer (customfield_10427)") cust_field = metadata['all_fields'].get('customfield_10427', {}) if cust_field.get('allowedValues'): st.write("Cascading options:") for value in cust_field['allowedValues']: if isinstance(value, dict): parent_value = value.get('value', 'Unknown') st.write(f"- {parent_value}") if 'cascadingOptions' in value: st.write(" Child options:") for child in value['cascadingOptions']: st.write(f" - {child.get('value', 'Unknown')}") # Functional Areas field (customfield_13100) - Cascading Select st.write("\n#### Functional Areas (customfield_13100)") func_field = metadata['all_fields'].get('customfield_13100', {}) if func_field.get('allowedValues'): st.write("Cascading options:") for value in func_field['allowedValues']: if isinstance(value, dict): parent_value = value.get('value', 'Unknown') st.write(f"- {parent_value}") if 'cascadingOptions' in value: st.write(" Child options:") for child in value['cascadingOptions']: st.write(f" - {child.get('value', 'Unknown')}") # Environment field (customfield_14157) st.write("\n#### Environment (customfield_14157)") env_field = metadata['all_fields'].get('customfield_14157', {}) if env_field.get('allowedValues'): st.write("Allowed values:") for value in env_field['allowedValues']: if isinstance(value, dict): st.write(f" - {value.get('value', value.get('name', 'Unknown'))}") # Display dependencies if any(field.get('dependency') for field in [env_field, func_field, cust_field]): st.write("\n### Field Dependencies") for field_name, field in [ ('Environment', env_field), ('Functional Areas', func_field), ('Customer', cust_field) ]: if field.get('dependency'): st.write(f"\n{field_name} depends on:") dep = field['dependency'] st.write(f" Field: {dep['field']['name']} ({dep['field']['id']})") if dep.get('values'): st.write(" Value mappings:") for mapping in dep['values']: parent_value = mapping.get('parent', 'Unknown') child_value = mapping.get('value', 'Unknown') st.write(f" - When parent is 
'{parent_value}' → '{child_value}'") # Display other custom fields st.write("\n### Other Custom Fields") excluded_fields = ['customfield_14157', 'customfield_13100', 'customfield_10427'] custom_fields = {k: v for k, v in metadata['all_fields'].items() if k.startswith('customfield_') and k not in excluded_fields} for field_id, field in custom_fields.items(): st.write(f"- {field['name']} ({field_id})") if field.get('allowedValues'): st.write(" Allowed values:") for value in field.get('allowedValues', []): if isinstance(value, dict): if 'cascadingOptions' in value: parent_value = value.get('value', 'Unknown') st.write(f" - {parent_value}") st.write(" Child options:") for child in value['cascadingOptions']: st.write(f" - {child.get('value', 'Unknown')}") else: st.write(f" - {value.get('value', value.get('name', 'Unknown'))}") else: st.write(f" - {value}") def get_closest_match(target, choices, threshold=60): """ Find the closest matching string from choices using fuzzy matching. Returns the best match if similarity is above threshold, otherwise None. 
""" if not choices: return None try: def similarity(a, b): # Normalize strings for comparison a = a.lower().replace('-', ' ').replace(' ', ' ').strip() b = b.lower().replace('-', ' ').replace(' ', ' ').strip() return SequenceMatcher(None, a, b).ratio() * 100 # Calculate similarities similarities = [(choice, similarity(target, choice)) for choice in choices] # Sort by similarity score best_match = max(similarities, key=lambda x: x[1]) if best_match[1] >= threshold: return best_match[0] return None except Exception as e: st.error(f"Error in fuzzy matching: {str(e)}") return None def get_functional_area_values(metadata): """Extract all available functional area values from metadata""" logger.info("=== Starting get_functional_area_values ===") if not metadata: logger.error("No metadata provided") return [] if 'all_fields' not in metadata: logger.error("No 'all_fields' in metadata") logger.debug(f"Available metadata keys: {list(metadata.keys())}") return [] # Log all available field IDs for debugging logger.info("Available fields:") for field_id, field in metadata['all_fields'].items(): field_name = field.get('name', 'Unknown') field_type = field.get('schema', {}).get('type', 'Unknown') logger.info(f" {field_name} (ID: {field_id}, Type: {field_type})") # List of possible field IDs for functional areas functional_area_field_ids = [ 'customfield_15303', # New field ID 'customfield_13100', # Old field ID 'customfield_13101' # Another possible variation ] # Try to find the functional area field by name or ID func_field = None for field_id, field in metadata['all_fields'].items(): field_name = field.get('name', '').lower() if field_id in functional_area_field_ids or 'functional area' in field_name: func_field = field logger.info(f"Found functional area field: {field.get('name')} (ID: {field_id})") break if not func_field: logger.error("Could not find functional area field in metadata") logger.info("Available field names:") for field_id, field in 
metadata['all_fields'].items(): logger.info(f" {field.get('name', 'Unknown')} ({field_id})") return [] # Check field type field_type = func_field.get('schema', {}).get('type') logger.info(f"Functional area field type: {field_type}") allowed_values = [] if 'allowedValues' in func_field: logger.info("Processing allowed values...") for parent in func_field['allowedValues']: if isinstance(parent, dict): parent_value = parent.get('value', 'Unknown') logger.info(f"Processing parent value: {parent_value}") if 'cascadingOptions' in parent: for child in parent['cascadingOptions']: if isinstance(child, dict) and 'value' in child: allowed_values.append(child['value']) logger.debug(f"Added child value: {child['value']}") elif 'value' in parent: allowed_values.append(parent['value']) logger.debug(f"Added value: {parent['value']}") logger.info(f"Found {len(allowed_values)} allowed values") if allowed_values: logger.info(f"Sample of allowed values: {allowed_values[:5]}") else: logger.warning("No allowed values found in the field") return allowed_values def calculate_story_points(scenario_count): """Calculate story points based on number of scenarios""" if scenario_count <= 3: return 1 elif scenario_count <= 5: return 2 elif scenario_count <= 9: return 3 elif scenario_count <= 15: return 5 else: return 8 def map_functional_area(functional_area, metadata): """Map a functional area to its closest Jira allowed parent and child values using structured mapping.""" if not metadata or not functional_area: logger.error("No metadata or functional area provided") raise ValueError("Metadata and functional area are required") # Get the functional area field from metadata func_field = metadata['all_fields'].get('customfield_13100', {}) if not func_field or 'allowedValues' not in func_field: logger.error("Could not find functional area field in metadata") raise ValueError("Functional area field not found in metadata") # Build a set of allowed child values for faster lookup allowed_values = {} 
for parent in func_field['allowedValues']: if isinstance(parent, dict): parent_value = parent.get('value') if parent_value and 'children' in parent: for child in parent['children']: if isinstance(child, dict) and 'value' in child: allowed_values[child['value']] = parent_value logger.info(f"Input functional area: {functional_area}") # Split the functional area into parts parts = [p.strip() for p in functional_area.split(' - ')] logger.info(f"Split into parts: {parts}") # Try different combinations of parts joined with '-' for i in range(len(parts)): for j in range(i + 1, len(parts) + 1): # Try joining parts with '-' test_value = '-'.join(parts[i:j]) # Also try without spaces test_value_no_spaces = test_value.replace(' ', '') logger.info(f"Trying combination: {test_value}") # Check both versions (with and without spaces) if test_value in allowed_values: logger.info(f"Found exact match: {test_value}") return allowed_values[test_value], test_value elif test_value_no_spaces in allowed_values: logger.info(f"Found match without spaces: {test_value_no_spaces}") return allowed_values[test_value_no_spaces], test_value_no_spaces # Try category-specific matches categories = ['Services', 'FIN', 'WARPSPEED'] for category in categories: category_value = f"{category}-{test_value}" category_value_no_spaces = category_value.replace(' ', '') if category_value in allowed_values: logger.info(f"Found category match: {category_value}") return allowed_values[category_value], category_value elif category_value_no_spaces in allowed_values: logger.info(f"Found category match without spaces: {category_value_no_spaces}") return allowed_values[category_value_no_spaces], category_value_no_spaces # If no match found, try to find a suitable default based on the first part first_part = parts[0].upper() if 'SERVICE' in first_part or 'SERVICES' in first_part: logger.info("No exact match found, defaulting to Services-Platform") return "R&I", "Services-Platform" elif 'FIN' in first_part: 
logger.info("No exact match found, defaulting to FIN-Parameters") return "R&I", "FIN-Parameters" elif 'WARPSPEED' in first_part: logger.info("No exact match found, defaulting to WARPSPEED-Parameters") return "R&I", "WARPSPEED-Parameters" # Final fallback to Data Exchange logger.warning(f"No suitable match found for '{functional_area}', defaulting to Data Exchange") return "R&I", "Data Exchange" def get_customer_field_values(metadata): """Extract all available customer field values and their child options from metadata""" if not metadata or 'all_fields' not in metadata: return {} customer_field = metadata['all_fields'].get('customfield_10427', {}) customer_values = {} if 'allowedValues' in customer_field: for parent in customer_field['allowedValues']: if isinstance(parent, dict): parent_value = parent.get('value') if parent_value: child_values = [] if 'cascadingOptions' in parent: child_values = [child.get('value') for child in parent['cascadingOptions'] if child.get('value')] customer_values[parent_value] = child_values return customer_values def map_customer_value(environment_value, customer_values): """Map environment value to appropriate customer field values""" if not environment_value or not customer_values: return "MIP Research and Innovation", "R&I General" # Clean up environment value env_value = environment_value.strip() # Special case handling for specific environments if any(env in env_value.lower() for env in ['legalwise', 'scorpion', 'lifewise', 'talksure']): parent_value = "ILR" child_value = env_value # Use the original environment value as child logger.info(f"Mapped {env_value} to ILR parent with child {child_value}") return parent_value, child_value # Handle RI environments if env_value.startswith('RI'): parent_value = "MIP Research and Innovation" # Remove 'RI' prefix and clean up child_value = env_value[2:].strip() if child_value: child_value = f"R&I {child_value}" else: child_value = "R&I General" logger.info(f"Mapped RI environment {env_value} 
to {parent_value} parent with child {child_value}") return parent_value, child_value # Default case - try to find matching values for parent, children in customer_values.items(): if parent == "MIP Research and Innovation": # Default parent # Look for exact match in child values if env_value in children: return parent, env_value # Look for partial matches for child in children: if env_value in child or child in env_value: return parent, child # If no match found, return defaults logger.warning(f"No specific mapping found for {env_value}, using defaults") return "MIP Research and Innovation", "R&I General" def create_regression_task(project_key, summary, description, environment, filtered_scenarios_df): logger.debug(f"Entering create_regression_task with project_key={project_key}, summary={summary}, environment={environment}, DF_shape={filtered_scenarios_df.shape}") logger.info("=== Starting create_regression_task function ===") logger.info(f"Project: {project_key}, Summary: {summary}, Environment: {environment}") logger.info(f"Filtered DF shape: {filtered_scenarios_df.shape if filtered_scenarios_df is not None else 'None'}") try: # Get metadata first to access field values metadata = get_project_metadata(project_key) if not metadata: error_msg = "Could not get project metadata" logger.error(error_msg) st.error(error_msg) return None # Get customer field values and map environment customer_values = get_customer_field_values(metadata) parent_value, child_value = map_customer_value(environment, customer_values) logger.info(f"Mapped customer values - Parent: {parent_value}, Child: {child_value}") # Get Jira client if "jira_client" not in st.session_state: error_msg = "No Jira client available. Please connect to Jira first." 
def _abort_task_creation(message):
    """Log *message*, show it to the user, and return None for the caller to propagate."""
    logger.error(message)
    st.error(message)
    return None


def _build_regression_issue_fields(project_key, summary, description, customer,
                                   functional_area, scenario_count, story_points,
                                   assignee):
    """Assemble the Jira ``fields`` payload for a regression Story."""
    customer_parent, customer_child = customer
    area_parent, area_child = functional_area
    return {
        "project": {"key": project_key},
        "summary": summary,
        "description": description,
        "issuetype": {"name": "Story"},
        "components": [{"name": "Maintenance (Regression)"}],
        "customfield_10427": {
            "value": customer_parent,
            "child": {"value": customer_child},
        },
        "customfield_12730": {"value": "Non-Business Critical"},  # Regression Type field
        "customfield_13430": {"value": str(scenario_count)},  # Number of Scenarios
        "customfield_13100": {
            "value": area_parent,
            "child": {"value": area_child},
        },
        "assignee": {"name": assignee},
        "customfield_10002": story_points,  # Story Points field
    }


def _report_issue_creation_failure(create_error):
    """Log a failed ``create_issue`` call and surface what Jira answered.

    Must be called from inside the ``except`` block so ``logger.exception``
    can attach the active traceback.
    """
    error_message = str(create_error)
    logger.exception(f"Failed to create issue: {error_message}")

    shown_to_user = False
    # Try to extract the response content if it's a JIRA Error
    try:
        if hasattr(create_error, 'response'):
            status_code = getattr(create_error.response, 'status_code', 'N/A')
            logger.error(f"Response status code: {status_code}")

            if hasattr(create_error.response, 'text'):
                response_text = create_error.response.text
                logger.error(f"Response text: {response_text}")
                st.error(f"❌ Error creating task in Jira (Status: {status_code}):")
                st.error(response_text)
                shown_to_user = True
    except Exception as extract_error:
        logger.exception(f"Error extracting response details: {str(extract_error)}")

    if not shown_to_user:
        # Errors without an HTTP response would otherwise reach the user
        # with no explanation at all.
        st.error(f"❌ Error creating task in Jira: {error_message}")


def create_regression_task(project_key, summary, description, environment, filtered_scenarios_df):
    """Create a regression Story in Jira and add it to the current sprint.

    Steps: load project metadata, map ``environment`` to the customer field,
    resolve the regression board's current sprint, derive functional area and
    story points from ``filtered_scenarios_df``, create the issue, then try to
    add it to the sprint (a sprint failure only produces a warning).

    Args:
        project_key: Jira project key the issue is created in.
        summary: Issue summary.
        description: Issue description.
        environment: Environment name passed to ``map_customer_value``.
        filtered_scenarios_df: DataFrame of failed scenarios; its row count
            drives the scenario-count field and the story points, and its
            optional "Functional area" column drives the functional area.

    Returns:
        The issue object returned by ``jira_client.create_issue``, or ``None``
        when any step fails (the reason is logged and shown via ``st.error``).
    """
    # Computed once so a missing DataFrame cannot crash the entry logging.
    df_shape = filtered_scenarios_df.shape if filtered_scenarios_df is not None else 'None'
    logger.debug(f"Entering create_regression_task with project_key={project_key}, summary={summary}, environment={environment}, DF_shape={df_shape}")
    logger.info("=== Starting create_regression_task function ===")
    logger.info(f"Project: {project_key}, Summary: {summary}, Environment: {environment}")
    logger.info(f"Filtered DF shape: {df_shape}")

    try:
        if filtered_scenarios_df is None:
            return _abort_task_creation("No failed scenarios were provided")

        # Get metadata first to access field values
        metadata = get_project_metadata(project_key)
        if not metadata:
            return _abort_task_creation("Could not get project metadata")

        # Get customer field values and map environment
        customer_values = get_customer_field_values(metadata)
        parent_value, child_value = map_customer_value(environment, customer_values)
        logger.info(f"Mapped customer values - Parent: {parent_value}, Child: {child_value}")

        # The key exists (holding None) as soon as the session is initialised,
        # so test the value rather than key membership.
        jira_client = st.session_state.get("jira_client")
        if not jira_client:
            return _abort_task_creation("No Jira client available. Please connect to Jira first.")
        logger.info("Got Jira client from session state")

        # Get active sprint
        board = get_regression_board(project_key)
        if not board:
            return _abort_task_creation(f"No regression board found for project {project_key}")
        active_sprint = get_current_sprint(board['id'])
        if not active_sprint:
            return _abort_task_creation("No active sprint found")
        logger.info(f"Found active sprint: {active_sprint.name} (ID: {active_sprint.id})")

        # Extract functional area from filtered scenarios
        functional_areas = []
        try:
            if "Functional area" in filtered_scenarios_df.columns:
                functional_areas = filtered_scenarios_df["Functional area"].unique().tolist()
                logger.info(f"Extracted functional areas: {functional_areas}")
        except Exception as e:
            logger.exception(f"Error extracting functional areas: {str(e)}")
            st.error(f"Error extracting functional areas: {str(e)}")
            return None

        # Calculate story points based on number of scenarios
        scenario_count = len(filtered_scenarios_df)
        story_points = calculate_story_points(scenario_count)
        logger.info(f"Calculated story points: {story_points}")

        # Map functional area using metadata
        functional_area_parent, functional_area_child = map_functional_area(
            functional_areas[0] if functional_areas else "Data Exchange",
            metadata
        )
        logger.info(f"Mapped functional area to parent: {functional_area_parent}, child: {functional_area_child}")

        # Prepare issue dictionary with all required fields
        issue_dict = _build_regression_issue_fields(
            project_key,
            summary,
            description,
            (parent_value, child_value),
            (functional_area_parent, functional_area_child),
            scenario_count,
            story_points,
            st.session_state.jira_username,
        )
        logger.info(f"Issue dictionary prepared: {issue_dict}")

        # Create the issue
        logger.info("Attempting to create issue in Jira...")
        try:
            new_issue = jira_client.create_issue(fields=issue_dict)
            logger.info(f"Issue created successfully: {new_issue.key}")

            # Adding to the sprint is best-effort: the issue already exists,
            # so a failure here is reported as a warning only.
            try:
                logger.info(f"Attempting to add issue {new_issue.key} to sprint {active_sprint.id}...")
                jira_client.add_issues_to_sprint(active_sprint.id, [new_issue.key])
                logger.info(f"Added issue {new_issue.key} to sprint {active_sprint.name}")
            except Exception as sprint_error:
                logger.exception(f"Failed to add issue to sprint: {str(sprint_error)}")
                st.warning(f"⚠️ Could not add task to sprint. Error: {str(sprint_error)}")

            # Display success message
            st.success(f"✅ Task created successfully: {new_issue.key}")
            return new_issue

        except Exception as create_error:
            _report_issue_creation_failure(create_error)
            return None

    except Exception as e:
        error_message = f"❌ Unexpected error in create_regression_task: {str(e)}"
        # logger.exception already records the traceback.
        logger.exception(error_message)
        st.error(error_message)
        return None
def create_test_data():
    """Build a seven-row DataFrame of failed scenarios for development/testing.

    Every row shares the same environment, functional area, status, duration
    and start timestamp; only the scenario name and error message vary. Task
    metadata is attached under ``df.attrs['metadata']``.

    Returns:
        pandas.DataFrame: the sample scenarios with ``attrs['metadata']`` set.
    """
    row_count = 7
    name_prefix = 'Add Missions '
    error_prefix = 'AssertionError [ERR_ASSERTION]: '

    scenario_names = [
        name_prefix + suffix
        for suffix in (
            'Error Handling - Existing Code',
            'Error Handling - Incorrect Max Iterations',
            'Error Handling - No Badges',
            'Success - Individual',
            'Success - Team',
            'Success - Individual Iteration',
            'Success - Team Max Iterations',
        )
    ]

    # Three distinct error-handling failures followed by four identical
    # link-validation failures.
    error_messages = [
        f'{error_prefix}Error handling for {subject} failed'
        for subject in ('existing code', 'max iterations', 'missing badges')
    ]
    error_messages += [f'{error_prefix}Link validation failed'] * 4

    started_at = datetime.now()  # one timestamp shared by all rows

    df = pd.DataFrame({
        'Environment': ['RI2008'] * row_count,
        'Functional area': ['Data Exchange - Enquiries - Reports'] * row_count,
        'Scenario Name': scenario_names,
        'Error Message': error_messages,
        'Status': ['FAILED'] * row_count,
        'Time spent(m:s)': ['02:30'] * row_count,
        'Start datetime': [started_at] * row_count,
    })

    # Metadata that will be used for Jira task creation
    df.attrs['metadata'] = dict([
        ('Customer', 'MIP Research and Innovation - R&I 2008'),
        ('Sprint', 'RS Sprint 195'),
        ('Story Points', 5),
        ('Regression Type', 'Non-Business Critical'),
        ('Component', 'Maintenance (Regression)'),
        ('Priority', 'Lowest'),
        ('Type', 'Story'),
        ('Labels', 'None'),
        ('Assignee', 'Daniel Akinsola'),
        ('Reporter', 'Daniel Akinsola'),
    ])
    return df
def _service_category_from(functional_area):
    """Collapse 'A - B - C' into 'A-B' with spaces removed; tolerate fewer parts."""
    parts = str(functional_area).split(' - ')
    return '-'.join(parts[:2]).replace(' ', '')


def _environment_label(environment):
    """Format an environment name as 'R&I <n>'; blank input gives 'R&I General'."""
    text = str(environment).strip() if environment is not None else ''
    env_number = text[2:].strip() if text.startswith('RI') else text
    if not env_number:
        # Same default child that map_customer_value falls back to.
        return "R&I General"
    return env_number if env_number.startswith('R&I') else f"R&I {env_number}"


def process_failures_button(filtered_scenarios_df, environment=None):
    """Render the generate / review / create workflow for a regression task.

    Builds the display metadata from ``filtered_scenarios_df`` and the
    regression board's current sprint, then drives three UI states kept in
    ``st.session_state``: the success view after a task was created, the
    "Generate Task Content" button, and the review panel with the
    "Regenerate Content" / "Create Jira Task" buttons.

    Args:
        filtered_scenarios_df: DataFrame of failed scenarios. The optional
            'Environment' and 'Functional area' columns are read when present.
        environment: Environment name; when ``None`` the first value of the
            'Environment' column is used if available.

    Returns:
        None. All results are communicated through Streamlit widgets and
        ``st.session_state``.
    """
    # Use RS project key since we can see it's a Regression Sprint board
    project_key = "RS"
    project_name = "RS - Regression"

    # Get environment from DataFrame if not provided (and a row exists)
    if (environment is None
            and 'Environment' in filtered_scenarios_df.columns
            and not filtered_scenarios_df.empty):
        environment = filtered_scenarios_df['Environment'].iloc[0]

    # Get unique functional areas; a missing column or all-null column falls
    # back to 'R&I' instead of raising.
    if 'Functional area' in filtered_scenarios_df.columns:
        functional_areas = filtered_scenarios_df['Functional area'].dropna().unique()
    else:
        functional_areas = []
    functional_area = functional_areas[0] if len(functional_areas) > 0 else 'R&I'

    # Extract the main service category and format the environment value
    service_category = _service_category_from(functional_area)
    env_value = _environment_label(environment)

    # Get the current sprint from the regression board
    board = get_regression_board(project_key)
    sprint = None
    sprint_name = "No Active Sprint"
    if board:
        sprint = get_current_sprint(board['id'])
        if sprint:
            sprint_name = sprint.name
            st.write(f"Found active sprint: {sprint_name}")
        else:
            st.warning("No active sprint found")

    # Create metadata dictionary with all required fields
    scenario_count = len(filtered_scenarios_df)
    metadata = {
        'Project Key': project_key,
        'Project': project_name,
        'Issue Type': 'Story',
        'Customer': 'MIP Research and Innovation',
        'Environment': env_value,
        'Functional Areas': service_category,
        'Sprint': sprint_name,
        'Story Points': calculate_story_points(scenario_count),
        'Regression Type': 'Non-Business Critical',
        'Number of Scenarios': min(scenario_count, 50),  # displayed value is capped at 50
    }

    # Initialize session states if not exists
    session_defaults = (
        ('task_content', None),
        ('task_created', False),
        ('created_task', None),
        ('show_success', False),
        ('last_task_key', None),
        ('last_task_url', None),
    )
    for state_key, default in session_defaults:
        if state_key not in st.session_state:
            st.session_state[state_key] = default

    # Store sprint information in session state for task creation
    if sprint:
        st.session_state.current_sprint = sprint

    # If we have a recently created task, show the success message first
    if st.session_state.show_success and st.session_state.last_task_key:
        st.success("✅ Task created successfully!")

        # The stored URL is rendered as a Markdown link so the label is
        # actually clickable.
        if st.session_state.last_task_url:
            link_line = f"[View Task in Jira]({st.session_state.last_task_url})"
        else:
            link_line = "View Task in Jira"

        # NOTE(review): the template text seen here carries no HTML although
        # it is rendered with unsafe_allow_html=True — confirm against the
        # intended styling before relying on this layout.
        st.markdown(
            f"""

Task Details

Task Key: {st.session_state.last_task_key}

{link_line}
""",
            unsafe_allow_html=True,
        )

        # Add a button to create another task
        if st.button("Create Another Task", key="create_another"):
            # Clear all task-related state
            st.session_state.task_content = None
            st.session_state.task_created = False
            st.session_state.created_task = None
            st.session_state.show_success = False
            st.session_state.last_task_key = None
            st.session_state.last_task_url = None
            st.rerun()
        return

    # Button to generate content
    if st.button("Generate Task Content"):
        with st.spinner("Generating task content..."):
            summary, description = generate_task_content(filtered_scenarios_df)
            if summary and description:
                st.session_state.task_content = {
                    'summary': summary,
                    'description': description,
                    'environment': environment,
                    'metadata': metadata,
                }
            else:
                st.error("Failed to generate task content. Please try again.")
                return

    # Display content and create task button if content exists
    if st.session_state.task_content:
        with st.expander("Generated Task Content", expanded=True):
            # Summary section
            st.markdown("### Summary")
            st.markdown(f"""
{st.session_state.task_content['summary']}
""", unsafe_allow_html=True)

            # Description section
            st.markdown("### Description")
            st.markdown(f"""
{st.session_state.task_content['description']}
""", unsafe_allow_html=True)

            # Get and display available functional area values
            display_functional_areas(st.session_state.task_content['metadata'])

            # Display metadata with actual field values
            st.markdown("### Fields to be Set")
            metadata = st.session_state.task_content['metadata']
            metadata_html = f"""

Project: {metadata['Project']}

Issue Type: {metadata['Issue Type']}

Customer: {metadata['Customer']}

Environment: {metadata['Environment']}

Functional Areas: {metadata['Functional Areas']}

Sprint: {metadata['Sprint']}

Story Points: {metadata['Story Points']}

Regression Type: {metadata['Regression Type']}

Number of Scenarios: {metadata['Number of Scenarios']}

"""
            st.markdown(metadata_html, unsafe_allow_html=True)

            # Add buttons in columns for better layout
            col1, col2 = st.columns(2)

            with col1:
                if st.button("🔄 Regenerate Content", key="regenerate"):
                    st.session_state.task_content = None
                    st.rerun()

            with col2:
                if st.button("📝 Create Jira Task", key="create"):
                    task = create_regression_task(
                        metadata['Project Key'],
                        st.session_state.task_content['summary'],
                        st.session_state.task_content['description'],
                        st.session_state.task_content['environment'],
                        filtered_scenarios_df
                    )

                    if task:
                        # Store task information in session state
                        st.session_state.last_task_key = task.key
                        st.session_state.last_task_url = f"{JIRA_SERVER}/browse/{task.key}"
                        st.session_state.show_success = True

                        # Clear the content
                        st.session_state.task_content = None

                        # Force refresh of sprint stats on next load
                        st.session_state.force_sprint_refresh = True
                        st.rerun()
                    else:
                        st.error("Failed to create task. Please try again.")
def _cascading_options_map(allowed_values, qualifier):
    """Return {parent value: sorted child values} for a cascading field's allowedValues.

    ``qualifier`` ('' or 'customer ') is only woven into the log messages.
    """
    parent_child_map = {}
    for value in allowed_values:
        if not isinstance(value, dict):
            continue
        parent_value = value.get('value', 'Unknown')
        if not parent_value:
            continue

        child_values = []
        logger.info(f"\nProcessing {qualifier}parent: {parent_value}")
        if 'cascadingOptions' in value:
            logger.info(f"Found {qualifier}cascading options for {parent_value}:")
            for child in value['cascadingOptions']:
                logger.info(f"Raw child value: {child}")
                if isinstance(child, dict) and child.get('value'):
                    child_value = child.get('value')
                    child_values.append(child_value)
                    logger.info(f" - Added child: {child_value}")

        parent_child_map[parent_value] = sorted(child_values)
        logger.info(f"Final {qualifier}children for {parent_value}: {parent_child_map[parent_value]}")
    return parent_child_map


def _render_cascading_field(field, heading, raw_log_header, qualifier,
                            missing_message, keys_label):
    """Render one cascading-select field as a parent/children tree, with debug logging."""
    if not (field and 'allowedValues' in field):
        st.warning(missing_message)
        logger.warning(missing_message)
        if field:
            logger.info(f"Available {keys_label} keys: " + str(list(field.keys())))
        return

    st.markdown(heading)

    # Log the raw allowedValues for debugging
    logger.info(raw_log_header)
    for value in field['allowedValues']:
        logger.info(f"Raw value: {value}")

    # First pass: collect all parent-child relationships
    parent_child_map = _cascading_options_map(field['allowedValues'], qualifier)

    # Second pass: display the relationships
    for parent_value in sorted(parent_child_map):
        child_values = parent_child_map[parent_value]

        # NOTE(review): rendered as plain Markdown (bold parent, bulleted
        # children) without unsafe_allow_html; reconcile with the styled box
        # the UI is meant to show, if one is required.
        lines = [f"**{parent_value}**", ""]
        lines.extend(f"- {child}" for child in child_values)
        st.markdown("\n".join(lines))

        # Log the parent-child relationship for debugging
        logger.info(f"Displaying {qualifier.title()}Parent: {parent_value}")
        if child_values:
            logger.info(f" With Children: {', '.join(child_values)}")
        else:
            logger.info(" No children found")


def display_functional_areas(metadata):
    """Display functional areas and customer field options as parent/child trees.

    Args:
        metadata: Either project metadata (a dict with an ``all_fields``
            mapping) or any other non-empty metadata dict, in which case
            project metadata is fetched via ``get_project_metadata("RS")``.

    Returns:
        None. Output goes to Streamlit; an error is shown when no metadata is
        available.
    """
    if not metadata:
        st.error("No metadata available")
        return

    # If this is task metadata (not project metadata), get project metadata first
    if 'all_fields' not in metadata:
        project_metadata = get_project_metadata("RS")  # RS is the fixed project key
        if not project_metadata:
            st.error("Could not fetch project metadata")
            return
        metadata = project_metadata

    all_fields = metadata.get('all_fields') or {}

    # Display Functional Areas
    _render_cascading_field(
        all_fields.get('customfield_13100', {}),
        heading="### Available Functional Areas",
        raw_log_header="=== Raw Functional Area Values ===",
        qualifier="",
        missing_message="No functional area values found in metadata",
        keys_label="func_field",
    )

    # Display Customer Field
    _render_cascading_field(
        all_fields.get('customfield_10427', {}),
        heading="### Available Customer Values",
        raw_log_header="=== Raw Customer Field Values ===",
        qualifier="customer ",
        missing_message="No customer field values found in metadata",
        keys_label="cust_field",
    )
def display_story_points_stats(force_refresh=False):
    """Display story points statistics for the RS regression board's current sprint.

    Sprint totals are cached in ``st.session_state.sprint_data`` and refetched
    when ``force_refresh`` is true, when ``st.session_state.force_sprint_refresh``
    is set, when nothing is cached yet, or when the cache is older than five
    minutes.

    Args:
        force_refresh: Refetch sprint data even if a fresh cache exists.

    Returns:
        None. Renders nothing when there is no Jira client, board, sprint or
        issues; errors are shown via ``st.error``.
    """
    if not st.session_state.jira_client:
        return

    # Initialize cached data and refresh timestamp if not present
    for state_key in ('sprint_data', 'last_sprint_refresh'):
        if state_key not in st.session_state:
            st.session_state[state_key] = None

    # Consume a pending refresh request *before* rendering, so the stats and
    # the refresh button are drawn exactly once per script run.
    if st.session_state.get('force_sprint_refresh'):
        st.session_state.force_sprint_refresh = False
        force_refresh = True

    try:
        current_time = datetime.now()
        last_refresh = st.session_state.last_sprint_refresh
        cache_expired = bool(last_refresh) and (
            (current_time - last_refresh).total_seconds() > 300  # 5 minutes cache
        )

        if force_refresh or st.session_state.sprint_data is None or cache_expired:
            with st.spinner("Fetching sprint data..."):
                # Get regression board
                board = get_regression_board("RS")
                if not board:
                    return

                # Get current sprint
                sprint = get_current_sprint(board['id'])
                if not sprint:
                    return

                # Get sprint issues
                issues = get_sprint_issues(board['id'], sprint.id, board['estimation_field'])
                if not issues:
                    return

                # Calculate points (per-issue rows are not needed here)
                _issues_data, total_points, completed_points, in_progress_points = calculate_points(
                    issues, board['estimation_field']
                )

                # Store in session state
                st.session_state.sprint_data = {
                    'sprint_name': sprint.name,
                    'total_points': total_points,
                    'completed_points': completed_points,
                    'in_progress_points': in_progress_points,
                    'timestamp': current_time,
                }
                st.session_state.last_sprint_refresh = current_time

        # Display data from session state
        sprint_data = st.session_state.sprint_data
        if sprint_data:
            total = sprint_data['total_points']
            completion = sprint_data['completed_points'] / total if total > 0 else 0.0

            # NOTE(review): the template text seen here carries no HTML
            # although it is rendered with unsafe_allow_html=True — confirm
            # against the intended compact-metrics styling.
            st.markdown(f"""
Current Sprint: {sprint_data['sprint_name']}
Total
{sprint_data['total_points']:.1f}
Done
{sprint_data['completed_points']:.1f}
In Progress
{sprint_data['in_progress_points']:.1f}
Complete
{completion * 100:.1f}%
""", unsafe_allow_html=True)

            # st.progress only accepts 0.0–1.0 for floats; clamp defensively.
            st.progress(float(min(max(completion, 0.0), 1.0)))

            # A stable key keeps the widget's identity across reruns so the
            # click is registered; this assumes the function renders once per
            # script run.
            if st.button("🔄 Refresh", key="refresh_stats", use_container_width=True):
                # Flag is picked up at the top of the next run
                st.session_state.force_sprint_refresh = True
                st.rerun()

    except Exception as e:
        st.error(f"Error updating story points: {str(e)}")
def main():
    """Run the standalone Streamlit test page for the Jira integration.

    Offers a button to load sample failures, renders the Jira login, and —
    once authenticated with projects loaded — shows the fixed project/board
    header, the sprint statistics, the loaded scenarios with the task
    workflow, and a button to display the project fields.
    """
    st.title("Jira Integration Test")

    # Add test data button
    if st.button("Load Test Data"):
        st.session_state.filtered_scenarios_df = create_test_data()
        st.success("Test data loaded!")

    is_authenticated = render_jira_login()

    # .get avoids an AttributeError if 'projects' was never initialised.
    if is_authenticated and st.session_state.get("projects"):
        # Fixed project and board selection.
        # NOTE(review): the template text seen here carries no HTML although
        # it is rendered with unsafe_allow_html=True — confirm against the
        # intended styling.
        st.markdown("""
Project
RS - Regression
Board
Regression Sprints (scrum)
""", unsafe_allow_html=True)

        # Display sprint stats (only fetch if no data exists)
        display_story_points_stats(force_refresh=False)

        # Show test data if loaded
        if 'filtered_scenarios_df' in st.session_state:
            scenarios_df = st.session_state.filtered_scenarios_df
            st.subheader("Failed Scenarios")
            st.dataframe(scenarios_df)

            # Get environment directly from the DataFrame (needs at least one row)
            if 'Environment' in scenarios_df.columns and not scenarios_df.empty:
                environment = scenarios_df['Environment'].iloc[0]
                st.info(f"Using environment from data: {environment}")
                process_failures_button(scenarios_df, environment)
            else:
                st.error("No environment information found in the data")

        # Add project fields button at the bottom
        if st.button("Show Project Fields"):
            display_project_fields()


if __name__ == "__main__":
    main()