Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creating crud endpoint for user and facility flag #2585

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions care/facility/api/serializers/facility_flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from rest_framework import serializers

from care.facility.models import Facility, FacilityFlag
from care.utils.serializers.fields import ExternalIdSerializerField


class FacilityFlagSerializer(serializers.ModelSerializer):
id = serializers.UUIDField(source="external_id", read_only=True)
facility = ExternalIdSerializerField(queryset=Facility.objects.all(), required=True)

vigneshhari marked this conversation as resolved.
Show resolved Hide resolved
class Meta:
model = FacilityFlag
exclude = ["external_id", "deleted", "modified_date", "created_date"]
27 changes: 27 additions & 0 deletions care/facility/api/viewsets/facility_flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from django_filters import rest_framework as filters
from rest_framework import viewsets

from care.facility.api.serializers.facility_flag import FacilityFlagSerializer
from care.facility.models import FacilityFlag
from care.utils.custom_permissions import IsSuperUser


class FacilityFlagFilter(filters.FilterSet):
flag = filters.CharFilter(field_name="flag", lookup_expr="icontains")
facility = filters.UUIDFilter(field_name="facility__external_id")


class FacilityFlagViewSet(viewsets.ModelViewSet):
DraKen0009 marked this conversation as resolved.
Show resolved Hide resolved
"""
CRUD operations for FacilityFlag model.

This viewset is restricted to superusers only and provides endpoints to manage facility flags.
"""

queryset = FacilityFlag.objects.all()
serializer_class = FacilityFlagSerializer
permission_classes = [IsSuperUser]
lookup_field = "external_id"

filter_backends = [filters.DjangoFilterBackend]
filterset_class = FacilityFlagFilter
vigneshhari marked this conversation as resolved.
Show resolved Hide resolved
156 changes: 156 additions & 0 deletions care/facility/tests/test_facility_flags_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from rest_framework import status
from rest_framework.test import APITestCase

from care.facility.models import FacilityFlag
from care.utils.registries.feature_flag import FlagRegistry, FlagType
from care.utils.tests.test_utils import TestUtils


class FacilityFlagsViewSetTestCase(TestUtils, APITestCase):
@classmethod
def setUpTestData(cls):
FlagRegistry.register(FlagType.FACILITY, "TEST_FLAG")
FlagRegistry.register(FlagType.FACILITY, "TEST_FLAG_2")

cls.state = cls.create_state()
cls.district = cls.create_district(cls.state)
cls.local_body = cls.create_local_body(cls.district)
cls.super_user = cls.create_super_user("su", cls.district)
cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body)
cls.facility2 = cls.create_facility(
cls.super_user, cls.district, cls.local_body
)
cls.user = cls.create_user("staff", cls.district, home_facility=cls.facility)
cls.facility_flag_2 = cls.create_facility_flag("TEST_FLAG_2", cls.facility)

def setUp(self):
self.facility_flag_1 = self.create_facility_flag("TEST_FLAG", self.facility)

def get_url(self, facility_flag_id=None):
base_url = "/api/v1/facility_flags/"
if facility_flag_id is not None:
base_url += f"{facility_flag_id}/"
return base_url

def test_access_with_non_super_user(self):
self.client.force_authenticate(user=self.user)
response = self.client.get(self.get_url())
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

def test_access_with_super_user(self):
self.client.force_authenticate(user=self.super_user)
response = self.client.get(self.get_url())
self.assertEqual(response.status_code, status.HTTP_200_OK)

def test_list_facility_flags(self):
self.client.force_authenticate(user=self.super_user)
response = self.client.get(self.get_url())
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data["count"], 2)

def test_create_facility_flag(self):
self.client.force_authenticate(user=self.super_user)

response = self.client.post(self.get_url(), {})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("This field is required.", response.json()["flag"])
self.assertIn("This field is required.", response.json()["facility"])

# Attempting to create a duplicate flag
response = self.client.post(
self.get_url(), {"flag": "TEST_FLAG", "facility": self.facility.external_id}
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn(
"The fields facility, flag must make a unique set.",
response.json()["non_field_errors"],
)

# Creating a new facility flag
response = self.client.post(
self.get_url(),
{"flag": "TEST_FLAG", "facility": self.facility2.external_id},
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)

def test_retrieve_facility_flag(self):
self.client.force_authenticate(user=self.super_user)
response = self.client.get(self.get_url(self.facility_flag_1.external_id))
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data["flag"], "TEST_FLAG")
self.assertEqual(data["facility"], str(self.facility.external_id))

def test_update_facility_flag(self):
self.client.force_authenticate(user=self.super_user)

# Confirming original values
response = self.client.get(self.get_url(self.facility_flag_1.external_id))
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data["flag"], "TEST_FLAG")
self.assertEqual(data["facility"], str(self.facility.external_id))

# Update the facility flag
response = self.client.put(
self.get_url(self.facility_flag_1.external_id),
{"flag": "TEST_FLAG", "facility": self.facility2.external_id},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.facility_flag_1.refresh_from_db()
self.assertEqual(self.facility_flag_1.flag, "TEST_FLAG")
self.assertEqual(
self.facility_flag_1.facility.external_id, self.facility2.external_id
)

def test_patch_facility_flag(self):
self.client.force_authenticate(user=self.super_user)

# Confirming original values
response = self.client.get(self.get_url(self.facility_flag_1.external_id))
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data["flag"], "TEST_FLAG")
self.assertEqual(data["facility"], str(self.facility.external_id))

# Patch the facility flag
response = self.client.patch(
self.get_url(self.facility_flag_1.external_id),
{"facility": self.facility2.external_id},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.facility_flag_1.refresh_from_db()
self.assertEqual(self.facility_flag_1.flag, "TEST_FLAG")
self.assertEqual(
self.facility_flag_1.facility.external_id, self.facility2.external_id
)

def test_delete_facility_flag(self):
self.client.force_authenticate(user=self.super_user)

# Confirm if the object exist
self.assertTrue(
FacilityFlag.objects.filter(
external_id=self.facility_flag_1.external_id
).exists()
)
response = self.client.delete(self.get_url(self.facility_flag_1.external_id))
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

self.facility_flag_1.refresh_from_db()
self.assertFalse(
FacilityFlag.objects.filter(
external_id=self.facility_flag_1.external_id
).exists()
)

def test_creating_facility_flag_with_non_existing_flag(self):
self.client.force_authenticate(user=self.super_user)

response = self.client.post(
self.get_url(),
{"flag": "TEST_FLAG_NON_EXISTING", "facility": self.facility2.external_id},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.json()["detail"], "Flag not registered")
13 changes: 13 additions & 0 deletions care/users/api/serializers/user_flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from rest_framework import serializers

from care.users.models import User, UserFlag
from care.utils.serializers.fields import ExternalIdSerializerField


class UserFlagSerializer(serializers.ModelSerializer):
id = serializers.UUIDField(source="external_id", read_only=True)
user = ExternalIdSerializerField(queryset=User.objects.all(), required=True)
sainak marked this conversation as resolved.
Show resolved Hide resolved

class Meta:
model = UserFlag
exclude = ["external_id", "deleted", "modified_date", "created_date"]
27 changes: 27 additions & 0 deletions care/users/api/viewsets/user_flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from django_filters import rest_framework as filters
from rest_framework import viewsets

from care.users.api.serializers.user_flag import UserFlagSerializer
from care.users.models import UserFlag
from care.utils.custom_permissions import IsSuperUser


class UserFlagFilter(filters.FilterSet):
flag = filters.CharFilter(field_name="flag", lookup_expr="icontains")
DraKen0009 marked this conversation as resolved.
Show resolved Hide resolved
user = filters.UUIDFilter(field_name="user__external_id")


class UserFlagViewSet(viewsets.ModelViewSet):
"""
CRUD operations for UserFlag model.

This viewset is restricted to superusers only and provides endpoints to manage user flags.
"""

queryset = UserFlag.objects.all()
serializer_class = UserFlagSerializer
permission_classes = [IsSuperUser]
lookup_field = "external_id"

filter_backends = [filters.DjangoFilterBackend]
filterset_class = UserFlagFilter
146 changes: 146 additions & 0 deletions care/users/tests/test_user_flags_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from rest_framework import status
from rest_framework.test import APITestCase

from care.users.models import UserFlag
from care.utils.registries.feature_flag import FlagRegistry, FlagType
from care.utils.tests.test_utils import TestUtils


class UserFlagsViewSetTestCase(TestUtils, APITestCase):
@classmethod
def setUpTestData(cls):
FlagRegistry.register(FlagType.USER, "TEST_FLAG")
FlagRegistry.register(FlagType.USER, "TEST_FLAG_2")

cls.state = cls.create_state()
cls.district = cls.create_district(cls.state)
cls.local_body = cls.create_local_body(cls.district)
cls.super_user = cls.create_super_user("su", cls.district)
cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body)
cls.user = cls.create_user("staff", cls.district, home_facility=cls.facility)
cls.user_2 = cls.create_user("user2", cls.district, home_facility=cls.facility)
cls.user_flag_2 = cls.create_user_flag("TEST_FLAG_2", cls.user_2)

def setUp(self):
self.user_flag_1 = self.create_user_flag("TEST_FLAG", self.user)

def get_url(self, user_flag_id=None):
base_url = "/api/v1/user_flags/"
if user_flag_id is not None:
base_url += f"{user_flag_id}/"
return base_url

def test_access_with_non_super_user(self):
self.client.force_authenticate(user=self.user)
response = self.client.get(self.get_url())
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

def test_access_with_super_user(self):
self.client.force_authenticate(user=self.super_user)
response = self.client.get(self.get_url())
self.assertEqual(response.status_code, status.HTTP_200_OK)

Comment on lines +35 to +44
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

How thoughtful of you to test the bare minimum access controls...

While you've covered the basic super user and non-super user cases (slow clap), you might want to consider adding tests for:

  • Unauthenticated access
  • Validation of error messages in the response body
  • Token-based authentication scenarios

But I guess that would be asking for too much... 🤷‍♂️

def test_list_user_flags(self):
self.client.force_authenticate(user=self.super_user)
response = self.client.get(self.get_url())
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data["count"], 2)

def test_create_user_flag(self):
self.client.force_authenticate(user=self.super_user)

response = self.client.post(self.get_url(), {})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("This field is required.", response.json()["flag"])
self.assertIn("This field is required.", response.json()["user"])

# Attempting to create a duplicate flag
response = self.client.post(
self.get_url(), {"flag": "TEST_FLAG", "user": self.user.external_id}
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn(
"The fields user, flag must make a unique set.",
response.json()["non_field_errors"],
)

# Creating a new user flag
response = self.client.post(
self.get_url(), {"flag": "TEST_FLAG", "user": self.user_2.external_id}
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)

def test_retrieve_user_flag(self):
self.client.force_authenticate(user=self.super_user)
response = self.client.get(self.get_url(self.user_flag_1.external_id))
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data["flag"], "TEST_FLAG")
self.assertEqual(data["user"], str(self.user.external_id))

def test_update_user_flag(self):
self.client.force_authenticate(user=self.super_user)

# Confirm original values
response = self.client.get(self.get_url(self.user_flag_1.external_id))
data = response.json()
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(data["flag"], "TEST_FLAG")
self.assertEqual(data["user"], str(self.user.external_id))

# Update the user flag
response = self.client.put(
self.get_url(self.user_flag_1.external_id),
{"flag": "TEST_FLAG", "user": self.user_2.external_id},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.user_flag_1.refresh_from_db()
self.assertEqual(self.user_flag_1.flag, "TEST_FLAG")
self.assertEqual(self.user_flag_1.user.external_id, self.user_2.external_id)

def test_patch_user_flag(self):
self.client.force_authenticate(user=self.super_user)

# Confirm original values
response = self.client.get(self.get_url(self.user_flag_1.external_id))
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data["flag"], "TEST_FLAG")
self.assertEqual(data["user"], str(self.user.external_id))

# Patch the user flag
response = self.client.patch(
self.get_url(self.user_flag_1.external_id),
{"user": self.user_2.external_id},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.user_flag_1.refresh_from_db()
self.assertEqual(self.user_flag_1.flag, "TEST_FLAG")
self.assertEqual(self.user_flag_1.user.external_id, self.user_2.external_id)

def test_delete_user_flag(self):
self.client.force_authenticate(user=self.super_user)

# Confirm if the object exist
self.assertTrue(
UserFlag.objects.filter(external_id=self.user_flag_1.external_id).exists()
)

response = self.client.delete(self.get_url(self.user_flag_1.external_id))
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

self.user_flag_1.refresh_from_db()
self.assertFalse(
UserFlag.objects.filter(external_id=self.user_flag_1.external_id).exists()
)

def test_creating_user_flag_with_non_existing_flag(self):
self.client.force_authenticate(user=self.super_user)

response = self.client.post(
self.get_url(),
{"flag": "TEST_FLAG_NON_EXISTING", "user": self.user.external_id},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.json()["detail"], "Flag not registered")
6 changes: 6 additions & 0 deletions care/utils/custom_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from rest_framework.permissions import BasePermission


class IsSuperUser(BasePermission):
def has_permission(self, request, view):
return request.user and request.user.is_superuser
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

I see we're being very trusting with our superuser checks.

While the implementation is functionally correct for the happy path, it's worth noting (not that anyone asked) that this doesn't protect against inactive superusers. I mean, who needs that level of security anyway? 🙃

Here's a slightly more thorough version, if you're interested in actual security:

class IsSuperUser(BasePermission):
    def has_permission(self, request, view):
-        return request.user and request.user.is_superuser
+        return bool(
+            request.user and
+            request.user.is_authenticated and
+            request.user.is_active and
+            request.user.is_superuser
+        )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class IsSuperUser(BasePermission):
def has_permission(self, request, view):
return request.user and request.user.is_superuser
class IsSuperUser(BasePermission):
def has_permission(self, request, view):
return bool(
request.user and
request.user.is_authenticated and
request.user.is_active and
request.user.is_superuser
)

Loading