kltn20133118 commited on
Commit
67906af
·
verified ·
1 Parent(s): 7f8ee40

Update auth/authentication.py

Browse files
Files changed (1) hide show
  1. auth/authentication.py +132 -132
auth/authentication.py CHANGED
@@ -1,133 +1,133 @@
1
- import time
2
- from typing import Dict
3
- import jwt
4
- import secrets
5
- import logging
6
- from fastapi import Depends, HTTPException
7
- import base64
8
- from datetime import datetime, timedelta
9
- from repository import UserRepository, UserLoginRepository
10
- import string, random
11
-
12
- def check_token_is_valid(token):
13
- check = UserRepository.getEmailUserByAccessToken(token)
14
- if check is None:
15
- return False
16
- return True
17
-
18
- def unique_string(byte: int = 8) -> str:
19
- return secrets.token_urlsafe(byte)
20
- JWT_SECRET = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
21
- JWT_ALGORITHM = "HS512"
22
- SECRET_KEY="8deadce9449770680910741063cd0a3fe0acb62a8978661f421bbcbb66dc41f1"
23
-
24
- def token_response(token: str):
25
- return {
26
- "access_token": token
27
- }
28
- def str_encode(string: str) -> str:
29
- return base64.b85encode(string.encode('ascii')).decode('ascii')
30
-
31
- def get_token_payload(token: str, secret: str, algo: str):
32
- try:
33
- payload = jwt.decode(token, secret, algorithms=algo)
34
- except Exception as jwt_exec:
35
- logging.debug(f"JWT Error: {str(jwt_exec)}")
36
- payload = None
37
- return payload
38
-
39
- from datetime import datetime
40
- def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta):
41
- expire = datetime.now() + expiry
42
- payload.update({"exp": expire})
43
- return jwt.encode(payload, secret, algorithm=algo)
44
-
45
- def str_decode(string: str) -> str:
46
- return base64.b85decode(string.encode('ascii')).decode('ascii')
47
-
48
- def generate_random_string(length=12):
49
- characters = string.ascii_letters + string.digits
50
- random_string = ''.join(random.choice(characters) for i in range(length))
51
- return random_string
52
-
53
- import pytz
54
- from datetime import datetime
55
- def signJWT(user_email: str) -> Dict[str, str]:
56
- rt_expires = timedelta(minutes=3)
57
- refresh_key = unique_string(100)
58
- access_key = unique_string(50)
59
- at_expires = timedelta(minutes=2)
60
- at_payload = {
61
- "sub": str_encode(str(user_email)),
62
- 'a': access_key,
63
- }
64
- access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
65
- rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key}
66
- refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires)
67
- expires_in = at_expires.seconds
68
- vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh')
69
- current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in)
70
- formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ')
71
- existing_user = UserRepository.getUserByEmail(user_email)
72
- if existing_user is None:
73
- UserRepository.addUser(user_email, access_token, refresh_token, formatted_time)
74
- else:
75
- UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time)
76
- user_record = UserRepository.getUserByEmail(user_email)
77
- session_id = ""
78
- if user_record:
79
- session_id = generate_random_string()
80
- existing_userlogin = UserLoginRepository.getUserLogin(user_email)
81
- if existing_userlogin is None:
82
- UserLoginRepository.addUserLogin(user_email,session_id=session_id)
83
- else:
84
- UserLoginRepository.updateUserLogin(user_email, session_id)
85
- return {
86
- "access_token": access_token,
87
- "refresh_token": refresh_token,
88
- "expires_in": at_expires.seconds,
89
- "session_id": session_id
90
- }
91
-
92
- def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]:
93
- access_key = unique_string(50)
94
- at_expires = timedelta(minutes=2)
95
- at_payload = {
96
- "sub": str_encode(str(user_email)),
97
- 'a': access_key,
98
- }
99
- access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
100
- user_record = UserRepository.getUserByEmail(user_email)
101
- session_id = ""
102
- if user_record:
103
- email1 = user_record.email
104
- if email1:
105
- session_id = generate_random_string()
106
- existing_userlogin = UserLoginRepository.getUserLogin(user_email)
107
- if existing_userlogin is None:
108
- UserLoginRepository.addUserLogin(user_email,session_id=session_id)
109
- else:
110
- UserLoginRepository.updateUserLogin(user_email,session_id)
111
- return {
112
- "access_token": access_token,
113
- "refresh_token": refresh_token,
114
- "expires_in": at_expires.seconds,
115
- "session_id": session_id
116
- }
117
-
118
- def decodeJWT(token: str) -> dict:
119
- try:
120
- decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
121
- return decoded_token if decoded_token["exp"] >= time.time() else None
122
- except:
123
- return {}
124
-
125
- def get_refresh_token(refresh_token,token_now, email):
126
- token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM)
127
- if not token_payload:
128
- raise HTTPException(status_code=400, detail="Invalid Request.")
129
- exp = token_payload.get('exp')
130
- if exp >= time.time() and token_payload:
131
- return returnAccessToken(email,refresh_token)
132
- elif not token_payload:
133
  return signJWT(email)
 
1
+ import time
2
+ from typing import Dict
3
+ import jwt
4
+ import secrets
5
+ import logging
6
+ from fastapi import Depends, HTTPException
7
+ import base64
8
+ from datetime import datetime, timedelta
9
+ from repository import UserRepository, UserLoginRepository
10
+ import string, random
11
+
12
+ def check_token_is_valid(token):
13
+ check = UserRepository.getEmailUserByAccessToken(token)
14
+ if check is None:
15
+ return False
16
+ return True
17
+
18
+ def unique_string(byte: int = 8) -> str:
19
+ return secrets.token_urlsafe(byte)
20
+ JWT_SECRET = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
21
+ JWT_ALGORITHM = "HS512"
22
+ SECRET_KEY="8deadce9449770680910741063cd0a3fe0acb62a8978661f421bbcbb66dc41f1"
23
+
24
+ def token_response(token: str):
25
+ return {
26
+ "access_token": token
27
+ }
28
+ def str_encode(string: str) -> str:
29
+ return base64.b85encode(string.encode('ascii')).decode('ascii')
30
+
31
+ def get_token_payload(token: str, secret: str, algo: str):
32
+ try:
33
+ payload = jwt.decode(token, secret, algorithms=algo)
34
+ except Exception as jwt_exec:
35
+ logging.debug(f"JWT Error: {str(jwt_exec)}")
36
+ payload = None
37
+ return payload
38
+
39
+ from datetime import datetime
40
+ def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta):
41
+ expire = datetime.now() + expiry
42
+ payload.update({"exp": expire})
43
+ return jwt.encode(payload, secret, algorithm=algo)
44
+
45
+ def str_decode(string: str) -> str:
46
+ return base64.b85decode(string.encode('ascii')).decode('ascii')
47
+
48
+ def generate_random_string(length=12):
49
+ characters = string.ascii_letters + string.digits
50
+ random_string = ''.join(random.choice(characters) for i in range(length))
51
+ return random_string
52
+
53
+ import pytz
54
+ from datetime import datetime
55
+ def signJWT(user_email: str) -> Dict[str, str]:
56
+ rt_expires = timedelta(days=7)
57
+ refresh_key = unique_string(100)
58
+ access_key = unique_string(50)
59
+ at_expires = timedelta(minutes=180)
60
+ at_payload = {
61
+ "sub": str_encode(str(user_email)),
62
+ 'a': access_key,
63
+ }
64
+ access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
65
+ rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key}
66
+ refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires)
67
+ expires_in = at_expires.seconds
68
+ vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh')
69
+ current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in)
70
+ formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ')
71
+ existing_user = UserRepository.getUserByEmail(user_email)
72
+ if existing_user is None:
73
+ UserRepository.addUser(user_email, access_token, refresh_token, formatted_time)
74
+ else:
75
+ UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time)
76
+ user_record = UserRepository.getUserByEmail(user_email)
77
+ session_id = ""
78
+ if user_record:
79
+ session_id = generate_random_string()
80
+ existing_userlogin = UserLoginRepository.getUserLogin(user_email)
81
+ if existing_userlogin is None:
82
+ UserLoginRepository.addUserLogin(user_email,session_id=session_id)
83
+ else:
84
+ UserLoginRepository.updateUserLogin(user_email, session_id)
85
+ return {
86
+ "access_token": access_token,
87
+ "refresh_token": refresh_token,
88
+ "expires_in": at_expires.seconds,
89
+ "session_id": session_id
90
+ }
91
+
92
+ def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]:
93
+ access_key = unique_string(50)
94
+ at_expires = timedelta(minutes=180)
95
+ at_payload = {
96
+ "sub": str_encode(str(user_email)),
97
+ 'a': access_key,
98
+ }
99
+ access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
100
+ user_record = UserRepository.getUserByEmail(user_email)
101
+ session_id = ""
102
+ if user_record:
103
+ email1 = user_record.email
104
+ if email1:
105
+ session_id = generate_random_string()
106
+ existing_userlogin = UserLoginRepository.getUserLogin(user_email)
107
+ if existing_userlogin is None:
108
+ UserLoginRepository.addUserLogin(user_email,session_id=session_id)
109
+ else:
110
+ UserLoginRepository.updateUserLogin(user_email,session_id)
111
+ return {
112
+ "access_token": access_token,
113
+ "refresh_token": refresh_token,
114
+ "expires_in": at_expires.seconds,
115
+ "session_id": session_id
116
+ }
117
+
118
+ def decodeJWT(token: str) -> dict:
119
+ try:
120
+ decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
121
+ return decoded_token if decoded_token["exp"] >= time.time() else None
122
+ except:
123
+ return {}
124
+
125
+ def get_refresh_token(refresh_token,token_now, email):
126
+ token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM)
127
+ if not token_payload:
128
+ raise HTTPException(status_code=400, detail="Invalid Request.")
129
+ exp = token_payload.get('exp')
130
+ if exp >= time.time() and token_payload:
131
+ return returnAccessToken(email,refresh_token)
132
+ elif not token_payload:
133
  return signJWT(email)