diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index fd8e118ee49..578c136e3a9 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.compress.utils.Lists; @@ -40,6 +41,7 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -75,7 +77,11 @@ import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; /** * The BaseCatalog that provides a default implementation for all methods in the {@link @@ -276,8 +282,21 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig propertiesConverter.toGravitinoTableProperties(table.getOptions()); Transform[] partitions = partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys()); + try { - 
catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions); + + Index[] indices = getGrivatinoIndeics(resolvedTable); + catalog() + .asTableCatalog() + .createTable( + identifier, + columns, + comment, + properties, + partitions, + Distributions.NONE, + new SortOrder[0], + indices); } catch (NoSuchSchemaException e) { throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e); } catch (TableAlreadyExistsException e) { @@ -289,6 +308,20 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } + private static Index[] getGrivatinoIndeics(ResolvedCatalogBaseTable resolvedTable) { + Optional primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey(); + List primaryColumns = primaryKey.map(UniqueConstraint::getColumns).orElse(null); + if (primaryColumns == null) { + return new Index[0]; + } + String[][] primaryField = + primaryColumns.stream() + .map(primaryColumn -> new String[] {primaryColumn}) + .toArray(String[][]::new); + Index primary = Indexes.primary("primary", primaryField); + return new Index[] {primary}; + } + /** * The method only is used to change the comments. To alter columns, use the other alterTable API * and provide a list of TableChanges. @@ -521,12 +554,30 @@ protected CatalogBaseTable toFlinkTable(Table table) { .column(column.name(), column.nullable() ? 
flinkType.nullable() : flinkType.notNull()) .withComment(column.comment()); } + handleFlinkPrimaryKey(table, builder); Map flinkTableProperties = propertiesConverter.toFlinkTableProperties(table.properties()); List partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning()); return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties); } + private static void handleFlinkPrimaryKey( + Table table, org.apache.flink.table.api.Schema.Builder builder) { + List primaryKeyList = + Arrays.stream(table.index()) + .filter(index -> index.type() == Index.IndexType.PRIMARY_KEY) + .collect(Collectors.toList()); + if (primaryKeyList.isEmpty()) { + return; + } + Preconditions.checkArgument( + primaryKeyList.size() == 1, "More than one primary key is not supported."); + builder.primaryKey( + Arrays.stream(primaryKeyList.get(0).fieldNames()) + .map(fieldNames -> fieldNames[0]) + .collect(Collectors.toList())); + } + private Column toGravitinoColumn(org.apache.flink.table.catalog.Column column) { return Column.of( column.getName(), diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index b45e5f46ec2..8ff6f8db7a2 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -51,6 +51,7 @@ import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.indexes.Index; import org.apache.gravitino.rel.types.Types; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -80,6 +81,10 @@ protected boolean 
  /**
   * Verifies that a Flink table created with a composite primary key surfaces as a Gravitino
   * PRIMARY_KEY index, and that subsequent inserts behave as the catalog exposes them
   * (same-key inserts replace the existing row; new keys add rows).
   */
  @Test
  @EnabledIf("supportsPrimaryKey")
  public void testCreateTableWithPrimaryKey() {
    String databaseName = "test_create_table_with_primary_key_db";
    String tableName = "test_create_primary_key_table";
    String comment = "test comment";
    String key = "test key";
    String value = "test value";

    doWithSchema(
        currentCatalog(),
        databaseName,
        catalog -> {
          // Create a table with a composite primary key (aa, bb).
          sql(
              "CREATE TABLE %s "
                  + "(aa int, "
                  + " bb int,"
                  + " cc int,"
                  + " PRIMARY KEY (aa,bb) NOT ENFORCED"
                  + ")"
                  + " COMMENT '%s' WITH ("
                  + "'%s' = '%s')",
              tableName, comment, key, value);
          // The primary key should surface in Gravitino as exactly one index whose field
          // names list the key columns in declaration order.
          Table table =
              catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
          Assertions.assertEquals(1, table.index().length);
          Index index = table.index()[0];
          Assertions.assertEquals("aa", index.fieldNames()[0][0]);
          Assertions.assertEquals("bb", index.fieldNames()[1][0]);

          // First insert: one row visible afterwards.
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES(1,2,3)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1));
          TestUtils.assertTableResult(
              sql("SELECT count(*) num FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(1));
          TestUtils.assertTableResult(
              sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 3));

          // Same key (1,2): the existing row is replaced, count stays 1.
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES(1,2,4)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1));
          TestUtils.assertTableResult(
              sql("SELECT count(*) num FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(1));
          TestUtils.assertTableResult(
              sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4));

          // New key (1,3): a second row is added.
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES(1,3,4)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1));
          TestUtils.assertTableResult(
              sql("SELECT count(*) num FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(2));
          TestUtils.assertTableResult(
              sql("SELECT * FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(1, 2, 4),
              Row.of(1, 3, 4));

          // New key (2,2): a third row is added.
          TestUtils.assertTableResult(
              sql("INSERT INTO %s VALUES(2,2,4)", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(-1));
          TestUtils.assertTableResult(
              sql("SELECT count(*) num FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(3));
          TestUtils.assertTableResult(
              sql("SELECT * FROM %s", tableName),
              ResultKind.SUCCESS_WITH_CONTENT,
              Row.of(1, 2, 4),
              Row.of(1, 3, 4),
              Row.of(2, 2, 4));
        },
        true,
        supportDropCascade());
  }
  // Primary-key integration tests are disabled for the Iceberg catalog; see
  // FlinkCommonIT#supportsPrimaryKey, which gates testCreateTableWithPrimaryKey via @EnabledIf.
  @Override
  protected boolean supportsPrimaryKey() {
    return false;
  }