WIP: don't simply toss extensions #347

Draft · wants to merge 1 commit into base: main
15 changes: 4 additions & 11 deletions cumulus_etl/deid/ms-config.json
@@ -20,17 +20,10 @@
"fhirVersion": "R4",
"processingErrors": "raise",
"fhirPathRules": [
// Only allow a few known extensions
{"path": "Patient.extension('http://hl7.org/fhir/Profile/us-core#ethnicity')", "method": "keep"}, // Old DSTU1 URL, still out there in the wild: https://www.hl7.org/fhir/DSTU1/us-core.html
{"path": "Patient.extension('http://hl7.org/fhir/Profile/us-core#race')", "method": "keep"}, // Old DSTU1 URL, still out there in the wild: https://www.hl7.org/fhir/DSTU1/us-core.html
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex')", "method": "keep"},
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity')", "method": "keep"},
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-genderIdentity')", "method": "keep"},
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-race')", "method": "keep"},
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-sex-for-clinical-use')", "method": "keep"},
{"path": "Patient.extension('http://open.epic.com/FHIR/StructureDefinition/extension/sex-for-clinical-use')", "method": "keep"}, // Epic has used this pre-final-spec URL
{"path": "nodesByName('modifierExtension')", "method": "keep"}, // keep these so we can ignore resources with modifiers we don't understand
{"path": "nodesByType('Extension')", "method": "redact"}, // drop all unknown extensions
// modifierExtension is handled by ETL so that we can skip resources we don't understand
{"path": "nodesByName('modifierExtension')", "method": "keep"},
// extension is handled by ETL so that we can flag the URLs of extensions we strip
{"path": "nodesByName('extension')", "method": "keep"},

// Elements that might be embedded and kept elsewhere -- redact pieces of the whole
{"path": "nodesByType('Attachment').title", "method": "redact"},
151 changes: 136 additions & 15 deletions cumulus_etl/deid/scrubber.py
@@ -1,13 +1,21 @@
"""Cleans (de-identifies) a single resource node recursively"""

import logging
import tempfile
from typing import Any

from cumulus_etl import fhir
import rich
import rich.padding
import rich.tree

from cumulus_etl import common, fhir
from cumulus_etl.deid import codebook, mstool, philter


# Record of unknown extensions (resource type -> extension URL -> count)

[Check failure on line 15 in cumulus_etl/deid/scrubber.py — GitHub Actions / lint — Ruff (I001): cumulus_etl/deid/scrubber.py:3:1: Import block is un-sorted or un-formatted]
ExtensionCount = dict[str, dict[str, int]]


class SkipResource(Exception):
pass

@@ -34,6 +42,10 @@
self.codebook = codebook.Codebook(codebook_dir)
self.codebook_dir = codebook_dir
self.philter = philter.Philter() if use_philter else None
# Record of dropped extensions (resource -> url -> count)
self.dropped_extensions: ExtensionCount = {}
# Record of modifier extensions that caused us to skip a resource (resource -> url -> count)
self.skipped_modifier_extensions: ExtensionCount = {}

@staticmethod
async def scrub_bulk_data(input_dir: str) -> tempfile.TemporaryDirectory:
@@ -48,7 +60,9 @@
await mstool.run_mstool(input_dir, tmpdir.name)
return tmpdir

def scrub_resource(self, node: dict, scrub_attachments: bool = True) -> bool:
def scrub_resource(
self, node: dict, scrub_attachments: bool = True, keep_stats: bool = True
) -> bool:
"""
Cleans/de-identifies resource (in-place) and returns False if it should be rejected

@@ -57,14 +71,19 @@

:param node: resource to de-identify
:param scrub_attachments: whether to remove any attachment data found
:param keep_stats: whether to record stats about this resource
:returns: whether this resource is allowed to be used
"""
try:
self._scrub_node(
node.get("resourceType"), "root", node, scrub_attachments=scrub_attachments
node.get("resourceType"),
"root",
node,
scrub_attachments=scrub_attachments,
keep_stats=keep_stats,
)
except SkipResource as exc:
logging.warning("Ignoring resource of type %s: %s", node.__class__.__name__, exc)
except SkipResource:
# No need to log, we'll have already reported out what we should
return False
except ValueError as exc:
logging.warning("Could not parse value: %s", exc)
@@ -86,14 +105,30 @@
if self.codebook_dir:
self.codebook.db.save(self.codebook_dir)

def print_extension_report(self) -> None:
self._print_extension_table(
"Unrecognized extensions dropped from resources:",
self.dropped_extensions,
)
self._print_extension_table(
"🚨 Resources skipped due to unrecognized modifier extensions: 🚨",
self.skipped_modifier_extensions,
)

###############################################################################
#
# Implementation details
#
###############################################################################

def _scrub_node(
self, resource_type: str, node_path: str, node: dict, scrub_attachments: bool
self,
resource_type: str,
node_path: str,
node: dict,
*,
scrub_attachments: bool,
keep_stats: bool,
) -> None:
"""Examines all properties of a node"""
for key, values in list(node.items()):
@@ -106,7 +141,13 @@

for value in values:
self._scrub_single_value(
resource_type, node_path, node, key, value, scrub_attachments=scrub_attachments
resource_type,
node_path,
node,
key,
value,
scrub_attachments=scrub_attachments,
keep_stats=keep_stats,
)

def _scrub_single_value(
@@ -116,12 +157,15 @@
node: dict,
key: str,
value: Any,
*,
scrub_attachments: bool,
keep_stats: bool,
) -> None:
"""Examines one single property of a node"""
# For now, just manually run each operation. If this grows further, we can abstract it more.
self._check_ids(node, key, value)
self._check_modifier_extensions(key, value)
self._check_extensions(resource_type, node, key, value, keep_stats=keep_stats)
self._check_modifier_extensions(resource_type, key, value, keep_stats=keep_stats)
self._check_security(node_path, node, key, value)
self._check_text(node, key, value)
if scrub_attachments:
@@ -130,26 +174,103 @@
# Recurse if we are holding another FHIR object (i.e. a dict instead of a string)
if isinstance(value, dict):
self._scrub_node(
resource_type, f"{node_path}.{key}", value, scrub_attachments=scrub_attachments
resource_type,
f"{node_path}.{key}",
value,
scrub_attachments=scrub_attachments,
keep_stats=keep_stats,
)

def _print_extension_table(self, title: str, table: ExtensionCount) -> None:
if not table:
return # nothing to do!

common.print_header(title)
for resource_type in sorted(table):
tree = rich.tree.Tree(resource_type)
for url, count in sorted(table[resource_type].items()):
tree.add(f"{url} ({count:,} time{'' if count == 1 else 's'})")
indented = rich.padding.Padding.indent(tree, 1)
rich.get_console().print(indented)
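
A minimal sketch of what this rendering produces, assuming `scrubber` is a `Scrubber` instance (the extension URL below is invented):

```python
scrubber.dropped_extensions = {
    "Patient": {"http://example.com/fhir/custom-flag": 3},
}
scrubber.print_extension_report()
# Renders roughly:
#   Unrecognized extensions dropped from resources:
#    Patient
#    └── http://example.com/fhir/custom-flag (3 times)
```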

###############################################################################
#
# Individual checkers
#
###############################################################################

@staticmethod
def _check_modifier_extensions(key: str, value: Any) -> None:
def _count_unknown_extension(self, table: ExtensionCount, resource: str, url: str) -> None:
resource_counts = table.setdefault(resource, {})
count = resource_counts.setdefault(url, 0)
resource_counts[url] = count + 1
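
Standalone sketch of the nested `setdefault` counting above (resource type and URLs invented):

```python
table: dict[str, dict[str, int]] = {}  # same shape as ExtensionCount

def count(table: dict, resource: str, url: str) -> None:
    resource_counts = table.setdefault(resource, {})
    resource_counts[url] = resource_counts.setdefault(url, 0) + 1

count(table, "Patient", "http://example.com/ext-a")
count(table, "Patient", "http://example.com/ext-a")
count(table, "Patient", "http://example.com/ext-b")
assert table == {"Patient": {"http://example.com/ext-a": 2, "http://example.com/ext-b": 1}}
```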

def _check_extensions(
self, resource_type: str, node: dict, key: str, value: Any, *, keep_stats: bool
) -> None:
"""If there's any unrecognized extensions, log and delete them"""
if key == "extension" and isinstance(value, dict):
# We want to allow almost any extension that might have clinical relevance.
# But we use an allow list to avoid letting in PHI.
allowed_extensions = {
# TODO: what about _system kind of stuff in schema?
### Base spec extensions
"http://hl7.org/fhir/StructureDefinition/data-absent-reason",
"http://hl7.org/fhir/StructureDefinition/derivation-reference",
"http://hl7.org/fhir/StructureDefinition/event-performerFunction",
### US Core extensions
### (See https://hl7.org/fhir/us/core/profiles-and-extensions.html)
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-genderIdentity",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-jurisdiction",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-medication-adherence",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-race",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-sex",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-tribal-affiliation",
### Next two are old US Core DSTU1 URLs, still seen in the wild
### (See https://www.hl7.org/fhir/DSTU1/us-core.html)
"http://hl7.org/fhir/Profile/us-core#ethnicity",
"http://hl7.org/fhir/Profile/us-core#race",
### Cerner extensions
# Links to a client Organization reference in an Encounter
"https://fhir-ehr.cerner.com/r4/StructureDefinition/client-organization",
# "precision" here is the precision of a date field
"https://fhir-ehr.cerner.com/r4/StructureDefinition/precision",
### Epic extensions
"http://open.epic.com/FHIR/StructureDefinition/extension/sex-for-clinical-use",
}
# Some extensions we know about, but aren't necessary to us (they duplicate standard
# extensions, contain PHI, or are otherwise not relevant). We don't want to warn
# the user about them as "unrecognized", so we just ignore them entirely.
ignored_extensions = {
# US Core extension, but deals with how to email a person, which we don't need
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-direct",
# Cerner financial extension for Encounters, we don't need to record this
"https://fhir-ehr.cerner.com/r4/StructureDefinition/estimated-financial-responsibility-amount",
}
url = value.get("url")
if url not in allowed_extensions:
value.clear() # get rid of any other keys
value["url"] = url # just keep the url, to track that it existed
if keep_stats and url not in ignored_extensions:
self._count_unknown_extension(self.dropped_extensions, resource_type, url)
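
The clearing step above reduces an unrecognized extension to just its URL; a minimal standalone sketch (URL invented):

```python
ext = {"url": "http://example.com/unknown-ext", "valueString": "possibly PHI"}
url = ext.get("url")
ext.clear()       # drop the value[x] payload and any nested keys
ext["url"] = url  # keep only the URL, so the drop can be reported later
assert ext == {"url": "http://example.com/unknown-ext"}
```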

def _check_modifier_extensions(
self, resource_type: str, key: str, value: Any, *, keep_stats: bool
) -> None:
"""If there's any unrecognized modifierExtensions, raise a SkipResource exception"""
if key == "modifierExtension" and isinstance(value, dict):
known_extensions = [
allowed_extensions = {
# These NLP extensions are generated by ctakesclient's text2fhir code.
# While we don't anticipate ingesting any resources using these extensions
# (and we don't currently generate them ourselves), we might in the future.
"http://fhir-registry.smarthealthit.org/StructureDefinition/nlp-polarity",
"http://fhir-registry.smarthealthit.org/StructureDefinition/nlp-source",
]
}
url = value.get("url")
if url not in known_extensions:
raise SkipResource(f'Unrecognized modifierExtension with URL "{url}"')
if keep_stats and url not in allowed_extensions:
self._count_unknown_extension(self.skipped_modifier_extensions, resource_type, url)
raise SkipResource
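
For illustration (resource and URL invented), a resource carrying an unrecognized modifierExtension should make `scrub_resource()` return `False` rather than let a partially-understood resource through:

```python
condition = {
    "resourceType": "Condition",
    "modifierExtension": [{"url": "http://example.com/unknown-modifier"}],
}
# Assuming `scrubber` is a Scrubber instance:
assert scrubber.scrub_resource(condition) is False
# ...and the skip is tallied in scrubber.skipped_modifier_extensions.
```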

def _check_ids(self, node: dict, key: str, value: Any) -> None:
"""Converts any IDs and references to a de-identified version"""
11 changes: 7 additions & 4 deletions cumulus_etl/etl/cli.py
@@ -27,17 +27,16 @@


async def etl_job(
config: JobConfig, selected_tasks: TaskList, use_philter: bool = False
config: JobConfig, selected_tasks: TaskList, scrubber: deid.Scrubber
) -> list[JobSummary]:
"""
:param config: job config
:param selected_tasks: the tasks to run
:param use_philter: whether to run text through philter
:param scrubber: de-id scrubber to use for jobs
:return: a list of job summaries
"""
summary_list = []

scrubber = deid.Scrubber(config.dir_phi, use_philter=use_philter)
for task_class in selected_tasks:
task = task_class(config, scrubber)
task_summaries = await task.run()
@@ -374,14 +373,18 @@ async def etl_main(args: argparse.Namespace) -> None:
common.write_json(config.path_config(), config.as_json(), indent=4)

# Finally, actually run the meat of the pipeline! (Filtered down to requested tasks)
summaries = await etl_job(config, selected_tasks, use_philter=args.philter)
scrubber = deid.Scrubber(config.dir_phi, use_philter=args.philter)
summaries = await etl_job(config, selected_tasks, scrubber)

# Update job context for future runs
job_context.last_successful_datetime = job_datetime
job_context.last_successful_input_dir = args.dir_input
job_context.last_successful_output_dir = args.dir_output
job_context.save()

# Report out any stripped extensions or dropped resources due to modifierExtensions
scrubber.print_extension_report()

# Flag final status to user
common.print_header()
if any(s.had_errors for s in summaries):
2 changes: 1 addition & 1 deletion cumulus_etl/etl/tasks/nlp_task.py
@@ -78,7 +78,7 @@ async def read_notes(
can_process = (
nlp.is_docref_valid(docref)
and (doc_check is None or doc_check(docref))
and self.scrubber.scrub_resource(docref, scrub_attachments=False)
and self.scrubber.scrub_resource(docref, scrub_attachments=False, keep_stats=False)
)
if not can_process:
continue
19 changes: 13 additions & 6 deletions docs/deid.md
@@ -97,18 +97,25 @@
Zip codes are redacted down to just the first three digits (e.g. `12139` becomes `121`).
Additionally, for certain small-population zip codes where even three digits is too identifying,
the zip code is entirely redacted to `00000`.
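
As a rough sketch of that rule (the small-population prefix set here is invented, not the real list):

```python
SMALL_POPULATION_ZIP3 = {"036", "059", "102"}  # illustrative values only

def redact_zip(zip_code: str) -> str:
    prefix = zip_code[:3]
    return "00000" if prefix in SMALL_POPULATION_ZIP3 else prefix

assert redact_zip("12139") == "121"
assert redact_zip("03601") == "00000"
```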

#### Extensions

All extensions are removed, except for:
- The USCDI patient extensions (birth sex, gender identity, race, and ethnicity)
- "Modifier" extensions which will flag to the ETL that a resource should be skipped

#### Clinical Notes

Be aware that clinical notes are not removed at this stage.
They are kept for now, so that Cumulus ETL can run natural language processing on them.
See below for more information on that.

### Extensions

Extensions are stripped out unless they are on a list of recognized extensions,
to ensure that PHI doesn't accidentally slip in.
The allowed extensions include the standard USCDI patient extensions
(birth sex, gender identity, race, and ethnicity)
as well as various harmless vendor extensions.

Any unrecognized
["Modifier" extension](https://www.hl7.org/fhir/R4/extensibility.html#modifierExtension)
will cause Cumulus ETL to entirely skip the containing resource,
since the resource can't be properly understood.
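
For example (with an invented vendor URL), a stripped extension keeps only its `url`, which lets the ETL report what it dropped:

```python
# Before de-identification:
patient = {
    "resourceType": "Patient",
    "extension": [
        {"url": "http://example.com/internal-note", "valueString": "call after 5pm"},
    ],
}
# After de-identification, only the URL survives:
# patient["extension"] == [{"url": "http://example.com/internal-note"}]
```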

### IDs

Cumulus ETL de-identifies FHIR resource IDs itself.