Skip to content

Commit

Permalink
[flink] Avoid deprecated SetupableStreamOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Nov 26, 2024
1 parent ee466bc commit 2f0b1b5
Show file tree
Hide file tree
Showing 20 changed files with 427 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
Expand Down Expand Up @@ -129,10 +129,9 @@ public DataStreamSink<?> sinkFrom(
.transform(
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperator<>(
new CommitterOperatorFactory<>(
true,
false,
commitChaining,
commitUser,
createCommitterFactory(),
createCommittableStateManager()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;

Expand All @@ -31,7 +30,6 @@ public class AppendBypassCompactWorkerOperator

public AppendBypassCompactWorkerOperator(FileStoreTable table, String commitUser) {
super(table, commitUser);
this.chainingStrategy = ChainingStrategy.HEAD;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,13 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.time.Duration;
Expand All @@ -58,9 +53,7 @@
* time, tags are automatically created for each flink savepoint.
*/
public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
implements OneInputStreamOperator<CommitT, CommitT>,
SetupableStreamOperator,
BoundedOneInput {
implements OneInputStreamOperator<CommitT, CommitT>, BoundedOneInput {
public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -256,19 +249,4 @@ public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception
public void endInput() throws Exception {
commitOperator.endInput();
}

@Override
public void setup(StreamTask containingTask, StreamConfig config, Output output) {
commitOperator.setup(containingTask, config, output);
}

@Override
public ChainingStrategy getChainingStrategy() {
return commitOperator.getChainingStrategy();
}

@Override
public void setChainingStrategy(ChainingStrategy strategy) {
commitOperator.setChainingStrategy(strategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;

import java.time.Duration;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/**
* {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
* AutoTagForSavepointCommitterOperator}.
*/
public class AutoTagForSavepointCommitterOperatorFactory<CommitT, GlobalCommitT>
extends AbstractStreamOperatorFactory<CommitT>
implements OneInputStreamOperatorFactory<CommitT, CommitT> {

private final CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory;

private final SerializableSupplier<SnapshotManager> snapshotManagerFactory;

private final SerializableSupplier<TagManager> tagManagerFactory;

private final SerializableSupplier<TagDeletion> tagDeletionFactory;

private final SerializableSupplier<List<TagCallback>> callbacksSupplier;

private final NavigableSet<Long> identifiersForTags;

private final Duration tagTimeRetained;

public AutoTagForSavepointCommitterOperatorFactory(
CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory,
SerializableSupplier<SnapshotManager> snapshotManagerFactory,
SerializableSupplier<TagManager> tagManagerFactory,
SerializableSupplier<TagDeletion> tagDeletionFactory,
SerializableSupplier<List<TagCallback>> callbacksSupplier,
Duration tagTimeRetained) {
this.commitOperatorFactory = commitOperatorFactory;
this.tagManagerFactory = tagManagerFactory;
this.snapshotManagerFactory = snapshotManagerFactory;
this.tagDeletionFactory = tagDeletionFactory;
this.callbacksSupplier = callbacksSupplier;
this.identifiersForTags = new TreeSet<>();
this.tagTimeRetained = tagTimeRetained;
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<CommitT>> T createStreamOperator(
StreamOperatorParameters<CommitT> parameters) {
return (T)
new AutoTagForSavepointCommitterOperator<>(
commitOperatorFactory.createStreamOperator(parameters),
snapshotManagerFactory,
tagManagerFactory,
tagDeletionFactory,
callbacksSupplier,
tagTimeRetained);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return AutoTagForSavepointCommitterOperator.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,13 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.time.Instant;
Expand All @@ -53,9 +48,7 @@
* completed, the corresponding tag is generated.
*/
public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>
implements OneInputStreamOperator<CommitT, CommitT>,
SetupableStreamOperator,
BoundedOneInput {
implements OneInputStreamOperator<CommitT, CommitT>, BoundedOneInput {

private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";

Expand Down Expand Up @@ -250,19 +243,4 @@ public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception
public void endInput() throws Exception {
commitOperator.endInput();
}

@Override
public void setup(StreamTask containingTask, StreamConfig config, Output output) {
commitOperator.setup(containingTask, config, output);
}

@Override
public ChainingStrategy getChainingStrategy() {
return commitOperator.getChainingStrategy();
}

@Override
public void setChainingStrategy(ChainingStrategy strategy) {
commitOperator.setChainingStrategy(strategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;

/**
* {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
* BatchWriteGeneratorTagOperator}.
*/
public class BatchWriteGeneratorTagOperatorFactory<CommitT, GlobalCommitT>
extends AbstractStreamOperatorFactory<CommitT>
implements OneInputStreamOperatorFactory<CommitT, CommitT> {
private final CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory;

protected final FileStoreTable table;

public BatchWriteGeneratorTagOperatorFactory(
CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory,
FileStoreTable table) {
this.table = table;
this.commitOperatorFactory = commitOperatorFactory;
}

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<CommitT>> T createStreamOperator(
StreamOperatorParameters<CommitT> parameters) {
return (T)
new BatchWriteGeneratorTagOperator<>(
commitOperatorFactory.createStreamOperator(parameters), table);
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return BatchWriteGeneratorTagOperator.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,17 @@ protected DataStreamSink<?> doCommit(
.transform(
GLOBAL_COMMITTER_NAME,
new MultiTableCommittableTypeInfo(),
new CommitterOperator<>(
new CommitterOperatorFactory<>(
streamingCheckpointEnabled,
false,
options.get(SINK_COMMITTER_OPERATOR_CHAINING),
commitUser,
createCommitterFactory(isStreaming),
createCommittableStateManager(),
options.get(END_INPUT_WATERMARK)))
.setParallelism(written.getParallelism());
if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) {
committed = committed.startNewChain();
}
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

Expand Down Expand Up @@ -91,26 +91,9 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe
private final Long endInputWatermark;

public CommitterOperator(
StreamOperatorParameters<CommitT> parameters,
boolean streamingCheckpointEnabled,
boolean forceSingleParallelism,
boolean chaining,
String initialCommitUser,
Committer.Factory<CommitT, GlobalCommitT> committerFactory,
CommittableStateManager<GlobalCommitT> committableStateManager) {
this(
streamingCheckpointEnabled,
forceSingleParallelism,
chaining,
initialCommitUser,
committerFactory,
committableStateManager,
null);
}

public CommitterOperator(
boolean streamingCheckpointEnabled,
boolean forceSingleParallelism,
boolean chaining,
String initialCommitUser,
Committer.Factory<CommitT, GlobalCommitT> committerFactory,
CommittableStateManager<GlobalCommitT> committableStateManager,
Expand All @@ -122,7 +105,10 @@ public CommitterOperator(
this.committerFactory = checkNotNull(committerFactory);
this.committableStateManager = committableStateManager;
this.endInputWatermark = endInputWatermark;
setChainingStrategy(chaining ? ChainingStrategy.ALWAYS : ChainingStrategy.HEAD);
this.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
parameters.getOutput());
}

@Override
Expand Down
Loading

0 comments on commit 2f0b1b5

Please sign in to comment.