I'm using FastAPI to create a simple api for automating my emails. I want to protect certain routes and I'm using this class:
import time
import jwt
from fastapi import HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
#if you need to try it out just swap the following
from exchange_api.auth.jwt_handler import JWT_ALGORITHM, JWT_SECRET
class JWTBearer(HTTPBearer):
    """Bearer-token dependency that only admits requests carrying a valid JWT.

    Use it as ``Depends(JWTBearer())``. When every check passes, the
    dependency resolves to the token string submitted by the client.
    """

    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request) -> str:
        """Check the bearer credentials of *request* and return the token.

        Raises:
            HTTPException: 403 when the parent class yields no credentials,
                when the scheme is not exactly ``"Bearer"``, or when
                ``verify_jwt`` rejects the token.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)
        # Guard clauses instead of nested if/else: each failure exits early.
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
        if credentials.scheme != "Bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
        if not self.verify_jwt(credentials.credentials):
            raise HTTPException(status_code=403, detail="Invalid token or expired token.")
        return credentials.credentials

    def verify_jwt(self, jwtoken: str) -> bool:
        """Return True when ``decodeJWT`` yields a non-empty payload for *jwtoken*."""
        try:
            payload = decodeJWT(jwtoken)
        except Exception:
            # Any decoding failure means "not valid". ``Exception`` rather
            # than a bare ``except`` so KeyboardInterrupt and SystemExit
            # still propagate.
            return False
        return bool(payload)
def decodeJWT(token: str) -> dict:
    """Decode *token* with ``JWT_SECRET`` / ``JWT_ALGORITHM`` and check its expiry.

    Returns:
        The decoded payload when its ``"expiration"`` claim is not earlier
        than the current time; ``None`` when that claim lies in the past;
        an empty dict when decoding fails or the claim is missing. Both
        failure values are falsy, so callers can test the result's truthiness.
    """
    try:
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        if decoded_token["expiration"] >= time.time():
            return decoded_token
        return None
    except Exception:
        # Deliberately best-effort: a malformed token, a bad signature or a
        # missing "expiration" claim all collapse to "no payload".
        # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
        # SystemExit through.
        return {}
Then I wanted to protect certain routes:
from fastapi import APIRouter, Depends, HTTPException, Request, Header
from exchange_api.auth.jwt_bearer import JWTBearer
@auth_router.get(
    "/protected",
    dependencies=[Depends(JWTBearer())],
    tags=["auth test"],
)
def get_user_data():
    # JWTBearer is attached through ``dependencies=[...]``, so it runs for
    # this route but nothing it returns is handed to the function.
    # Open question from the author: how to reach the token and its payload
    # from in here.
    empty_response = {}
    return empty_response
I have no issues with my code so far. But I'd like to periodically refresh the token and I need its payload anyway to do things inside my functions. How can I do that? I don't want my users to get kicked out in the middle of their session because their token expired.
It works fine without `Depends`, passing the token in the body of the various routes, but I don't think that's the right way to do this. fastapi-jwt-auth is too old and it causes dependency conflicts with my already installed libraries.
EDIT: What I mean is that I want to access the bearer token that my users submitted to authenticate to do various things and also because I plan to substitute it with a new one every time the users call a protected route but WITHOUT the need to authenticate again.
EDIT: I managed to get the token:
@auth_router.get(
    "/protected",
    dependencies=[Depends(JWTBearer())],
    tags=["auth test"],
)
def get_user_data(request: Request):
    # Pull the raw ``authorization`` header straight off the request; this is
    # the full header value, not just the token part.
    authorization_header = request.headers["authorization"]
    # NOTE(review): the braces below build a one-element *set*, not a dict.
    return {authorization_header}
I will show you how I approach JWT tokens in my FastAPI apps. I use the python-jose library.
In my auth.py file I have the following code:
from datetime import datetime, timedelta, timezone
from typing import Literal

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

from app.models import User
from app.settings import access_token_jwk, refresh_token_jwk
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
def authenticate_user(email: str, password: str) -> User:
    """Authenticate a user from an email/password pair (placeholder).

    A real implementation would check *password* against the hash stored in
    the database, or authenticate in any other way. The ``/token`` endpoint
    calls this function.
    """
    ...
def create_token(
    user: User,
    token_type: Literal["refresh", "access"],
    ttl: int
) -> str:
    """Build an HS256-signed JWT for *user*.

    Args:
        user: Account the token is issued for. Its ``email`` becomes the
            ``sub`` claim and its ``role`` the ``user_role`` claim.
        token_type: Selects the signing key: ``"access"`` signs with
            ``access_token_jwk``, ``"refresh"`` with ``refresh_token_jwk``.
        ttl: Lifetime in minutes, added to the issue time to form ``exp``.

    Returns:
        The encoded token.

    Raises:
        ValueError: If *token_type* is neither ``"access"`` nor ``"refresh"``.
    """
    signing_keys = {
        "access": access_token_jwk,
        "refresh": refresh_token_jwk,
    }
    try:
        key = signing_keys[token_type]
    except KeyError:
        # Without this check an unexpected value left ``key`` unbound and
        # surfaced as an UnboundLocalError at the encode call.
        raise ValueError(f"Unknown token type: {token_type!r}") from None

    # Read the clock once so ``iat`` and ``exp`` are derived from the same
    # instant; timezone-aware replacement for the deprecated utcnow().
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": user.email,
        "iat": issued_at,
        "exp": issued_at + timedelta(minutes=ttl),
        "user_role": user.role,
    }
    return jwt.encode(payload, key, algorithm="HS256")
async def decode_access_token(token: str = Depends(oauth2_scheme)) -> dict:
    """Verify an access token and return its claims as a dictionary.

    Meant to be used as a dependency on endpoints that should be secured;
    the token itself is supplied by ``oauth2_scheme``.

    Raises:
        HTTPException: 401 when decoding with ``access_token_jwk`` raises
            ``JWTError``.
    """
    try:
        payload = jwt.decode(
            token,
            access_token_jwk,
            # Explicit allow-list rather than a bare string.
            algorithms=["HS256"],
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}"
        ) from e
    return payload
async def decode_refresh_token(token: str = Depends(oauth2_scheme)) -> dict:
    """Verify a refresh token and return its claims as a dictionary.

    Meant to be used as a dependency only where an access token is being
    refreshed. Decoding uses ``refresh_token_jwk``; per the original author,
    access and refresh tokens are signed with different keys so that a
    refresh token cannot be used on endpoints protected by an access token.

    Raises:
        HTTPException: 401 when decoding with ``refresh_token_jwk`` raises
            ``JWTError``.
    """
    try:
        payload = jwt.decode(
            token,
            refresh_token_jwk,
            # Explicit allow-list rather than a bare string.
            algorithms=["HS256"],
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}"
        ) from e
    return payload
Then my endpoint for getting token looks like this:
from fastapi import APIRouter, Depends
from fastapi.security import OAuth2PasswordRequestForm
from app.auth import authenticate_user, create_token, decode_refresh_token
from app.users import get_user
router = APIRouter()
@router.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
):
    # Exchange the submitted username/password form for a fresh token pair.
    # Per the original author, authenticate_user() is expected to raise from
    # within when authentication fails, so no result check is done here.
    user = authenticate_user(form_data.username, form_data.password)

    # TTLs are in minutes: one hour for the access token, three days for the
    # refresh token.
    access_ttl = 60
    refresh_ttl = 60 * 24 * 3

    token_pair = {
        "access_token": create_token(user, "access", access_ttl),
        "refresh_token": create_token(user, "refresh", refresh_ttl),
        "token_type": "bearer",
    }
    return token_pair
@router.post("/token/refresh")
async def refresh_access_token(
    token: dict = Depends(decode_refresh_token)
):
    # Issue a new access/refresh pair for the holder of a valid refresh token.
    # ``token`` is the claims dict produced by decode_refresh_token.
    #
    # Fix: the original subscripted the bound method (``token.get["sub"]``),
    # which raises TypeError; index the dict directly instead.
    user = get_user(token["sub"])  # arbitrary function to get user by email
    access_token = create_token(user, "access", 60)
    refresh_token = create_token(user, "refresh", 60*24*3)
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }
That's it: use decode_access_token() as a dependency in any endpoint you want to secure, and it will automatically read the token from the header and verify it:
from fastapi import APIRouter, Depends
from app.auth import decode_access_token
router = APIRouter()
@router.get("/secure/route")
def get_secured_data(
    token: dict = Depends(decode_access_token),
):
    # Example of a secured endpoint: ``token`` holds the dictionary returned
    # by decode_access_token, so the token's data and claims can be read
    # from it here.
    ...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With