Draft attempt at ingestion using mapping file #24

Merged · 21 commits · merged May 29, 2024
Changes shown below are from 15 of the 21 commits.

Commits
b7f0848  Draft attempt at ingestion using mapping file (pipliggins, May 8, 2024)
dd61500  Ignore duplicate keys if values are the same (pipliggins, May 13, 2024)
a80c963  Improve single row 1:1 test without output comparison (pipliggins, May 13, 2024)
7a456ed  Multiple rows of encounter can be read in & out (pipliggins, May 14, 2024)
21e777b  Draft one-to-many conversion for observation (pipliggins, May 14, 2024)
cfadee9  Update overwritten cleanup() func in remaining classes (pipliggins, May 15, 2024)
b591be9  Start condensing ingestion code (pipliggins, May 15, 2024)
c142643  Create generic data conversion function for users (pipliggins, May 15, 2024)
6f9f4ad  Remove load_data functions (pipliggins, May 15, 2024)
e352003  Make fhirflat installable (pipliggins, May 15, 2024)
464dd67  Allow mappings from google sheets (pipliggins, May 15, 2024)
a49d78b  Update test workflow for package (pipliggins, May 15, 2024)
de6c177  Allow lists to be created during ingestion. (pipliggins, May 17, 2024)
234fd09  Improve references (pipliggins, May 20, 2024)
c524efe  Add race extension (pipliggins, May 20, 2024)
9fa116f  Misc fixes, add presenceAbsence and prespecifiedQuery extensions (pipliggins, May 20, 2024)
60e943b  Misc updates, now passes private checks on dengue data subset (pipliggins, May 22, 2024)
9bf774c  Fix some typehinting errors (pipliggins, May 22, 2024)
dd4afe9  Update init file (pipliggins, May 22, 2024)
bef7c35  Update some relative imports and fix different types test warning (pipliggins, May 22, 2024)
08eda15  Fix more types (pipliggins, May 23, 2024)
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -27,7 +27,7 @@ jobs:
           python-version: "3.11"
       - name: Install dependencies
         run: |
-          python3 -m pip install -r requirements.txt
+          python3 -m pip install '.[test]'
       - name: Test with pytest
         run: |
           python3 -m pytest --cov
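The requirements.txt install is replaced because commit e352003 makes fhirflat installable as a package. A minimal sketch of the optional dependency group that '.[test]' implies (the actual pyproject.toml is not shown in this diff; the package list is an assumption based on the pytest --cov invocation):

[project.optional-dependencies]
# Hypothetical "test" extra; pytest-cov is assumed because the workflow runs --cov.
test = ["pytest", "pytest-cov"]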
14 changes: 14 additions & 0 deletions fhirflat/__init__.py
@@ -0,0 +1,14 @@
from .resources.condition import Condition
from .resources.encounter import Encounter
from .resources.immunization import Immunization
from .resources.location import Location
from .resources.medicationadministration import MedicationAdministration
from .resources.medicationstatement import MedicationStatement
from .resources.observation import Observation
from .resources.organization import Organization
from .resources.patient import Patient
from .resources.procedure import Procedure
from .resources.researchsubject import ResearchSubject
from .resources.specimen import Specimen

from .ingest import convert_data_to_flat
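A minimal sketch of what these re-exports enable for downstream users (the file paths and sheet ID are placeholders, not values from this PR):

import fhirflat

# Resource classes (fhirflat.Patient, fhirflat.Encounter, ...) and the ingestion
# entry point are now importable from the package root.
fhirflat.convert_data_to_flat(
    data="raw_data.csv",              # hypothetical clinical export
    folder_name="fhirflat_output",    # one FHIRflat file per resource
    sheet_id="<google-sheet-id>",     # placeholder; see ingest.py below
)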
53 changes: 45 additions & 8 deletions fhirflat/flat2fhir.py
@@ -15,6 +15,35 @@ def create_codeable_concept(
    old_dict: dict[str, list[str] | str], name: str
) -> dict[str, list[str]]:
    """Re-creates a codeableConcept structure from the FHIRflat representation."""

    # for reading in from ingestion pipeline
    if name + ".code" in old_dict and name + ".system" in old_dict:
        code = old_dict[name + ".code"]
        if isinstance(code, list) and len(code) > 1:
            new_dict = {"coding": []}
            # zip the parallel system/code/text lists into individual codings
            for system, code_value, display in zip(
                old_dict[name + ".system"], code, old_dict[name + ".text"]
            ):
                formatted_code = (
                    code_value if isinstance(code_value, str) else str(int(code_value))
                )
                subdict = {"system": system, "code": formatted_code, "display": display}

                new_dict["coding"].append(subdict)
        else:
            formatted_code = code if isinstance(code, str) else str(int(code))
            new_dict = {
                "coding": [
                    {
                        "system": old_dict[name + ".system"],
                        "code": formatted_code,
                        "display": old_dict[name + ".text"],
                    }
                ]
            }
        return new_dict

    # From FHIRflat file
    codes = old_dict.get(name + ".code")

    if codes is None:
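# Illustration (not part of the diff): for name="code", an ingestion-pipeline
# input such as
#     {"code.code": "38341003", "code.system": "http://snomed.info/sct",
#      "code.text": "Hypertension"}
# is rebuilt into
#     {"coding": [{"system": "http://snomed.info/sct", "code": "38341003",
#                  "display": "Hypertension"}]}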
@@ -60,9 +89,14 @@ def createQuantity(df, group):
     for attribute in df.keys():
         attr = attribute.split(".")[-1]
         if attr == "code":
-            system, code = df[group + ".code"].split("|")
-            quant["code"] = code
-            quant["system"] = system
+            if group + ".system" in df.keys():
+                # reading in from ingestion pipeline
+                quant["code"] = df[group + ".code"]
+                quant["system"] = df[group + ".system"]
+            else:
+                system, code = df[group + ".code"].split("|")
+                quant["code"] = code
+                quant["system"] = system
         else:
             quant[attr] = df[group + "." + attr]
@@ -193,15 +227,18 @@ def expand_concepts(data: dict, data_class: type[_DomainResource]) -> dict:
         if all(isinstance(v, dict) for v in v_dict.values()):
             # coming back out of nested recursion
             expanded[k] = {s.split(".", 1)[1]: v_dict[s] for s in v_dict}
-            if data_class.schema()["properties"][k].get("type") == "array":
-                if k == "extension":
-                    expanded[k] = [v for v in expanded[k].values()]
-                else:
-                    expanded[k] = [expanded[k]]

         else:
             expanded[k] = set_datatypes(k, v_dict, group_classes[k])

+        if isinstance(data_class, list):
+            continue
+        elif data_class.schema()["properties"][k].get("type") == "array":
+            if k == "extension":
+                expanded[k] = [v for v in expanded[k].values()]
+            else:
+                expanded[k] = [expanded[k]]
+
     dense_cols = {
         k: k.removesuffix("_dense") for k in data.keys() if k.endswith("_dense")
     }
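The relocated block means the array check now also runs for groups built by set_datatypes, and is skipped entirely when data_class is a list. A standalone sketch of the wrapping rule (names are illustrative, not the PR's API):

def wrap_for_array_schema(key: str, value: dict):
    """Wrap a reconstructed group when the resource schema types the field as an array."""
    if key == "extension":
        # extensions accumulate as {name: extension_dict}; keep only the dicts
        return list(value.values())
    return [value]  # any other single dict becomes a one-element list

# e.g. wrap_for_array_schema("type", {"coding": []}) -> [{"coding": []}]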
282 changes: 282 additions & 0 deletions fhirflat/ingest.py
@@ -0,0 +1,282 @@
"""
Stores the main functions for converting clinical data (initally from RedCap-ARCH) to
FHIRflat.
"""

import pandas as pd
import numpy as np
import warnings
import os
from math import isnan
from fhirflat.util import get_local_resource

# 1:1 (single row, single resource) mapping: Patient, Encounter
# 1:M (single row, multiple resources) mapping: Observation, Condition, Procedure, ...

"""
TODO
* sort out how to choose ID's e.g. location within encounter etc
* cope with 'if' statements - e.g. for date overwriting.
* deal with how to check if lists are appropriate when adding multiple values to a
single field - list options.
* Consider using pandarallel (https://pypi.org/project/pandarallel/) to parallelize
the apply function, particularly for one to many mappings.
"""


def find_field_value(row, response, mapp, raw_data=None):
    """
    Returns the data for a given field, given the mapping.
    For one-to-many resources, the raw data is provided so that fields outside the
    melted data can be searched.
    """
    if mapp == "<FIELD>":
        return response
    elif "+" in mapp:
        # concatenate the values of several source fields
        mapp = mapp.split("+")
        results = [find_field_value(row, response, m, raw_data) for m in mapp]
        results = [str(x) for x in results if x == x]  # keep non-NaN (NaN != NaN)
        return " ".join(results) if "/" not in results[0] else "".join(results)
    elif "if not" in mapp:
        # conditional: return the first field only if the second is empty
        mapp = mapp.replace(" ", "").split("ifnot")
        results = [find_field_value(row, response, m, raw_data) for m in mapp]
        x, y = results
        if isinstance(y, float):
            return x if isnan(y) else None
        else:
            return x if not y else None
    elif "<" in mapp:
        # look up another column, in the melted row or falling back to the raw data
        col = mapp.lstrip("<").rstrip(">")
        try:
            return row[col]
        except KeyError:
            return raw_data.loc[row["index"], col]
    else:
        return mapp
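# Illustration (not part of the diff) of the mapping mini-syntax handled above,
# with hypothetical column names:
#   "<FIELD>"                    -> the raw response for the current column
#   "<visit_date>+<visit_time>"  -> both looked-up values, space-joined (joined
#                                   without a space if the first contains "/")
#   "<death_date> if not <discharge_date>"
#                                -> death_date only when discharge_date is empty/NaN
#   "http://snomed.info/sct"     -> any other string is returned as a literal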


def create_dict_from_row(row, map_df):
    """
    Iterates through the columns of the row, applying the mapping to each column,
    and produces a fhirflat-like dictionary to initialize the resource object.
    """

    result = {}
    for column in row.index:
        if column in map_df.index.get_level_values(0):
            response = row[column]
            if pd.notna(response):  # Ensure there is a response to map
                try:
                    # Retrieve the mapping for the given column and response
                    if pd.isna(map_df.loc[column].index).all():
                        mapping = map_df.loc[(column, np.nan)].dropna()
                    else:
                        mapping = map_df.loc[(column, str(int(response)))].dropna()
                    snippet = {
                        k: (
                            v
                            if "<" not in str(v)
                            else find_field_value(row, response, v)
                        )
                        for k, v in mapping.items()
                    }
                except KeyError:
                    # No mapping found for this column and response despite presence
                    # in mapping file
                    warnings.warn(
                        f"No mapping for column {column} response {response}",
                        UserWarning,
                    )
                    continue
            else:
                continue
        else:
            raise ValueError(f"Column {column} not found in mapping file")
        duplicate_keys = set(result.keys()).intersection(snippet.keys())
        if not duplicate_keys:
            result = result | snippet
        else:
            if all(
                result[key] == snippet[key] for key in duplicate_keys
            ):  # Ignore duplicates if they are the same
                continue
            elif all(result[key] is None for key in duplicate_keys):
                result.update(snippet)
            else:
                for key in duplicate_keys:
                    if isinstance(result[key], list):
                        result[key].append(snippet[key])
                    else:
                        result[key] = [result[key], snippet[key]]
    return result
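# Illustration (not part of the diff) of the duplicate-key handling above, using a
# hypothetical FHIRflat key: merging {"category.code": "B"} into a result already
# holding {"category.code": "A"} gives {"category.code": ["A", "B"]}; identical
# values are skipped, and None values are simply overwritten.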


def create_dict_from_cell(row, full_df, map_df):
    """
    Applies the mapping to a single (column, value) pair from the melted data and
    produces a fhirflat-like dictionary to initialize the resource object.
    """

    column = row["column"]
    response = row["value"]
    if pd.notna(response):  # Ensure there is a response to map
        try:
            # Retrieve the mapping for the given column and response
            if pd.isna(map_df.loc[column].index).all():
                mapping = map_df.loc[(column, np.nan)].dropna()
            else:
                mapping = map_df.loc[(column, str(int(response)))].dropna()
            snippet = {
                k: (
                    v
                    if "<" not in str(v)
                    else find_field_value(row, response, v, raw_data=full_df)
                )
                for k, v in mapping.items()
            }
            return snippet
        except KeyError:
            # No mapping found for this column and response despite presence
            # in mapping file
            warnings.warn(
                f"No mapping for column {column} response {response}",
                UserWarning,
            )
            return None


def create_dictionary(
    data: str, map_file: str, resource: str, one_to_one=False
) -> pd.DataFrame | None:
    """
    Given a data file and a single mapping file for one FHIR resource type,
    returns a dataframe with the mapped data in a FHIRflat-like format (in a
    "flat_dict" column), ready for further processing.

    Parameters
    ----------
    data: str
        The path to the file containing the clinical data.
    map_file: str
        The path to the mapping file containing the mapping of the clinical data to
        the FHIR resource.
    resource: str
        The name of the resource being mapped.
    one_to_one: bool
        Whether the resource should be mapped as one-to-one or one-to-many.
    """

    data = pd.read_csv(data, header=0)
    map_df = pd.read_csv(map_file, header=0)

    # setup the data -----------------------------------------------------------
    relevant_cols = map_df["redcap_variable"].dropna().unique()
    filtered_data = data.loc[:, data.columns.isin(relevant_cols)].copy()

    if filtered_data.empty:
        warnings.warn(f"No data found for the {resource} resource.", UserWarning)
        return None

    if not one_to_one:
        filtered_data = filtered_data.reset_index()
        melted_data = filtered_data.melt(id_vars="index", var_name="column")

    # set up the mappings -------------------------------------------------------

    # Fills the na redcap variables with the previous value
    map_df["redcap_variable"] = map_df["redcap_variable"].ffill()

    # strips the text answers out of the redcap_response column
    map_df["redcap_response"] = map_df["redcap_response"].apply(
        lambda x: x.split(",")[0] if isinstance(x, str) else x
    )

    # Set multi-index for easier access
    map_df.set_index(["redcap_variable", "redcap_response"], inplace=True)
    # Generate the flat_like dictionary
    if one_to_one:
        filtered_data["flat_dict"] = filtered_data.apply(
            create_dict_from_row, args=[map_df], axis=1
        )
        return filtered_data
    else:
        melted_data["flat_dict"] = melted_data.apply(
            create_dict_from_cell, args=[data, map_df], axis=1
        )
        return melted_data["flat_dict"].to_frame()
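# Illustration (not part of the diff): a mapping CSV as consumed above could look
# like this (all column names except redcap_variable/redcap_response are
# hypothetical FHIRflat fields):
#
#   redcap_variable,redcap_response,code.code,code.text
#   sex_at_birth,"1, Male",248153007,Male
#   ,"2, Female",248152002,Female
#
# Blank redcap_variable cells are forward-filled, responses like "1, Male" are
# trimmed to "1", and (redcap_variable, redcap_response) becomes the multi-index.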


def convert_data_to_flat(
    data: str,
    folder_name: str,
    mapping_files_types: tuple[dict, dict] | None = None,
    sheet_id: str | None = None,
):
    """
    Takes raw clinical data (currently assumed to be a one-row-per-patient format like
    RedCap exports) and produces a folder of FHIRflat files, one per resource. Takes
    either local mapping files, or a Google Sheet ID containing the mapping files.

    Parameters
    ----------
    data: str
        The path to the raw clinical data file.
    folder_name: str
        The name of the folder to store the FHIRflat files.
    mapping_files_types: tuple[dict, dict] | None
        A tuple containing two dictionaries: one with the mapping files for each
        resource type, and one with the mapping type (either one-to-one or
        one-to-many) for each resource type.
    sheet_id: str | None
        The Google Sheet ID containing the mapping files. The first sheet must contain
        the mapping types: one column listing the resource names, and another stating
        whether each mapping is one-to-one or one-to-many. The subsequent sheets must
        be named by resource, and contain the mapping for that resource.
    """

    if not mapping_files_types and not sheet_id:
        raise TypeError("Either mapping_files_types or sheet_id must be provided")

    if not os.path.exists(folder_name):
        os.makedirs(folder_name)

    if mapping_files_types:
        mappings, types = mapping_files_types
    else:
        sheet_link = (
            f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv"
        )

        df_types = pd.read_csv(sheet_link, header=0, index_col="Resources")
        types = dict(
            zip(
                df_types.index,
                df_types["Resource Type"],
            )
        )
        sheet_keys = {r: df_types.loc[r, "Sheet ID"] for r in types.keys()}
        mappings = {
            get_local_resource(r): sheet_link + f"&gid={i}"
            for r, i in sheet_keys.items()
        }

    for resource, map_file in mappings.items():
        t = types[resource.__name__]
        if t == "one-to-one":
            df = create_dictionary(data, map_file, resource.__name__, one_to_one=True)
            if df is None:
                continue
        elif t == "one-to-many":
            df = create_dictionary(data, map_file, resource.__name__, one_to_one=False)
            if df is None:
                continue
            else:
                df = df.dropna().reset_index(drop=True)
        else:
            raise ValueError(f"Unknown mapping type {t}")

        resource.ingest_to_flat(
            df, os.path.join(folder_name, resource.__name__.lower())
        )
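For reference, a hedged sketch of calling the new entry point with local mapping files rather than a Google Sheet (the paths and resource selection are assumptions, not part of this diff):

import fhirflat
from fhirflat import Encounter, Patient

# Keys are resource classes, mirroring what get_local_resource() returns in the
# Google Sheets branch; values are paths to per-resource mapping CSVs.
mappings = {
    Patient: "mappings/patient.csv",      # hypothetical paths
    Encounter: "mappings/encounter.csv",
}
types = {"Patient": "one-to-one", "Encounter": "one-to-one"}

fhirflat.convert_data_to_flat(
    data="data/redcap_export.csv",        # hypothetical one-row-per-patient export
    folder_name="fhirflat_output",
    mapping_files_types=(mappings, types),
)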