import os import streamlit as st from azure.cosmos import CosmosClient, exceptions from azure.cosmos.exceptions import CosmosHttpResponseError import bcrypt import base64 import logging from datetime import datetime, timezone logger = logging.getLogger(__name__) def clean_and_validate_key(key): """Limpia y valida la clave de CosmosDB""" key = key.strip() while len(key) % 4 != 0: key += '=' try: base64.b64decode(key) return key except: raise ValueError("La clave proporcionada no es válida") # Verificar las variables de entorno endpoint = os.getenv("COSMOS_ENDPOINT") key = os.getenv("COSMOS_KEY") if not endpoint or not key: raise ValueError("Las variables de entorno COSMOS_ENDPOINT y COSMOS_KEY deben estar configuradas") key = clean_and_validate_key(key) def authenticate_user(username, password): """ Autentica un usuario y registra el inicio de sesión """ try: user_item = get_user(username) if user_item and verify_password(user_item['password'], password): logger.info(f"Usuario autenticado: {username}, Rol: {user_item['role']}") # Registrar inicio de sesión session_id = record_login(username) if session_id: st.session_state.session_id = session_id st.session_state.login_time = datetime.now(timezone.utc).isoformat() return True, user_item['role'] logger.warning(f"Autenticación fallida para el usuario: {username}") return False, None except Exception as e: logger.error(f"Error durante la autenticación del usuario: {str(e)}") return False, None def authenticate_admin(username, password): """Autentica un administrador""" return authenticate_user(username, password) def authenticate_student(username, password): """Autentica un estudiante""" return authenticate_user(username, password) def authenticate_teacher(username, password): """Autentica un profesor""" return authenticate_user(username, password) def register_student(username, password, additional_info=None): """ Registra un nuevo estudiante """ try: if get_student_user(username): logger.warning(f"Intento de registro de estudiante existente: {username}") return False # Hash de la contraseña antes de almacenar hashed_password = hash_password(password) success = create_student_user(username, hashed_password, additional_info) if success: logger.info(f"Nuevo estudiante registrado: {username}") return True else: logger.error(f"Error al registrar nuevo estudiante: {username}") return False except Exception as e: logger.error(f"Error al registrar estudiante: {str(e)}") return False def update_student_info(username, new_info): """ Actualiza la información de un estudiante """ try: # Si hay contraseña en new_info, hashearla if 'password' in new_info: new_info['password'] = hash_password(new_info['password']) success = update_student_user(username, new_info) if success: logger.info(f"Información del estudiante actualizada: {username}") return True else: logger.error(f"Error al actualizar información del estudiante: {username}") return False except Exception as e: logger.error(f"Error al actualizar información del estudiante: {str(e)}") return False def delete_student(username): """ Elimina un estudiante """ try: success = delete_student_user(username) if success: logger.info(f"Estudiante eliminado: {username}") return True else: logger.error(f"Error al eliminar estudiante: {username}") return False except Exception as e: logger.error(f"Error al eliminar estudiante: {str(e)}") return False def logout(): """ Cierra la sesión del usuario y registra el tiempo """ try: if 'session_id' in st.session_state and 'username' in st.session_state: # Registrar el cierre de sesión record_logout( st.session_state.username, st.session_state.session_id ) logger.info(f"Sesión cerrada para el usuario: {st.session_state.username}") # Limpiar el estado de la sesión st.session_state.clear() except Exception as e: logger.error(f"Error durante el logout: {str(e)}") # Asegurar que la sesión se limpie incluso si hay error st.session_state.clear() def hash_password(password): """ Hashea una contraseña para almacenamiento """ return bcrypt.hashpw( password.encode('utf-8'), bcrypt.gensalt() ).decode('utf-8') def verify_password(stored_password, provided_password): """ Verifica una contraseña almacenada contra una proporcionada """ return bcrypt.checkpw( provided_password.encode('utf-8'), stored_password.encode('utf-8') ) # Asegurarse de exportar todas las funciones necesarias __all__ = [ 'authenticate_user', 'authenticate_admin', 'authenticate_student', 'authenticate_teacher', 'register_student', 'update_student_info', 'delete_student', 'logout', 'hash_password', 'verify_password' ]