Parse any deleted resource IDs from bulk exports and act on it #344

Merged · 3 commits · Sep 5, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -67,7 +67,7 @@ jobs:
with:
coverageFile: coverage.xml
token: ${{ secrets.GITHUB_TOKEN }}
thresholdAll: .97
thresholdAll: .98
thresholdNew: 1
thresholdModified: 1

15 changes: 8 additions & 7 deletions cumulus_etl/etl/cli.py
@@ -192,15 +192,15 @@ def print_config(


def handle_completion_args(
args: argparse.Namespace, loader: loaders.Loader
args: argparse.Namespace, loader_results: loaders.LoaderResults
) -> (str, datetime.datetime):
"""Returns (group_name, datetime)"""
# Grab completion options from CLI or loader
export_group_name = args.export_group or loader.group_name
export_group_name = args.export_group or loader_results.group_name
export_datetime = (
datetime.datetime.fromisoformat(args.export_timestamp)
if args.export_timestamp
else loader.export_datetime
else loader_results.export_datetime
)

# Disable entirely if asked to
@@ -267,22 +267,22 @@ async def etl_main(args: argparse.Namespace) -> None:
)

# Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
loaded_dir = await config_loader.load_all(list(required_resources))
loader_results = await config_loader.load_all(list(required_resources))

# Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
export_group_name, export_datetime = handle_completion_args(args, config_loader)
export_group_name, export_datetime = handle_completion_args(args, loader_results)

# If *any* of our tasks need bulk MS de-identification, run it
if any(t.needs_bulk_deid for t in selected_tasks):
loaded_dir = await deid.Scrubber.scrub_bulk_data(loaded_dir.name)
loader_results.directory = await deid.Scrubber.scrub_bulk_data(loader_results.path)
else:
print("Skipping bulk de-identification.")
print("These selected tasks will de-identify resources as they are processed.")

# Prepare config for jobs
config = JobConfig(
args.dir_input,
loaded_dir.name,
loader_results.path,
args.dir_output,
args.dir_phi,
args.input_format,
@@ -296,6 +296,7 @@ async def etl_main(args: argparse.Namespace) -> None:
tasks=[t.name for t in selected_tasks],
export_group_name=export_group_name,
export_datetime=export_datetime,
deleted_ids=loader_results.deleted_ids,
)
common.write_json(config.path_config(), config.as_json(), indent=4)

2 changes: 2 additions & 0 deletions cumulus_etl/etl/config.py
@@ -33,6 +33,7 @@ def __init__(
tasks: list[str] | None = None,
export_group_name: str | None = None,
export_datetime: datetime.datetime | None = None,
deleted_ids: dict[str, set[str]] | None = None,
):
self._dir_input_orig = dir_input_orig
self.dir_input = dir_input_deid
@@ -50,6 +51,7 @@ def __init__(
self.tasks = tasks or []
self.export_group_name = export_group_name
self.export_datetime = export_datetime
self.deleted_ids = deleted_ids or {}

# initialize format class
self._output_root = store.Root(self._dir_output, create=True)
24 changes: 20 additions & 4 deletions cumulus_etl/etl/convert/cli.py
@@ -36,6 +36,20 @@ def make_batch(
return formats.Batch(rows, groups=groups, schema=schema)


def convert_table_metadata(
meta_path: str,
formatter: formats.Format,
) -> None:
try:
meta = common.read_json(meta_path)
except (FileNotFoundError, PermissionError):
return

# Only one metadata field currently: deleted IDs
deleted = meta.get("deleted", [])
formatter.delete_records(set(deleted))


def convert_folder(
input_root: store.Root,
*,
@@ -66,6 +80,7 @@ def convert_folder(
formatter.write_records(batch)
progress.update(progress_task, advance=1)

convert_table_metadata(f"{table_input_dir}/{table_name}.meta", formatter)
formatter.finalize()
progress.update(progress_task, advance=1)

@@ -117,14 +132,15 @@ def convert_completion(

def copy_job_configs(input_root: store.Root, output_root: store.Root) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
job_config_path = input_root.joinpath("JobConfig")
job_config_path = input_root.joinpath("JobConfig/")

# Download input dir if it's not local
if input_root.protocol != "file":
input_root.get(job_config_path, tmpdir, recursive=True)
job_config_path = os.path.join(tmpdir, "JobConfig")
new_location = os.path.join(tmpdir, "JobConfig/")
input_root.get(job_config_path, new_location, recursive=True)
job_config_path = new_location

output_root.put(job_config_path, output_root.path, recursive=True)
output_root.put(job_config_path, output_root.joinpath("JobConfig/"), recursive=True)


def walk_tree(
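The `.meta` sidecar read by `convert_table_metadata()` is the one the ndjson format writes (see the ndjson changes later in this diff); it currently carries only the deleted IDs. A rough sketch of what the convert step does with a hypothetical `patient.meta` file (paths and IDs are made up):

```python
import json
import tempfile
from pathlib import Path

# Create a sidecar like the one the ndjson format produces.
tmpdir = Path(tempfile.mkdtemp())
(tmpdir / "patient.meta").write_text(json.dumps({"deleted": ["pat-1", "pat-2"]}))

# convert_table_metadata() reads that JSON (tolerating a missing or unreadable file)
# and forwards the IDs to the output formatter, roughly equivalent to:
meta = json.loads((tmpdir / "patient.meta").read_text())
deleted = set(meta.get("deleted", []))
# formatter.delete_records(deleted)  # e.g. a Delta Lake formatter in a real conversion
print(sorted(deleted))  # ['pat-1', 'pat-2']
```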
36 changes: 33 additions & 3 deletions cumulus_etl/etl/tasks/base.py
@@ -143,11 +143,15 @@ async def run(self) -> list[config.JobSummary]:

with self._indeterminate_progress(progress, "Finalizing"):
# Ensure that we touch every output table (to create them and/or to confirm schema).
# Consider case of Medication for an EHR that only has inline Medications inside MedicationRequest.
# The Medication table wouldn't get created otherwise. Plus this is a good place to push any schema
# changes. (The reason it's nice if the table & schema exist is so that downstream SQL can be dumber.)
# Consider case of Medication for an EHR that only has inline Medications inside
# MedicationRequest. The Medication table wouldn't get created otherwise.
# Plus this is a good place to push any schema changes. (The reason it's nice if
# the table & schema exist is so that downstream SQL can be dumber.)
self._touch_remaining_tables()

# If the input data indicates we should delete some IDs, do that here.
self._delete_requested_ids()

# Mark this group & resource combo as complete
self._update_completion_table()

@@ -228,6 +232,32 @@ def _touch_remaining_tables(self):
# just write an empty dataframe (should be fast)
self._write_one_table_batch([], table_index, 0)

def _delete_requested_ids(self):
"""
Deletes IDs that have been marked for deletion.

Formatters are expected to already exist when this is called.

These IDs usually come via the `deleted` array of a bulk export,
which clients typically drop into a deleted/ folder in the download directory.
In our case, that detail is abstracted away into the JobConfig.deleted_ids dictionary.
"""
for index, output in enumerate(self.outputs):
resource = output.get_resource_type(self)
if not resource or resource.lower() != output.get_name(self):
# Only delete from the main table for the resource
continue

deleted_ids = self.task_config.deleted_ids.get(resource, set())
if not deleted_ids:
continue

deleted_ids = {
self.scrubber.codebook.fake_id(resource, x, caching_allowed=False)
for x in deleted_ids
}
self.formatters[index].delete_records(deleted_ids)

def _update_completion_table(self) -> None:
# TODO: what about empty sets - do we assume the export gave 0 results or skip it?
# Is there a difference we could notice? (like empty input file vs no file at all)
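One subtlety in `_delete_requested_ids()`: the IDs reported by the export refer to the original resources, but the output tables only ever contain de-identified IDs, so each ID is run through the codebook before being handed to the formatter. An illustrative sketch with a made-up `fake_id()` stand-in (the real mapping comes from the de-identification codebook):

```python
def fake_id(resource: str, real_id: str) -> str:
    """Hypothetical stand-in for scrubber.codebook.fake_id(resource, real_id)."""
    return f"deid-{resource.lower()}-{real_id}"


deleted_ids = {"Patient": {"A", "B"}, "Condition": set()}

for resource, ids in deleted_ids.items():
    if not ids:
        continue  # nothing was marked for deletion for this resource
    scrubbed = {fake_id(resource, real_id) for real_id in ids}
    # The formatter for the main resource table then drops those rows:
    # formatter.delete_records(scrubbed)
    print(resource, sorted(scrubbed))
```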
8 changes: 8 additions & 0 deletions cumulus_etl/formats/base.py
@@ -74,6 +74,14 @@ def _write_one_batch(self, batch: Batch) -> None:
:param batch: the batch of data
"""

@abc.abstractmethod
def delete_records(self, ids: set[str]) -> None:
"""
Deletes all mentioned IDs from the table.
:param ids: all IDs to remove
"""

def finalize(self) -> None:
"""
Performs any necessary cleanup after all batches have been written.
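Every concrete format now has to provide this hook. A toy subclass showing just the new method next to the existing batch hook (hypothetical, for illustration only; it is not one of the real output formats):

```python
from cumulus_etl import formats


class LoggingFormat(formats.Format):
    """Hypothetical format that only records what it was asked to do."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.deleted_ids: set[str] = set()

    def _write_one_batch(self, batch: formats.Batch) -> None:
        print(f"would write {len(batch.rows)} rows")

    def delete_records(self, ids: set[str]) -> None:
        # Called during finalization, after all batches have been written.
        self.deleted_ids |= ids
```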
11 changes: 11 additions & 0 deletions cumulus_etl/formats/batched_files.py
@@ -83,3 +83,14 @@ def _write_one_batch(self, batch: Batch) -> None:
full_path = self.dbroot.joinpath(f"{self.dbname}.{self._index:03}.{self.suffix}")
self.write_format(batch, full_path)
self._index += 1

def delete_records(self, ids: set[str]) -> None:
"""
Deletes the given IDs.

This is a no-op for batched file outputs, since:
- we guarantee the output folder is empty at the start
- the spec says deleted IDs won't overlap with output IDs

Subclasses may still want to write these IDs to disk to preserve the metadata.
"""
40 changes: 30 additions & 10 deletions cumulus_etl/formats/deltalake.py
@@ -91,8 +91,6 @@ def initialize_class(cls, root: store.Root) -> None:
def _write_one_batch(self, batch: Batch) -> None:
"""Writes the whole dataframe to a delta lake"""
with self.batch_to_spark(batch) as updates:
if updates is None:
return
delta_table = self.update_delta_table(updates, groups=batch.groups)

delta_table.generate("symlink_format_manifest")
@@ -131,16 +129,25 @@ def update_delta_table(

return table

def finalize(self) -> None:
"""Performs any necessary cleanup after all batches have been written"""
full_path = self._table_path(self.dbname)
def delete_records(self, ids: set[str]) -> None:
"""Deletes the given IDs."""
if not ids:
return

table = self._load_table()
if not table:
return

try:
table = delta.DeltaTable.forPath(self.spark, full_path)
except AnalysisException:
return # if the table doesn't exist because we didn't write anything, that's fine - just bail
id_list = "', '".join(ids)
table.delete(f"id in ('{id_list}')")
except Exception:
logging.exception("Could not finalize Delta Lake table %s", self.dbname)
logging.exception("Could not delete IDs from Delta Lake table %s", self.dbname)

def finalize(self) -> None:
"""Performs any necessary cleanup after all batches have been written"""
table = self._load_table()
if not table:
return

try:
@@ -154,6 +161,19 @@ def _table_path(self, dbname: str) -> str:
# hadoop uses the s3a: scheme instead of s3:
return self.root.joinpath(dbname).replace("s3://", "s3a://")

def _load_table(self) -> delta.DeltaTable | None:
full_path = self._table_path(self.dbname)

try:
return delta.DeltaTable.forPath(self.spark, full_path)
except AnalysisException:
# The table likely doesn't exist,
# which is normal if we haven't written anything yet - just bail.
return None
except Exception:
logging.exception("Could not load Delta Lake table %s", self.dbname)
return None

@staticmethod
def _get_update_condition(schema: pyspark.sql.types.StructType) -> str | None:
"""
@@ -214,7 +234,7 @@ def _configure_fs(root: store.Root, spark: pyspark.sql.SparkSession):
spark.conf.set("fs.s3a.endpoint.region", region_name)

@contextlib.contextmanager
def batch_to_spark(self, batch: Batch) -> pyspark.sql.DataFrame | None:
def batch_to_spark(self, batch: Batch) -> pyspark.sql.DataFrame:
"""Transforms a batch to a spark DF"""
# This is the quick and dirty way - write batch to parquet with pyarrow and read it back.
# But a more direct way would be to convert the pyarrow schema to a pyspark schema and just
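The Delta Lake implementation turns the ID set into a SQL-style predicate for `DeltaTable.delete()`. The string building is plain Python and easy to sanity-check on its own (sorted here only to make the output deterministic; the real code joins the set directly):

```python
ids = {"deid-patient-1", "deid-patient-2"}

id_list = "', '".join(sorted(ids))
predicate = f"id in ('{id_list}')"

print(predicate)  # id in ('deid-patient-1', 'deid-patient-2')
# delete_records() passes this predicate to table.delete(...), removing any rows
# whose `id` column matches one of the de-identified IDs.
```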
19 changes: 19 additions & 0 deletions cumulus_etl/formats/ndjson.py
@@ -35,3 +35,22 @@ def write_format(self, batch: Batch, path: str) -> None:

# This is mostly used in tests and debugging, so we'll write out sparse files (no null columns)
common.write_rows_to_ndjson(path, batch.rows, sparse=True)

def table_metadata_path(self) -> str:
return self.dbroot.joinpath(f"{self.dbname}.meta") # no batch number

def read_table_metadata(self) -> dict:
try:
return common.read_json(self.table_metadata_path())
except (FileNotFoundError, PermissionError):
return {}

def write_table_metadata(self, metadata: dict) -> None:
self.root.makedirs(self.dbroot.path)
common.write_json(self.table_metadata_path(), metadata, indent=2)

def delete_records(self, ids: set[str]) -> None:
# Read and write back table metadata, with the addition of these new deleted IDs
meta = self.read_table_metadata()
meta.setdefault("deleted", []).extend(sorted(ids))
self.write_table_metadata(meta)
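For the ndjson output, deletions are not applied to the data files themselves; they are recorded in a `<table>.meta` sidecar that downstream steps (such as the convert command earlier in this diff) can act on. A sketch of the resulting file for a hypothetical patient table:

```python
import json

# Hypothetical contents of patient.meta after delete_records({"pat-2", "pat-1"}) runs:
# the IDs are appended in sorted order via a read-modify-write of the sidecar.
metadata = {"deleted": ["pat-1", "pat-2"]}
print(json.dumps(metadata, indent=2))
```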
2 changes: 1 addition & 1 deletion cumulus_etl/loaders/__init__.py
@@ -1,5 +1,5 @@
"""Public API for loaders"""

from .base import Loader
from .base import Loader, LoaderResults
from .fhir.ndjson_loader import FhirNdjsonLoader
from .i2b2.loader import I2b2Loader
29 changes: 24 additions & 5 deletions cumulus_etl/loaders/base.py
@@ -1,10 +1,33 @@
"""Base abstract loader"""

import abc
import dataclasses
import datetime

from cumulus_etl import common, store


@dataclasses.dataclass(kw_only=True)
class LoaderResults:
"""Bundles results of a load request"""

# Where loaded files reside on disk (use .path for convenience)
directory: common.Directory

@property
def path(self) -> str:
return self.directory.name

# Completion tracking values - noting an export group name for this bundle of data
# and the time when it was exported ("transactionTime" in bulk-export terms).
group_name: str | None = None
export_datetime: datetime.datetime | None = None

# Resource IDs that should be deleted from the output tables.
# This is a map of resource -> set of IDs like {"Patient": {"A", "B"}}
deleted_ids: dict[str, set[str]] = dataclasses.field(default_factory=dict)


class Loader(abc.ABC):
"""
An abstraction for how to load FHIR input
@@ -21,12 +44,8 @@ def __init__(self, root: store.Root):
"""
self.root = root

# Public properties (potentially set when loading) for reporting back to caller
self.group_name = None
self.export_datetime = None

@abc.abstractmethod
async def load_all(self, resources: list[str]) -> common.Directory:
async def load_all(self, resources: list[str]) -> LoaderResults:
"""
Loads the listed remote resources and places them into a local folder as FHIR ndjson
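With this change, a loader no longer reports `group_name` and `export_datetime` by setting attributes on itself; everything a load produces is bundled into the returned `LoaderResults`. A hypothetical minimal loader illustrating the new contract (the directory handling is deliberately simplified, and the stand-in class is not the real `common.Directory`):

```python
import datetime

from cumulus_etl import loaders


class _StaticDirectory:
    """Hypothetical stand-in for common.Directory: points at an existing folder."""

    def __init__(self, name: str):
        self.name = name  # LoaderResults.path returns directory.name


class ExampleLoader(loaders.Loader):
    """Hypothetical loader that pretends the requested resources are already on disk."""

    async def load_all(self, resources: list[str]) -> loaders.LoaderResults:
        # A real loader would bulk export, download, or convert i2b2 data here.
        return loaders.LoaderResults(
            directory=_StaticDirectory(self.root.path),
            group_name="example-group",
            export_datetime=datetime.datetime.now(datetime.timezone.utc),
            # Resources the source reported as deleted since the last export:
            deleted_ids={"Patient": {"pat-deleted-1"}},
        )
```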
