Skip to content

Commit

Permalink
Create New Parsers Framework (#102)
Browse files Browse the repository at this point in the history
## Description
Abstract the parsing of FHIR out into its own module, as the linking
algorithm should be input format agnostic. This approach will let us
expand our supported formats for ELR and eCR ahead.

## Related Issues
closes #80 

## Additional Notes
- creates `recordlinker.parsers.fhir`
- add new `link/fhir` endpoint that accepts `fhir` bundles but response
similar to the `/link` endpoint
- add tests for new endpoint and new test file for `fhir` parser



Co-authored-by: Eric Buckley <[email protected]>
  • Loading branch information
cbrinson-rise8 and ericbuckley authored Oct 30, 2024
1 parent 0fb8042 commit 800265a
Show file tree
Hide file tree
Showing 8 changed files with 415 additions and 213 deletions.
2 changes: 1 addition & 1 deletion docs/process_for_adding_feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
- Update the [PIIRecord.feature_iter](https://github.com/CDCgov/RecordLinker/blob/a672d2b6409cbd1a08f729d94fba5692f57f6fc6/src/recordlinker/schemas/pii.py#L246) method to return the value of the new feature when it's used for comparison.

### Extract the FHIR Field in `fhir_record_to_pii_record`
- In [src/recordlinker/linking/link.py](https://github.com/CDCgov/RecordLinker/blob/a672d2b6409cbd1a08f729d94fba5692f57f6fc6/src/recordlinker/linking/link.py), update the [fhir_record_to_pii_record](https://github.com/CDCgov/RecordLinker/blob/a672d2b6409cbd1a08f729d94fba5692f57f6fc6/src/recordlinker/linking/link.py#L26) function to map the relevant FHIR field to the new feature in [PIIRecord](https://github.com/CDCgov/RecordLinker/blob/c85f555e5da91d54eb8c51e3bdf0789d1e204b2f/src/recordlinker/schemas/pii.py#L97).
- In [src/recordlinker/linking/link.py](https://github.com/CDCgov/RecordLinker/blob/e8a64407b6e8564595cad6380d5291e9f5c959e3/src/recordlinker/parsers/fhir.py), update the [fhir_record_to_pii_record](https://github.com/CDCgov/RecordLinker/blob/e8a64407b6e8564595cad6380d5291e9f5c959e3/src/recordlinker/parsers/fhir.py#L12) function to map the relevant FHIR field to the new feature in [PIIRecord](https://github.com/CDCgov/RecordLinker/blob/e8a64407b6e8564595cad6380d5291e9f5c959e3/src/recordlinker/schemas/pii.py#L141).

### Update the Tests
- Add or modify unit tests to verify that the new feature is properly extracted, mapped, and compared.
Empty file.
87 changes: 87 additions & 0 deletions src/recordlinker/hl7/fhir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
recordlinker.hl7.fhir
~~~~~~~~~~~~~~~~~~~~~~~~~
This module is used to handle fhir parsing
"""

import typing

import pydantic

from recordlinker import schemas


def fhir_record_to_pii_record(fhir_record: dict) -> schemas.PIIRecord:
"""
Parse the FHIR record into a PIIRecord object
"""
val = {
"external_id": fhir_record.get("id"),
"name": fhir_record.get("name", []),
"birthDate": fhir_record.get("birthDate"),
"sex": fhir_record.get("gender"),
"address": fhir_record.get("address", []),
"mrn": None,
"ssn": None,
"race": None,
"gender": None,
"telecom": fhir_record.get("telecom", []),
}
for identifier in fhir_record.get("identifier", []):
for coding in identifier.get("type", {}).get("coding", []):
if coding.get("code") == "MR":
val["mrn"] = identifier.get("value")
elif coding.get("code") == "SS":
val["ssn"] = identifier.get("value")
for address in val["address"]:
address["county"] = address.get("district", "")
for extension in address.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation":
for coord in extension.get("extension", []):
if coord.get("url") == "latitude":
address["latitude"] = coord.get("valueDecimal")
elif coord.get("url") == "longitude":
address["longitude"] = coord.get("valueDecimal")
for extension in fhir_record.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
for ext in extension.get("extension", []):
if ext.get("url") == "ombCategory":
val["race"] = ext.get("valueCoding", {}).get("display")
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/individual-genderIdentity":
for ext in extension.get("extension", []):
if ext.get("url") == "value":
for coding in ext.get("valueCodeableConcept", {}).get("coding", []):
val["gender"] = coding.get("display")

return schemas.PIIRecord(**val)

def add_person_resource(
person_id: str,
patient_id: typing.Optional[str] = "",
bundle: dict = pydantic.Field(description="A FHIR bundle"),
) -> dict:
"""
Adds a simplified person resource to a bundle if the patient resource in the bundle
matches an existing record in the Master Patient Index. Returns the bundle with
the newly added person resource.
:param person_id: _description_
:param patient_id: _description_
:param bundle: _description_, defaults to Field(description="A FHIR bundle")
:return: _description_
"""
person_resource = {
"fullUrl": f"urn:uuid:{person_id}",
"resource": {
"resourceType": "Person",
"id": f"{person_id}",
"link": [{"target": {"reference": f"Patient/{patient_id}"}}],
},
"request": {
"method": "PUT",
"url": f"Person/{person_id}",
},
}
bundle.get("entry", []).append(person_resource)
return bundle
80 changes: 0 additions & 80 deletions src/recordlinker/linking/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import typing
import uuid

import pydantic
from sqlalchemy import orm

from recordlinker import models
Expand All @@ -29,85 +28,6 @@

TRACER = MockTracer()


# TODO: This is a FHIR specific function, should be moved to a FHIR module
def fhir_record_to_pii_record(fhir_record: dict) -> schemas.PIIRecord:
"""
Parse the FHIR record into a PIIRecord object
"""
val = {
"external_id": fhir_record.get("id"),
"name": fhir_record.get("name", []),
"birthDate": fhir_record.get("birthDate"),
"sex": fhir_record.get("gender"),
"address": fhir_record.get("address", []),
"mrn": None,
"ssn": None,
"race": None,
"gender": None,
"telecom": fhir_record.get("telecom", []),
}
for identifier in fhir_record.get("identifier", []):
for coding in identifier.get("type", {}).get("coding", []):
if coding.get("code") == "MR":
val["mrn"] = identifier.get("value")
elif coding.get("code") == "SS":
val["ssn"] = identifier.get("value")
for address in val["address"]:
address["county"] = address.get("district", "")
for extension in address.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation":
for coord in extension.get("extension", []):
if coord.get("url") == "latitude":
address["latitude"] = coord.get("valueDecimal")
elif coord.get("url") == "longitude":
address["longitude"] = coord.get("valueDecimal")
for extension in fhir_record.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
for ext in extension.get("extension", []):
if ext.get("url") == "ombCategory":
val["race"] = ext.get("valueCoding", {}).get("display")
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/individual-genderIdentity":
for ext in extension.get("extension", []):
if ext.get("url") == "value":
for coding in ext.get("valueCodeableConcept", {}).get("coding", []):
val["gender"] = coding.get("display")

return schemas.PIIRecord(**val)


# TODO: This is a FHIR specific function, should be moved to a FHIR module
def add_person_resource(
person_id: str,
patient_id: typing.Optional[str] = "",
bundle: dict = pydantic.Field(description="A FHIR bundle"),
) -> dict:
"""
Adds a simplified person resource to a bundle if the patient resource in the bundle
matches an existing record in the Master Patient Index. Returns the bundle with
the newly added person resource.
:param person_id: _description_
:param patient_id: _description_
:param bundle: _description_, defaults to Field(description="A FHIR bundle")
:return: _description_
"""
person_resource = {
"fullUrl": f"urn:uuid:{person_id}",
"resource": {
"resourceType": "Person",
"id": f"{person_id}",
"link": [{"target": {"reference": f"Patient/{patient_id}"}}],
},
"request": {
"method": "PUT",
"url": f"Person/{person_id}",
},
}
bundle.get("entry", []).append(person_resource)
return bundle


def compare(
record: schemas.PIIRecord, patient: models.Patient, algorithm_pass: models.AlgorithmPass
) -> bool:
Expand Down
62 changes: 60 additions & 2 deletions src/recordlinker/routes/link_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from recordlinker import schemas
from recordlinker.database import get_session
from recordlinker.hl7 import fhir
from recordlinker.linking import algorithm_service
from recordlinker.linking import link

Expand Down Expand Up @@ -108,7 +109,7 @@ async def link_dibbs(
)

# convert record to PII
pii_record: schemas.PIIRecord = link.fhir_record_to_pii_record(record_to_link)
pii_record: schemas.PIIRecord = fhir.fhir_record_to_pii_record(record_to_link)

# Now link the record
try:
Expand All @@ -118,7 +119,7 @@ async def link_dibbs(
algorithm=algorithm,
external_person_id=external_id,
)
updated_bundle = link.add_person_resource(
updated_bundle = fhir.add_person_resource(
str(new_person_id), pii_record.external_id, input_bundle
)
return schemas.LinkFhirResponse(found_match=found_match, updated_bundle=updated_bundle)
Expand All @@ -130,3 +131,60 @@ async def link_dibbs(
updated_bundle=input_bundle,
message=f"Could not connect to database: {err}",
)

@router.post("/fhir", summary="Link FHIR")
async def link_fhir(
request: fastapi.Request,
input: typing.Annotated[schemas.LinkFhirInput, fastapi.Body()],
response: fastapi.Response,
db_session: orm.Session = fastapi.Depends(get_session),
) -> schemas.LinkResponse:
"""
Compare a FHIR bundle with records in the Master Patient Index (MPI) to
check for matches with existing patient records If matches are found,
returns the patient and person reference id's
"""
input_bundle = input.bundle
external_id = input.external_person_id

if input.algorithm:
algorithm = algorithm_service.get_algorithm(db_session, input.algorithm)
else:
algorithm = algorithm_service.default_algorithm(db_session)

if not algorithm:
response.status_code = fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY
raise fastapi.HTTPException(status_code=422, detail="Error: Invalid algorithm specified")

# Now extract the patient record we want to link
try:
record_to_link = [
entry.get("resource")
for entry in input_bundle.get("entry", [])
if entry.get("resource", {}).get("resourceType", "") == "Patient"
][0]
except IndexError:
response.status_code = fastapi.status.HTTP_400_BAD_REQUEST
raise fastapi.HTTPException(status_code=400, detail="Error: Supplied bundle contains no Patient resource to link on.")

# convert record to PII
pii_record: schemas.PIIRecord = fhir.fhir_record_to_pii_record(record_to_link)

# link the record
try:
# Make a copy of pii_record so we don't modify the original
(found_match, new_person_id, patient_reference_id) = link.link_record_against_mpi(
record=pii_record,
session=db_session,
algorithm=algorithm,
external_person_id=external_id,
)
return schemas.LinkResponse(
is_match=found_match,
patient_reference_id=patient_reference_id,
person_reference_id=new_person_id,
)

except ValueError:
response.status_code = fastapi.status.HTTP_400_BAD_REQUEST
raise fastapi.HTTPException(status_code=400, detail="Error: Bad request")
Loading

0 comments on commit 800265a

Please sign in to comment.