Skip tasks or error out when resources are missing #351

Merged: 1 commit, Oct 23, 2024
1 change: 1 addition & 0 deletions cumulus_etl/errors.py
@@ -34,6 +34,7 @@
SERVICE_MISSING = 33 # generic init-check service is missing
COMPLETION_ARG_MISSING = 34
TASK_HELP = 35
MISSING_REQUESTED_RESOURCES = 36


class FatalError(Exception):
71 changes: 66 additions & 5 deletions cumulus_etl/etl/cli.py
@@ -2,10 +2,10 @@

import argparse
import datetime
import logging
import os
import shutil
import sys
from collections.abc import Iterable

import rich
import rich.table
@@ -16,6 +16,9 @@
from cumulus_etl.etl.config import JobConfig, JobSummary
from cumulus_etl.etl.tasks import task_factory

TaskList = list[type[tasks.EtlTask]]


###############################################################################
#
# Main Pipeline (run all tasks)
Expand All @@ -24,7 +27,7 @@


async def etl_job(
config: JobConfig, selected_tasks: list[type[tasks.EtlTask]], use_philter: bool = False
config: JobConfig, selected_tasks: TaskList, use_philter: bool = False
) -> list[JobSummary]:
"""
:param config: job config
@@ -68,7 +71,7 @@ def check_mstool() -> None:
raise SystemExit(errors.MSTOOL_MISSING)


async def check_requirements(selected_tasks: Iterable[type[tasks.EtlTask]]) -> None:
async def check_requirements(selected_tasks: TaskList) -> None:
"""
Verifies that all external services and programs are ready

@@ -118,6 +121,11 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--errors-to", metavar="DIR", help="where to put resources that could not be processed"
)
parser.add_argument(
"--allow-missing-resources",
action="store_true",
help="run tasks even if their resources are not present",
)

cli_utils.add_aws(parser)
cli_utils.add_auth(parser)
@@ -143,7 +151,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:


def print_config(
args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: Iterable[tasks.EtlTask]
args: argparse.Namespace, job_datetime: datetime.datetime, all_tasks: TaskList
) -> None:
"""
Prints the ETL configuration to the console.
@@ -214,6 +222,49 @@ def handle_completion_args(
return export_group_name, export_datetime


async def check_available_resources(
loader: loaders.Loader,
*,
requested_resources: set[str],
args: argparse.Namespace,
is_default_tasks: bool,
) -> set[str]:
# Here we try to reconcile which resources the user requested and which resources are actually
# available in the input root.
# - If the user didn't specify a specific task, we'll scope down the requested resources to
# what is actually present in the input.
# - If they did, we'll complain if their required resources are not available.
#
# Reconciling is helpful for performance reasons (don't need to finalize untouched tables),
# UX reasons (can tell user if they made a CLI mistake), and completion tracking (don't
# mark a resource as complete if we didn't even export it)
if args.allow_missing_resources:
return requested_resources

detected = await loader.detect_resources()
if detected is None:
return requested_resources # likely we haven't run bulk export yet

if missing_resources := requested_resources - detected:
for resource in sorted(missing_resources):
# Log the same message we would print in common.py if we ran tasks anyway
logging.warning("No %s files found in %s", resource, loader.root.path)

if is_default_tasks:
requested_resources -= missing_resources # scope down to detected resources
if not requested_resources:
errors.fatal(
"No supported resources found.",
errors.MISSING_REQUESTED_RESOURCES,
)
else:
msg = "Required resources not found.\n"
msg += "Add --allow-missing-resources to run related tasks anyway with no input."
errors.fatal(msg, errors.MISSING_REQUESTED_RESOURCES)

return requested_resources


async def etl_main(args: argparse.Namespace) -> None:
# Set up some common variables

@@ -227,6 +278,7 @@ async def etl_main(args: argparse.Namespace) -> None:
job_datetime = common.datetime_now() # grab timestamp before we do anything

selected_tasks = task_factory.get_selected_tasks(args.task, args.task_filter)
is_default_tasks = not args.task and not args.task_filter

# Print configuration
print_config(args, job_datetime, selected_tasks)
@@ -261,8 +313,17 @@ async def etl_main(args: argparse.Namespace) -> None:
resume=args.resume,
)

required_resources = await check_available_resources(
config_loader,
args=args,
is_default_tasks=is_default_tasks,
requested_resources=required_resources,
)
# Drop any tasks that we didn't find resources for
selected_tasks = [t for t in selected_tasks if t.resource in required_resources]

# Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
loader_results = await config_loader.load_all(list(required_resources))
loader_results = await config_loader.load_resources(required_resources)

# Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
export_group_name, export_datetime = handle_completion_args(args, loader_results)
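Note: to make the intended behavior of the new `check_available_resources` step concrete, here is a rough sketch of the default-task path. The duck-typed `StubLoader` and the `/tmp/input` path are made up for illustration and are not part of this change; only the keyword-only call shape matches the code above.

```python
import argparse
import asyncio
from types import SimpleNamespace

from cumulus_etl.etl.cli import check_available_resources


class StubLoader:
    """Hypothetical stand-in for a loader whose input folder only holds Patient ndjson."""

    def __init__(self):
        # check_available_resources only reads .root.path (for the warning message)
        self.root = SimpleNamespace(path="/tmp/input")

    async def detect_resources(self) -> set[str] | None:
        return {"Patient"}


args = argparse.Namespace(allow_missing_resources=False)

# Default task selection: the requested set is quietly scoped down to what was detected.
kept = asyncio.run(
    check_available_resources(
        StubLoader(),
        requested_resources={"Patient", "Condition"},
        args=args,
        is_default_tasks=True,
    )
)
assert kept == {"Patient"}

# With is_default_tasks=False (an explicit --task list), the same situation would instead
# exit via errors.fatal(..., errors.MISSING_REQUESTED_RESOURCES), unless
# --allow-missing-resources was passed.
```

Since `selected_tasks` is then filtered against the returned set, tasks whose resources are absent simply drop out of a default run.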
3 changes: 0 additions & 3 deletions cumulus_etl/etl/tasks/base.py
@@ -260,9 +260,6 @@ def _delete_requested_ids(self):
self.formatters[index].delete_records(deleted_ids)

def _update_completion_table(self) -> None:
# TODO: what about empty sets - do we assume the export gave 0 results or skip it?
# Is there a difference we could notice? (like empty input file vs no file at all)

if not self.completion_tracking_enabled:
return

10 changes: 9 additions & 1 deletion cumulus_etl/loaders/base.py
@@ -46,7 +46,15 @@ def __init__(self, root: store.Root):
self.root = root

@abc.abstractmethod
async def load_all(self, resources: list[str]) -> LoaderResults:
async def detect_resources(self) -> set[str] | None:
"""
Inspect which resources are available for use.

:returns: the types of resources detected (or None if that can't be determined yet)
"""

@abc.abstractmethod
async def load_resources(self, resources: set[str]) -> LoaderResults:
"""
Loads the listed remote resources and places them into a local folder as FHIR ndjson

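Note: with this change, the loader contract splits into a pre-flight `detect_resources` plus the renamed `load_resources`. A minimal hypothetical subclass might look like the sketch below; the class name is invented and the `load_resources` body is elided, since its details depend on the concrete loader.

```python
import cumulus_fhir_support

from cumulus_etl.loaders import base


class LocalNdjsonLoader(base.Loader):
    """Hypothetical loader over a plain folder of FHIR ndjson files."""

    async def detect_resources(self) -> set[str] | None:
        # Return None when detection isn't possible yet (e.g. nothing has been
        # exported); otherwise report which resource types are actually on disk.
        found = cumulus_fhir_support.list_multiline_json_in_dir(
            self.root.path, fsspec_fs=self.root.fs
        )
        return {res_type for res_type in found.values() if res_type}

    async def load_resources(self, resources: set[str]) -> base.LoaderResults:
        # Copy or convert only the requested resource types into a working
        # folder and describe the result; omitted here.
        ...
```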
4 changes: 2 additions & 2 deletions cumulus_etl/loaders/fhir/bulk_export.py
@@ -30,7 +30,7 @@ class BulkExporter:
def __init__(
self,
client: fhir.FhirClient,
resources: list[str],
resources: set[str],
url: str,
destination: str,
*,
@@ -81,7 +81,7 @@ def format_kickoff_url(
self,
url: str,
*,
resources: list[str],
resources: set[str],
since: str | None,
until: str | None,
prefer_url_resources: bool,
19 changes: 16 additions & 3 deletions cumulus_etl/loaders/fhir/ndjson_loader.py
@@ -2,6 +2,8 @@

import tempfile

import cumulus_fhir_support

from cumulus_etl import cli_utils, common, errors, fhir, store
from cumulus_etl.loaders import base
from cumulus_etl.loaders.fhir.bulk_export import BulkExporter
@@ -37,7 +39,18 @@ def __init__(
self.until = until
self.resume = resume

async def load_all(self, resources: list[str]) -> base.LoaderResults:
async def detect_resources(self) -> set[str] | None:
if self.root.protocol in {"http", "https"}:
# We haven't done the export yet, so there are no files to inspect yet.
# Returning None means "dunno" (i.e. "just accept whatever you eventually get").
return None

found_files = cumulus_fhir_support.list_multiline_json_in_dir(
self.root.path, fsspec_fs=self.root.fs
)
return {resource for resource in found_files.values() if resource}

async def load_resources(self, resources: set[str]) -> base.LoaderResults:
# Are we doing a bulk FHIR export from a server?
if self.root.protocol in ["http", "https"]:
bulk_dir = await self.load_from_bulk_export(resources)
@@ -61,14 +74,14 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:
# TemporaryDirectory gets discarded), but that seems reasonable.
print("Copying ndjson input files…")
tmpdir = tempfile.TemporaryDirectory()
filenames = common.ls_resources(input_root, set(resources), warn_if_empty=True)
filenames = common.ls_resources(input_root, resources, warn_if_empty=True)
for filename in filenames:
input_root.get(filename, f"{tmpdir.name}/")

return self.read_loader_results(input_root, tmpdir)

async def load_from_bulk_export(
self, resources: list[str], prefer_url_resources: bool = False
self, resources: set[str], prefer_url_resources: bool = False
) -> common.Directory:
"""
Performs a bulk export and drops the results in an export dir.
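Note: the detection above relies on `cumulus_fhir_support.list_multiline_json_in_dir`, which, as used here, maps each multi-line JSON file it finds to the FHIR resource type it contains (with a false-y value when the type can't be determined). A hypothetical local folder shows why the comprehension filters on `if resource`:

```python
import cumulus_fhir_support

# Hypothetical local export folder; this call assumes the fsspec_fs argument
# may be omitted for plain local paths.
found = cumulus_fhir_support.list_multiline_json_in_dir("/tmp/export")
# e.g. {"/tmp/export/Patient.001.ndjson": "Patient",
#       "/tmp/export/log.ndjson": None}

detected = {resource for resource in found.values() if resource}
# -> {"Patient"}  (files whose resource type couldn't be determined are ignored)
```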
30 changes: 26 additions & 4 deletions cumulus_etl/loaders/i2b2/loader.py
@@ -34,7 +34,29 @@ def __init__(self, root: store.Root, export_to: str | None = None):
super().__init__(root)
self.export_to = export_to

async def load_all(self, resources: list[str]) -> base.LoaderResults:
async def detect_resources(self) -> set[str] | None:
if self.root.protocol in {"tcp"}:
# We haven't done the export yet, so there are no files to inspect yet.
# Returning None means "dunno" (i.e. "just accept whatever you eventually get").
return None

filenames = {
"observation_fact_diagnosis.csv": "Condition",
"observation_fact_lab_views.csv": "Observation",
"observation_fact_medications.csv": "MedicationRequest",
"observation_fact_notes.csv": "DocumentReference",
"observation_fact_vitals.csv": "Observation",
"patient_dimension.csv": "Patient",
"visit_dimension.csv": "Encounter",
}

return {
resource
for path, resource in filenames.items()
if self.root.exists(self.root.joinpath(path))
}

async def load_resources(self, resources: set[str]) -> base.LoaderResults:
if self.root.protocol in ["tcp"]:
directory = self._load_all_from_oracle(resources)
else:
@@ -43,7 +65,7 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:

def _load_all_with_extractors(
self,
resources: list[str],
resources: set[str],
conditions: I2b2ExtractorCallable,
lab_views: I2b2ExtractorCallable,
medicationrequests: I2b2ExtractorCallable,
@@ -139,7 +161,7 @@ def _loop(
#
###################################################################################################################

def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
def _load_all_from_csv(self, resources: set[str]) -> common.Directory:
path = self.root.path
return self._load_all_with_extractors(
resources,
@@ -177,7 +199,7 @@ def _load_all_from_csv(self, resources: list[str]) -> common.Directory:
#
###################################################################################################################

def _load_all_from_oracle(self, resources: list[str]) -> common.Directory:
def _load_all_from_oracle(self, resources: set[str]) -> common.Directory:
path = self.root.path
return self._load_all_with_extractors(
resources,
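Note: the i2b2 loader infers availability purely from which well-known CSV exports exist. For example, an input folder holding only `patient_dimension.csv` and `visit_dimension.csv` would be detected as supplying Patient and Encounter, so a default run would narrow itself to the patient and encounter tasks.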
2 changes: 1 addition & 1 deletion cumulus_etl/upload_notes/downloader.py
@@ -27,7 +27,7 @@ async def download_docrefs_from_fhir_server(
else:
# else we'll download the entire target path as a bulk export (presumably the user has scoped a Group)
ndjson_loader = loaders.FhirNdjsonLoader(root_input, client, export_to=export_to)
return await ndjson_loader.load_all(["DocumentReference"])
return await ndjson_loader.load_resources({"DocumentReference"})


async def _download_docrefs_from_fake_ids(