Skip to content

Commit

Permalink
feat: add fhir parser module
Browse files Browse the repository at this point in the history
  • Loading branch information
cbrinson-rise8 committed Oct 30, 2024
1 parent 2533adb commit 2c8747e
Show file tree
Hide file tree
Showing 8 changed files with 417 additions and 213 deletions.
2 changes: 1 addition & 1 deletion docs/process_for_adding_feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
- Update the [PIIRecord.feature_iter](https://github.com/CDCgov/RecordLinker/blob/a672d2b6409cbd1a08f729d94fba5692f57f6fc6/src/recordlinker/schemas/pii.py#L246) method to return the value of the new feature when it's used for comparison.

### Extract the FHIR Field in `fhir_record_to_pii_record`
- In [src/recordlinker/linking/link.py](https://github.com/CDCgov/RecordLinker/blob/a672d2b6409cbd1a08f729d94fba5692f57f6fc6/src/recordlinker/linking/link.py), update the [fhir_record_to_pii_record](https://github.com/CDCgov/RecordLinker/blob/a672d2b6409cbd1a08f729d94fba5692f57f6fc6/src/recordlinker/linking/link.py#L26) function to map the relevant FHIR field to the new feature in [PIIRecord](https://github.com/CDCgov/RecordLinker/blob/c85f555e5da91d54eb8c51e3bdf0789d1e204b2f/src/recordlinker/schemas/pii.py#L97).
- In [src/recordlinker/linking/link.py](https://github.com/CDCgov/RecordLinker/blob/e8a64407b6e8564595cad6380d5291e9f5c959e3/src/recordlinker/parsers/fhir.py), update the [fhir_record_to_pii_record](https://github.com/CDCgov/RecordLinker/blob/e8a64407b6e8564595cad6380d5291e9f5c959e3/src/recordlinker/parsers/fhir.py#L12) function to map the relevant FHIR field to the new feature in [PIIRecord](https://github.com/CDCgov/RecordLinker/blob/e8a64407b6e8564595cad6380d5291e9f5c959e3/src/recordlinker/schemas/pii.py#L141).

### Update the Tests
- Add or modify unit tests to verify that the new feature is properly extracted, mapped, and compared.
Empty file.
87 changes: 87 additions & 0 deletions src/recordlinker/hl7/fhir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
recordlinker.parsers.fhir
~~~~~~~~~~~~~~~~~~~~~~~~~
This module is used to handle fhir parsing
"""

import typing

import pydantic

from recordlinker import schemas


def fhir_record_to_pii_record(fhir_record: dict) -> schemas.PIIRecord:
"""
Parse the FHIR record into a PIIRecord object
"""
val = {
"external_id": fhir_record.get("id"),
"name": fhir_record.get("name", []),
"birthDate": fhir_record.get("birthDate"),
"sex": fhir_record.get("gender"),
"address": fhir_record.get("address", []),
"mrn": None,
"ssn": None,
"race": None,
"gender": None,
"telecom": fhir_record.get("telecom", []),
}
for identifier in fhir_record.get("identifier", []):
for coding in identifier.get("type", {}).get("coding", []):
if coding.get("code") == "MR":
val["mrn"] = identifier.get("value")
elif coding.get("code") == "SS":
val["ssn"] = identifier.get("value")
for address in val["address"]:
address["county"] = address.get("district", "")
for extension in address.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation":
for coord in extension.get("extension", []):
if coord.get("url") == "latitude":
address["latitude"] = coord.get("valueDecimal")
elif coord.get("url") == "longitude":
address["longitude"] = coord.get("valueDecimal")
for extension in fhir_record.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
for ext in extension.get("extension", []):
if ext.get("url") == "ombCategory":
val["race"] = ext.get("valueCoding", {}).get("display")
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/individual-genderIdentity":
for ext in extension.get("extension", []):
if ext.get("url") == "value":
for coding in ext.get("valueCodeableConcept", {}).get("coding", []):
val["gender"] = coding.get("display")

return schemas.PIIRecord(**val)

def add_person_resource(
person_id: str,
patient_id: typing.Optional[str] = "",
bundle: dict = pydantic.Field(description="A FHIR bundle"),
) -> dict:
"""
Adds a simplified person resource to a bundle if the patient resource in the bundle
matches an existing record in the Master Patient Index. Returns the bundle with
the newly added person resource.
:param person_id: _description_
:param patient_id: _description_
:param bundle: _description_, defaults to Field(description="A FHIR bundle")
:return: _description_
"""
person_resource = {
"fullUrl": f"urn:uuid:{person_id}",
"resource": {
"resourceType": "Person",
"id": f"{person_id}",
"link": [{"target": {"reference": f"Patient/{patient_id}"}}],
},
"request": {
"method": "PUT",
"url": f"Person/{person_id}",
},
}
bundle.get("entry", []).append(person_resource)
return bundle
80 changes: 0 additions & 80 deletions src/recordlinker/linking/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import typing
import uuid

import pydantic
from sqlalchemy import orm

from recordlinker import models
Expand All @@ -29,85 +28,6 @@

TRACER = MockTracer()


# TODO: This is a FHIR specific function, should be moved to a FHIR module
def fhir_record_to_pii_record(fhir_record: dict) -> schemas.PIIRecord:
"""
Parse the FHIR record into a PIIRecord object
"""
val = {
"external_id": fhir_record.get("id"),
"name": fhir_record.get("name", []),
"birthDate": fhir_record.get("birthDate"),
"sex": fhir_record.get("gender"),
"address": fhir_record.get("address", []),
"mrn": None,
"ssn": None,
"race": None,
"gender": None,
"telecom": fhir_record.get("telecom", []),
}
for identifier in fhir_record.get("identifier", []):
for coding in identifier.get("type", {}).get("coding", []):
if coding.get("code") == "MR":
val["mrn"] = identifier.get("value")
elif coding.get("code") == "SS":
val["ssn"] = identifier.get("value")
for address in val["address"]:
address["county"] = address.get("district", "")
for extension in address.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation":
for coord in extension.get("extension", []):
if coord.get("url") == "latitude":
address["latitude"] = coord.get("valueDecimal")
elif coord.get("url") == "longitude":
address["longitude"] = coord.get("valueDecimal")
for extension in fhir_record.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
for ext in extension.get("extension", []):
if ext.get("url") == "ombCategory":
val["race"] = ext.get("valueCoding", {}).get("display")
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/individual-genderIdentity":
for ext in extension.get("extension", []):
if ext.get("url") == "value":
for coding in ext.get("valueCodeableConcept", {}).get("coding", []):
val["gender"] = coding.get("display")

return schemas.PIIRecord(**val)


# TODO: This is a FHIR specific function, should be moved to a FHIR module
def add_person_resource(
person_id: str,
patient_id: typing.Optional[str] = "",
bundle: dict = pydantic.Field(description="A FHIR bundle"),
) -> dict:
"""
Adds a simplified person resource to a bundle if the patient resource in the bundle
matches an existing record in the Master Patient Index. Returns the bundle with
the newly added person resource.
:param person_id: _description_
:param patient_id: _description_
:param bundle: _description_, defaults to Field(description="A FHIR bundle")
:return: _description_
"""
person_resource = {
"fullUrl": f"urn:uuid:{person_id}",
"resource": {
"resourceType": "Person",
"id": f"{person_id}",
"link": [{"target": {"reference": f"Patient/{patient_id}"}}],
},
"request": {
"method": "PUT",
"url": f"Person/{person_id}",
},
}
bundle.get("entry", []).append(person_resource)
return bundle


def compare(
record: schemas.PIIRecord, patient: models.Patient, algorithm_pass: models.AlgorithmPass
) -> bool:
Expand Down
66 changes: 63 additions & 3 deletions src/recordlinker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from recordlinker import utils
from recordlinker.base_service import BaseService
from recordlinker.database import get_session
from recordlinker.hl7 import fhir
from recordlinker.linking import algorithm_service
from recordlinker.linking import link
from recordlinker.routes.algorithm_router import router as algorithm_router
Expand Down Expand Up @@ -189,7 +190,7 @@ async def link_record(
)

# convert record to PII
pii_record: schemas.PIIRecord = link.fhir_record_to_pii_record(record_to_link)
pii_record: schemas.PIIRecord = fhir.fhir_record_to_pii_record(record_to_link)

# Now link the record
try:
Expand All @@ -199,7 +200,8 @@ async def link_record(
algorithm=algorithm,
external_person_id=external_id,
)
updated_bundle = link.add_person_resource(
#update the bundle
updated_bundle = fhir.add_person_resource(
str(new_person_id), pii_record.external_id, input_bundle
)
return LinkRecordResponse(found_match=found_match, updated_bundle=updated_bundle)
Expand Down Expand Up @@ -240,7 +242,7 @@ async def link_piirecord(

# link the record
try:
# Make a copy of record_to_link so we don't modify the original
# Make a copy of pii_record so we don't modify the original
record = copy.deepcopy(pii_record)
(found_match, new_person_id, patient_reference_id) = link.link_record_against_mpi(
record=record,
Expand All @@ -257,3 +259,61 @@ async def link_piirecord(
except ValueError:
response.status_code = status.HTTP_400_BAD_REQUEST
raise HTTPException(status_code=400, detail="Error: Bad request")

@app.post("/link/fhir")
async def link_fhir(
request: Request,
input: Annotated[LinkRecordInput, Body(examples=sample_link_record_requests)],
response: Response,
db_session: orm.Session = Depends(get_session),
) -> LinkResponse:
"""
Compare a FHIR bundle with records in the Master Patient Index (MPI) to
check for matches with existing patient records If matches are found,
returns the patient and person reference id's
"""
input_bundle = input.bundle
external_id = input.external_person_id

if input.algorithm:
algorithm = algorithm_service.get_algorithm(db_session, input.algorithm)
else:
algorithm = algorithm_service.default_algorithm(db_session)

if not algorithm:
response.status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
raise HTTPException(status_code=422, detail="Error: Invalid algorithm specified")

# Now extract the patient record we want to link
try:
record_to_link = [
entry.get("resource")
for entry in input_bundle.get("entry", [])
if entry.get("resource", {}).get("resourceType", "") == "Patient"
][0]
except IndexError:
response.status_code = status.HTTP_400_BAD_REQUEST
raise HTTPException(status_code=400, detail="Error: Supplied bundle contains no Patient resource to link on.")

# convert record to PII
pii_record: schemas.PIIRecord = fhir.fhir_record_to_pii_record(record_to_link)

# link the record
try:
# Make a copy of pii_record so we don't modify the original
record = copy.deepcopy(pii_record)
(found_match, new_person_id, patient_reference_id) = link.link_record_against_mpi(
record=record,
session=db_session,
algorithm=algorithm,
external_person_id=external_id,
)
return LinkResponse(
is_match=found_match,
patient_reference_id=patient_reference_id,
person_reference_id=new_person_id,
)

except ValueError:
response.status_code = status.HTTP_400_BAD_REQUEST
raise HTTPException(status_code=400, detail="Error: Bad request")
Loading

0 comments on commit 2c8747e

Please sign in to comment.