From cf33fe5fd9df0d38668654a82a83ef0fe0ac5cab Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 18 Nov 2024 11:33:08 +0800 Subject: [PATCH] [flink] Adopt getTaskInfo() when acquiring parallelism info --- .../flink/utils/RuntimeContextUtils.java | 32 +++++++++++++++++++ .../flink/utils/RuntimeContextUtils.java | 32 +++++++++++++++++++ .../flink/utils/RuntimeContextUtils.java | 32 +++++++++++++++++++ .../flink/utils/RuntimeContextUtils.java | 32 +++++++++++++++++++ .../flink/service/QueryExecutorOperator.java | 10 +++--- .../paimon/flink/sink/CommitterOperator.java | 5 ++- .../sink/HashBucketAssignerOperator.java | 5 +-- .../sink/MultiTablesStoreCompactOperator.java | 7 ++-- .../flink/sink/StoreCompactOperator.java | 7 ++-- .../paimon/flink/sink/TableWriteOperator.java | 5 +-- .../index/GlobalIndexAssignerOperator.java | 5 +-- .../sink/index/IndexBootstrapOperator.java | 5 +-- .../paimon/flink/sorter/SortOperator.java | 4 ++- .../AppendBypassCoordinateOperator.java | 3 +- .../source/BucketUnawareCompactSource.java | 3 +- .../flink/utils/RuntimeContextUtils.java | 32 +++++++++++++++++++ .../UnawareBucketAppendOnlyTableITCase.java | 3 +- 17 files changed, 201 insertions(+), 21 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java create mode 100644 paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java create mode 100644 paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java new file mode 100644 index 000000000000..460fea55ad7a --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.common.functions.RuntimeContext; + +/** Utility methods about Flink runtime context to resolve compatibility issues. */ +public class RuntimeContextUtils { + public static int getNumberOfParallelSubtasks(RuntimeContext context) { + return context.getNumberOfParallelSubtasks(); + } + + public static int getIndexOfThisSubtask(RuntimeContext context) { + return context.getIndexOfThisSubtask(); + } +} diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java new file mode 100644 index 000000000000..460fea55ad7a --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.common.functions.RuntimeContext; + +/** Utility methods about Flink runtime context to resolve compatibility issues. */ +public class RuntimeContextUtils { + public static int getNumberOfParallelSubtasks(RuntimeContext context) { + return context.getNumberOfParallelSubtasks(); + } + + public static int getIndexOfThisSubtask(RuntimeContext context) { + return context.getIndexOfThisSubtask(); + } +} diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java new file mode 100644 index 000000000000..460fea55ad7a --- /dev/null +++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.common.functions.RuntimeContext; + +/** Utility methods about Flink runtime context to resolve compatibility issues. */ +public class RuntimeContextUtils { + public static int getNumberOfParallelSubtasks(RuntimeContext context) { + return context.getNumberOfParallelSubtasks(); + } + + public static int getIndexOfThisSubtask(RuntimeContext context) { + return context.getIndexOfThisSubtask(); + } +} diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java new file mode 100644 index 000000000000..460fea55ad7a --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.common.functions.RuntimeContext; + +/** Utility methods about Flink runtime context to resolve compatibility issues. */ +public class RuntimeContextUtils { + public static int getNumberOfParallelSubtasks(RuntimeContext context) { + return context.getNumberOfParallelSubtasks(); + } + + public static int getIndexOfThisSubtask(RuntimeContext context) { + return context.getIndexOfThisSubtask(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java index 556c30839688..bf0521d55049 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.service.network.NetworkUtils; @@ -77,8 +78,8 @@ public void initializeState(StateInitializationContext context) throws Exception this.query = ((FileStoreTable) table).newLocalTableQuery().withIOManager(ioManager); KvQueryServer server = new KvQueryServer( - getRuntimeContext().getIndexOfThisSubtask(), - getRuntimeContext().getNumberOfParallelSubtasks(), + RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()), + RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()), NetworkUtils.findHostAddress(), Collections.singletonList(0).iterator(), 1, @@ -96,8 +97,9 @@ public void initializeState(StateInitializationContext context) throws Exception this.output.collect( new StreamRecord<>( GenericRow.of( - getRuntimeContext().getNumberOfParallelSubtasks(), - getRuntimeContext().getIndexOfThisSubtask(), + RuntimeContextUtils.getNumberOfParallelSubtasks( + getRuntimeContext()), + RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()), BinaryString.fromString(address.getHostName()), address.getPort()))); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java index 2ec90b8c6c40..021a5db413d5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.utils.Preconditions; import org.apache.flink.runtime.state.StateInitializationContext; @@ -129,7 +130,9 @@ public void initializeState(StateInitializationContext context) throws Exception super.initializeState(context); Preconditions.checkArgument( - !forceSingleParallelism || getRuntimeContext().getNumberOfParallelSubtasks() == 1, + !forceSingleParallelism + || RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) + == 1, "Committer Operator parallelism in paimon MUST be one."); this.currentWatermark = Long.MIN_VALUE; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java index 70fac7a83e93..0c101c6d1e01 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.flink.ProcessRecordAttributesUtil; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.index.BucketAssigner; import org.apache.paimon.index.HashBucketAssigner; import org.apache.paimon.index.SimpleHashBucketAssigner; @@ -76,8 +77,8 @@ public void initializeState(StateInitializationContext context) throws Exception StateUtils.getSingleValueFromState( context, "commit_user_state", String.class, initialCommitUser); - int numberTasks = getRuntimeContext().getNumberOfParallelSubtasks(); - int taskId = getRuntimeContext().getIndexOfThisSubtask(); + int numberTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()); + int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()); long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum(); this.assigner = overwrite diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index 7cb5d30c2f8e..8a1d3a02df81 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.options.Options; @@ -109,8 +110,10 @@ public void initializeState(StateInitializationContext context) throws Exception ChannelComputer.select( partition, bucket, - getRuntimeContext().getNumberOfParallelSubtasks()) - == getRuntimeContext().getIndexOfThisSubtask()); + RuntimeContextUtils.getNumberOfParallelSubtasks( + getRuntimeContext())) + == RuntimeContextUtils.getIndexOfThisSubtask( + getRuntimeContext())); tables = new HashMap<>(); writes = new HashMap<>(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java index 9b152a81ca22..ac10345bc425 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.options.Options; @@ -92,8 +93,10 @@ public void initializeState(StateInitializationContext context) throws Exception ChannelComputer.select( partition, bucket, - getRuntimeContext().getNumberOfParallelSubtasks()) - == getRuntimeContext().getIndexOfThisSubtask()); + RuntimeContextUtils.getNumberOfParallelSubtasks( + getRuntimeContext())) + == RuntimeContextUtils.getIndexOfThisSubtask( + getRuntimeContext())); write = storeSinkWriteProvider.provide( table, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index 67b4720e2964..32fcdd03bdfd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.ProcessRecordAttributesUtil; import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.ChannelComputer; @@ -58,14 +59,14 @@ public void initializeState(StateInitializationContext context) throws Exception super.initializeState(context); boolean containLogSystem = containLogSystem(); - int numTasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int numTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()); StateValueFilter stateFilter = (tableName, partition, bucket) -> { int task = containLogSystem ? ChannelComputer.select(bucket, numTasks) : ChannelComputer.select(partition, bucket, numTasks); - return task == getRuntimeContext().getIndexOfThisSubtask(); + return task == RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()); }; state = createState(context, stateFilter); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java index 7fee3f45f3db..99cce07fdc57 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java @@ -22,6 +22,7 @@ import org.apache.paimon.crosspartition.KeyPartOrRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.table.Table; import org.apache.flink.api.java.tuple.Tuple2; @@ -59,8 +60,8 @@ public void initializeState(StateInitializationContext context) throws Exception assigner.open( computeManagedMemory(this), ioManager, - getRuntimeContext().getNumberOfParallelSubtasks(), - getRuntimeContext().getIndexOfThisSubtask(), + RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()), + RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()), this::collect); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java index 501e35dff46c..5c8ba8f9441f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.crosspartition.IndexBootstrap; import org.apache.paimon.crosspartition.KeyPartOrRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.utils.SerializableFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -50,8 +51,8 @@ public IndexBootstrapOperator( public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); bootstrap.bootstrap( - getRuntimeContext().getNumberOfParallelSubtasks(), - getRuntimeContext().getIndexOfThisSubtask(), + RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()), + RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()), this::collect); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java index d4d5dd741681..b6847125fbc6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.options.MemorySize; import org.apache.paimon.sort.BinaryExternalSortBuffer; import org.apache.paimon.types.RowType; @@ -79,7 +80,8 @@ public SortOperator( public void open() throws Exception { super.open(); initBuffer(); - if (sinkParallelism != getRuntimeContext().getNumberOfParallelSubtasks()) { + if (sinkParallelism + != RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext())) { throw new IllegalArgumentException( "Please ensure that the runtime parallelism of the sink matches the initial configuration " + "to avoid potential issues with skewed range partitioning."); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java index 668aa24c145d..45090f7b68b4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java @@ -20,6 +20,7 @@ import org.apache.paimon.append.UnawareAppendCompactionTask; import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.ExecutorUtils; @@ -67,7 +68,7 @@ public AppendBypassCoordinateOperator( public void open() throws Exception { super.open(); checkArgument( - getRuntimeContext().getNumberOfParallelSubtasks() == 1, + RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) == 1, "Compaction Coordinator parallelism in paimon MUST be one."); long intervalMs = table.coreOptions().continuousDiscoveryInterval().toMillis(); this.compactTasks = new LinkedBlockingQueue<>(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java index d306c7d8e1e5..4ddb76e101aa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java @@ -21,6 +21,7 @@ import org.apache.paimon.append.UnawareAppendCompactionTask; import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator; import org.apache.paimon.flink.sink.CompactionTaskTypeInfo; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.EndOfScanException; @@ -77,7 +78,7 @@ public void open(Configuration parameters) throws Exception { compactionCoordinator = new UnawareAppendTableCompactionCoordinator(table, streaming, filter); Preconditions.checkArgument( - this.getRuntimeContext().getNumberOfParallelSubtasks() == 1, + RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) == 1, "Compaction Operator parallelism in paimon MUST be one."); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java new file mode 100644 index 000000000000..34e0d041b6a0 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.common.functions.RuntimeContext; + +/** Utility methods about Flink runtime context to resolve compatibility issues. */ +public class RuntimeContextUtils { + public static int getNumberOfParallelSubtasks(RuntimeContext context) { + return context.getTaskInfo().getNumberOfParallelSubtasks(); + } + + public static int getIndexOfThisSubtask(RuntimeContext context) { + return context.getTaskInfo().getIndexOfThisSubtask(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java index cb323542d4c1..f6dfb1b23046 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.reader.RecordReader; @@ -395,7 +396,7 @@ private TestStatelessWriterSource(FileStoreTable table) { @Override public void run(SourceContext sourceContext) throws Exception { - int taskId = getRuntimeContext().getIndexOfThisSubtask(); + int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()); // wait some time in parallelism #2, // so that it does not commit in the same checkpoint with parallelism #1 int waitCount = (taskId == 0 ? 0 : 10);