kltn20133118 commited on
Commit
cb5ec66
·
verified ·
1 Parent(s): 8168c9c

Delete auth

Browse files
Files changed (1) hide show
  1. auth/authentication.py +0 -133
auth/authentication.py DELETED
@@ -1,133 +0,0 @@
1
- import time
2
- from typing import Dict
3
- import jwt
4
- import secrets
5
- import logging
6
- from fastapi import Depends, HTTPException
7
- import base64
8
- from datetime import datetime, timedelta
9
- from repository import UserRepository, UserLoginRepository
10
- import string, random
11
-
12
- def check_token_is_valid(token):
13
- check = UserRepository.getEmailUserByAccessToken(token)
14
- if check is None:
15
- return False
16
- return True
17
-
18
- def unique_string(byte: int = 8) -> str:
19
- return secrets.token_urlsafe(byte)
20
- JWT_SECRET = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
21
- JWT_ALGORITHM = "HS512"
22
- SECRET_KEY="8deadce9449770680910741063cd0a3fe0acb62a8978661f421bbcbb66dc41f1"
23
-
24
- def token_response(token: str):
25
- return {
26
- "access_token": token
27
- }
28
- def str_encode(string: str) -> str:
29
- return base64.b85encode(string.encode('ascii')).decode('ascii')
30
-
31
- def get_token_payload(token: str, secret: str, algo: str):
32
- try:
33
- payload = jwt.decode(token, secret, algorithms=algo)
34
- except Exception as jwt_exec:
35
- logging.debug(f"JWT Error: {str(jwt_exec)}")
36
- payload = None
37
- return payload
38
-
39
- from datetime import datetime
40
- def generate_token(payload: dict, secret: str, algo: str, expiry: timedelta):
41
- expire = datetime.now() + expiry
42
- payload.update({"exp": expire})
43
- return jwt.encode(payload, secret, algorithm=algo)
44
-
45
- def str_decode(string: str) -> str:
46
- return base64.b85decode(string.encode('ascii')).decode('ascii')
47
-
48
- def generate_random_string(length=12):
49
- characters = string.ascii_letters + string.digits
50
- random_string = ''.join(random.choice(characters) for i in range(length))
51
- return random_string
52
-
53
- import pytz
54
- from datetime import datetime
55
- def signJWT(user_email: str) -> Dict[str, str]:
56
- rt_expires = timedelta(days=3)
57
- refresh_key = unique_string(100)
58
- access_key = unique_string(50)
59
- at_expires = timedelta(minutes=180)
60
- at_payload = {
61
- "sub": str_encode(str(user_email)),
62
- 'a': access_key,
63
- }
64
- access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
65
- rt_payload = {"sub": str_encode(str(user_email)), "t": refresh_key, 'a': access_key}
66
- refresh_token = generate_token(rt_payload, SECRET_KEY,JWT_ALGORITHM, rt_expires)
67
- expires_in = at_expires.seconds
68
- vn_timezone = pytz.timezone('Asia/Ho_Chi_Minh')
69
- current_time = datetime.now().replace(tzinfo=pytz.utc).astimezone(vn_timezone) + timedelta(seconds=expires_in)
70
- formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S ')
71
- existing_user = UserRepository.getUserByEmail(user_email)
72
- if existing_user is None:
73
- UserRepository.addUser(user_email, access_token, refresh_token, formatted_time)
74
- else:
75
- UserRepository.updateUserLogin(user_email, access_token, refresh_token, formatted_time)
76
- user_record = UserRepository.getUserByEmail(user_email)
77
- session_id = ""
78
- if user_record:
79
- session_id = generate_random_string()
80
- existing_userlogin = UserLoginRepository.getUserLogin(user_email)
81
- if existing_userlogin is None:
82
- UserLoginRepository.addUserLogin(user_email,session_id=session_id)
83
- else:
84
- UserLoginRepository.updateUserLogin(user_email, session_id)
85
- return {
86
- "access_token": access_token,
87
- "refresh_token": refresh_token,
88
- "expires_in": at_expires.seconds,
89
- "session_id": session_id
90
- }
91
-
92
- def returnAccessToken(user_email: str, refresh_token: str) -> Dict[str, str]:
93
- access_key = unique_string(50)
94
- at_expires = timedelta(minutes=180)
95
- at_payload = {
96
- "sub": str_encode(str(user_email)),
97
- 'a': access_key,
98
- }
99
- access_token = generate_token(at_payload, JWT_SECRET, JWT_ALGORITHM, at_expires)
100
- user_record = UserRepository.getUserByEmail(user_email)
101
- session_id = ""
102
- if user_record:
103
- email1 = user_record.email
104
- if email1:
105
- session_id = generate_random_string()
106
- existing_userlogin = UserLoginRepository.getUserLogin(user_email)
107
- if existing_userlogin is None:
108
- UserLoginRepository.addUserLogin(user_email,session_id=session_id)
109
- else:
110
- UserLoginRepository.updateUserLogin(user_email,session_id)
111
- return {
112
- "access_token": access_token,
113
- "refresh_token": refresh_token,
114
- "expires_in": at_expires.seconds,
115
- "session_id": session_id
116
- }
117
-
118
- def decodeJWT(token: str) -> dict:
119
- try:
120
- decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
121
- return decoded_token if decoded_token["exp"] >= time.time() else None
122
- except:
123
- return {}
124
-
125
- def get_refresh_token(refresh_token, email):
126
- token_payload = get_token_payload(refresh_token, SECRET_KEY, JWT_ALGORITHM)
127
- if not token_payload:
128
- raise HTTPException(status_code=400, detail="Invalid Request.")
129
- exp = token_payload.get('exp')
130
- if exp >= time.time() and token_payload:
131
- return returnAccessToken(email,refresh_token)
132
- elif not token_payload:
133
- return signJWT(email)