Skip to content

Commit

Permalink
feat: don't simply toss extensions
Browse files Browse the repository at this point in the history
- Print a report at the end of the ETL run of all extensions stripped
  by resource, URL, and count.
- Plus a report on resources skipped due to unrecognized modifier
  extensions.
- Leave stub extension in place, with just the `url` field, so that
  we can inspect stripped extensions in SQL if we want.

Allow the following new extensions:
- http://hl7.org/fhir/StructureDefinition/data-absent-reason
- http://hl7.org/fhir/StructureDefinition/derivation-reference
- http://hl7.org/fhir/us/core/StructureDefinition/us-core-jurisdiction
- http://hl7.org/fhir/us/core/StructureDefinition/us-core-medication-
  adherence
- http://hl7.org/fhir/us/core/StructureDefinition/us-core-sex
- http://hl7.org/fhir/us/core/StructureDefinition/us-core-tribal-
  affiliation

Drop support for the following extension:
- http://hl7.org/fhir/us/core/StructureDefinition/us-core-sex-for-
  clinical-use (this was in US Core 6.0 ballot, but didn't make final
  cut - it seems to be replaced by just 'us-core-sex')
  • Loading branch information
mikix committed Oct 31, 2024
1 parent 1ae37fd commit 3708ce2
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 31 deletions.
15 changes: 4 additions & 11 deletions cumulus_etl/deid/ms-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,10 @@
"fhirVersion": "R4",
"processingErrors": "raise",
"fhirPathRules": [
// Only allow a few known extensions
{"path": "Patient.extension('http://hl7.org/fhir/Profile/us-core#ethnicity')", "method": "keep"}, // Old DSTU1 URL, still out there in the wild: https://www.hl7.org/fhir/DSTU1/us-core.html
{"path": "Patient.extension('http://hl7.org/fhir/Profile/us-core#race')", "method": "keep"}, // Old DSTU1 URL, still out there in the wild: https://www.hl7.org/fhir/DSTU1/us-core.html
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex')", "method": "keep"},
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity')", "method": "keep"},
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-genderIdentity')", "method": "keep"},
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-race')", "method": "keep"},
{"path": "Patient.extension('http://hl7.org/fhir/us/core/StructureDefinition/us-core-sex-for-clinical-use')", "method": "keep"},
{"path": "Patient.extension('http://open.epic.com/FHIR/StructureDefinition/extension/sex-for-clinical-use')", "method": "keep"}, // Epic has used this pre-final-spec URL
{"path": "nodesByName('modifierExtension')", "method": "keep"}, // keep these so we can ignore resources with modifiers we don't understand
{"path": "nodesByType('Extension')", "method": "redact"}, // drop all unknown extensions
// modifierExtension is handled by ETL so that we can skip resources we don't understand
{"path": "nodesByName('modifierExtension')", "method": "keep"},
// extension is handled by ETL so that we can flag the URLs of extensions we strip
{"path": "nodesByName('extension')", "method": "keep"},

// Elements that might be embedded and kept elsewhere -- redact pieces of the whole
{"path": "nodesByType('Attachment').title", "method": "redact"},
Expand Down
142 changes: 127 additions & 15 deletions cumulus_etl/deid/scrubber.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@
import tempfile
from typing import Any

from cumulus_etl import fhir
import rich
import rich.padding
import rich.tree

from cumulus_etl import common, fhir
from cumulus_etl.deid import codebook, mstool, philter


# Record of unknown extensions (resource type -> extension URL -> count)

Check failure on line 15 in cumulus_etl/deid/scrubber.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (I001)

cumulus_etl/deid/scrubber.py:3:1: I001 Import block is un-sorted or un-formatted
ExtensionCount = dict[str, dict[str, int]]


class SkipResource(Exception):
pass

Expand All @@ -34,6 +42,10 @@ def __init__(self, codebook_dir: str | None = None, use_philter: bool = False):
self.codebook = codebook.Codebook(codebook_dir)
self.codebook_dir = codebook_dir
self.philter = philter.Philter() if use_philter else None
# List of ignored extensions (resource -> url -> count)
self.dropped_extensions: ExtensionCount = {}
# List of modifier extensions that caused us to skip a resource (resource -> url -> count)
self.skipped_modifer_extensions: ExtensionCount = {}

@staticmethod
async def scrub_bulk_data(input_dir: str) -> tempfile.TemporaryDirectory:
Expand All @@ -48,7 +60,9 @@ async def scrub_bulk_data(input_dir: str) -> tempfile.TemporaryDirectory:
await mstool.run_mstool(input_dir, tmpdir.name)
return tmpdir

def scrub_resource(self, node: dict, scrub_attachments: bool = True) -> bool:
def scrub_resource(
self, node: dict, scrub_attachments: bool = True, keep_stats: bool = True
) -> bool:
"""
Cleans/de-identifies resource (in-place) and returns False if it should be rejected
Expand All @@ -57,14 +71,19 @@ def scrub_resource(self, node: dict, scrub_attachments: bool = True) -> bool:
:param node: resource to de-identify
:param scrub_attachments: whether to remove any attachment data found
:param keep_stats: whether to record stats about this resource
:returns: whether this resource is allowed to be used
"""
try:
self._scrub_node(
node.get("resourceType"), "root", node, scrub_attachments=scrub_attachments
node.get("resourceType"),
"root",
node,
scrub_attachments=scrub_attachments,
keep_stats=keep_stats,
)
except SkipResource as exc:
logging.warning("Ignoring resource of type %s: %s", node.__class__.__name__, exc)
except SkipResource:
# No need to log, we'll have already reported out what we should
return False
except ValueError as exc:
logging.warning("Could not parse value: %s", exc)
Expand All @@ -86,14 +105,30 @@ def save(self) -> None:
if self.codebook_dir:
self.codebook.db.save(self.codebook_dir)

def print_extension_report(self) -> None:
    """Print both end-of-run extension reports: dropped extensions and skipped resources."""
    # Pair each table with its banner, then render them with the shared table printer.
    reports = [
        ("Unrecognized extensions dropped from resources:", self.dropped_extensions),
        ("🚨 Resources skipped due to unrecognized modifier extensions: 🚨", self.skipped_modifer_extensions),
    ]
    for title, table in reports:
        self._print_extension_table(title, table)

###############################################################################
#
# Implementation details
#
###############################################################################

def _scrub_node(
self, resource_type: str, node_path: str, node: dict, scrub_attachments: bool
self,
resource_type: str,
node_path: str,
node: dict,
*,
scrub_attachments: bool,
keep_stats: bool,
) -> None:
"""Examines all properties of a node"""
for key, values in list(node.items()):
Expand All @@ -106,7 +141,13 @@ def _scrub_node(

for value in values:
self._scrub_single_value(
resource_type, node_path, node, key, value, scrub_attachments=scrub_attachments
resource_type,
node_path,
node,
key,
value,
scrub_attachments=scrub_attachments,
keep_stats=keep_stats,
)

def _scrub_single_value(
Expand All @@ -116,12 +157,15 @@ def _scrub_single_value(
node: dict,
key: str,
value: Any,
*,
scrub_attachments: bool,
keep_stats: bool,
) -> None:
"""Examines one single property of a node"""
# For now, just manually run each operation. If this grows further, we can abstract it more.
self._check_ids(node, key, value)
self._check_modifier_extensions(key, value)
self._check_extensions(resource_type, node, key, value, keep_stats=keep_stats)
self._check_modifier_extensions(resource_type, key, value, keep_stats=keep_stats)
self._check_security(node_path, node, key, value)
self._check_text(node, key, value)
if scrub_attachments:
Expand All @@ -130,26 +174,94 @@ def _scrub_single_value(
# Recurse if we are holding another FHIR object (i.e. a dict instead of a string)
if isinstance(value, dict):
self._scrub_node(
resource_type, f"{node_path}.{key}", value, scrub_attachments=scrub_attachments
resource_type,
f"{node_path}.{key}",
value,
scrub_attachments=scrub_attachments,
keep_stats=keep_stats,
)

def _print_extension_table(self, title: str, table: ExtensionCount) -> None:
    """Render one resource -> URL -> count table as indented rich trees (silent if empty)."""
    if not table:
        # Nothing was recorded for this category, so print no header at all.
        return

    common.print_header(title)
    console = rich.get_console()
    for resource_type, url_counts in sorted(table.items()):
        branch = rich.tree.Tree(resource_type)
        for url, count in sorted(url_counts.items()):
            plural = "" if count == 1 else "s"
            branch.add(f"{url} ({count} time{plural})")
        console.print(rich.padding.Padding.indent(branch, 1))

###############################################################################
#
# Individual checkers
#
###############################################################################

@staticmethod
def _check_modifier_extensions(key: str, value: Any) -> None:
def _count_unknown_extension(self, table: ExtensionCount, resource: str, url: str) -> None:
    """Increment the per-resource-type counter for one extension URL."""
    per_resource = table.setdefault(resource, {})
    per_resource[url] = per_resource.get(url, 0) + 1

def _check_extensions(
self, resource_type: str, node: dict, key: str, value: Any, *, keep_stats: bool
) -> None:
"""If there's any unrecognized extensions, log and delete them"""
if key == "extension" and isinstance(value, dict):
# We want to allow almost any extension that might have clinical relevance.
# But we use an allow list to avoid letting in PHI.
allowed_extensions = {
# Base spec extensions
"http://hl7.org/fhir/StructureDefinition/data-absent-reason",
"http://hl7.org/fhir/StructureDefinition/derivation-reference",
# US Core extensions
# (See https://hl7.org/fhir/us/core/profiles-and-extensions.html)
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-genderIdentity",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-jurisdiction",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-medication-adherence",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-race",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-sex",
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-tribal-affiliation",
# Next two are old US Core DSTU1 URLs, still seen in the wild
# (See https://www.hl7.org/fhir/DSTU1/us-core.html)
"http://hl7.org/fhir/Profile/us-core#ethnicity",
"http://hl7.org/fhir/Profile/us-core#race",
# Epic extensions
"http://open.epic.com/FHIR/StructureDefinition/extension/sex-for-clinical-use",
}
# Some extensions we know about, but aren't necessary to us (they duplicate standard
# extensions, contain PHI, or are otherwise not relevant). We don't want to warn
# the user about them as "unrecognized", so we just ignore them entirely.
ignored_extensions = {
# US Core extension, but deals with how to email a person, which we don't need
"http://hl7.org/fhir/us/core/StructureDefinition/us-core-direct",
}
url = value.get("url")
if url not in allowed_extensions:
value.clear() # get rid of any other keys
value["url"] = url # just keep the url, to track that it existed
if keep_stats and url not in ignored_extensions:
self._count_unknown_extension(self.dropped_extensions, resource_type, url)

def _check_modifier_extensions(
self, resource_type: str, key: str, value: Any, *, keep_stats: bool
) -> None:
"""If there's any unrecognized modifierExtensions, raise a SkipResource exception"""
if key == "modifierExtension" and isinstance(value, dict):
known_extensions = [
allowed_extensions = {
# These NLP extensions are generated by ctakesclient's text2fhir code.
# While we don't anticipate ingesting any resources using these extensions
# (and we don't currently generate them ourselves), we might in the future.
"http://fhir-registry.smarthealthit.org/StructureDefinition/nlp-polarity",
"http://fhir-registry.smarthealthit.org/StructureDefinition/nlp-source",
]
}
url = value.get("url")
if url not in known_extensions:
raise SkipResource(f'Unrecognized modifierExtension with URL "{url}"')
if keep_stats and url not in allowed_extensions:
self._count_unknown_extension(self.skipped_modifer_extensions, resource_type, url)
raise SkipResource

def _check_ids(self, node: dict, key: str, value: Any) -> None:
"""Converts any IDs and references to a de-identified version"""
Expand Down
11 changes: 7 additions & 4 deletions cumulus_etl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,16 @@


async def etl_job(
config: JobConfig, selected_tasks: TaskList, use_philter: bool = False
config: JobConfig, selected_tasks: TaskList, scrubber: deid.Scrubber
) -> list[JobSummary]:
"""
:param config: job config
:param selected_tasks: the tasks to run
:param use_philter: whether to run text through philter
:param scrubber: de-id scrubber to use for jobs
:return: a list of job summaries
"""
summary_list = []

scrubber = deid.Scrubber(config.dir_phi, use_philter=use_philter)
for task_class in selected_tasks:
task = task_class(config, scrubber)
task_summaries = await task.run()
Expand Down Expand Up @@ -374,14 +373,18 @@ async def etl_main(args: argparse.Namespace) -> None:
common.write_json(config.path_config(), config.as_json(), indent=4)

# Finally, actually run the meat of the pipeline! (Filtered down to requested tasks)
summaries = await etl_job(config, selected_tasks, use_philter=args.philter)
scrubber = deid.Scrubber(config.dir_phi, use_philter=args.philter)
summaries = await etl_job(config, selected_tasks, scrubber)

# Update job context for future runs
job_context.last_successful_datetime = job_datetime
job_context.last_successful_input_dir = args.dir_input
job_context.last_successful_output_dir = args.dir_output
job_context.save()

# Report out any stripped extensions or dropped resources due to modifierExtensions
scrubber.print_extension_report()

# Flag final status to user
common.print_header()
if any(s.had_errors for s in summaries):
Expand Down
2 changes: 1 addition & 1 deletion cumulus_etl/etl/tasks/nlp_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def read_notes(
can_process = (
nlp.is_docref_valid(docref)
and (doc_check is None or doc_check(docref))
and self.scrubber.scrub_resource(docref, scrub_attachments=False)
and self.scrubber.scrub_resource(docref, scrub_attachments=False, keep_stats=False)
)
if not can_process:
continue
Expand Down

0 comments on commit 3708ce2

Please sign in to comment.