Skip to content

Commit

Permalink
Merge pull request #374 from smart-on-fhir/mikix/inline
Browse files Browse the repository at this point in the history
feat: add inlining options and a new `inline` command
  • Loading branch information
mikix authored Feb 4, 2025
2 parents 2395bb4 + da3d4c1 commit 9bba38d
Show file tree
Hide file tree
Showing 33 changed files with 1,443 additions and 229 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ jobs:
# Compare results
export OUTDIR=$DATADIR/run-output/covid_symptom__nlp_results
sudo chown -R $(id -u) $OUTDIR
sed -i 's/"generated_on": "[^"]*", //g' $OUTDIR/*.ndjson
sed -i 's/"generated_on":"[^"]*",//g' $OUTDIR/*.ndjson
diff -upr $DATADIR/expected-output $OUTDIR
echo "All Good!"
Expand Down
2 changes: 1 addition & 1 deletion cumulus_etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Turns FHIR data into de-identified & aggregated records"""

__version__ = "2.0.0"
__version__ = "2.1.0"
7 changes: 5 additions & 2 deletions cumulus_etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import rich.logging

from cumulus_etl import common, etl, export, upload_notes
from cumulus_etl import common, etl, export, inliner, upload_notes
from cumulus_etl.etl import convert, init


Expand All @@ -20,6 +20,7 @@ class Command(enum.Enum):
ETL = "etl"
EXPORT = "export"
INIT = "init"
INLINE = "inline"
UPLOAD_NOTES = "upload-notes"

# Why isn't this part of Enum directly...?
Expand Down Expand Up @@ -69,13 +70,15 @@ async def main(argv: list[str]) -> None:
run_method = export.run_export
elif subcommand == Command.INIT.value:
run_method = init.run_init
elif subcommand == Command.INLINE.value:
run_method = inliner.run_inline
else:
parser.description = "Extract, transform, and load FHIR data."
if not subcommand:
# Add a note about other subcommands we offer, and tell argparse not to wrap our formatting
parser.formatter_class = argparse.RawDescriptionHelpFormatter
parser.description += "\n\nother commands available:\n"
parser.description += " convert\n export\n init\n upload-notes"
parser.description += " convert\n export\n init\n inline\n upload-notes"
run_method = etl.run_etl

with tempfile.TemporaryDirectory() as tempdir:
Expand Down
72 changes: 72 additions & 0 deletions cumulus_etl/cli_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Helper methods for CLI parsing."""

import argparse
import itertools
import os
import socket
import tempfile
Expand Down Expand Up @@ -61,6 +62,24 @@ def add_bulk_export(parser: argparse.ArgumentParser, *, as_subgroup: bool = True
"--until", metavar="TIMESTAMP", help="end date for export from the FHIR server"
)
parser.add_argument("--resume", metavar="URL", help="polling status URL from a previous export")
parser.add_argument(
"--inline",
action="store_true",
help="attachments will be inlined after the export",
)
parser.add_argument(
"--inline-resource",
metavar="RESOURCES",
action="append",
help="only consider this resource for inlining (default is all supported inline targets: "
"DiagnosticReport and DocumentReference)",
)
parser.add_argument(
"--inline-mimetype",
metavar="MIMETYPES",
action="append",
help="only inline this attachment mimetype (default is text, HTML, and XHTML)",
)
return parser


Expand Down Expand Up @@ -176,3 +195,56 @@ def make_progress_bar() -> rich.progress.Progress:
rich.progress.TimeElapsedColumn(),
]
return rich.progress.Progress(*columns)


def expand_inline_resources(arg: Iterable[str] | None) -> set[str]:
"""
This converts a list of inline resource args into the final properly cased resource names.
If you have an arg like --inline-resource, this will process that for you.
"""
allowed = {"diagnosticreport": "DiagnosticReport", "documentreference": "DocumentReference"}

if arg is None:
return set(allowed.values())

resources = set(expand_comma_list_arg(arg))
for resource in resources:
if resource.casefold() not in allowed:
errors.fatal(f"Unsupported resource for inlining: {resource}", errors.ARGS_INVALID)

return {allowed[resource.casefold()] for resource in resources}


def expand_inline_mimetypes(arg: Iterable[str] | None) -> set[str]:
"""
This converts a list of inline mimetype args into a set of normalized mimetypes.
If you have an arg like --inline-mimetype, this will process that for you.
"""
if arg is None:
return {"text/plain", "text/html", "application/xhtml+xml"}

return set(expand_comma_list_arg(arg, casefold=True))


def expand_comma_list_arg(arg: Iterable[str] | None, casefold: bool = False) -> Iterable[str]:
"""
This converts a list of string args, splits any strings on commas, and combines results.
This is useful for CLI arguments with action="append" but you also want to allow comma
separated args. --task does this, as well as others.
An example CLI:
--task=patient --task=condition,procedure
Would give:
["patient", "condition,procedure"]
And this method would turn that into:
["patient", "condition", procedure"]
"""
if arg is None:
return []
split_args = itertools.chain.from_iterable(x.split(",") for x in arg)
if casefold:
return map(str.casefold, split_args)
return split_args
18 changes: 2 additions & 16 deletions cumulus_etl/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ def write(self, obj: dict) -> None:
# lazily create the file, to avoid 0-line ndjson files (unless created in __init__)
self._ensure_file()

json.dump(obj, self._file)
# Specify separators for the most compact (no whitespace) representation, which saves disk space.
json.dump(obj, self._file, separators=(",", ":"))
self._file.write("\n")


Expand Down Expand Up @@ -316,21 +317,6 @@ def human_time_offset(seconds: int) -> str:
return f"{_pretty_float(hours)}h"


def info_mode():
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)


def debug_mode():
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)


def warn_mode():
logging.basicConfig()
logging.getLogger().setLevel(logging.WARN)


def print_header(name: str | None = None) -> None:
"""Prints a section break to the console, with a name for the user"""
rich.get_console().rule()
Expand Down
12 changes: 11 additions & 1 deletion cumulus_etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,30 @@
MISSING_REQUESTED_RESOURCES = 36
TOO_MANY_SMART_CREDENTIALS = 37
BAD_SMART_CREDENTIAL = 38
INLINE_TASK_FAILED = 39
INLINE_WITHOUT_FOLDER = 40


class FatalError(Exception):
"""An unrecoverable error"""


class NetworkError(FatalError):
"""An unrecoverable network error"""
"""A network error"""

def __init__(self, msg: str, response: httpx.Response):
super().__init__(msg)
self.response = response


class FatalNetworkError(NetworkError):
"""An unrecoverable network error that should not be retried"""


class TemporaryNetworkError(NetworkError):
"""A recoverable network error that could be retried"""


class FhirConnectionConfigError(FatalError):
"""We needed to connect to a FHIR server but are not configured correctly"""

Expand Down
6 changes: 6 additions & 0 deletions cumulus_etl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ async def etl_main(args: argparse.Namespace) -> None:
# Grab a list of all required resource types for the tasks we are running
required_resources = set(t.resource for t in selected_tasks)

inline_resources = cli_utils.expand_inline_resources(args.inline_resource)
inline_mimetypes = cli_utils.expand_inline_mimetypes(args.inline_mimetype)

# Create a client to talk to a FHIR server.
# This is useful even if we aren't doing a bulk export, because some resources like DocumentReference can still
# reference external resources on the server (like the document text).
Expand All @@ -326,6 +329,9 @@ async def etl_main(args: argparse.Namespace) -> None:
since=args.since,
until=args.until,
resume=args.resume,
inline=args.inline,
inline_resources=inline_resources,
inline_mimetypes=inline_mimetypes,
)

required_resources = await check_available_resources(
Expand Down
13 changes: 5 additions & 8 deletions cumulus_etl/etl/tasks/task_factory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""Finds and creates ETL tasks"""

import itertools
import sys
from collections.abc import Iterable
from typing import TypeVar

from cumulus_etl import errors
from cumulus_etl import cli_utils, errors
from cumulus_etl.etl.studies import covid_symptom, hftest
from cumulus_etl.etl.tasks import basic_tasks

Expand Down Expand Up @@ -67,13 +66,11 @@ def get_selected_tasks(
:param filter_tags: only tasks that have all the listed tags will be eligible for selection
:returns: a list of selected EtlTask subclasses, to instantiate and run
"""
names = names and set(itertools.chain.from_iterable(t.lower().split(",") for t in names))
filter_tags = filter_tags and list(
itertools.chain.from_iterable(t.lower().split(",") for t in filter_tags)
)
filter_tag_set = set(filter_tags or [])
names = set(cli_utils.expand_comma_list_arg(names, casefold=True))
filter_tags = list(cli_utils.expand_comma_list_arg(filter_tags, casefold=True))
filter_tag_set = set(filter_tags)

if names and "help" in names:
if "help" in names:
# OK, we actually are just going to print the list of all task names and be done.
_print_task_names()
raise SystemExit(errors.TASK_HELP) # not an *error* exactly, but not successful ETL either
Expand Down
6 changes: 6 additions & 0 deletions cumulus_etl/export/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ async def export_main(args: argparse.Namespace) -> None:
required_resources = {t.resource for t in selected_tasks}
using_default_tasks = not args.task and not args.task_filter

inline_resources = cli_utils.expand_inline_resources(args.inline_resource)
inline_mimetypes = cli_utils.expand_inline_mimetypes(args.inline_mimetype)

fhir_root = store.Root(args.url_input)
client = fhir.create_fhir_client_for_cli(args, fhir_root, required_resources)

Expand All @@ -39,6 +42,9 @@ async def export_main(args: argparse.Namespace) -> None:
since=args.since,
until=args.until,
resume=args.resume,
inline=args.inline,
inline_resources=inline_resources,
inline_mimetypes=inline_mimetypes,
)
await loader.load_from_bulk_export(
sorted(required_resources), prefer_url_resources=using_default_tasks
Expand Down
2 changes: 2 additions & 0 deletions cumulus_etl/fhir/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
FhirUrl,
download_reference,
get_docref_note,
parse_content_type,
parse_datetime,
ref_resource,
request_attachment,
unref_resource,
)
Loading

0 comments on commit 9bba38d

Please sign in to comment.