diff --git a/care/facility/api/serializers/facility_flag.py b/care/facility/api/serializers/facility_flag.py new file mode 100644 index 0000000000..ad8647e9b9 --- /dev/null +++ b/care/facility/api/serializers/facility_flag.py @@ -0,0 +1,13 @@ +from rest_framework import serializers + +from care.facility.models import Facility, FacilityFlag +from care.utils.serializers.fields import ExternalIdSerializerField + + +class FacilityFlagSerializer(serializers.ModelSerializer): + id = serializers.UUIDField(source="external_id", read_only=True) + facility = ExternalIdSerializerField(queryset=Facility.objects.all(), required=True) + + class Meta: + model = FacilityFlag + exclude = ["external_id", "deleted", "modified_date", "created_date"] diff --git a/care/facility/api/viewsets/facility_flag.py b/care/facility/api/viewsets/facility_flag.py new file mode 100644 index 0000000000..e590b5e36f --- /dev/null +++ b/care/facility/api/viewsets/facility_flag.py @@ -0,0 +1,44 @@ +from django_filters import rest_framework as filters +from rest_framework import status, viewsets +from rest_framework.decorators import action +from rest_framework.response import Response + +from care.facility.api.serializers.facility_flag import FacilityFlagSerializer +from care.facility.models import FacilityFlag +from care.utils.custom_permissions import IsSuperUser +from care.utils.registries.feature_flag import FlagRegistry, FlagType + + +class FacilityFlagFilter(filters.FilterSet): + flag = filters.CharFilter(field_name="flag", lookup_expr="exact") + facility = filters.UUIDFilter(field_name="facility__external_id") + + +class FacilityFlagViewSet(viewsets.ModelViewSet): + """ + CRUD operations for FacilityFlag model. + + This viewset is restricted to superusers only and provides endpoints to manage facility flags. + """ + + queryset = FacilityFlag.objects.all() + serializer_class = FacilityFlagSerializer + permission_classes = [IsSuperUser] + lookup_field = "external_id" + + filter_backends = [filters.DjangoFilterBackend] + filterset_class = FacilityFlagFilter + + @action(detail=False, methods=["get"], url_path="available-flags") + def list_available_flags(self, request): + """ + List all available flags for FacilityFlag. + """ + try: + flags = FlagRegistry.get_all_flags(FlagType.FACILITY) + return Response({"available_flags": list(flags)}) + except Exception as e: + return Response( + {"error": "Failed to fetch available flags", "detail": str(e)}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) diff --git a/care/facility/tests/test_facility_flags_api.py b/care/facility/tests/test_facility_flags_api.py new file mode 100644 index 0000000000..a0973468d3 --- /dev/null +++ b/care/facility/tests/test_facility_flags_api.py @@ -0,0 +1,165 @@ +from rest_framework import status +from rest_framework.test import APITestCase + +from care.facility.models import FacilityFlag +from care.utils.registries.feature_flag import FlagRegistry, FlagType +from care.utils.tests.test_utils import TestUtils + + +class FacilityFlagsViewSetTestCase(TestUtils, APITestCase): + @classmethod + def setUpTestData(cls): + FlagRegistry.register(FlagType.FACILITY, "TEST_FLAG") + FlagRegistry.register(FlagType.FACILITY, "TEST_FLAG_2") + + cls.state = cls.create_state() + cls.district = cls.create_district(cls.state) + cls.local_body = cls.create_local_body(cls.district) + cls.super_user = cls.create_super_user("su", cls.district) + cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body) + cls.facility2 = cls.create_facility( + cls.super_user, cls.district, cls.local_body + ) + cls.user = cls.create_user("staff", cls.district, home_facility=cls.facility) + cls.facility_flag_2 = cls.create_facility_flag("TEST_FLAG_2", cls.facility) + + def setUp(self): + self.facility_flag_1 = self.create_facility_flag("TEST_FLAG", self.facility) + + def get_url(self, facility_flag_id=None, action=None): + base_url = "/api/v1/facility_flags/" + if facility_flag_id is not None: + base_url += f"{facility_flag_id}/" + if action is not None: + base_url += f"{action}/" + return base_url + + def test_access_with_non_super_user(self): + self.client.force_authenticate(user=self.user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_access_with_super_user(self): + self.client.force_authenticate(user=self.super_user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_list_facility_flags(self): + self.client.force_authenticate(user=self.super_user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.json() + self.assertEqual(data["count"], 2) + + def test_list_available_flags(self): + self.client.force_authenticate(user=self.super_user) + response = self.client.get(self.get_url(action="available-flags")) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.json() + self.assertEqual(sorted(data["available_flags"]), ["TEST_FLAG", "TEST_FLAG_2"]) + + def test_create_facility_flag(self): + self.client.force_authenticate(user=self.super_user) + + response = self.client.post(self.get_url(), {}) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("This field is required.", response.json()["flag"]) + self.assertIn("This field is required.", response.json()["facility"]) + + # Attempting to create a duplicate flag + response = self.client.post( + self.get_url(), {"flag": "TEST_FLAG", "facility": self.facility.external_id} + ) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn( + "The fields facility, flag must make a unique set.", + response.json()["non_field_errors"], + ) + + # Creating a new facility flag + response = self.client.post( + self.get_url(), + {"flag": "TEST_FLAG", "facility": self.facility2.external_id}, + ) + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + def test_retrieve_facility_flag(self): + self.client.force_authenticate(user=self.super_user) + response = self.client.get(self.get_url(self.facility_flag_1.external_id)) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.json() + self.assertEqual(data["flag"], "TEST_FLAG") + self.assertEqual(data["facility"], str(self.facility.external_id)) + + def test_update_facility_flag(self): + self.client.force_authenticate(user=self.super_user) + + # Confirming original values + response = self.client.get(self.get_url(self.facility_flag_1.external_id)) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.json() + self.assertEqual(data["flag"], "TEST_FLAG") + self.assertEqual(data["facility"], str(self.facility.external_id)) + + # Update the facility flag + response = self.client.put( + self.get_url(self.facility_flag_1.external_id), + {"flag": "TEST_FLAG", "facility": self.facility2.external_id}, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.facility_flag_1.refresh_from_db() + self.assertEqual(self.facility_flag_1.flag, "TEST_FLAG") + self.assertEqual( + self.facility_flag_1.facility.external_id, self.facility2.external_id + ) + + def test_patch_facility_flag(self): + self.client.force_authenticate(user=self.super_user) + + # Confirming original values + response = self.client.get(self.get_url(self.facility_flag_1.external_id)) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.json() + self.assertEqual(data["flag"], "TEST_FLAG") + self.assertEqual(data["facility"], str(self.facility.external_id)) + + # Patch the facility flag + response = self.client.patch( + self.get_url(self.facility_flag_1.external_id), + {"facility": self.facility2.external_id}, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.facility_flag_1.refresh_from_db() + self.assertEqual(self.facility_flag_1.flag, "TEST_FLAG") + self.assertEqual( + self.facility_flag_1.facility.external_id, self.facility2.external_id + ) + + def test_delete_facility_flag(self): + self.client.force_authenticate(user=self.super_user) + + # Confirm if the object exist + self.assertTrue( + FacilityFlag.objects.filter( + external_id=self.facility_flag_1.external_id + ).exists() + ) + response = self.client.delete(self.get_url(self.facility_flag_1.external_id)) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + self.facility_flag_1.refresh_from_db() + self.assertFalse( + FacilityFlag.objects.filter( + external_id=self.facility_flag_1.external_id + ).exists() + ) + + def test_creating_facility_flag_with_non_existing_flag(self): + self.client.force_authenticate(user=self.super_user) + + response = self.client.post( + self.get_url(), + {"flag": "TEST_FLAG_NON_EXISTING", "facility": self.facility2.external_id}, + ) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.json()["detail"], "Flag not registered") diff --git a/care/users/api/serializers/user_flag.py b/care/users/api/serializers/user_flag.py new file mode 100644 index 0000000000..6510c5c6a0 --- /dev/null +++ b/care/users/api/serializers/user_flag.py @@ -0,0 +1,13 @@ +from rest_framework import serializers + +from care.users.models import User, UserFlag +from care.utils.serializers.fields import ExternalIdSerializerField + + +class UserFlagSerializer(serializers.ModelSerializer): + id = serializers.UUIDField(source="external_id", read_only=True) + user = ExternalIdSerializerField(queryset=User.objects.all(), required=True) + + class Meta: + model = UserFlag + exclude = ["external_id", "deleted", "modified_date", "created_date"] diff --git a/care/users/api/viewsets/user_flag.py b/care/users/api/viewsets/user_flag.py new file mode 100644 index 0000000000..8c6d431316 --- /dev/null +++ b/care/users/api/viewsets/user_flag.py @@ -0,0 +1,44 @@ +from django_filters import rest_framework as filters +from rest_framework import status, viewsets +from rest_framework.decorators import action +from rest_framework.response import Response + +from care.users.api.serializers.user_flag import UserFlagSerializer +from care.users.models import UserFlag +from care.utils.custom_permissions import IsSuperUser +from care.utils.registries.feature_flag import FlagRegistry, FlagType + + +class UserFlagFilter(filters.FilterSet): + flag = filters.CharFilter(field_name="flag", lookup_expr="exact") + user = filters.UUIDFilter(field_name="user__external_id") + + +class UserFlagViewSet(viewsets.ModelViewSet): + """ + CRUD operations for UserFlag model. + + This viewset is restricted to superusers only and provides endpoints to manage user flags. + """ + + queryset = UserFlag.objects.all() + serializer_class = UserFlagSerializer + permission_classes = [IsSuperUser] + lookup_field = "external_id" + + filter_backends = [filters.DjangoFilterBackend] + filterset_class = UserFlagFilter + + @action(detail=False, methods=["get"], url_path="available-flags") + def list_available_flags(self, request): + """ + List all available flags for FacilityFlag. + """ + try: + flags = FlagRegistry.get_all_flags(FlagType.USER) + return Response({"available_flags": list(flags)}) + except Exception as e: + return Response( + {"error": "Failed to fetch available flags", "detail": str(e)}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) diff --git a/care/users/tests/test_user_flags_api.py b/care/users/tests/test_user_flags_api.py new file mode 100644 index 0000000000..f965f0af83 --- /dev/null +++ b/care/users/tests/test_user_flags_api.py @@ -0,0 +1,155 @@ +from rest_framework import status +from rest_framework.test import APITestCase + +from care.users.models import UserFlag +from care.utils.registries.feature_flag import FlagRegistry, FlagType +from care.utils.tests.test_utils import TestUtils + + +class UserFlagsViewSetTestCase(TestUtils, APITestCase): + @classmethod + def setUpTestData(cls): + FlagRegistry.register(FlagType.USER, "TEST_FLAG") + FlagRegistry.register(FlagType.USER, "TEST_FLAG_2") + + cls.state = cls.create_state() + cls.district = cls.create_district(cls.state) + cls.local_body = cls.create_local_body(cls.district) + cls.super_user = cls.create_super_user("su", cls.district) + cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body) + cls.user = cls.create_user("staff", cls.district, home_facility=cls.facility) + cls.user_2 = cls.create_user("user2", cls.district, home_facility=cls.facility) + cls.user_flag_2 = cls.create_user_flag("TEST_FLAG_2", cls.user_2) + + def setUp(self): + self.user_flag_1 = self.create_user_flag("TEST_FLAG", self.user) + + def get_url(self, user_flag_id=None, action=None): + base_url = "/api/v1/user_flags/" + if user_flag_id is not None: + base_url += f"{user_flag_id}/" + if action is not None: + base_url += f"{action}/" + return base_url + + def test_access_with_non_super_user(self): + self.client.force_authenticate(user=self.user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_access_with_super_user(self): + self.client.force_authenticate(user=self.super_user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_list_user_flags(self): + self.client.force_authenticate(user=self.super_user) + response = self.client.get(self.get_url()) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.json() + self.assertEqual(data["count"], 2) + + def test_list_available_flags(self): + self.client.force_authenticate(user=self.super_user) + response = self.client.get(self.get_url(action="available-flags")) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.json() + self.assertEqual(sorted(data["available_flags"]), ["TEST_FLAG", "TEST_FLAG_2"]) + + def test_create_user_flag(self): + self.client.force_authenticate(user=self.super_user) + + response = self.client.post(self.get_url(), {}) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("This field is required.", response.json()["flag"]) + self.assertIn("This field is required.", response.json()["user"]) + + # Attempting to create a duplicate flag + response = self.client.post( + self.get_url(), {"flag": "TEST_FLAG", "user": self.user.external_id} + ) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn( + "The fields user, flag must make a unique set.", + response.json()["non_field_errors"], + ) + + # Creating a new user flag + response = self.client.post( + self.get_url(), {"flag": "TEST_FLAG", "user": self.user_2.external_id} + ) + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + def test_retrieve_user_flag(self): + self.client.force_authenticate(user=self.super_user) + response = self.client.get(self.get_url(self.user_flag_1.external_id)) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.json() + self.assertEqual(data["flag"], "TEST_FLAG") + self.assertEqual(data["user"], str(self.user.external_id)) + + def test_update_user_flag(self): + self.client.force_authenticate(user=self.super_user) + + # Confirm original values + response = self.client.get(self.get_url(self.user_flag_1.external_id)) + data = response.json() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(data["flag"], "TEST_FLAG") + self.assertEqual(data["user"], str(self.user.external_id)) + + # Update the user flag + response = self.client.put( + self.get_url(self.user_flag_1.external_id), + {"flag": "TEST_FLAG", "user": self.user_2.external_id}, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.user_flag_1.refresh_from_db() + self.assertEqual(self.user_flag_1.flag, "TEST_FLAG") + self.assertEqual(self.user_flag_1.user.external_id, self.user_2.external_id) + + def test_patch_user_flag(self): + self.client.force_authenticate(user=self.super_user) + + # Confirm original values + response = self.client.get(self.get_url(self.user_flag_1.external_id)) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.json() + self.assertEqual(data["flag"], "TEST_FLAG") + self.assertEqual(data["user"], str(self.user.external_id)) + + # Patch the user flag + response = self.client.patch( + self.get_url(self.user_flag_1.external_id), + {"user": self.user_2.external_id}, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.user_flag_1.refresh_from_db() + self.assertEqual(self.user_flag_1.flag, "TEST_FLAG") + self.assertEqual(self.user_flag_1.user.external_id, self.user_2.external_id) + + def test_delete_user_flag(self): + self.client.force_authenticate(user=self.super_user) + + # Confirm if the object exist + self.assertTrue( + UserFlag.objects.filter(external_id=self.user_flag_1.external_id).exists() + ) + + response = self.client.delete(self.get_url(self.user_flag_1.external_id)) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + self.user_flag_1.refresh_from_db() + self.assertFalse( + UserFlag.objects.filter(external_id=self.user_flag_1.external_id).exists() + ) + + def test_creating_user_flag_with_non_existing_flag(self): + self.client.force_authenticate(user=self.super_user) + + response = self.client.post( + self.get_url(), + {"flag": "TEST_FLAG_NON_EXISTING", "user": self.user.external_id}, + ) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.json()["detail"], "Flag not registered") diff --git a/care/utils/custom_permissions.py b/care/utils/custom_permissions.py new file mode 100644 index 0000000000..42841e4864 --- /dev/null +++ b/care/utils/custom_permissions.py @@ -0,0 +1,11 @@ +from rest_framework.permissions import BasePermission + + +class IsSuperUser(BasePermission): + def has_permission(self, request, view): + return bool( + request.user + and request.user.is_authenticated + and request.user.is_active + and request.user.is_superuser + ) diff --git a/care/utils/tests/test_utils.py b/care/utils/tests/test_utils.py index 99faed645b..ed1804fdb6 100644 --- a/care/utils/tests/test_utils.py +++ b/care/utils/tests/test_utils.py @@ -22,6 +22,7 @@ DiseaseStatusEnum, EncounterSymptom, Facility, + FacilityFlag, InvestigationSession, InvestigationValue, LocalBody, @@ -51,7 +52,7 @@ PatientCodeStatusType, PatientConsent, ) -from care.users.models import District, State +from care.users.models import District, State, UserFlag fake = Faker() @@ -728,6 +729,26 @@ def create_assetbed(cls, bed: Bed, asset: Asset, **kwargs) -> AssetBed: data.update(kwargs) return AssetBed.objects.create(**data) + @classmethod + def create_facility_flag( + cls, flag: str, facility: Facility, **kwargs + ) -> FacilityFlag: + data = { + "facility": facility, + "flag": flag, + } + data.update(**kwargs) + return FacilityFlag.objects.create(**data) + + @classmethod + def create_user_flag(cls, flag: str, user: User, **kwargs) -> UserFlag: + data = { + "user": user, + "flag": flag, + } + data.update(**kwargs) + return UserFlag.objects.create(**data) + def get_list_representation(self, obj) -> dict: """ Returns the dict representation of the obj in list API diff --git a/config/api_router.py b/config/api_router.py index 94b18f61de..0a821fd1f6 100644 --- a/config/api_router.py +++ b/config/api_router.py @@ -40,6 +40,7 @@ FacilityViewSet, ) from care.facility.api.viewsets.facility_capacity import FacilityCapacityViewSet +from care.facility.api.viewsets.facility_flag import FacilityFlagViewSet from care.facility.api.viewsets.facility_users import FacilityUserViewSet from care.facility.api.viewsets.file_upload import FileUploadViewSet from care.facility.api.viewsets.hospital_doctor import HospitalDoctorViewSet @@ -101,6 +102,7 @@ ) from care.users.api.viewsets.plug_config import PlugConfigViewset from care.users.api.viewsets.skill import SkillViewSet +from care.users.api.viewsets.user_flag import UserFlagViewSet from care.users.api.viewsets.users import UserViewSet from care.users.api.viewsets.userskill import UserSkillViewSet @@ -315,6 +317,9 @@ router.register("medibase", MedibaseViewSet, basename="medibase") +router.register(r"facility_flags", FacilityFlagViewSet, basename="facility-flags") +router.register(r"user_flags", UserFlagViewSet, basename="user-flags") + # Public endpoints router.register("public/asset", AssetPublicViewSet, basename="public-asset") router.register("public/asset_qr", AssetPublicQRViewSet, basename="public-asset-qr")