Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#6361] feat(paimon):Support specifying primary keys during create paimon table by flink #6362

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,10 @@ private List<String> getPrimaryKeysFromIndexes(Index[] indexes) {
if (indexes == null || indexes.length == 0) {
return Collections.emptyList();
}

Preconditions.checkArgument(
indexes.length == 1, "Paimon only supports no more than one Index.");

Index primaryKeyIndex = indexes[0];
Arrays.stream(primaryKeyIndex.fieldNames())
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
.forEach(
filedName ->
Preconditions.checkArgument(
filedName != null && filedName.length == 1,
"The primary key columns should not be nested."));

return Arrays.stream(primaryKeyIndex.fieldNames())
.map(fieldName -> fieldName[0])
.collect(Collectors.toList());
return Arrays.stream(primaryKeyIndex.fieldNames()).map(e -> e[0]).collect(Collectors.toList());
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
}

private static Index[] constructIndexesFromPrimaryKeys(Table table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
Expand All @@ -40,6 +41,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
Expand Down Expand Up @@ -75,7 +77,11 @@
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;

/**
* The BaseCatalog that provides a default implementation for all methods in the {@link
Expand Down Expand Up @@ -276,8 +282,21 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
propertiesConverter.toGravitinoTableProperties(table.getOptions());
Transform[] partitions =
partitionConverter.toGravitinoPartitions(((CatalogTable) table).getPartitionKeys());

try {
catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions);

Index[] indices = getIndices(resolvedTable);
catalog()
.asTableCatalog()
.createTable(
identifier,
columns,
comment,
properties,
partitions,
Distributions.NONE,
new SortOrder[0],
indices);
} catch (NoSuchSchemaException e) {
throw new DatabaseNotExistException(catalogName(), tablePath.getDatabaseName(), e);
} catch (TableAlreadyExistsException e) {
Expand All @@ -289,6 +308,18 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private static Index[] getIndices(ResolvedCatalogBaseTable<?> resolvedTable) {
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
Optional<UniqueConstraint> primaryKey = resolvedTable.getResolvedSchema().getPrimaryKey();
List<String> primaryColumns = primaryKey.map(UniqueConstraint::getColumns).orElse(null);
if (primaryColumns == null) {
return new Index[0];
}
String[][] primaryFiled =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
String[][] primaryFiled =
String[][] primaryField =

primaryColumns.stream().map(e -> new String[] {e}).toArray(String[][]::new);
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
Index primary = Indexes.primary("primary", primaryFiled);
return new Index[] {primary};
}

/**
* The method only is used to change the comments. To alter columns, use the other alterTable API
* and provide a list of TableChanges.
Expand Down Expand Up @@ -521,6 +552,11 @@ protected CatalogBaseTable toFlinkTable(Table table) {
.column(column.name(), column.nullable() ? flinkType.nullable() : flinkType.notNull())
.withComment(column.comment());
}
Index[] indices = table.index();
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
if (indices != null && indices.length == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So .... this means the primary key always has a SINGLE column, right?

Copy link
Contributor Author

@hdygxsj hdygxsj Jan 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. this means that only one primary key is supported. The Index object also stores field information and there can be multiple fields.
image

builder.primaryKey(
Arrays.stream(indices[0].fieldNames()).map(arr -> arr[0]).collect(Collectors.toList()));
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
}
Map<String, String> flinkTableProperties =
propertiesConverter.toFlinkTableProperties(table.properties());
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -280,6 +281,69 @@ public void testCreateSimpleTable() {
supportDropCascade());
}

@Test
@EnabledIf("supportTableOperation")
public void testCreateTableWithPrimaryKey() {
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
String databaseName = "test_create_no_partition_table_db";
String tableName = "test_create_no_partition_table";
String comment = "test comment";
String key = "test key";
String value = "test value";

doWithSchema(
currentCatalog(),
databaseName,
catalog -> {
sql(
"CREATE TABLE %s "
+ "(aa int, "
+ " bb int,"
+ " cc int,"
+ " PRIMARY KEY (aa,bb) NOT ENFORCED"
+ ")"
+ " COMMENT '%s' WITH ("
+ "'%s' = '%s')",
tableName, comment, key, value);
Table table =
catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
Assertions.assertEquals(1, table.index().length);
Index index = table.index()[0];
Assertions.assertEquals("aa", index.fieldNames()[0][0]);
Assertions.assertEquals("bb", index.fieldNames()[1][0]);
sql("INSERT INTO %s VALUES(1,2,3)", tableName);
sql("INSERT INTO %s VALUES(1,2,4)", tableName);
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName), ResultKind.SUCCESS_WITH_CONTENT, Row.of(1, 2, 4));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1));
sql("INSERT INTO %s VALUES(1,3,4)", tableName);
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1, 2, 4),
Row.of(1, 3, 4));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(2));
sql("INSERT INTO %s VALUES(2,2,4)", tableName);
TestUtils.assertTableResult(
sql("SELECT * FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(1, 2, 4),
Row.of(1, 3, 4),
Row.of(2, 2, 4));
TestUtils.assertTableResult(
sql("SELECT count(*) num FROM %s", tableName),
ResultKind.SUCCESS_WITH_CONTENT,
Row.of(3));
},
true,
supportDropCascade());
}

@Test
@EnabledIf("supportTableOperation")
public void testListTables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

@Tag("gravitino-docker-test")
public class FlinkPaimonCatalogIT extends FlinkCommonIT {

@TempDir private static Path warehouseDir;
Expand Down
1 change: 1 addition & 0 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation(project(":common"))
implementation(project(":core"))
implementation(project(":server-common"))
implementation(libs.mysql.driver)
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
implementation(libs.bundles.jetty)
implementation(libs.bundles.jersey)
implementation(libs.bundles.log4j)
Expand Down
Loading