pychatbot / service /UserService.py
kltn20133118's picture
Upload 48 files
6b725f7 verified
raw
history blame
11.3 kB
from datetime import timedelta, datetime
from request import RequestUser as req_login
from response import ResponseUser as res_login
import requests
import json, re
from auth.authentication import signJWT
from firebase_admin import credentials, auth, exceptions
import firebase_admin
from repository import UserLoginRepository, UserRepository, UserInfoRepository, OTPRepository
import service.OTPService
from function import support_function as sf
from dotenv import load_dotenv
import os
from response import ResponseUser as res
from response import ResponseDefault as res1
# Load variables from a .env file into the process environment, then read
# the settings this module uses. Each constant is None when its variable
# is not set.
load_dotenv()
CLIENT_ID_GOOGLE = os.environ.get('CLIENT_ID')
API_SIGN_UP_FIREBASE_PATH = os.environ.get('API_SIGN_UP_FIREBASE')
# Pattern for a syntactically plausible e-mail address: local part, "@",
# dotted domain, then an alphabetic top-level domain of 2-7 letters.
# The TLD class is [A-Za-z]: inside a character class "|" is a literal pipe,
# not alternation, so including it would accept addresses such as "a@b.c|m".
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'
def get_user1(email):
    """Look up a Firebase Auth user by e-mail address.

    Returns whatever ``auth.get_user_by_email`` returns, or ``None`` when
    that call raises a ``FirebaseError``.
    """
    try:
        return auth.get_user_by_email(email)
    except exceptions.FirebaseError:
        return None
def check_email(email):
    """Tell whether *email* is a string that fully matches ``regex``."""
    if not isinstance(email, str):
        return False
    return re.fullmatch(regex, email) is not None
from pathlib import Path
# Initialise the default Firebase app once per process. The service-account
# key is first looked up at app/firebase_certificate.json next to this module;
# if that attempt fails, fall back to a path relative to the current working
# directory.
try:
    if not firebase_admin._apps:
        json_path = Path(__file__).resolve().parent / 'app' / 'firebase_certificate.json'
        cred = credentials.Certificate(str(json_path))
        fred = firebase_admin.initialize_app(cred)
except Exception:
    # `except Exception` instead of a bare `except` so KeyboardInterrupt and
    # SystemExit raised while the module is being imported still propagate.
    if not firebase_admin._apps:
        cred = credentials.Certificate("firebase_certificate.json")
        fred = firebase_admin.initialize_app(cred)
def sign_up_with_email_and_password(email, password, username=None, return_secure_token=True, timeout=10):
    """Create an account through the Identity Toolkit ``accounts:signUp`` endpoint.

    Args:
        email: E-mail address for the new account.
        password: Password for the new account.
        username: Optional display name; sent as ``displayName`` when truthy.
        return_secure_token: Forwarded as ``returnSecureToken``.
        timeout: Seconds to wait for the HTTP call before giving up.

    Returns:
        The ``email`` field of the JSON response, or ``None`` when the
        request fails, the body is not JSON, or the body has no ``email``
        field.
    """
    rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signUp"
    payload = {
        "email": email,
        "password": password,
        "returnSecureToken": return_secure_token,
    }
    if username:
        payload["displayName"] = username
    try:
        r = requests.post(
            rest_api_url,
            params={"key": API_SIGN_UP_FIREBASE_PATH},
            data=json.dumps(payload),
            timeout=timeout,
        )
        return r.json()['email']
    except requests.exceptions.RequestException as e:
        # Only the exception type is printed: the message of a requests
        # error can embed the full URL, including the API key parameter.
        print(f"Error signing up: {type(e).__name__}")
    except (ValueError, KeyError, TypeError) as e:
        # Non-JSON body, JSON without an "email" key, or a payload that
        # could not be serialised.
        print(f"Unexpected sign-up result: {type(e).__name__}")
    return None
def sign_in_with_email_and_password(email=None, password=None, return_secure_token=True, timeout=10):
    """Verify credentials through the Identity Toolkit ``accounts:signInWithPassword`` endpoint.

    Args:
        email: Account e-mail; omitted from the payload when falsy.
        password: Account password; omitted from the payload when falsy.
        return_secure_token: Forwarded as ``returnSecureToken``.
        timeout: Seconds to wait for the HTTP call before giving up.

    Returns:
        The ``email`` field of the response when it contains an ``idToken``;
        ``False`` when it does not, when the HTTP call fails or returns an
        error status, or when the body is not valid JSON.
    """
    rest_api_url = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword"
    payload = {"returnSecureToken": return_secure_token}
    if email:
        payload["email"] = email
    if password:
        payload["password"] = password
    try:
        r = requests.post(
            rest_api_url,
            params={"key": API_SIGN_UP_FIREBASE_PATH},
            data=json.dumps(payload),
            timeout=timeout,
        )
        r.raise_for_status()
        data = r.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # decode error is not a RequestException subclass.
        print(f"Error signing in: {e}")
        return False
    if 'idToken' in data:
        return data['email']
    return False
def update_info_user(uid, email=None, user_name=None, photo_url=None):
    """Send the supplied profile fields to ``auth.update_user`` for *uid*.

    Arguments that are ``None`` are left out; ``photo_url`` is also left out
    when it equals ``'N/A'``. When nothing remains, ``auth.update_user`` is
    not called.
    """
    candidates = (
        ('email', email, True),
        ('display_name', user_name, True),
        ('photo_url', photo_url, photo_url != 'N/A'),
    )
    changes = {
        key: value
        for key, value, allowed in candidates
        if value is not None and allowed
    }
    if not changes:
        return
    auth.update_user(uid, **changes)
def update_user_info(request: req_login.RequestUpdateUserInfo):
    """Store a user's profile in the user-info repository and in Firebase Auth.

    Steps:
        1. ``sf.check_email_service(request.user_id)``; a ``res1.ReponseError``
           result is returned unchanged.
        2. ``sf.check_email_empty_invalid(request.email)``; anything other
           than ``True`` is returned unchanged.
        3. A Firebase user must exist for ``request.email`` (else 404).
        4. The repository row is updated if one exists for ``user_id``,
           otherwise it is created.
        5. The same fields are sent to Firebase through ``update_info_user``.

    Returns:
        ``ResponseUpdateUserInfo`` with status 200 on success, otherwise one
        of the error objects described above, or a ``ReponseError`` with
        status 500 if any step raises.
    """
    try:
        email = request.email
        user_id = request.user_id
        email_check = sf.check_email_service(user_id)
        if isinstance(email_check, res1.ReponseError):
            return email_check
        check_email_fc = sf.check_email_empty_invalid(email)
        if check_email_fc is not True:
            return check_email_fc
        if not get_user1(email):
            return res_login.ReponseError(
                status=404,
                data=res_login.Message(message="Not found user")
            )
        if UserInfoRepository.getUserInfo(user_id):
            UserInfoRepository.updateUserInfo(
                request.user_id,
                request.uid,
                request.email,
                request.display_name,
                request.photo_url
            )
        else:
            UserInfoRepository.addUserInfo(
                request.uid,
                request.email,
                request.display_name,
                request.photo_url
            )
        # NOTE(review): Firebase is updated after either repository branch;
        # confirm this matches the intended flow.
        update_info_user(
            request.uid,
            request.email,
            request.display_name,
            request.photo_url
        )
        return res_login.ResponseUpdateUserInfo(
            status=200,
            data=res_login.Message(message="User info updated successfully")
        )
    except Exception:
        # Any failure is reported to the caller as a generic 500;
        # `except Exception` lets KeyboardInterrupt/SystemExit propagate.
        return res_login.ReponseError(
            status=500,
            data=res_login.Message(message="Server Error")
        )
def check_info_google(request: req_login.RequestCheckInfoGoogle):
    """Report whether a user-info record exists for ``request.user_id``.

    Returns:
        ``ResponseCheckInfoGoogle`` (status 200) whose ``check`` is ``True``
        when ``UserInfoRepository.getUserInfo`` returns a record and
        ``False`` otherwise; a 404 ``ReponseError`` when
        ``UserRepository.getEmailUserByIdFix`` returns ``None``; the error
        object from ``sf.check_email_service`` when it returns one; or a 500
        ``ReponseError`` if any step raises.
    """
    try:
        user_id = request.user_id
        if UserRepository.getEmailUserByIdFix(user_id) is None:
            return res_login.ReponseError(
                status=404,
                data=res_login.Message(message="user_id not exist")
            )
        email = sf.check_email_service(str(user_id))
        # Other handlers in this module test this result against
        # res1.ReponseError; accept either class so an error object is never
        # mistaken for an e-mail address.
        if isinstance(email, (res.ReponseError, res1.ReponseError)):
            return email
        has_info = UserInfoRepository.getUserInfo(user_id) is not None
        return res_login.ResponseCheckInfoGoogle(
            status=200,
            data=res_login.CheckModel(check=has_info)
        )
    except Exception:
        return res_login.ReponseError(
            status=500,
            data=res_login.Message(message="Server Error")
        )
def check_info_google_email(request: req_login.RequestCheckInfoGoogleEmail):
    """Report whether a user-info record exists for ``request.email``.

    Returns:
        ``ResponseCheckInfoGoogle`` (status 200) whose ``check`` is ``True``
        when ``UserInfoRepository.getUserInfoByEmail`` returns a record and
        ``False`` otherwise; whatever ``sf.check_email_empty_invalid``
        returns when that is not ``True``; or a 500 ``ReponseError`` if any
        step raises.
    """
    try:
        email = request.email
        check_email_fc = sf.check_email_empty_invalid(email)
        if check_email_fc is not True:
            return check_email_fc
        has_info = UserInfoRepository.getUserInfoByEmail(email) is not None
        return res_login.ResponseCheckInfoGoogle(
            status=200,
            data=res_login.CheckModel(check=has_info)
        )
    except Exception:
        return res_login.ReponseError(
            status=500,
            data=res_login.Message(message="Server Error")
        )
def check_state_login(request: req_login.RequestCheckStateLogin):
    """Check whether the caller's session id matches the stored one.

    Returns:
        ``ResponseCheckInfoGoogle`` (status 200) whose ``check`` is ``True``
        when the stored session id equals ``request.session_id_now``; the
        error object from ``sf.check_email_service`` when it returns a
        ``res1.ReponseError``; a 400 ``ReponseError`` when ``session_id_now``
        is ``None`` or empty; a 404 ``ReponseError`` when no Firebase user is
        found for the resolved e-mail; or a 500 ``ReponseError`` if any step
        raises.
    """
    try:
        user_id = request.user_id
        session_id_now = request.session_id_now
        email = sf.check_email_service(user_id)
        if isinstance(email, res1.ReponseError):
            return email
        if session_id_now is None or session_id_now == "":
            return res_login.ReponseError(
                status=400,
                data=res_login.Message(message="session_id is empty")
            )
        if not get_user1(email):
            return res_login.ReponseError(
                status=404,
                data=res_login.Message(message="Not found user")
            )
        # NOTE(review): the repository method is named "...ByUserEmail" but
        # receives user_id; confirm which key it expects.
        session_id = UserLoginRepository.getUserSessionIdByUserEmail(user_id)
        # The stored session id is deliberately not printed or logged.
        return res_login.ResponseCheckInfoGoogle(
            status=200,
            data=res_login.CheckModel(check=(session_id == session_id_now))
        )
    except Exception:
        return res_login.ReponseError(
            status=500,
            data=res_login.Message(message="Server Error")
        )
import string, random
def generate_otp(length=6):
    """Return a random one-time code of *length* characters.

    Characters are drawn from uppercase ASCII letters and digits. The
    ``secrets`` module is used because the code serves as a credential and
    ``random`` is not intended for security purposes.
    """
    import secrets  # local import: keeps this block self-contained

    characters = string.ascii_uppercase + string.digits
    return ''.join(secrets.choice(characters) for _ in range(length))
def createOTPReset(email):
    """Generate an OTP for *email* and store it through ``OTPRepository.addOTP``.

    Returns the OTP when ``sf.check_email_empty_invalid(email)`` is ``True``;
    otherwise returns that function's result and stores nothing.
    """
    new_code = generate_otp()
    validation = sf.check_email_empty_invalid(email)
    if validation is True:
        OTPRepository.addOTP(email, new_code)
        return new_code
    return validation
def reset_password(request: req_login.RequestResetPassword):
    """Start a password reset by issuing an OTP for ``request.email``.

    Returns:
        ``ResponseCreateOTP`` (status 200) carrying the new OTP when a
        Firebase user exists for the e-mail; whatever
        ``sf.check_email_empty_invalid`` returns when that is not ``True``;
        a 404 ``ReponseError`` when no user is found; or a 500
        ``ReponseError`` if any step raises.
    """
    try:
        email = request.email
        check_email_fc = sf.check_email_empty_invalid(email)
        if check_email_fc is not True:
            return check_email_fc
        # get_user1 turns FirebaseError into None, so a missing user shows up
        # here as None and no separate UserNotFoundError handler is needed.
        if get_user1(email) is None:
            return res_login.ReponseError(
                status=404,
                data=res_login.Message(message="Email not exist")
            )
        otp = createOTPReset(email)
        if not isinstance(otp, str):
            # createOTPReset returns its validation result instead of a code
            # when the e-mail check fails; pass that through unchanged.
            return otp
        return res_login.ResponseCreateOTP(
            status=200,
            data=res_login.CheckModel(check=True),
            otp=otp
        )
    except Exception:
        return res_login.ReponseError(
            status=500,
            data=res_login.Message(message="Server Error")
        )
def change_password(request: req_login.RequestChangePassword):
    """Change a user's password after re-checking the current one.

    Reads ``user_id``, ``current_password``, ``new_password`` and
    ``confirm_new_password`` from *request*. The account e-mail comes from
    ``sf.check_email_service(user_id)``. The current password is verified
    with ``sign_in_with_email_and_password`` before ``auth.update_user``
    sets the new one.

    Returns:
        ``ResponseChangePassword`` (status 200) on success; the error object
        from ``sf.check_email_service`` when it returns a
        ``res1.ReponseError``; a 400 ``ReponseError`` when a field is missing
        or empty, the new password equals the current one, the confirmation
        differs, or the current password is rejected; or a 500
        ``ReponseError`` if any step raises.
    """
    def bad_request(message):
        # Every validation failure shares the same 400 envelope.
        return res_login.ReponseError(
            status=400,
            data=res_login.Message(message=message)
        )

    try:
        user_id = request.user_id
        email = sf.check_email_service(user_id)
        new_password = request.new_password
        current_password = request.current_password
        confirm_new_password = request.confirm_new_password
        if isinstance(email, res1.ReponseError):
            return email
        if new_password is None or new_password == "":
            return bad_request("new_password is empty")
        # The emptiness test must look at current_password itself, not at
        # confirm_new_password.
        if current_password is None or current_password == "":
            return bad_request("current_password is empty")
        if confirm_new_password is None or confirm_new_password == "":
            return bad_request("confirm_new_password is empty")
        if current_password == new_password:
            return bad_request("The new_password and the current_password must be different")
        if confirm_new_password != new_password:
            return bad_request("The new_password and the confirm_new_password must be similar")
        user = sign_in_with_email_and_password(email, current_password)
        try:
            if not user:
                return bad_request("Current password not valid")
            user_email = auth.get_user_by_email(email)
            auth.update_user(
                user_email.uid,
                password=new_password
            )
            return res_login.ResponseChangePassword(
                status=200,
                data=res_login.Message(message="Update password success")
            )
        except Exception:
            return res_login.ReponseError(
                status=500,
                data=res_login.Message(message="Server Error")
            )
    except Exception:
        return res_login.ReponseError(
            status=500,
            data=res_login.Message(message="Server Error!!")
        )