From 04bd8706f71810f6f984e48e50a2ef952b8fb862 Mon Sep 17 00:00:00 2001 From: kant Date: Tue, 29 Oct 2024 20:03:58 -0700 Subject: [PATCH] Adding test member oid as the extension to all observations in the elr messages --- .../src/main/kotlin/azure/ConditionMapper.kt | 42 ++++- .../engine/FhirReceiverFilterTests.kt | 146 ++++++++++++++++++ 2 files changed, 187 insertions(+), 1 deletion(-) diff --git a/prime-router/src/main/kotlin/azure/ConditionMapper.kt b/prime-router/src/main/kotlin/azure/ConditionMapper.kt index e14ba24276d..ecaab4d6441 100644 --- a/prime-router/src/main/kotlin/azure/ConditionMapper.kt +++ b/prime-router/src/main/kotlin/azure/ConditionMapper.kt @@ -4,7 +4,9 @@ import gov.cdc.prime.router.Metadata import gov.cdc.prime.router.fhirengine.utils.getCodeSourcesMap import gov.cdc.prime.router.metadata.ObservationMappingConstants import org.hl7.fhir.r4.model.Coding +import org.hl7.fhir.r4.model.Extension import org.hl7.fhir.r4.model.Observation +import org.hl7.fhir.r4.model.StringType interface IConditionMapper { /** @@ -12,6 +14,12 @@ interface IConditionMapper { * @return a map associating test [codings] to their diagnostic conditions as Coding's */ fun lookupConditions(codings: List): Map> + + /** + * Lookup test code to Member OID mappings for the given [codings]. + * @return a map associating test codes to their Member OIDs + */ + fun lookupMemberOid(codings: List): Map } class LookupTableConditionMapper(metadata: Metadata) : IConditionMapper { @@ -34,16 +42,32 @@ class LookupTableConditionMapper(metadata: Metadata) : IConditionMapper { acc } } + + override fun lookupMemberOid(codings: List): Map { + return mappingTable.FilterBuilder() + .isIn(ObservationMappingConstants.TEST_CODE_KEY, codings.map { it.code }) + .filter().caseSensitiveDataRowsMap.fold(mutableMapOf()) { acc, condition -> + val testCode = condition[ObservationMappingConstants.TEST_CODE_KEY] ?: "" + val memberOid = condition[ObservationMappingConstants.TEST_OID_KEY] ?: "" + if (testCode.isNotEmpty() && memberOid.isNotEmpty()) { + acc[testCode] = memberOid + } + acc + } + } } class ConditionStamper(private val conditionMapper: IConditionMapper) { companion object { const val conditionCodeExtensionURL = "https://reportstream.cdc.gov/fhir/StructureDefinition/condition-code" + const val MEMBER_OID_EXTENSION_URL = + "https://reportstream.cdc.gov/fhir/StructureDefinition/test-performed-member-oid" const val BUNDLE_CODE_IDENTIFIER = "observation.code.coding.code" const val BUNDLE_VALUE_IDENTIFIER = "observation.valueCodeableConcept.coding.code" const val MAPPING_CODES_IDENTIFIER = "observation.{code|valueCodeableConcept}.coding.code" } + data class ObservationMappingFailure(val source: String, val failures: List) data class ObservationStampingResult( @@ -52,7 +76,7 @@ class ConditionStamper(private val conditionMapper: IConditionMapper) { ) /** - * Lookup condition codes for an [observation] and add them as custom extensions + * Lookup condition codes and member OIDs for an [observation] and add them as custom extensions * @param observation the observation that will be stamped * @return a [ObservationStampingResult] including stamping success and any mapping failures */ @@ -60,9 +84,13 @@ class ConditionStamper(private val conditionMapper: IConditionMapper) { val codeSourcesMap = observation.getCodeSourcesMap().filterValues { it.isNotEmpty() } if (codeSourcesMap.values.flatten().isEmpty()) return ObservationStampingResult(false) + // Lookup conditions and Member OIDs val conditionsToCode = conditionMapper.lookupConditions(codeSourcesMap.values.flatten()) + val memberOidMap = conditionMapper.lookupMemberOid(codeSourcesMap.values.flatten()) + var mappedSomething = false + // Process condition mappings val failures = codeSourcesMap.mapNotNull { codes -> val unnmapped = codes.value.mapNotNull { code -> val conditions = conditionsToCode.getOrDefault(code, emptyList()) @@ -77,6 +105,18 @@ class ConditionStamper(private val conditionMapper: IConditionMapper) { if (unnmapped.isEmpty()) null else ObservationMappingFailure(codes.key, unnmapped) } + // Add the Member OID extension to the observation, based on the lookup + observation.code.coding.forEach { coding -> + val testCode = coding.code + val memberOid = memberOidMap[testCode] + if (memberOid != null) { + val memberOidExtension = Extension(MEMBER_OID_EXTENSION_URL) + memberOidExtension.setValue(StringType(memberOid)) + observation.addExtension(memberOidExtension) + mappedSomething = true + } + } + return ObservationStampingResult(mappedSomething, failures) } } \ No newline at end of file diff --git a/prime-router/src/test/kotlin/fhirengine/engine/FhirReceiverFilterTests.kt b/prime-router/src/test/kotlin/fhirengine/engine/FhirReceiverFilterTests.kt index a6c989e9a66..f59a662cef2 100644 --- a/prime-router/src/test/kotlin/fhirengine/engine/FhirReceiverFilterTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/engine/FhirReceiverFilterTests.kt @@ -27,8 +27,11 @@ import gov.cdc.prime.router.TestSource import gov.cdc.prime.router.Topic import gov.cdc.prime.router.azure.ActionHistory import gov.cdc.prime.router.azure.BlobAccess +import gov.cdc.prime.router.azure.ConditionStamper +import gov.cdc.prime.router.azure.ConditionStamper.Companion.MEMBER_OID_EXTENSION_URL import gov.cdc.prime.router.azure.ConditionStamper.Companion.conditionCodeExtensionURL import gov.cdc.prime.router.azure.DatabaseAccess +import gov.cdc.prime.router.azure.LookupTableConditionMapper import gov.cdc.prime.router.azure.db.enums.TaskAction import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile import gov.cdc.prime.router.azure.observability.event.InMemoryAzureEventService @@ -50,6 +53,7 @@ import io.mockk.mockkStatic import io.mockk.spyk import io.mockk.verify import org.hl7.fhir.r4.model.Bundle +import org.hl7.fhir.r4.model.CodeableConcept import org.hl7.fhir.r4.model.Coding import org.hl7.fhir.r4.model.Observation import org.jooq.tools.jdbc.MockConnection @@ -959,4 +963,146 @@ class FhirReceiverFilterTests { .matchesPredicate { it.receiver == settings.receivers.first() } } } + + @Test + fun `pass - routing filter for stamped conditions`() { + // Initialize ConditionStamper with LookupTableConditionMapper for this test + val conditionStamper = ConditionStamper(LookupTableConditionMapper(metadata)) + + // Engine setup with a routing filter for specific condition code and member OID + val routingFilter = listOf( + "%resource.code.coding.extension('$conditionCodeExtensionURL')" + + ".value.where(code in ('840539006')).exists() " + + "and %resource.interpretation.coding.code = 'A' " + + "and %resource.code.coding.extension('${MEMBER_OID_EXTENSION_URL}')" + + ".value.where(code in ('2.16.840.1.113762.1.4.1146.1142')).exists()" + ) + + val settings = FileSettings().loadOrganizations( + createOrganizationWithFilteredReceivers(routingFilter = routingFilter) + ) + val engine = spyk(makeFhirEngine(metadata, settings) as FHIRReceiverFilter) + val message = spyk( + FhirReceiverFilterQueueMessage( + UUID.randomUUID(), + BLOB_URL, + "test", + BLOB_SUB_FOLDER_NAME, + topic = Topic.FULL_ELR, + "$ORGANIZATION_NAME.$RECEIVER_NAME" + ) + ) + + // Data setup + val fhirData = File(VALID_FHIR_FILEPATH).readText() + val bundle = FhirTranscoder.decode(fhirData) + bundle.getObservations().forEach { + conditionStamper.stampObservation(it) + + // Create Coding and wrap it in a CodeableConcept + val interpretationCoding = Coding() + .setSystem("http://terminology.hl7.org/CodeSystem/v2-0078") + .setCode("A") + val interpretationCodeableConcept = CodeableConcept().addCoding(interpretationCoding) + + // Add the CodeableConcept to the interpretation + it.interpretation.add(interpretationCodeableConcept) + } + + // Mock setup + mockkObject(BlobAccess) + every { BlobAccess.downloadBlob(any(), any()) } + .returns(FhirTranscoder.encode(bundle)) + every { BlobAccess.uploadBlob(any(), any()) } returns "test" + every { accessSpy.insertTask(any(), MimeFormat.FHIR.toString(), BODY_URL, any()) } + .returns(Unit) + + // Act and Assert + accessSpy.transact { txn -> + val results = engine.run(message, actionLogger, actionHistory, txn) + assertThat(results).hasSize(1) // Ensure observation passed filter + } + + verify(exactly = 1) { + BlobAccess.uploadBlob(any(), any()) + accessSpy.insertTask(any(), any(), any(), any(), any()) + } + } + + @Test + fun `fail - routing filter does not match stamped conditions`() { + // Initialize ConditionStamper with LookupTableConditionMapper for this test + val conditionStamper = ConditionStamper(LookupTableConditionMapper(metadata)) + + // Engine setup with the same routing filter as above + val routingFilter = listOf( + "%resource.code.coding.extension('$conditionCodeExtensionURL')" + + ".value.where(code in ('840539006')).exists() " + + "and %resource.interpretation.coding.code = 'A' " + + "and %resource.code.coding.extension('${MEMBER_OID_EXTENSION_URL}')" + + ".value.where(code in ('2.16.840.1.113762.1.4.1146.1142')).exists()" + ) + + val settings = FileSettings().loadOrganizations( + createOrganizationWithFilteredReceivers(routingFilter = routingFilter) + ) + val engine = spyk(makeFhirEngine(metadata, settings) as FHIRReceiverFilter) + val message = spyk( + FhirReceiverFilterQueueMessage( + UUID.randomUUID(), + BLOB_URL, + "test", + BLOB_SUB_FOLDER_NAME, + topic = Topic.FULL_ELR, + "$ORGANIZATION_NAME.$RECEIVER_NAME" + ) + ) + + // Data setup + val fhirData = File(VALID_FHIR_FILEPATH).readText() + val bundle = FhirTranscoder.decode(fhirData) + bundle.getObservations().forEach { + // Apply a different code and member OID that will not match the filter + val nonMatchingCoding = Coding() + .setSystem("SNOMEDCT") + .setCode("999999") // Non-matching code + it.code.coding.first().addExtension(conditionCodeExtensionURL, nonMatchingCoding) + + // Create a non-matching CodeableConcept for interpretation + val nonMatchingInterpretationCoding = Coding() + .setSystem("http://terminology.hl7.org/CodeSystem/v2-0078") + .setCode("B") // Non-matching interpretation code + val nonMatchingInterpretationConcept = CodeableConcept().addCoding(nonMatchingInterpretationCoding) + + it.interpretation.add(nonMatchingInterpretationConcept) + } + + // Mock setup + mockkObject(BlobAccess) + every { BlobAccess.downloadBlob(any(), any()) } + .returns(FhirTranscoder.encode(bundle)) + every { BlobAccess.uploadBlob(any(), any()) } returns "test" + every { accessSpy.insertTask(any(), MimeFormat.FHIR.toString(), BODY_URL, any()) } + .returns(Unit) + + // Act and Assert + accessSpy.transact { txn -> + val results = engine.run(message, actionLogger, actionHistory, txn) + assertThat(results).isEmpty() // Ensure observation did not pass the filter + } + + azureEventService.events.forEach { event -> + assertThat(event) + .isInstanceOf() + .matchesPredicate { + it.params[ReportStreamEventProperties.FAILING_FILTERS] == routingFilter && + it.params[ReportStreamEventProperties.FILTER_TYPE] == ReportStreamFilterType.ROUTING_FILTER + } + } + + verify(exactly = 0) { + BlobAccess.uploadBlob(any(), any()) + accessSpy.insertTask(any(), any(), any(), any(), any()) + } + } } \ No newline at end of file