Spaces:
Sleeping
Sleeping
from datetime import timedelta, datetime | |
from request import RequestUser as req_login | |
from response import ResponseUser as res_login | |
import requests | |
import json, re | |
from auth.authentication import signJWT | |
from firebase_admin import credentials, auth, exceptions | |
import firebase_admin | |
from repository import UserLoginRepository, UserRepository, UserInfoRepository, OTPRepository | |
import service.OTPService | |
from function import support_function as sf | |
from dotenv import load_dotenv | |
import os | |
from response import ResponseUser as res | |
from response import ResponseDefault as res1 | |
load_dotenv() | |
CLIENT_ID_GOOGLE = os.getenv('CLIENT_ID') | |
API_SIGN_UP_FIREBASE_PATH = os.getenv('API_SIGN_UP_FIREBASE') | |
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' | |
def get_user1(email): | |
try: | |
user = auth.get_user_by_email(email) | |
return user | |
except exceptions.FirebaseError as e: | |
return None | |
def check_email(email): | |
if isinstance(email, str) and re.fullmatch(regex, email): | |
return True | |
else: | |
return False | |
from pathlib import Path | |
try: | |
if not firebase_admin._apps: | |
json_path = Path(__file__).resolve().parent / 'app' / 'firebase_certificate.json' | |
cred = credentials.Certificate(str(json_path)) | |
fred = firebase_admin.initialize_app(cred) | |
except: | |
if not firebase_admin._apps: | |
cred = credentials.Certificate("firebase_certificate.json") | |
fred = firebase_admin.initialize_app(cred) | |
def sign_up_with_email_and_password(email, password, username=None, return_secure_token=True): | |
try: | |
rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signUp" | |
payload = { | |
"email": email, | |
"password": password, | |
"returnSecureToken": return_secure_token | |
} | |
if username: | |
payload["displayName"] = username | |
payload = json.dumps(payload) | |
r = requests.post(rest_api_url, params={"key": API_SIGN_UP_FIREBASE_PATH}, data=payload) | |
try: | |
return r.json()['email'] | |
except Exception as e: | |
pass | |
except Exception as e: | |
pass | |
def sign_in_with_email_and_password(email=None, password=None, return_secure_token=True): | |
rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword" | |
try: | |
payload = { | |
"returnSecureToken": return_secure_token | |
} | |
if email: | |
payload["email"] = email | |
if password: | |
payload["password"] = password | |
payload = json.dumps(payload) | |
r = requests.post(rest_api_url, params={"key": API_SIGN_UP_FIREBASE_PATH}, data=payload) | |
r.raise_for_status() | |
data = r.json() | |
if 'idToken' in data: | |
return data['email'] | |
else: | |
return False | |
except requests.exceptions.RequestException as e: | |
print(f"Error signing in: {e}") | |
return False | |
def update_info_user(uid, email=None, user_name=None, photo_url=None): | |
user_data = {} | |
if email is not None: | |
user_data['email'] = email | |
if user_name is not None: | |
user_data['display_name'] = user_name | |
if photo_url is not None and photo_url != 'N/A': | |
user_data['photo_url'] = photo_url | |
if user_data: | |
auth.update_user(uid, **user_data) | |
def update_user_info(request: req_login.RequestUpdateUserInfo): | |
try: | |
email = request.email | |
user_id = request.user_id | |
email_check = sf.check_email_service(user_id) | |
if isinstance(email_check, res1.ReponseError): | |
return email_check | |
check_email_fc = sf.check_email_empty_invalid(email) | |
if check_email_fc is not True: | |
return check_email_fc | |
user = get_user1(email) | |
if user: | |
user_info = UserInfoRepository.getUserInfo(user_id) | |
if user_info: | |
UserInfoRepository.updateUserInfo( | |
request.user_id, | |
request.uid, | |
request.email, | |
request.display_name, | |
request.photo_url) | |
else: | |
UserInfoRepository.addUserInfo( | |
request.uid, | |
request.email, | |
request.display_name, | |
request.photo_url | |
) | |
update_info_user(request.uid, | |
request.email, | |
request.display_name, | |
request.photo_url | |
) | |
return res_login.ResponseUpdateUserInfo(status=200, | |
data = res_login.Message(message=f"User info updated successfully")) | |
else: | |
return res_login.ReponseError( | |
status = 404, | |
data = res_login.Message(message="Not found user") | |
) | |
except: | |
return res_login.ReponseError( | |
status=500, | |
data=res_login.Message(message="Server Error") | |
) | |
def check_info_google(request: req_login.RequestCheckInfoGoogle): | |
try: | |
user_id = request.user_id | |
check = UserRepository.getEmailUserByIdFix(user_id) | |
if check is None: | |
return res_login.ReponseError( | |
status = 404, | |
data = res_login.Message(message="user_id not exist") | |
) | |
email = sf.check_email_service(str(user_id)) | |
if isinstance(email, res.ReponseError): | |
return email | |
user_info = UserInfoRepository.getUserInfo(user_id) | |
if user_info is not None: | |
email_check = True | |
else: | |
email_check = False | |
if email_check is not None: | |
return res_login.ResponseCheckInfoGoogle(status= 200,data = res_login.CheckModel(check=email_check)) | |
except: | |
return res_login.ReponseError( | |
status=500, | |
data=res_login.Message(message="Server Error") | |
) | |
def check_info_google_email(request: req_login.RequestCheckInfoGoogleEmail): | |
try: | |
email = request.email | |
check_email_fc = sf.check_email_empty_invalid(email) | |
if check_email_fc is not True: | |
return check_email_fc | |
user_info = UserInfoRepository.getUserInfoByEmail(email) | |
if user_info is not None: | |
email_check = True | |
else: | |
email_check = False | |
if email_check is not None: | |
return res_login.ResponseCheckInfoGoogle(status= 200,data = res_login.CheckModel(check=email_check)) | |
except: | |
return res_login.ReponseError( | |
status=500, | |
data=res_login.Message(message="Server Error") | |
) | |
def check_state_login(request: req_login.RequestCheckStateLogin): | |
try: | |
user_id = request.user_id | |
session_id_now = request.session_id_now | |
email = sf.check_email_service(user_id) | |
if isinstance(email, res1.ReponseError): | |
return email | |
elif session_id_now is None or session_id_now=="": | |
return res_login.ReponseError( | |
status= 400, | |
data =res_login.Message(message="session_id is empty") | |
) | |
user = get_user1(email) | |
if user: | |
check1 = False | |
session_id = UserLoginRepository.getUserSessionIdByUserEmail(user_id) | |
print(f"session_id: {session_id}") | |
if session_id != session_id_now: | |
check1 = False | |
else: | |
check1 = True | |
return res_login.ResponseCheckInfoGoogle(status= 200,data = res_login.CheckModel(check = check1)) | |
else: | |
return res_login.ReponseError( | |
status=404, | |
data =res_login.Message(message="Not found user") | |
) | |
except: | |
return res_login.ReponseError( | |
status=500, | |
data=res_login.Message(message="Server Error") | |
) | |
import string, random | |
def generate_otp(length=6): | |
characters = string.ascii_uppercase + string.digits | |
otp = ''.join(random.choice(characters) for _ in range(length)) | |
return otp | |
def createOTPReset(email): | |
otp = generate_otp() | |
check_email_fc = sf.check_email_empty_invalid(email) | |
if check_email_fc is not True: | |
return check_email_fc | |
OTPRepository.addOTP(email,otp) | |
return otp | |
def reset_password(request: req_login.RequestResetPassword): | |
try: | |
email = request.email | |
check_email_fc = sf.check_email_empty_invalid(email) | |
if check_email_fc is not True: | |
return check_email_fc | |
try: | |
user = get_user1(email) | |
if user is not None: | |
otp = createOTPReset(email) | |
return res_login.ResponseCreateOTP( | |
status= 200, | |
data= res_login.CheckModel(check = True), | |
otp = otp | |
) | |
else: | |
return res_login.ReponseError( | |
status= 404, | |
data =res_login.Message(message="Email not exist") | |
) | |
except auth.UserNotFoundError as e: | |
return res_login.ReponseError( | |
status=500, | |
data =res_login.Message(message=str(e)) | |
) | |
except: | |
return res_login.ReponseError( | |
status=500, | |
data=res_login.Message(message="Server Error") | |
) | |
def change_password(request: req_login.RequestChangePassword): | |
try: | |
user_id = request.user_id | |
email = sf.check_email_service(user_id) | |
new_password = request.new_password | |
current_password= request.current_password | |
confirm_new_password = request.confirm_new_password | |
if isinstance(email, res1.ReponseError): | |
return email | |
if new_password is None: | |
return res_login.ReponseError( | |
status=400, | |
data =res_login.Message(message="new_password is empty") | |
) | |
if current_password is None or confirm_new_password == "": | |
return res_login.ReponseError( | |
status=400, | |
data =res_login.Message(message="current_password is empty") | |
) | |
if confirm_new_password is None or confirm_new_password == "": | |
return res_login.ReponseError( | |
status=400, | |
data =res_login.Message(message="confirm_new_password is empty") | |
) | |
if current_password == new_password: | |
return res_login.ReponseError( | |
status=400, | |
data=res_login.Message(message="The new_password and the current_password must be different") | |
) | |
if confirm_new_password != new_password: | |
return res_login.ReponseError( | |
status=400, | |
data=res_login.Message(message="The new_password and the confirm_new_password must be similar") | |
) | |
user = sign_in_with_email_and_password(email, current_password) | |
try: | |
if user: | |
user_email = auth.get_user_by_email(email) | |
auth.update_user( | |
user_email.uid, | |
password=new_password | |
) | |
return res_login.ResponseChangePassword( | |
status= 200, | |
data = res_login.Message(message=f"Update password success")) | |
else: | |
return res_login.ReponseError( | |
status=400, | |
data =res_login.Message(message="Current password not valid") | |
) | |
except : | |
return res_login.ReponseError( | |
status=500, | |
data =res_login.Message(message="Server Error") | |
) | |
except: | |
return res_login.ReponseError( | |
status=500, | |
data=res_login.Message(message="Server Error!!") | |
) |