convert: preserve any deleted IDs from ndjson through to delta lake
mikix committed Sep 5, 2024
1 parent c0e08dd commit 9a6ab95
Showing 7 changed files with 136 additions and 25 deletions.
24 changes: 20 additions & 4 deletions cumulus_etl/etl/convert/cli.py
@@ -36,6 +36,20 @@ def make_batch(
return formats.Batch(rows, groups=groups, schema=schema)


def convert_table_metadata(
meta_path: str,
formatter: formats.Format,
) -> None:
try:
meta = common.read_json(meta_path)
except (FileNotFoundError, PermissionError):
return

# Only one metadata field currently: deleted IDs
deleted = meta.get("deleted", [])
formatter.delete_records(set(deleted))


def convert_folder(
input_root: store.Root,
*,
@@ -66,6 +80,7 @@ def convert_folder(
formatter.write_records(batch)
progress.update(progress_task, advance=1)

convert_table_metadata(f"{table_input_dir}/{table_name}.meta", formatter)
formatter.finalize()
progress.update(progress_task, advance=1)

@@ -117,14 +132,15 @@ def convert_completion(

def copy_job_configs(input_root: store.Root, output_root: store.Root) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
job_config_path = input_root.joinpath("JobConfig")
job_config_path = input_root.joinpath("JobConfig/")

# Download input dir if it's not local
if input_root.protocol != "file":
input_root.get(job_config_path, tmpdir, recursive=True)
job_config_path = os.path.join(tmpdir, "JobConfig")
new_location = os.path.join(tmpdir, "JobConfig/")
input_root.get(job_config_path, new_location, recursive=True)
job_config_path = new_location

output_root.put(job_config_path, output_root.path, recursive=True)
output_root.put(job_config_path, output_root.joinpath("JobConfig/"), recursive=True)


def walk_tree(
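
For reference, the table metadata consumed by convert_table_metadata() above is a small JSON file sitting next to the table's ndjson output. A minimal sketch of producing one (the path and IDs here are hypothetical; only the "deleted" key is read today):

# Hypothetical example of the <table>/<table>.meta file read by convert_table_metadata().
from cumulus_etl import common

common.write_json(
    "etl-output/condition/condition.meta",
    {"deleted": ["con-1", "con-2"]},  # bare resource IDs, as in the tests below
    indent=2,
)
# convert_table_metadata() would then call formatter.delete_records({"con-1", "con-2"})
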
8 changes: 3 additions & 5 deletions cumulus_etl/etl/tasks/base.py
@@ -144,11 +144,9 @@ async def run(self) -> list[config.JobSummary]:
with self._indeterminate_progress(progress, "Finalizing"):
# Ensure that we touch every output table (to create them and/or to confirm schema).
# Consider case of Medication for an EHR that only has inline Medications inside
# MedicationRequest.
# The Medication table wouldn't get created otherwise. Plus this is a good place to
# push any schema changes.
# (The reason it's nice if the table & schema exist is so that downstream SQL can
# be dumber.)
# MedicationRequest. The Medication table wouldn't get created otherwise.
# Plus this is a good place to push any schema changes. (The reason it's nice if
# the table & schema exist is so that downstream SQL can be dumber.)
self._touch_remaining_tables()

# If the input data indicates we should delete some IDs, do that here.
2 changes: 2 additions & 0 deletions cumulus_etl/formats/batched_files.py
@@ -91,4 +91,6 @@ def delete_records(self, ids: set[str]) -> None:
Though this is a no-op for batched file outputs, since:
- we guarantee the output folder is empty at the start
- the spec says deleted IDs won't overlap with output IDs
But subclasses may still want to write these to disk to preserve the metadata.
"""
19 changes: 19 additions & 0 deletions cumulus_etl/formats/ndjson.py
@@ -35,3 +35,22 @@ def write_format(self, batch: Batch, path: str) -> None:

# This is mostly used in tests and debugging, so we'll write out sparse files (no null columns)
common.write_rows_to_ndjson(path, batch.rows, sparse=True)

def table_metadata_path(self) -> str:
return self.dbroot.joinpath(f"{self.dbname}.meta") # no batch number

def read_table_metadata(self) -> dict:
try:
return common.read_json(self.table_metadata_path())
except (FileNotFoundError, PermissionError):
return {}

def write_table_metadata(self, metadata: dict) -> None:
self.root.makedirs(self.dbroot.path)
common.write_json(self.table_metadata_path(), metadata, indent=2)

def delete_records(self, ids: set[str]) -> None:
# Read and write back table metadata, with the addition of these new deleted IDs
meta = self.read_table_metadata()
meta.setdefault("deleted", []).extend(sorted(ids))
self.write_table_metadata(meta)
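
A minimal usage sketch of the new NdjsonFormat metadata handling, assuming store.Root accepts a plain local path (the other names are taken from the diff and its tests):

# Sketch: record deleted IDs through the ndjson formatter (paths assumed).
from cumulus_etl import common, store
from cumulus_etl.formats.ndjson import NdjsonFormat

root = store.Root("/tmp/ndjson-output")  # hypothetical output root
NdjsonFormat.initialize_class(root)
formatter = NdjsonFormat(root, "condition")
formatter.delete_records({"b", "a"})

# IDs are appended, sorted, to <root>/condition/condition.meta:
print(common.read_json(formatter.table_metadata_path()))
# {"deleted": ["a", "b"]}
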
3 changes: 2 additions & 1 deletion cumulus_etl/loaders/fhir/ndjson_loader.py
@@ -132,7 +132,8 @@ def read_deleted_ids(self, root: store.Root) -> dict[str, set[str]]:
request = entry.get("request", {})
if request.get("method") != "DELETE":
continue
url = request.get("url") # should be relative URL like "Patient/123"
url = request.get("url")
# Sanity check that we have a relative URL like "Patient/123"
if not url or url.count("/") != 1:
continue
resource, res_id = url.split("/")
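
For context, read_deleted_ids() above appears to walk entries of the bulk export's "deleted" Bundles; a small illustrative sketch of the accepted shape (the entry itself is made up, field names follow the FHIR Bundle request element):

# Illustrative "deleted" Bundle entry of the kind read_deleted_ids() accepts.
entry = {
    "request": {"method": "DELETE", "url": "Patient/123"},  # relative "Resource/id"
}
request = entry.get("request", {})
url = request.get("url")
if request.get("method") == "DELETE" and url and url.count("/") == 1:
    resource, res_id = url.split("/")  # ("Patient", "123")
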
88 changes: 73 additions & 15 deletions tests/convert/test_convert_cli.py
@@ -8,12 +8,11 @@
import ddt

from cumulus_etl import cli, common, errors
from tests import utils
from tests import s3mock, utils


@ddt.ddt
class TestConvert(utils.AsyncTestCase):
"""Tests for high-level convert support."""
class ConvertTestsBase(utils.AsyncTestCase):
"""Base class for convert tests"""

def setUp(self):
super().setUp()
@@ -25,6 +24,21 @@ def setUp(self):
self.original_path = os.path.join(self.tmpdir, "original")
self.target_path = os.path.join(self.tmpdir, "target")

async def run_convert(
self, input_path: str | None = None, output_path: str | None = None
) -> None:
args = [
"convert",
input_path or self.original_path,
output_path or self.target_path,
]
await cli.main(args)


@ddt.ddt
class TestConvert(ConvertTestsBase):
"""Tests for high-level convert support."""

def prepare_original_dir(self) -> str:
"""Returns the job timestamp used, for easier inspection"""
# Fill in original dir, including a non-default output folder
@@ -47,16 +61,6 @@ def prepare_original_dir(self) -> str:

return job_timestamp

async def run_convert(
self, input_path: str | None = None, output_path: str | None = None
) -> None:
args = [
"convert",
input_path or self.original_path,
output_path or self.target_path,
]
await cli.main(args)

async def test_input_dir_must_exist(self):
"""Verify that the input dir must already exist"""
with self.assertRaises(SystemExit) as cm:
@@ -150,7 +154,10 @@ async def test_happy_path(self):
{"test": True},
common.read_json(f"{self.target_path}/JobConfig/{job_timestamp}/job_config.json"),
)
self.assertEqual({"delta": "yup"}, common.read_json(f"{delta_config_dir}/job_config.json"))
self.assertEqual(
{"delta": "yup"},
common.read_json(f"{self.target_path}/JobConfig/{delta_timestamp}/job_config.json"),
)
patients = utils.read_delta_lake(f"{self.target_path}/patient") # re-check the patients
self.assertEqual(3, len(patients))
# these rows are sorted by id, so these are reliable indexes
@@ -211,3 +218,54 @@ async def test_batch_metadata(self, mock_write):
)
# second (faked) covid batch
self.assertEqual({"nonexistent"}, mock_write.call_args_list[2][0][0].groups)

@mock.patch("cumulus_etl.formats.Format.write_records")
@mock.patch("cumulus_etl.formats.deltalake.DeltaLakeFormat.delete_records")
async def test_deleted_ids(self, mock_delete, mock_write):
"""Verify that we pass along deleted IDs in the table metadata"""
# Set up input path
shutil.copytree( # First, one table that has no metadata
f"{self.datadir}/simple/output/patient",
f"{self.original_path}/patient",
)
shutil.copytree( # Then, one that will
f"{self.datadir}/simple/output/condition",
f"{self.original_path}/condition",
)
common.write_json(f"{self.original_path}/condition/condition.meta", {"deleted": ["a", "b"]})
os.makedirs(f"{self.original_path}/JobConfig")

# Run conversion
await self.run_convert()

# Verify results
self.assertEqual(mock_write.call_count, 2)
self.assertEqual(mock_delete.call_count, 1)
self.assertEqual(mock_delete.call_args, mock.call({"a", "b"}))


class TestConvertOnS3(s3mock.S3Mixin, ConvertTestsBase):
@mock.patch("cumulus_etl.formats.Format.write_records")
async def test_convert_from_s3(self, mock_write):
"""Quick test that we can read from an arbitrary input dir using fsspec"""
# Set up input
common.write_json(
f"{self.bucket_url}/JobConfig/2024-08-09__16.32.51/job_config.json",
{"comment": "unittest"},
)
common.write_json(
f"{self.bucket_url}/condition/condition.000.ndjson",
{"id": "con1"},
)

# Run conversion
await self.run_convert(input_path=self.bucket_url)

# Verify results
self.assertEqual(mock_write.call_count, 1)
self.assertEqual([{"id": "con1"}], mock_write.call_args[0][0].rows)
self.assertEqual(
common.read_json(f"{self.target_path}/JobConfig/2024-08-09__16.32.51/job_config.json"),
{"comment": "unittest"},
)
17 changes: 17 additions & 0 deletions tests/formats/test_ndjson.py
@@ -56,3 +56,20 @@ def test_disallows_existing_files(self, files: None | list[str], is_ok: bool):
else:
with self.assertRaises(SystemExit):
NdjsonFormat.initialize_class(self.root)

def test_writes_deleted_ids(self):
"""Verify that we write a table metadata file with deleted IDs"""
meta_path = f"{self.root.joinpath('condition')}/condition.meta"

# Test with a fresh directory
formatter = NdjsonFormat(self.root, "condition")
formatter.delete_records({"b", "a"})
metadata = common.read_json(meta_path)
self.assertEqual(metadata, {"deleted": ["a", "b"]})

# Confirm we append to existing metadata, should we ever need to
metadata["extra"] = "bonus metadata!"
common.write_json(meta_path, metadata)
formatter.delete_records({"c"})
metadata = common.read_json(meta_path)
self.assertEqual(metadata, {"deleted": ["a", "b", "c"], "extra": "bonus metadata!"})
