diff --git a/fhirflat/flat2fhir.py b/fhirflat/flat2fhir.py index 62dcf2f..237ffd3 100644 --- a/fhirflat/flat2fhir.py +++ b/fhirflat/flat2fhir.py @@ -15,6 +15,21 @@ def create_codeable_concept( old_dict: dict[str, list[str] | str], name: str ) -> dict[str, list[str]]: """Re-creates a codeableConcept structure from the FHIRflat representation.""" + + # for reading in from ingestion pipeline + if name + ".code" in old_dict and name + ".system" in old_dict: + new_dict = { + "coding": [ + { + "system": old_dict[name + ".system"], + "code": str(int(old_dict[name + ".code"])), + "display": old_dict[name + ".text"], + } + ] + } + return new_dict + + # From FHIRflat file codes = old_dict.get(name + ".code") if codes is None: @@ -193,15 +208,18 @@ def expand_concepts(data: dict, data_class: type[_DomainResource]) -> dict: if all(isinstance(v, dict) for v in v_dict.values()): # coming back out of nested recursion expanded[k] = {s.split(".", 1)[1]: v_dict[s] for s in v_dict} - if data_class.schema()["properties"][k].get("type") == "array": - if k == "extension": - expanded[k] = [v for v in expanded[k].values()] - else: - expanded[k] = [expanded[k]] else: expanded[k] = set_datatypes(k, v_dict, group_classes[k]) + if isinstance(data_class, list): + continue + elif data_class.schema()["properties"][k].get("type") == "array": + if k == "extension": + expanded[k] = [v for v in expanded[k].values()] + else: + expanded[k] = [expanded[k]] + dense_cols = { k: k.removesuffix("_dense") for k in data.keys() if k.endswith("_dense") } diff --git a/fhirflat/ingest.py b/fhirflat/ingest.py new file mode 100644 index 0000000..cbe8b4d --- /dev/null +++ b/fhirflat/ingest.py @@ -0,0 +1,124 @@ +""" +Stores the main functions for converting clinical data (initially from RedCap-ARCH) to +FHIRflat. + +Assumes two files are provided: one with the clinical data and one containing the +mappings. 
PL: Actually, maybe rather than the mappings it's either a file or a +dictionary showing the location of each mapping file (one per resource type). + +TODO: Eventually, this should link to a google sheet file that contains the mappings +""" + +import pandas as pd +import numpy as np + +# 1:1 (single row, single resource) mapping: Patient, Encounter +# 1:M (single row, multiple resources) mapping: Observation, Condition, Procedure, ... + +""" +1. Create one input-data dataframe per resource type, using the column names from +the mapping file + +2. For 1:1 mappings: use an apply function to create a fhir-like (or maybe +fhir-flat-like?) input data dictionary in one column, then a resource object in another. +Then follow format similar to fhir_file_to_flat to create the flat representation. + +3. For 1:M mappings: (PL: not sure about this) Group columns by single_resource column +(to be created in the mapping file), explode the dataframe by these groups, then follow +the 1:1 process. +""" + +""" +TODO +* cope with 'if' statements - e.g. for date overwriting. +* deal with duplicates/how to add multiple values to a single field +""" + + +def create_dictionary(data, map_file): +    """ +    Given a data file and a single mapping file for one FHIR resource type, +    returns a single column dataframe with the mapped data in a FHIRflat-like +    format, ready for further processing. 
+    """ + +    data = pd.read_csv(data, header=0) +    map_df = pd.read_csv(map_file, header=0) + +    filtered_data = data[map_df["redcap_variable"].dropna().unique()] + +    # Fills the na redcap variables with the previous value +    map_df["redcap_variable"] = map_df["redcap_variable"].ffill() + +    # strips the text answers out of the redcap_response column +    map_df["redcap_response"] = map_df["redcap_response"].apply( +        lambda x: x.split(",")[0] if isinstance(x, str) else x +    ) + +    # Set multi-index for easier access +    map_df.set_index(["redcap_variable", "redcap_response"], inplace=True) + +    def create_dict_from_row(row): +        """ +        Iterates through the columns of the row, applying the mapping to each column +        and produces a fhirflat-like dictionary to initialize the resource object. +        """ + +        def find_field_value(row, response, map): +            """Returns the data for a given field, given the map.""" +            if map == "": +                return response +            elif "+" in map: +                map = map.split("+") +                results = [find_field_value(row, response, m) for m in map] +                results = [x for x in results if x == x] +                return "".join(results) +            else: +                col = map.lstrip("<").rstrip(">") +                return row[col] + +        result = {} +        for column in row.index: +            if column in map_df.index.get_level_values(0): +                response = row[column] +                if pd.notna(response):  # Ensure there is a response to map +                    try: +                        # Retrieve the mapping for the given column and response +                        if pd.isna(map_df.loc[column].index).all(): +                            mapping = map_df.loc[(column, np.nan)].dropna() +                        else: +                            mapping = map_df.loc[(column, str(response))].dropna() +                        snippet = { +                            k: ( +                                v +                                if "<" not in str(v) +                                else find_field_value(row, response, v) +                            ) +                            for k, v in mapping.items() +                        } +                    except KeyError: +                        # No mapping found for this column and response +                        result[column] = f"No mapping for response {response}" +                else: +                    continue +            else: +                raise ValueError(f"Column {column} not found in mapping file") +            if not set(result.keys()).intersection(snippet.keys()): +                result = result | snippet + 
else: + raise ValueError( + "Duplicate keys in mapping:" + f" {set(result.keys()).intersection(snippet.keys())}" + ) + return result + + # Apply the function across the DataFrame rows + filtered_data["flat_dict"] = filtered_data.apply(create_dict_from_row, axis=1) + return filtered_data + + +def load_data(data, mapping_files, resource_type, file_name): + + df = create_dictionary(data, mapping_files) + + resource_type.ingest_to_flat(df, file_name) diff --git a/fhirflat/resources/base.py b/fhirflat/resources/base.py index 5437c78..05d3504 100644 --- a/fhirflat/resources/base.py +++ b/fhirflat/resources/base.py @@ -39,7 +39,7 @@ def flat_fields(cls) -> list[str]: return [x for x in cls.elements_sequence() if x not in cls.flat_exclusions] @classmethod - def cleanup(cls, data: JsonString) -> FHIRFlatBase: + def cleanup(cls, data: JsonString | dict, json_data=True) -> FHIRFlatBase: """ Load data into a dictionary-like structure, then apply resource-specific changes and unpack flattened data @@ -73,6 +73,31 @@ def from_flat(cls, file: str) -> FHIRFlatBase | list[FHIRFlatBase]: else: return list(df["fhir"]) + @classmethod + def ingest_to_flat(cls, data: pd.DataFrame, filename: str): + """ + Takes a pandas dataframe and populates the resource with the data. 
+ +        data: pd.DataFrame +            Pandas dataframe containing the data + +        Returns +        ------- +        FHIRFlatBase or list[FHIRFlatBase] +        """ + +        # Creates a column of FHIR resource instances +        data["fhir"] = data["flat_dict"].apply( +            lambda x: cls.cleanup(x, json_data=False) +        ) + +        data["fhir_flat"] = data["fhir"].apply(lambda x: x.to_flat()) + +        # get the flat dataframe out into its own variable +        flat_df = pd.concat(data["fhir_flat"].tolist(), ignore_index=True) + +        flat_df.to_parquet(f"{filename}.parquet") + @classmethod def fhir_bulk_import(cls, file: str) -> list[FHIRFlatBase]: """ @@ -130,11 +155,17 @@ def fhir_file_to_flat(cls, source_file: str, output_name: str | None = None): flat_rows.append(fhir2flat(resource, lists=list_resources)) df = pd.concat(flat_rows) + +        # remove required attributes now it's in the flat representation +        for attr in cls.flat_defaults: +            df.drop(list(df.filter(regex=attr)), axis=1, inplace=True) + return df.to_parquet(output_name) -    def to_flat(self, filename: str) -> None: +    def to_flat(self, filename: str | None = None) -> None: """ Generates a FHIRflat parquet file from the resource. +        If no file name is provided, returns the pandas dataframe. filename: str Name of the parquet file to be generated. 
@@ -159,4 +190,7 @@ def to_flat(self, filename: str) -> None: for attr in self.flat_defaults: flat_df.drop(list(flat_df.filter(regex=attr)), axis=1, inplace=True) - return flat_df.to_parquet(filename) + if filename: + return flat_df.to_parquet(filename) + else: + return flat_df diff --git a/fhirflat/resources/encounter.py b/fhirflat/resources/encounter.py index 26592b3..46dff70 100644 --- a/fhirflat/resources/encounter.py +++ b/fhirflat/resources/encounter.py @@ -60,13 +60,14 @@ def validate_extension_contents(cls, extensions): return extensions @classmethod - def cleanup(cls, data: JsonString) -> Encounter: + def cleanup(cls, data: JsonString | dict, json_data=True) -> Encounter: """ Load data into a dictionary-like structure, then apply resource-specific changes and unpack flattened data like codeableConcepts back into structured data. """ - data = orjson.loads(data) + if json_data: + data = orjson.loads(data) for field in [ "subject", diff --git a/tests/dummy_data/encounter_dummy_data_single.csv b/tests/dummy_data/encounter_dummy_data_single.csv new file mode 100644 index 0000000..0c1560e --- /dev/null +++ b/tests/dummy_data/encounter_dummy_data_single.csv @@ -0,0 +1,2 @@ +usubjid,visitid,dates_enrolment,dates_adm,dates_admdate,dates_admtime,non_encounter_field,outco_denguediag,outco_denguediag_main,outco_denguediag_class,outco_not_dengue,outco_secondiag_oth,outco_date,outco_outcome +2,11,,1,2021-04-01,,fish,1,,2,,,2021-04-10,1 \ No newline at end of file diff --git a/tests/dummy_data/encounter_dummy_mapping.csv b/tests/dummy_data/encounter_dummy_mapping.csv new file mode 100644 index 0000000..5d88a4e --- /dev/null +++ b/tests/dummy_data/encounter_dummy_mapping.csv @@ -0,0 +1,25 @@ 
+redcap_variable,redcap_response,subject,class.system,class.code,class.text,actualPeriod.start,actualPeriod.end,reason.value.concept.system,reason.value.concept.code,reason.value.concept.text,reason.use.system,reason.use.code,reason.use.text,diagnosis.condition.concept.system,diagnosis.condition.concept.code,diagnosis.condition.concept.text,diagnosis.use.system,diagnosis.use.code,diagnosis.use.text,admission.dischargeDisposition.system,admission.dischargeDisposition.code,admission.dischargeDisposition.text +usubjid,,,,,,,,,,,,,,,,,,,,,, +dates_enrolment,,,,,,,,,,,,,,,,,,,,,, +dates_adm,"1, Yes",,https://snomed.info/sct,32485007,Hospital admission (procedure),,,,,,,,,,,,,,,,, +,"0, No",,https://snomed.info/sct,32485007,Hospital admission (procedure),,,,,,,,,,,,,,,,, +,,,https://snomed.info/sct,371883000,Outpatient procedure (procedure),,,,,,,,,,,,,,,,, +,"99, Unknown",,https://snomed.info/sct,32485007,Hospital admission (procedure),,,,,,,,,,,,,,,,, +dates_admdate,,,,,,+,,,,,,,,,,,,,,,, +dates_admtime,,,,,,+,,,,,,,,,,,,,,,, +outco_denguediag,"1, Yes",,,,,,,https://snomed.info/sct,38362002,Dengue (disorder),https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,,,,,,,, +,"0, No",,,,,,,,,,,,,,,,,,,,, +,"99, Unknown",,,,,,,https://snomed.info/sct,261665006,Unknown (qualifier value),,,,,,,,,,,, +outco_denguediag_main,,,,,,,,,,,,,,,,,https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,, +outco_denguediag_class,"1, Uncomplicated dengue",,,,,,,,,,,,,https://snomed.info/sct,722862003,Dengue without warning signs (disorder),https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,, +,"2, Dengue with warning signs",,,,,,,,,,,,,https://snomed.info/sct,722863008,Dengue with warning signs (disorder),https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,, +,"3, Severe 
dengue",,,,,,,,,,,,,https://snomed.info/sct,20927009,Dengue hemorrhagic fever (disorder),https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,, +outco_secondiag_oth,,,,,,,,,,,,,,,,,https://snomed.info/sct,85097005,Secondary diagnosis (contextual qualifier) (qualifier value),,, +outco_date,,,,,,,,,,,,,,,,,,,,,, +outco_outcome,"1, Discharged alive",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,371827001,Patient discharged alive (finding) +,"2, Still hospitalised",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,32485007,Hospital admission (procedure) +,"3, Transfer to other facility",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,306685000,Discharge to establishment (procedure) +,"4, Death",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,419099009,Dead (finding) +,"5, Palliative care",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,306237005,Referral to palliative care service (procedure) +,"6, Discharged against medical advice",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,225928004,Patient self-discharge against medical advice (procedure) +,"7, Alive, not admitted",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,371827001,Patient discharged alive (finding) \ No newline at end of file diff --git a/tests/test_ingest.py b/tests/test_ingest.py new file mode 100644 index 0000000..8b9c183 --- /dev/null +++ b/tests/test_ingest.py @@ -0,0 +1,14 @@ +from fhirflat.ingest import load_data +from fhirflat.resources.encounter import Encounter +import pandas as pd + + +def test_load_data(): + load_data( + "tests/dummy_data/encounter_dummy_data_single.csv", + "tests/dummy_data/encounter_dummy_mapping.csv", + Encounter, + "encounter_ingestion_single", + ) + + pd.read_parquet("encounter_ingestion_single.parquet")