-
Notifications
You must be signed in to change notification settings - Fork 392
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#6361] feat(paimon):Support specifying primary keys during create paimon table by flink #6362
base: main
Are you sure you want to change the base?
Conversation
…ate paimon table by flink
…ate paimon table by flink
…ate paimon table by flink
...flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
Show resolved
Hide resolved
if (primaryColumns == null) { | ||
return new Index[0]; | ||
} | ||
String[][] primaryFiled = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String[][] primaryFiled = | |
String[][] primaryField = |
@@ -521,6 +552,11 @@ protected CatalogBaseTable toFlinkTable(Table table) { | |||
.column(column.name(), column.nullable() ? flinkType.nullable() : flinkType.notNull()) | |||
.withComment(column.comment()); | |||
} | |||
Index[] indices = table.index(); | |||
if (indices != null && indices.length == 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So .... this means the primary key always has a SINGLE column, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
Outdated
Show resolved
Hide resolved
...-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
Outdated
Show resolved
Hide resolved
...-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
Outdated
Show resolved
Hide resolved
...-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
Outdated
Show resolved
Hide resolved
...flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
Outdated
Show resolved
Hide resolved
...-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
Outdated
Show resolved
Hide resolved
...paimon/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/GravitinoPaimonTable.java
Outdated
Show resolved
Hide resolved
try { | ||
catalog().asTableCatalog().createTable(identifier, columns, comment, properties, partitions); | ||
|
||
Index[] indices = getGrivatinoIndeics(resolvedTable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use getGrivatinoIndices
?
if (primaryKeyList.isEmpty()) { | ||
return; | ||
} | ||
Preconditions.checkArgument( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you add the check for the size of filednames too?
Map<String, String> flinkTableProperties = | ||
propertiesConverter.toFlinkTableProperties(table.properties()); | ||
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning()); | ||
return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties); | ||
} | ||
|
||
private static void handleFlinkPrimaryKey( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you use String getFlinkPrimaryKey(Table table)
to return the primary field name, builder could use field name to build primary key.
LGTM except minior comments |
What changes were proposed in this pull request?
Support specifying primary keys during create paimon table by flink
Why are the changes needed?
Fix: #6361
Does this PR introduce any user-facing change?
None
How was this patch tested?
Add testCreateTableWithPrimaryKey case in org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT