From f7bd95dcdcd80c636b22bdc78b92f8ac43e7340a Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Tue, 23 Jul 2024 17:45:21 +0800 Subject: [PATCH] [flink][procedure] Support named argument in procedures with compatibility issue --- .../procedure/DropPartitionProcedure.java | 12 +- .../procedure/MarkPartitionDoneProcedure.java | 12 +- .../flink/procedure/MergeIntoProcedure.java | 123 +++++------- .../flink/procedure/RollbackToProcedure.java | 31 +-- .../action/DropPartitionActionITCase.java | 124 +++++++----- .../action/MarkPartitionDoneActionITCase.java | 124 +++++++----- .../flink/action/MergeIntoActionITCase.java | 183 +++++++++++++----- .../flink/action/RollbackToActionITCase.java | 102 ++++++---- .../paimon/flink/procedure/ProcedureTest.java | 34 +--- 9 files changed, 453 insertions(+), 292 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java index 3a231758f4765..6716a6ed8e3a1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java @@ -25,6 +25,9 @@ import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.utils.ParameterUtils; +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; import static org.apache.paimon.CoreOptions.createCommitUser; @@ -44,9 +47,14 @@ public class DropPartitionProcedure extends ProcedureBase { public static final String IDENTIFIER = "drop_partition"; - public String[] call( - ProcedureContext procedureContext, String tableId, String... partitionStrings) + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "partitions", type = @DataTypeHint("STRING")) + }) + public String[] call(ProcedureContext procedureContext, String tableId, String partitions) throws Catalog.TableNotExistException { + String[] partitionStrings = partitions.split(";"); checkArgument( partitionStrings.length > 0, "drop-partition procedure must specify partitions."); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index d70cccf6ba25c..4792706346871 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -27,6 +27,9 @@ import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.PartitionPathUtils; +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; import java.io.IOException; @@ -47,9 +50,14 @@ public class MarkPartitionDoneProcedure extends ProcedureBase { public static final String IDENTIFIER = "mark_partition_done"; - public String[] call( - ProcedureContext procedureContext, String tableId, String... partitionStrings) + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "partitions", type = @DataTypeHint("STRING")) + }) + public String[] call(ProcedureContext procedureContext, String tableId, String partitions) throws Catalog.TableNotExistException, IOException { + String[] partitionStrings = partitions.split(";"); checkArgument( partitionStrings.length > 0, "mark_partition_done procedure must specify partitions."); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java index acda2afd2e697..b486b0fd327b6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java @@ -23,6 +23,9 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.data.RowData; import org.apache.flink.table.procedure.ProcedureContext; @@ -97,76 +100,46 @@ public class MergeIntoProcedure extends ProcedureBase { public static final String IDENTIFIER = "merge_into"; - public String[] call( - ProcedureContext procedureContext, - String targetTableId, - String targetAlias, - String sourceSqls, - String sourceTable, - String mergeCondition, - String matchedUpsertCondition, - String matchedUpsertSetting) { - return call( - procedureContext, - targetTableId, - targetAlias, - sourceSqls, - sourceTable, - mergeCondition, - matchedUpsertCondition, - matchedUpsertSetting, - "", - "", - ""); - } - - public String[] call( - ProcedureContext procedureContext, - String targetTableId, - String targetAlias, - String sourceSqls, - String sourceTable, - String mergeCondition, - String matchedUpsertCondition, - String matchedUpsertSetting, - String notMatchedInsertCondition, - String notMatchedInsertValues) { - return call( - procedureContext, - targetTableId, - targetAlias, - sourceSqls, - sourceTable, - mergeCondition, - matchedUpsertCondition, - matchedUpsertSetting, - notMatchedInsertCondition, - notMatchedInsertValues, - ""); - } - - public String[] call( - ProcedureContext procedureContext, - String targetTableId, - String targetAlias, - String sourceSqls, - String sourceTable, - String mergeCondition, - String matchedDeleteCondition) { - return call( - procedureContext, - targetTableId, - targetAlias, - sourceSqls, - sourceTable, - mergeCondition, - "", - "", - "", - "", - matchedDeleteCondition); - } - + @ProcedureHint( + argument = { + @ArgumentHint(name = "target_table", type = @DataTypeHint("STRING")), + @ArgumentHint( + name = "target_alias", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "source_sqls", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "source_table", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "merge_condition", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "matched_upsert_condition", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "matched_upsert_setting", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "not_matched_insert_condition", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "not_matched_insert_values", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "matched_delete_condition", + type = @DataTypeHint("STRING"), + isOptional = true) + }) public String[] call( ProcedureContext procedureContext, String targetTableId, @@ -179,6 +152,16 @@ public String[] call( String notMatchedInsertCondition, String notMatchedInsertValues, String matchedDeleteCondition) { + targetAlias = notnull(targetAlias); + sourceSqls = notnull(sourceSqls); + sourceTable = notnull(sourceTable); + mergeCondition = notnull(mergeCondition); + matchedUpsertCondition = notnull(matchedUpsertCondition); + matchedUpsertSetting = notnull(matchedUpsertSetting); + notMatchedInsertCondition = notnull(notMatchedInsertCondition); + notMatchedInsertValues = notnull(notMatchedInsertValues); + matchedDeleteCondition = notnull(matchedDeleteCondition); + String warehouse = catalog.warehouse(); Map catalogOptions = catalog.options(); Identifier identifier = Identifier.fromString(targetTableId); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java index 1bf545004d93d..8086bb84341b4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java @@ -21,7 +21,11 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.StringUtils; +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; /** @@ -39,19 +43,24 @@ public class RollbackToProcedure extends ProcedureBase { public static final String IDENTIFIER = "rollback_to"; - public String[] call(ProcedureContext procedureContext, String tableId, long snapshotId) + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint( + name = "snapshot_id", + type = @DataTypeHint("BIGINT"), + isOptional = true) + }) + public String[] call( + ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId) throws Catalog.TableNotExistException { Table table = catalog.getTable(Identifier.fromString(tableId)); - table.rollbackTo(snapshotId); - - return new String[] {"Success"}; - } - - public String[] call(ProcedureContext procedureContext, String tableId, String tagName) - throws Catalog.TableNotExistException { - Table table = catalog.getTable(Identifier.fromString(tableId)); - table.rollbackTo(tagName); - + if (!StringUtils.isBlank(tagName)) { + table.rollbackTo(tagName); + } else { + table.rollbackTo(snapshotId); + } return new String[] {"Success"}; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java index 9e919c1573046..7fa6d921b7a1c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java @@ -30,14 +30,15 @@ import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -50,30 +51,51 @@ public class DropPartitionActionITCase extends ActionITCaseBase { private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[] {"partKey0", "partKey1", "dt", "value"}); + private static Stream testArguments() { + return Stream.of( + Arguments.of(true, "action"), + Arguments.of(false, "action"), + Arguments.of(true, "procedure_indexed"), + Arguments.of(false, "procedure_indexed"), + Arguments.of(true, "procedure_named"), + Arguments.of(false, "procedure_named")); + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws Exception { + @MethodSource("testArguments") + public void testDropPartitionWithSinglePartitionKey(boolean hasPk, String invoker) + throws Exception { FileStoreTable table = prepareTable(hasPk); - if (ThreadLocalRandom.current().nextBoolean()) { - - createAction( - DropPartitionAction.class, - "drop_partition", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName, - "--partition", - "partKey0=0") - .run(); - } else { - callProcedure( - String.format( - "CALL sys.drop_partition('%s.%s', 'partKey0 = 0')", - database, tableName)); + switch (invoker) { + case "action": + createAction( + DropPartitionAction.class, + "drop_partition", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--partition", + "partKey0=0") + .run(); + break; + case "procedure_indexed": + callProcedure( + String.format( + "CALL sys.drop_partition('%s.%s', 'partKey0 = 0')", + database, tableName)); + break; + case "procedure_named": + callProcedure( + String.format( + "CALL sys.drop_partition(`table` => '%s.%s', partitions => 'partKey0 = 0')", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); } SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager(); @@ -108,8 +130,9 @@ public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws Except } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exception { + @MethodSource("testArguments") + public void testDropPartitionWithMultiplePartitionKey(boolean hasPk, String invoker) + throws Exception { FileStoreTable table = prepareTable(hasPk); Map partitions0 = new HashMap<>(); @@ -120,26 +143,37 @@ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exce partitions1.put("partKey0", "1"); partitions1.put("partKey1", "0"); - if (ThreadLocalRandom.current().nextBoolean()) { - createAction( - DropPartitionAction.class, - "drop_partition", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName, - "--partition", - "partKey0=0,partKey1=1", - "--partition", - "partKey0=1,partKey1=0") - .run(); - } else { - callProcedure( - String.format( - "CALL sys.drop_partition('%s.%s', 'partKey0=0,partKey1=1', 'partKey0=1,partKey1=0')", - database, tableName)); + switch (invoker) { + case "action": + createAction( + DropPartitionAction.class, + "drop_partition", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--partition", + "partKey0=0,partKey1=1", + "--partition", + "partKey0=1,partKey1=0") + .run(); + break; + case "procedure_indexed": + callProcedure( + String.format( + "CALL sys.drop_partition('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", + database, tableName)); + break; + case "procedure_named": + callProcedure( + String.format( + "CALL sys.drop_partition(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); } SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java index e6c2c8678ea8b..1c0ec85878160 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java @@ -30,12 +30,13 @@ import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -48,28 +49,51 @@ public class MarkPartitionDoneActionITCase extends ActionITCaseBase { private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new String[] {"partKey0", "partKey1", "dt", "value"}); + private static Stream testArguments() { + return Stream.of( + Arguments.of(true, "action"), + Arguments.of(false, "action"), + Arguments.of(true, "procedure_indexed"), + Arguments.of(false, "procedure_indexed"), + Arguments.of(true, "procedure_named"), + Arguments.of(false, "procedure_named")); + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testPartitionMarkDoneWithSinglePartitionKey(boolean hasPk) throws Exception { + @MethodSource("testArguments") + public void testPartitionMarkDoneWithSinglePartitionKey(boolean hasPk, String invoker) + throws Exception { FileStoreTable table = prepareTable(hasPk); - if (ThreadLocalRandom.current().nextBoolean()) { - createAction( - MarkPartitionDoneAction.class, - "mark_partition_done", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName, - "--partition", - "partKey0=0") - .run(); - } else { - callProcedure( - String.format( - "CALL sys.mark_partition_done('%s.%s', 'partKey0 = 0')", - database, tableName)); + + switch (invoker) { + case "action": + createAction( + MarkPartitionDoneAction.class, + "mark_partition_done", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--partition", + "partKey0=0") + .run(); + break; + case "procedure_indexed": + callProcedure( + String.format( + "CALL sys.mark_partition_done('%s.%s', 'partKey0 = 0')", + database, tableName)); + break; + case "procedure_named": + callProcedure( + String.format( + "CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0 = 0')", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); } Path successPath = new Path(table.location(), "partKey0=0/_SUCCESS"); @@ -78,30 +102,42 @@ public void testPartitionMarkDoneWithSinglePartitionKey(boolean hasPk) throws Ex } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exception { + @MethodSource("testArguments") + public void testDropPartitionWithMultiplePartitionKey(boolean hasPk, String invoker) + throws Exception { FileStoreTable table = prepareTable(hasPk); - if (ThreadLocalRandom.current().nextBoolean()) { - createAction( - MarkPartitionDoneAction.class, - "mark_partition_done", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName, - "--partition", - "partKey0=0,partKey1=1", - "--partition", - "partKey0=1,partKey1=0") - .run(); - } else { - callProcedure( - String.format( - "CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1', 'partKey0=1,partKey1=0')", - database, tableName)); + switch (invoker) { + case "action": + createAction( + MarkPartitionDoneAction.class, + "mark_partition_done", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--partition", + "partKey0=0,partKey1=1", + "--partition", + "partKey0=1,partKey1=0") + .run(); + break; + case "procedure_indexed": + callProcedure( + String.format( + "CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", + database, tableName)); + break; + case "procedure_named": + callProcedure( + String.format( + "CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); } Path successPath1 = new Path(table.location(), "partKey0=0/partKey1=1/_SUCCESS"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java index 1da8030e1308a..78f83fcfa3e07 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java @@ -29,7 +29,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; @@ -38,7 +37,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; @@ -132,9 +131,19 @@ public void testVariousChangelogProducer( changelogRow("+I", 12, "v_12", "insert", "02-29"))); } - @ParameterizedTest(name = "in-default = {0}") - @ValueSource(booleans = {true, false}) - public void testTargetAlias(boolean inDefault) throws Exception { + private static Stream testArguments() { + return Stream.of( + Arguments.of(true, "action"), + Arguments.of(false, "action"), + Arguments.of(true, "procedure_indexed"), + Arguments.of(false, "procedure_indexed"), + Arguments.of(true, "procedure_named"), + Arguments.of(false, "procedure_named")); + } + + @ParameterizedTest + @MethodSource("testArguments") + public void testTargetAlias(boolean inDefault, String invoker) throws Exception { MergeIntoActionBuilder action; if (!inDefault) { @@ -155,10 +164,23 @@ public void testTargetAlias(boolean inDefault) throws Exception { .withMergeCondition("TT.k = S.k AND TT.dt = S.dt") .withMatchedDelete("S.v IS NULL"); - String procedureStatement = - String.format( - "CALL sys.merge_into('%s.T', 'TT', '', 'S', 'TT.k = S.k AND TT.dt = S.dt', 'S.v IS NULL')", - inDefault ? database : "test_db"); + String procedureStatement = ""; + if ("procedure_indexed".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into('%s.T', 'TT', '', 'S', 'TT.k = S.k AND TT.dt = S.dt', '', '', '', '', 'S.v IS NULL')", + inDefault ? database : "test_db"); + } else if ("procedure_named".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into(" + + "target_table => '%s.T', " + + "target_alias => 'TT', " + + "source_table => 'S', " + + "merge_condition => 'TT.k = S.k AND TT.dt = S.dt', " + + "matched_delete_condition => 'S.v IS NULL')", + inDefault ? database : "test_db"); + } List streamingExpected = Arrays.asList( @@ -176,16 +198,16 @@ public void testTargetAlias(boolean inDefault) throws Exception { changelogRow("+I", 9, "v_9", "creation", "02-28"), changelogRow("+I", 10, "v_10", "creation", "02-28")); - if (ThreadLocalRandom.current().nextBoolean()) { + if ("action".equals(invoker)) { validateActionRunResult(action.build(), streamingExpected, batchExpected); } else { validateProcedureResult(procedureStatement, streamingExpected, batchExpected); } } - @ParameterizedTest(name = "in-default = {0}") - @ValueSource(booleans = {true, false}) - public void testSourceName(boolean inDefault) throws Exception { + @ParameterizedTest + @MethodSource("testArguments") + public void testSourceName(boolean inDefault, String invoker) throws Exception { MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, "default", "T"); String sourceTableName = "S"; @@ -203,10 +225,22 @@ public void testSourceName(boolean inDefault) throws Exception { .withMergeCondition("T.k = S.k AND T.dt = S.dt") .withMatchedDelete("S.v IS NULL"); - String procedureStatement = - String.format( - "CALL sys.merge_into('default.T', '', '', '%s', 'T.k = S.k AND T.dt = S.dt', 'S.v IS NULL')", - sourceTableName); + String procedureStatement = ""; + if ("procedure_indexed".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into('default.T', '', '', '%s', 'T.k = S.k AND T.dt = S.dt', '', '', '', '', 'S.v IS NULL')", + sourceTableName); + } else if ("procedure_named".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into(" + + "target_table => 'default.T', " + + "source_table => '%s', " + + "merge_condition => 'T.k = S.k AND T.dt = S.dt', " + + "matched_delete_condition => 'S.v IS NULL')", + sourceTableName); + } if (!inDefault) { sEnv.executeSql("USE `default`"); @@ -229,16 +263,16 @@ public void testSourceName(boolean inDefault) throws Exception { changelogRow("+I", 9, "v_9", "creation", "02-28"), changelogRow("+I", 10, "v_10", "creation", "02-28")); - if (ThreadLocalRandom.current().nextBoolean()) { + if ("action".equals(invoker)) { validateActionRunResult(action.build(), streamingExpected, batchExpected); } else { validateProcedureResult(procedureStatement, streamingExpected, batchExpected); } } - @ParameterizedTest(name = "useCatalog = {0}") - @ValueSource(booleans = {true, false}) - public void testSqls(boolean useCatalog) throws Exception { + @ParameterizedTest + @MethodSource("testArguments") + public void testSqls(boolean useCatalog, String invoker) throws Exception { // drop table S sEnv.executeSql("DROP TABLE S"); @@ -273,16 +307,28 @@ public void testSqls(boolean useCatalog) throws Exception { action.withMergeCondition("T.k = S.k AND T.dt = S.dt").withMatchedDelete("S.v IS NULL"); - String procedureStatement = - String.format( - "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = S.k AND T.dt = S.dt', 'S.v IS NULL')", - database, - useCatalog - ? String.format( - "%s;%s;%s", - escapeCatalog, "USE CATALOG test_cat", escapeDdl) - : String.format("%s;%s", escapeCatalog, escapeDdl), - useCatalog ? "S" : "test_cat.default.S"); + String procedureStatement = ""; + String sourceSqls = + useCatalog + ? String.format( + "%s;%s;%s", escapeCatalog, "USE CATALOG test_cat", escapeDdl) + : String.format("%s;%s", escapeCatalog, escapeDdl); + if ("procedure_indexed".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = S.k AND T.dt = S.dt', '', '', '', '', 'S.v IS NULL')", + database, sourceSqls, useCatalog ? "S" : "test_cat.default.S"); + } else if ("procedure_named".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into(" + + "target_table => '%s.T', " + + "source_sqls => '%s', " + + "source_table => '%s', " + + "merge_condition => 'T.k = S.k AND T.dt = S.dt', " + + "matched_delete_condition => 'S.v IS NULL')", + database, sourceSqls, useCatalog ? "S" : "test_cat.default.S"); + } List streamingExpected = Arrays.asList( @@ -300,16 +346,16 @@ public void testSqls(boolean useCatalog) throws Exception { changelogRow("+I", 9, "v_9", "creation", "02-28"), changelogRow("+I", 10, "v_10", "creation", "02-28")); - if (ThreadLocalRandom.current().nextBoolean()) { + if ("action".equals(invoker)) { validateActionRunResult(action.build(), streamingExpected, batchExpected); } else { validateProcedureResult(procedureStatement, streamingExpected, batchExpected); } } - @ParameterizedTest(name = "source-qualified = {0}") - @ValueSource(booleans = {true, false}) - public void testMatchedUpsertSetAll(boolean qualified) throws Exception { + @ParameterizedTest + @MethodSource("testArguments") + public void testMatchedUpsertSetAll(boolean qualified, String invoker) throws Exception { // build MergeIntoAction MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T"); action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S") @@ -317,12 +363,27 @@ public void testMatchedUpsertSetAll(boolean qualified) throws Exception { .withMergeCondition("T.k = SS.k AND T.dt = SS.dt") .withMatchedUpsert(null, "*"); - String procedureStatement = - String.format( - "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '*')", - database, - "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", - qualified ? "default.SS" : "SS"); + String procedureStatement = ""; + if ("procedure_indexed".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '*')", + database, + "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", + qualified ? "default.SS" : "SS"); + } else if ("procedure_named".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into(" + + "target_table => '%s.T', " + + "source_sqls => '%s', " + + "source_table => '%s', " + + "merge_condition => 'T.k = SS.k AND T.dt = SS.dt', " + + "matched_upsert_setting => '*')", + database, + "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", + qualified ? "default.SS" : "SS"); + } List streamingExpected = Arrays.asList( @@ -348,16 +409,16 @@ public void testMatchedUpsertSetAll(boolean qualified) throws Exception { changelogRow("+I", 9, "v_9", "creation", "02-28"), changelogRow("+I", 10, "v_10", "creation", "02-28")); - if (ThreadLocalRandom.current().nextBoolean()) { + if ("action".equals(invoker)) { validateActionRunResult(action.build(), streamingExpected, batchExpected); } else { validateProcedureResult(procedureStatement, streamingExpected, batchExpected); } } - @ParameterizedTest(name = "source-qualified = {0}") - @ValueSource(booleans = {true, false}) - public void testNotMatchedInsertAll(boolean qualified) throws Exception { + @ParameterizedTest + @MethodSource("testArguments") + public void testNotMatchedInsertAll(boolean qualified, String invoker) throws Exception { // build MergeIntoAction MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T"); action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S") @@ -365,12 +426,28 @@ public void testNotMatchedInsertAll(boolean qualified) throws Exception { .withMergeCondition("T.k = SS.k AND T.dt = SS.dt") .withNotMatchedInsert("SS.k < 12", "*"); - String procedureStatement = - String.format( - "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '', 'SS.k < 12', '*', '')", - database, - "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", - qualified ? "default.SS" : "SS"); + String procedureStatement = ""; + if ("procedure_indexed".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '', 'SS.k < 12', '*', '')", + database, + "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", + qualified ? "default.SS" : "SS"); + } else if ("procedure_named".equals(invoker)) { + procedureStatement = + String.format( + "CALL sys.merge_into(" + + "target_table => '%s.T', " + + "source_sqls => '%s', " + + "source_table => '%s', " + + "merge_condition => 'T.k = SS.k AND T.dt = SS.dt', " + + "not_matched_insert_condition => 'SS.k < 12'," + + "not_matched_insert_values => '*')", + database, + "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", + qualified ? "default.SS" : "SS"); + } List streamingExpected = Arrays.asList( @@ -392,7 +469,7 @@ public void testNotMatchedInsertAll(boolean qualified) throws Exception { changelogRow("+I", 8, "v_8", "unknown", "02-29"), changelogRow("+I", 11, "v_11", "unknown", "02-29")); - if (ThreadLocalRandom.current().nextBoolean()) { + if ("action".equals(invoker)) { validateActionRunResult(action.build(), streamingExpected, batchExpected); } else { validateProcedureResult(procedureStatement, streamingExpected, batchExpected); @@ -403,7 +480,7 @@ public void testNotMatchedInsertAll(boolean qualified) throws Exception { public void testProcedureWithDeleteConditionTrue() throws Exception { String procedureStatement = String.format( - "CALL sys.merge_into('%s.T', '', '', 'S', 'T.k = S.k AND T.dt = S.dt', 'TRUE')", + "CALL sys.merge_into('%s.T', '', '', 'S', 'T.k = S.k AND T.dt = S.dt', '', '', '', '', 'TRUE')", database); validateProcedureResult( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java index ceab42480ed64..a9e0726e7d157 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java @@ -27,11 +27,11 @@ import org.apache.flink.types.Row; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; import java.util.Collections; -import java.util.concurrent.ThreadLocalRandom; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead; @@ -49,8 +49,9 @@ public void setUp() { init(warehouse); } - @Test - public void rollbackToSnapshotTest() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"action", "procedure_named", "procedure_indexed"}) + public void rollbackToSnapshotTest(String invoker) throws Exception { FileStoreTable table = createFileStoreTable( ROW_TYPE, @@ -67,21 +68,35 @@ public void rollbackToSnapshotTest() throws Exception { writeData(rowData(2L, BinaryString.fromString("World"))); writeData(rowData(2L, BinaryString.fromString("Flink"))); - if (ThreadLocalRandom.current().nextBoolean()) { - createAction( - RollbackToAction.class, - "rollback_to", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName, - "--version", - "2") - .run(); - } else { - callProcedure(String.format("CALL sys.rollback_to('%s.%s', 2)", database, tableName)); + switch (invoker) { + case "action": + createAction( + RollbackToAction.class, + "rollback_to", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--version", + "2") + .run(); + break; + case "procedure_indexed": + callProcedure( + String.format( + "CALL sys.rollback_to('%s.%s', '', cast(2 as bigint))", + database, tableName)); + break; + case "procedure_named": + callProcedure( + String.format( + "CALL sys.rollback_to(`table` => '%s.%s', snapshot_id => cast(2 as bigint))", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); } testBatchRead( @@ -89,8 +104,9 @@ public void rollbackToSnapshotTest() throws Exception { Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Hello"))); } - @Test - public void rollbackToTagTest() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"action", "procedure_named", "procedure_indexed"}) + public void rollbackToTagTest(String invoker) throws Exception { FileStoreTable table = createFileStoreTable( ROW_TYPE, @@ -110,22 +126,34 @@ public void rollbackToTagTest() throws Exception { table.createTag("tag2", 2); table.createTag("tag3", 3); - if (ThreadLocalRandom.current().nextBoolean()) { - createAction( - RollbackToAction.class, - "rollback_to", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName, - "--version", - "tag2") - .run(); - } else { - callProcedure( - String.format("CALL sys.rollback_to('%s.%s', 'tag2')", database, tableName)); + switch (invoker) { + case "action": + createAction( + RollbackToAction.class, + "rollback_to", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--version", + "tag2") + .run(); + break; + case "procedure_indexed": + callProcedure( + String.format( + "CALL sys.rollback_to('%s.%s', 'tag2')", database, tableName)); + break; + case "procedure_named": + callProcedure( + String.format( + "CALL sys.rollback_to(`table` => '%s.%s', tag => 'tag2')", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); } testBatchRead( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java index 8e6c3340e72fd..0c30afc833f28 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java @@ -27,7 +27,6 @@ import java.lang.reflect.Method; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -69,42 +68,21 @@ public void testProcedureCoverAllActions() { @Test public void testProcedureHasNamedArgument() { - // The following excluded procedures involve compatibility issue to be resolved in followup - // PR - Set expectedExclusions = new HashSet<>(); - expectedExclusions.add("merge_into"); - expectedExclusions.add("rollback_to"); - expectedExclusions.add("drop_partition"); - expectedExclusions.add("mark_partition_done"); - List identifiers = FactoryUtil.discoverIdentifiers( ProcedureBase.class.getClassLoader(), ProcedureBase.class); assertThat(identifiers.size()).isNotZero(); for (String identifier : identifiers) { - if (expectedExclusions.contains(identifier)) { - continue; - } ProcedureBase procedure = FactoryUtil.discoverFactory( ProcedureBase.class.getClassLoader(), ProcedureBase.class, identifier); Method method = getMethodFromName(procedure.getClass(), "call"); - if (expectedExclusions.contains(identifier)) { - assertThat(method) - .matches( - x -> x.isAnnotationPresent(ProcedureHint.class), - String.format( - "Procedure %s has supported named argument. Should be removed from exclusion list.", - procedure.identifier())); - } else { - assertThat(method) - .matches( - x -> x.isAnnotationPresent(ProcedureHint.class), - String.format( - "Procedure %s should have its call method decorated by %s.", - procedure.identifier(), - ProcedureHint.class.getSimpleName())); - } + assertThat(method) + .matches( + x -> x.isAnnotationPresent(ProcedureHint.class), + String.format( + "Procedure %s should have its call method decorated by %s.", + procedure.identifier(), ProcedureHint.class.getSimpleName())); } }