# Spaces: Runtime error
# (Appears to be a page-status banner captured together with the source; it is not part of the Python module.)
from datetime import timedelta | |
from fastapi import APIRouter, Request, Response, status, Depends, HTTPException | |
from pydantic import EmailStr | |
from api import oauth2 | |
from .. import schemas, models, utils | |
from sqlalchemy.orm import Session | |
from ..database import get_db | |
from api.oauth2 import AuthJWT | |
from ..config import settings | |
# Token lifetimes taken from the application settings. Elsewhere in this module
# they are passed to timedelta(minutes=...) and multiplied by 60 for cookie
# max-age values, i.e. they are treated as minutes.
ACCESS_TOKEN_EXPIRES_IN = settings.ACCESS_TOKEN_EXPIRES_IN
REFRESH_TOKEN_EXPIRES_IN = settings.REFRESH_TOKEN_EXPIRES_IN

# Router object exported by this module.
router = APIRouter()
async def create_user(payload: schemas.CreateUserSchema, db: Session = Depends(get_db)):
    """Create a new user account and return the stored row.

    The e-mail address is lower-cased before it is looked up and before it is
    stored.

    Args:
        payload: Registration data. This function reads ``email``,
            ``password`` and ``passwordConfirm`` from it.
        db: SQLAlchemy session used for the duplicate check and the insert.

    Returns:
        The newly inserted ``models.User`` instance, refreshed after commit.

    Raises:
        HTTPException: 409 if a user with the same e-mail already exists;
            400 if ``password`` and ``passwordConfirm`` differ.
    """
    # NOTE(review): the SQLAlchemy calls below are synchronous and this is an
    # ``async def`` handler, so they run on the event loop. The signature is
    # kept as-is for callers; consider a plain ``def`` if nothing awaits it.

    # Normalise once and use a plain ``str``. Calling ``EmailStr(...)`` as a
    # constructor performs no validation on pydantic v1 and is not supported
    # on pydantic v2, so it is dropped.
    email = payload.email.lower()

    # Check if the user already exists
    existing_user = db.query(models.User).filter(
        models.User.email == email).first()
    if existing_user:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT,
                            detail='Account already exist')

    # Compare password and passwordConfirm
    if payload.password != payload.passwordConfirm:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail='Passwords do not match')

    # Build the row data from a copy of the payload instead of mutating (and
    # deleting attributes from) the request model. ``passwordConfirm`` is only
    # needed for the comparison above and is never stored.
    user_data = payload.dict(exclude={'passwordConfirm'})
    user_data.update(
        email=email,
        password=utils.hash_password(payload.password),  # store the hash only
        role='user',
        verified=True,
    )

    new_user = models.User(**user_data)
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user
def login(payload: schemas.LoginUserSchema, response: Response, db: Session = Depends(get_db), Authorize: AuthJWT = Depends()):
    """Authenticate a user and issue access/refresh tokens as cookies.

    The credentials are checked before the ``verified`` flag, so the
    "Please verify your email address" response is only given to a caller who
    supplied the correct password. Any other failure gets the same generic
    "Incorrect Email or Password" response.

    Args:
        payload: Login data; ``email`` and ``password`` are read.
        response: Outgoing response on which the cookies are set.
        db: SQLAlchemy session used to look the user up by e-mail.
        Authorize: JWT helper used to create the access and refresh tokens.

    Returns:
        ``{'status': 'success', 'access_token': <token>}``.

    Raises:
        HTTPException: 400 if the e-mail is unknown or the password does not
            match; 401 if the credentials are correct but the user is not
            verified.
    """
    # Plain lower-cased ``str``; calling ``EmailStr(...)`` as a constructor
    # adds no validation on pydantic v1 and is not supported on pydantic v2.
    email = payload.email.lower()
    user = db.query(models.User).filter(models.User.email == email).first()

    # Check that the user exists and the password is valid
    if not user or not utils.verify_password(payload.password, user.password):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
                            detail='Incorrect Email or Password')

    # Check if the user verified their email
    if not user.verified:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail='Please verify your email address')

    subject = str(user.id)

    # Create access token
    access_token = Authorize.create_access_token(
        subject=subject, expires_time=timedelta(minutes=ACCESS_TOKEN_EXPIRES_IN))

    # Create refresh token (named ``refresh_jwt`` so it does not shadow the
    # module-level ``refresh_token`` function)
    refresh_jwt = Authorize.create_refresh_token(
        subject=subject, expires_time=timedelta(minutes=REFRESH_TOKEN_EXPIRES_IN))

    # Cookie lifetimes in seconds (the settings are in minutes)
    access_max_age = ACCESS_TOKEN_EXPIRES_IN * 60
    refresh_max_age = REFRESH_TOKEN_EXPIRES_IN * 60

    # Store refresh and access tokens in HTTP-only cookies. Keyword arguments
    # replace the original nine positional arguments; the values are unchanged.
    response.set_cookie(
        key='access_token', value=access_token,
        max_age=access_max_age, expires=access_max_age,
        path='/', domain=None, secure=False, httponly=True, samesite='lax')
    response.set_cookie(
        key='refresh_token', value=refresh_jwt,
        max_age=refresh_max_age, expires=refresh_max_age,
        path='/', domain=None, secure=False, httponly=True, samesite='lax')
    # ``logged_in`` carries no secret and is set with httponly=False.
    response.set_cookie(
        key='logged_in', value='True',
        max_age=access_max_age, expires=access_max_age,
        path='/', domain=None, secure=False, httponly=False, samesite='lax')

    # Send the access token in the body as well
    return {'status': 'success', 'access_token': access_token}
def refresh_token(response: Response, request: Request, Authorize: AuthJWT = Depends(), db: Session = Depends(get_db)):
    """Issue a new access token for the holder of a valid refresh token.

    Args:
        response: Outgoing response on which the new cookies are set.
        request: Incoming request (not read directly in this function).
        Authorize: JWT helper used to validate the refresh token and to create
            the new access token.
        db: SQLAlchemy session used to confirm the token's user still exists.

    Returns:
        ``{'access_token': <token>}``.

    Raises:
        HTTPException: 401 if the token has no subject or its user is no
            longer in the database; 400 with 'Please provide refresh token'
            when the JWT helper raises ``MissingTokenError``; 400 with the
            exception class name for any other failure.
    """
    try:
        Authorize.jwt_refresh_token_required()

        user_id = Authorize.get_jwt_subject()
        if not user_id:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                                detail='Could not refresh access token')

        user = db.query(models.User).filter(models.User.id == user_id).first()
        if not user:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                                detail='The user belonging to this token no longer exist')

        access_token = Authorize.create_access_token(
            subject=str(user.id), expires_time=timedelta(minutes=ACCESS_TOKEN_EXPIRES_IN))
    except HTTPException:
        # The 401s raised above are deliberate responses. Without this clause
        # the generic handler below would catch them and turn them into a 400
        # whose detail is the literal string 'HTTPException'.
        raise
    except Exception as e:
        # Matched by class name, as in the original, so no extra import of the
        # JWT library's exception types is needed.
        error = e.__class__.__name__
        if error == 'MissingTokenError':
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail='Please provide refresh token') from e
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=error) from e

    # Cookie lifetime in seconds (the setting is in minutes)
    access_max_age = ACCESS_TOKEN_EXPIRES_IN * 60

    response.set_cookie(
        key='access_token', value=access_token,
        max_age=access_max_age, expires=access_max_age,
        path='/', domain=None, secure=False, httponly=True, samesite='lax')
    response.set_cookie(
        key='logged_in', value='True',
        max_age=access_max_age, expires=access_max_age,
        path='/', domain=None, secure=False, httponly=False, samesite='lax')
    return {'access_token': access_token}
def logout(
    response: Response,
    Authorize: AuthJWT = Depends(),
    user_id: str = Depends(oauth2.require_user),
):
    """Clear the JWT cookies and expire the ``logged_in`` cookie.

    Args:
        response: Outgoing response on which ``logged_in`` is overwritten.
        Authorize: JWT helper whose ``unset_jwt_cookies`` is called.
        user_id: Resolved through ``oauth2.require_user``; not read in the
            body. Presumably present so the dependency rejects unauthenticated
            requests — confirm in ``api.oauth2``.

    Returns:
        ``{'status': 'success'}``.
    """
    Authorize.unset_jwt_cookies()

    # Empty value with a negative max-age: the browser drops the cookie.
    response.set_cookie(key='logged_in', value='', max_age=-1)

    result = {'status': 'success'}
    return result