Skip to content

Commit

Permalink
Address comments and update tests
Browse files (browse the repository at this point in the history)
  • Loading branch information
kroushan-nit committed Feb 6, 2025
1 parent d0a8f35 commit 98cc216
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 82 deletions.
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@
<version>${iceberg.version}</version>
<scope>test</scope>
</dependency>
<!-- Iceberg Hive runtime; its version is tied to the shared iceberg.version property. -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-runtime</artifactId>
<version>${iceberg.version}</version>
</dependency>

<!-- Delta -->
<dependency>
Expand All @@ -293,6 +298,11 @@
<artifactId>delta-standalone_${scala.binary.version}</artifactId>
<version>${delta.standalone.version}</version>
</dependency>
<!-- Delta Hive connector; artifact id carries the Scala binary version suffix and the
     version comes from the delta.hive.version property. -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-hive_${scala.binary.version}</artifactId>
<version>${delta.hive.version}</version>
</dependency>

<!-- Spark -->
<dependency>
Expand Down
1 change: 1 addition & 0 deletions xtable-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
<!-- Delta standalone is restricted to test scope here; no version is declared on this entry. -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<!-- Hadoop dependencies -->
Expand Down
2 changes: 0 additions & 2 deletions xtable-hive-metastore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-runtime</artifactId>
<version>${iceberg.version}</version>
</dependency>

<!-- Delta dependencies -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-hive_${scala.binary.version}</artifactId>
<version>${delta.hive.version}</version>
</dependency>

<!-- HMS dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ public DeltaHMSCatalogTableBuilder() {
this.schemaExtractor = HMSSchemaExtractor.getInstance();
}

/**
 * Test-only constructor that takes the schema extractor from the caller rather than obtaining it
 * via {@code HMSSchemaExtractor.getInstance()} as the public constructor does.
 *
 * @param schemaExtractor extractor stored in this builder's {@code schemaExtractor} field
 */
@VisibleForTesting
DeltaHMSCatalogTableBuilder(HMSSchemaExtractor schemaExtractor) {
this.schemaExtractor = schemaExtractor;
}

@Override
public Table getCreateTableRequest(InternalTable table, CatalogTableIdentifier tableIdentifier) {
return newHmsTable(tableIdentifier, getStorageDescriptor(table), getTableParameters());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,28 @@

package org.apache.xtable.hms;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.mockito.Mock;

import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.storage.CatalogType;
import org.apache.xtable.model.storage.TableFormat;

Expand All @@ -57,23 +64,80 @@ public class HMSCatalogSyncClientTestBase {

// Two distinct sample Iceberg metadata locations shared by the tests in this package.
protected static final String ICEBERG_METADATA_FILE_LOCATION = "base-path/metadata";
protected static final String ICEBERG_METADATA_FILE_LOCATION_V2 = "base-path/v2-metadata";
/**
 * Partition field fixture: the STRING column named "partitionField" (schema name "string"),
 * configured with the {@code VALUE} partition transform.
 */
protected static final InternalPartitionField PARTITION_FIELD =
    InternalPartitionField.builder()
        // Same field definition the schema fixtures use, built through the shared helper.
        .sourceField(getInternalField("partitionField", "string", InternalType.STRING))
        .transformType(PartitionTransformType.VALUE)
        .build();
/**
 * Baseline RECORD schema fixture with three fields, in order: intField (INT), stringField
 * (STRING) and partitionField (STRING).
 */
protected static final InternalSchema INTERNAL_SCHEMA =
InternalSchema.builder()
.dataType(InternalType.RECORD)
.fields(
Arrays.asList(
getInternalField("intField", "int", InternalType.INT),
getInternalField("stringField", "string", InternalType.STRING),
getInternalField("partitionField", "string", InternalType.STRING)))
.build();
/**
 * Evolved RECORD schema fixture: the same three fields as {@code INTERNAL_SCHEMA} followed by an
 * additional booleanField (BOOLEAN).
 */
protected static final InternalSchema UPDATED_INTERNAL_SCHEMA =
InternalSchema.builder()
.dataType(InternalType.RECORD)
.fields(
Arrays.asList(
getInternalField("intField", "int", InternalType.INT),
getInternalField("stringField", "string", InternalType.STRING),
getInternalField("partitionField", "string", InternalType.STRING),
getInternalField("booleanField", "boolean", InternalType.BOOLEAN)))
.build();
/**
 * Hive column list whose names and type strings line up, in order, with the fields of
 * {@code INTERNAL_SCHEMA}.
 */
protected static final List<FieldSchema> FIELD_SCHEMA =
Arrays.asList(
getFieldSchema("intField", "int"),
getFieldSchema("stringField", "string"),
getFieldSchema("partitionField", "string"));
/**
 * Hive column list whose names and type strings line up, in order, with the fields of
 * {@code UPDATED_INTERNAL_SCHEMA} (the baseline columns plus booleanField).
 */
protected static final List<FieldSchema> UPDATED_FIELD_SCHEMA =
Arrays.asList(
getFieldSchema("intField", "int"),
getFieldSchema("stringField", "string"),
getFieldSchema("partitionField", "string"),
getFieldSchema("booleanField", "boolean"));
/**
 * Iceberg-format table fixture at {@code TEST_BASE_PATH}, partitioned by {@code PARTITION_FIELD}.
 *
 * <p>NOTE(review): readSchema is set twice below; the empty-field-list call looks superseded by
 * the INTERNAL_SCHEMA call that follows it. Confirm and drop the stale call.
 */
protected static final InternalTable TEST_ICEBERG_INTERNAL_TABLE =
InternalTable.builder()
.basePath(TEST_BASE_PATH)
.tableFormat(TableFormat.ICEBERG)
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
.readSchema(INTERNAL_SCHEMA)
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
.build();
/**
 * Iceberg-format table fixture at {@code TEST_BASE_PATH} that reads with
 * {@code UPDATED_INTERNAL_SCHEMA} and is partitioned by {@code PARTITION_FIELD}.
 */
protected static final InternalTable TEST_UPDATED_ICEBERG_INTERNAL_TABLE =
InternalTable.builder()
.basePath(TEST_BASE_PATH)
.tableFormat(TableFormat.ICEBERG)
.readSchema(UPDATED_INTERNAL_SCHEMA)
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
.build();
/**
 * Delta-format table fixture at {@code TEST_BASE_PATH}, partitioned by {@code PARTITION_FIELD}.
 *
 * <p>NOTE(review): readSchema is set twice below; the empty-field-list call looks superseded by
 * the INTERNAL_SCHEMA call that follows it. Confirm and drop the stale call.
 */
protected static final InternalTable TEST_DELTA_INTERNAL_TABLE =
InternalTable.builder()
.basePath(TEST_BASE_PATH)
.tableFormat(TableFormat.DELTA)
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
.readSchema(INTERNAL_SCHEMA)
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
.build();
/**
 * Delta-format table fixture at {@code TEST_BASE_PATH} that reads with
 * {@code UPDATED_INTERNAL_SCHEMA} and is partitioned by {@code PARTITION_FIELD}.
 */
protected static final InternalTable TEST_UPDATED_DELTA_INTERNAL_TABLE =
InternalTable.builder()
.basePath(TEST_BASE_PATH)
.tableFormat(TableFormat.DELTA)
.readSchema(UPDATED_INTERNAL_SCHEMA)
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
.build();
/**
 * Hudi-format table fixture at {@code TEST_BASE_PATH}, partitioned by {@code PARTITION_FIELD}.
 *
 * <p>NOTE(review): readSchema is set twice below; the empty-field-list call looks superseded by
 * the INTERNAL_SCHEMA call that follows it. Confirm and drop the stale call.
 */
protected static final InternalTable TEST_HUDI_INTERNAL_TABLE =
InternalTable.builder()
.basePath(TEST_BASE_PATH)
.tableFormat(TableFormat.HUDI)
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
.readSchema(INTERNAL_SCHEMA)
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
.build();
/** Catalog identifier fixture built from {@code TEST_HMS_DATABASE} and {@code TEST_HMS_TABLE}. */
protected static final ThreePartHierarchicalTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER =
new ThreePartHierarchicalTableIdentifier(TEST_HMS_DATABASE, TEST_HMS_TABLE);
Expand Down Expand Up @@ -101,4 +165,16 @@ protected Database newDatabase(String dbName) {
return new Database(
dbName, "Created by " + HMSCatalogSyncClient.class.getName(), null, Collections.emptyMap());
}

/**
 * Creates a Hive {@link FieldSchema} for a test column.
 *
 * @param name column name
 * @param type Hive type string for the column, e.g. {@code "int"}
 * @return a field schema with the given name and type and a null third (comment) argument
 */
protected static FieldSchema getFieldSchema(String name, String type) {
  // Test columns carry no comment.
  final String comment = null;
  return new FieldSchema(name, type, comment);
}

/**
 * Creates an {@link InternalField} whose schema carries only a name and a data type.
 *
 * @param fieldName name given to the field
 * @param schemaName name given to the field's schema
 * @param dataType data type given to the field's schema
 * @return the assembled field
 */
protected static InternalField getInternalField(
    String fieldName, String schemaName, InternalType dataType) {
  InternalSchema fieldSchema =
      InternalSchema.builder().name(schemaName).dataType(dataType).build();
  return InternalField.builder().name(fieldName).schema(fieldSchema).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.xtable.hms;

import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.FIELD_SCHEMA;
import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_CATALOG_TABLE_IDENTIFIER;
import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_DATABASE;
import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_TABLE;
Expand Down Expand Up @@ -48,15 +49,15 @@ void testNewHmsTable() {
expected.setTableName(TEST_HMS_TABLE);
expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
expected.setCreateTime((int) createdTime.getEpochSecond());
expected.setSd(getTestHmsTableStorageDescriptor());
expected.setSd(getTestHmsTableStorageDescriptor(FIELD_SCHEMA));
expected.setTableType("EXTERNAL_TABLE");
expected.setParameters(getTestHmsTableParameters());

assertEquals(
expected,
HMSCatalogTableBuilderFactory.newHmsTable(
TEST_CATALOG_TABLE_IDENTIFIER,
getTestHmsTableStorageDescriptor(),
getTestHmsTableStorageDescriptor(FIELD_SCHEMA),
getTestHmsTableParameters()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.SneakyThrows;
Expand All @@ -50,20 +47,13 @@ public class TestDeltaHMSCatalogTableBuilder extends HMSCatalogSyncClientTestBas
private DeltaHMSCatalogTableBuilder mockDeltaHmsCatalogSyncRequestProvider;

private DeltaHMSCatalogTableBuilder createDeltaHMSCatalogTableBuilder() {
return new DeltaHMSCatalogTableBuilder(mockHmsSchemaExtractor);
}

void setupCommonMocks() {
mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder();
return new DeltaHMSCatalogTableBuilder();
}

@SneakyThrows
@Test
void testGetCreateTableRequest() {
mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder();
when(mockHmsSchemaExtractor.toColumns(
TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema()))
.thenReturn(Collections.emptyList());

Instant createdTime = Instant.now();
try (MockedStatic<Instant> mockZonedDateTime = mockStatic(Instant.class)) {
Expand All @@ -73,55 +63,44 @@ void testGetCreateTableRequest() {
expected.setTableName(TEST_HMS_TABLE);
expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
expected.setCreateTime((int) createdTime.getEpochSecond());
expected.setSd(getTestStorageDescriptor());
expected.setSd(getTestStorageDescriptor(FIELD_SCHEMA));
expected.setTableType("EXTERNAL_TABLE");
expected.setParameters(getTestParameters());

assertEquals(
expected,
mockDeltaHmsCatalogSyncRequestProvider.getCreateTableRequest(
TEST_DELTA_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER));
verify(mockHmsSchemaExtractor, times(1))
.toColumns(TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema());
}
}

@SneakyThrows
@Test
void testGetUpdateTableRequest() {
setupCommonMocks();
mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder();

Table hmsTable =
newTable(
TEST_HMS_DATABASE, TEST_HMS_TABLE, getTestParameters(), getTestStorageDescriptor());
FieldSchema newColumn = new FieldSchema("new_column", "test", null);
when(mockHmsSchemaExtractor.toColumns(
TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema()))
.thenReturn(Collections.singletonList(newColumn));
TEST_HMS_DATABASE,
TEST_HMS_TABLE,
getTestParameters(),
getTestStorageDescriptor(FIELD_SCHEMA));
Table expected = new Table(hmsTable);
expected.getSd().setCols(UPDATED_FIELD_SCHEMA);

Table output =
mockDeltaHmsCatalogSyncRequestProvider.getUpdateTableRequest(
TEST_DELTA_INTERNAL_TABLE, hmsTable, TEST_CATALOG_TABLE_IDENTIFIER);
Table expected = new Table(hmsTable);
expected.getSd().setCols(Collections.singletonList(newColumn));

TEST_UPDATED_DELTA_INTERNAL_TABLE, hmsTable, TEST_CATALOG_TABLE_IDENTIFIER);
assertEquals(expected, output);
verify(mockHmsSchemaExtractor, times(1))
.toColumns(TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema());
}

@Test
void testGetStorageDescriptor() {
mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder();
when(mockHmsSchemaExtractor.toColumns(
TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema()))
.thenReturn(Collections.emptyList());
StorageDescriptor expected = getTestStorageDescriptor();
StorageDescriptor expected = getTestStorageDescriptor(FIELD_SCHEMA);
assertEquals(
expected,
mockDeltaHmsCatalogSyncRequestProvider.getStorageDescriptor(TEST_DELTA_INTERNAL_TABLE));
verify(mockHmsSchemaExtractor, times(1))
.toColumns(TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema());
}

@Test
Expand All @@ -131,13 +110,13 @@ void testGetTableParameters() {
assertEquals(expected, mockDeltaHmsCatalogSyncRequestProvider.getTableParameters());
}

private StorageDescriptor getTestStorageDescriptor() {
private StorageDescriptor getTestStorageDescriptor(List<FieldSchema> columns) {
Map<String, String> serDeParams = new HashMap<>();
serDeParams.put("serialization.format", "1");
serDeParams.put("path", TEST_BASE_PATH);

StorageDescriptor storageDescriptor = new StorageDescriptor();
storageDescriptor.setCols(Collections.emptyList());
storageDescriptor.setCols(columns);
storageDescriptor.setLocation(TEST_BASE_PATH);
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setParameters(serDeParams);
Expand Down
Loading

0 comments on commit 98cc216

Please sign in to comment.