Skip to content

Commit

Permalink
[flink] Avoid deprecated SetupableStreamOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Nov 27, 2024
1 parent ee466bc commit 0333afe
Show file tree
Hide file tree
Showing 22 changed files with 428 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.CommitterOperatorFactory;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
Expand Down Expand Up @@ -63,19 +63,16 @@ public class FlinkCdcMultiTableSink implements Serializable {
private final Catalog.Loader catalogLoader;
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;
private final boolean commitChaining;
private final String commitUser;

public FlinkCdcMultiTableSink(
Catalog.Loader catalogLoader,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory,
boolean commitChaining,
String commitUser) {
this.catalogLoader = catalogLoader;
this.commitCpuCores = commitCpuCores;
this.commitHeapMemory = commitHeapMemory;
this.commitChaining = commitChaining;
this.commitUser = commitUser;
}

Expand Down Expand Up @@ -129,10 +126,9 @@ public DataStreamSink<?> sinkFrom(
.transform(
GLOBAL_COMMITTER_NAME,
typeInfo,
new CommitterOperator<>(
new CommitterOperatorFactory<>(
true,
false,
commitChaining,
commitUser,
createCommitterFactory(),
createCommittableStateManager()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
@Nullable private Integer parallelism;
private double committerCpu;
@Nullable private MemorySize committerMemory;
private boolean commitChaining;

// Paimon catalog used to check and create tables. There will be two
// places where this catalog is used. 1) in processing function,
Expand Down Expand Up @@ -103,7 +102,6 @@ public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options options) {
this.parallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
this.commitChaining = options.get(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING);
this.commitUser = createCommitUser(options);
return this;
}
Expand Down Expand Up @@ -169,7 +167,7 @@ private void buildCombinedCdcSink() {

FlinkCdcMultiTableSink sink =
new FlinkCdcMultiTableSink(
catalogLoader, committerCpu, committerMemory, commitChaining, commitUser);
catalogLoader, committerCpu, committerMemory, commitUser);
sink.sinkFrom(partitioned);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public void cancel() {}
() -> FlinkCatalogFactory.createPaimonCatalog(new Options()),
FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
null,
true,
UUID.randomUUID().toString());
DataStreamSink<?> dataStreamSink = sink.sinkFrom(input);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;

Expand All @@ -31,7 +30,6 @@ public class AppendBypassCompactWorkerOperator

public AppendBypassCompactWorkerOperator(FileStoreTable table, String commitUser) {
super(table, commitUser);
this.chainingStrategy = ChainingStrategy.HEAD;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,13 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.time.Duration;
Expand All @@ -58,9 +53,7 @@
* time, tags are automatically created for each flink savepoint.
*/
public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
implements OneInputStreamOperator<CommitT, CommitT>,
SetupableStreamOperator,
BoundedOneInput {
implements OneInputStreamOperator<CommitT, CommitT>, BoundedOneInput {
public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -256,19 +249,4 @@ public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception
public void endInput() throws Exception {
commitOperator.endInput();
}

@Override
public void setup(StreamTask containingTask, StreamConfig config, Output output) {
commitOperator.setup(containingTask, config, output);
}

@Override
public ChainingStrategy getChainingStrategy() {
return commitOperator.getChainingStrategy();
}

@Override
public void setChainingStrategy(ChainingStrategy strategy) {
commitOperator.setChainingStrategy(strategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;

import java.time.Duration;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/**
 * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} that builds {@link
 * AutoTagForSavepointCommitterOperator} instances.
 *
 * <p>The operator handed to the new {@link AutoTagForSavepointCommitterOperator} is obtained from
 * the given {@link CommitterOperatorFactory}; the suppliers and the retention duration are passed
 * on to the operator's constructor unchanged.
 *
 * @param <CommitT> input and output element type of the created operator
 * @param <GlobalCommitT> second type argument of the wrapped {@link CommitterOperatorFactory}
 */
public class AutoTagForSavepointCommitterOperatorFactory<CommitT, GlobalCommitT>
        extends AbstractStreamOperatorFactory<CommitT>
        implements OneInputStreamOperatorFactory<CommitT, CommitT> {

    /** Factory producing the operator that the created operator wraps. */
    private final CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory;

    /** Supplier of the {@link SnapshotManager}, forwarded to the created operator. */
    private final SerializableSupplier<SnapshotManager> snapshotManagerFactory;

    /** Supplier of the {@link TagManager}, forwarded to the created operator. */
    private final SerializableSupplier<TagManager> tagManagerFactory;

    /** Supplier of the {@link TagDeletion}, forwarded to the created operator. */
    private final SerializableSupplier<TagDeletion> tagDeletionFactory;

    /** Supplier of the {@link TagCallback} list, forwarded to the created operator. */
    private final SerializableSupplier<List<TagCallback>> callbacksSupplier;

    // NOTE(review): initialised to an empty set and never read inside this class; confirm
    // whether this field is still required.
    private final NavigableSet<Long> identifiersForTags;

    /** Tag retention duration, forwarded to the created operator. */
    private final Duration tagTimeRetained;

    /**
     * Creates the factory.
     *
     * @param commitOperatorFactory factory for the operator to be wrapped
     * @param snapshotManagerFactory supplier of the {@link SnapshotManager}
     * @param tagManagerFactory supplier of the {@link TagManager}
     * @param tagDeletionFactory supplier of the {@link TagDeletion}
     * @param callbacksSupplier supplier of the {@link TagCallback} list
     * @param tagTimeRetained duration handed to the created operator
     */
    public AutoTagForSavepointCommitterOperatorFactory(
            CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory,
            SerializableSupplier<SnapshotManager> snapshotManagerFactory,
            SerializableSupplier<TagManager> tagManagerFactory,
            SerializableSupplier<TagDeletion> tagDeletionFactory,
            SerializableSupplier<List<TagCallback>> callbacksSupplier,
            Duration tagTimeRetained) {
        this.identifiersForTags = new TreeSet<>();
        this.commitOperatorFactory = commitOperatorFactory;
        this.snapshotManagerFactory = snapshotManagerFactory;
        this.tagManagerFactory = tagManagerFactory;
        this.tagDeletionFactory = tagDeletionFactory;
        this.callbacksSupplier = callbacksSupplier;
        this.tagTimeRetained = tagTimeRetained;
    }

    /**
     * Builds a new {@link AutoTagForSavepointCommitterOperator} around an operator created by the
     * wrapped {@link CommitterOperatorFactory} from the same {@code parameters}.
     *
     * @param parameters operator parameters, passed on to the wrapped factory
     * @return the newly created operator
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T extends StreamOperator<CommitT>> T createStreamOperator(
            StreamOperatorParameters<CommitT> parameters) {
        StreamOperator<CommitT> autoTagOperator =
                new AutoTagForSavepointCommitterOperator<>(
                        commitOperatorFactory.createStreamOperator(parameters),
                        snapshotManagerFactory,
                        tagManagerFactory,
                        tagDeletionFactory,
                        callbacksSupplier,
                        tagTimeRetained);
        return (T) autoTagOperator;
    }

    /**
     * Reports the class of the operators this factory creates.
     *
     * @param classLoader not used by this implementation
     * @return {@code AutoTagForSavepointCommitterOperator.class}
     */
    @Override
    @SuppressWarnings("rawtypes")
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return AutoTagForSavepointCommitterOperator.class;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,13 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.time.Instant;
Expand All @@ -53,9 +48,7 @@
* completed, the corresponding tag is generated.
*/
public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>
implements OneInputStreamOperator<CommitT, CommitT>,
SetupableStreamOperator,
BoundedOneInput {
implements OneInputStreamOperator<CommitT, CommitT>, BoundedOneInput {

private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";

Expand Down Expand Up @@ -250,19 +243,4 @@ public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception
public void endInput() throws Exception {
commitOperator.endInput();
}

@Override
public void setup(StreamTask containingTask, StreamConfig config, Output output) {
commitOperator.setup(containingTask, config, output);
}

@Override
public ChainingStrategy getChainingStrategy() {
return commitOperator.getChainingStrategy();
}

@Override
public void setChainingStrategy(ChainingStrategy strategy) {
commitOperator.setChainingStrategy(strategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;

/**
 * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} that builds {@link
 * BatchWriteGeneratorTagOperator} instances.
 *
 * <p>The operator handed to the new {@link BatchWriteGeneratorTagOperator} is obtained from the
 * given {@link CommitterOperatorFactory}; the table is passed on to the operator's constructor
 * unchanged.
 *
 * @param <CommitT> input and output element type of the created operator
 * @param <GlobalCommitT> second type argument of the wrapped {@link CommitterOperatorFactory}
 */
public class BatchWriteGeneratorTagOperatorFactory<CommitT, GlobalCommitT>
        extends AbstractStreamOperatorFactory<CommitT>
        implements OneInputStreamOperatorFactory<CommitT, CommitT> {

    /** Factory producing the operator that the created operator wraps. */
    private final CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory;

    /** Table forwarded to every created {@link BatchWriteGeneratorTagOperator}. */
    protected final FileStoreTable table;

    /**
     * Creates the factory.
     *
     * @param commitOperatorFactory factory for the operator to be wrapped
     * @param table table handed to the created operator
     */
    public BatchWriteGeneratorTagOperatorFactory(
            CommitterOperatorFactory<CommitT, GlobalCommitT> commitOperatorFactory,
            FileStoreTable table) {
        this.commitOperatorFactory = commitOperatorFactory;
        this.table = table;
    }

    /**
     * Builds a new {@link BatchWriteGeneratorTagOperator} around an operator created by the
     * wrapped {@link CommitterOperatorFactory} from the same {@code parameters}.
     *
     * @param parameters operator parameters, passed on to the wrapped factory
     * @return the newly created operator
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T extends StreamOperator<CommitT>> T createStreamOperator(
            StreamOperatorParameters<CommitT> parameters) {
        StreamOperator<CommitT> tagOperator =
                new BatchWriteGeneratorTagOperator<>(
                        commitOperatorFactory.createStreamOperator(parameters), table);
        return (T) tagOperator;
    }

    /**
     * Reports the class of the operators this factory creates.
     *
     * @param classLoader not used by this implementation
     * @return {@code BatchWriteGeneratorTagOperator.class}
     */
    @Override
    @SuppressWarnings("rawtypes")
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return BatchWriteGeneratorTagOperator.class;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,17 @@ protected DataStreamSink<?> doCommit(
.transform(
GLOBAL_COMMITTER_NAME,
new MultiTableCommittableTypeInfo(),
new CommitterOperator<>(
new CommitterOperatorFactory<>(
streamingCheckpointEnabled,
false,
options.get(SINK_COMMITTER_OPERATOR_CHAINING),
commitUser,
createCommitterFactory(isStreaming),
createCommittableStateManager(),
options.get(END_INPUT_WATERMARK)))
.setParallelism(written.getParallelism());
if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) {
committed = committed.startNewChain();
}
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

Expand Down
Loading

0 comments on commit 0333afe

Please sign in to comment.