From fe809cb76ce3ef32f120ec9f47fdcbadda33a900 Mon Sep 17 00:00:00 2001
From: Michael Terry
Date: Wed, 31 Jul 2024 09:54:58 -0400
Subject: [PATCH] feat: add new "cumulus-etl export" command for bulk exporting

This new command is quite simple. It just takes a URL and an output folder.
The usual auth and export args (like --since) are accepted.
You can also filter exported resources by task with --task.

But even if you don't use --since or --task, you can provide an export URL
that has _since, _type, _typeFilter, whatever you want.

In addition:
- "--task=help" will now print a list of task names and exit.
- The traditional way to kick off a bulk export when doing an ETL job
  (by providing a URL as the input path) now also lets you specify custom
  parameters like _since or _typeFilter directly in the URL.
- Handle non-compliant servers that give us transactionTime values that
  aren't formatted correctly (as bulk-data-server does).
---
 .github/workflows/ci.yaml                 |  8 +-
 cumulus_etl/__init__.py                   |  4 +-
 cumulus_etl/cli.py                        |  8 +-
 cumulus_etl/cli_utils.py                  | 53 +++++++++----
 cumulus_etl/errors.py                     |  1 +
 cumulus_etl/etl/cli.py                    | 27 +------
 cumulus_etl/etl/tasks/task_factory.py     | 25 ++++--
 cumulus_etl/export/__init__.py            |  3 +
 cumulus_etl/export/cli.py                 | 49 ++++++++++++
 cumulus_etl/fhir/fhir_client.py           |  4 +-
 cumulus_etl/loaders/fhir/bulk_export.py   | 68 +++++++++++-----
 cumulus_etl/loaders/fhir/export_log.py    |  9 ++-
 cumulus_etl/loaders/fhir/ndjson_loader.py | 21 ++++-
 docs/bulk-exports.md                      | 73 +++++++-----------
 pyproject.toml                            |  3 +-
 tests/etl/test_etl_cli.py                 | 11 +++
 tests/export/__init__.py                  |  0
 tests/export/test_export_cli.py           | 78 +++++++++++++++++++
 tests/fhir/test_fhir_client.py            | 33 +++++++-
 tests/loaders/ndjson/test_bulk_export.py  | 94 +++++++++++++++++++++--
 20 files changed, 438 insertions(+), 134 deletions(-)
 create mode 100644 cumulus_etl/export/__init__.py
 create mode 100644 cumulus_etl/export/cli.py
 create mode 100644 tests/export/__init__.py
 create mode 100644 tests/export/test_export_cli.py

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 0ef2a0ea..3b8641b8 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -57,13 +57,17 @@ jobs:
         run: |
           python -m pytest --cov=cumulus_etl --cov-report=xml
 
+      - name: Log missing coverage
+        run: |
+          coverage report -m --skip-covered
+
       - name: Check coverage report
         if: github.ref != 'refs/heads/main'
-        uses: orgoro/coverage@v3.1
+        uses: orgoro/coverage@v3.2
         with:
           coverageFile: coverage.xml
           token: ${{ secrets.GITHUB_TOKEN }}
-          thresholdAll: .93
+          thresholdAll: .97
           thresholdNew: 1
           thresholdModified: 1
 
diff --git a/cumulus_etl/__init__.py b/cumulus_etl/__init__.py
index d5d9fc38..e822e92b 100644
--- a/cumulus_etl/__init__.py
+++ b/cumulus_etl/__init__.py
@@ -1,3 +1,3 @@
-"""Cumulus public entry point"""
+"""Turns FHIR data into de-identified & aggregated records"""
 
-__version__ = "1.1.1"
+__version__ = "1.2.0"
diff --git a/cumulus_etl/cli.py b/cumulus_etl/cli.py
index 56b6c264..35f61e42 100644
--- a/cumulus_etl/cli.py
+++ b/cumulus_etl/cli.py
@@ -9,7 +9,7 @@
 
 import rich.logging
 
-from cumulus_etl import common, etl, upload_notes
+from cumulus_etl import common, etl, export, upload_notes
 from cumulus_etl.etl import convert
 
 
@@ -22,6 +22,7 @@ class Command(enum.Enum):
     CHART_REVIEW = "chart-review"
     CONVERT = "convert"
     ETL = "etl"
+    EXPORT = "export"
     UPLOAD_NOTES = "upload-notes"
 
     # Why isn't this part of Enum directly...?
@@ -67,12 +68,15 @@ async def main(argv: list[str]) -> None:
         run_method = upload_notes.run_upload_notes
     elif subcommand == Command.CONVERT.value:
         run_method = convert.run_convert
+    elif subcommand == Command.EXPORT.value:
+        run_method = export.run_export
     else:
         parser.description = "Extract, transform, and load FHIR data."
         if not subcommand:
             # Add a note about other subcommands we offer, and tell argparse not to wrap our formatting
             parser.formatter_class = argparse.RawDescriptionHelpFormatter
-            parser.description += "\n\nother commands available:\n  convert\n  upload-notes"
+            parser.description += "\n\nother commands available:\n"
+            parser.description += "  convert\n  export\n  upload-notes"
         run_method = etl.run_etl
 
     with tempfile.TemporaryDirectory() as tempdir:
diff --git a/cumulus_etl/cli_utils.py b/cumulus_etl/cli_utils.py
index 41513618..42d43c6e 100644
--- a/cumulus_etl/cli_utils.py
+++ b/cumulus_etl/cli_utils.py
@@ -11,47 +11,74 @@
 from cumulus_etl import common, errors, store
 
 
-def add_auth(parser: argparse.ArgumentParser) -> None:
+def add_auth(parser: argparse.ArgumentParser, *, use_fhir_url: bool = True):
     group = parser.add_argument_group("authentication")
-    group.add_argument("--smart-client-id", metavar="ID", help="Client ID for SMART authentication")
+    group.add_argument("--smart-client-id", metavar="ID", help="client ID for SMART authentication")
     group.add_argument("--smart-jwks", metavar="PATH", help="JWKS file for SMART authentication")
-    group.add_argument("--basic-user", metavar="USER", help="Username for Basic authentication")
+    group.add_argument("--basic-user", metavar="USER", help="username for Basic authentication")
     group.add_argument(
-        "--basic-passwd", metavar="PATH", help="Password file for Basic authentication"
+        "--basic-passwd", metavar="PATH", help="password file for Basic authentication"
     )
     group.add_argument(
-        "--bearer-token", metavar="PATH", help="Token file for Bearer authentication"
-    )
-    group.add_argument(
-        "--fhir-url",
-        metavar="URL",
-        help="FHIR server base URL, only needed if you exported separately",
+        "--bearer-token", metavar="PATH", help="token file for Bearer authentication"
     )
+    if use_fhir_url:
+        group.add_argument(
+            "--fhir-url",
+            metavar="URL",
+            help="FHIR server base URL, only needed if you exported separately",
+        )
 
 
 def add_aws(parser: argparse.ArgumentParser) -> None:
     group = parser.add_argument_group("AWS")
     group.add_argument(
-        "--s3-region", metavar="REGION", help="If using S3 paths (s3://...), this is their region"
+        "--s3-region", metavar="REGION", help="if using S3 paths (s3://...), this is their region"
     )
     group.add_argument(
         "--s3-kms-key",
         metavar="KEY",
-        help="If using S3 paths (s3://...), this is the KMS key ID to use",
+        help="if using S3 paths (s3://...), this is the KMS key ID to use",
     )
 
 
+def add_bulk_export(parser: argparse.ArgumentParser, *, as_subgroup: bool = True):
+    if as_subgroup:
+        parser = parser.add_argument_group("bulk export")
+    parser.add_argument("--since", help="start date for export from the FHIR server")
+    # "Until" is not an official part of the bulk FHIR API, but some custom servers support it
+    parser.add_argument("--until", help="end date for export from the FHIR server")
+    return parser
+
+
 def add_nlp(parser: argparse.ArgumentParser):
     group = parser.add_argument_group("NLP")
     group.add_argument(
         "--ctakes-overrides",
         metavar="DIR",
         default="/ctakes-overrides",
-        help="Path to cTAKES overrides dir (default is /ctakes-overrides)",
+        help="path to cTAKES overrides dir (default is /ctakes-overrides)",
     )
     return group
 
 
+def add_task_selection(parser: argparse.ArgumentParser):
+    task = parser.add_argument_group("task selection")
+    task.add_argument(
+        "--task",
+        action="append",
+        help="only consider these tasks (comma separated, "
+        "default is all supported FHIR resources, "
+        "use '--task help' to see full list)",
+    )
+    task.add_argument(
+        "--task-filter",
+        action="append",
+        choices=["covid_symptom", "cpu", "gpu"],
+        help="restrict tasks to only the given sets (comma separated)",
+    )
+
+
 def add_debugging(parser: argparse.ArgumentParser):
     group = parser.add_argument_group("debugging")
     group.add_argument("--skip-init-checks", action="store_true", help=argparse.SUPPRESS)
diff --git a/cumulus_etl/errors.py b/cumulus_etl/errors.py
index b3e6bf95..075fc053 100644
--- a/cumulus_etl/errors.py
+++ b/cumulus_etl/errors.py
@@ -33,6 +33,7 @@
 FHIR_AUTH_FAILED = 32
 SERVICE_MISSING = 33  # generic init-check service is missing
 COMPLETION_ARG_MISSING = 34
+TASK_HELP = 35
 
 
 class FatalError(Exception):
diff --git a/cumulus_etl/etl/cli.py b/cumulus_etl/etl/cli.py
index e6d5af94..fc605e37 100644
--- a/cumulus_etl/etl/cli.py
+++ b/cumulus_etl/etl/cli.py
@@ -2,7 +2,6 @@
 
 import argparse
 import datetime
-import itertools
 import os
 import shutil
 import sys
@@ -124,14 +123,12 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
     cli_utils.add_aws(parser)
     cli_utils.add_auth(parser)
 
-    export = parser.add_argument_group("bulk export")
+    export = cli_utils.add_bulk_export(parser)
     export.add_argument(
         "--export-to",
         metavar="DIR",
-        help="Where to put exported files (default is to delete after use)",
+        help="where to put exported files (default is to delete after use)",
     )
-    export.add_argument("--since", help="Start date for export from the FHIR server")
-    export.add_argument("--until", help="End date for export from the FHIR server")
 
     group = parser.add_argument_group("external export identification")
     group.add_argument("--export-group", help=argparse.SUPPRESS)
@@ -142,18 +139,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
     )
 
     cli_utils.add_nlp(parser)
-
-    task = parser.add_argument_group("task selection")
-    task.add_argument(
-        "--task", action="append", help="Only update the given output tables (comma separated)"
-    )
-    task.add_argument(
-        "--task-filter",
-        action="append",
-        choices=["covid_symptom", "cpu", "gpu"],
-        help="Restrict tasks to only the given sets (comma separated)",
-    )
-
+    cli_utils.add_task_selection(parser)
     cli_utils.add_debugging(parser)
@@ -241,12 +227,7 @@ async def etl_main(args: argparse.Namespace) -> None:
     job_context = context.JobContext(root_phi.joinpath("context.json"))
     job_datetime = common.datetime_now()  # grab timestamp before we do anything
 
-    # Check which tasks are being run, allowing comma-separated values
-    task_names = args.task and set(itertools.chain.from_iterable(t.split(",") for t in args.task))
-    task_filters = args.task_filter and list(
-        itertools.chain.from_iterable(t.split(",") for t in args.task_filter)
-    )
-    selected_tasks = task_factory.get_selected_tasks(task_names, task_filters)
+    selected_tasks = task_factory.get_selected_tasks(args.task, args.task_filter)
 
     # Print configuration
     print_config(args, job_datetime, selected_tasks)
diff --git a/cumulus_etl/etl/tasks/task_factory.py b/cumulus_etl/etl/tasks/task_factory.py
index 5b0db1bc..cd7ec361 100644
--- a/cumulus_etl/etl/tasks/task_factory.py
+++ b/cumulus_etl/etl/tasks/task_factory.py
@@ -1,5 +1,6 @@
 """Finds and creates ETL tasks"""
 
+import itertools
 import sys
 from collections.abc import Iterable
 from typing import TypeVar
@@ -64,9 +65,17 @@ def get_selected_tasks(
     :param filter_tags: only tasks that have all the listed tags will be eligible for selection
     :returns: a list of selected EtlTask subclasses, to instantiate and run
     """
-    names = names and set(names)
+    names = names and set(itertools.chain.from_iterable(t.lower().split(",") for t in names))
+    filter_tags = filter_tags and list(
+        itertools.chain.from_iterable(t.lower().split(",") for t in filter_tags)
+    )
     filter_tag_set = set(filter_tags or [])
 
+    if names and "help" in names:
+        # OK, we actually are just going to print the list of all task names and be done.
+        _print_task_names()
+        raise SystemExit(errors.TASK_HELP)  # not an *error* exactly, but not successful ETL either
+
     # Just give back the default set if the user didn't specify any constraints
     if not names and not filter_tag_set:
         return get_default_tasks()
@@ -88,11 +97,8 @@ def get_selected_tasks(
     # Check for unknown names the user gave us
     all_task_names = {t.name for t in all_tasks}
     if unknown_names := names - all_task_names:
-        print_names = "\n".join(sorted(f"  {key}" for key in all_task_names))
-        print(
-            f"Unknown task '{unknown_names.pop()}' requested. Valid task names:\n{print_names}",
-            file=sys.stderr,
-        )
+        print(f"Unknown task '{unknown_names.pop()}' requested.", file=sys.stderr)
+        _print_task_names(file=sys.stderr)
         raise SystemExit(errors.TASK_UNKNOWN)
 
     # Check for names that conflict with the chosen filters
@@ -106,3 +112,10 @@ def get_selected_tasks(
         raise SystemExit(errors.TASK_FILTERED_OUT)
 
     return [task for task in filtered_tasks if task.name in names]
+
+
+def _print_task_names(*, file=sys.stdout) -> None:
+    all_tasks = get_all_tasks()
+    all_task_names = {t.name for t in all_tasks}
+    print_names = "\n".join(sorted(f"  {key}" for key in all_task_names))
+    print(f"Valid task names:\n{print_names}", file=file)
diff --git a/cumulus_etl/export/__init__.py b/cumulus_etl/export/__init__.py
new file mode 100644
index 00000000..68638540
--- /dev/null
+++ b/cumulus_etl/export/__init__.py
@@ -0,0 +1,3 @@
+"""Bulk export"""
+
+from .cli import run_export
diff --git a/cumulus_etl/export/cli.py b/cumulus_etl/export/cli.py
new file mode 100644
index 00000000..0945ab63
--- /dev/null
+++ b/cumulus_etl/export/cli.py
@@ -0,0 +1,49 @@
+"""Do a standalone bulk export from an EHR"""
+
+import argparse
+
+from cumulus_etl import cli_utils, fhir, loaders, store
+from cumulus_etl.etl.tasks import task_factory
+
+
+def define_export_parser(parser: argparse.ArgumentParser) -> None:
+    parser.usage = "cumulus-etl export [OPTION]... FHIR_URL DIR"
+
+    parser.add_argument("url_input", metavar="https://fhir.example.com/Group/ABC")
+    parser.add_argument("export_to", metavar="/path/to/output")
+    cli_utils.add_bulk_export(parser, as_subgroup=False)
+
+    cli_utils.add_auth(parser, use_fhir_url=False)
+    cli_utils.add_task_selection(parser)
+
+
+async def export_main(args: argparse.Namespace) -> None:
+    """Exports data from an EHR to a folder."""
+    # record filesystem options before creating Roots
+    store.set_user_fs_options(vars(args))
+
+    selected_tasks = task_factory.get_selected_tasks(args.task, args.task_filter)
+    required_resources = {t.resource for t in selected_tasks}
+    using_default_tasks = not args.task and not args.task_filter
+
+    fhir_root = store.Root(args.url_input)
+    client = fhir.create_fhir_client_for_cli(args, fhir_root, required_resources)
+
+    async with client:
+        loader = loaders.FhirNdjsonLoader(
+            fhir_root,
+            client=client,
+            export_to=args.export_to,
+            since=args.since,
+            until=args.until,
+        )
+        await loader.load_from_bulk_export(
+            sorted(required_resources), prefer_url_resources=using_default_tasks
+        )
+
+
+async def run_export(parser: argparse.ArgumentParser, argv: list[str]) -> None:
+    """Parses an export CLI"""
+    define_export_parser(parser)
+    args = parser.parse_args(argv)
+    await export_main(args)
diff --git a/cumulus_etl/fhir/fhir_client.py b/cumulus_etl/fhir/fhir_client.py
index 088bc093..78b52b89 100644
--- a/cumulus_etl/fhir/fhir_client.py
+++ b/cumulus_etl/fhir/fhir_client.py
@@ -247,9 +247,9 @@ def create_fhir_client_for_cli(
 
     The usual FHIR server authentication options should be represented in args.
     """
-    client_base_url = args.fhir_url
+    client_base_url = getattr(args, "fhir_url", None)
     if root_input.protocol in {"http", "https"}:
-        if args.fhir_url and not root_input.path.startswith(args.fhir_url):
+        if client_base_url and not root_input.path.startswith(client_base_url):
             print(
                 "You provided both an input FHIR server and a different --fhir-url. Try dropping --fhir-url.",
                 file=sys.stderr,
diff --git a/cumulus_etl/loaders/fhir/bulk_export.py b/cumulus_etl/loaders/fhir/bulk_export.py
index b46fa001..8ed93e54 100644
--- a/cumulus_etl/loaders/fhir/bulk_export.py
+++ b/cumulus_etl/loaders/fhir/bulk_export.py
@@ -40,6 +40,7 @@ def __init__(
         destination: str,
         since: str | None = None,
         until: str | None = None,
+        prefer_url_resources: bool = False,
     ):
         """
         Initialize a bulk exporter (but does not start an export).
@@ -50,24 +51,56 @@ def __init__(
         :param destination: a local folder to store all the files
         :param since: start date for export
         :param until: end date for export
+        :param prefer_url_resources: if the URL includes _type, ignore the provided resources
         """
         super().__init__()
         self._client = client
-        self._resources = resources
-        self._url = url
-        if not self._url.endswith("/"):
-            # This will ensure the last segment does not get chopped off by urljoin
-            self._url += "/"
         self._destination = destination
         self._total_wait_time = 0  # in seconds, across all our requests
-        self._since = since
-        self._until = until
         self._log: export_log.BulkExportLogWriter = None
 
+        self._url = self.format_kickoff_url(
+            url,
+            resources=resources,
+            since=since,
+            until=until,
+            prefer_url_resources=prefer_url_resources,
+        )
+
         # Public properties, to be read after the export:
         self.export_datetime = None
         self.group_name = fhir.parse_group_from_url(self._url)
 
+    def format_kickoff_url(
+        self,
+        url: str,
+        *,
+        resources: list[str],
+        since: str | None,
+        until: str | None,
+        prefer_url_resources: bool,
+    ) -> str:
+        parsed = urllib.parse.urlsplit(url)
+
+        # Add an export path to the end of the URL if it's not provided
+        if not parsed.path.endswith("/$export"):
+            parsed = parsed._replace(path=os.path.join(parsed.path, "$export"))
+
+        # Integrate in any externally-provided flags
+        query = urllib.parse.parse_qs(parsed.query)
+        ignore_provided_resources = prefer_url_resources and "_type" in query
+        if not ignore_provided_resources:
+            query.setdefault("_type", []).extend(resources)
+        if since:
+            query["_since"] = since
+        if until:
+            # This parameter is not part of the FHIR spec and is unlikely to be supported by your
+            # server. But some custom servers *do* support it, so we added support for it too.
+            query["_until"] = until
+        parsed = parsed._replace(query=urllib.parse.urlencode(query, doseq=True))
+
+        return urllib.parse.urlunsplit(parsed)
+
     async def export(self) -> None:
         """
         Bulk export resources from a FHIR server into local ndjson files.
@@ -89,26 +122,17 @@ async def export(self) -> None:
         print("Starting bulk FHIR export…")
+
         self._log = export_log.BulkExportLogWriter(store.Root(self._destination))
-        params = {"_type": ",".join(self._resources)}
-        if self._since:
-            params["_since"] = self._since
-        if self._until:
-            # This parameter is not part of the FHIR spec and is unlikely to be supported by your server.
-            # But some servers do support it, and it is a possible future addition to the spec.
-            params["_until"] = self._until
-
-        full_url = urllib.parse.urljoin(self._url, f"$export?{urllib.parse.urlencode(params)}")
         try:
             response = await self._request_with_delay(
-                full_url,
+                self._url,
                 headers={"Prefer": "respond-async"},
                 target_status_code=202,
             )
         except Exception as exc:
-            self._log.kickoff(full_url, self._client.get_capabilities(), exc)
+            self._log.kickoff(self._url, self._client.get_capabilities(), exc)
             raise
         else:
-            self._log.kickoff(full_url, self._client.get_capabilities(), response)
+            self._log.kickoff(self._url, self._client.get_capabilities(), response)
 
         # Grab the poll location URL for status updates
         poll_location = response.headers["Content-Location"]
@@ -125,7 +149,11 @@ async def export(self) -> None:
         # Finished! We're done waiting and can download all the files
         response_json = response.json()
-        self.export_datetime = datetime.datetime.fromisoformat(response_json["transactionTime"])
+        try:
+            transaction_time = response_json.get("transactionTime", "")
+            self.export_datetime = datetime.datetime.fromisoformat(transaction_time)
+        except ValueError:
+            pass  # server gave us a bad timestamp, ignore it :shrug:
 
         # Were there any server-side errors during the export?
         # The spec acknowledges that "error" is perhaps misleading for an array that can contain info messages.
diff --git a/cumulus_etl/loaders/fhir/export_log.py b/cumulus_etl/loaders/fhir/export_log.py
index c6d4fa3a..0a394b9d 100644
--- a/cumulus_etl/loaders/fhir/export_log.py
+++ b/cumulus_etl/loaders/fhir/export_log.py
@@ -172,8 +172,15 @@ def kickoff(self, url: str, capabilities: dict, response: httpx.Response | Excep
         software = capabilities.get("software", {})
         response_info = {}
 
+        # Create a "merged" version of the params.
+        # (Merged in the sense that duplicates are converted to comma separated lists.)
+        request_headers = {}
+        for k, v in httpx.URL(url).params.multi_items():
+            if k in request_headers:
+                request_headers[k] += f",{v}"
+            else:
+                request_headers[k] = v
         # Spec says we shouldn't log the `patient` parameter, so strip it here.
-        request_headers = dict(httpx.URL(url).params)
         request_headers.pop("patient", None)
 
         if isinstance(response, Exception):
diff --git a/cumulus_etl/loaders/fhir/ndjson_loader.py b/cumulus_etl/loaders/fhir/ndjson_loader.py
index d3be266e..e21d6e7e 100644
--- a/cumulus_etl/loaders/fhir/ndjson_loader.py
+++ b/cumulus_etl/loaders/fhir/ndjson_loader.py
@@ -37,7 +37,7 @@ def __init__(
     async def load_all(self, resources: list[str]) -> common.Directory:
         # Are we doing a bulk FHIR export from a server?
         if self.root.protocol in ["http", "https"]:
-            loaded_dir = await self._load_from_bulk_export(resources)
+            loaded_dir = await self.load_from_bulk_export(resources)
             input_root = store.Root(loaded_dir.name)
         else:
             if self.export_to or self.since or self.until:
@@ -57,6 +57,7 @@ async def load_all(self, resources: list[str]) -> common.Directory:
             # Once we require group name & export datetime, we should warn about this.
             # For now, just ignore any errors.
             pass
+
         # Copy the resources we need from the remote directory (like S3 buckets) to a local one.
         #
         # We do this even if the files are local, because the next step in our pipeline is the MS deid tool,
@@ -73,12 +74,26 @@ async def load_all(self, resources: list[str]) -> common.Directory:
             input_root.get(filename, f"{tmpdir.name}/")
         return tmpdir
 
-    async def _load_from_bulk_export(self, resources: list[str]) -> common.Directory:
+    async def load_from_bulk_export(
+        self, resources: list[str], prefer_url_resources: bool = False
+    ) -> common.Directory:
+        """
+        Performs a bulk export and drops the results in an export dir.
+
+        :param resources: a list of resources to load
+        :param prefer_url_resources: if the URL includes _type, ignore the provided resources
+        """
         target_dir = cli_utils.make_export_dir(self.export_to)
 
         try:
             bulk_exporter = BulkExporter(
-                self.client, resources, self.root.path, target_dir.name, self.since, self.until
+                self.client,
+                resources,
+                self.root.path,
+                target_dir.name,
+                self.since,
+                self.until,
+                prefer_url_resources=prefer_url_resources,
             )
             await bulk_exporter.export()
diff --git a/docs/bulk-exports.md b/docs/bulk-exports.md
index 2f8ce6f7..a63bf9a8 100644
--- a/docs/bulk-exports.md
+++ b/docs/bulk-exports.md
@@ -10,28 +10,41 @@ nav_order: 9
 Cumulus ETL wants data, and lots of it.
 
-It's happy to ingest data that you've gathered elsewhere (what we call external exports),
-and it's happy to download the data itself (internal exports).
+It's happy to ingest data that you've gathered elsewhere (as a separate export),
+but it's also happy to download the data itself as needed during the ETL (as an on-the-fly export).
 
-## External Exports
+## Separate Exports
 
-If you have an existing process to export health data, you can do that bulk export externally,
+1. If you have an existing process to export health data, you can do that bulk export externally,
 and then just feed the resulting files to Cumulus ETL.
 
-Or you may need more export options than our internal exporter supports.
-The [SMART Bulk Data Client](https://github.com/smart-on-fhir/bulk-data-client)
-is a great tool with lots of features.
+2. Cumulus ETL has an `export` command to perform just a bulk export without an ETL step.
+   Run it like so: `cumulus-etl export FHIR_URL ./output` (see `--help` for more options).
+   You can use all sorts of
+   [interesting FHIR options](https://hl7.org/fhir/uv/bulkdata/export.html#query-parameters)
+   like `_typeFilter` or `_since` in the URL.
 
-In either case, it's simple to feed that data to the ETL:
+3. Or you may need more advanced options than our internal exporter supports.
+   The [SMART Bulk Data Client](https://github.com/smart-on-fhir/bulk-data-client)
+   is a great tool with lots of features.
+
+In any case, it's simple to feed that data to the ETL:
 
 1. Pass Cumulus ETL the folder that holds the downloaded data as the input path.
-1. Pass `--fhir-url=` pointing at your FHIR server so that external document notes can be downloaded.
+1. Pass `--fhir-url=` pointing at your FHIR server so that externally referenced document notes
+   and medications can still be downloaded as needed.
+
+## On-The-Fly Exports
 
-## Internal Exports
+If it's easier to just do it all in one step,
+you can also start an ETL run with your FHIR URL as the input path.
+Cumulus ETL will do a bulk export first, then ETL the results.
 
-If you don't have an existing process or you don't need too many fancy options,
-Cumulus ETL's internal bulk exporter can do the trick.
+You can save the exported files for archiving after the fact with `--export-to=PATH`.
 
-### Registering Cumulus ETL
+However, bulk exports tend to be brittle and slow for many EHRs at the time of this writing.
+It might be wiser to separately export, make sure the data is all there and good, and then ETL it.
+
+## Registering an Export Client
 
 On your server, you need to register a new "backend service" client.
 You'll be asked to provide a JWKS (JWK Set) file.
@@ -70,37 +83,3 @@ And for Cumulus ETL's input path argument,
 you will give your server's URL address,
 including a Group identifier if you want to scope the export
 (e.g. `https://example.com/fhir` or `https://example.com/fhir/Group/1234`).
-
-### Narrowing Export Scope
-
-You can pass `--since=` and/or `--until=` to narrow your bulk export to a date range.
-
-Note that support for these parameters among EHRs is not super common.
-- `--since=` is in the FHIR spec but is not required by law.
-  (And notably, it's not supported by Epic.)
-- `--until=` is not even in the FHIR spec yet. No major EHR supports it.
-
-But if you are lucky enough to be working with an EHR that supports either one,
-you can pass in a time like `--since=2023-01-16T20:32:48Z`.
-
-### Saving Bulk Export Files
-
-Bulk exports can be tricky to get right and can take a long time.
-Often (and especially when first experimenting with Cumulus ETL),
-you will want to save the results of a bulk export for inspection or in case Cumulus ETL fails.
-
-By default, Cumulus ETL throws away the results of a bulk export once it's done with them.
-But you can pass `--export-to=/path/to/folder` to instead save the exported `.ndjson` files in the given folder.
-
-Note that you'll want to expose the local path to docker so that the files reach your actual disk, like so:
-
-```sh
-docker compose \
-  run --rm \
-  --volume /my/exported/files:/folder \
-  cumulus-etl \
-  --export-to=/folder \
-  https://my-fhir-server/ \
-  s3://output/ \
-  s3://phi/
-```
diff --git a/pyproject.toml b/pyproject.toml
index e4678aec..e8833d23 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,7 +28,6 @@ authors = [
     { name="Andy McMurry, PhD", email="andrew.mcmurry@childrens.harvard.edu" },
     { name="Michael Terry", email="michael.terry@childrens.harvard.edu" },
 ]
-description = "Turns FHIR data into de-identified & aggregated records"
 readme = "README.md"
 license = { text="Apache License 2.0" }
 classifiers = [
@@ -37,7 +36,7 @@ classifiers = [
     "Programming Language :: Python :: 3",
     "Topic :: Software Development :: Libraries :: Python Modules",
 ]
-dynamic = ["version"]
+dynamic = ["description", "version"]
 
 [project.optional-dependencies]
 tests = [
diff --git a/tests/etl/test_etl_cli.py b/tests/etl/test_etl_cli.py
index c82c93b6..dc71b26e 100644
--- a/tests/etl/test_etl_cli.py
+++ b/tests/etl/test_etl_cli.py
@@ -87,6 +87,11 @@ async def test_unknown_task(self):
             await self.run_etl(tasks=["blarg"])
         self.assertEqual(errors.TASK_UNKNOWN, cm.exception.code)
 
+    async def test_help_task(self):
+        with self.assertRaises(SystemExit) as cm:
+            await self.run_etl(tasks=["patient", "help"])
+        self.assertEqual(errors.TASK_HELP, cm.exception.code)
+
     async def test_failed_task(self):
         # Make it so any writes will fail
         with mock.patch(
@@ -194,6 +199,12 @@ async def test_bulk_no_auth(self):
             await self.run_etl(input_path="https://localhost:12345/", tasks=["patient"])
         self.assertEqual(errors.BULK_EXPORT_FAILED, cm.exception.code)
 
+    async def test_bulk_no_url(self):
+        """Verify that if no FHIR URL is provided, but export args *are*, we'll error out."""
+        with self.assertRaises(SystemExit) as cm:
+            await self.run_etl(export_to="output/dir")
+        self.assertEqual(errors.ARGS_CONFLICT, cm.exception.code)
+
     async def test_no_ms_tool(self):
         """Verify that we require the MS tool to be in PATH."""
         self.patch_dict(os.environ, {"PATH": "/nothing-here"})
diff --git a/tests/export/__init__.py b/tests/export/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/export/test_export_cli.py b/tests/export/test_export_cli.py
new file mode 100644
index 00000000..683d1896
--- /dev/null
+++ b/tests/export/test_export_cli.py
@@ -0,0 +1,78 @@
+"""Tests for export/cli.py"""
+
+from unittest import mock
+
+import ddt
+
+from cumulus_etl import cli
+from cumulus_etl.etl.tasks.task_factory import get_default_tasks
+from tests.utils import AsyncTestCase
+
+
+@ddt.ddt
+class TestExportCLI(AsyncTestCase):
+    """Tests for high-level export support."""
+
+    def setUp(self):
+        super().setUp()
+        self.loader_mock = mock.AsyncMock()
+        self.loader_init_mock = self.patch(
+            "cumulus_etl.loaders.FhirNdjsonLoader", return_value=self.loader_mock
+        )
+        self.client_mock = self.patch("cumulus_etl.fhir.create_fhir_client_for_cli")
+
+    async def run_export(self, *args) -> None:
+        await cli.main(["export", "https://example.com", "fake/path", *args])
+
+    @ddt.data(
+        ([], True),
+        (["--task=patient"], False),
+        (["--task-filter=gpu"], False),
+    )
+    @ddt.unpack
+    async def test_prefer_url_resources(self, args, expected_prefer):
+        """Verify that if no task filtering is done, we flag to prefer the url's _type"""
+        await self.run_export(*args)
+        self.assertEqual(
+            expected_prefer,
+            self.loader_mock.load_from_bulk_export.call_args.kwargs["prefer_url_resources"],
+        )
+
+    @ddt.data(
+        ([], ["*default*"]),  # special value that the test will expand
+        (["--task=patient,condition"], ["Condition", "Patient"]),
+        (["--task-filter=covid_symptom"], ["DocumentReference"]),
+    )
+    @ddt.unpack
+    async def test_task_selection(self, args, expected_resources):
+        """Verify that we do the expected task filtering as requested"""
+        await self.run_export(*args)
+        if expected_resources == ["*default*"]:
+            expected_resources = sorted(t.resource for t in get_default_tasks())
+        self.assertEqual(
+            expected_resources,
+            self.loader_mock.load_from_bulk_export.call_args.args[0],
+        )
+
+    async def test_arg_passthrough(self):
+        """Verify that we accept and send down all our different args"""
+        await self.run_export(
+            "--since=1920",
+            "--until=1923",
+            "--smart-client-id=ID",
+            "--smart-jwks=jwks.json",
+            "--basic-user=alice",
+            "--basic-passwd=passwd.txt",
+            "--bearer-token=token.txt",
+        )
+        # built-in positional args
+        self.assertEqual("https://example.com", self.loader_init_mock.call_args.args[0].path)
+        self.assertEqual("fake/path", self.loader_init_mock.call_args.kwargs["export_to"])
+        # custom args from above
+        self.assertEqual("1920", self.loader_init_mock.call_args.kwargs["since"])
+        self.assertEqual("1923", self.loader_init_mock.call_args.kwargs["until"])
+        self.assertEqual("ID", self.client_mock.call_args.args[0].smart_client_id)
+        self.assertEqual("jwks.json", self.client_mock.call_args.args[0].smart_jwks)
+        self.assertEqual("alice", self.client_mock.call_args.args[0].basic_user)
+        self.assertEqual("passwd.txt", self.client_mock.call_args.args[0].basic_passwd)
+        self.assertEqual("token.txt", self.client_mock.call_args.args[0].bearer_token)
diff --git a/tests/fhir/test_fhir_client.py b/tests/fhir/test_fhir_client.py
index 0a3092e1..2613bc59 100644
--- a/tests/fhir/test_fhir_client.py
+++ b/tests/fhir/test_fhir_client.py
@@ -172,7 +172,8 @@ async def test_get_with_new_header(self):
         self.respx_mock.get(
             f"{self.server_url}/foo",
             headers={
-                "Accept": "application/fhir+json",  # just to confirm we don't replace default headers entirely
+                # just to confirm we don't replace default headers entirely
+                "Accept": "application/fhir+json",
                 "Test": "Value",
             },
         )
@@ -272,7 +273,30 @@ async def test_authorize_error(self, response_params, expected_error):
             mock_fatal.call_args,
         )
 
-    async def test_get_error_401(self):
+    def test_file_read_error(self):
+        """Verify that if we can't read a provided file, we gracefully error out"""
+        args = argparse.Namespace(
+            fhir_url=None,
+            smart_client_id=None,
+            smart_jwks="does-not-exist.txt",
+            basic_user=None,
+            basic_passwd=None,
+            bearer_token=None,
+        )
+        with self.assertRaises(SystemExit) as cm:
+            fhir.create_fhir_client_for_cli(args, store.Root("/tmp"), [])
+        self.assertEqual(errors.ARGS_INVALID, cm.exception.code)
+
+    async def test_must_be_context_manager(self):
+        """Verify that FHIRClient enforces its use as a context manager."""
+        client = fhir.FhirClient(
+            self.server_url, [], smart_client_id=self.client_id, smart_jwks=self.jwks
+        )
+        with self.assertRaisesRegex(RuntimeError, "FhirClient must be used as a context manager"):
+            await client.request("GET", "foo")
+
+    @ddt.data(True, False)  # confirm that we handle both stream and non-stream resets
+    async def test_get_error_401(self, stream_mode):
         """Verify that an expired token is refreshed."""
         route = self.respx_mock.get(f"{self.server_url}/foo")
         route.side_effect = [make_response(status_code=401), make_response()]
@@ -283,7 +307,7 @@ async def test_get_error_401(self):
         self.assertEqual(1, self.respx_mock["token"].call_count)
 
         # Check that we correctly tried to re-authenticate
-        response = await server.request("GET", "foo")
+        response = await server.request("GET", "foo", stream=stream_mode)
         self.assertEqual(200, response.status_code)
         self.assertEqual(2, self.respx_mock["token"].call_count)
 
@@ -368,11 +392,14 @@ def mock_as_server_type(self, server_type: str | None):
         response_json = {}
         if server_type == "epic":
             response_json = {"software": {"name": "Epic"}}
+        elif server_type == "cerner":
+            response_json = {"publisher": "Cerner"}
 
         self.respx_mock.get(f"{self.server_url}/metadata").respond(json=response_json)
 
     @ddt.data(
         ("epic", "present"),
+        ("cerner", "missing"),
         (None, "missing"),
     )
     @ddt.unpack
diff --git a/tests/loaders/ndjson/test_bulk_export.py b/tests/loaders/ndjson/test_bulk_export.py
index 6f13fadd..cdf9e16c 100644
--- a/tests/loaders/ndjson/test_bulk_export.py
+++ b/tests/loaders/ndjson/test_bulk_export.py
@@ -3,6 +3,7 @@
 import contextlib
 import datetime
 import io
+import os
 import tempfile
 
 from unittest import mock
@@ -91,7 +92,10 @@ def assert_log_equals(self, *rows) -> None:
             self.assertEqual(reordered_details[index][1], row[1])
 
     def mock_kickoff(
-        self, params: str = "?_type=Condition%2CPatient", side_effect: list | None = None, **kwargs
+        self,
+        params: str = "?_type=Condition&_type=Patient",
+        side_effect: list | None = None,
+        **kwargs,
     ) -> None:
         kwargs.setdefault("status_code", 202)
         route = self.respx_mock.get(
@@ -172,7 +176,7 @@ async def test_happy_path(self):
             (
                 "kickoff",
                 {
-                    "exportUrl": f"{self.fhir_url}/$export?_type=Condition%2CPatient",
+                    "exportUrl": f"{self.fhir_url}/$export?_type=Condition&_type=Patient",
                     "softwareName": "Test",
                     "softwareVersion": "0.git",
                     "softwareReleaseDate": "today",
@@ -240,7 +244,7 @@ async def test_happy_path(self):
 
     async def test_since_until(self):
        """Verify that we send since & until parameters correctly to the server"""
         self.mock_kickoff(
-            params="?_type=Condition%2CPatient&_since=2000-01-01T00%3A00%3A00%2B00.00&_until=2010",
+            params="?_type=Condition&_type=Patient&_since=2000-01-01T00%3A00%3A00%2B00.00&_until=2010",
             status_code=500,  # early exit
         )
@@ -256,7 +260,7 @@ async def test_export_error(self):
             headers={"Accept": "application/json"},
         ).respond(
             json={
-                "transactionTime": "2015-02-07T13:28:17.239+02:00",
+                "transactionTime": "bogus-time",  # just confirm we gracefully handle this
                 "error": [
                     {"type": "OperationOutcome", "url": "https://example.com/err1"},
                     {"type": "OperationOutcome", "url": "https://example.com/err2"},
@@ -303,6 +307,8 @@ async def test_export_error(self):
         ):
             await self.export()
 
+        self.assertIsNone(self.exporter.export_datetime)  # date time couldn't be parsed
+
         self.assert_log_equals(
             ("kickoff", None),
             (
@@ -311,7 +317,7 @@ async def test_export_error(self):
                     "deletedFileCount": 0,
                     "errorFileCount": 2,
                     "outputFileCount": 1,
-                    "transactionTime": "2015-02-07T13:28:17.239+02:00",
+                    "transactionTime": "bogus-time",
                 },
             ),
             (
@@ -429,7 +435,7 @@ async def test_unexpected_status_code(self):
             (
                 "kickoff",
                 {
-                    "exportUrl": f"{self.fhir_url}/$export?_type=Condition%2CPatient",
+                    "exportUrl": f"{self.fhir_url}/$export?_type=Condition&_type=Patient",
                     "softwareName": "Test",
                     "softwareVersion": "0.git",
                     "softwareReleaseDate": "today",
@@ -527,6 +533,15 @@ async def test_no_delete_if_interrupted(self):
             ),
         )
 
+    async def test_delete_error_is_ignored(self):
+        """Verify that we don't freak out if our DELETE call fails"""
+        self.mock_kickoff()
+        self.mock_delete(status_code=500)
+        self.respx_mock.get("https://example.com/poll").respond(
+            json={"transactionTime": "2015-02-07T13:28:17.239+02:00"},
+        )
+        await self.export()  # no exception thrown
+
     async def test_log_duration(self):
         """Verify that we calculate the correct export duration for the logs"""
 
@@ -556,6 +571,40 @@ def status_check(request):
         )
 
 
+@ddt.ddt
+class TestBulkExporterInit(utils.AsyncTestCase):
+    """Tests for just creating the exporter, without any mocking needed"""
+
+    @ddt.data(
+        ("http://a.com", {}, "http://a.com/$export?_type=Patient"),  # no extra args
+        (  # all the args, including merged _type fields and overridden _since!
+            "http://a.com/$export?_type=Condition&_since=2000",
+            {"since": "2020", "until": "2024"},
+            "http://a.com/$export?_type=Condition&_type=Patient&_since=2020&_until=2024",
+        ),
+        (  # extraneous args are kept
+            "http://a.com/$export?_elements=birthDate",
+            {},
+            "http://a.com/$export?_elements=birthDate&_type=Patient",
+        ),
+        (  # can ignore provided resources when _type is present
+            "http://a.com/$export?_type=Condition",
+            {"prefer_url_resources": True},
+            "http://a.com/$export?_type=Condition",
+        ),
+        (  # will not ignore provided resources when _type is *not* present
+            "http://a.com/$export",
+            {"prefer_url_resources": True},
+            "http://a.com/$export?_type=Patient",
+        ),
+    )
+    @ddt.unpack
+    def test_param_merging(self, original_url, kwargs, expected_url):
+        """Verify that we allow existing params and add some in ourselves"""
+        exporter = BulkExporter(None, ["Patient"], original_url, "fake_dir", **kwargs)
+        self.assertEqual(exporter._url, expected_url)
+
+
 class TestBulkExportEndToEnd(utils.AsyncTestCase, utils.FhirClientMixin):
     """
     Test case for doing an entire bulk export loop, without mocking python code.
@@ -619,8 +668,8 @@ def set_up_requests(self):
             status_code=202,
         )
 
-    async def test_successful_bulk_export(self):
-        """Verify a happy path bulk export, from toe to tip"""
+    async def test_successful_etl_bulk_export(self):
+        """Verify a happy path ETL bulk export, from toe to tip"""
         with tempfile.TemporaryDirectory() as tmpdir:
             self.set_up_requests()
 
@@ -654,3 +703,32 @@ async def test_successful_bulk_export(self):
                 },
                 common.read_json(f"{tmpdir}/output/etl__completion/etl__completion.000.ndjson"),
             )
+
+    async def test_successful_standalone_bulk_export(self):
+        """Verify a happy path standalone bulk export, from toe to tip"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            self.set_up_requests()
+
+            await cli.main(
+                [
+                    "export",
+                    self.fhir_url,
+                    tmpdir,
+                    "--task=patient",
+                    f"--smart-client-id={self.fhir_client_id}",
+                    f"--smart-jwks={self.fhir_jwks_path}",
+                ]
+            )
+
+            self.assertEqual({"Patient.000.ndjson", "log.ndjson"}, set(os.listdir(tmpdir)))
+
+            self.assertEqual(
+                {
+                    "id": "testPatient1",
+                    "resourceType": "Patient",
+                },
+                common.read_json(f"{tmpdir}/Patient.000.ndjson"),
+            )
+
+            # log should have kickoff, progress, download start, download complete, finished
+            self.assertEqual(5, common.read_local_line_count(f"{tmpdir}/log.ndjson"))