diff --git a/cumulus_etl/etl/convert/cli.py b/cumulus_etl/etl/convert/cli.py index ffc2957..6ee7dd5 100644 --- a/cumulus_etl/etl/convert/cli.py +++ b/cumulus_etl/etl/convert/cli.py @@ -36,6 +36,20 @@ def make_batch( return formats.Batch(rows, groups=groups, schema=schema) +def convert_table_metadata( + meta_path: str, + formatter: formats.Format, +) -> None: + try: + meta = common.read_json(meta_path) + except (FileNotFoundError, PermissionError): + return + + # Only one metadata field currently: deleted IDs + deleted = meta.get("deleted", []) + formatter.delete_records(set(deleted)) + + def convert_folder( input_root: store.Root, *, @@ -66,6 +80,7 @@ def convert_folder( formatter.write_records(batch) progress.update(progress_task, advance=1) + convert_table_metadata(f"{table_input_dir}/{table_name}.meta", formatter) formatter.finalize() progress.update(progress_task, advance=1) @@ -117,14 +132,15 @@ def convert_completion( def copy_job_configs(input_root: store.Root, output_root: store.Root) -> None: with tempfile.TemporaryDirectory() as tmpdir: - job_config_path = input_root.joinpath("JobConfig") + job_config_path = input_root.joinpath("JobConfig/") # Download input dir if it's not local if input_root.protocol != "file": - input_root.get(job_config_path, tmpdir, recursive=True) - job_config_path = os.path.join(tmpdir, "JobConfig") + new_location = os.path.join(tmpdir, "JobConfig/") + input_root.get(job_config_path, new_location, recursive=True) + job_config_path = new_location - output_root.put(job_config_path, output_root.path, recursive=True) + output_root.put(job_config_path, output_root.joinpath("JobConfig/"), recursive=True) def walk_tree( diff --git a/cumulus_etl/etl/tasks/base.py b/cumulus_etl/etl/tasks/base.py index 00b2d40..4aa9e99 100644 --- a/cumulus_etl/etl/tasks/base.py +++ b/cumulus_etl/etl/tasks/base.py @@ -144,11 +144,9 @@ async def run(self) -> list[config.JobSummary]: with self._indeterminate_progress(progress, "Finalizing"): # Ensure that we touch every output table (to create them and/or to confirm schema). # Consider case of Medication for an EHR that only has inline Medications inside - # MedicationRequest. - # The Medication table wouldn't get created otherwise. Plus this is a good place to - # push any schema changes. - # (The reason it's nice if the table & schema exist is so that downstream SQL can - # be dumber.) + # MedicationRequest. The Medication table wouldn't get created otherwise. + # Plus this is a good place to push any schema changes. (The reason it's nice if + # the table & schema exist is so that downstream SQL can be dumber.) self._touch_remaining_tables() # If the input data indicates we should delete some IDs, do that here. diff --git a/cumulus_etl/formats/batched_files.py b/cumulus_etl/formats/batched_files.py index 25e4588..86e9f0c 100644 --- a/cumulus_etl/formats/batched_files.py +++ b/cumulus_etl/formats/batched_files.py @@ -91,4 +91,6 @@ def delete_records(self, ids: set[str]) -> None: Though this is a no-op for batched file outputs, since: - we guarantee the output folder is empty at the start - the spec says deleted IDs won't overlap with output IDs + + But subclasses may still want to write these to disk to preserve the metadata. """ diff --git a/cumulus_etl/formats/ndjson.py b/cumulus_etl/formats/ndjson.py index 22358eb..a7ea5fc 100644 --- a/cumulus_etl/formats/ndjson.py +++ b/cumulus_etl/formats/ndjson.py @@ -35,3 +35,22 @@ def write_format(self, batch: Batch, path: str) -> None: # This is mostly used in tests and debugging, so we'll write out sparse files (no null columns) common.write_rows_to_ndjson(path, batch.rows, sparse=True) + + def table_metadata_path(self) -> str: + return self.dbroot.joinpath(f"{self.dbname}.meta") # no batch number + + def read_table_metadata(self) -> dict: + try: + return common.read_json(self.table_metadata_path()) + except (FileNotFoundError, PermissionError): + return {} + + def write_table_metadata(self, metadata: dict) -> None: + self.root.makedirs(self.dbroot.path) + common.write_json(self.table_metadata_path(), metadata, indent=2) + + def delete_records(self, ids: set[str]) -> None: + # Read and write back table metadata, with the addition of these new deleted IDs + meta = self.read_table_metadata() + meta.setdefault("deleted", []).extend(sorted(ids)) + self.write_table_metadata(meta) diff --git a/cumulus_etl/loaders/fhir/ndjson_loader.py b/cumulus_etl/loaders/fhir/ndjson_loader.py index b4ef2bb..d304b44 100644 --- a/cumulus_etl/loaders/fhir/ndjson_loader.py +++ b/cumulus_etl/loaders/fhir/ndjson_loader.py @@ -132,7 +132,8 @@ def read_deleted_ids(self, root: store.Root) -> dict[str, set[str]]: request = entry.get("request", {}) if request.get("method") != "DELETE": continue - url = request.get("url") # should be relative URL like "Patient/123" + url = request.get("url") + # Sanity check that we have a relative URL like "Patient/123" if not url or url.count("/") != 1: continue resource, res_id = url.split("/") diff --git a/tests/convert/test_convert_cli.py b/tests/convert/test_convert_cli.py index c366267..e52b181 100644 --- a/tests/convert/test_convert_cli.py +++ b/tests/convert/test_convert_cli.py @@ -8,12 +8,11 @@ import ddt from cumulus_etl import cli, common, errors -from tests import utils +from tests import s3mock, utils -@ddt.ddt -class TestConvert(utils.AsyncTestCase): - """Tests for high-level convert support.""" +class ConvertTestsBase(utils.AsyncTestCase): + """Base class for convert tests""" def setUp(self): super().setUp() @@ -25,6 +24,21 @@ def setUp(self): self.original_path = os.path.join(self.tmpdir, "original") self.target_path = os.path.join(self.tmpdir, "target") + async def run_convert( + self, input_path: str | None = None, output_path: str | None = None + ) -> None: + args = [ + "convert", + input_path or self.original_path, + output_path or self.target_path, + ] + await cli.main(args) + + +@ddt.ddt +class TestConvert(ConvertTestsBase): + """Tests for high-level convert support.""" + def prepare_original_dir(self) -> str: """Returns the job timestamp used, for easier inspection""" # Fill in original dir, including a non-default output folder @@ -47,16 +61,6 @@ def prepare_original_dir(self) -> str: return job_timestamp - async def run_convert( - self, input_path: str | None = None, output_path: str | None = None - ) -> None: - args = [ - "convert", - input_path or self.original_path, - output_path or self.target_path, - ] - await cli.main(args) - async def test_input_dir_must_exist(self): """Verify that the input dir must already exist""" with self.assertRaises(SystemExit) as cm: @@ -150,7 +154,10 @@ async def test_happy_path(self): {"test": True}, common.read_json(f"{self.target_path}/JobConfig/{job_timestamp}/job_config.json"), ) - self.assertEqual({"delta": "yup"}, common.read_json(f"{delta_config_dir}/job_config.json")) + self.assertEqual( + {"delta": "yup"}, + common.read_json(f"{self.target_path}/JobConfig/{delta_timestamp}/job_config.json"), + ) patients = utils.read_delta_lake(f"{self.target_path}/patient") # re-check the patients self.assertEqual(3, len(patients)) # these rows are sorted by id, so these are reliable indexes @@ -211,3 +218,54 @@ async def test_batch_metadata(self, mock_write): ) # second (faked) covid batch self.assertEqual({"nonexistent"}, mock_write.call_args_list[2][0][0].groups) + + @mock.patch("cumulus_etl.formats.Format.write_records") + @mock.patch("cumulus_etl.formats.deltalake.DeltaLakeFormat.delete_records") + async def test_deleted_ids(self, mock_delete, mock_write): + """Verify that we pass along deleted IDs in the table metadata""" + # Set up input path + shutil.copytree( # First, one table that has no metadata + f"{self.datadir}/simple/output/patient", + f"{self.original_path}/patient", + ) + shutil.copytree( # Then, one that will + f"{self.datadir}/simple/output/condition", + f"{self.original_path}/condition", + ) + common.write_json(f"{self.original_path}/condition/condition.meta", {"deleted": ["a", "b"]}) + os.makedirs(f"{self.original_path}/JobConfig") + + # Run conversion + await self.run_convert() + + # Verify results + self.assertEqual(mock_write.call_count, 2) + self.assertEqual(mock_delete.call_count, 1) + self.assertEqual(mock_delete.call_args, mock.call({"a", "b"})) + + +class TestConvertOnS3(s3mock.S3Mixin, ConvertTestsBase): + @mock.patch("cumulus_etl.formats.Format.write_records") + async def test_convert_from_s3(self, mock_write): + """Quick test that we can read from an arbitrary input dir using fsspec""" + # Set up input + common.write_json( + f"{self.bucket_url}/JobConfig/2024-08-09__16.32.51/job_config.json", + {"comment": "unittest"}, + ) + common.write_json( + f"{self.bucket_url}/condition/condition.000.ndjson", + {"id": "con1"}, + ) + + # Run conversion + await self.run_convert(input_path=self.bucket_url) + + # Verify results + self.assertEqual(mock_write.call_count, 1) + self.assertEqual([{"id": "con1"}], mock_write.call_args[0][0].rows) + print(os.listdir(f"{self.target_path}")) + self.assertEqual( + common.read_json(f"{self.target_path}/JobConfig/2024-08-09__16.32.51/job_config.json"), + {"comment": "unittest"}, + ) diff --git a/tests/formats/test_ndjson.py b/tests/formats/test_ndjson.py index 2c5e412..ebd65f1 100644 --- a/tests/formats/test_ndjson.py +++ b/tests/formats/test_ndjson.py @@ -56,3 +56,20 @@ def test_disallows_existing_files(self, files: None | list[str], is_ok: bool): else: with self.assertRaises(SystemExit): NdjsonFormat.initialize_class(self.root) + + def test_writes_deleted_ids(self): + """Verify that we write a table metadata file with deleted IDs""" + meta_path = f"{self.root.joinpath('condition')}/condition.meta" + + # Test with a fresh directory + formatter = NdjsonFormat(self.root, "condition") + formatter.delete_records({"b", "a"}) + metadata = common.read_json(meta_path) + self.assertEqual(metadata, {"deleted": ["a", "b"]}) + + # Confirm we append to existing metadata, should we ever need to + metadata["extra"] = "bonus metadata!" + common.write_json(meta_path, metadata) + formatter.delete_records({"c"}) + metadata = common.read_json(meta_path) + self.assertEqual(metadata, {"deleted": ["a", "b", "c"], "extra": "bonus metadata!"})