Skip to content

Commit

Permalink
feat!: require completion tracking info, write completion by default
Browse files Browse the repository at this point in the history
- Removes the --write-completion opt-in flag, it is now always enabled.
- Requires export group name and timestamp information to be available,
  either from export log or from CLI.
- Updates some user docs, explaining how completion tracking expects to
  be fed data.
  • Loading branch information
mikix committed Oct 29, 2024
1 parent 3c97d2f commit ef7e6f4
Show file tree
Hide file tree
Showing 33 changed files with 145 additions and 97 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ jobs:
/in/input \
/in/run-output \
/in/phi \
--export-group nlp-test \
--export-timestamp 2024-08-29 \
--output-format=ndjson \
--task covid_symptom__nlp_results
Expand Down
7 changes: 5 additions & 2 deletions cumulus_etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def __init__(self):
)


def fatal(message: str, status: int) -> NoReturn:
def fatal(message: str, status: int, extra: str = "") -> NoReturn:
"""Convenience method to exit the program with a user-friendly error message a test-friendly status code"""
rich.console.Console(stderr=True).print(message, style="bold red", highlight=False)
stderr = rich.console.Console(stderr=True)
stderr.print(message, style="bold red", highlight=False)
if extra:
stderr.print(rich.padding.Padding.indent(extra, 2), highlight=False)
sys.exit(status) # raises a SystemExit exception
44 changes: 30 additions & 14 deletions cumulus_etl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,18 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
)

group = parser.add_argument_group("external export identification")
group.add_argument("--export-group", help=argparse.SUPPRESS)
group.add_argument("--export-timestamp", help=argparse.SUPPRESS)
# Temporary explicit opt-in flag during the development of the completion-tracking feature
group.add_argument(
"--write-completion", action="store_true", default=False, help=argparse.SUPPRESS
"--export-group",
metavar="NAME",
help="name of the FHIR Group that was exported (default is to grab this from an "
"export log file in the input folder, but you can also use this to assign a "
"nickname as long as you consistently set the same nickname)",
)
group.add_argument(
"--export-timestamp",
metavar="TIMESTAMP",
help="when the data was exported from the FHIR Group (default is to grab this from an "
"export log file in the input folder)",
)

cli_utils.add_nlp(parser)
Expand Down Expand Up @@ -206,17 +213,26 @@ def handle_completion_args(
else loader_results.export_datetime
)

# Disable entirely if asked to
if not args.write_completion:
export_group_name = None
export_datetime = None

# Error out if we have mismatched args
has_group_name = export_group_name is not None
has_datetime = bool(export_datetime)
if has_group_name and not has_datetime:
# Error out if we have missing args
missing_group_name = export_group_name is None
missing_datetime = not export_datetime
if missing_group_name and missing_datetime:
errors.fatal(
"Missing Group name and timestamp export information for the input data.",
errors.COMPLETION_ARG_MISSING,
extra="This is likely because you don’t have an export log in your input folder.\n"
"This log file (log.ndjson) is generated by some bulk export tools.\n"
"Instead, please manually specify the Group name and timestamp of the export "
"with the --export-group and --export-timestamp options.\n"
"These options are necessary to track whether all the required data from "
"a Group has been imported and is ready to be used.\n"
"See https://docs.smarthealthit.org/cumulus/etl/bulk-exports.html for more "
"information.\n",
)
# These next two errors can be briefer because the user clearly knows about the args.
elif missing_datetime:
errors.fatal("Missing --export-datetime argument.", errors.COMPLETION_ARG_MISSING)
elif not has_group_name and has_datetime:
elif missing_group_name:
errors.fatal("Missing --export-group argument.", errors.COMPLETION_ARG_MISSING)

return export_group_name, export_datetime
Expand Down
6 changes: 0 additions & 6 deletions cumulus_etl/etl/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ def __init__(self, task_config: config.JobConfig, scrubber: deid.Scrubber):
self.summaries: list[config.JobSummary] = [
config.JobSummary(output.get_name(self)) for output in self.outputs
]
self.completion_tracking_enabled = (
self.task_config.export_group_name is not None and self.task_config.export_datetime
)

async def run(self) -> list[config.JobSummary]:
"""
Expand Down Expand Up @@ -260,9 +257,6 @@ def _delete_requested_ids(self):
self.formatters[index].delete_records(deleted_ids)

def _update_completion_table(self) -> None:
if not self.completion_tracking_enabled:
return

# Create completion rows
batch = formats.Batch(
rows=[
Expand Down
14 changes: 5 additions & 9 deletions cumulus_etl/etl/tasks/basic_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,11 @@ class EncounterTask(tasks.EtlTask):

async def read_entries(self, *, progress: rich.progress.Progress = None) -> tasks.EntryIterator:
async for encounter in super().read_entries(progress=progress):
if self.completion_tracking_enabled:
completion_info = {
"encounter_id": encounter["id"],
"group_name": self.task_config.export_group_name,
"export_time": self.task_config.export_datetime.isoformat(),
}
else:
completion_info = None

completion_info = {
"encounter_id": encounter["id"],
"group_name": self.task_config.export_group_name,
"export_time": self.task_config.export_datetime.isoformat(),
}
yield completion_info, encounter

@classmethod
Expand Down
68 changes: 64 additions & 4 deletions docs/bulk-exports.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,33 @@ Cumulus ETL wants data, and lots of it.
It's happy to ingest data that you've gathered elsewhere (as a separate export),
but it's also happy to download the data itself as needed during the ETL (as an on-the-fly export).

## Separate Exports
## Export Options

### External Exports

1. If you have an existing process to export health data, you can do that bulk export externally,
and then just feed the resulting files to Cumulus ETL.
(Though note that you will need to provide some export information manually,
with the `--export-group` and `--export-timestamp` options. See `--help` for more info.)

2. Cumulus ETL has an `export` command to perform just a bulk export without an ETL step.
Run it like so: `cumulus-etl export FHIR_URL ./output` (see `--help` for more options).
You can use all sorts of
- You can use all sorts of
[interesting FHIR options](https://hl7.org/fhir/uv/bulkdata/export.html#query-parameters)
like `_typeFilter` or `_since` in the URL.
- This workflow will generate an export log file, from which Cumulus ETL can pull
some export metadata like the Group name and export timestamp.

3. Or you may need more advanced options than our internal exporter supports.
The [SMART Bulk Data Client](https://github.com/smart-on-fhir/bulk-data-client)
is a great tool with lots of features.
is a great tool with lots of features (and also generates an export log file).

In any case, it's simple to feed that data to the ETL:
1. Pass Cumulus ETL the folder that holds the downloaded data as the input path.
1. Pass `--fhir-url=` pointing at your FHIR server so that externally referenced document notes
and medications can still be downloaded as needed.

## On-The-Fly Exports
### On-The-Fly Exports

If it's easier to just do it all in one step,
you can also start an ETL run with your FHIR URL as the input path.
Expand All @@ -44,6 +50,60 @@ You can save the exported files for archiving after the fact with `--export-to=P
However, bulk exports tend to be brittle and slow for many EHRs at the time of this writing.
It might be wiser to separately export, make sure the data is all there and good, and then ETL it.

## Cumulus Assumptions

Cumulus ETL makes some specific assumptions about the data you feed it and the order you feed it in.

This is because Cumulus tracks which resources were exported from which FHIR Groups and when.
It only allows Encounters that have had all their data fully imported to be queried by SQL,
to prevent an in-progress ETL workflow from affecting queries against the database.
(i.e. to prevent an Encounter that hasn't yet had Conditions loaded in from looking like an
Encounter that doesn't _have_ any Conditions)

Of course, even in the normal course of events, resources may show up weeks after an Encounter
(like lab results).
So an Encounter can never knowingly be truly _complete_,
but Cumulus ETL makes an effort to keep a consistent view of the world at least for a given
point in time.

### Encounters First

**Please export Encounters along with or before you export other Encounter-linked resources.**
(Patients can be exported beforehand, since they don't depend on Encounters.)

To prevent incomplete Encounters, Cumulus only looks at Encounters that have an export
timestamp at the same time or before linked resources like Condition.
(As a result, there may be extra Conditions that point to not-yet-loaded Encounters.
But that's fine, they will also be ignored until their Encounters do get loaded.)

If you do export Encounters last, you may not see any of those Encounters in the `core` study
tables once you run Cumulus Library on the data.
(Your Encounter data is safe and sound,
just temporarily ignored by the Library until later exports come through.)

### No Partial Group Exports

**Please don't slice and dice your Group resources when exporting.**
Cumulus ETL assumes that when you feed it an input folder of export files,
that everything in the Group is available (at least, for the exported resources).
You can export one resource from the Group at a time, just don't slice that resource further.

This is because when you run ETL on say, Conditions exported from Group `Group1234`,
it will mark Conditions in `Group1234` as completely loaded (up to the export timestamp).

Using `_since` or a date-oriented `_typeFilter` is still fine, to grab new data for an export.
The concern is more about an incomplete view of the data at a given point in time.

For example, if you sliced Conditions according to category when exporting
(e.g. `_typeFilter=Condition?category=problem-list-item`),
Cumulus will have an incorrect view of the world
(thinking it got all Conditions when it only got problem list items).

You can still do this if you are careful!
For example, maybe exporting Observations is too slow unless you slice by category.
Just make sure that after you export all the Observations separately,
you then combine them again into one big Observation folder before running Cumulus ETL.

## Archiving Exports

Exports can take a long time, and it's often convenient to archive the results.
Expand Down
7 changes: 6 additions & 1 deletion docs/setup/sample-runs.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ docker compose -f $CUMULUS_REPO_PATH/compose.yaml \
s3://my-cumulus-prefix-phi-99999999999-us-east-2/subdir1/
```

(Read [more about bulk exporting](../bulk-exports.md)
to learn how to get some real data from your EHR,
and how to properly feed it into the ETL.)

Now let's talk about customizing this command for your own environment.
(And remember that you can always run `docker compose run cumulus-etl --help` for more guidance.)

Expand All @@ -225,7 +229,8 @@ defaults are subject to change or might not match your situation.
* `--output-format`: There are two reasonable values (`ndjson` and `deltalake`).
For production use, you can use the default value of `deltalake` as it supports incremental,
batched updates.
But `ndjson` is useful when debugging as it is human-readable.
But since the `ndjson` output is human-readable, it's useful for debugging
or reviewing the output before pushing to the cloud.

* `--batch-size`: How many resources to save in a single output file. If there are more resources
(e.g. more patients) than this limit, multiple output files will be created for that resource
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
2 changes: 2 additions & 0 deletions tests/data/simple/input/log.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"exportId": "testId", "timestamp": "2024-08-06T14:11:57-04:00", "eventId": "kickoff", "eventDetail": {"exportUrl": "https://example.org/fhir/$export"}}
{"exportId": "testId", "timestamp": "2024-08-06T14:12:57-04:00", "eventId": "status_complete", "eventDetail": {"transactionTime": "2024-08-06T14:00:00-04:00", "outputFileCount": 0, "deletedFileCount": 0, "errorFileCount": 0}}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "allergyintolerance", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "allergyintolerance", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "device", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "device", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "diagnosticreport", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "diagnosticreport", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "immunization", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "immunization", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "export_url": "https://example.org/fhir/$export", "etl_version": "1.0.0+test", "etl_time": "2021-09-14T21:23:45+00:00"}
Loading

0 comments on commit ef7e6f4

Please sign in to comment.