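"""FastAPI service that reads coaching data (daily journal, panic-button, and
test-score sheets) from Google Sheets and reports, per chapter, which students
need revision based on their average test scores."""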
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import gspread
from google.oauth2.service_account import Credentials
import pandas as pd
import os

# Initialize the FastAPI app
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Specify domains instead of "*" to restrict access
    allow_credentials=True,
    allow_methods=["*"],  # Allow all HTTP methods (POST, GET, OPTIONS, etc.)
    allow_headers=["*"],  # Allow all headers
)

# Request body model for the main route
class CoachingCodeRequest(BaseModel):
    coachingCode: str
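# get_credentials below rebuilds a Google service-account key from individual
# environment variables. A sketch of the expected variables (placeholder
# values, not real credentials; the names match the os.getenv calls):
#
#   SERVICE_ACCOUNT_TYPE=service_account
#   PROJECT_ID=my-project
#   PRIVATE_KEY_ID=<key id from the downloaded JSON key>
#   PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
#   CLIENT_EMAIL=<bot>@<project>.iam.gserviceaccount.com
#   CLIENT_ID=<numeric client id>
#   AUTH_URI=https://accounts.google.com/o/oauth2/auth
#   TOKEN_URI=https://oauth2.googleapis.com/token
#   AUTH_PROVIDER_X509_CERT_URL=https://www.googleapis.com/oauth2/v1/certs
#   CLIENT_X509_CERT_URL=<from the downloaded JSON key>
#   UNIVERSE_DOMAIN=googleapis.com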
# Build Google Sheets API credentials from environment variables
def get_credentials():
    """Get Google Sheets API credentials from environment variables."""
    try:
        # Construct the service account info dictionary
        service_account_info = {
            "type": os.getenv("SERVICE_ACCOUNT_TYPE"),
            "project_id": os.getenv("PROJECT_ID"),
            "private_key_id": os.getenv("PRIVATE_KEY_ID"),
            # The key is stored with literal "\n" sequences; restore real newlines
            "private_key": os.getenv("PRIVATE_KEY").replace('\\n', '\n'),
            "client_email": os.getenv("CLIENT_EMAIL"),
            "client_id": os.getenv("CLIENT_ID"),
            "auth_uri": os.getenv("AUTH_URI"),
            "token_uri": os.getenv("TOKEN_URI"),
            "auth_provider_x509_cert_url": os.getenv("AUTH_PROVIDER_X509_CERT_URL"),
            "client_x509_cert_url": os.getenv("CLIENT_X509_CERT_URL"),
            "universe_domain": os.getenv("UNIVERSE_DOMAIN"),
        }
        scope = ['https://spreadsheets.google.com/feeds',
                 'https://www.googleapis.com/auth/drive']
        return Credentials.from_service_account_info(service_account_info, scopes=scope)
    except Exception as e:
        print(f"Error getting credentials: {e}")
        return None

# Select the spreadsheets that belong to a coaching code
def select_files(coaching_code):
    creds = get_credentials()
    if creds is None:
        raise HTTPException(status_code=500, detail="Could not load Google API credentials")
    client = gspread.authorize(creds)
    if coaching_code == "1919":
        journal_file = client.open_by_url('https://docs.google.com/spreadsheets/d/1EFf2lr4A10nt4RhIqxCD_fxe-l3sXH09II0TEkMmvhA/edit?gid=0#gid=0').worksheet('Sheet1')
        panic_button_file = client.open_by_url('https://docs.google.com/spreadsheets/d/1nFZGkCvRV6qS-mhsORhX3dxI0JSge32_UwWgWKl3eyw/edit?gid=0#gid=0').worksheet('Sheet1')
        test_file = client.open_by_url('https://docs.google.com/spreadsheets/d/13PUHySUXWtKBusjugoe7Dbsm39PwBUfG4tGLipspIx4/edit?gid=0#gid=0').worksheet('Sheet1')
    elif coaching_code == "1099":
        journal_file = client.open_by_url('https://docs.google.com/spreadsheets/d/12UQzr7xy70-MvbKUuqM6YMUF-y2kY1rumX0vOj0hKXI/edit?gid=0#gid=0').worksheet('Sheet1')
        panic_button_file = client.open_by_url('https://docs.google.com/spreadsheets/d/1zaKSRKgf2Nd7lWIf315YzvQeTQ3gU_PIRIS_bEAhl90/edit?gid=0#gid=0').worksheet('Sheet1')
        test_file = client.open_by_url('https://docs.google.com/spreadsheets/d/1ms_SdloQqlXO85NK_xExhHT0LEeLsth0VBmdHQt55jc/edit?gid=0#gid=0').worksheet('Sheet1')
    else:
        raise HTTPException(status_code=404, detail="Invalid coaching code")
    return journal_file, panic_button_file, test_file
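# Assumed sheet layouts, inferred from the parsing in the route below (these
# are assumptions about the source spreadsheets, not a documented schema):
#   journal sheet: user_id, then repeating (productivity_yes_no, productivity_rate) cells
#   panic sheet:   user_id, then panic-button entries read in steps of two cells
#   test sheet:    user_id, then repeating (test_chapter, test_score) cells
# All sheets are read without headers, so columns arrive as 0..n.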
# Main route: report which chapters each coaching batch needs to revise
@app.post("/get_revision_chapters")
async def get_revision_chapters(data: CoachingCodeRequest):
    journal_file, panic_button_file, test_file = select_files(data.coachingCode)

    # Load the three sheets into DataFrames
    journal_df = pd.DataFrame(journal_file.get_all_values())
    panic_button_df = pd.DataFrame(panic_button_file.get_all_values())
    test_df = pd.DataFrame(test_file.get_all_values())

    # Flatten panic-button rows, keeping the most recent five cells per user
    panic_data = []
    for index, row in panic_button_df.iterrows():
        user_id = row[0]
        row_pairs = row[1:].dropna().to_list()[-5:]
        for i in range(0, len(row_pairs), 2):
            panic = row_pairs[i].upper().strip()
            if panic:  # skip blank cells (get_all_values yields '' for empties)
                panic_data.append({'user_id': user_id, 'panic_button': panic})
    panic_df_processed = pd.DataFrame(panic_data)

    # Flatten test rows into (user_id, chapter, score) records,
    # keeping the last five scores per chapter
    test_data = []
    for index, row in test_df.iterrows():
        user_id = row[0]
        row_pairs = row[1:].dropna().to_list()
        chapter_scores = {}
        # Stop at len - 1 so a trailing unpaired cell cannot raise IndexError
        for i in range(0, len(row_pairs) - 1, 2):
            chapter = row_pairs[i].lower().strip()
            score = row_pairs[i + 1]
            if chapter and score:  # skip blank chapter/score cells
                chapter_scores.setdefault(chapter, []).append(score)
        for chapter, scores in chapter_scores.items():
            for score in scores[-5:]:
                test_data.append({'user_id': user_id, 'test_chapter': chapter, 'test_score': score})
    test_df_processed = pd.DataFrame(test_data)

    # Flatten journal rows: the last ten cells hold the five most recent
    # (productivity_yes_no, productivity_rate) pairs
    journal_data = []
    for index, row in journal_df.iterrows():
        user_id = row[0]
        row_pairs = row[1:].dropna().to_list()[-10:]
        for i in range(0, len(row_pairs) - 1, 2):
            productivity_yes_no = row_pairs[i].lower().strip()
            productivity_rate = row_pairs[i + 1]
            if productivity_rate:  # skip entries with a blank rate
                journal_data.append({'user_id': user_id,
                                     'productivity_yes_no': productivity_yes_no,
                                     'productivity_rate': productivity_rate})
    journal_df_processed = pd.DataFrame(journal_data)

    # Merge the three sources on user_id, then drop rows with no usable data
    merged_journal_panic = pd.merge(panic_df_processed, journal_df_processed, on='user_id', how='outer')
    merged_data = pd.merge(merged_journal_panic, test_df_processed, on='user_id', how='outer')
    merged_data_cleaned = merged_data.dropna(
        subset=['productivity_yes_no', 'productivity_rate', 'panic_button', 'test_chapter'],
        how='all')

    def process_group(group):
        # Count panic-button presses per category
        panic_button_series = group['panic_button'].dropna()
        panic_button_dict = panic_button_series.value_counts().to_dict()
        # Average test score per chapter (copy to avoid chained-assignment warnings)
        test_scores = group[['test_chapter', 'test_score']].dropna().copy()
        test_scores['test_score'] = pd.to_numeric(test_scores['test_score'], errors='coerce')
        test_scores_dict = test_scores.groupby('test_chapter')['test_score'].mean().dropna().to_dict()
        return pd.Series({
            'productivity_yes_no': group['productivity_yes_no'].iloc[0],
            'productivity_rate': group['productivity_rate'].iloc[0],
            'panic_button': panic_button_dict,
            'test_scores': test_scores_dict
        })

    merged_df = merged_data_cleaned.groupby('user_id').apply(process_group).reset_index()

    # Collect, per chapter, the users whose average score falls below the threshold
    revision_dict = {}
    threshold = 16  # average scores below this mark a chapter for revision
    for index, row in merged_df.iterrows():
        user_id = row['user_id']
        for chapter, score in row['test_scores'].items():
            if score < threshold:
                revision_dict.setdefault(chapter, []).append(user_id)

    # Shape the result: one record per chapter with a comma-separated user list
    revision_df = pd.DataFrame({
        'chapter_name': list(revision_dict.keys()),
        'user_ids_needing_revision': [', '.join(users) for users in revision_dict.values()]
    })
    return {"data": revision_df.to_dict(orient="records")}
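# A minimal local-run sketch (assumes this module is saved as main.py and
# uvicorn is installed; the coaching code must be one select_files accepts):
#
#   uvicorn main:app --reload
#   curl -X POST http://127.0.0.1:8000/get_revision_chapters \
#        -H "Content-Type: application/json" \
#        -d '{"coachingCode": "1919"}'
#
# Expected response shape:
#   {"data": [{"chapter_name": "...", "user_ids_needing_revision": "u1, u2"}]}

if __name__ == "__main__":
    # Optional direct entry point for local testing (assumes uvicorn is installed)
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)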