Draft attempt at ingestion using mapping file
- Reads data into the resource class and then back out again; probably not very efficient.
- Not tested with multiple rows of input data.
- Can't cope with multiple columns trying to set the same variable (this needs logic in the mapping file).
pipliggins committed May 8, 2024
1 parent 5b65ced commit a9f8a4b
Showing 7 changed files with 228 additions and 10 deletions.
28 changes: 23 additions & 5 deletions fhirflat/flat2fhir.py
@@ -15,6 +15,21 @@ def create_codeable_concept(
old_dict: dict[str, list[str] | str], name: str
) -> dict[str, list[str]]:
"""Re-creates a codeableConcept structure from the FHIRflat representation."""

# for reading in from ingestion pipeline
    if name + ".code" in old_dict and name + ".system" in old_dict:
new_dict = {
"coding": [
{
"system": old_dict[name + ".system"],
"code": str(int(old_dict[name + ".code"])),
"display": old_dict[name + ".text"],
}
]
}
return new_dict

# From FHIRflat file
codes = old_dict.get(name + ".code")

if codes is None:
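
With ingestion input, the new branch behaves roughly like this (a sketch; the input dict is illustrative, and str(int(...)) is what removes the trailing ".0" pandas adds when it reads numeric codes in as floats):

create_codeable_concept(
    {
        "class.system": "https://snomed.info/sct",
        "class.code": 32485007.0,
        "class.text": "Hospital admission (procedure)",
    },
    "class",
)
# -> {"coding": [{"system": "https://snomed.info/sct",
#                 "code": "32485007",
#                 "display": "Hospital admission (procedure)"}]}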
@@ -193,15 +208,18 @@ def expand_concepts(data: dict, data_class: type[_DomainResource]) -> dict:
        if all(isinstance(v, dict) for v in v_dict.values()):
            # coming back out of nested recursion
            expanded[k] = {s.split(".", 1)[1]: v_dict[s] for s in v_dict}
-           if data_class.schema()["properties"][k].get("type") == "array":
-               if k == "extension":
-                   expanded[k] = [v for v in expanded[k].values()]
-               else:
-                   expanded[k] = [expanded[k]]

        else:
            expanded[k] = set_datatypes(k, v_dict, group_classes[k])

+       if isinstance(data_class, list):
+           continue
+       elif data_class.schema()["properties"][k].get("type") == "array":
+           if k == "extension":
+               expanded[k] = [v for v in expanded[k].values()]
+           else:
+               expanded[k] = [expanded[k]]

dense_cols = {
k: k.removesuffix("_dense") for k in data.keys() if k.endswith("_dense")
}
124 changes: 124 additions & 0 deletions fhirflat/ingest.py
@@ -0,0 +1,124 @@
"""
Stores the main functions for converting clinical data (initially from RedCap-ARCH) to
FHIRflat.
Assumes two files are provided: one with the clinical data and one containing the
mappings. PL: actually, rather than the mappings themselves, this may be either a file
or a dictionary giving the location of each mapping file (one per resource type).
TODO: Eventually, this should link to a Google Sheet that contains the mappings
"""

import pandas as pd
import numpy as np

# 1:1 (single row, single resource) mapping: Patient, Encounter
# 1:M (single row, multiple resources) mapping: Observation, Condition, Procedure, ...

"""
1. Create one input-data dataframe per resource type, using the column names from
the mapping file
2. For 1:1 mappings: use an apply function to create a fhir-like (or maybe
fhir-flat-like?) input data dictionary in one column, then a resource object in
another. Then follow a format similar to fhir_file_to_flat to create the flat
representation.
3. For 1:M mappings: (PL: not sure about this) Group columns by a single_resource
column (to be created in the mapping file), explode the dataframe by these groups
(sketched below), then follow the 1:1 process.
"""

"""
TODO
* cope with 'if' statements, e.g. for date overwriting.
* deal with duplicates/how to add multiple values to a single field
"""


def create_dictionary(data, map_file):
    """
    Given a data file and a single mapping file for one FHIR resource type,
    returns the data with an added "flat_dict" column containing the mapped
    data in a FHIRflat-like format, ready for further processing.
    """

data = pd.read_csv(data, header=0)
map_df = pd.read_csv(map_file, header=0)

filtered_data = data[map_df["redcap_variable"].dropna().unique()]

    # Fills the NaN redcap variables with the previous value
map_df["redcap_variable"] = map_df["redcap_variable"].ffill()

    # strips the text answers out of the redcap_response column, e.g. "1, Yes" -> "1"
map_df["redcap_response"] = map_df["redcap_response"].apply(
lambda x: x.split(",")[0] if isinstance(x, str) else x
)

# Set multi-index for easier access
map_df.set_index(["redcap_variable", "redcap_response"], inplace=True)
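
    # With the dummy mapping file, lookups on this index then behave like e.g.
    #   map_df.loc[("dates_adm", "1")]   -> the class.system/code/text columns
    #   map_df.loc[("usubjid", np.nan)]  -> subject = "<FIELD>"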

def create_dict_from_row(row):
"""
        Iterates through the columns of the row, applying the mapping to each
        column, and produces a fhirflat-like dictionary to initialize the
        resource object.
"""

def find_field_value(row, response, map):
"""Returns the data for a given field, given the map."""
if map == "<FIELD>":
return response
elif "+" in map:
map = map.split("+")
results = [find_field_value(row, response, m) for m in map]
                results = [x for x in results if x == x]  # NaN != NaN: drops missing parts
return "".join(results)
else:
col = map.lstrip("<").rstrip(">")
return row[col]
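
            # e.g. actualPeriod.start is mapped as "<FIELD>+<dates_admtime>":
            # "<FIELD>" resolves to the dates_admdate response ("2021-04-01"),
            # the empty dates_admtime column gives NaN (dropped by the x == x
            # filter above), and the join returns "2021-04-01".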

result = {}
for column in row.index:
if column in map_df.index.get_level_values(0):
response = row[column]
if pd.notna(response): # Ensure there is a response to map
try:
# Retrieve the mapping for the given column and response
if pd.isna(map_df.loc[column].index).all():
mapping = map_df.loc[(column, np.nan)].dropna()
else:
mapping = map_df.loc[(column, str(response))].dropna()
snippet = {
k: (
v
if "<" not in str(v)
else find_field_value(row, response, v)
)
for k, v in mapping.items()
}
                    except KeyError:
                        # No mapping found for this column and response;
                        # move on to the next column (a snippet from a previous
                        # iteration would otherwise be merged below)
                        result[column] = f"No mapping for response {response}"
                        continue
                else:
                    continue
else:
raise ValueError(f"Column {column} not found in mapping file")
if not set(result.keys()).intersection(snippet.keys()):
result = result | snippet
else:
raise ValueError(
"Duplicate keys in mapping:"
f" {set(result.keys()).intersection(snippet.keys())}"
)
return result

# Apply the function across the DataFrame rows
filtered_data["flat_dict"] = filtered_data.apply(create_dict_from_row, axis=1)
return filtered_data


def load_data(data, mapping_files, resource_type, file_name):
    """
    Converts a clinical data file into a FHIRflat parquet file for a single
    resource type, using the provided mapping file(s).
    """

    df = create_dictionary(data, mapping_files)

    resource_type.ingest_to_flat(df, file_name)
40 changes: 37 additions & 3 deletions fhirflat/resources/base.py
@@ -39,7 +39,7 @@ def flat_fields(cls) -> list[str]:
return [x for x in cls.elements_sequence() if x not in cls.flat_exclusions]

@classmethod
-    def cleanup(cls, data: JsonString) -> FHIRFlatBase:
+    def cleanup(cls, data: JsonString | dict, json_data=True) -> FHIRFlatBase:
"""
Load data into a dictionary-like structure, then
apply resource-specific changes and unpack flattened data
@@ -73,6 +73,31 @@ def from_flat(cls, file: str) -> FHIRFlatBase | list[FHIRFlatBase]:
else:
return list(df["fhir"])

@classmethod
def ingest_to_flat(cls, data: pd.DataFrame, filename: str):
"""
Takes a pandas dataframe and populates the resource with the data.
data: pd.DataFrame
Pandas dataframe containing the data
Returns
-------
FHIRFlatBase or list[FHIRFlatBase]
"""

        # Creates a column of FHIR resource instances
data["fhir"] = data["flat_dict"].apply(
lambda x: cls.cleanup(x, json_data=False)
)

data["fhir_flat"] = data["fhir"].apply(lambda x: x.to_flat())

        # get the flat dataframe out into its own variable
flat_df = pd.concat(data["fhir_flat"].tolist(), ignore_index=True)

flat_df.to_parquet(f"{filename}.parquet")

@classmethod
def fhir_bulk_import(cls, file: str) -> list[FHIRFlatBase]:
"""
@@ -130,11 +155,17 @@ def fhir_file_to_flat(cls, source_file: str, output_name: str | None = None):
flat_rows.append(fhir2flat(resource, lists=list_resources))

df = pd.concat(flat_rows)

# remove required attributes now it's in the flat representation
for attr in cls.flat_defaults:
df.drop(list(df.filter(regex=attr)), axis=1, inplace=True)

return df.to_parquet(output_name)

-    def to_flat(self, filename: str) -> None:
+    def to_flat(self, filename: str | None = None) -> pd.DataFrame | None:
"""
Generates a FHIRflat parquet file from the resource.
If no file name is provided, returns the pandas dataframe.
filename: str
Name of the parquet file to be generated.
@@ -159,4 +190,7 @@ def to_flat(self, filename: str) -> None:
for attr in self.flat_defaults:
flat_df.drop(list(flat_df.filter(regex=attr)), axis=1, inplace=True)

-        return flat_df.to_parquet(filename)
+        if filename:
+            return flat_df.to_parquet(filename)
+        else:
+            return flat_df
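
With the filename now optional, to_flat covers both the original file-writing path and the in-memory path used by ingest_to_flat (a sketch, where enc stands for any populated resource instance):

enc.to_flat("encounter.parquet")  # writes a parquet file, as before
flat_df = enc.to_flat()           # returns the flat DataFrame instead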
5 changes: 3 additions & 2 deletions fhirflat/resources/encounter.py
@@ -60,13 +60,14 @@ def validate_extension_contents(cls, extensions):
return extensions

@classmethod
-    def cleanup(cls, data: JsonString) -> Encounter:
+    def cleanup(cls, data: JsonString | dict, json_data=True) -> Encounter:
"""
Load data into a dictionary-like structure, then
apply resource-specific changes and unpack flattened data
like codeableConcepts back into structured data.
"""
-        data = orjson.loads(data)
+        if json_data:
+            data = orjson.loads(data)

for field in [
"subject",
2 changes: 2 additions & 0 deletions tests/dummy_data/encounter_dummy_data_single.csv
@@ -0,0 +1,2 @@
usubjid,visitid,dates_enrolment,dates_adm,dates_admdate,dates_admtime,non_encounter_field,outco_denguediag,outco_denguediag_main,outco_denguediag_class,outco_not_dengue,outco_secondiag_oth,outco_date,outco_outcome
2,11,,1,2021-04-01,,fish,1,,2,,,2021-04-10,1
25 changes: 25 additions & 0 deletions tests/dummy_data/encounter_dummy_mapping.csv
@@ -0,0 +1,25 @@
redcap_variable,redcap_response,subject,class.system,class.code,class.text,actualPeriod.start,actualPeriod.end,reason.value.concept.system,reason.value.concept.code,reason.value.concept.text,reason.use.system,reason.use.code,reason.use.text,diagnosis.condition.concept.system,diagnosis.condition.concept.code,diagnosis.condition.concept.text,diagnosis.use.system,diagnosis.use.code,diagnosis.use.text,admission.dischargeDisposition.system,admission.dischargeDisposition.code,admission.dischargeDisposition.text
usubjid,,<FIELD>,,,,,,,,,,,,,,,,,,,,
dates_enrolment,,,,,,<FIELD>,<FIELD>,,,,,,,,,,,,,,,
dates_adm,"1, Yes",,https://snomed.info/sct,32485007,Hospital admission (procedure),,,,,,,,,,,,,,,,,
,"0, No",,https://snomed.info/sct,32485007,Hospital admission (procedure),,,,,,,,,,,,,,,,,
,,,https://snomed.info/sct,371883000,Outpatient procedure (procedure),,,,,,,,,,,,,,,,,
,"99, Unknown",,https://snomed.info/sct,32485007,Hospital admission (procedure),,,,,,,,,,,,,,,,,
dates_admdate,,,,,,<FIELD>+<dates_admtime>,,,,,,,,,,,,,,,,
dates_admtime,,,,,,<dates_admdate>+<FIELD>,,,,,,,,,,,,,,,,
outco_denguediag,"1, Yes",,,,,,,https://snomed.info/sct,38362002,Dengue (disorder),https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,,,,,,,,
,"0, No",,,,,,,,,,,,,,,,,,,,,
,"99, Unknown",,,,,,,https://snomed.info/sct,261665006,Unknown (qualifier value),,,,,,,,,,,,
outco_denguediag_main,,,,,,,,,,,,,,,,,https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,,
outco_denguediag_class,"1, Uncomplicated dengue",,,,,,,,,,,,,https://snomed.info/sct,722862003,Dengue without warning signs (disorder),https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,,
,"2, Dengue with warning signs",,,,,,,,,,,,,https://snomed.info/sct,722863008,Dengue with warning signs (disorder),https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,,
,"3, Severe dengue",,,,,,,,,,,,,https://snomed.info/sct,20927009,Dengue hemorrhagic fever (disorder),https://snomed.info/sct,89100005,Final diagnosis (discharge) (contextual qualifier) (qualifier value),,,
outco_secondiag_oth,,,,,,,,,,,,,,,,,https://snomed.info/sct,85097005,Secondary diagnosis (contextual qualifier) (qualifier value),,,
outco_date,,,,,,,<FIELD>,,,,,,,,,,,,,,,
outco_outcome,"1, Discharged alive",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,371827001,Patient discharged alive (finding)
,"2, Still hospitalised",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,32485007,Hospital admission (procedure)
,"3, Transfer to other facility",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,306685000,Discharge to establishment (procedure)
,"4, Death",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,419099009,Dead (finding)
,"5, Palliative care",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,306237005,Referral to palliative care service (procedure)
,"6, Discharged against medical advice",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,225928004,Patient self-discharge against medical advice (procedure)
,"7, Alive, not admitted",,,,,,,,,,,,,,,,,,,https://snomed.info/sct,371827001,Patient discharged alive (finding)
14 changes: 14 additions & 0 deletions tests/test_ingest.py
@@ -0,0 +1,14 @@
from fhirflat.ingest import load_data
from fhirflat.resources.encounter import Encounter
import pandas as pd


def test_load_data():
load_data(
"tests/dummy_data/encounter_dummy_data_single.csv",
"tests/dummy_data/encounter_dummy_mapping.csv",
Encounter,
"encounter_ingestion_single",
)

pd.read_parquet("encounter_ingestion_single.parquet")
