diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java index 8cd1d238b23f..d9f863c6b919 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java @@ -45,13 +45,10 @@ import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import javax.annotation.Nullable; @@ -113,26 +110,14 @@ private static class FileIndexModificationOperator private static final long serialVersionUID = 1L; - private final FileStoreTable table; - - private transient FileIndexProcessor fileIndexProcessor; - private transient List messages; + private final transient FileIndexProcessor fileIndexProcessor; + private final transient List messages; private FileIndexModificationOperator( StreamOperatorParameters parameters, Options options, FileStoreTable table) { super(parameters, options); - this.table = table; - } - - @Override - public void setup( - StreamTask containingTask, - StreamConfig config, - Output> output) { - super.setup(containingTask, config, output); - this.fileIndexProcessor = new FileIndexProcessor(table); this.messages = new ArrayList<>(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java index 3c93f5d409ce..8009bec9677f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java @@ -32,16 +32,13 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.InternalTimerService; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import javax.annotation.Nullable; @@ -72,14 +69,6 @@ protected RowDataStoreWriteOperator( String initialCommitUser) { super(parameters, table, storeSinkWriteProvider, initialCommitUser); this.logSinkFunction = logSinkFunction; - } - - @Override - public void setup( - StreamTask containingTask, - StreamConfig config, - Output> output) { - super.setup(containingTask, config, output); if (logSinkFunction != null) { FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java index e7d98d3a8c65..fc45eceb3fd5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java @@ -26,12 +26,18 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.util.OutputTag; import org.junit.jupiter.api.Test; @@ -151,7 +157,17 @@ private void prepareHashOperator(Map options) throws Exception { Collections.singletonList("f0"), options, null); - operator = new LocalMergeOperator.Factory(schema).createStreamOperator(null); + operator = + new LocalMergeOperator.Factory(schema) + .createStreamOperator( + new StreamOperatorParameters<>( + new SourceOperatorStreamTask( + new DummyEnvironment()), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>()), + null, + null, + null)); operator.open(); assertThat(operator.merger()).isInstanceOf(HashMapLocalMerger.class); }