[flink] Adopt getTaskInfo() when acquiring parallelism info
yunfengzhou-hub committed Nov 25, 2024
1 parent be24886 commit cf33fe5
Showing 17 changed files with 201 additions and 21 deletions.
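This commit routes all parallelism lookups in Paimon's Flink operators through a new RuntimeContextUtils helper instead of calling RuntimeContext#getNumberOfParallelSubtasks() and #getIndexOfThisSubtask() directly. Identical copies of the helper are added per Flink-version module; the copies for older Flink keep the legacy accessors, while the copy at the end of this diff delegates through getTaskInfo(), presumably because newer Flink releases expose parallelism info on TaskInfo and deprecate the direct RuntimeContext accessors. A minimal sketch of the resulting call-site pattern (the example class below is hypothetical and not part of the commit):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.paimon.flink.utils.RuntimeContextUtils;

/** Hypothetical example: operators resolve parallelism info through the shim. */
public final class ParallelismInfoExample {
    // The same call site compiles against every supported Flink version,
    // because each Flink module ships its own RuntimeContextUtils.
    static String describe(RuntimeContext context) {
        int numTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(context);
        int taskId = RuntimeContextUtils.getIndexOfThisSubtask(context);
        return "subtask " + taskId + " of " + numTasks;
    }
}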
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.flink.api.common.functions.RuntimeContext;

/** Utility methods about Flink runtime context to resolve compatibility issues. */
public class RuntimeContextUtils {
    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
        return context.getNumberOfParallelSubtasks();
    }

    public static int getIndexOfThisSubtask(RuntimeContext context) {
        return context.getIndexOfThisSubtask();
    }
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.flink.api.common.functions.RuntimeContext;

/** Utility methods about Flink runtime context to resolve compatibility issues. */
public class RuntimeContextUtils {
    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
        return context.getNumberOfParallelSubtasks();
    }

    public static int getIndexOfThisSubtask(RuntimeContext context) {
        return context.getIndexOfThisSubtask();
    }
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.flink.api.common.functions.RuntimeContext;

/** Utility methods about Flink runtime context to resolve compatibility issues. */
public class RuntimeContextUtils {
    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
        return context.getNumberOfParallelSubtasks();
    }

    public static int getIndexOfThisSubtask(RuntimeContext context) {
        return context.getIndexOfThisSubtask();
    }
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.flink.api.common.functions.RuntimeContext;

/** Utility methods about Flink runtime context to resolve compatibility issues. */
public class RuntimeContextUtils {
    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
        return context.getNumberOfParallelSubtasks();
    }

    public static int getIndexOfThisSubtask(RuntimeContext context) {
        return context.getIndexOfThisSubtask();
    }
}
@@ -23,6 +23,7 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.service.network.NetworkUtils;
@@ -77,8 +78,8 @@ public void initializeState(StateInitializationContext context) throws Exception
this.query = ((FileStoreTable) table).newLocalTableQuery().withIOManager(ioManager);
KvQueryServer server =
new KvQueryServer(
-getRuntimeContext().getIndexOfThisSubtask(),
-getRuntimeContext().getNumberOfParallelSubtasks(),
+RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
+RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
NetworkUtils.findHostAddress(),
Collections.singletonList(0).iterator(),
1,
@@ -96,8 +97,9 @@ public void initializeState(StateInitializationContext context) throws Exception
this.output.collect(
new StreamRecord<>(
GenericRow.of(
-getRuntimeContext().getNumberOfParallelSubtasks(),
-getRuntimeContext().getIndexOfThisSubtask(),
+RuntimeContextUtils.getNumberOfParallelSubtasks(
+getRuntimeContext()),
+RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
BinaryString.fromString(address.getHostName()),
address.getPort())));
}
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.runtime.state.StateInitializationContext;
@@ -129,7 +130,9 @@ public void initializeState(StateInitializationContext context) throws Exception
super.initializeState(context);

Preconditions.checkArgument(
-!forceSingleParallelism || getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+!forceSingleParallelism
+|| RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext())
+== 1,
"Committer Operator parallelism in paimon MUST be one.");

this.currentWatermark = Long.MIN_VALUE;
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;

import org.apache.paimon.flink.ProcessRecordAttributesUtil;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.index.BucketAssigner;
import org.apache.paimon.index.HashBucketAssigner;
import org.apache.paimon.index.SimpleHashBucketAssigner;
@@ -76,8 +77,8 @@ public void initializeState(StateInitializationContext context) throws Exception
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class, initialCommitUser);

-int numberTasks = getRuntimeContext().getNumberOfParallelSubtasks();
-int taskId = getRuntimeContext().getIndexOfThisSubtask();
+int numberTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
this.assigner =
overwrite
@@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.options.Options;
@@ -109,8 +110,10 @@ public void initializeState(StateInitializationContext context) throws Exception
ChannelComputer.select(
partition,
bucket,
-getRuntimeContext().getNumberOfParallelSubtasks())
-== getRuntimeContext().getIndexOfThisSubtask());
+RuntimeContextUtils.getNumberOfParallelSubtasks(
+getRuntimeContext()))
+== RuntimeContextUtils.getIndexOfThisSubtask(
+getRuntimeContext()));

tables = new HashMap<>();
writes = new HashMap<>();
@@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.options.Options;
@@ -92,8 +93,10 @@ public void initializeState(StateInitializationContext context) throws Exception
ChannelComputer.select(
partition,
bucket,
-getRuntimeContext().getNumberOfParallelSubtasks())
-== getRuntimeContext().getIndexOfThisSubtask());
+RuntimeContextUtils.getNumberOfParallelSubtasks(
+getRuntimeContext()))
+== RuntimeContextUtils.getIndexOfThisSubtask(
+getRuntimeContext()));
write =
storeSinkWriteProvider.provide(
table,
@@ -21,6 +21,7 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.ProcessRecordAttributesUtil;
import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
@@ -58,14 +59,14 @@ public void initializeState(StateInitializationContext context) throws Exception
super.initializeState(context);

boolean containLogSystem = containLogSystem();
-int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+int numTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
StateValueFilter stateFilter =
(tableName, partition, bucket) -> {
int task =
containLogSystem
? ChannelComputer.select(bucket, numTasks)
: ChannelComputer.select(partition, bucket, numTasks);
-return task == getRuntimeContext().getIndexOfThisSubtask();
+return task == RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
};

state = createState(context, stateFilter);
@@ -22,6 +22,7 @@
import org.apache.paimon.crosspartition.KeyPartOrRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.table.Table;

import org.apache.flink.api.java.tuple.Tuple2;
@@ -59,8 +60,8 @@ public void initializeState(StateInitializationContext context) throws Exception
assigner.open(
computeManagedMemory(this),
ioManager,
-getRuntimeContext().getNumberOfParallelSubtasks(),
-getRuntimeContext().getIndexOfThisSubtask(),
+RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
+RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
this::collect);
}

@@ -21,6 +21,7 @@
import org.apache.paimon.crosspartition.IndexBootstrap;
import org.apache.paimon.crosspartition.KeyPartOrRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.utils.SerializableFunction;

import org.apache.flink.api.java.tuple.Tuple2;
@@ -50,8 +51,8 @@ public IndexBootstrapOperator(
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
bootstrap.bootstrap(
-getRuntimeContext().getNumberOfParallelSubtasks(),
-getRuntimeContext().getIndexOfThisSubtask(),
+RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
+RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
this::collect);
}

@@ -23,6 +23,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.types.RowType;
@@ -79,7 +80,8 @@ public SortOperator(
public void open() throws Exception {
super.open();
initBuffer();
-if (sinkParallelism != getRuntimeContext().getNumberOfParallelSubtasks()) {
+if (sinkParallelism
+!= RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext())) {
throw new IllegalArgumentException(
"Please ensure that the runtime parallelism of the sink matches the initial configuration "
+ "to avoid potential issues with skewed range partitioning.");
@@ -20,6 +20,7 @@

import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorUtils;

@@ -67,7 +68,7 @@ public AppendBypassCoordinateOperator(
public void open() throws Exception {
super.open();
checkArgument(
-getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) == 1,
"Compaction Coordinator parallelism in paimon MUST be one.");
long intervalMs = table.coreOptions().continuousDiscoveryInterval().toMillis();
this.compactTasks = new LinkedBlockingQueue<>();
@@ -21,6 +21,7 @@
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
@@ -77,7 +78,7 @@ public void open(Configuration parameters) throws Exception {
compactionCoordinator =
new UnawareAppendTableCompactionCoordinator(table, streaming, filter);
Preconditions.checkArgument(
-this.getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) == 1,
"Compaction Operator parallelism in paimon MUST be one.");
}

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.flink.api.common.functions.RuntimeContext;

/** Utility methods about Flink runtime context to resolve compatibility issues. */
public class RuntimeContextUtils {
    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
        return context.getTaskInfo().getNumberOfParallelSubtasks();
    }

    public static int getIndexOfThisSubtask(RuntimeContext context) {
        return context.getTaskInfo().getIndexOfThisSubtask();
    }
}
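This last copy of RuntimeContextUtils presumably lives in the module built against a newer Flink release, where RuntimeContext#getTaskInfo() is available and parallelism info is read from TaskInfo; since the class name and method signatures match the legacy copies above, the operator call sites stay identical across modules. A rough verification sketch (hypothetical, not part of the commit; assumes Mockito on the test classpath):

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.RuntimeContext;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/** Hypothetical check that the shim reports what the underlying TaskInfo reports. */
public class RuntimeContextUtilsCheck {
    public static void main(String[] args) {
        RuntimeContext context = mock(RuntimeContext.class);
        TaskInfo taskInfo = mock(TaskInfo.class);
        when(context.getTaskInfo()).thenReturn(taskInfo);
        when(taskInfo.getNumberOfParallelSubtasks()).thenReturn(4);
        when(taskInfo.getIndexOfThisSubtask()).thenReturn(1);

        System.out.println(RuntimeContextUtils.getNumberOfParallelSubtasks(context)); // 4
        System.out.println(RuntimeContextUtils.getIndexOfThisSubtask(context));       // 1
    }
}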