feat: support DocumentReference URL attachments
Previously, we only supported DocumentReferences with inlined notes.
Now, we will properly download URL attachments.

Also:
- Expands the mimetypes we look for from just text/plain to text/plain,
  text/*, and application/xml, preferred in that order.
- Renames the ndjson loader's BackendServiceServer to FhirClient and moves
  it into top-level code (see the usage sketch after this list).
- Moves some credential argument handling out of the ndjson loader into
  etl.py.
- Eases credential requirement checking, so you don't need credentials at
  all, as long as the server doesn't complain (e.g. we can even run against
  Cerner's public sandbox, which doesn't need auth).
- Makes tasks async.
- Bumps FhirClient's timeout from 5 seconds to 5 minutes, for safety.
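
A rough usage sketch of how the renamed client and the now-async code fit
together (the server URL and the fetch_note_text helper are made up for
illustration; the FhirClient constructor, client.request, and response.text
follow the diff below):

    from cumulus import fhir_client

    async def fetch_note_text(url: str) -> str:
        # FhirClient now lives in top-level code (renamed from BackendServiceServer).
        # Credentials are optional as long as the server doesn't complain.
        client = fhir_client.FhirClient(
            "https://example.com/fhir",  # hypothetical FHIR base URL
            ["DocumentReference"],       # resources we plan to touch
            client_id=None,
            jwks=None,
            bearer_token=None,           # no auth, e.g. a public sandbox
        )
        async with client:
            # Pass Accept to get the raw note text rather than a Binary resource.
            response = await client.request("GET", url, headers={"Accept": "text/plain"})
            return response.text

    # To run it:
    #   import asyncio
    #   asyncio.run(fetch_note_text("https://example.com/fhir/Binary/some-note"))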

Note:
- This implementation is a little naive. It just downloads each URL as it
  encounters it, with no caching. If we grow another NLP task, we'll need to
  be more clever (a rough sketch of such a cache follows below). And even
  without that, we could be smarter about checking for a cached NLP result
  first.
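
For illustration only, a minimal sketch of the kind of caching this note is
alluding to (the _note_cache dict and cached_get helper are hypothetical and
not part of this commit; client.request and response.text match the diff
below):

    from typing import Dict

    from cumulus import fhir_client

    # Hypothetical in-memory cache, keyed by attachment URL.
    _note_cache: Dict[str, str] = {}

    async def cached_get(client: fhir_client.FhirClient, url: str, mimetype: str) -> str:
        """Downloads a note once per URL, so later NLP tasks can reuse the text."""
        if url not in _note_cache:
            response = await client.request("GET", url, headers={"Accept": mimetype})
            _note_cache[url] = response.text
        return _note_cache[url]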
mikix committed Feb 13, 2023
1 parent 8f5f761 commit a19f73c
Showing 13 changed files with 584 additions and 291 deletions.
4 changes: 3 additions & 1 deletion cumulus/config.py
@@ -5,7 +5,7 @@
from socket import gethostname
from typing import List

from cumulus import common, formats, store
from cumulus import common, fhir_client, formats, store


class JobConfig:
@@ -25,6 +25,7 @@ def __init__(
dir_phi: str,
input_format: str,
output_format: str,
client: fhir_client.FhirClient,
timestamp: datetime.datetime = None,
comment: str = None,
batch_size: int = 1, # this default is never really used - overridden by command line args
@@ -36,6 +37,7 @@ def __init__(
self.dir_phi = dir_phi
self._input_format = input_format
self._output_format = output_format
self.client = client
self.timestamp = common.timestamp_filename(timestamp)
self.hostname = gethostname()
self.comment = comment or ""
96 changes: 83 additions & 13 deletions cumulus/ctakes.py
@@ -5,17 +5,18 @@
import hashlib
import logging
import os
from typing import List
from typing import List, Optional

import ctakesclient

from cumulus import common, fhir_common, store
from cumulus import common, fhir_client, fhir_common, store


def covid_symptoms_extract(cache: store.Root, docref: dict) -> List[dict]:
async def covid_symptoms_extract(client: fhir_client.FhirClient, cache: store.Root, docref: dict) -> List[dict]:
"""
Extract a list of Observations from NLP-detected symptoms in physician notes
:param client: a client ready to talk to a FHIR server
:param cache: Where to cache NLP results
:param docref: Physician Note
:return: list of NLP results encoded as FHIR observations
@@ -25,20 +26,14 @@ def covid_symptoms_extract(cache: store.Root, docref: dict) -> List[dict]:

encounters = docref.get("context", {}).get("encounter", [])
if not encounters:
logging.warning("No valid encounters for symptoms") # ideally would print identifier, but it's PHI...
logging.warning("No encounters for docref %s", docref_id)
return []
_, encounter_id = fhir_common.unref_resource(encounters[0])

# Find the physician note among the attachments
for content in docref["content"]:
if "contentType" in content["attachment"] and "data" in content["attachment"]:
mimetype, params = cgi.parse_header(content["attachment"]["contentType"])
if mimetype == "text/plain": # just grab first text we find
charset = params.get("charset", "utf8")
physician_note = base64.standard_b64decode(content["attachment"]["data"]).decode(charset)
break
else:
logging.warning("No text/plain content in docref %s", docref_id)
physician_note = await get_docref_note(client, [content["attachment"] for content in docref["content"]])
if physician_note is None:
logging.warning("No text content in docref %s", docref_id)
return []

# Strip this "line feed" character that often shows up in notes and is confusing for cNLP.
@@ -97,6 +92,81 @@ def is_covid_match(m: ctakesclient.typesystem.MatchText):
return positive_matches


def parse_content_type(content_type: str) -> (str, str):
"""Returns (mimetype, encoding)"""
# TODO: switch to message.Message parsing, since cgi is deprecated
mimetype, params = cgi.parse_header(content_type)
return mimetype, params.get("charset", "utf8")


def mimetype_priority(mimetype: str) -> int:
"""
Returns priority of mimetypes for docref notes.
0 means "ignore"
Higher numbers are higher priority
"""
if mimetype == "text/plain":
return 3
elif mimetype.startswith("text/"):
return 2
elif mimetype in ("application/xml", "application/xhtml+xml"):
return 1
return 0


async def get_docref_note(client: fhir_client.FhirClient, attachments: List[dict]) -> Optional[str]:
# Find the best attachment to use, based on mimetype.
# We prefer basic text documents, to avoid confusing cTAKES with extra formatting (like <body>).
best_attachment_index = -1
best_attachment_priority = 0
for index, attachment in enumerate(attachments):
if "contentType" in attachment:
mimetype, _ = parse_content_type(attachment["contentType"])
priority = mimetype_priority(mimetype)
if priority > best_attachment_priority:
best_attachment_priority = priority
best_attachment_index = index

if best_attachment_index >= 0:
return await get_docref_note_from_attachment(client, attachments[best_attachment_index])

# We didn't find _any_ of our target text content types.
# A content type isn't required by the spec with external URLs... so it's possible an unmarked link could be good.
# But let's optimistically enforce the need for a content type ourselves by bailing here.
# If we find a real-world need to be more permissive, we can change this later.
# But note that if we do, we'll need to handle downloading Binary FHIR objects, in addition to arbitrary URLs.
return None


async def get_docref_note_from_attachment(client: fhir_client.FhirClient, attachment: dict) -> Optional[str]:
"""
Decodes or downloads a note from an attachment.
Note that it is assumed a contentType is provided.
:returns: the attachment's note text
"""
mimetype, charset = parse_content_type(attachment["contentType"])

if "data" in attachment:
return base64.standard_b64decode(attachment["data"]).decode(charset)

# TODO: At some point we should centralize the downloading of attachments -- once we have multiple NLP tasks,
# we may not want to re-download the overlapping notes. When we do that, it should not be part of our bulk
# exporter, since we may be given already-exported ndjson.
#
# TODO: There are future optimizations to try to use our ctakes cache to avoid downloading in the first place:
# - use attachment["hash"] if available (algorithm mismatch though... maybe we should switch to sha1...)
# - send a HEAD request with "Want-Digest: sha-256" but Cerner at least does not support that
if "url" in attachment:
# We need to pass Accept to get the raw data, not a Binary object. See https://www.hl7.org/fhir/binary.html
response = await client.request("GET", attachment["url"], headers={"Accept": mimetype})
return response.text

return None


def extract(cache: store.Root, namespace: str, sentence: str) -> ctakesclient.typesystem.CtakesJSON:
"""
This is a version of ctakesclient.client.extract() that also uses a cache
1 change: 1 addition & 0 deletions cumulus/deid/ms-config.json
@@ -5,6 +5,7 @@
"fhirPathRules": [
{"path": "nodesByName('modifierExtension')", "method": "keep", "comment": "Cumulus: keep these so we can ignore resources with modifiers we don't understand"},
{"path": "DocumentReference.nodesByType('Attachment').data", "method": "keep", "comment": "Cumulus: needed to run NLP on physician notes"},
{"path": "DocumentReference.nodesByType('Attachment').url", "method": "keep", "comment": "Cumulus: needed to run NLP on physician notes"},
{"path": "Patient.extension.where(url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex')", "method": "keep", "comment": "Cumulus: useful for studies"},
{"path": "Patient.extension.where(url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity')", "method": "keep", "comment": "Cumulus: useful for studies"},
{"path": "Patient.extension.where(url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-genderIdentity')", "method": "keep", "comment": "Cumulus: useful for studies"},
4 changes: 2 additions & 2 deletions cumulus/deid/scrubber.py
@@ -140,8 +140,8 @@ def _check_ids(self, node_path: str, node: dict, key: str, value: Any) -> None:
@staticmethod
def _check_attachments(node_path: str, node: dict, key: str) -> None:
"""Strip any attachment data"""
if node_path == "root.content.attachment" and key == "data":
del node["data"]
if node_path == "root.content.attachment" and key in {"data", "url"}:
del node[key]

@staticmethod
def _check_security(node_path: str, node: dict, key: str, value: Any) -> None:
1 change: 1 addition & 0 deletions cumulus/errors.py
@@ -16,3 +16,4 @@
TASK_SET_EMPTY = 21
ARGS_CONFLICT = 22
ARGS_INVALID = 23
FHIR_URL_MISSING = 24
123 changes: 79 additions & 44 deletions cumulus/etl.py
@@ -11,12 +11,12 @@
import sys
import tempfile
import time
from typing import List, Type
from typing import Iterable, List, Type
from urllib.parse import urlparse

import ctakesclient

from cumulus import common, context, deid, errors, loaders, store, tasks
from cumulus import common, context, deid, errors, fhir_client, loaders, store, tasks
from cumulus.config import JobConfig, JobSummary


@@ -27,27 +27,22 @@
###############################################################################


async def load_and_deidentify(
loader: loaders.Loader, selected_tasks: List[Type[tasks.EtlTask]]
) -> tempfile.TemporaryDirectory:
async def load_and_deidentify(loader: loaders.Loader, resources: Iterable[str]) -> tempfile.TemporaryDirectory:
"""
Loads the input directory and does a first-pass de-identification
Code outside this method should never see the original input files.
:returns: a temporary directory holding the de-identified files in FHIR ndjson format
"""
# Grab a list of all required resource types for the tasks we are running
required_resources = set(t.resource for t in selected_tasks)

# First step is loading all the data into a local ndjson format
loaded_dir = await loader.load_all(list(required_resources))
loaded_dir = await loader.load_all(list(resources))

# Second step is de-identifying that data (at a bulk level)
return await deid.Scrubber.scrub_bulk_data(loaded_dir.name)


def etl_job(config: JobConfig, selected_tasks: List[Type[tasks.EtlTask]]) -> List[JobSummary]:
async def etl_job(config: JobConfig, selected_tasks: List[Type[tasks.EtlTask]]) -> List[JobSummary]:
"""
:param config: job config
:param selected_tasks: the tasks to run
@@ -58,7 +53,7 @@ def etl_job(config: JobConfig, selected_tasks: List[Type[tasks.EtlTask]]) -> Lis
scrubber = deid.Scrubber(config.dir_phi)
for task_class in selected_tasks:
task = task_class(config, scrubber)
summary = task.run()
summary = await task.run()
summary_list.append(summary)

path = os.path.join(config.dir_job_config(), f"{summary.label}.json")
@@ -195,6 +190,9 @@ def make_parser() -> argparse.ArgumentParser:
metavar="PATH",
help="Bearer token for custom bearer authentication",
)
export.add_argument(
"--fhir-url", metavar="URL", help="FHIR server base URL, only needed if you exported separately"
)
export.add_argument("--since", help="Start date for export from the FHIR server")
export.add_argument("--until", help="End date for export from the FHIR server")

@@ -213,6 +211,39 @@
return parser


def create_fhir_client(args, root_input, resources):
client_base_url = args.fhir_url
if root_input.protocol in {"http", "https"}:
if args.fhir_url and not root_input.path.startswith(args.fhir_url):
print(
"You provided both an input FHIR server and a different --fhir-url. Try dropping --fhir-url.",
file=sys.stderr,
)
raise SystemExit(errors.ARGS_CONFLICT)
client_base_url = root_input.path

try:
try:
# Try to load client ID from file first (some servers use crazy long ones, like SMART's bulk-data-server)
smart_client_id = common.read_text(args.smart_client_id).strip() if args.smart_client_id else None
except FileNotFoundError:
smart_client_id = args.smart_client_id

smart_jwks = common.read_json(args.smart_jwks) if args.smart_jwks else None
bearer_token = common.read_text(args.bearer_token).strip() if args.bearer_token else None
except OSError as exc:
print(exc, file=sys.stderr)
raise SystemExit(errors.ARGS_INVALID) from exc

return fhir_client.FhirClient(
client_base_url,
resources,
client_id=smart_client_id,
jwks=smart_jwks,
bearer_token=bearer_token,
)


async def main(args: List[str]):
parser = make_parser()
args = parser.parse_args(args)
@@ -233,45 +264,49 @@ async def main(args: List[str]):
job_context = context.JobContext(root_phi.joinpath("context.json"))
job_datetime = common.datetime_now() # grab timestamp before we do anything

if args.input_format == "i2b2":
config_loader = loaders.I2b2Loader(root_input, args.batch_size)
else:
config_loader = loaders.FhirNdjsonLoader(
root_input,
client_id=args.smart_client_id,
jwks=args.smart_jwks,
bearer_token=args.bearer_token,
since=args.since,
until=args.until,
)

# Check which tasks are being run, allowing comma-separated values
task_names = args.task and set(itertools.chain.from_iterable(t.split(",") for t in args.task))
task_filters = args.task_filter and list(itertools.chain.from_iterable(t.split(",") for t in args.task_filter))
selected_tasks = tasks.EtlTask.get_selected_tasks(task_names, task_filters)

# Pull down resources and run the MS tool on them
deid_dir = await load_and_deidentify(config_loader, selected_tasks)

# Prepare config for jobs
config = JobConfig(
args.dir_input,
deid_dir.name,
args.dir_output,
args.dir_phi,
args.input_format,
args.output_format,
comment=args.comment,
batch_size=args.batch_size,
timestamp=job_datetime,
tasks=[t.name for t in selected_tasks],
)
common.write_json(config.path_config(), config.as_json(), indent=4)
common.print_header("Configuration:")
print(json.dumps(config.as_json(), indent=4))
# Grab a list of all required resource types for the tasks we are running
required_resources = set(t.resource for t in selected_tasks)

# Create a client to talk to a FHIR server.
# This is useful even if we aren't doing a bulk export, because some resources like DocumentReference can still
# reference external resources on the server (like the document text).
# If we don't need this client (e.g. we're using local data and don't download any attachments), this is a no-op.
client = create_fhir_client(args, root_input, required_resources)

async with client:
if args.input_format == "i2b2":
config_loader = loaders.I2b2Loader(root_input, args.batch_size)
else:
config_loader = loaders.FhirNdjsonLoader(root_input, client, since=args.since, until=args.until)

# Pull down resources and run the MS tool on them
deid_dir = await load_and_deidentify(config_loader, required_resources)

# Prepare config for jobs
config = JobConfig(
args.dir_input,
deid_dir.name,
args.dir_output,
args.dir_phi,
args.input_format,
args.output_format,
client,
comment=args.comment,
batch_size=args.batch_size,
timestamp=job_datetime,
tasks=[t.name for t in selected_tasks],
)
common.write_json(config.path_config(), config.as_json(), indent=4)
common.print_header("Configuration:")
print(json.dumps(config.as_json(), indent=4))

# Finally, actually run the meat of the pipeline! (Filtered down to requested tasks)
summaries = etl_job(config, selected_tasks)
# Finally, actually run the meat of the pipeline! (Filtered down to requested tasks)
summaries = await etl_job(config, selected_tasks)

# Print results to the console
common.print_header("Results:")