Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create New Parsers Framework #102

Merged
merged 4 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/process_for_adding_feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
- Update the [PIIRecord.feature_iter](https://github.com/CDCgov/RecordLinker/blob/a672d2b6409cbd1a08f729d94fba5692f57f6fc6/src/recordlinker/schemas/pii.py#L246) method to return the value of the new feature when it's used for comparison.

### Extract the FHIR Field in `fhir_record_to_pii_record`
- In [src/recordlinker/linking/link.py](https://github.com/CDCgov/RecordLinker/blob/a672d2b6409cbd1a08f729d94fba5692f57f6fc6/src/recordlinker/linking/link.py), update the [fhir_record_to_pii_record](https://github.com/CDCgov/RecordLinker/blob/a672d2b6409cbd1a08f729d94fba5692f57f6fc6/src/recordlinker/linking/link.py#L26) function to map the relevant FHIR field to the new feature in [PIIRecord](https://github.com/CDCgov/RecordLinker/blob/c85f555e5da91d54eb8c51e3bdf0789d1e204b2f/src/recordlinker/schemas/pii.py#L97).
- In [src/recordlinker/linking/link.py](https://github.com/CDCgov/RecordLinker/blob/e8a64407b6e8564595cad6380d5291e9f5c959e3/src/recordlinker/parsers/fhir.py), update the [fhir_record_to_pii_record](https://github.com/CDCgov/RecordLinker/blob/e8a64407b6e8564595cad6380d5291e9f5c959e3/src/recordlinker/parsers/fhir.py#L12) function to map the relevant FHIR field to the new feature in [PIIRecord](https://github.com/CDCgov/RecordLinker/blob/e8a64407b6e8564595cad6380d5291e9f5c959e3/src/recordlinker/schemas/pii.py#L141).

### Update the Tests
- Add or modify unit tests to verify that the new feature is properly extracted, mapped, and compared.
Empty file.
87 changes: 87 additions & 0 deletions src/recordlinker/hl7/fhir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
recordlinker.hl7.fhir
~~~~~~~~~~~~~~~~~~~~~~~~~

This module is used to handle fhir parsing
"""

import typing

import pydantic

from recordlinker import schemas


def fhir_record_to_pii_record(fhir_record: dict) -> schemas.PIIRecord:
"""
Parse the FHIR record into a PIIRecord object
"""
val = {
"external_id": fhir_record.get("id"),
"name": fhir_record.get("name", []),
"birthDate": fhir_record.get("birthDate"),
"sex": fhir_record.get("gender"),
"address": fhir_record.get("address", []),
"mrn": None,
"ssn": None,
"race": None,
"gender": None,
"telecom": fhir_record.get("telecom", []),
}
for identifier in fhir_record.get("identifier", []):
for coding in identifier.get("type", {}).get("coding", []):
if coding.get("code") == "MR":
val["mrn"] = identifier.get("value")
elif coding.get("code") == "SS":
val["ssn"] = identifier.get("value")
for address in val["address"]:
address["county"] = address.get("district", "")
for extension in address.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation":
for coord in extension.get("extension", []):
if coord.get("url") == "latitude":
address["latitude"] = coord.get("valueDecimal")
elif coord.get("url") == "longitude":
address["longitude"] = coord.get("valueDecimal")

Check warning on line 45 in src/recordlinker/hl7/fhir.py

View check run for this annotation

Codecov / codecov/patch

src/recordlinker/hl7/fhir.py#L40-L45

Added lines #L40 - L45 were not covered by tests
for extension in fhir_record.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
for ext in extension.get("extension", []):
if ext.get("url") == "ombCategory":
val["race"] = ext.get("valueCoding", {}).get("display")
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/individual-genderIdentity":
for ext in extension.get("extension", []):
if ext.get("url") == "value":
for coding in ext.get("valueCodeableConcept", {}).get("coding", []):
val["gender"] = coding.get("display")

return schemas.PIIRecord(**val)

def add_person_resource(
person_id: str,
patient_id: typing.Optional[str] = "",
bundle: dict = pydantic.Field(description="A FHIR bundle"),
) -> dict:
"""
Adds a simplified person resource to a bundle if the patient resource in the bundle
matches an existing record in the Master Patient Index. Returns the bundle with
the newly added person resource.

:param person_id: _description_
:param patient_id: _description_
:param bundle: _description_, defaults to Field(description="A FHIR bundle")
:return: _description_
"""
person_resource = {
"fullUrl": f"urn:uuid:{person_id}",
"resource": {
"resourceType": "Person",
"id": f"{person_id}",
"link": [{"target": {"reference": f"Patient/{patient_id}"}}],
},
"request": {
"method": "PUT",
"url": f"Person/{person_id}",
},
}
bundle.get("entry", []).append(person_resource)
return bundle
80 changes: 0 additions & 80 deletions src/recordlinker/linking/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import typing
import uuid

import pydantic
from sqlalchemy import orm

from recordlinker import models
Expand All @@ -29,85 +28,6 @@

TRACER = MockTracer()


# TODO: This is a FHIR specific function, should be moved to a FHIR module
def fhir_record_to_pii_record(fhir_record: dict) -> schemas.PIIRecord:
"""
Parse the FHIR record into a PIIRecord object
"""
val = {
"external_id": fhir_record.get("id"),
"name": fhir_record.get("name", []),
"birthDate": fhir_record.get("birthDate"),
"sex": fhir_record.get("gender"),
"address": fhir_record.get("address", []),
"mrn": None,
"ssn": None,
"race": None,
"gender": None,
"telecom": fhir_record.get("telecom", []),
}
for identifier in fhir_record.get("identifier", []):
for coding in identifier.get("type", {}).get("coding", []):
if coding.get("code") == "MR":
val["mrn"] = identifier.get("value")
elif coding.get("code") == "SS":
val["ssn"] = identifier.get("value")
for address in val["address"]:
address["county"] = address.get("district", "")
for extension in address.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/geolocation":
for coord in extension.get("extension", []):
if coord.get("url") == "latitude":
address["latitude"] = coord.get("valueDecimal")
elif coord.get("url") == "longitude":
address["longitude"] = coord.get("valueDecimal")
for extension in fhir_record.get("extension", []):
if extension.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
for ext in extension.get("extension", []):
if ext.get("url") == "ombCategory":
val["race"] = ext.get("valueCoding", {}).get("display")
if extension.get("url") == "http://hl7.org/fhir/StructureDefinition/individual-genderIdentity":
for ext in extension.get("extension", []):
if ext.get("url") == "value":
for coding in ext.get("valueCodeableConcept", {}).get("coding", []):
val["gender"] = coding.get("display")

return schemas.PIIRecord(**val)


# TODO: This is a FHIR specific function, should be moved to a FHIR module
def add_person_resource(
person_id: str,
patient_id: typing.Optional[str] = "",
bundle: dict = pydantic.Field(description="A FHIR bundle"),
) -> dict:
"""
Adds a simplified person resource to a bundle if the patient resource in the bundle
matches an existing record in the Master Patient Index. Returns the bundle with
the newly added person resource.

:param person_id: _description_
:param patient_id: _description_
:param bundle: _description_, defaults to Field(description="A FHIR bundle")
:return: _description_
"""
person_resource = {
"fullUrl": f"urn:uuid:{person_id}",
"resource": {
"resourceType": "Person",
"id": f"{person_id}",
"link": [{"target": {"reference": f"Patient/{patient_id}"}}],
},
"request": {
"method": "PUT",
"url": f"Person/{person_id}",
},
}
bundle.get("entry", []).append(person_resource)
return bundle


def compare(
record: schemas.PIIRecord, patient: models.Patient, algorithm_pass: models.AlgorithmPass
) -> bool:
Expand Down
62 changes: 60 additions & 2 deletions src/recordlinker/routes/link_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from recordlinker import schemas
from recordlinker.database import get_session
from recordlinker.hl7 import fhir
from recordlinker.linking import algorithm_service
from recordlinker.linking import link

Expand Down Expand Up @@ -108,7 +109,7 @@
)

# convert record to PII
pii_record: schemas.PIIRecord = link.fhir_record_to_pii_record(record_to_link)
pii_record: schemas.PIIRecord = fhir.fhir_record_to_pii_record(record_to_link)

# Now link the record
try:
Expand All @@ -118,7 +119,7 @@
algorithm=algorithm,
external_person_id=external_id,
)
updated_bundle = link.add_person_resource(
updated_bundle = fhir.add_person_resource(
str(new_person_id), pii_record.external_id, input_bundle
)
return schemas.LinkFhirResponse(found_match=found_match, updated_bundle=updated_bundle)
Expand All @@ -130,3 +131,60 @@
updated_bundle=input_bundle,
message=f"Could not connect to database: {err}",
)

@router.post("/fhir", summary="Link FHIR")
async def link_fhir(
request: fastapi.Request,
input: typing.Annotated[schemas.LinkFhirInput, fastapi.Body()],
response: fastapi.Response,
db_session: orm.Session = fastapi.Depends(get_session),
) -> schemas.LinkResponse:
"""
Compare a FHIR bundle with records in the Master Patient Index (MPI) to
check for matches with existing patient records If matches are found,
returns the patient and person reference id's
"""
input_bundle = input.bundle
external_id = input.external_person_id

if input.algorithm:
algorithm = algorithm_service.get_algorithm(db_session, input.algorithm)
else:
algorithm = algorithm_service.default_algorithm(db_session)

if not algorithm:
response.status_code = fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY
raise fastapi.HTTPException(status_code=422, detail="Error: Invalid algorithm specified")

# Now extract the patient record we want to link
try:
record_to_link = [
entry.get("resource")
for entry in input_bundle.get("entry", [])
if entry.get("resource", {}).get("resourceType", "") == "Patient"
][0]
except IndexError:
response.status_code = fastapi.status.HTTP_400_BAD_REQUEST
raise fastapi.HTTPException(status_code=400, detail="Error: Supplied bundle contains no Patient resource to link on.")

# convert record to PII
pii_record: schemas.PIIRecord = fhir.fhir_record_to_pii_record(record_to_link)

# link the record
try:
# Make a copy of pii_record so we don't modify the original
(found_match, new_person_id, patient_reference_id) = link.link_record_against_mpi(
record=pii_record,
session=db_session,
algorithm=algorithm,
external_person_id=external_id,
)
return schemas.LinkResponse(
is_match=found_match,
patient_reference_id=patient_reference_id,
person_reference_id=new_person_id,
)

except ValueError:
response.status_code = fastapi.status.HTTP_400_BAD_REQUEST
raise fastapi.HTTPException(status_code=400, detail="Error: Bad request")

Check warning on line 190 in src/recordlinker/routes/link_router.py

View check run for this annotation

Codecov / codecov/patch

src/recordlinker/routes/link_router.py#L188-L190

Added lines #L188 - L190 were not covered by tests
Loading
Loading