Spaces:
Sleeping
Sleeping
from datetime import timedelta, datetime | |
from request import RequestAuth as req | |
from response import ResponseAuth as res | |
import requests | |
import json, re | |
from auth.authentication import signJWT | |
from firebase_admin import credentials, auth, exceptions | |
import firebase_admin | |
from repository import UserLoginRepository, UserRepository, UserInfoRepository, OTPRepository | |
import service.OTPService | |
from function import support_function as sf | |
from dotenv import load_dotenv | |
import os | |
from response import ResponseUser as res | |
load_dotenv() | |
CLIENT_ID_GOOGLE = os.getenv('CLIENT_ID') | |
API_SIGN_UP_FIREBASE_PATH = os.getenv('API_SIGN_UP_FIREBASE') | |
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' | |
def get_user1(email): | |
try: | |
user = auth.get_user_by_email(email) | |
return user | |
except exceptions.FirebaseError as e: | |
return None | |
def check_email(email): | |
if isinstance(email, str) and re.fullmatch(regex, email): | |
return True | |
else: | |
return False | |
from pathlib import Path | |
try: | |
if not firebase_admin._apps: | |
json_path = Path(__file__).resolve().parent / 'app' / 'firebase_certificate.json' | |
cred = credentials.Certificate(str(json_path)) | |
fred = firebase_admin.initialize_app(cred) | |
except: | |
if not firebase_admin._apps: | |
cred = credentials.Certificate("firebase_certificate.json") | |
fred = firebase_admin.initialize_app(cred) | |
def sign_up_with_email_and_password(email, password, username=None, return_secure_token=True): | |
try: | |
rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signUp" | |
payload = { | |
"email": email, | |
"password": password, | |
"returnSecureToken": return_secure_token | |
} | |
if username: | |
payload["displayName"] = username | |
payload = json.dumps(payload) | |
r = requests.post(rest_api_url, params={"key": API_SIGN_UP_FIREBASE_PATH}, data=payload) | |
try: | |
if r.status_code == 200: | |
response_data = r.json() | |
email = response_data.get('email') | |
display_name = response_data.get('displayName') | |
return email, display_name | |
except Exception as e: | |
pass | |
except Exception as e: | |
pass | |
def sign_in_with_email_and_password(email=None, password=None, return_secure_token=True): | |
rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword" | |
try: | |
payload = { | |
"returnSecureToken": return_secure_token | |
} | |
if email: | |
payload["email"] = email | |
if password: | |
payload["password"] = password | |
payload = json.dumps(payload) | |
r = requests.post(rest_api_url, params={"key": API_SIGN_UP_FIREBASE_PATH}, data=payload) | |
r.raise_for_status() | |
data = r.json() | |
if 'idToken' in data: | |
return data['email'] | |
else: | |
return False | |
except requests.exceptions.RequestException as e: | |
print(f"Error signing in: {e}") | |
return False | |
def login(request: req.RequestLoginEmail): | |
try: | |
email = request.email | |
password = request.password | |
check_email_fc = sf.check_email_empty_invalid(email) | |
if check_email_fc is not True: | |
return check_email_fc | |
if password is None: | |
return res.ReponseError( | |
status=400, | |
data=res.Message(message="Password is empty") | |
) | |
user_check = get_user1(email) | |
if user_check is None: | |
return res.ReponseError( | |
status=404, | |
data=res.Message(message="Email not exits") | |
) | |
user = sign_in_with_email_and_password(email, password) | |
if user: | |
check = signJWT(user) | |
if check is False: | |
return res.ReponseError( | |
status=500, | |
data=res.Message(message="Invalid authorization code.") | |
) | |
else: | |
access_token = check["access_token"] | |
refresh_token = check["refresh_token"] | |
expires_in = check["expires_in"] | |
session_id = check["session_id"] | |
return res.ResponseLoginEmail( | |
status=200, | |
data=res.DataLogin(access_token=access_token, refresh_token=refresh_token, expires_in=expires_in, | |
session_id=session_id) | |
) | |
else: | |
return res.ReponseError( | |
status=400, | |
data=res.Message(message="Passwords do not match") | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data=res.Message(message="Server Error") | |
) | |
def verify_token_google(token): | |
from google.oauth2 import id_token | |
from google.auth.transport import requests | |
try: | |
CLIENT_ID = CLIENT_ID_GOOGLE | |
id_info = id_token.verify_oauth2_token(token, requests.Request(), CLIENT_ID) | |
check = id_info['email_verified'] | |
if check is True: | |
return True | |
except ValueError as e: | |
return False | |
def login_google(request: req.RequestLoginGoogle): | |
try: | |
email = request.email | |
token_google = request.token_google | |
check_google = verify_token_google(token_google) | |
if check_google is False: | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="Login google failed") | |
) | |
check_email_fc = sf.check_email_empty_invalid(email) | |
if check_email_fc is not True: | |
return check_email_fc | |
user = get_user1(email) | |
if user: | |
check = signJWT(email) | |
if check == False: | |
return res.ReponseError( | |
status=500, | |
data =res.Message(message="Invalid authorization code.") | |
) | |
else: | |
access_token = check["access_token"] | |
refresh_token = check["refresh_token"] | |
expires_in = check["expires_in"] | |
session_id = check["session_id"] | |
return res.ResponseLoginGoogle( | |
status= 200, | |
data = res.DataLogin(access_token=access_token,refresh_token=refresh_token,expires_in=expires_in,session_id=session_id) | |
) | |
else: | |
return res.ReponseError( | |
status= 404, | |
data = res.Message(message="Email not exist") | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data=res.Message(message="Server Error") | |
) | |
def sign_up(request: req.RequestRegister): | |
try: | |
email = request.email | |
password = request.password | |
confirm_password = request.confirm_password | |
username = request.username | |
check_email_fc = sf.check_email_empty_invalid(email) | |
if check_email_fc is not True: | |
return check_email_fc | |
if password is None or password == "": | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="password is empty") | |
) | |
if confirm_password is None or confirm_password == "": | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="confirm password is empty") | |
) | |
if confirm_password != password: | |
return res.ReponseError(status=400, | |
data =res.Message(message="password and confirm_password do not match")) | |
user_signup = get_user1(email) | |
if user_signup is not None: | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="Email exist") | |
) | |
user_info, display_name = sign_up_with_email_and_password(email, password, username) | |
if user_info: | |
return res.ResponseSignUp( | |
status=200, | |
data =res.DataSignUp(email=user_info) | |
) | |
else: | |
return res.ReponseError( | |
status=500, | |
data =res.Message(message="Internal Server Error") | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data=res.Message(message="Server Error") | |
) | |
import pytz, datetime | |
import auth.authentication as auth123 | |
from response import ResponseDefault as res1 | |
def refresh_token(request: req.RequestRefreshTokenLogin): | |
try: | |
token = request.refresh_token | |
if token is None: | |
return res.ReponseError( | |
status=400, | |
data=res.Message(message="token is empty") | |
) | |
user_token = UserRepository.getUserIdByRefreshToken(token) | |
if user_token is None: | |
return res.ReponseError( | |
status=404, | |
data=res.Message(message="Not found refresh token") | |
) | |
email = UserRepository.getEmailUserById(user_token) | |
if email: | |
rf_token = token | |
result = auth123.get_refresh_token(token, email) | |
token_new = result["access_token"] | |
rf_token_new = result["refresh_token"] | |
seconds1 = result["expires_in"] | |
session_id = result["session_id"] | |
vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh') | |
current_time = datetime.datetime.utcnow().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta( | |
seconds=seconds1) | |
formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ') | |
if rf_token == rf_token_new: | |
UserRepository.updateAccessToken(user_token, token_new, formatted_time) | |
else: | |
UserRepository.UpdateAccessTokenRefreshTokenById(user_token, token_new, rf_token_new, formatted_time) | |
user = token_new | |
if user == False: | |
return res.ReponseError( | |
status=400, | |
data=res.Message(message="Refresh token error") | |
) | |
return res.ResponseRefreshTokenLogin( | |
status=200, | |
data=res.DataRefreshToken(token_new=user, session_id=session_id) | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data=res.Message(message="Server Error") | |
) | |
def check_token_is_valid(token): | |
try: | |
check = UserRepository.getEmailUserByAccessToken(token) | |
if check is None: | |
return False | |
return True | |
except: | |
return res.ReponseError( | |
status=500, | |
data=res.Message(message="Server Error") | |
) |