Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 15, 2024
1 parent e93b01a commit 8e012fe
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 9 deletions.
13 changes: 5 additions & 8 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,8 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");

public static final ConfigOption<String> BRANCH_NAME =
key("branch-name")
.stringType()
.defaultValue("main")
.withDescription("Specify branch name.");
public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");

public static final ConfigOption<FileFormatType> FILE_FORMAT =
key("file.format")
Expand Down Expand Up @@ -1116,10 +1113,10 @@ public String branch() {
}

public static String branch(Map<String, String> options) {
if (options.containsKey(BRANCH_NAME.key())) {
return options.get(BRANCH_NAME.key());
if (options.containsKey(BRANCH.key())) {
return options.get(BRANCH.key());
}
return BRANCH_NAME.defaultValue();
return BRANCH.defaultValue();
}

public static Path path(Map<String, String> options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ public TableSchema commitChanges(String branch, SchemaChange... changes) throws
return commitChanges(branch, Arrays.asList(changes));
}

public TableSchema commitChanges(List<SchemaChange> changes) throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, Catalog.ColumnNotExistException {
public TableSchema commitChanges(List<SchemaChange> changes)
throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
Catalog.ColumnNotExistException {
return commitChanges(branch, changes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
Expand All @@ -29,6 +30,7 @@
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BranchManager;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -181,6 +183,12 @@ protected void buildSink(
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
}
String branch = tableConfig.get(CoreOptions.BRANCH.key());
if (branch != null) {
sinkBuilder.toBranch(branch);
} else {
sinkBuilder.toBranch(BranchManager.DEFAULT_MAIN_BRANCH);
}
sinkBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -65,6 +66,7 @@ public CdcSinkBuilder<T> withParserFactory(EventParser.Factory<T> parserFactory)

public CdcSinkBuilder<T> withTable(Table table) {
this.table = table;
this.table.options().put(CoreOptions.BRANCH.key(), BranchManager.DEFAULT_MAIN_BRANCH);
return this;
}

Expand All @@ -83,6 +85,11 @@ public CdcSinkBuilder<T> withCatalogLoader(Catalog.Loader catalogLoader) {
return this;
}

public CdcSinkBuilder<T> toBranch(String branch) {
this.table.options().put(CoreOptions.BRANCH.key(), branch);
return this;
}

public DataStreamSink<?> build() {
Preconditions.checkNotNull(input, "Input DataStream can not be null.");
Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not be null.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -101,6 +102,12 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options options) {
this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
if (!tables.isEmpty()) {
String branch = options.get(CoreOptions.BRANCH);
for (Table table : tables) {
table.options().put(CoreOptions.BRANCH.key(), branch);
}
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.table.BucketMode;
Expand Down Expand Up @@ -89,6 +90,11 @@ public FlinkSinkBuilder forCompact(boolean compactSink) {
return this;
}

public FlinkSinkBuilder toBranch(String branch) {
table.options().put(CoreOptions.BRANCH.key(), branch);
return this;
}

public DataStreamSink<?> build() {
DataStream<InternalRow> input = MapToInternalRow.map(this.input, table.rowType());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.withOverwritePartition(overwrite ? staticPartitions : null)
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
.withBoundedInputStream(context.isBounded())
.toBranch(conf.getString(CoreOptions.BRANCH))
.build());
}

Expand Down

0 comments on commit 8e012fe

Please sign in to comment.