Skip to content

Commit

Permalink
feat: initial completeness-tracking work
Browse files Browse the repository at this point in the history
This is early support for completeness-tracking (the ability to mark
which groups & resources have been loaded by the ETL and are thus ready
for studies to use).

- Adds some new (secret for now) CLI arguments:
  --export-group
  --export-timestamp
  --write-completion
- Adds a new `etl__completion` table which holds:
  - table
  - group
  - export_time
- Adds a new `etl__completion_encounters` table which holds:
  - group
  - encounter_id
  - export_time
- This table is automatically written to, using the CLI values
- Currently, those arguments are optional. A future change will make
  them required. (though hopefully usually automatically inferred from
  export logs)
- The export args will be automatically provided internally, if we are
  handling the bulk export ourselves (i.e. Loaders can provide group
  name and export timestamp).
- When using the ndjson output format, you can no longer have any files
  in the output folder. This is to safeguard against accidents (and to
  make some code paths simpler)
  • Loading branch information
mikix committed Apr 5, 2024
1 parent 93c33ad commit 6aead50
Show file tree
Hide file tree
Showing 60 changed files with 883 additions and 200 deletions.
16 changes: 7 additions & 9 deletions cumulus_etl/cli_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
"""Helper methods for CLI parsing."""

import argparse
import os
import socket
import tempfile
import time
import urllib.parse

import rich.progress

from cumulus_etl import common, errors
from cumulus_etl import common, errors, store


def add_auth(parser: argparse.ArgumentParser) -> None:
Expand Down Expand Up @@ -60,22 +59,21 @@ def make_export_dir(export_to: str = None) -> common.Directory:
# If we were to relax this requirement, we'd want to copy the exported files over to a local dir.
errors.fatal(f"The target export folder '{export_to}' must be local. ", errors.BULK_EXPORT_FOLDER_NOT_LOCAL)

confirm_dir_is_empty(export_to)
confirm_dir_is_empty(store.Root(export_to, create=True))

return common.RealDirectory(export_to)


def confirm_dir_is_empty(path: str) -> None:
"""Errors out if the dir exists with contents, but creates empty dir if not present yet"""
def confirm_dir_is_empty(root: store.Root) -> None:
"""Errors out if the dir exists with contents"""
try:
if os.listdir(path):
if root.ls():
errors.fatal(
f"The target folder '{path}' already has contents. Please provide an empty folder.",
f"The target folder '{root.path}' already has contents. Please provide an empty folder.",
errors.FOLDER_NOT_EMPTY,
)
except FileNotFoundError:
# Target folder doesn't exist, so let's make it
os.makedirs(path, mode=0o700)
pass


def is_url_available(url: str, retry: bool = True) -> bool:
Expand Down
21 changes: 21 additions & 0 deletions cumulus_etl/completion/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
Helpers for implementing completion-tracking.
Completion tracking allows downstream consumers to know when ETL runs are
"complete enough" for their purposes.
For example, the `core` study may want to not expose Encounters whose
Conditions have not yet been loaded. These metadata tables allow that.
Although these metadata tables aren't themselves tasks, they need a
lot of the same information that tasks need. This module provides that.
"""

from .output import (
COMPLETION_TABLE,
COMPLETION_ENCOUNTERS_TABLE,
completion_encounters_output_args,
completion_encounters_schema,
completion_format_args,
completion_schema,
)
62 changes: 62 additions & 0 deletions cumulus_etl/completion/output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Schemas and Format helpers for writing completion tables."""

import pyarrow


COMPLETION_TABLE = "etl__completion"
COMPLETION_ENCOUNTERS_TABLE = "etl__completion_encounters"


# FORMATTERS


def completion_format_args() -> dict:
"""Returns kwargs to pass to the Format class initializer of your choice"""
return {
"dbname": COMPLETION_TABLE,
"uniqueness_fields": {"table_name", "group_name"},
}


# OUTPUT TABLES


def completion_encounters_output_args() -> dict:
"""Returns output table kwargs for the etl__completion_encounters table"""
return {
"name": COMPLETION_ENCOUNTERS_TABLE,
"uniqueness_fields": {"encounter_id", "group_name"},
"update_existing": False, # we want to keep the first export time we make for a group
"resource_type": None,
"visible": False,
}


# SCHEMAS


def completion_schema() -> pyarrow.Schema:
"""Returns a schema for the etl__completion table"""
return pyarrow.schema(
[
pyarrow.field("table_name", pyarrow.string()),
pyarrow.field("group_name", pyarrow.string()),
# You might think this is an opportunity to use pyarrow.timestamp(),
# but because ndjson output formats (which can't natively represent a
# datetime) would then require conversion to and fro, it's easier to
# just mirror our FHIR tables and use strings for timestamps.
pyarrow.field("export_time", pyarrow.string()),
]
)


def completion_encounters_schema() -> pyarrow.Schema:
"""Returns a schema for the etl__completion_encounters table"""
return pyarrow.schema(
[
pyarrow.field("encounter_id", pyarrow.string()),
pyarrow.field("group_name", pyarrow.string()),
# See note above for why this isn't a pyarrow.timestamp() field.
pyarrow.field("export_time", pyarrow.string()),
]
)
1 change: 1 addition & 0 deletions cumulus_etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
LABEL_STUDIO_MISSING = 31
FHIR_AUTH_FAILED = 32
SERVICE_MISSING = 33 # generic init-check service is missing
COMPLETION_ARG_MISSING = 34


class FhirConnectionError(Exception):
Expand Down
37 changes: 36 additions & 1 deletion cumulus_etl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
export.add_argument("--since", help="Start date for export from the FHIR server")
export.add_argument("--until", help="End date for export from the FHIR server")

group = parser.add_argument_group("external export identification")
group.add_argument("--export-group", help=argparse.SUPPRESS)
group.add_argument("--export-timestamp", help=argparse.SUPPRESS)
# Temporary explicit opt-in flag during the development of the completion-tracking feature
group.add_argument("--write-completion", action="store_true", default=False, help=argparse.SUPPRESS)

cli_utils.add_nlp(parser)

task = parser.add_argument_group("task selection")
Expand Down Expand Up @@ -180,6 +186,30 @@ def print_config(args: argparse.Namespace, job_datetime: datetime.datetime, all_
rich.get_console().print(table)


def handle_completion_args(args: argparse.Namespace, loader: loaders.Loader) -> (str, datetime.datetime):
"""Returns (group_name, datetime)"""
# Grab completion options from CLI or loader
export_group_name = args.export_group or loader.group_name
export_datetime = (
datetime.datetime.fromisoformat(args.export_timestamp) if args.export_timestamp else loader.export_datetime
)

# Disable entirely if asked to
if not args.write_completion:
export_group_name = None
export_datetime = None

# Error out if we have mismatched args
has_group_name = export_group_name is not None
has_datetime = bool(export_datetime)
if has_group_name and not has_datetime:
errors.fatal("Missing --export-datetime argument.", errors.COMPLETION_ARG_MISSING)
elif not has_group_name and has_datetime:
errors.fatal("Missing --export-group argument.", errors.COMPLETION_ARG_MISSING)

return export_group_name, export_datetime


async def etl_main(args: argparse.Namespace) -> None:
# Set up some common variables
store.set_user_fs_options(vars(args)) # record filesystem options like --s3-region before creating Roots
Expand All @@ -200,7 +230,7 @@ async def etl_main(args: argparse.Namespace) -> None:
common.print_header() # all "prep" comes in this next section, like connecting to server, bulk export, and de-id

if args.errors_to:
cli_utils.confirm_dir_is_empty(args.errors_to)
cli_utils.confirm_dir_is_empty(store.Root(args.errors_to, create=True))

# Check that cTAKES is running and any other services or binaries we require
if not args.skip_init_checks:
Expand All @@ -226,6 +256,9 @@ async def etl_main(args: argparse.Namespace) -> None:
# Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
loaded_dir = await config_loader.load_all(list(required_resources))

# Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
export_group_name, export_datetime = handle_completion_args(args, config_loader)

# If *any* of our tasks need bulk MS de-identification, run it
if any(t.needs_bulk_deid for t in selected_tasks):
loaded_dir = await deid.Scrubber.scrub_bulk_data(loaded_dir.name)
Expand All @@ -248,6 +281,8 @@ async def etl_main(args: argparse.Namespace) -> None:
ctakes_overrides=args.ctakes_overrides,
dir_errors=args.errors_to,
tasks=[t.name for t in selected_tasks],
export_group_name=export_group_name,
export_datetime=export_datetime,
)
common.write_json(config.path_config(), config.as_json(), indent=4)

Expand Down
10 changes: 8 additions & 2 deletions cumulus_etl/etl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def __init__(
ctakes_overrides: str = None,
dir_errors: str = None,
tasks: list[str] = None,
export_group_name: str = None,
export_datetime: datetime.datetime = None,
):
self._dir_input_orig = dir_input_orig
self.dir_input = dir_input_deid
Expand All @@ -46,14 +48,16 @@ def __init__(
self.batch_size = batch_size
self.ctakes_overrides = ctakes_overrides
self.tasks = tasks or []
self.export_group_name = export_group_name
self.export_datetime = export_datetime

# initialize format class
self._output_root = store.Root(self._dir_output, create=True)
self._format_class = formats.get_format_class(self._output_format)
self._format_class.initialize_class(self._output_root)

def create_formatter(self, dbname: str, group_field: str = None, resource_type: str = None) -> formats.Format:
return self._format_class(self._output_root, dbname, group_field=group_field, resource_type=resource_type)
def create_formatter(self, dbname: str, **kwargs) -> formats.Format:
return self._format_class(self._output_root, dbname, **kwargs)

def path_config(self) -> str:
return os.path.join(self.dir_job_config(), "job_config.json")
Expand All @@ -74,6 +78,8 @@ def as_json(self):
"comment": self.comment,
"batch_size": self.batch_size,
"tasks": ",".join(self.tasks),
"export_group_name": self.export_group_name,
"export_timestamp": self.export_datetime and self.export_datetime.isoformat(),
}


Expand Down
93 changes: 70 additions & 23 deletions cumulus_etl/etl/convert/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,21 @@
import argparse
import os
import tempfile
from functools import partial
from typing import Callable

import pyarrow
import rich.progress

from cumulus_etl import cli_utils, common, errors, formats, store
from cumulus_etl import cli_utils, common, completion, errors, formats, store
from cumulus_etl.etl import tasks
from cumulus_etl.etl.tasks import task_factory


def make_batch(task: type[tasks.EtlTask], formatter: formats.Format, index: int, path: str) -> formats.Batch:
def make_batch(
path: str,
schema_func: Callable[[list[dict]], pyarrow.Schema],
) -> formats.Batch:
metadata_path = path.removesuffix(".ndjson") + ".meta"
try:
metadata = common.read_json(metadata_path)
Expand All @@ -24,50 +30,88 @@ def make_batch(task: type[tasks.EtlTask], formatter: formats.Format, index: int,

rows = list(common.read_ndjson(path))
groups = set(metadata.get("groups", []))
return task.make_batch_from_rows(formatter, rows, groups=groups, index=index)
schema = schema_func(rows)

return formats.Batch(rows, groups=groups, schema=schema)

def convert_task_table(
task: type[tasks.EtlTask],
table: tasks.OutputTable,

def convert_folder(
input_root: store.Root,
output_root: store.Root,
formatter_class: type[formats.Format],
*,
table_name: str,
schema_func: Callable[[list[dict]], pyarrow.Schema],
formatter: formats.Format,
progress: rich.progress.Progress,
) -> None:
"""Converts a task's output folder (like output/observation/ or output/covid_symptom__nlp_results/)"""
# Does the task dir even exist?
task_input_dir = input_root.joinpath(table.get_name(task))
if not input_root.exists(task_input_dir):
table_input_dir = input_root.joinpath(table_name)
if not input_root.exists(table_input_dir):
# Don't error out in this case -- it's not the user's fault if the folder doesn't exist.
# We're just checking all task folders.
return

# Grab all the files in the task dir
all_paths = store.Root(task_input_dir).ls()
all_paths = store.Root(table_input_dir).ls()
ndjson_paths = sorted(filter(lambda x: x.endswith(".ndjson"), all_paths))
if not ndjson_paths:
# Again, don't error out in this case -- if the ETL made an empty dir, it's not a user-visible error
return

# Let's convert! Make the formatter and chew through the files
# Let's convert! Start chewing through the files
count = len(ndjson_paths) + 1 # add one for finalize step
progress_task = progress.add_task(table_name, total=count)

for ndjson_path in ndjson_paths:
batch = make_batch(ndjson_path, schema_func)
formatter.write_records(batch)
progress.update(progress_task, advance=1)

formatter.finalize()
progress.update(progress_task, advance=1)


def convert_task_table(
task: type[tasks.EtlTask],
table: tasks.OutputTable,
input_root: store.Root,
output_root: store.Root,
formatter_class: type[formats.Format],
progress: rich.progress.Progress,
) -> None:
"""Converts a task's output folder (like output/observation/ or output/covid_symptom__nlp_results/)"""

# Start with a formatter
formatter = formatter_class(
output_root,
table.get_name(task),
group_field=table.group_field,
resource_type=table.get_schema(task),
uniqueness_fields=table.uniqueness_fields,
update_existing=table.update_existing,
)

count = len(ndjson_paths) + 1 # add one for finalize step
progress_task = progress.add_task(table.get_name(task), total=count)
# And then run the conversion
convert_folder(
input_root,
table_name=table.get_name(task),
schema_func=partial(task.get_schema, table.get_resource_type(task)),
formatter=formatter,
progress=progress,
)

for index, ndjson_path in enumerate(ndjson_paths):
batch = make_batch(task, formatter, index, ndjson_path)
formatter.write_records(batch)
progress.update(progress_task, advance=1)

formatter.finalize()
progress.update(progress_task, advance=1)
def convert_completion(
input_root: store.Root,
output_root: store.Root,
formatter_class: type[formats.Format],
progress: rich.progress.Progress,
) -> None:
"""Converts the etl__completion metadata table"""
convert_folder(
input_root,
table_name=completion.COMPLETION_TABLE,
schema_func=lambda rows: completion.completion_schema(),
formatter=formatter_class(output_root, **completion.completion_format_args()),
progress=progress,
)


def copy_job_configs(input_root: store.Root, output_root: store.Root) -> None:
Expand All @@ -90,6 +134,9 @@ def walk_tree(input_root: store.Root, output_root: store.Root, formatter_class:
for table in task.outputs:
convert_task_table(task, table, input_root, output_root, formatter_class, progress)

# And aftward, copy over the completion metadata tables
convert_completion(input_root, output_root, formatter_class, progress)

# Copy JobConfig files over too.
# To consider: Marking the job_config.json file in these JobConfig directories as "converted" in some way.
# They already will be detectable by having "output_format: ndjson", but maybe we could do more.
Expand Down
Loading

0 comments on commit 6aead50

Please sign in to comment.