feat: support DocumentReference URL attachments #172

Merged (1 commit, Feb 14, 2023)
4 changes: 3 additions & 1 deletion cumulus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from socket import gethostname
from typing import List

from cumulus import common, formats, store
from cumulus import common, fhir_client, formats, store


class JobConfig:
@@ -25,6 +25,7 @@ def __init__(
dir_phi: str,
input_format: str,
output_format: str,
client: fhir_client.FhirClient,
timestamp: datetime.datetime = None,
comment: str = None,
batch_size: int = 1, # this default is never really used - overridden by command line args
@@ -36,6 +37,7 @@ def __init__(
self.dir_phi = dir_phi
self._input_format = input_format
self._output_format = output_format
self.client = client
self.timestamp = common.timestamp_filename(timestamp)
self.hostname = gethostname()
self.comment = comment or ""
96 changes: 83 additions & 13 deletions cumulus/ctakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
import hashlib
import logging
import os
from typing import List
from typing import List, Optional
Author comment: The changes in this file are the actual feature change -- all other changes are mostly just refactoring to bring FhirClient out from a bulk export implementation detail up to a core piece of the etl.py machinery.

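The sketch below (hypothetical SketchJobConfig and SketchTask names, not code from this PR) illustrates that shape: the client is created once, carried on the job config, and handed to each task, rather than living inside the bulk exporter.

# Illustrative sketch only -- hypothetical names, not code from this PR.
# The idea: build one FHIR client up front, store it on the job config,
# and let any task reach the server through it (e.g. to download a note
# attachment referenced by URL), instead of burying the client inside the
# bulk exporter.
import dataclasses
from typing import Any, List


@dataclasses.dataclass
class SketchJobConfig:
    client: Any  # stands in for a ready-to-use FHIR client


class SketchTask:
    def __init__(self, config: SketchJobConfig):
        self.config = config

    async def run(self) -> List[dict]:
        # self.config.client is available here for any server fetches the
        # task needs, without the task knowing how the client was built.
        return []
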
import ctakesclient

from cumulus import common, fhir_common, store
from cumulus import common, fhir_client, fhir_common, store


def covid_symptoms_extract(cache: store.Root, docref: dict) -> List[dict]:
async def covid_symptoms_extract(client: fhir_client.FhirClient, cache: store.Root, docref: dict) -> List[dict]:
"""
Extract a list of Observations from NLP-detected symptoms in physician notes

:param client: a client ready to talk to a FHIR server
:param cache: Where to cache NLP results
:param docref: Physician Note
:return: list of NLP results encoded as FHIR observations
@@ -25,20 +26,14 @@ def covid_symptoms_extract(cache: store.Root, docref: dict) -> List[dict]:

encounters = docref.get("context", {}).get("encounter", [])
if not encounters:
logging.warning("No valid encounters for symptoms") # ideally would print identifier, but it's PHI...
logging.warning("No encounters for docref %s", docref_id)
return []
_, encounter_id = fhir_common.unref_resource(encounters[0])

# Find the physician note among the attachments
for content in docref["content"]:
if "contentType" in content["attachment"] and "data" in content["attachment"]:
mimetype, params = cgi.parse_header(content["attachment"]["contentType"])
if mimetype == "text/plain": # just grab first text we find
charset = params.get("charset", "utf8")
physician_note = base64.standard_b64decode(content["attachment"]["data"]).decode(charset)
break
else:
logging.warning("No text/plain content in docref %s", docref_id)
physician_note = await get_docref_note(client, [content["attachment"] for content in docref["content"]])
if physician_note is None:
logging.warning("No text content in docref %s", docref_id)
return []

# Strip this "line feed" character that often shows up in notes and is confusing for cNLP.
@@ -97,6 +92,81 @@ def is_covid_match(m: ctakesclient.typesystem.MatchText):
return positive_matches


def parse_content_type(content_type: str) -> (str, str):
"""Returns (mimetype, encoding)"""
# TODO: switch to message.Message parsing, since cgi is deprecated
mimetype, params = cgi.parse_header(content_type)
return mimetype, params.get("charset", "utf8")


def mimetype_priority(mimetype: str) -> int:
"""
Returns priority of mimetypes for docref notes.

0 means "ignore"
Higher numbers are higher priority
"""
if mimetype == "text/plain":
return 3
elif mimetype.startswith("text/"):
return 2
elif mimetype in ("application/xml", "application/xhtml+xml"):
return 1
return 0


async def get_docref_note(client: fhir_client.FhirClient, attachments: List[dict]) -> Optional[str]:
# Find the best attachment to use, based on mimetype.
# We prefer basic text documents, to avoid confusing cTAKES with extra formatting (like <body>).
best_attachment_index = -1
best_attachment_priority = 0
for index, attachment in enumerate(attachments):
if "contentType" in attachment:
mimetype, _ = parse_content_type(attachment["contentType"])
priority = mimetype_priority(mimetype)
if priority > best_attachment_priority:
best_attachment_priority = priority
best_attachment_index = index

if best_attachment_index >= 0:
return await get_docref_note_from_attachment(client, attachments[best_attachment_index])

# We didn't find _any_ of our target text content types.
# A content type isn't required by the spec with external URLs... so it's possible an unmarked link could be good.
# But let's optimistically enforce the need for a content type ourselves by bailing here.
# If we find a real-world need to be more permissive, we can change this later.
# But note that if we do, we'll need to handle downloading Binary FHIR objects, in addition to arbitrary URLs.
return None


async def get_docref_note_from_attachment(client: fhir_client.FhirClient, attachment: dict) -> Optional[str]:
"""
Decodes or downloads a note from an attachment.

Note that it is assumed a contentType is provided.

:returns: the attachment's note text
"""
mimetype, charset = parse_content_type(attachment["contentType"])

if "data" in attachment:
return base64.standard_b64decode(attachment["data"]).decode(charset)

# TODO: At some point we should centralize the downloading of attachments -- once we have multiple NLP tasks,
# we may not want to re-download the overlapping notes. When we do that, it should not be part of our bulk
# exporter, since we may be given already-exported ndjson.
#
# TODO: There are future optimizations to try to use our ctakes cache to avoid downloading in the first place:
# - use attachment["hash"] if available (algorithm mismatch though... maybe we should switch to sha1...)
# - send a HEAD request with "Want-Digest: sha-256" but Cerner at least does not support that
if "url" in attachment:
# We need to pass Accept to get the raw data, not a Binary object. See https://www.hl7.org/fhir/binary.html
response = await client.request("GET", attachment["url"], headers={"Accept": mimetype})
return response.text

return None


def extract(cache: store.Root, namespace: str, sentence: str) -> ctakesclient.typesystem.CtakesJSON:
"""
This is a version of ctakesclient.client.extract() that also uses a cache
cumulus/deid/ms-config.json: 1 addition & 0 deletions

@@ -5,6 +5,7 @@
"fhirPathRules": [
{"path": "nodesByName('modifierExtension')", "method": "keep", "comment": "Cumulus: keep these so we can ignore resources with modifiers we don't understand"},
{"path": "DocumentReference.nodesByType('Attachment').data", "method": "keep", "comment": "Cumulus: needed to run NLP on physician notes"},
{"path": "DocumentReference.nodesByType('Attachment').url", "method": "keep", "comment": "Cumulus: needed to run NLP on physician notes"},
{"path": "Patient.extension.where(url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex')", "method": "keep", "comment": "Cumulus: useful for studies"},
{"path": "Patient.extension.where(url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity')", "method": "keep", "comment": "Cumulus: useful for studies"},
{"path": "Patient.extension.where(url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-genderIdentity')", "method": "keep", "comment": "Cumulus: useful for studies"},
cumulus/deid/scrubber.py: 2 additions & 2 deletions

@@ -140,8 +140,8 @@ def _check_ids(self, node_path: str, node: dict, key: str, value: Any) -> None:
@staticmethod
def _check_attachments(node_path: str, node: dict, key: str) -> None:
"""Strip any attachment data"""
if node_path == "root.content.attachment" and key == "data":
del node["data"]
if node_path == "root.content.attachment" and key in {"data", "url"}:
del node[key]

@staticmethod
def _check_security(node_path: str, node: dict, key: str, value: Any) -> None:
cumulus/errors.py: 1 addition & 0 deletions

@@ -16,3 +16,4 @@
TASK_SET_EMPTY = 21
ARGS_CONFLICT = 22
ARGS_INVALID = 23
FHIR_URL_MISSING = 24
123 changes: 79 additions & 44 deletions cumulus/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import sys
import tempfile
import time
from typing import List, Type
from typing import Iterable, List, Type
from urllib.parse import urlparse

import ctakesclient

from cumulus import common, context, deid, errors, loaders, store, tasks
from cumulus import common, context, deid, errors, fhir_client, loaders, store, tasks
from cumulus.config import JobConfig, JobSummary


@@ -27,27 +27,22 @@
###############################################################################


async def load_and_deidentify(
loader: loaders.Loader, selected_tasks: List[Type[tasks.EtlTask]]
) -> tempfile.TemporaryDirectory:
async def load_and_deidentify(loader: loaders.Loader, resources: Iterable[str]) -> tempfile.TemporaryDirectory:
"""
Loads the input directory and does a first-pass de-identification

Code outside this method should never see the original input files.

:returns: a temporary directory holding the de-identified files in FHIR ndjson format
"""
# Grab a list of all required resource types for the tasks we are running
required_resources = set(t.resource for t in selected_tasks)

# First step is loading all the data into a local ndjson format
loaded_dir = await loader.load_all(list(required_resources))
loaded_dir = await loader.load_all(list(resources))

# Second step is de-identifying that data (at a bulk level)
return await deid.Scrubber.scrub_bulk_data(loaded_dir.name)


def etl_job(config: JobConfig, selected_tasks: List[Type[tasks.EtlTask]]) -> List[JobSummary]:
async def etl_job(config: JobConfig, selected_tasks: List[Type[tasks.EtlTask]]) -> List[JobSummary]:
"""
:param config: job config
:param selected_tasks: the tasks to run
@@ -58,7 +53,7 @@ def etl_job(config: JobConfig, selected_tasks: List[Type[tasks.EtlTask]]) -> List[JobSummary]:
scrubber = deid.Scrubber(config.dir_phi)
for task_class in selected_tasks:
task = task_class(config, scrubber)
summary = task.run()
summary = await task.run()
Author comment on lines -61 to +56: Note that this is still not running tasks in parallel, but just making the task runners able to run async code themselves. (Parallel tasks is a whole other discussion with its own difficulties.)

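A hedged illustration of that distinction (hypothetical helpers, not code from this PR): awaiting each task inside a loop keeps them sequential, while running them in parallel would need something like asyncio.gather and raises its own questions.

# Illustration only, not code from this PR: awaiting tasks in a loop keeps them
# sequential, even though each task may use async I/O internally.
import asyncio


async def run_sequentially(task_list) -> list:
    summaries = []
    for task in task_list:
        summaries.append(await task.run())  # one task at a time
    return summaries


# True parallelism would look roughly like this instead, with its own
# complications (shared caches, output ordering, server rate limits):
async def run_concurrently(task_list) -> list:
    return list(await asyncio.gather(*(task.run() for task in task_list)))
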
summary_list.append(summary)

path = os.path.join(config.dir_job_config(), f"{summary.label}.json")
@@ -195,6 +190,9 @@ def make_parser() -> argparse.ArgumentParser:
metavar="PATH",
help="Bearer token for custom bearer authentication",
)
export.add_argument(
"--fhir-url", metavar="URL", help="FHIR server base URL, only needed if you exported separately"
)
export.add_argument("--since", help="Start date for export from the FHIR server")
export.add_argument("--until", help="End date for export from the FHIR server")

@@ -213,6 +211,39 @@ def make_parser() -> argparse.ArgumentParser:
return parser


def create_fhir_client(args, root_input, resources):
client_base_url = args.fhir_url
if root_input.protocol in {"http", "https"}:
if args.fhir_url and not root_input.path.startswith(args.fhir_url):
print(
"You provided both an input FHIR server and a different --fhir-url. Try dropping --fhir-url.",
file=sys.stderr,
)
raise SystemExit(errors.ARGS_CONFLICT)
client_base_url = root_input.path

try:
try:
# Try to load client ID from file first (some servers use crazy long ones, like SMART's bulk-data-server)
smart_client_id = common.read_text(args.smart_client_id).strip() if args.smart_client_id else None
except FileNotFoundError:
smart_client_id = args.smart_client_id

smart_jwks = common.read_json(args.smart_jwks) if args.smart_jwks else None
bearer_token = common.read_text(args.bearer_token).strip() if args.bearer_token else None
except OSError as exc:
print(exc, file=sys.stderr)
raise SystemExit(errors.ARGS_INVALID) from exc

return fhir_client.FhirClient(
client_base_url,
resources,
client_id=smart_client_id,
jwks=smart_jwks,
bearer_token=bearer_token,
)


async def main(args: List[str]):
parser = make_parser()
args = parser.parse_args(args)
@@ -233,45 +264,49 @@ async def main(args: List[str]):
job_context = context.JobContext(root_phi.joinpath("context.json"))
job_datetime = common.datetime_now() # grab timestamp before we do anything

if args.input_format == "i2b2":
config_loader = loaders.I2b2Loader(root_input, args.batch_size)
else:
config_loader = loaders.FhirNdjsonLoader(
root_input,
client_id=args.smart_client_id,
jwks=args.smart_jwks,
bearer_token=args.bearer_token,
since=args.since,
until=args.until,
)

# Check which tasks are being run, allowing comma-separated values
task_names = args.task and set(itertools.chain.from_iterable(t.split(",") for t in args.task))
task_filters = args.task_filter and list(itertools.chain.from_iterable(t.split(",") for t in args.task_filter))
selected_tasks = tasks.EtlTask.get_selected_tasks(task_names, task_filters)

# Pull down resources and run the MS tool on them
deid_dir = await load_and_deidentify(config_loader, selected_tasks)

# Prepare config for jobs
config = JobConfig(
args.dir_input,
deid_dir.name,
args.dir_output,
args.dir_phi,
args.input_format,
args.output_format,
comment=args.comment,
batch_size=args.batch_size,
timestamp=job_datetime,
tasks=[t.name for t in selected_tasks],
)
common.write_json(config.path_config(), config.as_json(), indent=4)
common.print_header("Configuration:")
print(json.dumps(config.as_json(), indent=4))
# Grab a list of all required resource types for the tasks we are running
required_resources = set(t.resource for t in selected_tasks)

# Create a client to talk to a FHIR server.
# This is useful even if we aren't doing a bulk export, because some resources like DocumentReference can still
# reference external resources on the server (like the document text).
# If we don't need this client (e.g. we're using local data and don't download any attachments), this is a no-op.
client = create_fhir_client(args, root_input, required_resources)

async with client:
if args.input_format == "i2b2":
config_loader = loaders.I2b2Loader(root_input, args.batch_size)
else:
config_loader = loaders.FhirNdjsonLoader(root_input, client, since=args.since, until=args.until)

# Pull down resources and run the MS tool on them
deid_dir = await load_and_deidentify(config_loader, required_resources)

# Prepare config for jobs
config = JobConfig(
args.dir_input,
deid_dir.name,
args.dir_output,
args.dir_phi,
args.input_format,
args.output_format,
client,
comment=args.comment,
batch_size=args.batch_size,
timestamp=job_datetime,
tasks=[t.name for t in selected_tasks],
)
common.write_json(config.path_config(), config.as_json(), indent=4)
common.print_header("Configuration:")
print(json.dumps(config.as_json(), indent=4))

# Finally, actually run the meat of the pipeline! (Filtered down to requested tasks)
summaries = etl_job(config, selected_tasks)
# Finally, actually run the meat of the pipeline! (Filtered down to requested tasks)
summaries = await etl_job(config, selected_tasks)

# Print results to the console
common.print_header("Results:")