From b0a482b9aea8dba595290c7f84bf48d4f34d439d Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Mon, 28 Oct 2024 14:08:22 -0400 Subject: [PATCH] feat!: require completion tracking info, write completion by default - Removes the --write-completion opt-in flag, it is now always enabled. - Requires export group name and timestamp information to be available, either from export log or from CLI. - Updates some user docs, explaining how completion tracking expects to be fed data. --- cumulus_etl/errors.py | 7 ++- cumulus_etl/etl/cli.py | 44 ++++++++++----- cumulus_etl/etl/tasks/base.py | 6 --- cumulus_etl/etl/tasks/basic_tasks.py | 14 ++--- docs/bulk-exports.md | 68 ++++++++++++++++++++++-- tests/data/simple/input/log.ndjson | 2 + tests/etl/base.py | 3 -- tests/etl/test_etl_cli.py | 16 +++--- tests/etl/test_tasks.py | 24 --------- tests/loaders/ndjson/test_bulk_export.py | 1 - tests/test_cli.py | 2 - 11 files changed, 114 insertions(+), 73 deletions(-) create mode 100644 tests/data/simple/input/log.ndjson diff --git a/cumulus_etl/errors.py b/cumulus_etl/errors.py index 6e2e9b31..fe7f7d6c 100644 --- a/cumulus_etl/errors.py +++ b/cumulus_etl/errors.py @@ -69,7 +69,10 @@ def __init__(self): ) -def fatal(message: str, status: int) -> NoReturn: +def fatal(message: str, status: int, extra: str = "") -> NoReturn: """Convenience method to exit the program with a user-friendly error message a test-friendly status code""" - rich.console.Console(stderr=True).print(message, style="bold red", highlight=False) + stderr = rich.console.Console(stderr=True) + stderr.print(message, style="bold red", highlight=False) + if extra: + stderr.print(rich.padding.Padding.indent(extra, 2), highlight=False) sys.exit(status) # raises a SystemExit exception diff --git a/cumulus_etl/etl/cli.py b/cumulus_etl/etl/cli.py index a20fb5c3..481507c6 100644 --- a/cumulus_etl/etl/cli.py +++ b/cumulus_etl/etl/cli.py @@ -138,11 +138,18 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None: ) group = 
parser.add_argument_group("external export identification") - group.add_argument("--export-group", help=argparse.SUPPRESS) - group.add_argument("--export-timestamp", help=argparse.SUPPRESS) - # Temporary explicit opt-in flag during the development of the completion-tracking feature group.add_argument( - "--write-completion", action="store_true", default=False, help=argparse.SUPPRESS + "--export-group", + metavar="NAME", + help="name of the FHIR Group that was exported (default is to grab this from an " + "export log file in the input folder, but you can also use this to assign a " + "nickname as long as you consistently set the same nickname)", + ) + group.add_argument( + "--export-timestamp", + metavar="TIMESTAMP", + help="when the data was exported from the FHIR Group (default is to grab this from an " + "export log file in the input folder)", ) cli_utils.add_nlp(parser) @@ -206,17 +213,26 @@ def handle_completion_args( else loader_results.export_datetime ) - # Disable entirely if asked to - if not args.write_completion: - export_group_name = None - export_datetime = None - - # Error out if we have mismatched args - has_group_name = export_group_name is not None - has_datetime = bool(export_datetime) - if has_group_name and not has_datetime: + # Error out if we have missing args + missing_group_name = export_group_name is None + missing_datetime = not export_datetime + if missing_group_name and missing_datetime: + errors.fatal( + "Missing Group name and timestamp export information for the input data.", + errors.COMPLETION_ARG_MISSING, + extra="This is likely because you don’t have an export log in your input folder.\n" + "This log file (log.ndjson) is generated by some bulk export tools.\n" + "Instead, please manually specify the Group name and timestamp of the export " + "with the --export-group and --export-timestamp options.\n" + "These options are necessary to track whether all the required data from " + "a Group has been imported and is ready to be used.\n" 
+ "See https://docs.smarthealthit.org/cumulus/etl/bulk-exports.html for more " + "information.\n", + ) + # These next two errors can be briefer because the user clearly knows about the args. + elif missing_datetime: errors.fatal("Missing --export-datetime argument.", errors.COMPLETION_ARG_MISSING) - elif not has_group_name and has_datetime: + elif missing_group_name: errors.fatal("Missing --export-group argument.", errors.COMPLETION_ARG_MISSING) return export_group_name, export_datetime diff --git a/cumulus_etl/etl/tasks/base.py b/cumulus_etl/etl/tasks/base.py index a4acb635..1d3924e7 100644 --- a/cumulus_etl/etl/tasks/base.py +++ b/cumulus_etl/etl/tasks/base.py @@ -113,9 +113,6 @@ def __init__(self, task_config: config.JobConfig, scrubber: deid.Scrubber): self.summaries: list[config.JobSummary] = [ config.JobSummary(output.get_name(self)) for output in self.outputs ] - self.completion_tracking_enabled = ( - self.task_config.export_group_name is not None and self.task_config.export_datetime - ) async def run(self) -> list[config.JobSummary]: """ @@ -260,9 +257,6 @@ def _delete_requested_ids(self): self.formatters[index].delete_records(deleted_ids) def _update_completion_table(self) -> None: - if not self.completion_tracking_enabled: - return - # Create completion rows batch = formats.Batch( rows=[ diff --git a/cumulus_etl/etl/tasks/basic_tasks.py b/cumulus_etl/etl/tasks/basic_tasks.py index 8de799ae..ce05a2b7 100644 --- a/cumulus_etl/etl/tasks/basic_tasks.py +++ b/cumulus_etl/etl/tasks/basic_tasks.py @@ -62,15 +62,11 @@ class EncounterTask(tasks.EtlTask): async def read_entries(self, *, progress: rich.progress.Progress = None) -> tasks.EntryIterator: async for encounter in super().read_entries(progress=progress): - if self.completion_tracking_enabled: - completion_info = { - "encounter_id": encounter["id"], - "group_name": self.task_config.export_group_name, - "export_time": self.task_config.export_datetime.isoformat(), - } - else: - completion_info = None - + 
completion_info = { + "encounter_id": encounter["id"], + "group_name": self.task_config.export_group_name, + "export_time": self.task_config.export_datetime.isoformat(), + } yield completion_info, encounter @classmethod diff --git a/docs/bulk-exports.md b/docs/bulk-exports.md index 209bf115..5a605a7d 100644 --- a/docs/bulk-exports.md +++ b/docs/bulk-exports.md @@ -13,27 +13,33 @@ Cumulus ETL wants data, and lots of it. It's happy to ingest data that you've gathered elsewhere (as a separate export), but it's also happy to download the data itself as needed during the ETL (as an on-the-fly export). -## Separate Exports +## Export Options + +### External Exports 1. If you have an existing process to export health data, you can do that bulk export externally, and then just feed the resulting files to Cumulus ETL. +(Though note that you will need to provide some export information manually, +with the `--export-group` and `--export-timestamp` options. See `--help` for more info.) 2. Cumulus ETL has an `export` command to perform just a bulk export without an ETL step. Run it like so: `cumulus-etl export FHIR_URL ./output` (see `--help` for more options). - You can use all sorts of + - You can use all sorts of [interesting FHIR options](https://hl7.org/fhir/uv/bulkdata/export.html#query-parameters) like `_typeFilter` or `_since` in the URL. + - This workflow will generate an export log file, from which Cumulus ETL can pull + some export metadata like the Group name and export timestamp. 3. Or you may need more advanced options than our internal exporter supports. The [SMART Bulk Data Client](https://github.com/smart-on-fhir/bulk-data-client) - is a great tool with lots of features. + is a great tool with lots of features (and also generates an export log file). In any case, it's simple to feed that data to the ETL: 1. Pass Cumulus ETL the folder that holds the downloaded data as the input path. 1. 
Pass `--fhir-url=` pointing at your FHIR server so that externally referenced document notes and medications can still be downloaded as needed. -## On-The-Fly Exports +### On-The-Fly Exports If it's easier to just do it all in one step, you can also start an ETL run with your FHIR URL as the input path. @@ -44,6 +50,60 @@ You can save the exported files for archiving after the fact with `--export-to=P However, bulk exports tend to be brittle and slow for many EHRs at the time of this writing. It might be wiser to separately export, make sure the data is all there and good, and then ETL it. +## Cumulus Assumptions + +Cumulus ETL makes some specific assumptions about the data you feed it and the order you feed it in. + +This is because Cumulus tracks which resources were exported from which FHIR Groups and when. +It only allows Encounters that have had all their data fully imported to be queried by SQL, +to prevent an in-progress ETL workflow from affecting queries against the database. +(i.e. to prevent an Encounter that hasn't yet had Conditions loaded in from looking like an +Encounter that doesn't _have_ any Conditions) + +Of course, even in the normal course of events, resources may show up weeks after an Encounter +(like lab results). +So an Encounter can never knowingly be truly _complete_, +but Cumulus ETL makes an effort to keep a consistent view of the world at least for a given +point in time. + +### Encounters First + +**Please export Encounters along with or before you export other Encounter-linked resources.** +(Patients can be exported beforehand, since they don't depend on Encounters.) + +To prevent incomplete Encounters, Cumulus only looks at Encounters that have an export +timestamp at the same time or before linked resources like Condition. +(As a result, there may be extra Conditions that point to not-yet-loaded Encounters. +But that's fine, they will also be ignored until their Encounters do get loaded.) 
+ +If you do export Encounters last, you may not see any of those Encounters in the `core` study +tables once you run Cumulus Library on the data. +(Your Encounter data is safe and sound, +just temporarily ignored by the Library until later exports come through.) + +### No Partial Group Exports + +**Please don't slice and dice your Group resources when exporting.** +Cumulus ETL assumes that when you feed it an input folder of export files, +everything in the Group is available (at least, for the exported resources). +You can export one resource from the Group at a time, just don't slice that resource further. + +This is because when you run ETL on, say, Conditions exported from Group `Group1234`, +it will mark Conditions in `Group1234` as completely loaded (up to the export timestamp). + +Using `_since` or a date-oriented `_typeFilter` is still fine, to grab new data for an export. +The concern is more about an incomplete view of the data at a given point in time. + +For example, if you sliced Conditions according to category when exporting +(e.g. `_typeFilter=Condition?category=problem-list-item`), +Cumulus will have an incorrect view of the world +(thinking it got all Conditions when it only got problem list items). + +You can still do this if you are careful! +For example, maybe exporting Observations is too slow unless you slice by category. +Just make sure that after you export all the Observations separately, +you then combine them again into one big Observation folder before running Cumulus ETL. + ## Archiving Exports Exports can take a long time, and it's often convenient to archive the results.
diff --git a/tests/data/simple/input/log.ndjson b/tests/data/simple/input/log.ndjson new file mode 100644 index 00000000..b8d0e45f --- /dev/null +++ b/tests/data/simple/input/log.ndjson @@ -0,0 +1,2 @@ +{"exportId": "testId", "timestamp": "2024-08-06T14:11:57-04:00", "eventId": "kickoff", "eventDetail": {"exportUrl": "https://example.org/fhir/$export"}} +{"exportId": "testId", "timestamp": "2024-08-06T14:12:57-04:00", "eventId": "status_complete", "eventDetail": {"transactionTime": "2024-08-06T14:00:00-04:00", "outputFileCount": 0, "deletedFileCount": 0, "errorFileCount": 0}} diff --git a/tests/etl/base.py b/tests/etl/base.py index 60e6464d..f629ac92 100644 --- a/tests/etl/base.py +++ b/tests/etl/base.py @@ -58,7 +58,6 @@ async def run_etl( input_format: str = "ndjson", export_group: str = "test-group", export_timestamp: str = "2020-10-13T12:00:20-05:00", - write_completion: bool = True, skip_init_checks: bool = True, ) -> None: args = [ @@ -75,8 +74,6 @@ async def run_etl( args.append(f"--export-group={export_group}") if export_timestamp: args.append(f"--export-timestamp={export_timestamp}") - if write_completion: - args.append("--write-completion") if output_format: args.append(f"--output-format={output_format}") if comment: diff --git a/tests/etl/test_etl_cli.py b/tests/etl/test_etl_cli.py index 2dd1f9dc..2c7e5448 100644 --- a/tests/etl/test_etl_cli.py +++ b/tests/etl/test_etl_cli.py @@ -238,27 +238,27 @@ async def test_task_init_checks(self, mock_check): # Second line is what loader will represent as the group/time # Third line is what we expect to use as the group/time ( - {"export_group": "CLI", "export_timestamp": "2020-01-02", "write_completion": True}, + {"export_group": "CLI", "export_timestamp": "2020-01-02"}, ("Loader", datetime.datetime(2010, 12, 12)), ("CLI", datetime.datetime(2020, 1, 2)), ), ( - {"export_group": "CLI", "export_timestamp": "2020-01-02", "write_completion": False}, + {"export_group": None, "export_timestamp": None}, + ("Loader", 
datetime.datetime(2010, 12, 12)), ("Loader", datetime.datetime(2010, 12, 12)), - (None, None), ), ( - {"export_group": None, "export_timestamp": None, "write_completion": True}, - ("Loader", datetime.datetime(2010, 12, 12)), - ("Loader", datetime.datetime(2010, 12, 12)), + {"export_group": "CLI", "export_timestamp": None}, + (None, None), + None, # errors out ), ( - {"export_group": "CLI", "export_timestamp": None, "write_completion": True}, + {"export_group": None, "export_timestamp": "2020-01-02"}, (None, None), None, # errors out ), ( - {"export_group": None, "export_timestamp": "2020-01-02", "write_completion": True}, + {"export_group": None, "export_timestamp": None}, (None, None), None, # errors out ), diff --git a/tests/etl/test_tasks.py b/tests/etl/test_tasks.py index e9be1724..a5d07ac0 100644 --- a/tests/etl/test_tasks.py +++ b/tests/etl/test_tasks.py @@ -293,30 +293,6 @@ async def test_medication_completion(self): comp_batch.rows, ) - @ddt.data("export_datetime", "export_group_name") - async def test_completion_disabled(self, null_field): - """Verify that we don't write completion data if we don't have args for it""" - self.make_json("Encounter", "A") - setattr(self.job_config, null_field, None) - - await basic_tasks.EncounterTask(self.job_config, self.scrubber).run() - - # This order is unusual - normally `encounter` is second, - # but because there is no content for etl__completion_encounters, - # it is only touched during task finalization, so it goes second instead. 
- enc_format = self.format # encounter - comp_enc_format = self.format2 # etl__completion_encounters - comp_format = self.format3 # etl__completion - - self.assertEqual("encounter", enc_format.dbname) - self.assertEqual("etl__completion_encounters", comp_enc_format.dbname) - self.assertIsNone(comp_format) # tasks don't create this when completion is disabled - - self.assertEqual(1, comp_enc_format.write_records.call_count) - self.assertEqual(1, enc_format.write_records.call_count) - - self.assertEqual([], comp_enc_format.write_records.call_args[0][0].rows) - async def test_allow_empty_group(self): """Empty groups are (rarely) used to mark a server-wide global export""" self.make_json("Device", "A") diff --git a/tests/loaders/ndjson/test_bulk_export.py b/tests/loaders/ndjson/test_bulk_export.py index ecaa841c..ab3fd8dc 100644 --- a/tests/loaders/ndjson/test_bulk_export.py +++ b/tests/loaders/ndjson/test_bulk_export.py @@ -903,7 +903,6 @@ async def test_successful_etl_bulk_export(self): "--task=patient", f"--smart-client-id={self.fhir_client_id}", f"--smart-jwks={self.fhir_jwks_path}", - "--write-completion", ] ) diff --git a/tests/test_cli.py b/tests/test_cli.py index 15798c8b..b5135622 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -19,7 +19,6 @@ class TestCumulusCLI(AsyncTestCase): @ddt.data( ([], "usage: cumulus-etl [OPTION]..."), - (["chart-review"], "usage: cumulus-etl upload-notes [OPTION]..."), (["convert"], "usage: cumulus-etl convert [OPTION]..."), (["etl"], "usage: cumulus-etl etl [OPTION]..."), (["upload-notes"], "usage: cumulus-etl upload-notes [OPTION]..."), @@ -36,7 +35,6 @@ async def test_usage(self, argv, expected_usage): @ddt.data( ([], "cumulus_etl.etl.run_etl"), - (["chart-review"], "cumulus_etl.upload_notes.run_upload_notes"), (["convert"], "cumulus_etl.etl.convert.run_convert"), (["etl"], "cumulus_etl.etl.run_etl"), (["upload-notes"], "cumulus_etl.upload_notes.run_upload_notes"),