fix: don't load all of an i2b2 file into memory
The primary change in this commit is to stop loading i2b2 input files
all at once and instead stream them in, in chunks sized by the
--batch-size parameter.
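
Concretely, the streaming pattern looks like this (a minimal sketch with
an illustrative file name and batch size; the real implementation is in
cumulus/loaders/i2b2/extract.py below):

    import pandas

    def stream_rows(path_csv: str, batch_size: int):
        """Yield one row dict at a time, keeping at most batch_size rows in memory."""
        with pandas.read_csv(path_csv, dtype=str, na_filter=False, chunksize=batch_size) as reader:
            for chunk in reader:  # each chunk is a DataFrame of up to batch_size rows
                for _, row in chunk.iterrows():
                    yield dict(row)

    # Instead of one pandas.read_csv() call that materializes every row:
    for row in stream_rows('observation_fact_notes.csv', 200_000):
        ...  # process one row at a time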

But this commit also includes several small fixes:
- Fixes location of MS tool during CI (writes to $HOME/.local/bin, since
  a literal ~ embedded in the --output=~/... argument is not
  tilde-expanded by the shell)
- Adds comma-formatting to a lot of progress-count prints
- Continues ETL even if cTAKES can't process one message (just logs
  the error instead)
- Changes default batch size from 10M to 200k. This works more
  reliably for small-memory (8G) machines. The previous number was
  optimized for the size of the resulting parquet files. This number
  is optimized for memory during the run, which feels like a safer
  default.
- When using --input-format=ndjson and pointing at a local folder,
  we now still use a temporary folder and copy in just the resource
  ndjson files we want. This is to speed up the MS deid tool, so it
  doesn't have to read all possible ndjson inputs.
- Adds better progress messaging while reading i2b2 files.
- Separates out race & ethnicity from i2b2, which combines them
mikix committed Dec 27, 2022
1 parent f657110 commit 7a5ef75
Showing 17 changed files with 170 additions and 156 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -36,7 +36,7 @@ jobs:
             --runtime=linux-x64 \
             --configuration=Release \
             -p:PublishSingleFile=true \
-            --output=~/.local/bin \
+            --output=$HOME/.local/bin \
             mstool/FHIR/src/Microsoft.Health.Fhir.Anonymizer.R4.CommandLineTool
       - name: Test with pytest
15 changes: 0 additions & 15 deletions cumulus/common.py
@@ -9,7 +9,6 @@
 from urllib.parse import urlparse
 
 import fsspec
-import pandas
 from fhirclient.models.resource import Resource
 from fhirclient.models.fhirabstractbase import FHIRAbstractBase
 
@@ -58,20 +57,6 @@ def find_by_name(folder, path_contains='filemask', progress_bar=1000) -> list:
     return found
 
 
-def extract_csv(path_csv: str, sample=1.0) -> pandas.DataFrame:
-    """
-    :param path_csv: /path/to/file.csv
-    :param sample: %percentage of file to read
-    :return: pandas Dataframe
-    """
-    logging.info('Reading csv %s ...', path_csv)
-    df = pandas.read_csv(path_csv, dtype=str, na_filter=False)
-    if sample != 1.0:
-        df = df.sample(frac=sample)
-    logging.info('Done reading %s .', path_csv)
-    return df
-
-
 def fake_id(category: str) -> str:
     """
     Randomly generate a linked Patient identifier
2 changes: 1 addition & 1 deletion cumulus/config.py
@@ -87,7 +87,7 @@ def success_rate(self, show_every=1000 * 10) -> float:
         prct = float(self.success) / float(self.attempt)
 
         if 0 == self.attempt % show_every:
-            print(f'success = {self.success} rate % {prct}')
+            print(f'success = {self.success:,} rate % {prct}')
 
         return prct
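
For reference, the :, format spec just adds thousands separators, so large
attempt counts read at a glance:

    >>> f'success = {1234567:,}'
    'success = 1,234,567'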
6 changes: 5 additions & 1 deletion cumulus/ctakes.py
@@ -42,7 +42,11 @@ def symptoms(cache: store.Root, docref: DocumentReference) -> List[Observation]:
         logging.warning('No text/plain content for symptoms')  # ideally would print identifier, but it's PHI...
         return []
 
-    ctakes_json = extract(cache, physician_note)
+    try:
+        ctakes_json = extract(cache, physician_note)
+    except Exception as exc:  # pylint: disable=broad-except
+        logging.error('Could not extract symptoms: %s', exc)
+        return []
 
     observations = []
     for match in ctakes_json.list_sign_symptom(ctakesclient.typesystem.Polarity.pos):
3 changes: 2 additions & 1 deletion cumulus/deid/mstool.py
@@ -8,7 +8,7 @@
 import subprocess  # nosec: B404
 import sys
 
-from cumulus import errors
+from cumulus import common, errors
 
 MSTOOL_CMD = 'Microsoft.Health.Fhir.Anonymizer.R4.CommandLineTool'
 
@@ -23,6 +23,7 @@ def run_mstool(input_dir: str, output_dir: str) -> None:
     The input must be in ndjson format. And the output will be as well.
     """
+    common.print_header('De-identifying data...')
     try:
         # The following call only points at some temporary directory names (which we generate),
         # so it should be safe, and we thus disable the security linter warning about validating inputs.
6 changes: 3 additions & 3 deletions cumulus/etl.py
@@ -366,9 +366,9 @@ def main(args: List[str]):
                         help='input format (default is ndjson)')
     parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'],
                         help='output format (default is parquet)')
-    parser.add_argument('--batch-size', type=int, metavar='SIZE', default=10000000,
+    parser.add_argument('--batch-size', type=int, metavar='SIZE', default=200000,
                         help='how many entries to process at once and thus '
-                             'how many to put in one output file (default is 10M)')
+                             'how many to put in one output file (default is 200k)')
     parser.add_argument('--comment', help='add the comment to the log file')
     parser.add_argument('--s3-region', help='if using S3 paths (s3://...), this is their region')
     parser.add_argument('--s3-kms-key', help='if using S3 paths (s3://...), this is the KMS key ID to use')
@@ -397,7 +397,7 @@ def main(args: List[str]):
     job_datetime = common.datetime_now()  # grab timestamp before we do anything
 
     if args.input_format == 'i2b2':
-        config_loader = loaders.I2b2Loader(root_input)
+        config_loader = loaders.I2b2Loader(root_input, args.batch_size)
     else:
         config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks)
20 changes: 11 additions & 9 deletions cumulus/loaders/fhir/fhir_ndjson.py
@@ -35,16 +35,18 @@ def load_all(self, resources: List[str]) -> tempfile.TemporaryDirectory:
         if self.root.protocol in ['http', 'https']:
             return self._load_from_bulk_export(resources)
 
-        # Are we reading from a local directory?
-        if self.root.protocol == 'file':
-            # We can actually just re-use the input dir without copying the files, since everything is local.
-            class Dir:
-                name: str = self.root.path
-            return Dir()  # once we drop python3.7, we can have load_all return a Protocol for proper typing
-
-        # Fall back to copying from a remote directory (like S3 buckets) to a local one
+        # Copy the resources we need from the remote directory (like S3 buckets) to a local one.
+        #
+        # We do this even if the files are local, because the next step in our pipeline is the MS deid tool,
+        # and it will just process *everything* in a directory. So if there are other *.ndjson sitting next to our
+        # target resources, they'll get processed by the MS tool and that slows down running a single task with
+        # "--task" a lot.
+        #
+        # This uses more disk space temporarily (copied files will get deleted once the MS tool is done and this
+        # TemporaryDirectory gets discarded), but that seems reasonable.
         tmpdir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
-        self.root.get(self.root.joinpath('*.ndjson'), f'{tmpdir.name}/')
+        for resource in resources:
+            self.root.get(self.root.joinpath(f'*{resource}*.ndjson'), f'{tmpdir.name}/')
         return tmpdir
 
     def _load_from_bulk_export(self, resources: List[str]) -> tempfile.TemporaryDirectory:
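
For illustration, assuming Root.get() wraps an fsspec filesystem (common.py
imports fsspec), the per-resource copy is roughly equivalent to this sketch
(bucket, paths, and resource names are illustrative):

    import fsspec

    fs = fsspec.filesystem('s3')
    for resource in ['Patient', 'Condition']:
        # Copy only this resource's ndjson files, rather than '*.ndjson'
        fs.get(f'my-bucket/export/*{resource}*.ndjson', '/tmp/deid-input/')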
75 changes: 75 additions & 0 deletions cumulus/loaders/i2b2/external_mappings.py
@@ -0,0 +1,75 @@
+# This file contains a mapping of various external coding systems to the concepts they represent
+
+
+# PHIN VADS 1000-9, Race & Ethnicity - CDC
+# https://phinvads.cdc.gov/vads/ViewCodeSystemConcept.action?oid=2.16.840.1.113883.6.238&code=1000-9
+# https://hl7.org/fhir/us/core/StructureDefinition-us-core-race.html
+CDC_RACE = {
+    'White': ('urn:oid:2.16.840.1.113883.6.238', '2106-3'),
+    'Black or African American': ('urn:oid:2.16.840.1.113883.6.238', '2054-5'),
+    'American Indian or Alaska Native': ('urn:oid:2.16.840.1.113883.6.238', '1002-5'),
+    'Asian': ('urn:oid:2.16.840.1.113883.6.238', '2028-9'),
+    'Native Hawaiian or Other Pacific Islander': ('urn:oid:2.16.840.1.113883.6.238', '2076-8'),
+    'Other': ('urn:oid:2.16.840.1.113883.6.238', '2131-1'),
+    'Declined to Answer': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'ASKU'),
+    'Unable to Answer': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'ASKU'),
+    'Unknown': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'UNK'),
+}
+
+# https://hl7.org/fhir/us/core/StructureDefinition-us-core-ethnicity.html
+CDC_ETHNICITY = {
+    'Hispanic or Latino': ('urn:oid:2.16.840.1.113883.6.238', '2135-2'),
+    'Not Hispanic or Latino': ('urn:oid:2.16.840.1.113883.6.238', '2186-5'),
+    'Declined to Answer': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'ASKU'),
+    'Unable to Answer': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'ASKU'),
+    'Unknown': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'UNK'),
+}
+
+# FHIR AdministrativeGender code (not a full gender spectrum, but quite limited)
+# https://www.hl7.org/fhir/valueset-administrative-gender.html
+# Anything not in this dictionary should map to 'other'
+FHIR_GENDER = {
+    'F': 'female',
+    'M': 'male',
+    'U': 'unknown',
+}
+
+
+# BCH internal lab codes mapping to international covid-19 codes
+# system: http://loinc.org
+LOINC_COVID_LAB_TESTS = {
+    'LAB:1043473617': '94500-6',
+    'LAB:1044804335': '94500-6',
+    'LAB:1044704735': '94500-6',
+    'LAB:1134792565': '95406-5',
+    'LAB:1148157467': '95406-5',
+    'LAB:467288722': '85477-8',
+    'LAB:152831642': '85476-0',
+    'LAB:467288694': '85478-6',
+    'LAB:467288700': '85479-4',
+    'LAB:13815125': '62462-7'
+}
+
+
+# PHIN VADS General adjectival modifier (qualifier value) {106234000 , SNOMED-CT }
+# Subset of codes related to evaluating lab results
+# system: http://snomed.info/sct
+SNOMED_LAB_RESULT = {
+    'Positive': '10828004',
+    'Negative': '260385009',
+    'Absent': '272519000'
+}
+
+
+# PHIN VADS Admission statuses {308277006, SNOMED-CT }
+# Subset of codes related to means of admission
+# system: http://snomed.info/sct
+# https://terminology.hl7.org/5.0.0/ValueSet-v3-ActEncounterCode.html
+# Other terms seen:
+# - "Recurring Outpatient Series" (AMB?)
+# - "Day Surgery" (AMB?)
+SNOMED_ADMISSION = {
+    'Emergency': 'EMER',
+    'Inpatient': 'IMP',
+    'Outpatient': 'AMB',
+}
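
These tables map i2b2 display text to (system, code) pairs. As an
illustration only (to_race_coding is hypothetical, not part of this
commit), a consumer could build a FHIR coding from them like so:

    from cumulus.loaders.i2b2.external_mappings import CDC_RACE

    def to_race_coding(i2b2_race_text: str) -> dict:
        """Map an i2b2 race label to a FHIR Coding dict ({} if unrecognized)."""
        mapping = CDC_RACE.get(i2b2_race_text)
        if mapping is None:
            return {}
        system, code = mapping
        return {'system': system, 'code': code, 'display': i2b2_race_text}

    to_race_coding('Asian')
    # -> {'system': 'urn:oid:2.16.840.1.113883.6.238', 'code': '2028-9', 'display': 'Asian'}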
64 changes: 27 additions & 37 deletions cumulus/loaders/i2b2/extract.py
@@ -1,68 +1,58 @@
 """Read files into data structures"""
 
-from typing import List
-import logging
+from typing import Iterator
 
 import pandas
-from cumulus import common
 
 from cumulus.loaders.i2b2.schema import ObservationFact, PatientDimension, VisitDimension
 
 
-def extract_csv(path_csv: str, sample=1.0) -> pandas.DataFrame:
+def extract_csv(path_csv: str, batch_size: int) -> Iterator[dict]:
     """
     :param path_csv: /path/to/i2b2_formatted_file.csv
-    :param sample: %percentage of file to read
-    :return: pandas Dataframe
+    :param batch_size: how many entries to load into memory at once
+    :return: an iterator over each row from the file
     """
-    return common.extract_csv(path_csv, sample)
+    print(f'Reading csv {path_csv}...')
+    count = 0
+    with pandas.read_csv(path_csv, dtype=str, na_filter=False, chunksize=batch_size) as reader:
+        for chunk in reader:
+            print(f'  Read {count:,} entries...')
+            for _, row in chunk.iterrows():
+                yield dict(row)
+            count += batch_size
+    print(f'Done reading {path_csv} .')
 
 
-def extract_csv_observation_facts(path_csv: str,
-                                  sample=1.0) -> List[ObservationFact]:
+def extract_csv_observation_facts(path_csv: str, batch_size: int) -> Iterator[ObservationFact]:
     """
     :param path_csv: /path/to/file.csv
-    :param sample: %percentage of file to read
+    :param batch_size: how many entries to load into memory at once
     :return: i2b2 ObservationFact table
     """
-    df = extract_csv(path_csv, sample)
-
-    logging.info('Transforming text into List[ObservationFact]')
-    facts = []
-    for _, row in df.iterrows():
-        facts.append(ObservationFact(row))
-
-    logging.info('Ready List[ObservationFact]')
-    return facts
+    for row in extract_csv(path_csv, batch_size):
+        yield ObservationFact(row)
 
 
-def extract_csv_patients(path_csv: str, sample=1.0) -> List[PatientDimension]:
+def extract_csv_patients(path_csv: str, batch_size: int) -> Iterator[PatientDimension]:
     """
     :param path_csv: /path/to/file.csv
-    :param sample: %percentage of file to read
+    :param batch_size: how many entries to load into memory at once
     :return: List i2b2 patient dimension table
     """
-    df = extract_csv(path_csv, sample)
-
-    logging.info('Transforming text into List[PatientDimension]')
-    patients = []
-    for _, row in df.iterrows():
-        patients.append(PatientDimension(row))
-
-    logging.info('Ready List[PatientDimension]')
-    return patients
+    for row in extract_csv(path_csv, batch_size):
+        yield PatientDimension(row)
 
 
-def extract_csv_visits(path_csv: str, sample=1.0) -> List[VisitDimension]:
+def extract_csv_visits(path_csv: str, batch_size: int) -> Iterator[VisitDimension]:
    """
     :param path_csv: /path/to/file.csv
-    :param sample: %percentage of file to read
+    :param batch_size: how many entries to load into memory at once
     :return: List i2b2 visit dimension table
     """
-    df = extract_csv(path_csv, sample)
-
-    logging.info('Transforming text into List[VisitDimension]')
-    visits = []
-    for _, row in df.iterrows():
-        visits.append(VisitDimension(row))
-
-    logging.info('Ready List[VisitDimension]')
-    return visits
+    for row in extract_csv(path_csv, batch_size):
+        yield VisitDimension(row)
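
Since the extractors are now generators, no file I/O happens until a caller
iterates, and only one chunk stays in memory at a time. A usage sketch (the
path, batch size, and process() handler are illustrative):

    from cumulus.loaders.i2b2 import extract

    # No reads happen yet; this just creates the generator.
    patients = extract.extract_csv_patients('patient_dimension.csv', 200_000)

    for patient in patients:  # pulls rows lazily, one 200k-row chunk at a time
        process(patient)  # hypothetical downstream handler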
21 changes: 16 additions & 5 deletions cumulus/loaders/i2b2/loader.py
@@ -8,6 +8,7 @@
 
 from fhirclient.models.resource import Resource
 
+from cumulus import store
 from cumulus.loaders.base import Loader
 from cumulus.loaders.i2b2 import extract, schema, transform
 from cumulus.loaders.i2b2.oracle import extract as oracle_extract
@@ -31,6 +32,15 @@ class I2b2Loader(Loader):
     - csv_visit
     """
 
+    def __init__(self, root: store.Root, batch_size: int):
+        """
+        Initialize a new I2b2Loader class
+        :param root: the base location to read data from
+        :param batch_size: the most entries to keep in memory at once
+        """
+        super().__init__(root)
+        self.batch_size = batch_size
+
     def load_all(self, resources: List[str]) -> tempfile.TemporaryDirectory:
         if self.root.protocol in ['tcp']:
             return self._load_all_from_oracle(resources)
@@ -117,13 +127,14 @@ def _load_all_from_csv(self, resources: List[str]) -> tempfile.TemporaryDirectory:
         return self._load_all_with_extractors(
             resources,
             conditions=partial(extract.extract_csv_observation_facts,
-                               os.path.join(path, 'observation_fact_diagnosis.csv')),
+                               os.path.join(path, 'observation_fact_diagnosis.csv'), self.batch_size),
             observations=partial(extract.extract_csv_observation_facts,
-                                 os.path.join(path, 'observation_fact_lab_views.csv')),
+                                 os.path.join(path, 'observation_fact_lab_views.csv'), self.batch_size),
             documentreferences=partial(extract.extract_csv_observation_facts,
-                                       os.path.join(path, 'observation_fact_notes.csv')),
-            patients=partial(extract.extract_csv_patients, os.path.join(path, 'patient_dimension.csv')),
-            encounters=partial(extract.extract_csv_visits, os.path.join(path, 'visit_dimension.csv')),
+                                       os.path.join(path, 'observation_fact_notes.csv'), self.batch_size),
+            patients=partial(extract.extract_csv_patients, os.path.join(path, 'patient_dimension.csv'),
+                             self.batch_size),
+            encounters=partial(extract.extract_csv_visits, os.path.join(path, 'visit_dimension.csv'), self.batch_size),
         )
 
 ###################################################################################################################
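
The functools.partial calls are what keep this lazy: they bind the CSV path
and batch size now, but nothing is read until the ETL invokes the extractor
and iterates the result. A rough sketch (names illustrative):

    from functools import partial

    from cumulus.loaders.i2b2 import extract

    # Binds the arguments now; opens no file yet.
    patients_extractor = partial(extract.extract_csv_patients, 'patient_dimension.csv', 200_000)

    rows = patients_extractor()  # creates the generator; iterating it triggers the reads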
2 changes: 1 addition & 1 deletion cumulus/loaders/i2b2/oracle/query.py
@@ -37,7 +37,7 @@ def sql_visit() -> str:
     import_date = format_date('IMPORT_DATE')
 
     cols_dates = f'{start_date}, {end_date}, {import_date}, LENGTH_OF_STAY'
-    cols = 'ENCOUNTER_NUM, PATIENT_NUM, LOCATION_CD, INOUT_CD, LOCATION_CD, ' \
+    cols = 'ENCOUNTER_NUM, PATIENT_NUM, LOCATION_CD, INOUT_CD, ' \
           f'{cols_dates}'
     return f'select {cols} \n from {Table.visit.value}'
[Diffs for the remaining 6 changed files not shown.]
