feat: initial completeness-tracking work
This is early support for completeness-tracking (the ability to mark
which groups & resources have been loaded by the ETL and are thus ready
for studies to use).

- Adds some new (secret for now) CLI arguments:
  --export-group
  --export-timestamp
  --write-completion
- Adds a new `etl__completion` table which holds:
  - table_name
  - group_name
  - export_time
- Adds a new `etl__completion_encounters` table which holds:
  - group_name
  - encounter_id
  - export_time
- These tables are written to automatically, using the CLI values
  (see the example rows after this list)
- Currently, those arguments are optional. A future change will make
  them required (though they will hopefully usually be inferred
  automatically from export logs)
- The export args will be automatically provided internally, if we are
  handling the bulk export ourselves (i.e. Loaders can provide group
  name and export timestamp).
- When using the ndjson output format, you can no longer have any files
  in the output folder. This is to safeguard against accidents (and to
  make some code paths simpler)
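
For illustration, rows in the two new tables might look roughly like this (the group name, encounter id, and timestamp are made up, not taken from this change):

```python
# Hypothetical example rows; the keys match the columns listed above.
completion_row = {
    "table_name": "condition",
    "group_name": "cohort-2024-q1",
    "export_time": "2024-04-01T00:00:00+00:00",
}
completion_encounters_row = {
    "encounter_id": "abc123",
    "group_name": "cohort-2024-q1",
    "export_time": "2024-04-01T00:00:00+00:00",
}
```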
mikix committed Apr 9, 2024
1 parent 93c33ad commit b464da3
Showing 61 changed files with 909 additions and 200 deletions.
20 changes: 20 additions & 0 deletions CONTRIBUTING.md
@@ -27,6 +27,26 @@ pre-commit install

This will install the pre-commit hooks for the repo (which automatically enforce styling for you).

### Running unit tests

1. First, you'll want to install the
[Microsoft Anonymizer tool](https://github.com/microsoft/Tools-for-Health-Data-Anonymization/).
Here's an example install command for a Debian-based Linux system,
once you've checked out that repo (as `mstool` in the command below):

```shell
sudo apt-get install dotnet6
dotnet publish \
  --runtime=linux-x64 \
  --configuration=Release \
  -p:PublishSingleFile=true \
  --output=$HOME/.local/bin \
  mstool/FHIR/src/Microsoft.Health.Fhir.Anonymizer.R4.CommandLineTool
```

2. Then just run `pytest`.
All dependencies should have been installed by the `pip install .[dev]` above.

### How to show us the patch

Open a new GitHub PR and one of the maintainers will notice it and comment.
16 changes: 7 additions & 9 deletions cumulus_etl/cli_utils.py
@@ -1,15 +1,14 @@
"""Helper methods for CLI parsing."""

import argparse
import os
import socket
import tempfile
import time
import urllib.parse

import rich.progress

from cumulus_etl import common, errors
from cumulus_etl import common, errors, store


def add_auth(parser: argparse.ArgumentParser) -> None:
@@ -60,22 +59,21 @@ def make_export_dir(export_to: str = None) -> common.Directory:
# If we were to relax this requirement, we'd want to copy the exported files over to a local dir.
errors.fatal(f"The target export folder '{export_to}' must be local. ", errors.BULK_EXPORT_FOLDER_NOT_LOCAL)

confirm_dir_is_empty(export_to)
confirm_dir_is_empty(store.Root(export_to, create=True))

return common.RealDirectory(export_to)


def confirm_dir_is_empty(path: str) -> None:
"""Errors out if the dir exists with contents, but creates empty dir if not present yet"""
def confirm_dir_is_empty(root: store.Root) -> None:
"""Errors out if the dir exists with contents"""
try:
if os.listdir(path):
if root.ls():
errors.fatal(
f"The target folder '{path}' already has contents. Please provide an empty folder.",
f"The target folder '{root.path}' already has contents. Please provide an empty folder.",
errors.FOLDER_NOT_EMPTY,
)
except FileNotFoundError:
# Target folder doesn't exist, so let's make it
os.makedirs(path, mode=0o700)
pass


def is_url_available(url: str, retry: bool = True) -> bool:
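For reference, a minimal sketch of how the reworked helper is meant to be called now (the target path here is hypothetical):

```python
from cumulus_etl import cli_utils, store

# Root(..., create=True) makes the folder if it doesn't exist yet;
# confirm_dir_is_empty() then errors out (FOLDER_NOT_EMPTY) only if it already has contents.
cli_utils.confirm_dir_is_empty(store.Root("/tmp/export-target", create=True))
```
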
21 changes: 21 additions & 0 deletions cumulus_etl/completion/__init__.py
@@ -0,0 +1,21 @@
"""
Helpers for implementing completion-tracking.
Completion tracking allows downstream consumers to know when ETL runs are
"complete enough" for their purposes.
For example, the `core` study may want to not expose Encounters whose
Conditions have not yet been loaded. These metadata tables allow that.
Although these metadata tables aren't themselves tasks, they need a
lot of the same information that tasks need. This module provides that.
"""

from .schema import (
COMPLETION_TABLE,
COMPLETION_ENCOUNTERS_TABLE,
completion_encounters_output_args,
completion_encounters_schema,
completion_format_args,
completion_schema,
)
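
To make the docstring's example concrete, a downstream consumer could in principle combine the two tables to keep only encounters whose group has had every required table loaded. This is an illustrative sketch with made-up rows, not code from this change:

```python
# Hypothetical rows from etl__completion (which tables have been loaded, per group).
completion = [
    {"table_name": "encounter", "group_name": "cohort-a", "export_time": "2024-04-01T00:00:00+00:00"},
    {"table_name": "condition", "group_name": "cohort-a", "export_time": "2024-04-01T00:00:00+00:00"},
]
# Hypothetical rows from etl__completion_encounters (which encounters belong to which group).
completion_encounters = [
    {"encounter_id": "abc123", "group_name": "cohort-a", "export_time": "2024-04-01T00:00:00+00:00"},
]

required_tables = {"encounter", "condition"}

# Which tables have been loaded for each group so far.
loaded_by_group: dict[str, set[str]] = {}
for row in completion:
    loaded_by_group.setdefault(row["group_name"], set()).add(row["table_name"])

# Only encounters from groups with all required tables loaded are "complete enough".
ready_encounters = {
    row["encounter_id"]
    for row in completion_encounters
    if required_tables <= loaded_by_group.get(row["group_name"], set())
}
print(ready_encounters)  # {'abc123'}
```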
62 changes: 62 additions & 0 deletions cumulus_etl/completion/schema.py
@@ -0,0 +1,62 @@
"""Schemas and Format helpers for writing completion tables."""

import pyarrow


COMPLETION_TABLE = "etl__completion"
COMPLETION_ENCOUNTERS_TABLE = "etl__completion_encounters"


# FORMATTERS


def completion_format_args() -> dict:
"""Returns kwargs to pass to the Format class initializer of your choice"""
return {
"dbname": COMPLETION_TABLE,
"uniqueness_fields": {"table_name", "group_name"},
}


# OUTPUT TABLES


def completion_encounters_output_args() -> dict:
"""Returns output table kwargs for the etl__completion_encounters table"""
return {
"name": COMPLETION_ENCOUNTERS_TABLE,
"uniqueness_fields": {"encounter_id", "group_name"},
"update_existing": False, # we want to keep the first export time we make for a group
"resource_type": None,
"visible": False,
}


# SCHEMAS


def completion_schema() -> pyarrow.Schema:
"""Returns a schema for the etl__completion table"""
return pyarrow.schema(
[
pyarrow.field("table_name", pyarrow.string()),
pyarrow.field("group_name", pyarrow.string()),
# You might think this is an opportunity to use pyarrow.timestamp(),
# but because ndjson output formats (which can't natively represent a
# datetime) would then require conversion to and fro, it's easier to
# just mirror our FHIR tables and use strings for timestamps.
pyarrow.field("export_time", pyarrow.string()),
]
)


def completion_encounters_schema() -> pyarrow.Schema:
"""Returns a schema for the etl__completion_encounters table"""
return pyarrow.schema(
[
pyarrow.field("encounter_id", pyarrow.string()),
pyarrow.field("group_name", pyarrow.string()),
# See note above for why this isn't a pyarrow.timestamp() field.
pyarrow.field("export_time", pyarrow.string()),
]
)
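
As a quick sanity check of these schemas, something like the following should work (row values are placeholders):

```python
import pyarrow

from cumulus_etl import completion

rows = [
    {
        "table_name": "condition",
        "group_name": "cohort-2024-q1",
        "export_time": "2024-04-01T00:00:00+00:00",
    }
]
# Builds an in-memory table conforming to the etl__completion schema.
table = pyarrow.Table.from_pylist(rows, schema=completion.completion_schema())
print(table.schema)
```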
1 change: 1 addition & 0 deletions cumulus_etl/errors.py
@@ -32,6 +32,7 @@
LABEL_STUDIO_MISSING = 31
FHIR_AUTH_FAILED = 32
SERVICE_MISSING = 33 # generic init-check service is missing
COMPLETION_ARG_MISSING = 34


class FhirConnectionError(Exception):
37 changes: 36 additions & 1 deletion cumulus_etl/etl/cli.py
@@ -124,6 +124,12 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
export.add_argument("--since", help="Start date for export from the FHIR server")
export.add_argument("--until", help="End date for export from the FHIR server")

group = parser.add_argument_group("external export identification")
group.add_argument("--export-group", help=argparse.SUPPRESS)
group.add_argument("--export-timestamp", help=argparse.SUPPRESS)
# Temporary explicit opt-in flag during the development of the completion-tracking feature
group.add_argument("--write-completion", action="store_true", default=False, help=argparse.SUPPRESS)

cli_utils.add_nlp(parser)

task = parser.add_argument_group("task selection")
@@ -180,6 +186,30 @@ def print_config(args: argparse.Namespace, job_datetime: datetime.datetime, all_
rich.get_console().print(table)


def handle_completion_args(args: argparse.Namespace, loader: loaders.Loader) -> tuple[str, datetime.datetime]:
"""Returns (group_name, datetime)"""
# Grab completion options from CLI or loader
export_group_name = args.export_group or loader.group_name
export_datetime = (
datetime.datetime.fromisoformat(args.export_timestamp) if args.export_timestamp else loader.export_datetime
)

# Disable entirely if asked to
if not args.write_completion:
export_group_name = None
export_datetime = None

# Error out if we have mismatched args
has_group_name = export_group_name is not None
has_datetime = bool(export_datetime)
if has_group_name and not has_datetime:
errors.fatal("Missing --export-datetime argument.", errors.COMPLETION_ARG_MISSING)
elif not has_group_name and has_datetime:
errors.fatal("Missing --export-group argument.", errors.COMPLETION_ARG_MISSING)

return export_group_name, export_datetime


async def etl_main(args: argparse.Namespace) -> None:
# Set up some common variables
store.set_user_fs_options(vars(args)) # record filesystem options like --s3-region before creating Roots
@@ -200,7 +230,7 @@ async def etl_main(args: argparse.Namespace) -> None:
common.print_header() # all "prep" comes in this next section, like connecting to server, bulk export, and de-id

if args.errors_to:
cli_utils.confirm_dir_is_empty(args.errors_to)
cli_utils.confirm_dir_is_empty(store.Root(args.errors_to, create=True))

# Check that cTAKES is running and any other services or binaries we require
if not args.skip_init_checks:
@@ -226,6 +256,9 @@ async def etl_main(args: argparse.Namespace) -> None:
# Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
loaded_dir = await config_loader.load_all(list(required_resources))

# Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
export_group_name, export_datetime = handle_completion_args(args, config_loader)

# If *any* of our tasks need bulk MS de-identification, run it
if any(t.needs_bulk_deid for t in selected_tasks):
loaded_dir = await deid.Scrubber.scrub_bulk_data(loaded_dir.name)
@@ -248,6 +281,8 @@
ctakes_overrides=args.ctakes_overrides,
dir_errors=args.errors_to,
tasks=[t.name for t in selected_tasks],
export_group_name=export_group_name,
export_datetime=export_datetime,
)
common.write_json(config.path_config(), config.as_json(), indent=4)

10 changes: 8 additions & 2 deletions cumulus_etl/etl/config.py
@@ -31,6 +31,8 @@ def __init__(
ctakes_overrides: str = None,
dir_errors: str = None,
tasks: list[str] = None,
export_group_name: str = None,
export_datetime: datetime.datetime = None,
):
self._dir_input_orig = dir_input_orig
self.dir_input = dir_input_deid
@@ -46,14 +48,16 @@ def __init__(
self.batch_size = batch_size
self.ctakes_overrides = ctakes_overrides
self.tasks = tasks or []
self.export_group_name = export_group_name
self.export_datetime = export_datetime

# initialize format class
self._output_root = store.Root(self._dir_output, create=True)
self._format_class = formats.get_format_class(self._output_format)
self._format_class.initialize_class(self._output_root)

def create_formatter(self, dbname: str, group_field: str = None, resource_type: str = None) -> formats.Format:
return self._format_class(self._output_root, dbname, group_field=group_field, resource_type=resource_type)
def create_formatter(self, dbname: str, **kwargs) -> formats.Format:
return self._format_class(self._output_root, dbname, **kwargs)

def path_config(self) -> str:
return os.path.join(self.dir_job_config(), "job_config.json")
@@ -74,6 +78,8 @@ def as_json(self):
"comment": self.comment,
"batch_size": self.batch_size,
"tasks": ",".join(self.tasks),
"export_group_name": self.export_group_name,
"export_timestamp": self.export_datetime and self.export_datetime.isoformat(),
}


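One small note on the `export_timestamp` serialization above: the `x and x.isoformat()` idiom keeps `None` as `None` and otherwise emits ISO 8601 text. A tiny standalone illustration (the timestamp is arbitrary):

```python
import datetime

export_datetime = datetime.datetime(2024, 4, 1, tzinfo=datetime.timezone.utc)
# None stays None; otherwise we get ISO 8601 text.
print(export_datetime and export_datetime.isoformat())  # 2024-04-01T00:00:00+00:00
```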
