Skip to content

Commit

Permalink
[flink][procedure] Support named argument in procedures with compatib…
Browse files Browse the repository at this point in the history
…ility issue
  • Loading branch information
yunfengzhou-hub committed Aug 28, 2024
1 parent ad70f0a commit 5eeb2cd
Show file tree
Hide file tree
Showing 9 changed files with 453 additions and 292 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import static org.apache.paimon.CoreOptions.createCommitUser;
Expand All @@ -44,9 +47,14 @@ public class DropPartitionProcedure extends ProcedureBase {

public static final String IDENTIFIER = "drop_partition";

public String[] call(
ProcedureContext procedureContext, String tableId, String... partitionStrings)
@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "partitions", type = @DataTypeHint("STRING"))
})
public String[] call(ProcedureContext procedureContext, String tableId, String partitions)
throws Catalog.TableNotExistException {
String[] partitionStrings = partitions.split(";");
checkArgument(
partitionStrings.length > 0, "drop-partition procedure must specify partitions.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.PartitionPathUtils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.io.IOException;
Expand All @@ -47,9 +50,14 @@ public class MarkPartitionDoneProcedure extends ProcedureBase {

public static final String IDENTIFIER = "mark_partition_done";

public String[] call(
ProcedureContext procedureContext, String tableId, String... partitionStrings)
@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "partitions", type = @DataTypeHint("STRING"))
})
public String[] call(ProcedureContext procedureContext, String tableId, String partitions)
throws Catalog.TableNotExistException, IOException {
String[] partitionStrings = partitions.split(";");
checkArgument(
partitionStrings.length > 0,
"mark_partition_done procedure must specify partitions.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.procedure.ProcedureContext;
Expand Down Expand Up @@ -97,76 +100,46 @@ public class MergeIntoProcedure extends ProcedureBase {

public static final String IDENTIFIER = "merge_into";

public String[] call(
ProcedureContext procedureContext,
String targetTableId,
String targetAlias,
String sourceSqls,
String sourceTable,
String mergeCondition,
String matchedUpsertCondition,
String matchedUpsertSetting) {
return call(
procedureContext,
targetTableId,
targetAlias,
sourceSqls,
sourceTable,
mergeCondition,
matchedUpsertCondition,
matchedUpsertSetting,
"",
"",
"");
}

public String[] call(
ProcedureContext procedureContext,
String targetTableId,
String targetAlias,
String sourceSqls,
String sourceTable,
String mergeCondition,
String matchedUpsertCondition,
String matchedUpsertSetting,
String notMatchedInsertCondition,
String notMatchedInsertValues) {
return call(
procedureContext,
targetTableId,
targetAlias,
sourceSqls,
sourceTable,
mergeCondition,
matchedUpsertCondition,
matchedUpsertSetting,
notMatchedInsertCondition,
notMatchedInsertValues,
"");
}

public String[] call(
ProcedureContext procedureContext,
String targetTableId,
String targetAlias,
String sourceSqls,
String sourceTable,
String mergeCondition,
String matchedDeleteCondition) {
return call(
procedureContext,
targetTableId,
targetAlias,
sourceSqls,
sourceTable,
mergeCondition,
"",
"",
"",
"",
matchedDeleteCondition);
}

@ProcedureHint(
argument = {
@ArgumentHint(name = "target_table", type = @DataTypeHint("STRING")),
@ArgumentHint(
name = "target_alias",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "source_sqls",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "source_table",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "merge_condition",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "matched_upsert_condition",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "matched_upsert_setting",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "not_matched_insert_condition",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "not_matched_insert_values",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
name = "matched_delete_condition",
type = @DataTypeHint("STRING"),
isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String targetTableId,
Expand All @@ -179,6 +152,16 @@ public String[] call(
String notMatchedInsertCondition,
String notMatchedInsertValues,
String matchedDeleteCondition) {
targetAlias = notnull(targetAlias);
sourceSqls = notnull(sourceSqls);
sourceTable = notnull(sourceTable);
mergeCondition = notnull(mergeCondition);
matchedUpsertCondition = notnull(matchedUpsertCondition);
matchedUpsertSetting = notnull(matchedUpsertSetting);
notMatchedInsertCondition = notnull(notMatchedInsertCondition);
notMatchedInsertValues = notnull(notMatchedInsertValues);
matchedDeleteCondition = notnull(matchedDeleteCondition);

String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
Identifier identifier = Identifier.fromString(targetTableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

/**
Expand All @@ -39,19 +43,24 @@ public class RollbackToProcedure extends ProcedureBase {

public static final String IDENTIFIER = "rollback_to";

public String[] call(ProcedureContext procedureContext, String tableId, long snapshotId)
@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true),
@ArgumentHint(
name = "snapshot_id",
type = @DataTypeHint("BIGINT"),
isOptional = true)
})
public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.rollbackTo(snapshotId);

return new String[] {"Success"};
}

public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.rollbackTo(tagName);

if (!StringUtils.isBlank(tagName)) {
table.rollbackTo(tagName);
} else {
table.rollbackTo(snapshotId);
}
return new String[] {"Success"};
}

Expand Down
Loading

0 comments on commit 5eeb2cd

Please sign in to comment.