Validate the Access token with FastAPI

Last reviewed: over 1 year ago

This tutorial shows how to validate the Cloudflare Access JWT on requests made to a FastAPI app.

Time to complete: 15 minutes

Prerequisites

1. Create a validation function

  1. In your FastAPI project, create a new file called cloudflare.py that contains the following code:
import json

import jwt
import requests
from fastapi import HTTPException, Request

# The Application Audience (AUD) tag for your application
POLICY_AUD = "XXXXX"

# Your CF Access team domain
TEAM_DOMAIN = "https://<your-team-name>.cloudflareaccess.com"
CERTS_URL = "{}/cdn-cgi/access/certs".format(TEAM_DOMAIN)


async def validate_cloudflare(request: Request):
    """
    Validate that the request is authenticated by Cloudflare Access.
    """
    if not verify_token(request):
        raise HTTPException(status_code=400, detail="Not authenticated properly!")


def _get_public_keys():
    """
    Returns:
        List of RSA public keys usable by PyJWT.
    """
    r = requests.get(CERTS_URL)
    public_keys = []
    jwk_set = r.json()
    for key_dict in jwk_set["keys"]:
        public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key_dict))
        public_keys.append(public_key)
    return public_keys


def verify_token(request):
    """
    Verify the token in the request.
    """
    token = ""
    if "CF_Authorization" in request.cookies:
        token = request.cookies["CF_Authorization"]
    else:
        raise HTTPException(status_code=400, detail="missing required cf authorization token")
    keys = _get_public_keys()

    # Loop through the keys since we can't pass the key set to the decoder
    valid_token = False
    for key in keys:
        try:
            # decode returns the claims, which include the email when needed
            jwt.decode(token, key=key, audience=POLICY_AUD, algorithms=["RS256"])
            valid_token = True
            break
        except jwt.PyJWTError:
            # This key did not validate the token; try the next one
            continue
    if not valid_token:
        raise HTTPException(status_code=400, detail="Invalid token")
    return True
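
The _get_public_keys function above downloads the certificates on every request. If that becomes a bottleneck, one option is to cache the key list for a short time. The following is a minimal sketch using only the standard library; the KeyCache class, its parameter names, and the one-hour default are illustrative assumptions, not part of the tutorial or of any Cloudflare library.

```python
import time
from typing import Callable, List, Optional


class KeyCache:
    """Hypothetical helper: cache the result of a key-fetching function."""

    def __init__(
        self,
        fetch: Callable[[], List],
        ttl_seconds: float = 3600.0,  # assumed lifetime; tune to your needs
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._keys: Optional[List] = None
        self._expires_at = 0.0

    def get(self) -> List:
        """Return the cached keys, calling fetch() again once they expire."""
        now = self._clock()
        if self._keys is None or now >= self._expires_at:
            self._keys = self._fetch()
            self._expires_at = now + self._ttl
        return self._keys

    def invalidate(self) -> None:
        """Drop the cached keys so the next get() fetches fresh ones."""
        self._keys = None
```

In cloudflare.py this could be wired up as key_cache = KeyCache(_get_public_keys), with verify_token calling key_cache.get() instead of _get_public_keys(). Because signing keys can rotate, a cached list may briefly lack the current key; calling invalidate() and retrying once when no key validates a token is one way to handle that.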

2. Use the validation function in your app

You can now add the validation function as a dependency in your FastAPI app. One way to do this is by creating an APIRouter instance. The following example executes the validation function on each request made to paths that start with /admin:

from fastapi import APIRouter, Depends

from cloudflare import validate_cloudflare

router = APIRouter(
    prefix="/admin",
    tags=["admin"],
    dependencies=[Depends(validate_cloudflare)],
    responses={404: {"description": "Not found"}},
)


@router.get("/")
async def root():
    return {"message": "Hello World"}