Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MFA v1 - TOTP #2830

Draft
wants to merge 31 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
74907af
Add pyotp dependency to Pipfile
yash-learner Feb 10, 2025
5c3964e
Add TOTP secret and MFA settings to User model
yash-learner Feb 10, 2025
f560363
Add make migratons
yash-learner Feb 10, 2025
173b4d3
Add TOTP setup and verification endpoints for MFA
yash-learner Feb 10, 2025
0f25d2d
Register TOTPViewSet in API router
yash-learner Feb 10, 2025
023b721
Refactor TOTPViewSet to use Pydantic models
yash-learner Feb 11, 2025
418eaf7
fix and improve logic
yash-learner Feb 11, 2025
8efcd17
Add backup code generation
yash-learner Feb 11, 2025
2527506
Change TOTP secret field type from CharField to TextField in user mod…
yash-learner Feb 11, 2025
ca0e9dc
Implement TOTP secret encryption and decryption; update user model to…
yash-learner Feb 11, 2025
e7a24ae
Update TOTP route to use 'mfa/totp' for improved clarity
yash-learner Feb 11, 2025
901ba9d
WIP: integrate MFA into login flow
yash-learner Feb 11, 2025
ea3d27d
WIP: integrate MFA into login flow
yash-learner Feb 11, 2025
622fa57
simply flow
yash-learner Feb 12, 2025
5eef7d7
Refactor TOTP login method and clean up MFA token handling
yash-learner Feb 12, 2025
d5aeab4
Add email notification for TOTP activation
yash-learner Feb 13, 2025
d45a732
Add TOTP disable functionality and refactor login request handling
yash-learner Feb 13, 2025
fac7eef
Refactor TOTP setup and error handling; add regenerate backup codes f…
yash-learner Feb 14, 2025
edcc411
Add endpoint backup_login and wip:refactor
yash-learner Feb 16, 2025
f46573c
Add mailer for TOTP disable and tiny fixes
yash-learner Feb 17, 2025
c027542
WIP: Fixing review comments and playing with rate limit
yash-learner Feb 18, 2025
ce9f6f8
Fix review comments
yash-learner Feb 19, 2025
ccddf0c
Remove success message from TOTP enable response
yash-learner Feb 19, 2025
8bd3442
Refactor:
yash-learner Feb 20, 2025
3eeb85a
Create a unified MFA endpoint and cleanup separate endpoints
yash-learner Feb 20, 2025
22d2d79
Tiny clean up
yash-learner Feb 20, 2025
7754731
Add MFALoginResponse model for unified MFA login endpoint
yash-learner Feb 20, 2025
ff6a286
Add password requirement when using setup and regenerate backup codes
yash-learner Feb 20, 2025
e9417f3
Merge branch 'develop' into issues/2614/mfa
yash-learner Feb 20, 2025
477096f
Remove TOTP_SECRET_ENCRYPTION_KEY from environment configuration files
yash-learner Feb 20, 2025
280f7d3
Remove unnecessary comment from MFA login viewset
yash-learner Feb 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pillow = "==11.1.0"
psycopg = { extras = ["c"], version = "==3.2.3" }
pydantic = "==2.9.2"
pyjwt = "==2.10.1"
pyotp = "==2.9.0"
python-slugify = "==8.0.4"
pywebpush = "==2.0.1"
redis = { extras = ["hiredis"], version = "==5.2.1" }
Expand Down
194 changes: 104 additions & 90 deletions Pipfile.lock

Large diffs are not rendered by default.

93 changes: 93 additions & 0 deletions care/emr/api/viewsets/mfa_login.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from enum import Enum

from django.contrib.auth.hashers import check_password
from django.utils import timezone
from drf_spectacular.utils import extend_schema
from pydantic import BaseModel
from pyotp import TOTP
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError

from care.emr.api.viewsets.base import EMRBaseViewSet
from care.emr.utils.mfa import (
check_mfa_ip_rate_limit,
check_mfa_user_rate_limit,
create_auth_response,
validate_temp_token,
)
from care.users.models import User


class LoginMethod(str, Enum):
totp = "totp"
backup = "backup"


class MFALoginRequest(BaseModel):
method: LoginMethod
code: str
temp_token: str


class MFALoginResponse(BaseModel):
access: str
refresh: str


class MFALoginViewSet(EMRBaseViewSet):
@extend_schema(
description="Unified MFA login endpoint supporting TOTP and backup codes",
request=MFALoginRequest,
responses={200: MFALoginResponse},
)
@action(
detail=False,
methods=["POST"],
permission_classes=[],
authentication_classes=[],
)
def login(self, request):
check_mfa_ip_rate_limit(request)
request_data = MFALoginRequest(**request.data)

user_id = validate_temp_token(request_data.temp_token)
check_mfa_user_rate_limit(request, user_id)

user = User.objects.get(external_id=user_id)

if request_data.method == LoginMethod.totp:
return self._handle_totp_login(user, request_data.code)
if request_data.method == LoginMethod.backup:
return self._handle_backup_login(user, request_data.code)

raise ValidationError("Invalid login method")

@staticmethod
def _handle_totp_login(user, code):
totp = TOTP(user.totp_secret)
if totp.verify(code, valid_window=1):
return create_auth_response(user)
raise ValidationError("Invalid TOTP code")

@staticmethod
def _handle_backup_login(user, code):
mfa_settings = user.mfa_settings or {}
backup_codes = mfa_settings.get("totp", {}).get("backup_codes", [])

matching_code = next(
(
code_entry
for code_entry in backup_codes
if not code_entry["used"] and check_password(code, code_entry["code"])
),
None,
)

if not matching_code:
raise ValidationError("Invalid or already used backup code")

matching_code.update({"used": True, "used_at": timezone.now().isoformat()})
user.mfa_settings = mfa_settings
user.save(update_fields=["mfa_settings"])

return create_auth_response(user)
217 changes: 217 additions & 0 deletions care/emr/api/viewsets/totp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
from secrets import choice
from string import digits

from django.contrib.auth.hashers import make_password
from django.utils import timezone
from drf_spectacular.utils import extend_schema
from pydantic import BaseModel
from pyotp import TOTP, random_base32
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from care.emr.api.viewsets.base import EMRBaseViewSet
from care.emr.tasks.totp import send_totp_disabled_email, send_totp_enabled_email
from care.emr.utils.mfa import verify_password


class TOTPSetupResponse(BaseModel):
uri: str
secret_key: str


class TOTPVerifyRequest(BaseModel):
code: str


class TOTPVerifyResponse(BaseModel):
backup_codes: list[str]


class PasswordVerifyRequest(BaseModel):
password: str


class TOTPDisableResponse(BaseModel):
message: str


class TOTPViewSet(EMRBaseViewSet):
permission_classes = [IsAuthenticated]

@extend_schema(
description="Initialize TOTP setup for user",
responses={
200: TOTPSetupResponse,
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(detail=False, methods=["POST"])
def setup(self, request):
password = PasswordVerifyRequest(**request.data).password
user = request.user

verify_password(user, password)

mfa_settings = user.mfa_settings or {}

self._validate_totp_state(mfa_settings, required_state=False)

secret = random_base32()

totp = TOTP(secret)
uri = totp.provisioning_uri(name=user.email, issuer_name="CARE")

user.totp_secret = secret
user.save(update_fields=["totp_secret"])

response_data = TOTPSetupResponse(uri=uri, secret_key=secret)
return Response(response_data.model_dump())

@staticmethod
def _generate_backup_codes(count: int = 10) -> list[str]:
"""Generate 8-digit backup codes."""
codes = []
for _ in range(count):
code = "".join(choice(digits) for _ in range(8))
codes.append(code)
return codes

@extend_schema(
description="Verify TOTP code and enable 2FA",
request=TOTPVerifyRequest,
responses={
200: TOTPVerifyResponse,
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(detail=False, methods=["POST"])
def verify(self, request):
request_data = TOTPVerifyRequest(**request.data)
user = request.user

if not user.totp_secret:
raise ValidationError("TOTP not configured for your account")

mfa_settings = user.mfa_settings or {}

self._validate_totp_state(mfa_settings, required_state=False)

secret = user.totp_secret
totp = TOTP(secret)

if totp.verify(request_data.code, valid_window=1):
backup_codes = self._generate_backup_codes()

mfa_settings["totp"] = {
"enabled": True,
"enabled_at": timezone.now().isoformat(),
"backup_codes": [
{
"code": make_password(code),
"used": False,
"created_at": timezone.now().isoformat(),
}
for code in backup_codes
],
}
user.mfa_settings = mfa_settings
user.save(update_fields=["mfa_settings"])

send_totp_enabled_email.delay(user.email, user.username)

response_data = TOTPVerifyResponse(
backup_codes=backup_codes,
)
return Response(response_data.model_dump())

return Response({"error": "Invalid code"}, status=status.HTTP_400_BAD_REQUEST)

@extend_schema(
description="Disable TOTP-based two-factor authentication",
request=PasswordVerifyRequest,
responses={
200: TOTPDisableResponse,
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(detail=False, methods=["POST"])
def disable(self, request):
password = PasswordVerifyRequest(**request.data).password

verify_password(request.user, password)

user = request.user
mfa_settings = user.mfa_settings or {}

self._validate_totp_state(mfa_settings, required_state=True)

mfa_settings["totp"] = {
"enabled": False,
"totp_disabled_at": timezone.now().isoformat(),
"backup_codes": [],
}
user.mfa_settings = mfa_settings
user.totp_secret = None
user.save(update_fields=["mfa_settings", "totp_secret"])

send_totp_disabled_email.delay(user.email, user.username)

response_data = TOTPDisableResponse(
message="Two-factor authentication has been disabled successfully"
)
return Response(response_data.model_dump())

@extend_schema(
description="Regenerate TOTP backup codes",
request=PasswordVerifyRequest,
responses={
200: {
"type": "object",
"properties": {
"backup_codes": {"type": "array", "items": {"type": "string"}}
},
},
400: {"type": "object", "properties": {"error": {"type": "string"}}},
},
)
@action(detail=False, methods=["POST"])
def regenerate_backup_codes(self, request):
password = PasswordVerifyRequest(**request.data).password
user = request.user

verify_password(user, password)

mfa_settings = user.mfa_settings or {}

self._validate_totp_state(mfa_settings, required_state=True)

backup_codes = self._generate_backup_codes()
mfa_settings["totp"]["backup_codes"] = [
{
"code": make_password(code),
"used": False,
"created_at": timezone.now().isoformat(),
}
for code in backup_codes
]
user.mfa_settings = mfa_settings
user.save(update_fields=["mfa_settings"])

return Response({"backup_codes": backup_codes})

@staticmethod
def _validate_totp_state(mfa_settings: dict, required_state: bool):
is_enabled = mfa_settings.get("totp", {}).get("enabled", False)

if required_state and not is_enabled:
raise ValidationError(
"Two-factor authentication is not enabled for your account"
)

if not required_state and is_enabled:
raise ValidationError(
"Two-factor authentication is already enabled for your account"
)
55 changes: 55 additions & 0 deletions care/emr/tasks/totp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from celery import shared_task
from django.conf import settings
from django.core.mail import EmailMessage
from django.template.loader import render_to_string
from django.utils import timezone


@shared_task(
autoretry_for=(Exception,),
retry_kwargs={"max_retries": 3},
expires=10 * 60,
)
def send_totp_enabled_email(user_email: str, user_name: str):
"""Send email notification when TOTP is enabled"""
context = {
"username": user_name,
"email": user_email,
"enabled_at": timezone.now().strftime("%Y-%m-%d %H:%M:%S"),
}

email_html_message = render_to_string("email/totp_enabled.html", context)

msg = EmailMessage(
"Two-Factor Authentication Enabled",
email_html_message,
settings.DEFAULT_FROM_EMAIL,
(user_email,),
)
msg.content_subtype = "html"
msg.send()


@shared_task(
autoretry_for=(Exception,),
retry_kwargs={"max_retries": 3},
expires=10 * 60,
)
def send_totp_disabled_email(user_email: str, user_name: str):
"""Send email notification when TOTP is disabled"""
context = {
"username": user_name,
"email": user_email,
"disabled_at": timezone.now().strftime("%Y-%m-%d %H:%M:%S"),
}

email_html_message = render_to_string("email/totp_disabled.html", context)

msg = EmailMessage(
"Two-Factor Authentication Disabled",
email_html_message,
settings.DEFAULT_FROM_EMAIL,
(user_email,),
)
msg.content_subtype = "html"
msg.send()
Loading