From df27ed91f07d8d7772ac053fe7192d86062e63a5 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Fri, 24 Jan 2025 00:44:28 +0800 Subject: [PATCH 01/10] [#6361] feat(paimon):Support specifying primary keys during create paimon table by flink --- .../paimon/GravitinoPaimonTable.java | 12 +----- .../flink/connector/catalog/BaseCatalog.java | 39 ++++++++++++++++++- .../integration/test/FlinkCommonIT.java | 39 +++++++++++++++++++ server/build.gradle.kts | 1 + 4 files changed, 79 insertions(+), 12 deletions(-) diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java index 2853abbbe30..4b5be312b8e 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java @@ -140,18 +140,8 @@ private List getPrimaryKeysFromIndexes(Index[] indexes) { Preconditions.checkArgument( indexes.length == 1, "Paimon only supports no more than one Index."); - Index primaryKeyIndex = indexes[0]; - Arrays.stream(primaryKeyIndex.fieldNames()) - .forEach( - filedName -> - Preconditions.checkArgument( - filedName != null && filedName.length == 1, - "The primary key columns should not be nested.")); - - return Arrays.stream(primaryKeyIndex.fieldNames()) - .map(fieldName -> fieldName[0]) - .collect(Collectors.toList()); + return Arrays.stream(primaryKeyIndex.fieldNames()).map(e -> e[0]).collect(Collectors.toList()); } private static Index[] constructIndexesFromPrimaryKeys(Table table) { diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index fd8e118ee49..2b4ec0732e6 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.compress.utils.Lists; @@ -40,6 +41,7 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -75,7 +77,12 @@ import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; +import org.jetbrains.annotations.NotNull; /** * The BaseCatalog that provides a default implementation for all methods in the {@link @@ -276,8 +283,21 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig propertiesConverter.toGravitinoTableProperties(table.getOptions()); Transform[] partitions = partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys()); + try { - catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions); + + Index[] indices = getIndices(resolvedTable); + catalog() + .asTableCatalog() + .createTable( + identifier, + columns, + comment, + properties, + partitions, + Distributions.NONE, + new SortOrder[0], + indices); } catch (NoSuchSchemaException e) { throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e); } catch (TableAlreadyExistsException e) { @@ -289,6 +309,18 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } + private static Index @NotNull [] getIndices(ResolvedCatalogBaseTable resolvedTable) { + Optional primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey(); + List primaryColumns = primaryKey.map(UniqueConstraint::getColumns).orElse(null); + if (primaryColumns == null) { + return new Index[0]; + } + String[][] primaryFiled = + primaryColumns.stream().map(e -> new String[] {e}).toArray(String[][]::new); + Index primary = Indexes.primary("primary", primaryFiled); + return new Index[] {primary}; + } + /** * The method only is used to change the comments. To alter columns, use the other alterTable API * and provide a list of TableChanges. @@ -521,6 +553,11 @@ protected CatalogBaseTable toFlinkTable(Table table) { .column(column.name(), column.nullable() ? flinkType.nullable() : flinkType.notNull()) .withComment(column.comment()); } + Index[] indices = table.index(); + if (indices != null && indices.length == 1) { + builder.primaryKey( + Arrays.stream(indices[0].fieldNames()).map(arr -> arr[0]).collect(Collectors.toList())); + } Map flinkTableProperties = propertiesConverter.toFlinkTableProperties(table.properties()); List partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning()); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index b45e5f46ec2..75d6188db74 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -280,6 +280,45 @@ public void testCreateSimpleTable() { supportDropCascade()); } + @Test + @EnabledIf("supportTableOperation") + public void testCreateTableWithPrimaryKey() { + String databaseName = "test_create_no_partition_table_db"; + String tableName = "test_create_no_partition_table"; + String comment = "test comment"; + String key = "test key"; + String value = "test value"; + + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + sql( + "CREATE TABLE %s " + + "(aa int, " + + " bb int," + + " cc int," + + " PRIMARY KEY (aa,bb) NOT ENFORCED" + + ")" + + " COMMENT '%s' WITH (" + + "'%s' = '%s')", + tableName, comment, key, value); + Table table = + catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName)); + Assertions.assertEquals(1, table.index().length); + sql("INSERT INTO %s VALUES(1,2,3)", tableName); + sql("INSERT INTO %s VALUES(1,2,4)", tableName); + TestUtils.assertTableResult( + sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4)); + TestUtils.assertTableResult( + sql("SELECT count(*) num FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1)); + }, + true, + supportDropCascade()); + } + @Test @EnabledIf("supportTableOperation") public void testListTables() { diff --git a/server/build.gradle.kts b/server/build.gradle.kts index 4fe6ae27073..ada87841b6e 100644 --- a/server/build.gradle.kts +++ b/server/build.gradle.kts @@ -28,6 +28,7 @@ dependencies { implementation(project(":common")) implementation(project(":core")) implementation(project(":server-common")) + implementation(libs.mysql.driver) implementation(libs.bundles.jetty) implementation(libs.bundles.jersey) implementation(libs.bundles.log4j) From b560a85b54d7b86933a648e822fea4fc581590bf Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Fri, 24 Jan 2025 10:44:33 +0800 Subject: [PATCH 02/10] [#6361] feat(paimon):Support specifying primary keys during create paimon table by flink --- .../paimon/GravitinoPaimonTable.java | 1 - .../integration/test/FlinkCommonIT.java | 25 +++++++++++++++++++ .../test/paimon/FlinkPaimonCatalogIT.java | 2 -- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java index 4b5be312b8e..f12d9b44b20 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java @@ -137,7 +137,6 @@ private List getPrimaryKeysFromIndexes(Index[] indexes) { if (indexes == null || indexes.length == 0) { return Collections.emptyList(); } - Preconditions.checkArgument( indexes.length == 1, "Paimon only supports no more than one Index."); Index primaryKeyIndex = indexes[0]; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 75d6188db74..2b9468879e6 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -51,6 +51,7 @@ import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.indexes.Index; import org.apache.gravitino.rel.types.Types; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -306,6 +307,9 @@ public void testCreateTableWithPrimaryKey() { Table table = catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName)); Assertions.assertEquals(1, table.index().length); + Index index = table.index()[0]; + Assertions.assertEquals("aa", index.fieldNames()[0][0]); + Assertions.assertEquals("bb", index.fieldNames()[1][0]); sql("INSERT INTO %s VALUES(1,2,3)", tableName); sql("INSERT INTO %s VALUES(1,2,4)", tableName); TestUtils.assertTableResult( @@ -314,6 +318,27 @@ public void testCreateTableWithPrimaryKey() { sql("SELECT count(*) num FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1)); + sql("INSERT INTO %s VALUES(1,3,4)", tableName); + TestUtils.assertTableResult( + sql("SELECT * FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1, 2, 4), + Row.of(1, 3, 4)); + TestUtils.assertTableResult( + sql("SELECT count(*) num FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(2)); + sql("INSERT INTO %s VALUES(2,2,4)", tableName); + TestUtils.assertTableResult( + sql("SELECT * FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1, 2, 4), + Row.of(1, 3, 4), + Row.of(2, 2, 4)); + TestUtils.assertTableResult( + sql("SELECT count(*) num FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(3)); }, true, supportDropCascade()); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index a03b4a198e1..50478f1e711 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -28,11 +28,9 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -@Tag("gravitino-docker-test") public class FlinkPaimonCatalogIT extends FlinkCommonIT { @TempDir private static Path warehouseDir; From 18afe702d0cc164d6469b0d2b63dfd5b7df25389 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Fri, 24 Jan 2025 10:47:15 +0800 Subject: [PATCH 03/10] [#6361] feat(paimon):Support specifying primary keys during create paimon table by flink --- .../apache/gravitino/flink/connector/catalog/BaseCatalog.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 2b4ec0732e6..b95bc0c4a54 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -82,7 +82,6 @@ import org.apache.gravitino.rel.expressions.transforms.Transform; import org.apache.gravitino.rel.indexes.Index; import org.apache.gravitino.rel.indexes.Indexes; -import org.jetbrains.annotations.NotNull; /** * The BaseCatalog that provides a default implementation for all methods in the {@link @@ -309,7 +308,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } - private static Index @NotNull [] getIndices(ResolvedCatalogBaseTable resolvedTable) { + private static Index[] getIndices(ResolvedCatalogBaseTable resolvedTable) { Optional primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey(); List primaryColumns = primaryKey.map(UniqueConstraint::getColumns).orElse(null); if (primaryColumns == null) { From d0ea80f959c5f0b6b5cf4992a25b467610c11b21 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Fri, 24 Jan 2025 16:17:16 +0800 Subject: [PATCH 04/10] add supportPrimaryKey and remove mysql lib --- .../flink/connector/integration/test/FlinkCommonIT.java | 6 +++++- .../connector/integration/test/hive/FlinkHiveCatalogIT.java | 5 +++++ .../integration/test/iceberg/FlinkIcebergCatalogIT.java | 5 +++++ server/build.gradle.kts | 1 - 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 2b9468879e6..6018c57547d 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -81,6 +81,10 @@ protected boolean supportGetSchemaWithoutCommentAndOption() { protected abstract boolean supportDropCascade(); + protected boolean supportCreateTableWithPrimaryKey() { + return true; + } + @Test public void testCreateSchema() { doWithCatalog( @@ -282,7 +286,7 @@ public void testCreateSimpleTable() { } @Test - @EnabledIf("supportTableOperation") + @EnabledIf("supportCreateTableWithPrimaryKey") public void testCreateTableWithPrimaryKey() { String databaseName = "test_create_no_partition_table_db"; String tableName = "test_create_no_partition_table"; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 7792068e249..203accf54b0 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -71,6 +71,11 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog hiveCatalog; + @Override + protected boolean supportCreateTableWithPrimaryKey() { + return false; + } + @BeforeAll void hiveStartUp() { initDefaultHiveCatalog(); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java index 0834def90b7..4779ee331e4 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -63,6 +63,11 @@ public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog icebergCatalog; + @Override + protected boolean supportCreateTableWithPrimaryKey() { + return false; + } + @BeforeAll public void before() { Preconditions.checkNotNull(metalake); diff --git a/server/build.gradle.kts b/server/build.gradle.kts index ada87841b6e..4fe6ae27073 100644 --- a/server/build.gradle.kts +++ b/server/build.gradle.kts @@ -28,7 +28,6 @@ dependencies { implementation(project(":common")) implementation(project(":core")) implementation(project(":server-common")) - implementation(libs.mysql.driver) implementation(libs.bundles.jetty) implementation(libs.bundles.jersey) implementation(libs.bundles.log4j) From 14d1299a6b9b50e78249b1ccb02f49a71c991bf9 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Fri, 24 Jan 2025 16:20:32 +0800 Subject: [PATCH 05/10] add supportPrimaryKey and remove mysql lib --- .../connector/integration/test/paimon/FlinkPaimonCatalogIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 50478f1e711..a03b4a198e1 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -28,9 +28,11 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +@Tag("gravitino-docker-test") public class FlinkPaimonCatalogIT extends FlinkCommonIT { @TempDir private static Path warehouseDir; From 7c94309e254ac697f9461cc616e51ffc0ed26c51 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Fri, 24 Jan 2025 16:23:21 +0800 Subject: [PATCH 06/10] add supportPrimaryKey and remove mysql lib --- .../flink/connector/integration/test/FlinkCommonIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 6018c57547d..ba34b9af393 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -288,8 +288,8 @@ public void testCreateSimpleTable() { @Test @EnabledIf("supportCreateTableWithPrimaryKey") public void testCreateTableWithPrimaryKey() { - String databaseName = "test_create_no_partition_table_db"; - String tableName = "test_create_no_partition_table"; + String databaseName = "test_create_table_with_primary_key_db"; + String tableName = "test_create_primary_key_table"; String comment = "test comment"; String key = "test key"; String value = "test value"; From bfe153214d964b01fb7ec16784bfa5bade4dd2e0 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Sun, 26 Jan 2025 13:46:56 +0800 Subject: [PATCH 07/10] fix ci --- .../integration/test/FlinkCommonIT.java | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index ba34b9af393..e5317a739bd 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -314,35 +314,57 @@ public void testCreateTableWithPrimaryKey() { Index index = table.index()[0]; Assertions.assertEquals("aa", index.fieldNames()[0][0]); Assertions.assertEquals("bb", index.fieldNames()[1][0]); - sql("INSERT INTO %s VALUES(1,2,3)", tableName); - sql("INSERT INTO %s VALUES(1,2,4)", tableName); TestUtils.assertTableResult( - sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4)); + sql("INSERT INTO %s VALUES(1,2,3)", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(-1)); + TestUtils.assertTableResult( sql("SELECT count(*) num FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1)); - sql("INSERT INTO %s VALUES(1,3,4)", tableName); TestUtils.assertTableResult( - sql("SELECT * FROM %s", tableName), + sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 3)); + TestUtils.assertTableResult( + sql("INSERT INTO %s VALUES(1,2,4)", tableName), ResultKind.SUCCESS_WITH_CONTENT, - Row.of(1, 2, 4), - Row.of(1, 3, 4)); + Row.of(-1)); + + TestUtils.assertTableResult( + sql("SELECT count(*) num FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1)); + TestUtils.assertTableResult( + sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4)); + TestUtils.assertTableResult( + sql("INSERT INTO %s VALUES(1,3,4)", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(-1)); + TestUtils.assertTableResult( sql("SELECT count(*) num FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(2)); - sql("INSERT INTO %s VALUES(2,2,4)", tableName); TestUtils.assertTableResult( sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4), - Row.of(1, 3, 4), - Row.of(2, 2, 4)); + Row.of(1, 3, 4)); + TestUtils.assertTableResult( + sql("INSERT INTO %s VALUES(2,2,4)", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(-1)); + TestUtils.assertTableResult( sql("SELECT count(*) num FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(3)); + TestUtils.assertTableResult( + sql("SELECT * FROM %s", tableName), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of(1, 2, 4), + Row.of(1, 3, 4), + Row.of(2, 2, 4)); }, true, supportDropCascade()); From c86e7e72b50032ef148de96e79f2b1a302d301ab Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Sun, 26 Jan 2025 13:49:26 +0800 Subject: [PATCH 08/10] spotless --- .../flink/connector/integration/test/FlinkCommonIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index e5317a739bd..2d0bb545178 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -314,33 +314,33 @@ public void testCreateTableWithPrimaryKey() { Index index = table.index()[0]; Assertions.assertEquals("aa", index.fieldNames()[0][0]); Assertions.assertEquals("bb", index.fieldNames()[1][0]); + TestUtils.assertTableResult( sql("INSERT INTO %s VALUES(1,2,3)", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(-1)); - TestUtils.assertTableResult( sql("SELECT count(*) num FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1)); TestUtils.assertTableResult( sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 3)); + TestUtils.assertTableResult( sql("INSERT INTO %s VALUES(1,2,4)", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(-1)); - TestUtils.assertTableResult( sql("SELECT count(*) num FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1)); TestUtils.assertTableResult( sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4)); + TestUtils.assertTableResult( sql("INSERT INTO %s VALUES(1,3,4)", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(-1)); - TestUtils.assertTableResult( sql("SELECT count(*) num FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, @@ -350,11 +350,11 @@ public void testCreateTableWithPrimaryKey() { ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4), Row.of(1, 3, 4)); + TestUtils.assertTableResult( sql("INSERT INTO %s VALUES(2,2,4)", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(-1)); - TestUtils.assertTableResult( sql("SELECT count(*) num FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, From e1ab8dc1d7f2bdaffe85cad9923076984afa2e45 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Sun, 26 Jan 2025 15:07:50 +0800 Subject: [PATCH 09/10] spotless --- .../apache/gravitino/flink/connector/catalog/BaseCatalog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index b95bc0c4a54..911d355526c 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -314,9 +314,9 @@ private static Index[] getIndices(ResolvedCatalogBaseTable resolvedTable) { if (primaryColumns == null) { return new Index[0]; } - String[][] primaryFiled = + String[][] primaryField = primaryColumns.stream().map(e -> new String[] {e}).toArray(String[][]::new); - Index primary = Indexes.primary("primary", primaryFiled); + Index primary = Indexes.primary("primary", primaryField); return new Index[] {primary}; } From ef3a9068228802a6d7c86a5660ab7a7ffd8f9ac9 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Sun, 26 Jan 2025 23:16:02 +0800 Subject: [PATCH 10/10] support primary key table --- .../paimon/GravitinoPaimonTable.java | 13 +++++++- .../flink/connector/catalog/BaseCatalog.java | 31 ++++++++++++++----- .../integration/test/FlinkCommonIT.java | 4 +-- .../test/hive/FlinkHiveCatalogIT.java | 2 +- .../test/iceberg/FlinkIcebergCatalogIT.java | 2 +- 5 files changed, 39 insertions(+), 13 deletions(-) diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java index f12d9b44b20..2853abbbe30 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java @@ -137,10 +137,21 @@ private List getPrimaryKeysFromIndexes(Index[] indexes) { if (indexes == null || indexes.length == 0) { return Collections.emptyList(); } + Preconditions.checkArgument( indexes.length == 1, "Paimon only supports no more than one Index."); + Index primaryKeyIndex = indexes[0]; - return Arrays.stream(primaryKeyIndex.fieldNames()).map(e -> e[0]).collect(Collectors.toList()); + Arrays.stream(primaryKeyIndex.fieldNames()) + .forEach( + filedName -> + Preconditions.checkArgument( + filedName != null && filedName.length == 1, + "The primary key columns should not be nested.")); + + return Arrays.stream(primaryKeyIndex.fieldNames()) + .map(fieldName -> fieldName[0]) + .collect(Collectors.toList()); } private static Index[] constructIndexesFromPrimaryKeys(Table table) { diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 911d355526c..578c136e3a9 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -285,7 +285,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig try { - Index[] indices = getIndices(resolvedTable); + Index[] indices = getGrivatinoIndeics(resolvedTable); catalog() .asTableCatalog() .createTable( @@ -308,14 +308,16 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } - private static Index[] getIndices(ResolvedCatalogBaseTable resolvedTable) { + private static Index[] getGrivatinoIndeics(ResolvedCatalogBaseTable resolvedTable) { Optional primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey(); List primaryColumns = primaryKey.map(UniqueConstraint::getColumns).orElse(null); if (primaryColumns == null) { return new Index[0]; } String[][] primaryField = - primaryColumns.stream().map(e -> new String[] {e}).toArray(String[][]::new); + primaryColumns.stream() + .map(primaryColumn -> new String[] {primaryColumn}) + .toArray(String[][]::new); Index primary = Indexes.primary("primary", primaryField); return new Index[] {primary}; } @@ -552,17 +554,30 @@ protected CatalogBaseTable toFlinkTable(Table table) { .column(column.name(), column.nullable() ? flinkType.nullable() : flinkType.notNull()) .withComment(column.comment()); } - Index[] indices = table.index(); - if (indices != null && indices.length == 1) { - builder.primaryKey( - Arrays.stream(indices[0].fieldNames()).map(arr -> arr[0]).collect(Collectors.toList())); - } + handleFlinkPrimaryKey(table, builder); Map flinkTableProperties = propertiesConverter.toFlinkTableProperties(table.properties()); List partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning()); return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties); } + private static void handleFlinkPrimaryKey( + Table table, org.apache.flink.table.api.Schema.Builder builder) { + List primaryKeyList = + Arrays.stream(table.index()) + .filter(index -> index.type() == Index.IndexType.PRIMARY_KEY) + .collect(Collectors.toList()); + if (primaryKeyList.isEmpty()) { + return; + } + Preconditions.checkArgument( + primaryKeyList.size() == 1, "More than one primary key is not supported."); + builder.primaryKey( + Arrays.stream(primaryKeyList.get(0).fieldNames()) + .map(fieldNames -> fieldNames[0]) + .collect(Collectors.toList())); + } + private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) { return Column.of( column.getName(), diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 2d0bb545178..8ff6f8db7a2 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -81,7 +81,7 @@ protected boolean supportGetSchemaWithoutCommentAndOption() { protected abstract boolean supportDropCascade(); - protected boolean supportCreateTableWithPrimaryKey() { + protected boolean supportsPrimaryKey() { return true; } @@ -286,7 +286,7 @@ public void testCreateSimpleTable() { } @Test - @EnabledIf("supportCreateTableWithPrimaryKey") + @EnabledIf("supportsPrimaryKey") public void testCreateTableWithPrimaryKey() { String databaseName = "test_create_table_with_primary_key_db"; String tableName = "test_create_primary_key_table"; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 203accf54b0..3add18211f1 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -72,7 +72,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog hiveCatalog; @Override - protected boolean supportCreateTableWithPrimaryKey() { + protected boolean supportsPrimaryKey() { return false; } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java index 4779ee331e4..3e6e69ec0ef 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -64,7 +64,7 @@ public abstract class FlinkIcebergCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog icebergCatalog; @Override - protected boolean supportCreateTableWithPrimaryKey() { + protected boolean supportsPrimaryKey() { return false; }