diff --git a/care/facility/models/asset.py b/care/facility/models/asset.py index bf2ab5e317..06637d24cd 100644 --- a/care/facility/models/asset.py +++ b/care/facility/models/asset.py @@ -2,6 +2,7 @@ import uuid from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError from django.db import models from django.db.models import JSONField, Q @@ -54,6 +55,15 @@ class RoomType(enum.Enum): null=True, blank=True, default=None, max_length=200 ) + def delete(self, *args): + if UserDefaultAssetLocation.objects.filter(location=self).exists(): + error = f"Cannot delete AssetLocation {self} because they are referenced as `location` in UserDefaultAssetLocation records." + raise ValidationError(error) + if FacilityDefaultAssetLocation.objects.filter(location=self).exists(): + error = f"Cannot delete AssetLocation {self} because they are referenced as `location` in FacilityDefaultAssetLocation records." + raise ValidationError(error) + return super().delete(*args) + class AssetType(enum.Enum): INTERNAL = 50 diff --git a/care/facility/models/bed.py b/care/facility/models/bed.py index 992f36ac74..cd22debb6c 100644 --- a/care/facility/models/bed.py +++ b/care/facility/models/bed.py @@ -59,6 +59,10 @@ def save(self, *args, **kwargs) -> None: return super().save(*args, **kwargs) def delete(self, *args, **kwargs) -> None: + if ConsultationBed.objects.filter(bed=self).exists(): + error = f"Cannot delete Bed {self} because they are referenced as `bed` in ConsultationBed records." + raise ValidationError(error) + AssetBed.objects.filter(bed=self).update(deleted=True) super().delete(*args, **kwargs) diff --git a/care/facility/models/facility.py b/care/facility/models/facility.py index 01c0102b10..80dc95a140 100644 --- a/care/facility/models/facility.py +++ b/care/facility/models/facility.py @@ -1,6 +1,7 @@ from django.conf import settings from django.contrib.auth import get_user_model from django.contrib.postgres.fields import ArrayField +from django.core.exceptions import ValidationError from django.core.validators import MinValueValidator from django.db import models, transaction from django.db.models import IntegerChoices @@ -302,14 +303,26 @@ def save(self, *args, **kwargs) -> None: @transaction.atomic def delete(self, *args): - from care.facility.models.asset import Asset, AssetLocation + from care.facility.models.asset import ( + Asset, + AssetLocation, + FacilityDefaultAssetLocation, + ) + from care.facility.models.patient_sample import PatientSample + if FacilityDefaultAssetLocation.objects.filter(facility=self).exists(): + error = f"Cannot delete Facility {self} because they are referenced as `facility` in FacilityDefaultAssetLocation records." + raise ValidationError(error) AssetLocation.objects.filter(facility_id=self.id).update(deleted=True) Asset.objects.filter( current_location_id__in=AssetLocation._base_manager.filter( # noqa: SLF001 facility_id=self.id ).values_list("id", flat=True) ).update(deleted=True) + FacilityUser.objects.filter(facility=self).delete() + PatientSample.objects.filter(testing_facility=self).update( + testing_facility=None + ) return super().delete(*args) @property diff --git a/care/facility/models/patient.py b/care/facility/models/patient.py index 4f8d54c971..049d1586e9 100644 --- a/care/facility/models/patient.py +++ b/care/facility/models/patient.py @@ -3,6 +3,7 @@ from dateutil.relativedelta import relativedelta from django.contrib.postgres.aggregates import ArrayAgg +from django.core.exceptions import ValidationError from django.core.validators import MaxValueValidator, MinValueValidator from django.db import models from django.db.models import Case, F, Func, JSONField, Value, When @@ -474,6 +475,14 @@ def save(self, *args, **kwargs) -> None: self._alias_recovery_to_recovered() super().save(*args, **kwargs) + def delete(self, *args): + from care.facility.models.patient_sample import PatientSample + + if PatientSample.objects.filter(patient=self).exists(): + error = f"Cannot delete PatientRegistration {self} because they are referenced as `patient` in PatientSample records." + raise ValidationError(error) + return super().delete(*args) + def get_age(self) -> str: start = self.date_of_birth or date(self.year_of_birth, 1, 1) end = (self.death_datetime or timezone.now()).date() diff --git a/care/facility/models/patient_consultation.py b/care/facility/models/patient_consultation.py index f21d4f6a4d..5d915843d7 100644 --- a/care/facility/models/patient_consultation.py +++ b/care/facility/models/patient_consultation.py @@ -1,4 +1,5 @@ from django.contrib.postgres.fields import ArrayField +from django.core.exceptions import ValidationError from django.core.validators import MinValueValidator from django.db import models from django.db.models import JSONField @@ -264,6 +265,25 @@ def save(self, *args, **kwargs): self.patient.save(update_fields=["death_datetime"]) super().save(*args, **kwargs) + def delete(self, *args): + from care.facility.models.bed import ConsultationBed + from care.facility.models.patient_investigation import InvestigationValue + from care.facility.models.patient_sample import PatientSample + + if InvestigationValue.objects.filter(consultation=self).exists(): + error = f"Cannot delete PatientConsultation {self.external_id} because they are referenced as `consultation` in InvestigationValue records." + raise ValidationError(error) + + if PatientSample.objects.filter(consultation=self).exists(): + error = f"Cannot delete PatientConsultation {self} because they are referenced as `consultation` in PatientSample records." + raise ValidationError(error) + + if ConsultationBed.objects.filter(consultation=self).exists(): + error = f"Cannot delete PatientConsultation {self} because they are referenced as `consultation` in ConsultationBed records." + raise ValidationError(error) + + return super().delete(*args) + class Meta: constraints = [ models.CheckConstraint( diff --git a/care/facility/models/patient_investigation.py b/care/facility/models/patient_investigation.py index 00a9e4c202..749933d970 100644 --- a/care/facility/models/patient_investigation.py +++ b/care/facility/models/patient_investigation.py @@ -1,5 +1,6 @@ from uuid import uuid4 +from django.core.exceptions import ValidationError from django.db import models from care.facility.models.patient_consultation import PatientConsultation @@ -15,6 +16,12 @@ class PatientInvestigationGroup(BaseModel): def __str__(self) -> str: return self.name + def delete(self, *args): + if InvestigationValue.objects.filter(group=self).exists(): + error = f"Cannot delete PatientInvestigationGroup {self.name} because they are referenced as `group` in InvestigationValue records." + raise ValidationError(error) + super().delete(*args) + class PatientInvestigation(BaseModel): name = models.CharField(max_length=500, blank=False, null=False, unique=True) @@ -32,6 +39,13 @@ def __str__(self) -> str: unit_part = f" in {self.unit}" if self.unit else "" return f"{self.name}{unit_part} as {self.investigation_type}" + def delete(self, *args): + if InvestigationValue.objects.filter(investigation=self).exists(): + error = f"Cannot delete PatientInvestigation {self} because they are referenced as `investigation` in InvestigationValue records." + + raise ValidationError(error) + return super().delete(*args) + class InvestigationSession(BaseModel): external_id = models.UUIDField( @@ -41,6 +55,12 @@ class InvestigationSession(BaseModel): User, null=False, blank=False, on_delete=models.PROTECT ) + def delete(self, *args): + if InvestigationValue.objects.filter(session=self).exists(): + error = f"Cannot delete InvestigationSession {self.external_id} because they are referenced as `session` in InvestigationValue records." + raise ValidationError(error) + return super().delete(*args) + class Meta: indexes = [ models.Index( diff --git a/care/facility/tests/test_m2m_soft_delete_for_related_fields.py b/care/facility/tests/test_m2m_soft_delete_for_related_fields.py new file mode 100644 index 0000000000..e86567b218 --- /dev/null +++ b/care/facility/tests/test_m2m_soft_delete_for_related_fields.py @@ -0,0 +1,836 @@ +from django.core.exceptions import ValidationError +from django.test import TestCase + +from care.facility.models import ( + AssetLocation, + Bed, + ConsultationBed, + Facility, + FacilityDefaultAssetLocation, + FacilityUser, + InvestigationSession, + InvestigationValue, + PatientConsultation, + PatientInvestigation, + PatientInvestigationGroup, + PatientRegistration, + PatientSample, + UserDefaultAssetLocation, +) +from care.users.models import User +from care.utils.tests.test_utils import TestUtils + + +class TestFacilityUserDeletion(TestUtils, TestCase): + @classmethod + def setUpTestData(cls): + cls.state = cls.create_state() + cls.district = cls.create_district(cls.state) + cls.local_body = cls.create_local_body(cls.district) + cls.super_user = cls.create_super_user("su", cls.district) + + def setUp(self): + self.facility = self.create_facility( + self.super_user, self.district, self.local_body + ) + + self.user1 = self.create_user( + "user1", + district=self.district, + local_body=self.local_body, + ) + self.user2 = self.create_user( + "user2", + district=self.district, + local_body=self.local_body, + ) + self.facility_user = self.create_facility_user( + self.facility, self.user1, self.user2 + ) + + def test_facility_user_delete_when_related_facility_is_deleted(self): + self.assertTrue(FacilityUser.objects.filter(facility=self.facility).exists()) + + self.facility.delete() + self.assertFalse(FacilityUser.objects.filter(facility=self.facility).exists()) + + def test_facility_user_delete_when_related_user_is_deleted(self): + self.assertTrue(FacilityUser.objects.filter(user=self.user1).exists()) + + self.user1.delete() + + self.assertFalse( + User.objects.filter(external_id=self.user1.external_id).exists() + ) + + def test_facility_user_delete_when_related_created_by_is_deleted_for_existing_facility_user( + self, + ): + # case 1 when facility user exist for create_by user + self.assertTrue(FacilityUser.objects.filter(created_by=self.user2).exists()) + + with self.assertRaises(ValidationError) as context: + self.user2.delete() + + self.assertIn( + f"Cannot delete User {self.user2} because they are referenced as `created_by` in FacilityUser records.", + context.exception, + ) + + self.assertTrue( + User.objects.filter(external_id=self.user2.external_id).exists() + ) + + def test_facility_user_delete_when_related_created_by_is_deleted(self): + # case 2 when facility user doesn't exist for create_by user + + self.assertTrue(FacilityUser.objects.filter(created_by=self.user2).exists()) + FacilityUser.objects.filter(created_by=self.user2).delete() + + self.user2.delete() + + self.assertFalse( + User.objects.filter(external_id=self.user2.external_id).exists() + ) + + +class TestInvestigationValueDeletion(TestUtils, TestCase): + @classmethod + def setUpTestData(cls): + cls.state = cls.create_state() + cls.district = cls.create_district(cls.state) + cls.local_body = cls.create_local_body(cls.district) + cls.super_user = cls.create_super_user("superuser", cls.district) + cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body) + + def setUp(self): + # Create base objects for use in tests + self.patient = self.create_patient( + self.district, self.facility, local_body=self.local_body + ) + self.consultation = self.create_consultation(self.patient, self.facility) + self.investigation_group = self.create_patient_investigation_group() + self.investigation_session = self.create_patient_investigation_session( + self.super_user + ) + self.investigation = self.create_patient_investigation(self.investigation_group) + + def test_delete_patient_investigation_with_investigation_value(self): + # Create an InvestigationValue referencing the PatientInvestigation + InvestigationValue.objects.create( + investigation=self.investigation, + group=self.investigation_group, + consultation=self.consultation, + session=self.investigation_session, + ) + + # Attempt to delete the investigation + with self.assertRaises(ValidationError) as context: + self.investigation.delete() + + self.assertIn( + f"Cannot delete PatientInvestigation {self.investigation!s} because they are referenced as `investigation` in InvestigationValue records.", + context.exception, + ) + + # Ensure the investigation still exists + self.assertTrue( + PatientInvestigation.objects.filter(pk=self.investigation.pk).exists() + ) + + def test_delete_patient_investigation_without_investigation_value(self): + # Ensure no InvestigationValue exists + InvestigationValue.objects.filter(investigation=self.investigation).update( + deleted=True + ) + + # Delete the investigation + self.investigation.delete() + + # Ensure the investigation is deleted + self.assertFalse( + PatientInvestigation.objects.filter(pk=self.investigation.pk).exists() + ) + + def test_delete_investigation_session_with_investigation_value(self): + # Create an InvestigationValue referencing the InvestigationSession + InvestigationValue.objects.create( + investigation=self.investigation, + group=self.investigation_group, + consultation=self.consultation, + session=self.investigation_session, + ) + + # Attempt to delete the session + with self.assertRaises(ValidationError) as context: + self.investigation_session.delete() + + self.assertIn( + f"Cannot delete InvestigationSession {self.investigation_session.external_id} because they are referenced as `session` in InvestigationValue records.", + str(context.exception), + ) + + # Ensure the session still exists + self.assertTrue( + InvestigationSession.objects.filter( + pk=self.investigation_session.pk + ).exists() + ) + + def test_delete_investigation_session_without_investigation_value(self): + # Ensure no InvestigationValue exists + InvestigationValue.objects.filter(session=self.investigation_session).update( + deleted=True + ) + + # Delete the session + self.investigation_session.delete() + + # Ensure the session is deleted + self.assertFalse( + InvestigationSession.objects.filter( + pk=self.investigation_session.pk + ).exists() + ) + + def test_delete_patient_investigation_group_with_investigation_value(self): + # Create an InvestigationValue referencing the PatientInvestigationGroup + InvestigationValue.objects.create( + investigation=self.investigation, + group=self.investigation_group, + consultation=self.consultation, + session=self.investigation_session, + ) + + # Attempt to delete the group + with self.assertRaises(ValidationError) as context: + self.investigation_group.delete() + + self.assertIn( + f"Cannot delete PatientInvestigationGroup {self.investigation_group.name} because they are referenced as `group` in InvestigationValue records.", + str(context.exception), + ) + + # Ensure the group still exists + self.assertTrue( + PatientInvestigationGroup.objects.filter( + pk=self.investigation_group.pk + ).exists() + ) + + def test_delete_patient_investigation_group_without_investigation_value(self): + # Ensure no InvestigationValue exists + InvestigationValue.objects.filter(group=self.investigation_group).update( + deleted=True + ) + + # Delete the group + self.investigation_group.delete() + + # Ensure the group is deleted + self.assertFalse( + PatientInvestigationGroup.objects.filter( + pk=self.investigation_group.pk + ).exists() + ) + + def test_delete_patient_consultation_with_investigation_value(self): + # Create an InvestigationValue referencing the PatientConsultation + InvestigationValue.objects.create( + investigation=self.investigation, + group=self.investigation_group, + consultation=self.consultation, + session=self.investigation_session, + ) + + # Attempt to delete the consultation + with self.assertRaises(ValidationError) as context: + self.consultation.delete() + + self.assertIn( + f"Cannot delete PatientConsultation {self.consultation.external_id} because they are referenced as `consultation` in InvestigationValue records.", + str(context.exception), + ) + + # Ensure the consultation still exists + self.assertTrue( + PatientConsultation.objects.filter(pk=self.consultation.pk).exists() + ) + + def test_delete_patient_consultation_without_investigation_value(self): + # Ensure no InvestigationValue exists + InvestigationValue.objects.filter(consultation=self.consultation).update( + deleted=True + ) + + # Delete the consultation + self.consultation.delete() + + # Ensure the consultation is deleted + self.assertFalse( + PatientConsultation.objects.filter(pk=self.consultation.pk).exists() + ) + + +class TestPatientSampleDeletion(TestUtils, TestCase): + @classmethod + def setUpTestData(cls): + # Create necessary related objects + cls.state = cls.create_state() + cls.district = cls.create_district(cls.state) + cls.local_body = cls.create_local_body(cls.district) + cls.super_user = cls.create_super_user("superuser", cls.district) + cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body) + + def setUp(self): + # Create patient and consultation for the tests + self.patient = self.create_patient(self.district, self.facility) + self.consultation = self.create_consultation(self.patient, self.facility) + + # Create a user to act as the creator + self.user = self.create_user("test_user", district=self.district) + + # Create a sample linked to the patient and consultation + self.sample = self.create_patient_sample( + self.patient, self.consultation, self.facility, self.user + ) + + def test_delete_patient_registration_with_related_sample(self): + # Ensure the sample is linked to the patient + self.assertTrue(PatientSample.objects.filter(patient=self.patient).exists()) + + # Attempt to delete the patient + with self.assertRaises(ValidationError) as context: + self.patient.delete() + + self.assertIn( + f"Cannot delete PatientRegistration {self.patient} because they are referenced as `patient` in PatientSample records.", + str(context.exception), + ) + + # Ensure the patient and sample still exist + self.assertTrue(PatientRegistration.objects.filter(pk=self.patient.pk).exists()) + self.assertTrue(PatientSample.objects.filter(pk=self.sample.pk).exists()) + + def test_delete_patient_registration_without_sample(self): + # Delete the sample first + self.sample.delete() + + # Ensure the sample no longer exists + self.assertFalse(PatientSample.objects.filter(patient=self.patient).exists()) + + # Now delete the patient + self.patient.delete() + + # Ensure the patient is deleted + self.assertFalse( + PatientRegistration.objects.filter(pk=self.patient.pk).exists() + ) + + def test_delete_patient_consultation_with_related_sample(self): + # Ensure the sample is linked to the consultation + self.assertTrue( + PatientSample.objects.filter(consultation=self.consultation).exists() + ) + + # Attempt to delete the consultation + with self.assertRaises(ValidationError) as context: + self.consultation.delete() + + self.assertIn( + f"Cannot delete PatientConsultation {self.consultation} because they are referenced as `consultation` in PatientSample records.", + str(context.exception), + ) + + # Ensure the consultation and sample still exist + self.assertTrue( + PatientConsultation.objects.filter(pk=self.consultation.pk).exists() + ) + self.assertTrue(PatientSample.objects.filter(pk=self.sample.pk).exists()) + + def test_delete_patient_consultation_without_sample(self): + # Delete the sample first + self.sample.delete() + + # Ensure the sample no longer exists + self.assertFalse( + PatientSample.objects.filter(consultation=self.consultation).exists() + ) + + # Now delete the consultation + self.consultation.delete() + + # Ensure the consultation is deleted + self.assertFalse( + PatientConsultation.objects.filter(pk=self.consultation.pk).exists() + ) + + def test_delete_patient_sample(self): + # Ensure the sample exists + self.assertTrue(PatientSample.objects.filter(pk=self.sample.pk).exists()) + + # Delete the sample + self.sample.delete() + + # Ensure the sample is deleted + self.assertFalse(PatientSample.objects.filter(pk=self.sample.pk).exists()) + + def test_delete_created_by_user_sets_null_in_patient_sample(self): + # Ensure the created_by user is linked to the sample + self.assertEqual(self.sample.created_by, self.user) + + # Delete the user + self.user.delete() + + # Reload the sample and ensure `created_by` is now NULL + self.sample.refresh_from_db() + self.assertIsNone(self.sample.created_by) + + def test_delete_last_edited_by_user_sets_null_in_patient_sample(self): + # Ensure the last_edited_by user is linked to the sample + self.assertEqual(self.sample.last_edited_by, self.user) + + # Delete the user + self.user.delete() + + # Reload the sample and ensure `last_edited_by` is now NULL + self.sample.refresh_from_db() + self.assertIsNone(self.sample.last_edited_by) + + def test_delete_created_by_and_last_edited_by_users_sets_null_in_patient_sample( + self, + ): + # Ensure the created_by and last_edited_by users are linked to the sample + self.assertEqual(self.sample.created_by, self.user) + self.assertEqual(self.sample.last_edited_by, self.user) + + # Delete the user + self.user.delete() + + # Reload the sample and ensure both fields are now NULL + self.sample.refresh_from_db() + self.assertIsNone(self.sample.created_by) + self.assertIsNone(self.sample.last_edited_by) + + def test_delete_user_sets_null_in_multiple_samples(self): + # Create another sample by the same user + sample2 = PatientSample.objects.create( + patient=self.patient, + consultation=self.consultation, + sample_type=0, + created_by=self.user, + last_edited_by=self.user, + ) + + # Ensure the user is linked to both samples + self.assertEqual(self.sample.created_by, self.user) + self.assertEqual(self.sample.last_edited_by, self.user) + self.assertEqual(sample2.created_by, self.user) + self.assertEqual(sample2.last_edited_by, self.user) + + # Delete the user + self.user.delete() + + # Reload both samples and ensure both fields are now NULL + self.sample.refresh_from_db() + sample2.refresh_from_db() + self.assertIsNone(self.sample.created_by) + self.assertIsNone(self.sample.last_edited_by) + self.assertIsNone(sample2.created_by) + self.assertIsNone(sample2.last_edited_by) + + +class TestUserDefaultAssetLocation(TestUtils, TestCase): + @classmethod + def setUpTestData(cls): + # Create necessary data + cls.state = cls.create_state() + cls.district = cls.create_district(cls.state) + cls.super_user = cls.create_super_user("superuser", cls.district) + cls.local_body = cls.create_local_body(cls.district) + cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body) + + def setUp(self): + # Create a user and a UserDefaultAssetLocation + self.user = self.create_user("test_user", district=self.district) + self.location = self.create_asset_location(self.facility) + self.user_default_location = self.create_user_default_asset_location( + user=self.user, location=self.location + ) + + def test_delete_user_with_related_user_default_asset_location(self): + # Ensure the user is linked to UserDefaultAssetLocation + self.assertTrue( + UserDefaultAssetLocation.objects.filter(user=self.user).exists() + ) + + # Attempt to delete the user + with self.assertRaises(ValidationError) as context: + self.user.delete() + + # Assert that the exception is raised + self.assertIn( + f"Cannot delete User {self.user} because they are referenced as `user` in UserDefaultAssetLocation records.", + str(context.exception), + ) + + # Ensure the user and UserDefaultAssetLocation still exist + self.assertTrue(User.objects.filter(pk=self.user.pk).exists()) + self.assertTrue( + UserDefaultAssetLocation.objects.filter( + pk=self.user_default_location.pk + ).exists() + ) + + def test_delete_location_with_related_user_default_asset_location(self): + # Ensure the location is linked to UserDefaultAssetLocation + self.assertTrue( + UserDefaultAssetLocation.objects.filter(location=self.location).exists() + ) + + # Attempt to delete the location + with self.assertRaises(ValidationError) as context: + self.location.delete() + + # Assert that the correct exception is raised + self.assertIn( + f"Cannot delete AssetLocation {self.location} because they are referenced as `location` in UserDefaultAssetLocation records.", + str(context.exception), + ) + + # Ensure the location and UserDefaultAssetLocation still exist + self.assertTrue(AssetLocation.objects.filter(pk=self.location.pk).exists()) + self.assertTrue( + UserDefaultAssetLocation.objects.filter( + pk=self.user_default_location.pk + ).exists() + ) + + def test_delete_user_default_asset_location(self): + # Ensure the UserDefaultAssetLocation exists + self.assertTrue( + UserDefaultAssetLocation.objects.filter( + pk=self.user_default_location.pk + ).exists() + ) + + # Delete the UserDefaultAssetLocation + self.user_default_location.delete() + + # Ensure it is deleted and other related objects are unaffected + self.assertFalse( + UserDefaultAssetLocation.objects.filter( + pk=self.user_default_location.pk + ).exists() + ) + self.assertTrue(User.objects.filter(pk=self.user.pk).exists()) + self.assertTrue(AssetLocation.objects.filter(pk=self.location.pk).exists()) + + def test_delete_user_default_asset_location_then_user(self): + # Ensure the UserDefaultAssetLocation exists + self.assertTrue( + UserDefaultAssetLocation.objects.filter( + pk=self.user_default_location.pk + ).exists() + ) + + # Delete the UserDefaultAssetLocation + self.user_default_location.delete() + + # Ensure it is deleted + self.assertFalse( + UserDefaultAssetLocation.objects.filter( + pk=self.user_default_location.pk + ).exists() + ) + + # Attempt to delete the User + self.user.delete() + + # Ensure the User is deleted + self.assertFalse(User.objects.filter(pk=self.user.pk).exists()) + + def test_delete_user_default_asset_location_then_location(self): + # Ensure the UserDefaultAssetLocation exists + self.assertTrue( + UserDefaultAssetLocation.objects.filter( + pk=self.user_default_location.pk + ).exists() + ) + + # Delete the UserDefaultAssetLocation + self.user_default_location.delete() + + # Ensure it is deleted + self.assertFalse( + UserDefaultAssetLocation.objects.filter( + pk=self.user_default_location.pk + ).exists() + ) + + # Attempt to delete the Location + self.location.delete() + + # Ensure the Location is deleted + self.assertFalse(AssetLocation.objects.filter(pk=self.location.pk).exists()) + + +class TestFacilityDefaultAssetLocation(TestUtils, TestCase): + @classmethod + def setUpTestData(cls): + # Create necessary data + cls.state = cls.create_state() + cls.district = cls.create_district(cls.state) + cls.local_body = cls.create_local_body(cls.district) + cls.super_user = cls.create_super_user("superuser", cls.district) + cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body) + + def setUp(self): + # Create a location and a FacilityDefaultAssetLocation + self.location = self.create_asset_location(self.facility) + self.facility_default_location = self.create_facility_default_asset_location( + facility=self.facility, location=self.location + ) + + def test_delete_facility_with_related_facility_default_asset_location(self): + # Ensure the facility is linked to FacilityDefaultAssetLocation + self.assertTrue( + FacilityDefaultAssetLocation.objects.filter(facility=self.facility).exists() + ) + + # Attempt to delete the facility + with self.assertRaises(ValidationError) as context: + self.facility.delete() + + # Assert that the correct exception is raised + self.assertIn( + f"Cannot delete Facility {self.facility} because they are referenced as `facility` in FacilityDefaultAssetLocation records.", + str(context.exception), + ) + + # Ensure the facility and FacilityDefaultAssetLocation still exist + self.assertTrue(Facility.objects.filter(pk=self.facility.pk).exists()) + self.assertTrue( + FacilityDefaultAssetLocation.objects.filter( + pk=self.facility_default_location.pk + ).exists() + ) + + def test_delete_location_with_related_facility_default_asset_location(self): + # Ensure the location is linked to FacilityDefaultAssetLocation + self.assertTrue( + FacilityDefaultAssetLocation.objects.filter(location=self.location).exists() + ) + + # Attempt to delete the location + with self.assertRaises(ValidationError) as context: + self.location.delete() + + # Assert that the correct exception is raised + self.assertIn( + f"Cannot delete AssetLocation {self.location} because they are referenced as `location` in FacilityDefaultAssetLocation records.", + str(context.exception), + ) + + # Ensure the location and FacilityDefaultAssetLocation still exist + self.assertTrue(AssetLocation.objects.filter(pk=self.location.pk).exists()) + self.assertTrue( + FacilityDefaultAssetLocation.objects.filter( + pk=self.facility_default_location.pk + ).exists() + ) + + def test_delete_facility_default_asset_location(self): + # Ensure the FacilityDefaultAssetLocation exists + self.assertTrue( + FacilityDefaultAssetLocation.objects.filter( + pk=self.facility_default_location.pk + ).exists() + ) + + # Delete the FacilityDefaultAssetLocation + self.facility_default_location.delete() + + # Ensure it is deleted and other related objects are unaffected + self.assertFalse( + FacilityDefaultAssetLocation.objects.filter( + pk=self.facility_default_location.pk + ).exists() + ) + self.assertTrue(Facility.objects.filter(pk=self.facility.pk).exists()) + self.assertTrue(AssetLocation.objects.filter(pk=self.location.pk).exists()) + + def test_delete_facility_default_asset_location_then_facility(self): + # Ensure the FacilityDefaultAssetLocation exists + self.assertTrue( + FacilityDefaultAssetLocation.objects.filter( + pk=self.facility_default_location.pk + ).exists() + ) + + # Delete the FacilityDefaultAssetLocation + self.facility_default_location.delete() + + # Ensure it is deleted + self.assertFalse( + FacilityDefaultAssetLocation.objects.filter( + pk=self.facility_default_location.pk + ).exists() + ) + + # Attempt to delete the Facility + self.facility.delete() + + # Ensure the Facility is deleted + self.assertFalse(Facility.objects.filter(pk=self.facility.pk).exists()) + + def test_delete_facility_default_asset_location_then_location(self): + # Ensure the FacilityDefaultAssetLocation exists + self.assertTrue( + FacilityDefaultAssetLocation.objects.filter( + pk=self.facility_default_location.pk + ).exists() + ) + + # Delete the FacilityDefaultAssetLocation + self.facility_default_location.delete() + + # Ensure it is deleted + self.assertFalse( + FacilityDefaultAssetLocation.objects.filter( + pk=self.facility_default_location.pk + ).exists() + ) + + # Attempt to delete the Location + self.location.delete() + + # Ensure the Location is deleted + self.assertFalse(AssetLocation.objects.filter(pk=self.location.pk).exists()) + + +class TestConsultationBed(TestUtils, TestCase): + @classmethod + def setUpTestData(cls): + # Create necessary data + cls.state = cls.create_state() + cls.district = cls.create_district(cls.state) + cls.local_body = cls.create_local_body(cls.district) + cls.super_user = cls.create_super_user("superuser", cls.district) + cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body) + cls.location = cls.create_asset_location(cls.facility) + + def setUp(self): + # Create patient, consultation, bed, and consultation bed + self.patient = self.create_patient(self.district, self.facility) + self.consultation = self.create_consultation(self.patient, self.facility) + self.bed = self.create_bed(self.facility, self.location) + self.consultation_bed = ConsultationBed.objects.create( + consultation=self.consultation, + bed=self.bed, + start_date="2024-11-01T10:00:00Z", + ) + + def test_delete_consultation_with_related_consultation_bed(self): + # Ensure the consultation is linked to ConsultationBed + self.assertTrue( + ConsultationBed.objects.filter(consultation=self.consultation).exists() + ) + + # Attempt to delete the consultation + with self.assertRaises(ValidationError) as context: + self.consultation.delete() + + # Assert that the correct exception is raised + self.assertIn( + f"Cannot delete PatientConsultation {self.consultation} because they are referenced as `consultation` in ConsultationBed records.", + str(context.exception), + ) + + # Ensure the consultation and ConsultationBed still exist + self.assertTrue( + PatientConsultation.objects.filter(pk=self.consultation.pk).exists() + ) + self.assertTrue( + ConsultationBed.objects.filter(pk=self.consultation_bed.pk).exists() + ) + + def test_delete_bed_with_related_consultation_bed(self): + # Ensure the bed is linked to ConsultationBed + self.assertTrue(ConsultationBed.objects.filter(bed=self.bed).exists()) + + # Attempt to delete the bed + with self.assertRaises(ValidationError) as context: + self.bed.delete() + + # Assert that the correct exception is raised + self.assertIn( + f"Cannot delete Bed {self.bed} because they are referenced as `bed` in ConsultationBed records.", + str(context.exception), + ) + + # Ensure the bed and ConsultationBed still exist + self.assertTrue(Bed.objects.filter(pk=self.bed.pk).exists()) + self.assertTrue( + ConsultationBed.objects.filter(pk=self.consultation_bed.pk).exists() + ) + + def test_delete_consultation_bed(self): + # Ensure the ConsultationBed exists + self.assertTrue( + ConsultationBed.objects.filter(pk=self.consultation_bed.pk).exists() + ) + + # Delete the ConsultationBed + self.consultation_bed.delete() + + # Ensure it is deleted and other related objects are unaffected + self.assertFalse( + ConsultationBed.objects.filter(pk=self.consultation_bed.pk).exists() + ) + self.assertTrue( + PatientConsultation.objects.filter(pk=self.consultation.pk).exists() + ) + self.assertTrue(Bed.objects.filter(pk=self.bed.pk).exists()) + + def test_delete_consultation_bed_then_consultation(self): + # Ensure the ConsultationBed exists + self.assertTrue( + ConsultationBed.objects.filter(pk=self.consultation_bed.pk).exists() + ) + + # Delete the ConsultationBed + self.consultation_bed.delete() + + # Ensure it is deleted + self.assertFalse( + ConsultationBed.objects.filter(pk=self.consultation_bed.pk).exists() + ) + + # Attempt to delete the Consultation + self.consultation.delete() + + # Ensure the Consultation is deleted + self.assertFalse( + PatientConsultation.objects.filter(pk=self.consultation.pk).exists() + ) + + def test_delete_consultation_bed_then_bed(self): + # Ensure the ConsultationBed exists + self.assertTrue( + ConsultationBed.objects.filter(pk=self.consultation_bed.pk).exists() + ) + + # Delete the ConsultationBed + self.consultation_bed.delete() + + # Ensure it is deleted + self.assertFalse( + ConsultationBed.objects.filter(pk=self.consultation_bed.pk).exists() + ) + + # Attempt to delete the Bed + self.bed.delete() + + # Ensure the Bed is deleted + self.assertFalse(Bed.objects.filter(pk=self.bed.pk).exists()) diff --git a/care/users/models.py b/care/users/models.py index bf0d47c284..a8bf03572c 100644 --- a/care/users/models.py +++ b/care/users/models.py @@ -4,8 +4,9 @@ from django.conf import settings from django.contrib.auth.models import AbstractUser, UserManager +from django.core.exceptions import ValidationError from django.core.validators import MaxValueValidator, MinValueValidator -from django.db import models +from django.db import models, transaction from django.urls import reverse from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ @@ -405,7 +406,23 @@ def has_add_user_permission(request): def check_username_exists(username): return User.objects.get_entire_queryset().filter(username=username).exists() + @transaction.atomic def delete(self, *args, **kwargs): + from care.facility.models.asset import UserDefaultAssetLocation + from care.facility.models.facility import FacilityUser + from care.facility.models.patient_sample import PatientSample + + if FacilityUser.objects.filter(created_by=self).exists(): + error = f"Cannot delete User {self.username} because they are referenced as `created_by` in FacilityUser records." + raise ValidationError(error) + + if UserDefaultAssetLocation.objects.filter(user=self).exists(): + error = f"Cannot delete User {self} because they are referenced as `user` in UserDefaultAssetLocation records." + raise ValidationError(error) + + FacilityUser.objects.filter(user=self).delete() + PatientSample.objects.filter(created_by=self).update(created_by=None) + PatientSample.objects.filter(last_edited_by=self).update(last_edited_by=None) self.deleted = True self.save() diff --git a/care/utils/tests/test_utils.py b/care/utils/tests/test_utils.py index fbc286a337..ea82d3bcf8 100644 --- a/care/utils/tests/test_utils.py +++ b/care/utils/tests/test_utils.py @@ -37,7 +37,12 @@ User, Ward, ) -from care.facility.models.asset import Asset, AssetLocation +from care.facility.models.asset import ( + Asset, + AssetLocation, + FacilityDefaultAssetLocation, + UserDefaultAssetLocation, +) from care.facility.models.bed import AssetBed, Bed, ConsultationBed from care.facility.models.facility import FacilityUser from care.facility.models.icd11_diagnosis import ( @@ -722,12 +727,38 @@ def create_prescription( data.update(**kwargs) return Prescription.objects.create(**data) + @classmethod + def create_facility_user( + cls, facility: Facility, user: User, created_by: User, **kwargs + ) -> FacilityUser: + data = { + "facility": facility, + "created_by": created_by, + "user": user, + } + data.update(**kwargs) + return FacilityUser.objects.create(**data) + @classmethod def create_assetbed(cls, bed: Bed, asset: Asset, **kwargs) -> AssetBed: data = {"bed": bed, "asset": asset} data.update(kwargs) return AssetBed.objects.create(**data) + @classmethod + def create_facility_default_asset_location( + cls, facility: Facility, location: AssetLocation + ) -> FacilityDefaultAssetLocation: + data = {"facility": facility, "location": location} + return FacilityDefaultAssetLocation.objects.create(**data) + + @classmethod + def create_user_default_asset_location( + cls, user: User, location: AssetLocation + ) -> UserDefaultAssetLocation: + data = {"user": user, "location": location} + return UserDefaultAssetLocation.objects.create(**data) + def get_list_representation(self, obj) -> dict: """ Returns the dict representation of the obj in list API