Skip to content

Commit

Permalink
Adding test member oid as the extension to all observations in the el…
Browse files Browse the repository at this point in the history
…r messages
  • Loading branch information
kant committed Nov 12, 2024
1 parent abfc0c0 commit 04bd870
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 1 deletion.
42 changes: 41 additions & 1 deletion prime-router/src/main/kotlin/azure/ConditionMapper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ import gov.cdc.prime.router.Metadata
import gov.cdc.prime.router.fhirengine.utils.getCodeSourcesMap
import gov.cdc.prime.router.metadata.ObservationMappingConstants
import org.hl7.fhir.r4.model.Coding
import org.hl7.fhir.r4.model.Extension
import org.hl7.fhir.r4.model.Observation
import org.hl7.fhir.r4.model.StringType

interface IConditionMapper {
/**
* Attempt to find diagnostic conditions for a series of test [codings]
* @return a map associating test [codings] to their diagnostic conditions as Coding's
*/
fun lookupConditions(codings: List<Coding>): Map<Coding, List<Coding>>

/**
* Lookup test code to Member OID mappings for the given [codings].
* @return a map associating test codes to their Member OIDs
*/
fun lookupMemberOid(codings: List<Coding>): Map<String, String>
}

class LookupTableConditionMapper(metadata: Metadata) : IConditionMapper {
Expand All @@ -34,16 +42,32 @@ class LookupTableConditionMapper(metadata: Metadata) : IConditionMapper {
acc
}
}

override fun lookupMemberOid(codings: List<Coding>): Map<String, String> {
return mappingTable.FilterBuilder()
.isIn(ObservationMappingConstants.TEST_CODE_KEY, codings.map { it.code })
.filter().caseSensitiveDataRowsMap.fold(mutableMapOf()) { acc, condition ->
val testCode = condition[ObservationMappingConstants.TEST_CODE_KEY] ?: ""
val memberOid = condition[ObservationMappingConstants.TEST_OID_KEY] ?: ""
if (testCode.isNotEmpty() && memberOid.isNotEmpty()) {
acc[testCode] = memberOid
}
acc
}
}
}

class ConditionStamper(private val conditionMapper: IConditionMapper) {
companion object {
const val conditionCodeExtensionURL = "https://reportstream.cdc.gov/fhir/StructureDefinition/condition-code"
const val MEMBER_OID_EXTENSION_URL =
"https://reportstream.cdc.gov/fhir/StructureDefinition/test-performed-member-oid"

const val BUNDLE_CODE_IDENTIFIER = "observation.code.coding.code"
const val BUNDLE_VALUE_IDENTIFIER = "observation.valueCodeableConcept.coding.code"
const val MAPPING_CODES_IDENTIFIER = "observation.{code|valueCodeableConcept}.coding.code"
}

data class ObservationMappingFailure(val source: String, val failures: List<Coding>)

data class ObservationStampingResult(
Expand All @@ -52,17 +76,21 @@ class ConditionStamper(private val conditionMapper: IConditionMapper) {
)

/**
* Lookup condition codes for an [observation] and add them as custom extensions
* Lookup condition codes and member OIDs for an [observation] and add them as custom extensions
* @param observation the observation that will be stamped
* @return a [ObservationStampingResult] including stamping success and any mapping failures
*/
fun stampObservation(observation: Observation): ObservationStampingResult {
val codeSourcesMap = observation.getCodeSourcesMap().filterValues { it.isNotEmpty() }
if (codeSourcesMap.values.flatten().isEmpty()) return ObservationStampingResult(false)

// Lookup conditions and Member OIDs
val conditionsToCode = conditionMapper.lookupConditions(codeSourcesMap.values.flatten())
val memberOidMap = conditionMapper.lookupMemberOid(codeSourcesMap.values.flatten())

var mappedSomething = false

// Process condition mappings
val failures = codeSourcesMap.mapNotNull { codes ->
val unnmapped = codes.value.mapNotNull { code ->
val conditions = conditionsToCode.getOrDefault(code, emptyList())
Expand All @@ -77,6 +105,18 @@ class ConditionStamper(private val conditionMapper: IConditionMapper) {
if (unnmapped.isEmpty()) null else ObservationMappingFailure(codes.key, unnmapped)
}

// Add the Member OID extension to the observation, based on the lookup
observation.code.coding.forEach { coding ->
val testCode = coding.code
val memberOid = memberOidMap[testCode]
if (memberOid != null) {
val memberOidExtension = Extension(MEMBER_OID_EXTENSION_URL)
memberOidExtension.setValue(StringType(memberOid))
observation.addExtension(memberOidExtension)
mappedSomething = true
}
}

return ObservationStampingResult(mappedSomething, failures)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import gov.cdc.prime.router.TestSource
import gov.cdc.prime.router.Topic
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.ConditionStamper
import gov.cdc.prime.router.azure.ConditionStamper.Companion.MEMBER_OID_EXTENSION_URL
import gov.cdc.prime.router.azure.ConditionStamper.Companion.conditionCodeExtensionURL
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.LookupTableConditionMapper
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.observability.event.InMemoryAzureEventService
Expand All @@ -50,6 +53,7 @@ import io.mockk.mockkStatic
import io.mockk.spyk
import io.mockk.verify
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.CodeableConcept
import org.hl7.fhir.r4.model.Coding
import org.hl7.fhir.r4.model.Observation
import org.jooq.tools.jdbc.MockConnection
Expand Down Expand Up @@ -959,4 +963,146 @@ class FhirReceiverFilterTests {
.matchesPredicate { it.receiver == settings.receivers.first() }
}
}

@Test
fun `pass - routing filter for stamped conditions`() {
// Initialize ConditionStamper with LookupTableConditionMapper for this test
val conditionStamper = ConditionStamper(LookupTableConditionMapper(metadata))

// Engine setup with a routing filter for specific condition code and member OID
val routingFilter = listOf(
"%resource.code.coding.extension('$conditionCodeExtensionURL')" +
".value.where(code in ('840539006')).exists() " +
"and %resource.interpretation.coding.code = 'A' " +
"and %resource.code.coding.extension('${MEMBER_OID_EXTENSION_URL}')" +
".value.where(code in ('2.16.840.1.113762.1.4.1146.1142')).exists()"
)

val settings = FileSettings().loadOrganizations(
createOrganizationWithFilteredReceivers(routingFilter = routingFilter)
)
val engine = spyk(makeFhirEngine(metadata, settings) as FHIRReceiverFilter)
val message = spyk(
FhirReceiverFilterQueueMessage(
UUID.randomUUID(),
BLOB_URL,
"test",
BLOB_SUB_FOLDER_NAME,
topic = Topic.FULL_ELR,
"$ORGANIZATION_NAME.$RECEIVER_NAME"
)
)

// Data setup
val fhirData = File(VALID_FHIR_FILEPATH).readText()
val bundle = FhirTranscoder.decode(fhirData)
bundle.getObservations().forEach {
conditionStamper.stampObservation(it)

// Create Coding and wrap it in a CodeableConcept
val interpretationCoding = Coding()
.setSystem("http://terminology.hl7.org/CodeSystem/v2-0078")
.setCode("A")
val interpretationCodeableConcept = CodeableConcept().addCoding(interpretationCoding)

// Add the CodeableConcept to the interpretation
it.interpretation.add(interpretationCodeableConcept)
}

// Mock setup
mockkObject(BlobAccess)
every { BlobAccess.downloadBlob(any(), any()) }
.returns(FhirTranscoder.encode(bundle))
every { BlobAccess.uploadBlob(any(), any()) } returns "test"
every { accessSpy.insertTask(any(), MimeFormat.FHIR.toString(), BODY_URL, any()) }
.returns(Unit)

// Act and Assert
accessSpy.transact { txn ->
val results = engine.run(message, actionLogger, actionHistory, txn)
assertThat(results).hasSize(1) // Ensure observation passed filter
}

verify(exactly = 1) {
BlobAccess.uploadBlob(any(), any())
accessSpy.insertTask(any(), any(), any(), any(), any())
}
}

@Test
fun `fail - routing filter does not match stamped conditions`() {
// Initialize ConditionStamper with LookupTableConditionMapper for this test
val conditionStamper = ConditionStamper(LookupTableConditionMapper(metadata))

// Engine setup with the same routing filter as above
val routingFilter = listOf(
"%resource.code.coding.extension('$conditionCodeExtensionURL')" +
".value.where(code in ('840539006')).exists() " +
"and %resource.interpretation.coding.code = 'A' " +
"and %resource.code.coding.extension('${MEMBER_OID_EXTENSION_URL}')" +
".value.where(code in ('2.16.840.1.113762.1.4.1146.1142')).exists()"
)

val settings = FileSettings().loadOrganizations(
createOrganizationWithFilteredReceivers(routingFilter = routingFilter)
)
val engine = spyk(makeFhirEngine(metadata, settings) as FHIRReceiverFilter)
val message = spyk(
FhirReceiverFilterQueueMessage(
UUID.randomUUID(),
BLOB_URL,
"test",
BLOB_SUB_FOLDER_NAME,
topic = Topic.FULL_ELR,
"$ORGANIZATION_NAME.$RECEIVER_NAME"
)
)

// Data setup
val fhirData = File(VALID_FHIR_FILEPATH).readText()
val bundle = FhirTranscoder.decode(fhirData)
bundle.getObservations().forEach {
// Apply a different code and member OID that will not match the filter
val nonMatchingCoding = Coding()
.setSystem("SNOMEDCT")
.setCode("999999") // Non-matching code
it.code.coding.first().addExtension(conditionCodeExtensionURL, nonMatchingCoding)

// Create a non-matching CodeableConcept for interpretation
val nonMatchingInterpretationCoding = Coding()
.setSystem("http://terminology.hl7.org/CodeSystem/v2-0078")
.setCode("B") // Non-matching interpretation code
val nonMatchingInterpretationConcept = CodeableConcept().addCoding(nonMatchingInterpretationCoding)

it.interpretation.add(nonMatchingInterpretationConcept)
}

// Mock setup
mockkObject(BlobAccess)
every { BlobAccess.downloadBlob(any(), any()) }
.returns(FhirTranscoder.encode(bundle))
every { BlobAccess.uploadBlob(any(), any()) } returns "test"
every { accessSpy.insertTask(any(), MimeFormat.FHIR.toString(), BODY_URL, any()) }
.returns(Unit)

// Act and Assert
accessSpy.transact { txn ->
val results = engine.run(message, actionLogger, actionHistory, txn)
assertThat(results).isEmpty() // Ensure observation did not pass the filter
}

azureEventService.events.forEach { event ->
assertThat(event)
.isInstanceOf<ReportStreamItemEvent>()
.matchesPredicate {
it.params[ReportStreamEventProperties.FAILING_FILTERS] == routingFilter &&
it.params[ReportStreamEventProperties.FILTER_TYPE] == ReportStreamFilterType.ROUTING_FILTER
}
}

verify(exactly = 0) {
BlobAccess.uploadBlob(any(), any())
accessSpy.insertTask(any(), any(), any(), any(), any())
}
}
}

0 comments on commit 04bd870

Please sign in to comment.