From acfcca15d8fcf56a4114bcfba20050178a2fa80d Mon Sep 17 00:00:00 2001 From: Pip Liggins Date: Thu, 22 Aug 2024 14:33:57 +0100 Subject: [PATCH] Add docstrings and edit formatting of required function arguments --- fhirflat/flat2fhir.py | 274 ++++++++++++++++++++++++++++++--------- tests/test_extensions.py | 3 + 2 files changed, 218 insertions(+), 59 deletions(-) diff --git a/fhirflat/flat2fhir.py b/fhirflat/flat2fhir.py index d5fc8cd..24a7cf0 100644 --- a/fhirflat/flat2fhir.py +++ b/fhirflat/flat2fhir.py @@ -20,14 +20,21 @@ def step_down(data: dict) -> dict: """ Splits column names on the first '.' to step 'down' one level into the nested data. - { - "timingPhaseDetail.timingPhase.code": ["http://snomed.info/sct|281379000"], - "timingPhaseDetail.timingPhase.text": ["pre-admission"], - } -> - { - "timingPhase.code": ["http://snomed.info/sct|281379000"], - "timingPhase.text": ["pre-admission"], - } + Parameters + ---------- + data + { + "timingPhaseDetail.timingPhase.code": ["http://snomed.info/sct|281379000"], + "timingPhaseDetail.timingPhase.text": ["pre-admission"], + } + + Returns + ------- + dict + { + "timingPhase.code": ["http://snomed.info/sct|281379000"], + "timingPhase.text": ["pre-admission"], + } """ return {s.split(".", 1)[1]: data[s] for s in data} @@ -35,7 +42,27 @@ def step_down(data: dict) -> dict: def create_codeable_concept( old_dict: dict[str, list[str] | str | float | None], name: str ) -> dict[str, list[str]]: - """Re-creates a codeableConcept structure from the FHIRflat representation.""" + """ + Re-creates a codeableConcept structure from the FHIRflat representation. + + Parameters + ---------- + old_dict + The dictionary containing the flattened codings and text. E.g., + {"bodySite.code": ["SNOMED-CT|123456"], "bodySite.text": "Left arm"} + name + The base name of the data, e.g. "bodySite" + + Returns + ------- + dict + The FHIR representation of the codeableConcept. E.g., + { + "bodySite": { + "coding": [{"system": "SNOMED-CT", "code": "123456", "display": "Left arm"}] + } + } + """ # for creating backbone elements if name + ".code" in old_dict and name + ".system" in old_dict: @@ -99,7 +126,32 @@ def create_codeable_concept( return new_dict -def createQuantity(df, group): +def create_quantity(df: dict, group: str) -> dict: + """ + Re-creates a Quantity structure from the FHIRflat representation. + Ensures that any flattened codes are correctly unpacked. + + Parameters + ---------- + df + The dictionary containing the flattened quantity data. E.g., + { + "doseQuantity.value": 5, + "doseQuantity.code": "http://unitsofmeasure.org|mg" + } + group + The base name of the data, e.g. "doseQuantity" + + Returns + ------- + dict + The FHIR representation of the quantity. E.g., + { + "doseQuantity": + {"value": 5, "system": "http://unitsofmeasure.org", "code": "mg"} + } + """ + quant = {} for attribute in df.keys(): @@ -119,9 +171,46 @@ def createQuantity(df, group): return quant -def createSingleExtension(k, v) -> dict: +def create_single_extension(k: str, v: dict | str | float | bool) -> dict: """ - {"relativeDay": 3} -> {'url': 'relativeDay', 'valueInteger': 3} + Creates a single ISARIC extension, by inferring the datatype from the value. + Nested extensions aren't dealt with here, they're found in 'create_extension'. + + Parameters + ---------- + k + The key of the data, e.g. "approximateDate", "birthSex", "timingDetail" + v + The value of the data, e.g. + "month 3", + {"code": ["http://snomed.info/sct|1234"], "text": ["female"]}, + {"low.value": -7, "low.unit": "days", "high.value": 0, "high.unit": "days"} + + Returns + ------- + dict + The formatted data, e.g., + {'url': 'approximateDate', 'valueString': 'month 3'} \ + { + "url": "birthSex", + "valueCodeableConcept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": "1234", + "display": "female", + } + ] + }, + } \ + { + "url": "timingDetail", + "valueRange": { + "low": {"value": -7, "unit": "days"}, + "high": {"value": 0, "unit": "days"}, + }, + } + """ klass = get_local_extension_type(k) @@ -129,8 +218,8 @@ def createSingleExtension(k, v) -> dict: prop = klass.schema()["properties"] value_type = [key for key in prop.keys() if key.startswith("value")] - if not value_type: - raise RuntimeError("Inappropriate entry into createSingleExtension") + if not value_type: # pragma: no cover + raise RuntimeError("Inappropriate entry into create_single_extension") for v_type in value_type: data_type = prop[v_type]["type"] @@ -138,13 +227,15 @@ def createSingleExtension(k, v) -> dict: data_class = get_fhirtype(data_type) # unpack coding etc if isinstance(v, dict) and len(group_keys(v.keys())) > 1: - # if there are still groups to organise + # if there are still groups to organise, e.g. valueRange new_dict = expand_concepts(v, data_class) elif isinstance(v, dict): - # single group needs formatting, e.g. codeableConcepts as [url|code] - new_dict = set_datatypes(k, v, data_class) + # single group needs formatting, e.g. valueCodeableConcept + # requires the format to include the k in the dict names + v_appended = {f"{k}.{ki}": vi for ki, vi in v.items()} + new_dict = set_datatypes(k, v_appended, data_class) else: - # standard json type + # standard json type, e.g. valueInteger new_dict = v try: @@ -163,21 +254,113 @@ def createSingleExtension(k, v) -> dict: else: raise e - raise RuntimeError(f"extension not created from {k, v}") + raise RuntimeError(f"extension not created from {k, v}") # pragma: no cover -def set_datatypes(k, v_dict, klass) -> dict: +def create_extension(k: str, v_dict: dict, klass: _ISARICExtension) -> dict: """ - k: "presenceAbsence" - v_dict: { - "presenceAbsence.code": ["SNOMED-CT|123456"], - "presenceAbsence.text": "Left arm" + Formats ISARIC extensions into the correct FHIR structure, while finding the correct + value type for the data. + Can handle both nested and simple extensions. + + Parameters + ---------- + k + The key of the data, e.g. "timingPhaseDetail" + v_dict + The value of the data, e.g. + { + "timingDetail.high.unit": "days", + "timingDetail.high.value": 0.0, + "timingDetail.low.unit": "days", + "timingDetail.low.value": -7.0, + "timingPhase.code": ["http://snomed.info/sct|281379000"], + "timingPhase.text": ["pre-admission"], } - klass: + klass + The class of the data, e.g. . + + Returns + ------- + dict + The formatted data, e.g. + { + "url": "timingPhaseDetail", + "extension": [ + { + "url": "timingDetail", + "valueRange": { + "low": {"value": -7, "unit": "days"}, + "high": {"value": 0, "unit": "days"}, + }, + }, + { + "url": "timingPhase", + "valueCodeableConcept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": "281379000", + "display": "pre-admission", + } + ] + }, + }, + ], + } + """ + + if klass.nested_extension: + classes = find_data_class_options(klass, "extension") + short_extensions = [s for s in v_dict.keys() if s.count(".") == 0] + expanded_short_extensions = [] + if short_extensions: + # these get skipped over in expand_concepts because they don't get grouped + # so have to be dealt with here + for se in short_extensions: + short_ext_dict = {se: v_dict[se]} + expanded_short_extensions.append( + create_single_extension(se, short_ext_dict[se]) + ) + v_dict.pop(se) + return { + "url": k, + "extension": list(expand_concepts(v_dict, classes).values()) + + expanded_short_extensions, + } + + return create_single_extension(k, v_dict) + + +def set_datatypes(k: str, v_dict: dict, klass: type[_DomainResource]) -> dict: + """ + Once the final datatype is found, this function formats the data into the correct + FHIR structure. + + Parameters + ---------- + k + The key of the data, e.g. "bodySite" + v_dict + The value of the data, e.g. + {"bodySite.code": ["SNOMED-CT|123456"], "bodySite.text": "Left arm"} + klass + The class of the data, e.g. Quantity, CodeableConcept. Should be present in + either this library, or the fhir.resources module. + + Returns + ------- + dict + The formatted data, e.g. + { + "bodySite": { + "coding": [{"system": "SNOMED-CT", "code": "123456", "display": "Left arm"}] + } + } """ if klass == Quantity: - return createQuantity(v_dict, k) + return create_quantity(v_dict, k) elif klass == CodeableConcept: return create_codeable_concept(v_dict, k) elif klass == Period: @@ -186,50 +369,25 @@ def set_datatypes(k, v_dict, klass) -> dict: stripped_dict = step_down(v_dict) return { "extension": [ - createSingleExtension(ki, vi) for ki, vi in stripped_dict.items() + create_single_extension(ki, vi) for ki, vi in stripped_dict.items() ], } elif issubclass(klass, _ISARICExtension): if klass.nested_extension: - # nested extension stripped_dict = step_down(v_dict) return { "url": k, "extension": [ - createSingleExtension(ki, vi) for ki, vi in stripped_dict.items() + create_single_extension(ki, vi) for ki, vi in stripped_dict.items() ], } - return createSingleExtension(k, v_dict if k not in v_dict else v_dict[k]) + return create_single_extension(k, step_down(v_dict)) return step_down(v_dict) -def find_value_type(k: str, v_dict, klass): - if klass.nested_extension: - classes = find_data_class_options(klass, "extension") - short_extensions = [s for s in v_dict[k].keys() if s.count(".") == 0] - expanded_short_extensions = [] - if short_extensions: - # these get skipped over in expand_concepts because they don't get grouped - # so have to be dealt with here - for se in short_extensions: - short_ext_dict = {se: v_dict[k][se]} - short_ext = find_data_class_options(classes, se) - expanded_short_extensions.append( - find_value_type(se, short_ext_dict, short_ext) - ) - v_dict[k].pop(se) - return { - "url": k, - "extension": list(expand_concepts(v_dict[k], classes).values()) - + expanded_short_extensions, - } - - return createSingleExtension(k, v_dict if k not in v_dict else v_dict[k]) - - -def expand_concepts(data: dict[str, str], data_class: type[_DomainResource]) -> dict: +def expand_concepts(data: dict[str, dict], data_class: type[_DomainResource]) -> dict: """ Combines columns containing flattened FHIR concepts back into JSON-like structures. @@ -262,8 +420,7 @@ def expand_concepts(data: dict[str, str], data_class: type[_DomainResource]) -> elif is_single_fhir_extension: # column name will be missing one or more datatype layers, e.g. # valueString, valueRange that need to be inferred - new_v_dict = {k: stripped_dict} - expanded[k] = find_value_type(k, new_v_dict, group_classes[k]) + expanded[k] = create_extension(k, stripped_dict, group_classes[k]) continue if all(isinstance(v, dict) for v in v_dict.values()): @@ -279,8 +436,7 @@ def expand_concepts(data: dict[str, str], data_class: type[_DomainResource]) -> } stripped_dict = step_down(non_dict_items) for k1, v1 in stripped_dict.items(): - klass = find_data_class_options(group_classes[k], k1) - v_dict[k + "." + k1] = set_datatypes(k1, {k1: v1}, klass) + v_dict[k + "." + k1] = create_single_extension(k1, v1) expanded[k] = step_down(v_dict) diff --git a/tests/test_extensions.py b/tests/test_extensions.py index c1468e0..9b119e4 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -39,6 +39,7 @@ def test_timingPhase(): assert timing_phase.resource_type == "timingPhase" assert timing_phase.url == "timingPhase" assert type(timing_phase.valueCodeableConcept) is _CodeableConcept + assert timing_phase.nested_extension is False @pytest.mark.parametrize( @@ -84,6 +85,7 @@ def test_timingPhaseDetail(): assert isinstance(timing_phase_detail, DataType) assert timing_phase_detail.resource_type == "timingPhaseDetail" assert timing_phase_detail.url == "timingPhaseDetail" + assert timing_phase_detail.nested_extension is True tpd_data_error = { @@ -155,6 +157,7 @@ def test_relativePeriod(): isinstance(ext, (relativeStart, relativeEnd)) for ext in relative_phase.extension ) + assert relative_phase.nested_extension is True @pytest.mark.parametrize(