From 8e012fe221290a793018b457c89f24625a50e615 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Sat, 16 Mar 2024 02:37:56 +0800 Subject: [PATCH] fixed --- .../main/java/org/apache/paimon/CoreOptions.java | 13 +++++-------- .../org/apache/paimon/schema/SchemaManager.java | 4 +++- .../flink/action/cdc/SyncTableActionBase.java | 8 ++++++++ .../paimon/flink/sink/cdc/CdcSinkBuilder.java | 7 +++++++ .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 7 +++++++ .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 6 ++++++ .../paimon/flink/sink/FlinkTableSinkBase.java | 1 + 7 files changed, 37 insertions(+), 9 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index dc4a3538e9d3..ce1651a2af25 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -101,11 +101,8 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The file path of this table in the filesystem."); - public static final ConfigOption BRANCH_NAME = - key("branch-name") - .stringType() - .defaultValue("main") - .withDescription("Specify branch name."); + public static final ConfigOption BRANCH = + key("branch").stringType().defaultValue("main").withDescription("Specify branch name."); public static final ConfigOption FILE_FORMAT = key("file.format") @@ -1116,10 +1113,10 @@ public String branch() { } public static String branch(Map options) { - if (options.containsKey(BRANCH_NAME.key())) { - return options.get(BRANCH_NAME.key()); + if (options.containsKey(BRANCH.key())) { + return options.get(BRANCH.key()); } - return BRANCH_NAME.defaultValue(); + return BRANCH.defaultValue(); } public static Path path(Map options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 76b7bf964b12..a04a6599fed6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -182,7 +182,9 @@ public TableSchema commitChanges(String branch, SchemaChange... changes) throws return commitChanges(branch, Arrays.asList(changes)); } - public TableSchema commitChanges(List changes) throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, Catalog.ColumnNotExistException { + public TableSchema commitChanges(List changes) + throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, + Catalog.ColumnNotExistException { return commitChanges(branch, changes); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index ae9301362de9..378a5db9cca3 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action.cdc; +import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Identifier; @@ -29,6 +30,7 @@ import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.BranchManager; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; @@ -181,6 +183,12 @@ protected void buildSink( if (sinkParallelism != null) { sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism)); } + String branch = tableConfig.get(CoreOptions.BRANCH.key()); + if (branch != null) { + sinkBuilder.toBranch(branch); + } else { + sinkBuilder.toBranch(BranchManager.DEFAULT_MAIN_BRANCH); + } sinkBuilder.build(); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java index 0fe9b8cef7f9..8ced723077e3 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Preconditions; import org.apache.flink.streaming.api.datastream.DataStream; @@ -65,6 +66,7 @@ public CdcSinkBuilder withParserFactory(EventParser.Factory parserFactory) public CdcSinkBuilder withTable(Table table) { this.table = table; + this.table.options().put(CoreOptions.BRANCH.key(), BranchManager.DEFAULT_MAIN_BRANCH); return this; } @@ -83,6 +85,11 @@ public CdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) { return this; } + public CdcSinkBuilder toBranch(String branch) { + this.table.options().put(CoreOptions.BRANCH.key(), branch); + return this; + } + public DataStreamSink build() { Preconditions.checkNotNull(input, "Input DataStream can not be null."); Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not be null."); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index 4285ebd36241..214b565a8319 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -30,6 +30,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.Preconditions; import org.apache.flink.streaming.api.datastream.DataStream; @@ -101,6 +102,12 @@ public FlinkCdcSyncDatabaseSinkBuilder withTableOptions(Options options) { this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM); this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU); this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY); + if (!tables.isEmpty()) { + String branch = options.get(CoreOptions.BRANCH); + for (Table table : tables) { + table.options().put(CoreOptions.BRANCH.key(), branch); + } + } return this; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index b5de897b92f8..ba4b760092f0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink; import org.apache.paimon.table.BucketMode; @@ -89,6 +90,11 @@ public FlinkSinkBuilder forCompact(boolean compactSink) { return this; } + public FlinkSinkBuilder toBranch(String branch) { + table.options().put(CoreOptions.BRANCH.key(), branch); + return this; + } + public DataStreamSink build() { DataStream input = MapToInternalRow.map(this.input, table.rowType()); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index 30039188d148..74d19e0dc61a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -135,6 +135,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .withOverwritePartition(overwrite ? staticPartitions : null) .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM)) .withBoundedInputStream(context.isBounded()) + .toBranch(conf.getString(CoreOptions.BRANCH)) .build()); }