From 6d1c1ceb880e52f54cf28d7b0b1d1a6f5cb2d55f Mon Sep 17 00:00:00 2001 From: Roushan Kumar Date: Fri, 7 Feb 2025 00:15:14 +0530 Subject: [PATCH] [590] Add Delta HMS Catalog Sync implementation --- pom.xml | 12 +- xtable-hive-metastore/pom.xml | 7 +- .../xtable/hms/HMSCatalogSyncClient.java | 3 +- .../hms/HMSCatalogTableBuilderFactory.java | 5 +- .../table/DeltaHMSCatalogTableBuilder.java | 95 +++++++++++++ .../hms/HMSCatalogSyncClientTestBase.java | 86 ++++++++++- .../TestHMSCatalogTableBuilderFactory.java | 5 +- .../TestDeltaHMSCatalogTableBuilder.java | 134 ++++++++++++++++++ .../TestIcebergHMSCatalogTableBuilder.java | 58 +++----- 9 files changed, 361 insertions(+), 44 deletions(-) create mode 100644 xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/DeltaHMSCatalogTableBuilder.java create mode 100644 xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java diff --git a/pom.xml b/pom.xml index 3d2b44d95..3184a4be2 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ 0.16.1 1.8 0.5.0 + 3.0.0 UTF-8 **/target/** ${project.build.directory}/delombok @@ -280,6 +281,11 @@ ${iceberg.version} test + + org.apache.iceberg + iceberg-hive-runtime + ${iceberg.version} + @@ -291,7 +297,11 @@ io.delta delta-standalone_${scala.binary.version} ${delta.standalone.version} - test + + + io.delta + delta-hive_${scala.binary.version} + ${delta.hive.version} diff --git a/xtable-hive-metastore/pom.xml b/xtable-hive-metastore/pom.xml index c292c1234..01037a1fc 100644 --- a/xtable-hive-metastore/pom.xml +++ b/xtable-hive-metastore/pom.xml @@ -46,7 +46,12 @@ org.apache.iceberg iceberg-hive-runtime - ${iceberg.version} + + + + + io.delta + delta-hive_${scala.binary.version} diff --git a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java index db86958d9..537de8341 100644 --- a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java +++ b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java @@ -194,8 +194,7 @@ private void _init( } catch (MetaException | HiveException e) { throw new CatalogSyncException("HiveMetastoreClient could not be created", e); } - this.tableBuilder = - HMSCatalogTableBuilderFactory.getTableBuilder(tableFormat, this.configuration); + this.tableBuilder = HMSCatalogTableBuilderFactory.getInstance(tableFormat, this.configuration); } /** diff --git a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java index e6a4eb5ca..56c182798 100644 --- a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java +++ b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java @@ -32,6 +32,7 @@ import org.apache.xtable.catalog.CatalogTableBuilder; import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.hms.table.DeltaHMSCatalogTableBuilder; import org.apache.xtable.hms.table.IcebergHMSCatalogTableBuilder; import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; @@ -39,11 +40,13 @@ public class HMSCatalogTableBuilderFactory { - public static CatalogTableBuilder getTableBuilder( + static CatalogTableBuilder getInstance( String tableFormat, Configuration configuration) { switch (tableFormat) { case TableFormat.ICEBERG: return new IcebergHMSCatalogTableBuilder(configuration); + case TableFormat.DELTA: + return new DeltaHMSCatalogTableBuilder(); default: throw new NotSupportedException("Unsupported table format: " + tableFormat); } diff --git a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/DeltaHMSCatalogTableBuilder.java b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/DeltaHMSCatalogTableBuilder.java new file mode 100644 index 000000000..1f5ce4ccf --- /dev/null +++ b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/DeltaHMSCatalogTableBuilder.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.hms.table; + +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; +import static org.apache.xtable.catalog.Constants.PROP_EXTERNAL; +import static org.apache.xtable.catalog.Constants.PROP_PATH; +import static org.apache.xtable.catalog.Constants.PROP_SERIALIZATION_FORMAT; +import static org.apache.xtable.hms.HMSCatalogTableBuilderFactory.newHmsTable; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; + +import com.google.common.annotations.VisibleForTesting; + +import io.delta.hive.DeltaStorageHandler; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.hms.HMSSchemaExtractor; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.storage.TableFormat; + +public class DeltaHMSCatalogTableBuilder implements CatalogTableBuilder { + + private final HMSSchemaExtractor schemaExtractor; + private static final String tableFormat = TableFormat.DELTA; + + public DeltaHMSCatalogTableBuilder() { + this.schemaExtractor = HMSSchemaExtractor.getInstance(); + } + + @Override + public Table getCreateTableRequest(InternalTable table, CatalogTableIdentifier tableIdentifier) { + return newHmsTable(tableIdentifier, getStorageDescriptor(table), getTableParameters()); + } + + @Override + public Table getUpdateTableRequest( + InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) { + Table copyTb = new Table(catalogTable); + copyTb.getSd().setCols(schemaExtractor.toColumns(tableFormat, table.getReadSchema())); + return copyTb; + } + + @VisibleForTesting + StorageDescriptor getStorageDescriptor(InternalTable table) { + final StorageDescriptor storageDescriptor = new StorageDescriptor(); + storageDescriptor.setCols(schemaExtractor.toColumns(tableFormat, table.getReadSchema())); + storageDescriptor.setLocation(table.getBasePath()); + SerDeInfo serDeInfo = new SerDeInfo(); + serDeInfo.setParameters(getSerDeParameters(table)); + storageDescriptor.setSerdeInfo(serDeInfo); + return storageDescriptor; + } + + @VisibleForTesting + Map getTableParameters() { + Map parameters = new HashMap<>(); + parameters.put(PROP_EXTERNAL, "TRUE"); + parameters.put(TABLE_TYPE_PROP, tableFormat.toUpperCase(Locale.ENGLISH)); + parameters.put( + hive_metastoreConstants.META_TABLE_STORAGE, DeltaStorageHandler.class.getCanonicalName()); + return parameters; + } + + private Map getSerDeParameters(InternalTable table) { + Map parameters = new HashMap<>(); + parameters.put(PROP_SERIALIZATION_FORMAT, "1"); + parameters.put(PROP_PATH, table.getBasePath()); + return parameters; + } +} diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java index 052a5ded7..cd76c6be9 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java @@ -18,13 +18,16 @@ package org.apache.xtable.hms; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.mockito.Mock; @@ -32,7 +35,11 @@ import org.apache.xtable.conversion.ExternalCatalogConfig; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.model.storage.CatalogType; import org.apache.xtable.model.storage.TableFormat; @@ -57,17 +64,80 @@ public class HMSCatalogSyncClientTestBase { protected static final String ICEBERG_METADATA_FILE_LOCATION = "base-path/metadata"; protected static final String ICEBERG_METADATA_FILE_LOCATION_V2 = "base-path/v2-metadata"; + protected static final InternalPartitionField PARTITION_FIELD = + InternalPartitionField.builder() + .sourceField( + InternalField.builder() + .name("partitionField") + .schema( + InternalSchema.builder().name("string").dataType(InternalType.STRING).build()) + .build()) + .transformType(PartitionTransformType.VALUE) + .build(); + protected static final InternalSchema INTERNAL_SCHEMA = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + getInternalField("intField", "int", InternalType.INT), + getInternalField("stringField", "string", InternalType.STRING), + getInternalField("partitionField", "string", InternalType.STRING))) + .build(); + protected static final InternalSchema UPDATED_INTERNAL_SCHEMA = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + getInternalField("intField", "int", InternalType.INT), + getInternalField("stringField", "string", InternalType.STRING), + getInternalField("partitionField", "string", InternalType.STRING), + getInternalField("booleanField", "boolean", InternalType.BOOLEAN))) + .build(); + protected static final List FIELD_SCHEMA = + Arrays.asList( + getFieldSchema("intField", "int"), + getFieldSchema("stringField", "string"), + getFieldSchema("partitionField", "string")); + protected static final List UPDATED_FIELD_SCHEMA = + Arrays.asList( + getFieldSchema("intField", "int"), + getFieldSchema("stringField", "string"), + getFieldSchema("partitionField", "string"), + getFieldSchema("booleanField", "boolean")); protected static final InternalTable TEST_ICEBERG_INTERNAL_TABLE = InternalTable.builder() .basePath(TEST_BASE_PATH) .tableFormat(TableFormat.ICEBERG) - .readSchema(InternalSchema.builder().fields(Collections.emptyList()).build()) + .readSchema(INTERNAL_SCHEMA) + .partitioningFields(Collections.singletonList(PARTITION_FIELD)) + .build(); + protected static final InternalTable TEST_UPDATED_ICEBERG_INTERNAL_TABLE = + InternalTable.builder() + .basePath(TEST_BASE_PATH) + .tableFormat(TableFormat.ICEBERG) + .readSchema(UPDATED_INTERNAL_SCHEMA) + .partitioningFields(Collections.singletonList(PARTITION_FIELD)) + .build(); + protected static final InternalTable TEST_DELTA_INTERNAL_TABLE = + InternalTable.builder() + .basePath(TEST_BASE_PATH) + .tableFormat(TableFormat.DELTA) + .readSchema(INTERNAL_SCHEMA) + .partitioningFields(Collections.singletonList(PARTITION_FIELD)) + .build(); + protected static final InternalTable TEST_UPDATED_DELTA_INTERNAL_TABLE = + InternalTable.builder() + .basePath(TEST_BASE_PATH) + .tableFormat(TableFormat.DELTA) + .readSchema(UPDATED_INTERNAL_SCHEMA) + .partitioningFields(Collections.singletonList(PARTITION_FIELD)) .build(); protected static final InternalTable TEST_HUDI_INTERNAL_TABLE = InternalTable.builder() .basePath(TEST_BASE_PATH) .tableFormat(TableFormat.HUDI) - .readSchema(InternalSchema.builder().fields(Collections.emptyList()).build()) + .readSchema(INTERNAL_SCHEMA) + .partitioningFields(Collections.singletonList(PARTITION_FIELD)) .build(); protected static final ThreePartHierarchicalTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER = new ThreePartHierarchicalTableIdentifier(TEST_HMS_DATABASE, TEST_HMS_TABLE); @@ -95,4 +165,16 @@ protected Database newDatabase(String dbName) { return new Database( dbName, "Created by " + HMSCatalogSyncClient.class.getName(), null, Collections.emptyMap()); } + + protected static FieldSchema getFieldSchema(String name, String type) { + return new FieldSchema(name, type, null); + } + + protected static InternalField getInternalField( + String fieldName, String schemaName, InternalType dataType) { + return InternalField.builder() + .name(fieldName) + .schema(InternalSchema.builder().name(schemaName).dataType(dataType).build()) + .build(); + } } diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java index 6748b6883..45d45a2da 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java @@ -18,6 +18,7 @@ package org.apache.xtable.hms; +import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.FIELD_SCHEMA; import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_CATALOG_TABLE_IDENTIFIER; import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_DATABASE; import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_TABLE; @@ -48,7 +49,7 @@ void testNewHmsTable() { expected.setTableName(TEST_HMS_TABLE); expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName()); expected.setCreateTime((int) createdTime.getEpochSecond()); - expected.setSd(getTestHmsTableStorageDescriptor()); + expected.setSd(getTestHmsTableStorageDescriptor(FIELD_SCHEMA)); expected.setTableType("EXTERNAL_TABLE"); expected.setParameters(getTestHmsTableParameters()); @@ -56,7 +57,7 @@ void testNewHmsTable() { expected, HMSCatalogTableBuilderFactory.newHmsTable( TEST_CATALOG_TABLE_IDENTIFIER, - getTestHmsTableStorageDescriptor(), + getTestHmsTableStorageDescriptor(FIELD_SCHEMA), getTestHmsTableParameters())); } } diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java new file mode 100644 index 000000000..1a7d0cb4f --- /dev/null +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.hms.table; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mockStatic; + +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.SneakyThrows; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.apache.xtable.hms.HMSCatalogSyncClientTestBase; +import org.apache.xtable.model.storage.TableFormat; + +@ExtendWith(MockitoExtension.class) +public class TestDeltaHMSCatalogTableBuilder extends HMSCatalogSyncClientTestBase { + + private DeltaHMSCatalogTableBuilder mockDeltaHmsCatalogSyncRequestProvider; + + private DeltaHMSCatalogTableBuilder createDeltaHMSCatalogTableBuilder() { + return new DeltaHMSCatalogTableBuilder(); + } + + @SneakyThrows + @Test + void testGetCreateTableRequest() { + mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder(); + + Instant createdTime = Instant.now(); + try (MockedStatic mockZonedDateTime = mockStatic(Instant.class)) { + mockZonedDateTime.when(Instant::now).thenReturn(createdTime); + Table expected = new Table(); + expected.setDbName(TEST_HMS_DATABASE); + expected.setTableName(TEST_HMS_TABLE); + expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName()); + expected.setCreateTime((int) createdTime.getEpochSecond()); + expected.setSd(getTestStorageDescriptor(FIELD_SCHEMA)); + expected.setTableType("EXTERNAL_TABLE"); + expected.setParameters(getTestParameters()); + + assertEquals( + expected, + mockDeltaHmsCatalogSyncRequestProvider.getCreateTableRequest( + TEST_DELTA_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)); + } + } + + @SneakyThrows + @Test + void testGetUpdateTableRequest() { + mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder(); + + Table hmsTable = + newTable( + TEST_HMS_DATABASE, + TEST_HMS_TABLE, + getTestParameters(), + getTestStorageDescriptor(FIELD_SCHEMA)); + Table expected = new Table(hmsTable); + expected.getSd().setCols(UPDATED_FIELD_SCHEMA); + + Table output = + mockDeltaHmsCatalogSyncRequestProvider.getUpdateTableRequest( + TEST_UPDATED_DELTA_INTERNAL_TABLE, hmsTable, TEST_CATALOG_TABLE_IDENTIFIER); + assertEquals(expected, output); + } + + @Test + void testGetStorageDescriptor() { + mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder(); + StorageDescriptor expected = getTestStorageDescriptor(FIELD_SCHEMA); + assertEquals( + expected, + mockDeltaHmsCatalogSyncRequestProvider.getStorageDescriptor(TEST_DELTA_INTERNAL_TABLE)); + } + + @Test + void testGetTableParameters() { + mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder(); + Map expected = getTestParameters(); + assertEquals(expected, mockDeltaHmsCatalogSyncRequestProvider.getTableParameters()); + } + + private StorageDescriptor getTestStorageDescriptor(List columns) { + Map serDeParams = new HashMap<>(); + serDeParams.put("serialization.format", "1"); + serDeParams.put("path", TEST_BASE_PATH); + + StorageDescriptor storageDescriptor = new StorageDescriptor(); + storageDescriptor.setCols(columns); + storageDescriptor.setLocation(TEST_BASE_PATH); + SerDeInfo serDeInfo = new SerDeInfo(); + serDeInfo.setParameters(serDeParams); + storageDescriptor.setSerdeInfo(serDeInfo); + return storageDescriptor; + } + + private Map getTestParameters() { + Map parameters = new HashMap<>(); + parameters.put("EXTERNAL", "TRUE"); + parameters.put("table_type", TableFormat.DELTA); + parameters.put("storage_handler", "io.delta.hive.DeltaStorageHandler"); + return parameters; + } +} diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java index b0c09a947..14d39c449 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java @@ -31,10 +31,12 @@ import java.time.Instant; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import lombok.SneakyThrows; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -51,7 +53,7 @@ import org.apache.iceberg.hadoop.HadoopTables; import org.apache.xtable.hms.HMSCatalogSyncClientTestBase; -import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.hms.HMSSchemaExtractor; @ExtendWith(MockitoExtension.class) public class TestIcebergHMSCatalogTableBuilder extends HMSCatalogSyncClientTestBase { @@ -63,12 +65,9 @@ public class TestIcebergHMSCatalogTableBuilder extends HMSCatalogSyncClientTestB private IcebergHMSCatalogTableBuilder mockIcebergHmsCatalogSyncRequestProvider; - private IcebergHMSCatalogTableBuilder createIcebergHMSHelper() { - return new IcebergHMSCatalogTableBuilder(mockHmsSchemaExtractor, mockIcebergHadoopTables); - } - - void setupCommonMocks() { - mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper(); + private IcebergHMSCatalogTableBuilder createIcebergHMSCatalogTableBuilder() { + return new IcebergHMSCatalogTableBuilder( + HMSSchemaExtractor.getInstance(), mockIcebergHadoopTables); } void mockHadoopTables() { @@ -86,11 +85,9 @@ void mockMetadataFileLocation() { @SneakyThrows @Test void testGetCreateTableRequest() { - mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper(); + mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSCatalogTableBuilder(); mockHadoopTables(); - when(mockHmsSchemaExtractor.toColumns( - TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema())) - .thenReturn(Collections.emptyList()); + Instant createdTime = Instant.now(); try (MockedStatic mockZonedDateTime = mockStatic(Instant.class)) { mockZonedDateTime.when(Instant::now).thenReturn(createdTime); @@ -99,7 +96,7 @@ void testGetCreateTableRequest() { expected.setTableName(TEST_HMS_TABLE); expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName()); expected.setCreateTime((int) createdTime.getEpochSecond()); - expected.setSd(getTestHmsTableStorageDescriptor()); + expected.setSd(getTestHmsTableStorageDescriptor(FIELD_SCHEMA)); expected.setTableType("EXTERNAL_TABLE"); expected.setParameters(getTestHmsTableParameters()); @@ -107,8 +104,6 @@ void testGetCreateTableRequest() { expected, mockIcebergHmsCatalogSyncRequestProvider.getCreateTableRequest( TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)); - verify(mockHmsSchemaExtractor, times(1)) - .toColumns(TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()); verify(mockIcebergBaseTable, times(1)).properties(); verify(mockIcebergHadoopTables, times(1)).load(TEST_BASE_PATH); } @@ -117,52 +112,45 @@ void testGetCreateTableRequest() { @SneakyThrows @Test void testGetUpdateTableRequest() { - setupCommonMocks(); + mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSCatalogTableBuilder(); mockHadoopTables(); - when(mockHmsSchemaExtractor.toColumns( - TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema())) - .thenReturn(Collections.emptyList()); Map tableParams = new HashMap<>(); tableParams.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION); Table hmsTable = newTable( - TEST_HMS_DATABASE, TEST_HMS_TABLE, tableParams, getTestHmsTableStorageDescriptor()); + TEST_HMS_DATABASE, + TEST_HMS_TABLE, + tableParams, + getTestHmsTableStorageDescriptor(FIELD_SCHEMA)); when(mockIcebergTableMetadata.metadataFileLocation()) .thenReturn(ICEBERG_METADATA_FILE_LOCATION_V2); when(mockIcebergBaseTable.properties()).thenReturn(Collections.emptyMap()); Table output = mockIcebergHmsCatalogSyncRequestProvider.getUpdateTableRequest( - TEST_ICEBERG_INTERNAL_TABLE, hmsTable, TEST_CATALOG_TABLE_IDENTIFIER); + TEST_UPDATED_ICEBERG_INTERNAL_TABLE, hmsTable, TEST_CATALOG_TABLE_IDENTIFIER); tableParams.put(PREVIOUS_METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION); tableParams.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION_V2); - Table expected = - newTable( - TEST_HMS_DATABASE, TEST_HMS_TABLE, tableParams, getTestHmsTableStorageDescriptor()); + Table expected = new Table(hmsTable); + expected.getSd().setCols(UPDATED_FIELD_SCHEMA); + assertEquals(expected, output); assertEquals(tableParams, hmsTable.getParameters()); - verify(mockHmsSchemaExtractor, times(1)) - .toColumns(TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()); } @Test void testGetStorageDescriptor() { - mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper(); - when(mockHmsSchemaExtractor.toColumns( - TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema())) - .thenReturn(Collections.emptyList()); - StorageDescriptor expected = getTestHmsTableStorageDescriptor(); + mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSCatalogTableBuilder(); + StorageDescriptor expected = getTestHmsTableStorageDescriptor(FIELD_SCHEMA); assertEquals( expected, mockIcebergHmsCatalogSyncRequestProvider.getStorageDescriptor(TEST_ICEBERG_INTERNAL_TABLE)); - verify(mockHmsSchemaExtractor, times(1)) - .toColumns(TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()); } @Test void testGetTableParameters() { - mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper(); + mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSCatalogTableBuilder(); mockMetadataFileLocation(); when(mockIcebergBaseTable.properties()).thenReturn(Collections.emptyMap()); Map expected = getTestHmsTableParameters(); @@ -173,10 +161,10 @@ void testGetTableParameters() { verify(mockIcebergHadoopTables, never()).load(any()); } - public static StorageDescriptor getTestHmsTableStorageDescriptor() { + public static StorageDescriptor getTestHmsTableStorageDescriptor(List columns) { StorageDescriptor storageDescriptor = new StorageDescriptor(); SerDeInfo serDeInfo = new SerDeInfo(); - storageDescriptor.setCols(Collections.emptyList()); + storageDescriptor.setCols(columns); storageDescriptor.setLocation(TEST_BASE_PATH); storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat"); storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");