From 119419636006cbae00b781f7c6d9e340e3040b81 Mon Sep 17 00:00:00 2001
From: Michael Terry
Date: Tue, 21 May 2024 14:27:39 -0400
Subject: [PATCH] deltalake: enable liquid clustering for new tables

---
 cumulus_etl/__init__.py          |  2 +-
 cumulus_etl/formats/deltalake.py |  1 +
 pyproject.toml                   |  2 +-
 tests/etl/test_etl_cli.py        | 12 ++++++++----
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/cumulus_etl/__init__.py b/cumulus_etl/__init__.py
index 05fb822..4845145 100644
--- a/cumulus_etl/__init__.py
+++ b/cumulus_etl/__init__.py
@@ -1,3 +1,3 @@
 """Turns FHIR data into de-identified & aggregated records"""

-__version__ = "1.3.0"
+__version__ = "1.4.0"
diff --git a/cumulus_etl/formats/deltalake.py b/cumulus_etl/formats/deltalake.py
index 06aa286..6125cec 100644
--- a/cumulus_etl/formats/deltalake.py
+++ b/cumulus_etl/formats/deltalake.py
@@ -101,6 +101,7 @@ def update_delta_table(
         table = (
             delta.DeltaTable.createIfNotExists(self.spark)
             .addColumns(updates.schema)
+            .clusterBy(*self.uniqueness_fields)
             .location(self._table_path(self.dbname))
             .execute()
         )
diff --git a/pyproject.toml b/pyproject.toml
index 4a303aa..9ddbb25 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,7 +13,7 @@ requires-python = ">= 3.10"
 dependencies = [
     "ctakesclient >= 5.1, < 6",
     "cumulus-fhir-support >= 1.2, < 2",
-    "delta-spark >= 3, < 4",
+    "delta-spark >= 3.2.1, < 4",
     "httpx < 1",
     "inscriptis < 3",
     "jwcrypto < 2",
diff --git a/tests/etl/test_etl_cli.py b/tests/etl/test_etl_cli.py
index c057927..9bd5bcd 100644
--- a/tests/etl/test_etl_cli.py
+++ b/tests/etl/test_etl_cli.py
@@ -410,20 +410,24 @@ async def test_etl_job_deltalake(self):
                 "_delta_log/.00000000000000000000.json.crc",
                 "_delta_log/00000000000000000001.json",  # merge
                 "_delta_log/.00000000000000000001.json.crc",
-                "_delta_log/00000000000000000002.json",  # vacuum start
+                "_delta_log/00000000000000000002.json",  # optimize
                 "_delta_log/.00000000000000000002.json.crc",
-                "_delta_log/00000000000000000003.json",  # vacuum end
+                "_delta_log/00000000000000000003.json",  # vacuum start
                 "_delta_log/.00000000000000000003.json.crc",
+                "_delta_log/00000000000000000004.json",  # vacuum end
+                "_delta_log/.00000000000000000004.json.crc",
                 "_symlink_format_manifest/manifest",
                 "_symlink_format_manifest/.manifest.crc",
             },
             metadata_files,
         )

-        self.assertEqual(1, len(data_files))
+        # Expect two data files - one will be original (now marked as deleted from optimize call)
+        # and the other will be the new optimized file.
+        self.assertEqual(2, len(data_files))
         self.assertRegex(data_files.pop(), r"part-00000-.*-c000.snappy.parquet")

-        self.assertEqual(1, len(data_crc_files))
+        self.assertEqual(2, len(data_crc_files))
         self.assertRegex(data_crc_files.pop(), r".part-00000-.*-c000.snappy.parquet.crc")
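
Note on the change above: the new `.clusterBy(*self.uniqueness_fields)` call enables Delta Lake liquid clustering at table-creation time, which is why the patch also bumps the `delta-spark` floor to 3.2.1 and updates the test to expect an `optimize` commit in `_delta_log` plus a second (rewritten) data file. Because the call sits on the `createIfNotExists` builder, only newly created tables pick up the clustering spec, matching the "for new tables" wording in the subject. The following is a minimal sketch of the same builder pattern outside the cumulus-etl codebase, assuming delta-spark >= 3.2.1, a SparkSession already configured with the Delta extensions, and hypothetical names (`spark`, `schema`, `table_path`, `cluster_fields`) that are not part of the patch:

# Sketch only -- not cumulus-etl code. Assumes delta-spark >= 3.2.1 and a SparkSession
# already configured for Delta Lake; `schema`, `table_path`, and `cluster_fields`
# are hypothetical placeholders.
import delta


def create_liquid_clustered_table(spark, schema, table_path, cluster_fields):
    """Create (if needed) a Delta table whose data is liquid-clustered on cluster_fields."""
    return (
        delta.DeltaTable.createIfNotExists(spark)
        .addColumns(schema)             # pyspark StructType describing the table columns
        .clusterBy(*cluster_fields)     # liquid clustering keys, e.g. the row-uniqueness fields
        .location(table_path)           # external table location (local path or object-store URL)
        .execute()
    )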