Merge pull request #344 from smart-on-fhir/mikix/deleted-ids
Parse any deleted resource IDs from bulk exports and act on them
mikix authored Sep 5, 2024
2 parents 7396061 + 9a6ab95 commit 94bdf30
Showing 21 changed files with 557 additions and 95 deletions.
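
For orientation before the per-file diffs: a bulk export's status response lists deletions as ndjson files of transaction Bundles whose entries carry DELETE requests (relative URLs like "Patient/A"). This changeset carries those IDs through the pipeline as a resource-to-IDs map, e.g. {"Patient": {"A", "B"}} (see LoaderResults.deleted_ids below). The helper here is only an illustrative sketch of how one such Bundle could be folded into that map; it is not code from this pull request.

import collections

def parse_deleted_bundle(bundle: dict) -> dict[str, set[str]]:
    """Collects deleted IDs from one bulk-export "deleted" Bundle into a resource -> IDs map."""
    deleted_ids: dict[str, set[str]] = collections.defaultdict(set)
    for entry in bundle.get("entry", []):
        request = entry.get("request", {})
        if request.get("method") != "DELETE":
            continue
        # Deleted entries reference resources by relative URL, e.g. "Patient/A"
        resource_type, _, resource_id = request.get("url", "").partition("/")
        if resource_type and resource_id:
            deleted_ids[resource_type].add(resource_id)
    return dict(deleted_ids)
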
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -67,7 +67,7 @@ jobs:
with:
coverageFile: coverage.xml
token: ${{ secrets.GITHUB_TOKEN }}
thresholdAll: .97
thresholdAll: .98
thresholdNew: 1
thresholdModified: 1

15 changes: 8 additions & 7 deletions cumulus_etl/etl/cli.py
@@ -192,15 +192,15 @@ def print_config(


def handle_completion_args(
args: argparse.Namespace, loader: loaders.Loader
args: argparse.Namespace, loader_results: loaders.LoaderResults
) -> (str, datetime.datetime):
"""Returns (group_name, datetime)"""
# Grab completion options from CLI or loader
export_group_name = args.export_group or loader.group_name
export_group_name = args.export_group or loader_results.group_name
export_datetime = (
datetime.datetime.fromisoformat(args.export_timestamp)
if args.export_timestamp
else loader.export_datetime
else loader_results.export_datetime
)

# Disable entirely if asked to
@@ -267,22 +267,22 @@ async def etl_main(args: argparse.Namespace) -> None:
)

# Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
loaded_dir = await config_loader.load_all(list(required_resources))
loader_results = await config_loader.load_all(list(required_resources))

# Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
export_group_name, export_datetime = handle_completion_args(args, config_loader)
export_group_name, export_datetime = handle_completion_args(args, loader_results)

# If *any* of our tasks need bulk MS de-identification, run it
if any(t.needs_bulk_deid for t in selected_tasks):
loaded_dir = await deid.Scrubber.scrub_bulk_data(loaded_dir.name)
loader_results.directory = await deid.Scrubber.scrub_bulk_data(loader_results.path)
else:
print("Skipping bulk de-identification.")
print("These selected tasks will de-identify resources as they are processed.")

# Prepare config for jobs
config = JobConfig(
args.dir_input,
loaded_dir.name,
loader_results.path,
args.dir_output,
args.dir_phi,
args.input_format,
@@ -296,6 +296,7 @@ async def etl_main(args: argparse.Namespace) -> None:
tasks=[t.name for t in selected_tasks],
export_group_name=export_group_name,
export_datetime=export_datetime,
deleted_ids=loader_results.deleted_ids,
)
common.write_json(config.path_config(), config.as_json(), indent=4)

2 changes: 2 additions & 0 deletions cumulus_etl/etl/config.py
@@ -33,6 +33,7 @@ def __init__(
tasks: list[str] | None = None,
export_group_name: str | None = None,
export_datetime: datetime.datetime | None = None,
deleted_ids: dict[str, set[str]] | None = None,
):
self._dir_input_orig = dir_input_orig
self.dir_input = dir_input_deid
@@ -50,6 +51,7 @@ def __init__(
self.tasks = tasks or []
self.export_group_name = export_group_name
self.export_datetime = export_datetime
self.deleted_ids = deleted_ids or {}

# initialize format class
self._output_root = store.Root(self._dir_output, create=True)
24 changes: 20 additions & 4 deletions cumulus_etl/etl/convert/cli.py
@@ -36,6 +36,20 @@ def make_batch(
return formats.Batch(rows, groups=groups, schema=schema)


def convert_table_metadata(
meta_path: str,
formatter: formats.Format,
) -> None:
try:
meta = common.read_json(meta_path)
except (FileNotFoundError, PermissionError):
return

# Only one metadata field currently: deleted IDs
deleted = meta.get("deleted", [])
formatter.delete_records(set(deleted))


def convert_folder(
input_root: store.Root,
*,
@@ -66,6 +80,7 @@ def convert_folder(
formatter.write_records(batch)
progress.update(progress_task, advance=1)

convert_table_metadata(f"{table_input_dir}/{table_name}.meta", formatter)
formatter.finalize()
progress.update(progress_task, advance=1)

@@ -117,14 +132,15 @@ def convert_completion(

def copy_job_configs(input_root: store.Root, output_root: store.Root) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
job_config_path = input_root.joinpath("JobConfig")
job_config_path = input_root.joinpath("JobConfig/")

# Download input dir if it's not local
if input_root.protocol != "file":
input_root.get(job_config_path, tmpdir, recursive=True)
job_config_path = os.path.join(tmpdir, "JobConfig")
new_location = os.path.join(tmpdir, "JobConfig/")
input_root.get(job_config_path, new_location, recursive=True)
job_config_path = new_location

output_root.put(job_config_path, output_root.path, recursive=True)
output_root.put(job_config_path, output_root.joinpath("JobConfig/"), recursive=True)


def walk_tree(
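
The <table>.meta sidecar that convert_table_metadata() reads above is written by the ndjson formatter later in this changeset (write_table_metadata, with "deleted" as its only field so far). As a concrete illustration, for a patient table with two deleted resources the sidecar contents are equivalent to this Python structure (IDs invented):

patient_meta = {
    "deleted": [
        "patient-id-1",
        "patient-id-2",
    ],
}
# convert_table_metadata() would then call formatter.delete_records({"patient-id-1", "patient-id-2"})
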
36 changes: 33 additions & 3 deletions cumulus_etl/etl/tasks/base.py
@@ -143,11 +143,15 @@ async def run(self) -> list[config.JobSummary]:

with self._indeterminate_progress(progress, "Finalizing"):
# Ensure that we touch every output table (to create them and/or to confirm schema).
# Consider case of Medication for an EHR that only has inline Medications inside MedicationRequest.
# The Medication table wouldn't get created otherwise. Plus this is a good place to push any schema
# changes. (The reason it's nice if the table & schema exist is so that downstream SQL can be dumber.)
# Consider case of Medication for an EHR that only has inline Medications inside
# MedicationRequest. The Medication table wouldn't get created otherwise.
# Plus this is a good place to push any schema changes. (The reason it's nice if
# the table & schema exist is so that downstream SQL can be dumber.)
self._touch_remaining_tables()

# If the input data indicates we should delete some IDs, do that here.
self._delete_requested_ids()

# Mark this group & resource combo as complete
self._update_completion_table()

@@ -228,6 +232,32 @@ def _touch_remaining_tables(self):
# just write an empty dataframe (should be fast)
self._write_one_table_batch([], table_index, 0)

def _delete_requested_ids(self):
"""
Deletes IDs that have been marked for deletion.
Formatters are expected to already exist when this is called.
This usually happens via the `deleted` array from a bulk export.
Which clients usually drop in a deleted/ folder in the download directory.
But in our case, that's abstracted away into a JobConfig.deleted_ids dictionary.
"""
for index, output in enumerate(self.outputs):
resource = output.get_resource_type(self)
if not resource or resource.lower() != output.get_name(self):
# Only delete from the main table for the resource
continue

deleted_ids = self.task_config.deleted_ids.get(resource, set())
if not deleted_ids:
continue

deleted_ids = {
self.scrubber.codebook.fake_id(resource, x, caching_allowed=False)
for x in deleted_ids
}
self.formatters[index].delete_records(deleted_ids)

def _update_completion_table(self) -> None:
# TODO: what about empty sets - do we assume the export gave 0 results or skip it?
# Is there a difference we could notice? (like empty input file vs no file at all)
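
The docstring on _delete_requested_ids above captures the key subtlety: the export reports original resource IDs, while the output tables only contain de-identified IDs, so each ID is run through the codebook before deletion. Below is a toy illustration with a stand-in hash for fake_id; the real function lives in the deid codebook and this is not its actual algorithm:

import hashlib

def fake_id(resource_type: str, real_id: str) -> str:
    # Stand-in for the codebook's deterministic pseudonym function (not the real algorithm)
    return hashlib.sha256(f"{resource_type}/{real_id}".encode()).hexdigest()

deleted_ids = {"Patient": {"A", "B"}}  # as reported by the loader
pseudonyms = {fake_id("Patient", real) for real in deleted_ids["Patient"]}
# In the real ETL, these pseudonyms match the IDs already written to the patient
# table, so formatter.delete_records(pseudonyms) removes exactly those rows.
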
8 changes: 8 additions & 0 deletions cumulus_etl/formats/base.py
@@ -74,6 +74,14 @@ def _write_one_batch(self, batch: Batch) -> None:
:param batch: the batch of data
"""

@abc.abstractmethod
def delete_records(self, ids: set[str]) -> None:
"""
Deletes all mentioned IDs from the table.
:param ids: all IDs to remove
"""

def finalize(self) -> None:
"""
Performs any necessary cleanup after all batches have been written.
11 changes: 11 additions & 0 deletions cumulus_etl/formats/batched_files.py
@@ -83,3 +83,14 @@ def _write_one_batch(self, batch: Batch) -> None:
full_path = self.dbroot.joinpath(f"{self.dbname}.{self._index:03}.{self.suffix}")
self.write_format(batch, full_path)
self._index += 1

def delete_records(self, ids: set[str]) -> None:
"""
Deletes the given IDs.
Though this is a no-op for batched file outputs, since:
- we guarantee the output folder is empty at the start
- the spec says deleted IDs won't overlap with output IDs
But subclasses may still want to write these to disk to preserve the metadata.
"""
40 changes: 30 additions & 10 deletions cumulus_etl/formats/deltalake.py
@@ -91,8 +91,6 @@ def initialize_class(cls, root: store.Root) -> None:
def _write_one_batch(self, batch: Batch) -> None:
"""Writes the whole dataframe to a delta lake"""
with self.batch_to_spark(batch) as updates:
if updates is None:
return
delta_table = self.update_delta_table(updates, groups=batch.groups)

delta_table.generate("symlink_format_manifest")
@@ -131,16 +129,25 @@ def update_delta_table(

return table

def finalize(self) -> None:
"""Performs any necessary cleanup after all batches have been written"""
full_path = self._table_path(self.dbname)
def delete_records(self, ids: set[str]) -> None:
"""Deletes the given IDs."""
if not ids:
return

table = self._load_table()
if not table:
return

try:
table = delta.DeltaTable.forPath(self.spark, full_path)
except AnalysisException:
return # if the table doesn't exist because we didn't write anything, that's fine - just bail
id_list = "', '".join(ids)
table.delete(f"id in ('{id_list}')")
except Exception:
logging.exception("Could not finalize Delta Lake table %s", self.dbname)
logging.exception("Could not delete IDs from Delta Lake table %s", self.dbname)

def finalize(self) -> None:
"""Performs any necessary cleanup after all batches have been written"""
table = self._load_table()
if not table:
return

try:
Expand All @@ -154,6 +161,19 @@ def _table_path(self, dbname: str) -> str:
# hadoop uses the s3a: scheme instead of s3:
return self.root.joinpath(dbname).replace("s3://", "s3a://")

def _load_table(self) -> delta.DeltaTable | None:
full_path = self._table_path(self.dbname)

try:
return delta.DeltaTable.forPath(self.spark, full_path)
except AnalysisException:
# The table likely doesn't exist,
# which is normal if we haven't written anything yet - just bail.
return None
except Exception:
logging.exception("Could not load Delta Lake table %s", self.dbname)
return None

@staticmethod
def _get_update_condition(schema: pyspark.sql.types.StructType) -> str | None:
"""
@@ -214,7 +234,7 @@ def _configure_fs(root: store.Root, spark: pyspark.sql.SparkSession):
spark.conf.set("fs.s3a.endpoint.region", region_name)

@contextlib.contextmanager
def batch_to_spark(self, batch: Batch) -> pyspark.sql.DataFrame | None:
def batch_to_spark(self, batch: Batch) -> pyspark.sql.DataFrame:
"""Transforms a batch to a spark DF"""
# This is the quick and dirty way - write batch to parquet with pyarrow and read it back.
# But a more direct way would be to convert the pyarrow schema to a pyspark schema and just
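
For reference, the delete_records() implementation above reduces the ID set to a small SQL predicate over the table's id column; with three invented pseudonymized IDs the construction looks like this:

ids = {"a1b2", "c3d4", "e5f6"}
id_list = "', '".join(ids)
print(f"id in ('{id_list}')")
# -> id in ('a1b2', 'c3d4', 'e5f6')   (set ordering is arbitrary)
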
19 changes: 19 additions & 0 deletions cumulus_etl/formats/ndjson.py
@@ -35,3 +35,22 @@ def write_format(self, batch: Batch, path: str) -> None:

# This is mostly used in tests and debugging, so we'll write out sparse files (no null columns)
common.write_rows_to_ndjson(path, batch.rows, sparse=True)

def table_metadata_path(self) -> str:
return self.dbroot.joinpath(f"{self.dbname}.meta") # no batch number

def read_table_metadata(self) -> dict:
try:
return common.read_json(self.table_metadata_path())
except (FileNotFoundError, PermissionError):
return {}

def write_table_metadata(self, metadata: dict) -> None:
self.root.makedirs(self.dbroot.path)
common.write_json(self.table_metadata_path(), metadata, indent=2)

def delete_records(self, ids: set[str]) -> None:
# Read and write back table metadata, with the addition of these new deleted IDs
meta = self.read_table_metadata()
meta.setdefault("deleted", []).extend(sorted(ids))
self.write_table_metadata(meta)
2 changes: 1 addition & 1 deletion cumulus_etl/loaders/__init__.py
@@ -1,5 +1,5 @@
"""Public API for loaders"""

from .base import Loader
from .base import Loader, LoaderResults
from .fhir.ndjson_loader import FhirNdjsonLoader
from .i2b2.loader import I2b2Loader
29 changes: 24 additions & 5 deletions cumulus_etl/loaders/base.py
@@ -1,10 +1,33 @@
"""Base abstract loader"""

import abc
import dataclasses
import datetime

from cumulus_etl import common, store


@dataclasses.dataclass(kw_only=True)
class LoaderResults:
"""Bundles results of a load request"""

# Where loaded files reside on disk (use .path for convenience)
directory: common.Directory

@property
def path(self) -> str:
return self.directory.name

# Completion tracking values - noting an export group name for this bundle of data
# and the time when it was exported ("transactionTime" in bulk-export terms).
group_name: str | None = None
export_datetime: datetime.datetime | None = None

# Resource IDs that should be deleted from the output tables.
# This is a map of resource type -> set of IDs, like {"Patient": {"A", "B"}}
deleted_ids: dict[str, set[str]] = dataclasses.field(default_factory=dict)


class Loader(abc.ABC):
"""
An abstraction for how to load FHIR input
@@ -21,12 +44,8 @@ def __init__(self, root: store.Root):
"""
self.root = root

# Public properties (potentially set when loading) for reporting back to caller
self.group_name = None
self.export_datetime = None

@abc.abstractmethod
async def load_all(self, resources: list[str]) -> common.Directory:
async def load_all(self, resources: list[str]) -> LoaderResults:
"""
Loads the listed remote resources and places them into a local folder as FHIR ndjson
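
To show the new loader contract end to end, here is a hypothetical construction of LoaderResults. The directory wrapper name (common.RealDirectory) and all values are assumptions for illustration, not taken from this diff:

import datetime

from cumulus_etl import common
from cumulus_etl.loaders import LoaderResults

results = LoaderResults(
    # Assumes a plain "already on disk" common.Directory implementation exists;
    # substitute whatever Directory wrapper the loader actually returns.
    directory=common.RealDirectory("/tmp/export"),
    group_name="my-export-group",
    export_datetime=datetime.datetime(2024, 9, 5, tzinfo=datetime.timezone.utc),
    deleted_ids={"Patient": {"A", "B"}},
)
print(results.path)  # /tmp/export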