Skip to content

Commit

Permalink
Fix local merge test and overridden setup
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Dec 1, 2024
1 parent e47862c commit 833100e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,10 @@
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;

import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -113,26 +110,14 @@ private static class FileIndexModificationOperator

private static final long serialVersionUID = 1L;

private final FileStoreTable table;

private transient FileIndexProcessor fileIndexProcessor;
private transient List<CommitMessage> messages;
private final transient FileIndexProcessor fileIndexProcessor;
private final transient List<CommitMessage> messages;

private FileIndexModificationOperator(
StreamOperatorParameters<Committable> parameters,
Options options,
FileStoreTable table) {
super(parameters, options);
this.table = table;
}

@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<Committable>> output) {
super.setup(containingTask, config, output);

this.fileIndexProcessor = new FileIndexProcessor(table);
this.messages = new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,13 @@
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -72,14 +69,6 @@ protected RowDataStoreWriteOperator(
String initialCommitUser) {
super(parameters, table, storeSinkWriteProvider, initialCommitUser);
this.logSinkFunction = logSinkFunction;
}

@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<Committable>> output) {
super.setup(containingTask, config, output);
if (logSinkFunction != null) {
FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.util.OutputTag;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -151,7 +157,17 @@ private void prepareHashOperator(Map<String, String> options) throws Exception {
Collections.singletonList("f0"),
options,
null);
operator = new LocalMergeOperator.Factory(schema).createStreamOperator(null);
operator =
new LocalMergeOperator.Factory(schema)
.createStreamOperator(
new StreamOperatorParameters<>(
new SourceOperatorStreamTask<Integer>(
new DummyEnvironment()),
new MockStreamConfig(new Configuration(), 1),
new MockOutput<>(new ArrayList<>()),
null,
null,
null));
operator.open();
assertThat(operator.merger()).isInstanceOf(HashMapLocalMerger.class);
}
Expand Down

0 comments on commit 833100e

Please sign in to comment.