This repository has been archived by the owner on Mar 8, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: auth0 verify issue: adds dotenv package -- working auth roles te…
…sting
- Loading branch information
1 parent
bf92607
commit 1a51e02
Showing
9 changed files
with
2,122 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
import os | ||
import json | ||
import ssl | ||
|
||
from flask import request, _request_ctx_stack, abort | ||
from jose import jwt | ||
from functools import wraps | ||
from urllib.request import urlopen | ||
|
||
AUTH0_DOMAIN = os.getenv('AUTH0_DOMAIN') | ||
ALGORITHMS = os.getenv('AUTH0_ALGORITHMS') | ||
API_AUDIENCE = os.getenv('AUTH0_AUDIENCE') | ||
|
||
class AuthError(Exception): | ||
def __init__(self, error, status_code): | ||
self.error = error | ||
self.status_code = status_code | ||
|
||
|
||
# Auth Header | ||
def get_token_auth_header(): | ||
"""Obtains the Access Token from the Authorization Header""" | ||
auth = request.headers.get('Authorization', None) | ||
if not auth: | ||
raise AuthError({ | ||
'code': 'authorization_header_missing', | ||
'description': 'Authorization header is expected.' | ||
}, 401) | ||
parts = auth.split() | ||
if parts[0].lower() != 'bearer': | ||
raise AuthError({ | ||
'code': 'invalid_header', | ||
'description': 'Authorization header must start with "Bearer".' | ||
}, 401) | ||
elif len(parts) == 1: | ||
raise AuthError({ | ||
'code': 'invalid_header', | ||
'description': 'Token not found.' | ||
}, 401) | ||
elif len(parts) > 2: | ||
raise AuthError({ | ||
'code': 'invalid_header', | ||
'description': 'Authorization header must be bearer token.' | ||
}, 401) | ||
token = parts[1] | ||
return token | ||
|
||
|
||
def verify_decode_jwt(token): | ||
jsonurl = urlopen("https://"+AUTH0_DOMAIN+"/.well-known/jwks.json") | ||
jwks = json.loads(jsonurl.read()) | ||
unverified_header = jwt.get_unverified_header(token) | ||
rsa_key = {} | ||
if 'kid' not in unverified_header: | ||
raise AuthError({ | ||
'code': 'invalid_header', | ||
'description': 'Authorization malformed.' | ||
}, 401) | ||
|
||
for key in jwks['keys']: | ||
if key['kid'] == unverified_header['kid']: | ||
rsa_key = { | ||
'kty': key['kty'], | ||
'kid': key['kid'], | ||
'use': key['use'], | ||
'n': key['n'], | ||
'e': key['e'] | ||
} | ||
|
||
if rsa_key: | ||
try: | ||
payload = jwt.decode( | ||
token, | ||
rsa_key, | ||
algorithms=ALGORITHMS, | ||
audience=API_AUDIENCE, | ||
issuer='https://' + AUTH0_DOMAIN + '/' | ||
) | ||
return payload | ||
except jwt.ExpiredSignatureError: | ||
raise AuthError({ | ||
'code': 'token_expired', | ||
'description': 'Token expired.' | ||
}, 401) | ||
except jwt.JWTClaimsError: | ||
raise AuthError({ | ||
'code': 'invalid_claims', | ||
'description': 'Incorrect claims. check the\ | ||
audience and issuer.' | ||
}, 401) | ||
except Exception: | ||
raise AuthError({ | ||
'code': 'invalid_header', | ||
'description': 'Unable to parse authentication token.' | ||
}, 400) | ||
raise AuthError({ | ||
'code': 'invalid_header', | ||
'description': 'Unable to find the appropriate key.' | ||
}, 400) | ||
|
||
|
||
def check_permissions(permission, payload): | ||
if 'permissions' not in payload: | ||
raise AuthError({ | ||
'code': 'invalid_claims', | ||
'description': 'Permissions not included in JWT.' | ||
}, 400) | ||
if permission not in payload['permissions']: | ||
raise AuthError({ | ||
'code': 'unauthorized', | ||
'description': 'Permission not found.' | ||
}, 401) | ||
return True | ||
|
||
|
||
def requires_auth(permission=''): | ||
def requires_auth_decorator(f): | ||
@wraps(f) | ||
def wrapper(*args, **kwargs): | ||
token = get_token_auth_header() | ||
try: | ||
payload = verify_decode_jwt(token) | ||
except BaseException: | ||
print("could not verify_decode_jwt") | ||
abort(401) | ||
check_permissions(permission, payload) | ||
return f(payload, *args, **kwargs) | ||
return wrapper | ||
return requires_auth_decorator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.