Skip to content

Commit

Permalink
[flink] Avoid deprecated SetupableStreamOperator (#4591)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored Dec 1, 2024
1 parent 475e487 commit 4c6c557
Show file tree
Hide file tree
Showing 55 changed files with 1,397 additions and 341 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.paimon.table.sink.KeyAndBucketExtractor;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/** {@link CdcDynamicBucketSinkBase} for {@link CdcRecord}. */
public class CdcDynamicBucketSink extends CdcDynamicBucketSinkBase<CdcRecord> {
Expand All @@ -42,8 +42,8 @@ protected KeyAndBucketExtractor<CdcRecord> createExtractor(TableSchema schema) {
}

@Override
protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcDynamicBucketWriteOperator(table, writeProvider, commitUser);
protected OneInputStreamOperatorFactory<Tuple2<CdcRecord, Integer>, Committable>
createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcDynamicBucketWriteOperator.Factory(table, writeProvider, commitUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
Expand All @@ -43,11 +47,12 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc

private final long retrySleepMillis;

public CdcDynamicBucketWriteOperator(
private CdcDynamicBucketWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}
Expand Down Expand Up @@ -85,4 +90,30 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
throw new IOException(e);
}
}

/** {@link StreamOperatorFactory} of {@link CdcDynamicBucketWriteOperator}. */
public static class Factory extends TableWriteOperator.Factory<Tuple2<CdcRecord, Integer>> {

// Captures the construction-time arguments (table, write provider, commit user);
// the operator itself is only instantiated when the Flink runtime supplies the
// StreamOperatorParameters in createStreamOperator.
public Factory(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
}

// Instantiates the operator with the runtime-provided parameters. The unchecked
// cast to T is the standard Flink factory idiom: the framework guarantees T is
// compatible with the operator's output type (Committable).
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Committable>> T createStreamOperator(
StreamOperatorParameters<Committable> parameters) {
return (T)
new CdcDynamicBucketWriteOperator(
parameters, table, storeSinkWriteProvider, initialCommitUser);
}

// Reports the concrete operator class to the Flink runtime.
@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcDynamicBucketWriteOperator.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/**
* A {@link FlinkSink} for fixed-bucket table which accepts {@link CdcRecord} and waits for a schema
Expand All @@ -39,8 +39,8 @@ public CdcFixedBucketSink(FileStoreTable table) {
}

@Override
protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcRecordStoreWriteOperator(table, writeProvider, commitUser);
return new CdcRecordStoreWriteOperator.Factory(table, writeProvider, commitUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
Expand Down Expand Up @@ -74,12 +77,13 @@ public class CdcRecordStoreMultiWriteOperator
private String commitUser;
private ExecutorService compactExecutor;

public CdcRecordStoreMultiWriteOperator(
private CdcRecordStoreMultiWriteOperator(
StreamOperatorParameters<MultiTableCommittable> parameters,
Catalog.Loader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
super(options);
super(parameters, options);
this.catalogLoader = catalogLoader;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
Expand Down Expand Up @@ -254,4 +258,42 @@ public Map<Identifier, StoreSinkWrite> writes() {
public String commitUser() {
return commitUser;
}

/** {@link StreamOperatorFactory} of {@link CdcRecordStoreMultiWriteOperator}. */
public static class Factory
extends PrepareCommitOperator.Factory<CdcMultiplexRecord, MultiTableCommittable> {
// Arguments forwarded verbatim to the operator constructor once the Flink
// runtime calls createStreamOperator with the runtime parameters.
private final StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider;
private final String initialCommitUser;
private final Catalog.Loader catalogLoader;

public Factory(
Catalog.Loader catalogLoader,
StoreSinkWrite.WithWriteBufferProvider storeSinkWriteProvider,
String initialCommitUser,
Options options) {
super(options);
this.catalogLoader = catalogLoader;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
}

// Instantiates the multi-table write operator. The unchecked cast to T is the
// standard Flink factory idiom: the framework guarantees T is compatible with
// the operator's output type (MultiTableCommittable).
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<MultiTableCommittable>> T createStreamOperator(
StreamOperatorParameters<MultiTableCommittable> parameters) {
return (T)
new CdcRecordStoreMultiWriteOperator(
parameters,
catalogLoader,
storeSinkWriteProvider,
initialCommitUser,
options);
}

// Reports the concrete operator class to the Flink runtime.
@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcRecordStoreMultiWriteOperator.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
Expand All @@ -27,6 +28,9 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
Expand All @@ -50,11 +54,12 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {

private final long retrySleepMillis;

public CdcRecordStoreWriteOperator(
protected CdcRecordStoreWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}
Expand Down Expand Up @@ -92,4 +97,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
throw new IOException(e);
}
}

/** {@link StreamOperatorFactory} of {@link CdcRecordStoreWriteOperator}. */
public static class Factory extends TableWriteOperator.Factory<CdcRecord> {

// Captures the construction-time arguments; the operator is built lazily in
// createStreamOperator when the Flink runtime supplies the parameters.
// NOTE: non-private so that CdcUnawareBucketWriteOperator.Factory can extend it.
public Factory(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
}

// Instantiates the operator with the runtime-provided parameters. The unchecked
// cast to T is the standard Flink factory idiom: the framework guarantees T is
// compatible with the operator's output type (Committable).
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Committable>> T createStreamOperator(
StreamOperatorParameters<Committable> parameters) {
return (T)
new CdcRecordStoreWriteOperator(
parameters, table, storeSinkWriteProvider, initialCommitUser);
}

// Reports the concrete operator class to the Flink runtime.
@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcRecordStoreWriteOperator.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;

Expand All @@ -42,9 +42,9 @@ public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
}

@Override
protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new CdcUnawareBucketWriteOperator(table, writeProvider, commitUser);
return new CdcUnawareBucketWriteOperator.Factory(table, writeProvider, commitUser);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowKind;

import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** A {@link PrepareCommitOperator} to write {@link CdcRecord} to unaware-bucket mode table. */
public class CdcUnawareBucketWriteOperator extends CdcRecordStoreWriteOperator {

public CdcUnawareBucketWriteOperator(
private CdcUnawareBucketWriteOperator(
StreamOperatorParameters<Committable> parameters,
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
}

@Override
Expand All @@ -42,4 +47,30 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
super.processElement(element);
}
}

/** {@link StreamOperatorFactory} of {@link CdcUnawareBucketWriteOperator}. */
public static class Factory extends CdcRecordStoreWriteOperator.Factory {

// Same construction arguments as the parent factory; only the operator type
// produced by createStreamOperator differs.
public Factory(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
}

// Overrides the parent to instantiate the unaware-bucket variant. The unchecked
// cast to T is the standard Flink factory idiom.
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Committable>> T createStreamOperator(
StreamOperatorParameters<Committable> parameters) {
return (T)
new CdcUnawareBucketWriteOperator(
parameters, table, storeSinkWriteProvider, initialCommitUser);
}

// Reports the concrete operator class to the Flink runtime.
@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return CdcUnawareBucketWriteOperator.class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
Expand All @@ -41,7 +41,7 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;

Expand All @@ -63,19 +63,16 @@ public class FlinkCdcMultiTableSink implements Serializable {
private final Catalog.Loader catalogLoader;
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;
private final boolean commitChaining;
private final String commitUser;

public FlinkCdcMultiTableSink(
Catalog.Loader catalogLoader,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory,
boolean commitChaining,
String commitUser) {
this.catalogLoader = catalogLoader;
this.commitCpuCores = commitCpuCores;
this.commitHeapMemory = commitHeapMemory;
this.commitChaining = commitChaining;
this.commitUser = commitUser;
}

Expand Down Expand Up @@ -129,10 +126,9 @@ public DataStreamSink<?> sinkFrom(
.transform(
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperator<>(
new CommitterOperatorFactory<>(
true,
false,
commitChaining,
commitUser,
createCommitterFactory(),
createCommittableStateManager()))
Expand All @@ -141,9 +137,10 @@ public DataStreamSink<?> sinkFrom(
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
return new CdcRecordStoreMultiWriteOperator(
// Builds the operator factory that writes CDC records to multiple tables.
// NOTE(review): an empty Options is passed here — presumably per-table options
// are resolved inside the operator at runtime; confirm against
// CdcRecordStoreMultiWriteOperator.
protected OneInputStreamOperatorFactory<CdcMultiplexRecord, MultiTableCommittable>
createWriteOperator(
StoreSinkWrite.WithWriteBufferProvider writeProvider, String commitUser) {
return new CdcRecordStoreMultiWriteOperator.Factory(
catalogLoader, writeProvider, commitUser, new Options());
}

Expand Down
Loading

0 comments on commit 4c6c557

Please sign in to comment.