Merge pull request #351 from smart-on-fhir/mikix/skip-missing-resources
Skip tasks or error out when resources are missing
mikix authored Oct 23, 2024
2 parents ceb9ce7 + 96442bc commit 98e2b8c
Showing 13 changed files with 203 additions and 48 deletions.
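
In short: each loader grows a detect_resources() method that reports which resource types are actually present in the input, and etl_main() reconciles that against what the selected tasks need before loading anything. A default task run quietly scopes down to the resources it finds; an explicitly requested task whose input is missing now aborts with the new MISSING_REQUESTED_RESOURCES exit code, unless the new --allow-missing-resources flag is passed.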
1 change: 1 addition & 0 deletions cumulus_etl/errors.py
@@ -34,6 +34,7 @@
 SERVICE_MISSING = 33  # generic init-check service is missing
 COMPLETION_ARG_MISSING = 34
 TASK_HELP = 35
+MISSING_REQUESTED_RESOURCES = 36
 
 
 class FatalError(Exception):
71 changes: 66 additions & 5 deletions cumulus_etl/etl/cli.py
@@ -2,10 +2,10 @@
 
 import argparse
 import datetime
+import logging
 import os
 import shutil
 import sys
-from collections.abc import Iterable
 
 import rich
 import rich.table
@@ -16,6 +16,9 @@
 from cumulus_etl.etl.config import JobConfig, JobSummary
 from cumulus_etl.etl.tasks import task_factory
 
+TaskList = list[type[tasks.EtlTask]]
+
+
 ###############################################################################
 #
 # Main Pipeline (run all tasks)
@@ -24,7 +24,7 @@
 
 
 async def etl_job(
-    config: JobConfig, selected_tasks: list[type[tasks.EtlTask]], use_philter: bool = False
+    config: JobConfig, selected_tasks: TaskList, use_philter: bool = False
 ) -> list[JobSummary]:
     """
     :param config: job config
@@ -68,7 +71,7 @@ def check_mstool() -> None:
         raise SystemExit(errors.MSTOOL_MISSING)
 
 
-async def check_requirements(selected_tasks: Iterable[type[tasks.EtlTask]]) -> None:
+async def check_requirements(selected_tasks: TaskList) -> None:
     """
     Verifies that all external services and programs are ready
@@ -118,6 +121,11 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
     parser.add_argument(
         "--errors-to", metavar="DIR", help="where to put resources that could not be processed"
    )
+    parser.add_argument(
+        "--allow-missing-resources",
+        action="store_true",
+        help="run tasks even if their resources are not present",
+    )
 
     cli_utils.add_aws(parser)
     cli_utils.add_auth(parser)
@@ -143,7 +151,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
 
 
 def print_config(
-    args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: Iterable[tasks.EtlTask]
+    args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: TaskList
 ) -> None:
     """
     Prints the ETL configuration to the console.
@@ -214,6 +222,49 @@ def handle_completion_args(
     return export_group_name, export_datetime
 
 
+async def check_available_resources(
+    loader: loaders.Loader,
+    *,
+    requested_resources: set[str],
+    args: argparse.Namespace,
+    is_default_tasks: bool,
+) -> set[str]:
+    # Here we try to reconcile which resources the user requested and which resources are actually
+    # available in the input root.
+    # - If the user didn't specify a specific task, we'll scope down the requested resources to
+    #   what is actually present in the input.
+    # - If they did, we'll complain if their required resources are not available.
+    #
+    # Reconciling is helpful for performance reasons (don't need to finalize untouched tables),
+    # UX reasons (can tell user if they made a CLI mistake), and completion tracking (don't
+    # mark a resource as complete if we didn't even export it)
+    if args.allow_missing_resources:
+        return requested_resources
+
+    detected = await loader.detect_resources()
+    if detected is None:
+        return requested_resources  # likely we haven't run bulk export yet
+
+    if missing_resources := requested_resources - detected:
+        for resource in sorted(missing_resources):
+            # Log the same message we would print in common.py if we ran the tasks anyway
+            logging.warning("No %s files found in %s", resource, loader.root.path)
+
+        if is_default_tasks:
+            requested_resources -= missing_resources  # scope down to detected resources
+            if not requested_resources:
+                errors.fatal(
+                    "No supported resources found.",
+                    errors.MISSING_REQUESTED_RESOURCES,
+                )
+        else:
+            msg = "Required resources not found.\n"
+            msg += "Add --allow-missing-resources to run related tasks anyway with no input."
+            errors.fatal(msg, errors.MISSING_REQUESTED_RESOURCES)
+
+    return requested_resources
+
+
 async def etl_main(args: argparse.Namespace) -> None:
     # Set up some common variables
 
@@ -227,6 +278,7 @@ async def etl_main(args: argparse.Namespace) -> None:
     job_datetime = common.datetime_now()  # grab timestamp before we do anything
 
     selected_tasks = task_factory.get_selected_tasks(args.task, args.task_filter)
+    is_default_tasks = not args.task and not args.task_filter
 
     # Print configuration
     print_config(args, job_datetime, selected_tasks)
@@ -261,8 +313,17 @@ async def etl_main(args: argparse.Namespace) -> None:
         resume=args.resume,
     )
 
+    required_resources = await check_available_resources(
+        config_loader,
+        args=args,
+        is_default_tasks=is_default_tasks,
+        requested_resources=required_resources,
+    )
+    # Drop any tasks that we didn't find resources for
+    selected_tasks = [t for t in selected_tasks if t.resource in required_resources]
+
     # Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
-    loader_results = await config_loader.load_all(list(required_resources))
+    loader_results = await config_loader.load_resources(required_resources)
 
     # Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
     export_group_name, export_datetime = handle_completion_args(args, loader_results)
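
To see the new control flow in check_available_resources() at a glance, here is a minimal standalone sketch of the same reconciliation rules. The function name `reconcile` and the sample sets are invented for illustration; the logging and the MISSING_REQUESTED_RESOURCES exit code are elided:

```python
def reconcile(
    requested: set[str],
    detected: set[str] | None,
    *,
    is_default_tasks: bool,
    allow_missing: bool = False,
) -> set[str]:
    """Toy mirror of check_available_resources()'s decision table."""
    if allow_missing or detected is None:
        return requested  # user opted out, or detection impossible (pre-export)

    missing = requested - detected
    if not missing:
        return requested  # everything we need is present

    if not is_default_tasks:
        # The user explicitly picked tasks whose inputs aren't there: fail fast.
        raise SystemExit("Required resources not found.")

    requested = requested - missing  # default run: quietly scope down
    if not requested:
        raise SystemExit("No supported resources found.")
    return requested


# A default run with only Patient files on disk skips Condition-based tasks:
print(reconcile({"Patient", "Condition"}, {"Patient"}, is_default_tasks=True))
# -> {'Patient'}
```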
3 changes: 0 additions & 3 deletions cumulus_etl/etl/tasks/base.py
@@ -260,9 +260,6 @@ def _delete_requested_ids(self):
             self.formatters[index].delete_records(deleted_ids)
 
     def _update_completion_table(self) -> None:
-        # TODO: what about empty sets - do we assume the export gave 0 results or skip it?
-        # Is there a difference we could notice? (like empty input file vs no file at all)
-
         if not self.completion_tracking_enabled:
             return
 
10 changes: 9 additions & 1 deletion cumulus_etl/loaders/base.py
@@ -46,7 +46,15 @@ def __init__(self, root: store.Root):
         self.root = root
 
     @abc.abstractmethod
-    async def load_all(self, resources: list[str]) -> LoaderResults:
+    async def detect_resources(self) -> set[str] | None:
+        """
+        Inspect which resources are available for use.
+
+        :returns: the types of resources detected (or None if that can't be determined yet)
+        """
+
+    @abc.abstractmethod
+    async def load_resources(self, resources: set[str]) -> LoaderResults:
         """
         Loads the listed remote resources and places them into a local folder as FHIR ndjson
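
For loader authors, the contract is now two-phase: a cheap detect_resources() probe, then the heavier load_resources(). A hypothetical minimal implementation, self-contained for illustration (the class, its one-ndjson-file-per-resource naming rule, and the stand-in results type are all invented here, not part of cumulus-etl):

```python
import asyncio
from dataclasses import dataclass
from pathlib import Path


@dataclass
class FakeResults:  # stand-in for cumulus_etl.loaders.base.LoaderResults
    directory: str


class FolderLoader:
    """Toy loader for a folder holding files named like Patient.ndjson."""

    def __init__(self, path: str):
        self.path = Path(path)

    async def detect_resources(self) -> set[str] | None:
        # Cheap probe: one file per resource type, named after it
        return {p.stem for p in self.path.glob("*.ndjson")}

    async def load_resources(self, resources: set[str]) -> FakeResults:
        # A real loader would copy or convert only the requested resources
        return FakeResults(directory=str(self.path))


print(asyncio.run(FolderLoader("/tmp").detect_resources()))
```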
4 changes: 2 additions & 2 deletions cumulus_etl/loaders/fhir/bulk_export.py
@@ -30,7 +30,7 @@ class BulkExporter:
     def __init__(
         self,
         client: fhir.FhirClient,
-        resources: list[str],
+        resources: set[str],
         url: str,
         destination: str,
         *,
@@ -81,7 +81,7 @@ def format_kickoff_url(
         self,
         url: str,
         *,
-        resources: list[str],
+        resources: set[str],
         since: str | None,
         until: str | None,
         prefer_url_resources: bool,
19 changes: 16 additions & 3 deletions cumulus_etl/loaders/fhir/ndjson_loader.py
@@ -2,6 +2,8 @@
 
 import tempfile
 
+import cumulus_fhir_support
+
 from cumulus_etl import cli_utils, common, errors, fhir, store
 from cumulus_etl.loaders import base
 from cumulus_etl.loaders.fhir.bulk_export import BulkExporter
@@ -37,7 +39,18 @@ def __init__(
         self.until = until
         self.resume = resume
 
-    async def load_all(self, resources: list[str]) -> base.LoaderResults:
+    async def detect_resources(self) -> set[str] | None:
+        if self.root.protocol in {"http", "https"}:
+            # We haven't done the export yet, so there are no files to inspect yet.
+            # Returning None means "dunno" (i.e. "just accept whatever you eventually get").
+            return None
+
+        found_files = cumulus_fhir_support.list_multiline_json_in_dir(
+            self.root.path, fsspec_fs=self.root.fs
+        )
+        return {resource for resource in found_files.values() if resource}
+
+    async def load_resources(self, resources: set[str]) -> base.LoaderResults:
         # Are we doing a bulk FHIR export from a server?
         if self.root.protocol in ["http", "https"]:
             bulk_dir = await self.load_from_bulk_export(resources)
@@ -61,14 +74,14 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:
         # TemporaryDirectory gets discarded), but that seems reasonable.
         print("Copying ndjson input files…")
         tmpdir = tempfile.TemporaryDirectory()
-        filenames = common.ls_resources(input_root, set(resources), warn_if_empty=True)
+        filenames = common.ls_resources(input_root, resources, warn_if_empty=True)
         for filename in filenames:
             input_root.get(filename, f"{tmpdir.name}/")
 
         return self.read_loader_results(input_root, tmpdir)
 
     async def load_from_bulk_export(
-        self, resources: list[str], prefer_url_resources: bool = False
+        self, resources: set[str], prefer_url_resources: bool = False
     ) -> common.Directory:
         """
         Performs a bulk export and drops the results in an export dir.
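
The detection above leans on cumulus_fhir_support.list_multiline_json_in_dir(), which, as used here, maps each discovered file to its FHIR resource type (or a falsy value for files it can't classify). A sketch of the distillation step with a hand-written mapping, since the return shape is easiest to show by example (the paths are invented):

```python
# Plausible list_multiline_json_in_dir() output for a local folder
# (paths invented; None marks an ndjson file that isn't recognizably FHIR):
found_files = {
    "/input/Patient.000.ndjson": "Patient",
    "/input/Condition.000.ndjson": "Condition",
    "/input/export-log.ndjson": None,
}

# The same comprehension detect_resources() uses: keep only real types
detected = {resource for resource in found_files.values() if resource}
assert detected == {"Patient", "Condition"}
```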
30 changes: 26 additions & 4 deletions cumulus_etl/loaders/i2b2/loader.py
@@ -34,7 +34,29 @@ def __init__(self, root: store.Root, export_to: str | None = None):
         super().__init__(root)
         self.export_to = export_to
 
-    async def load_all(self, resources: list[str]) -> base.LoaderResults:
+    async def detect_resources(self) -> set[str] | None:
+        if self.root.protocol in {"tcp"}:
+            # We haven't done the export yet, so there are no files to inspect yet.
+            # Returning None means "dunno" (i.e. "just accept whatever you eventually get").
+            return None
+
+        filenames = {
+            "observation_fact_diagnosis.csv": "Condition",
+            "observation_fact_lab_views.csv": "Observation",
+            "observation_fact_medications.csv": "MedicationRequest",
+            "observation_fact_notes.csv": "DocumentReference",
+            "observation_fact_vitals.csv": "Observation",
+            "patient_dimension.csv": "Patient",
+            "visit_dimension.csv": "Encounter",
+        }
+
+        return {
+            resource
+            for path, resource in filenames.items()
+            if self.root.exists(self.root.joinpath(path))
+        }
+
+    async def load_resources(self, resources: set[str]) -> base.LoaderResults:
         if self.root.protocol in ["tcp"]:
             directory = self._load_all_from_oracle(resources)
         else:
@@ -43,7 +65,7 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:
 
     def _load_all_with_extractors(
         self,
-        resources: list[str],
+        resources: set[str],
         conditions: I2b2ExtractorCallable,
         lab_views: I2b2ExtractorCallable,
         medicationrequests: I2b2ExtractorCallable,
@@ -139,7 +161,7 @@ def _loop(
     #
     ###################################################################################################################
 
-    def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
+    def _load_all_from_csv(self, resources: set[str]) -> common.Directory:
         path = self.root.path
         return self._load_all_with_extractors(
             resources,
@@ -177,7 +199,7 @@ def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
     #
     ###################################################################################################################
 
-    def _load_all_from_oracle(self, resources: list[str]) -> common.Directory:
+    def _load_all_from_oracle(self, resources: set[str]) -> common.Directory:
         path = self.root.path
         return self._load_all_with_extractors(
             resources,
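
For i2b2 inputs, then, detection is a simple filename check: a folder holding only patient_dimension.csv and visit_dimension.csv yields {"Patient", "Encounter"}, so a default run would skip the Condition, Observation, MedicationRequest, and DocumentReference tasks. The same lookup as a self-contained sketch (using os.path in place of the store.Root helpers):

```python
import os

# Filename-to-resource map copied from I2b2Loader.detect_resources() above
I2B2_FILES = {
    "observation_fact_diagnosis.csv": "Condition",
    "observation_fact_lab_views.csv": "Observation",
    "observation_fact_medications.csv": "MedicationRequest",
    "observation_fact_notes.csv": "DocumentReference",
    "observation_fact_vitals.csv": "Observation",
    "patient_dimension.csv": "Patient",
    "visit_dimension.csv": "Encounter",
}


def detect_i2b2_resources(folder: str) -> set[str]:
    return {
        resource
        for filename, resource in I2B2_FILES.items()
        if os.path.exists(os.path.join(folder, filename))
    }
```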
2 changes: 1 addition & 1 deletion cumulus_etl/upload_notes/downloader.py
@@ -27,7 +27,7 @@ async def download_docrefs_from_fhir_server(
     else:
         # else we'll download the entire target path as a bulk export (presumably the user has scoped a Group)
         ndjson_loader = loaders.FhirNdjsonLoader(root_input, client, export_to=export_to)
-        return await ndjson_loader.load_all(["DocumentReference"])
+        return await ndjson_loader.load_resources({"DocumentReference"})
 
 
 async def _download_docrefs_from_fake_ids(