Skip tasks or error out when resources are missing
- If tasks are selected with --task or --task-filter and a needed
  resource is missing, we now error out.
- If tasks are not specifically selected, we now only run tasks for
  which resources are available (and error out if no resources exist
  at all).
- If --allow-missing-resources is passed, we skip both of the above
  checks and fall back to the historical behavior of running any
  selected task with zero input rows (which pushes up a schema, cleans
  up the Delta Lake tables, and stamps the resource as complete in the
  completion-tracking tables).
mikix committed Oct 21, 2024
1 parent 3256616 commit d6d8f64
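The commit message above describes the new reconciliation between selected tasks and the resources actually present in the input. As a rough, hedged illustration only, the standalone sketch below restates that decision logic in plain Python; the function name and parameters are invented for this example, and the real implementation is the check_available_resources() helper added to cumulus_etl/etl/cli.py in the diff further down.

# Illustrative sketch only: a simplified restatement of the behavior described
# above. All names here are invented; see check_available_resources() below.
def reconcile_resources(
    requested: set[str],
    detected: set[str] | None,
    *,
    tasks_explicitly_selected: bool,
    allow_missing_resources: bool,
) -> set[str]:
    if allow_missing_resources or detected is None:
        # Historical behavior: run every selected task, even with zero input rows.
        return requested

    missing = requested - detected
    if not missing:
        return requested

    if tasks_explicitly_selected:
        # --task or --task-filter named a resource that the input doesn't have.
        raise SystemExit("Required resources not found.")

    # Default task list: quietly drop tasks whose resources are absent.
    remaining = requested - missing
    if not remaining:
        raise SystemExit("No supported resources found.")
    return remaining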
Showing 12 changed files with 195 additions and 45 deletions.
1 change: 1 addition & 0 deletions cumulus_etl/errors.py
@@ -34,6 +34,7 @@
SERVICE_MISSING = 33 # generic init-check service is missing
COMPLETION_ARG_MISSING = 34
TASK_HELP = 35
MISSING_REQUESTED_RESOURCES = 36


class FatalError(Exception):
72 changes: 67 additions & 5 deletions cumulus_etl/etl/cli.py
@@ -2,11 +2,12 @@

import argparse
import datetime
import logging
import os
import shutil
import sys
from collections.abc import Iterable

import cumulus_fhir_support

Check failure on line 10 in cumulus_etl/etl/cli.py (GitHub Actions / lint): Ruff (F401) cumulus_etl/etl/cli.py:10:8: `cumulus_fhir_support` imported but unused
import rich
import rich.table

@@ -16,6 +17,9 @@
from cumulus_etl.etl.config import JobConfig, JobSummary
from cumulus_etl.etl.tasks import task_factory

TaskList = list[type[tasks.EtlTask]]


###############################################################################
#
# Main Pipeline (run all tasks)
@@ -24,7 +28,7 @@


async def etl_job(
config: JobConfig, selected_tasks: list[type[tasks.EtlTask]], use_philter: bool = False
config: JobConfig, selected_tasks: TaskList, use_philter: bool = False
) -> list[JobSummary]:
"""
:param config: job config
@@ -68,7 +72,7 @@ def check_mstool() -> None:
raise SystemExit(errors.MSTOOL_MISSING)


async def check_requirements(selected_tasks: Iterable[type[tasks.EtlTask]]) -> None:
async def check_requirements(selected_tasks: TaskList) -> None:
"""
Verifies that all external services and programs are ready
@@ -123,6 +127,11 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--errors-to", metavar="DIR", help="where to put resources that could not be processed"
)
parser.add_argument(
"--allow-missing-resources",
action="store_true",
help="run tasks even if their resources are not present",
)

cli_utils.add_aws(parser)
cli_utils.add_auth(parser)
@@ -148,7 +157,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:


def print_config(
args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: Iterable[tasks.EtlTask]
args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: TaskList
) -> None:
"""
Prints the ETL configuration to the console.
Expand Down Expand Up @@ -219,6 +228,49 @@ def handle_completion_args(
return export_group_name, export_datetime


async def check_available_resources(
loader: loaders.Loader,
*,
requested_resources: set[str],
args: argparse.Namespace,
is_default_tasks: bool,
) -> set[str]:
# Here we try to reconcile which resources the user requested and which resources are actually
# available in the input root.
# - If the user didn't specify a specific task, we'll scope down the requested resources to
# what is actually present in the input.
# - If they did, we'll complain if their required resources are not available.
#
# Reconciling is helpful for performance reasons (don't need to finalize untouched tables),
# UX reasons (can tell user if they made a CLI mistake), and completion tracking (don't
# mark a resource as complete if we didn't even export it)
if args.allow_missing_resources:
return requested_resources

detected = await loader.detect_resources()
if detected is None:
return requested_resources # likely we haven't run bulk export yet

if missing_resources := requested_resources - detected:
for resource in sorted(missing_resources):
# Log the same message that common.py would print if we ran the tasks anyway
logging.warning("No %s files found in %s", resource, loader.root.path)

if is_default_tasks:
requested_resources -= missing_resources # scope down to detected resources
if not requested_resources:
errors.fatal(
f"No supported resources found.",

Check failure on line 263 in cumulus_etl/etl/cli.py (GitHub Actions / lint): Ruff (F541) cumulus_etl/etl/cli.py:263:21: f-string without any placeholders
errors.MISSING_REQUESTED_RESOURCES,
)
else:
msg = f"Required resources not found.\n"

Check failure on line 267 in cumulus_etl/etl/cli.py (GitHub Actions / lint): Ruff (F541) cumulus_etl/etl/cli.py:267:19: f-string without any placeholders
msg += "Add --allow-missing-resources to run related tasks anyway with no input."
errors.fatal(msg, errors.MISSING_REQUESTED_RESOURCES)

return requested_resources


async def etl_main(args: argparse.Namespace) -> None:
# Set up some common variables

@@ -232,6 +284,7 @@ async def etl_main(args: argparse.Namespace) -> None:
job_datetime = common.datetime_now() # grab timestamp before we do anything

selected_tasks = task_factory.get_selected_tasks(args.task, args.task_filter)
is_default_tasks = not args.task and not args.task_filter

# Print configuration
print_config(args, job_datetime, selected_tasks)
@@ -266,8 +319,17 @@ async def etl_main(args: argparse.Namespace) -> None:
resume=args.resume,
)

required_resources = await check_available_resources(
config_loader,
args=args,
is_default_tasks=is_default_tasks,
requested_resources=required_resources,
)
# Drop any tasks that we didn't find resources for
selected_tasks = [t for t in selected_tasks if t.resource in required_resources]

# Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
loader_results = await config_loader.load_all(list(required_resources))
loader_results = await config_loader.load_resources(required_resources)

# Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
export_group_name, export_datetime = handle_completion_args(args, loader_results)
10 changes: 9 additions & 1 deletion cumulus_etl/loaders/base.py
@@ -46,7 +46,15 @@ def __init__(self, root: store.Root):
self.root = root

@abc.abstractmethod
async def load_all(self, resources: list[str]) -> LoaderResults:
async def detect_resources(self) -> set[str] | None:
"""
Inspect which resources are available for use.
:returns: the types of resources detected (or None if that can't be determined yet)
"""

@abc.abstractmethod
async def load_resources(self, resources: set[str]) -> LoaderResults:
"""
Loads the listed remote resources and places them into a local folder as FHIR ndjson
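To make the contract of these two abstract methods concrete (detect what the input contains first, then load only those resources), here is a hedged, standalone toy that mirrors the shape of the interface without importing cumulus_etl. The class names and the first-line ndjson peek are invented for this sketch; the real subclasses are the ndjson and i2b2 loaders changed below.

# Standalone toy, not the real cumulus_etl Loader; it only mirrors the
# detect-then-load contract outlined in base.py above.
import abc
import json
import pathlib


class ToyLoader(abc.ABC):
    @abc.abstractmethod
    async def detect_resources(self) -> set[str] | None:
        """Return detected resource types, or None if detection isn't possible yet."""

    @abc.abstractmethod
    async def load_resources(self, resources: set[str]) -> pathlib.Path:
        """Load only the requested resources and return where they landed."""


class ToyNdjsonLoader(ToyLoader):
    def __init__(self, folder: pathlib.Path):
        self.folder = folder

    async def detect_resources(self) -> set[str] | None:
        # Peek at the first line of each .ndjson file to read its resourceType.
        found = set()
        for path in self.folder.glob("*.ndjson"):
            with path.open() as f:
                first_line = f.readline().strip()
            if first_line:
                found.add(json.loads(first_line).get("resourceType"))
        return found - {None}

    async def load_resources(self, resources: set[str]) -> pathlib.Path:
        # A real loader would copy or export only these resources locally.
        return self.folder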
4 changes: 2 additions & 2 deletions cumulus_etl/loaders/fhir/bulk_export.py
@@ -30,7 +30,7 @@ class BulkExporter:
def __init__(
self,
client: fhir.FhirClient,
resources: list[str],
resources: set[str],
url: str,
destination: str,
*,
@@ -81,7 +81,7 @@ def format_kickoff_url(
self,
url: str,
*,
resources: list[str],
resources: set[str],
since: str | None,
until: str | None,
prefer_url_resources: bool,
17 changes: 14 additions & 3 deletions cumulus_etl/loaders/fhir/ndjson_loader.py
@@ -2,6 +2,8 @@

import tempfile

import cumulus_fhir_support

from cumulus_etl import cli_utils, common, errors, fhir, store
from cumulus_etl.loaders import base
from cumulus_etl.loaders.fhir.bulk_export import BulkExporter
@@ -37,7 +39,16 @@ def __init__(
self.until = until
self.resume = resume

async def load_all(self, resources: list[str]) -> base.LoaderResults:
async def detect_resources(self) -> set[str] | None:
if self.root.protocol in {"http", "https"}:
return None # we haven't done the export yet!

found_files = cumulus_fhir_support.list_multiline_json_in_dir(
self.root.path, fsspec_fs=self.root.fs
)
return {resource for resource in found_files.values() if resource}

async def load_resources(self, resources: set[str]) -> base.LoaderResults:
# Are we doing a bulk FHIR export from a server?
if self.root.protocol in ["http", "https"]:
bulk_dir = await self.load_from_bulk_export(resources)
@@ -61,14 +72,14 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:
# TemporaryDirectory gets discarded), but that seems reasonable.
print("Copying ndjson input files…")
tmpdir = tempfile.TemporaryDirectory()
filenames = common.ls_resources(input_root, set(resources), warn_if_empty=True)
filenames = common.ls_resources(input_root, resources, warn_if_empty=True)
for filename in filenames:
input_root.get(filename, f"{tmpdir.name}/")

return self.read_loader_results(input_root, tmpdir)

async def load_from_bulk_export(
self, resources: list[str], prefer_url_resources: bool = False
self, resources: set[str], prefer_url_resources: bool = False
) -> common.Directory:
"""
Performs a bulk export and drops the results in an export dir.
28 changes: 24 additions & 4 deletions cumulus_etl/loaders/i2b2/loader.py
@@ -34,7 +34,27 @@ def __init__(self, root: store.Root, export_to: str | None = None):
super().__init__(root)
self.export_to = export_to

async def load_all(self, resources: list[str]) -> base.LoaderResults:
async def detect_resources(self) -> set[str] | None:
if self.root.protocol in {"tcp"}:
return None # we haven't done the export yet!

filenames = {
"observation_fact_diagnosis.csv": "Condition",
"observation_fact_lab_views.csv": "Observation",
"observation_fact_medications.csv": "MedicationRequest",
"observation_fact_notes.csv": "DocumentReference",
"observation_fact_vitals.csv": "Observation",
"patient_dimension.csv": "Patient",
"visit_dimension.csv": "Encounter",
}

return {
resource
for path, resource in filenames.items()
if self.root.exists(self.root.joinpath(path))
}

async def load_resources(self, resources: set[str]) -> base.LoaderResults:
if self.root.protocol in ["tcp"]:
directory = self._load_all_from_oracle(resources)
else:
@@ -43,7 +63,7 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:

def _load_all_with_extractors(
self,
resources: list[str],
resources: set[str],
conditions: I2b2ExtractorCallable,
lab_views: I2b2ExtractorCallable,
medicationrequests: I2b2ExtractorCallable,
@@ -139,7 +159,7 @@ def _loop(
#
###################################################################################################################

def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
def _load_all_from_csv(self, resources: set[str]) -> common.Directory:
path = self.root.path
return self._load_all_with_extractors(
resources,
@@ -177,7 +197,7 @@ def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
#
###################################################################################################################

def _load_all_from_oracle(self, resources: list[str]) -> common.Directory:
def _load_all_from_oracle(self, resources: set[str]) -> common.Directory:
path = self.root.path
return self._load_all_with_extractors(
resources,
2 changes: 1 addition & 1 deletion cumulus_etl/upload_notes/downloader.py
@@ -27,7 +27,7 @@ async def download_docrefs_from_fhir_server(
else:
# else we'll download the entire target path as a bulk export (presumably the user has scoped a Group)
ndjson_loader = loaders.FhirNdjsonLoader(root_input, client, export_to=export_to)
return await ndjson_loader.load_all(["DocumentReference"])
return await ndjson_loader.load_resources({"DocumentReference"})


async def _download_docrefs_from_fake_ids(
52 changes: 38 additions & 14 deletions tests/etl/test_etl_cli.py
@@ -103,17 +103,17 @@ async def test_failed_task(self):

async def test_single_task(self):
# Grab all observations before we mock anything
observations = loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_all(
["Observation"]
observations = loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_resources(
{"Observation"}
)

def fake_load_all(internal_self, resources):
def fake_load_resources(internal_self, resources):
del internal_self
# Confirm we only tried to load one resource
self.assertEqual(["Observation"], resources)
self.assertEqual({"Observation"}, resources)
return observations

with mock.patch.object(loaders.FhirNdjsonLoader, "load_all", new=fake_load_all):
with mock.patch.object(loaders.FhirNdjsonLoader, "load_resources", new=fake_load_resources):
await self.run_etl(tasks=["observation"])

# Confirm we only wrote the one resource
@@ -126,17 +126,17 @@ def fake_load_all(internal_self, resources):

async def test_multiple_tasks(self):
# Grab all observations before we mock anything
loaded = loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_all(
["Observation", "Patient"]
loaded = loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_resources(
{"Observation", "Patient"}
)

def fake_load_all(internal_self, resources):
def fake_load_resources(internal_self, resources):
del internal_self
# Confirm we only tried to load two resources
self.assertEqual({"Observation", "Patient"}, set(resources))
self.assertEqual({"Observation", "Patient"}, resources)
return loaded

with mock.patch.object(loaders.FhirNdjsonLoader, "load_all", new=fake_load_all):
with mock.patch.object(loaders.FhirNdjsonLoader, "load_resources", new=fake_load_resources):
await self.run_etl(tasks=["observation", "patient"])

# Confirm we only wrote the two resources
@@ -267,16 +267,18 @@ async def test_task_init_checks(self, mock_check):
async def test_completion_args(self, etl_args, loader_vals, expected_vals):
"""Verify that we parse completion args with the correct fallbacks and checks."""
# Grab all observations before we mock anything
observations = await loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_all(
["Observation"]
observations = await loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_resources(
{"Observation"}
)
observations.group_name = loader_vals[0]
observations.export_datetime = loader_vals[1]

with (
self.assertRaises(SystemExit) as cm,
mock.patch("cumulus_etl.etl.cli.etl_job", side_effect=SystemExit) as mock_etl_job,
mock.patch.object(loaders.FhirNdjsonLoader, "load_all", return_value=observations),
mock.patch.object(
loaders.FhirNdjsonLoader, "load_resources", return_value=observations
),
):
await self.run_etl(tasks=["observation"], **etl_args)

@@ -297,14 +299,36 @@ async def test_deleted_ids_passed_down(self):
with (
self.assertRaises(SystemExit),
mock.patch("cumulus_etl.etl.cli.etl_job", side_effect=SystemExit) as mock_etl_job,
mock.patch.object(loaders.FhirNdjsonLoader, "load_all", return_value=results),
mock.patch.object(loaders.FhirNdjsonLoader, "load_resources", return_value=results),
):
await self.run_etl(tasks=["observation"])

self.assertEqual(mock_etl_job.call_count, 1)
config = mock_etl_job.call_args[0][0]
self.assertEqual({"Observation": {"obs1"}}, config.deleted_ids)

@ddt.data(["patient"], None)
async def test_missing_resources(self, tasks):
with tempfile.TemporaryDirectory() as tmpdir:
with self.assertRaises(SystemExit) as cm:
await self.run_etl(tasks=tasks, input_path=tmpdir)
self.assertEqual(errors.MISSING_REQUESTED_RESOURCES, cm.exception.code)

async def test_allow_missing_resources(self):
with tempfile.TemporaryDirectory() as tmpdir:
await self.run_etl("--allow-missing-resources", tasks=["patient"], input_path=tmpdir)

self.assertEqual("", common.read_text(f"{self.output_path}/patient/patient.000.ndjson"))

async def test_missing_resources_skips_tasks(self):
with tempfile.TemporaryDirectory() as tmpdir:
common.write_json(f"{tmpdir}/p.ndjson", {"id": "A", "resourceType": "Patient"})
await self.run_etl(input_path=tmpdir)

self.assertEqual(
{"etl__completion", "patient", "JobConfig"}, set(os.listdir(self.output_path))
)


class TestEtlJobConfig(BaseEtlSimple):
"""Test case for the job config logging data"""
