Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MFA v1 - TOTP #2830

Draft
wants to merge 31 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
74907af
Add pyotp dependency to Pipfile
yash-learner Feb 10, 2025
5c3964e
Add TOTP secret and MFA settings to User model
yash-learner Feb 10, 2025
f560363
Add make migratons
yash-learner Feb 10, 2025
173b4d3
Add TOTP setup and verification endpoints for MFA
yash-learner Feb 10, 2025
0f25d2d
Register TOTPViewSet in API router
yash-learner Feb 10, 2025
023b721
Refactor TOTPViewSet to use Pydantic models
yash-learner Feb 11, 2025
418eaf7
fix and improve logic
yash-learner Feb 11, 2025
8efcd17
Add backup code generation
yash-learner Feb 11, 2025
2527506
Change TOTP secret field type from CharField to TextField in user mod…
yash-learner Feb 11, 2025
ca0e9dc
Implement TOTP secret encryption and decryption; update user model to…
yash-learner Feb 11, 2025
e7a24ae
Update TOTP route to use 'mfa/totp' for improved clarity
yash-learner Feb 11, 2025
901ba9d
WIP: integrate MFA into login flow
yash-learner Feb 11, 2025
ea3d27d
WIP: integrate MFA into login flow
yash-learner Feb 11, 2025
622fa57
simply flow
yash-learner Feb 12, 2025
5eef7d7
Refactor TOTP login method and clean up MFA token handling
yash-learner Feb 12, 2025
d5aeab4
Add email notification for TOTP activation
yash-learner Feb 13, 2025
d45a732
Add TOTP disable functionality and refactor login request handling
yash-learner Feb 13, 2025
fac7eef
Refactor TOTP setup and error handling; add regenerate backup codes f…
yash-learner Feb 14, 2025
edcc411
Add endpoint backup_login and wip:refactor
yash-learner Feb 16, 2025
f46573c
Add mailer for TOTP disable and tiny fixes
yash-learner Feb 17, 2025
c027542
WIP: Fixing review comments and playing with rate limit
yash-learner Feb 18, 2025
ce9f6f8
Fix review comments
yash-learner Feb 19, 2025
ccddf0c
Remove success message from TOTP enable response
yash-learner Feb 19, 2025
8bd3442
Refactor:
yash-learner Feb 20, 2025
3eeb85a
Create a unified MFA endpoint and cleanup separate endpoints
yash-learner Feb 20, 2025
22d2d79
Tiny clean up
yash-learner Feb 20, 2025
7754731
Add MFALoginResponse model for unified MFA login endpoint
yash-learner Feb 20, 2025
ff6a286
Add password requirement when using setup and regenerate backup codes
yash-learner Feb 20, 2025
e9417f3
Merge branch 'develop' into issues/2614/mfa
yash-learner Feb 20, 2025
477096f
Remove TOTP_SECRET_ENCRYPTION_KEY from environment configuration files
yash-learner Feb 20, 2025
280f7d3
Remove unnecessary comment from MFA login viewset
yash-learner Feb 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ BUCKET_ENDPOINT=http://localhost:4566
BUCKET_EXTERNAL_ENDPOINT=http://localhost:4566
FILE_UPLOAD_BUCKET=patient-bucket
FACILITY_S3_BUCKET=facility-bucket

TOTP_SECRET_ENCRYPTION_KEY=your-encryption-key-here
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pillow = "==11.1.0"
psycopg = { extras = ["c"], version = "==3.2.3" }
pydantic = "==2.9.2"
pyjwt = "==2.10.1"
pyotp = "==2.9.0"
python-slugify = "==8.0.4"
pywebpush = "==2.0.1"
redis = { extras = ["hiredis"], version = "==5.2.1" }
Expand Down
194 changes: 104 additions & 90 deletions Pipfile.lock

Large diffs are not rendered by default.

334 changes: 334 additions & 0 deletions care/emr/api/viewsets/totp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
from secrets import choice
from string import digits

from django.contrib.auth.hashers import check_password, make_password
from django.utils import timezone
from drf_spectacular.utils import extend_schema
from pydantic import BaseModel
from pyotp import TOTP, random_base32
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken

from care.emr.api.viewsets.base import EMRBaseViewSet
from care.emr.tasks.totp import send_totp_disabled_email, send_totp_enabled_email
from care.users.models import User
from care.utils.encryption import decrypt_string, encrypt_string


class TOTPSetupResponse(BaseModel):
uri: str
secret_key: str


class TOTPVerifyRequest(BaseModel):
code: str


class TOTPVerifyResponse(BaseModel):
message: str
backup_codes: list[str]


class TOTPLoginRequest(BaseModel):
code: str
temp_token: str


class TOTPLoginResponse(BaseModel):
access: str
refresh: str


class TOTPDisableRequest(BaseModel):
password: str


class TOTPDisableResponse(BaseModel):
message: str


class TOTPViewSet(EMRBaseViewSet):
permission_classes = [IsAuthenticated]

@extend_schema(
description="Initialize TOTP setup for user",
responses={
200: TOTPSetupResponse,
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(detail=False, methods=["POST"])
def setup(self, request):
user = request.user

mfa_settings = user.mfa_settings or {}

if self._check_totp_enabled(mfa_settings) is None:
return Response(
{
"error": "Two-factor authentication is already enabled for your account"
},
status=status.HTTP_400_BAD_REQUEST,
)

secret = random_base32()
encrypted_secret = encrypt_string(secret)

totp = TOTP(secret)
uri = totp.provisioning_uri(name=user.email, issuer_name="CARE")

user.totp_secret = encrypted_secret
user.save(update_fields=["totp_secret"])

response_data = TOTPSetupResponse(uri=uri, secret_key=secret)
return Response(response_data.model_dump())

@staticmethod
def _generate_backup_codes(count: int = 10) -> list[str]:
"""Generate 8-digit backup codes."""
codes = []
for _ in range(count):
code = "".join(choice(digits) for _ in range(8))
codes.append(code)
return codes

@extend_schema(
description="Verify TOTP code and enable 2FA",
request=TOTPVerifyRequest,
responses={
200: TOTPVerifyResponse,
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(detail=False, methods=["POST"])
def verify(self, request):
request_data = TOTPVerifyRequest(**request.data)
user = request.user

if not user.totp_secret:
return Response(
{"error": "TOTP not configured"}, status=status.HTTP_400_BAD_REQUEST
)

secret = decrypt_string(user.totp_secret)
totp = TOTP(secret)

if totp.verify(request_data.code):
backup_codes = self._generate_backup_codes()

mfa_settings = user.mfa_settings or {}
mfa_settings["totp"] = {
"enabled": True,
"enabled_at": timezone.now().isoformat(),
"backup_codes": [
{
"code": make_password(code),
"used": False,
"created_at": timezone.now().isoformat(),
}
for code in backup_codes
],
}
user.mfa_settings = mfa_settings
user.save(update_fields=["mfa_settings"])

send_totp_enabled_email.delay(user.email, user.username)

response_data = TOTPVerifyResponse(
message="Two-factor authentication has been enabled successfully. Please save your backup codes in a secure location.",
backup_codes=backup_codes,
)
return Response(response_data.model_dump())

return Response({"error": "Invalid code"}, status=status.HTTP_400_BAD_REQUEST)

@extend_schema(
description="Verify TOTP code during login",
request=TOTPLoginRequest,
responses={
200: TOTPLoginResponse,
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(
detail=False,
methods=["POST"],
permission_classes=[],
authentication_classes=[],
)
def login(self, request):
request_data = TOTPLoginRequest(**request.data)

token = RefreshToken(request_data.temp_token)
if not token.get("temp_token"):
return Response(
{"error": "Invalid token type"}, status=status.HTTP_400_BAD_REQUEST
)

user = User.objects.get(external_id=token["user_id"])
totp = TOTP(decrypt_string(user.totp_secret))

if totp.verify(request_data.code):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the backup code used here? the backup codes are maintained by us or the package?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By us, pyotp does not provide out of the box

refresh = RefreshToken.for_user(user)

try:
token.blacklist()
except AttributeError:
pass

return Response(
{
"access": str(refresh.access_token),
"refresh": str(refresh),
}
)

return Response({"error": "Invalid code"}, status=status.HTTP_400_BAD_REQUEST)

@extend_schema(
description="Disable TOTP-based two-factor authentication",
request=TOTPDisableRequest,
responses={
200: TOTPDisableResponse,
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(detail=False, methods=["POST"])
def disable(self, request):
password = TOTPDisableRequest(**request.data).password

if not request.user.check_password(password):
return Response(
{"error": "Invalid credentials"}, status=status.HTTP_400_BAD_REQUEST
)

user = request.user
mfa_settings = user.mfa_settings or {}

if error_response := self._check_totp_enabled(mfa_settings):
return error_response

mfa_settings["totp"] = {
"enabled": False,
"totp_disabled_at": timezone.now().isoformat(),
"backup_codes": [],
}
user.mfa_settings = mfa_settings
user.totp_secret = None
user.save(update_fields=["mfa_settings", "totp_secret"])

send_totp_disabled_email.delay(user.email, user.username)

response_data = TOTPDisableResponse(
message="Two-factor authentication has been disabled successfully"
)
return Response(response_data.model_dump())

@extend_schema(
description="Regenerate TOTP backup codes",
responses={
200: {
"type": "object",
"properties": {
"backup_codes": {"type": "array", "items": {"type": "string"}}
},
},
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(detail=False, methods=["POST"])
def regenerate_backup_codes(self, request):
user = request.user
mfa_settings = user.mfa_settings or {}

if error_response := self._check_totp_enabled(mfa_settings):
return error_response

backup_codes = self._generate_backup_codes()
mfa_settings["totp"]["backup_codes"] = [
{
"code": make_password(code),
"used": False,
"created_at": timezone.now().isoformat(),
}
for code in backup_codes
]
user.mfa_settings = mfa_settings
user.save(update_fields=["mfa_settings"])

return Response({"backup_codes": backup_codes})

@staticmethod
def _check_totp_enabled(mfa_settings: dict):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function should return either true or false and let the implementer write the exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function should return either true or false and let the implementer write the exception.

Yeah, I thought this was the expected way when writting this method but wrote the exception inside the_check_totp_enabled as the exception is same for all three methods which are calling this so trying to not repeat the code made me write this way and also this will make understanding the code little tough and also misleading with its method name.

Will fix it 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totp_enabled = mfa_settings.get("totp", {}).get("enabled", False)

if not totp_enabled:
return Response(
{"error": "Two-factor authentication is not enabled for your account"},
status=status.HTTP_400_BAD_REQUEST,
)
return None

@extend_schema(
description="Login using a backup code",
request=TOTPLoginRequest,
responses={
200: TOTPLoginResponse,
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(
detail=False,
methods=["POST"],
permission_classes=[],
authentication_classes=[],
)
def backup_login(self, request):
request_data = TOTPLoginRequest(**request.data)
token = RefreshToken(request_data.temp_token)

if not token.get("temp_token"):
return Response(
{"error": "Invalid token type"},
status=status.HTTP_400_BAD_REQUEST,
)

user = User.objects.get(external_id=token["user_id"])
mfa_settings = user.mfa_settings or {}
backup_codes = mfa_settings.get("totp", {}).get("backup_codes", [])

matching_code = next(
(
code
for code in backup_codes
if not code["used"] and check_password(request_data.code, code["code"])
),
None,
)

if not matching_code:
return Response(
{"error": "Invalid or already used backup code."},
status=status.HTTP_400_BAD_REQUEST,
)

matching_code.update({"used": True, "used_at": timezone.now().isoformat()})
user.mfa_settings = mfa_settings
user.save(update_fields=["mfa_settings"])

refresh = RefreshToken.for_user(user)

try:
token.blacklist()
except AttributeError:
pass

return Response(
{
"access": str(refresh.access_token),
"refresh": str(refresh),
}
)
Loading