-
Notifications
You must be signed in to change notification settings - Fork 2
/
authentication.py
104 lines (74 loc) · 3.1 KB
/
authentication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import logging
from django.contrib.auth.models import AnonymousUser
from django.core.cache import cache
from mohawk import Receiver
from mohawk.exc import AlreadyProcessed, HawkFail
from rest_framework import authentication, exceptions
from sentry_sdk import capture_exception
from conf import settings
logger = logging.getLogger(__name__)
class HawkOnlyAuthentication(authentication.BaseAuthentication):
def authenticate(self, request):
"""Authenticate the request and return a two-tuple of (user, token).
Establish that the request has come from an authorised LITE API client
by checking that the request is correctly Hawk signed
"""
try:
hawk_receiver = _authenticate(request)
except HawkFail as e:
logger.warning("Failed HAWK authentication %s", e)
raise exceptions.AuthenticationFailed(f"Failed HAWK authentication")
except Exception as e:
logger.error("Failed HAWK authentication %s", e)
if settings.SENTRY_ENABLED:
capture_exception(e)
raise exceptions.AuthenticationFailed(f"Failed HAWK authentication")
return AnonymousUser(), hawk_receiver
def authenticate_header(self, request):
return "Hawk"
def _authenticate(request):
"""
Raises a HawkFail exception if the passed request cannot be authenticated
"""
if hawk_authentication_enabled():
return Receiver(
_lookup_credentials,
request.META["HTTP_HAWK_AUTHENTICATION"],
# build_absolute_uri() returns 'http' which is incorrect since our clients communicate via https
request.build_absolute_uri().replace("http", "https"),
request.method,
content=request.body,
content_type=request.content_type,
seen_nonce=_seen_nonce,
)
def _seen_nonce(access_key_id, nonce, _):
"""
Returns if the passed access_key_id/nonce combination has been
used within settings.HAWK_RECEIVER_NONCE_EXPIRY_SECONDS
"""
cache_key = f"hawk:{access_key_id}:{nonce}"
# cache.add only adds key if it isn't present
seen_cache_key = not cache.add(cache_key, True, timeout=settings.HAWK_RECEIVER_NONCE_EXPIRY_SECONDS)
if seen_cache_key:
raise AlreadyProcessed(f"Already seen nonce {nonce}")
return seen_cache_key
def _lookup_credentials(access_key_id):
"""
Raises HawkFail if the access key ID cannot be found.
"""
try:
credentials = settings.HAWK_CREDENTIALS[access_key_id]
except KeyError as exc:
raise HawkFail(f"No Hawk ID of {access_key_id}") from exc
return {
"id": access_key_id,
"algorithm": "sha256",
**credentials,
}
def hawk_authentication_enabled() -> bool:
"""Defined as method as you can't override settings.HAWK_AUTHENTICATION_ENABLED correctly in tests.
Patch this function to get desired behaviour.
See here for reason:
https://stackoverflow.com/questions/29367043/unit-testing-django-rest-framework-authentication-at-runtime
"""
return settings.HAWK_AUTHENTICATION_ENABLED