Skip to content

Commit

Permalink
fix concurrent partitionindex increment
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongqiangczq committed Aug 28, 2023
1 parent 1a3a113 commit 27b1689
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand All @@ -44,7 +45,7 @@ public class CelebornBufferStream {
private int subIndexStart;
private int subIndexEnd;
private TransportClient client;
private int currentLocationIndex = 0;
private AtomicInteger currentLocationIndex = new AtomicInteger(0);
private long streamId = 0;
private FlinkShuffleClientImpl mapShuffleClient;
private boolean isClosed;
Expand Down Expand Up @@ -153,7 +154,7 @@ public void moveToNextPartitionIfPossible(long endedStreamId) {
endedStreamId,
currentLocationIndex,
streamId);
if (currentLocationIndex > 0) {
if (currentLocationIndex.get() > 0) {
if (endedStreamId == streamId) {
logger.debug("Get end streamId {}", endedStreamId);
cleanStream(endedStreamId);
Expand All @@ -165,10 +166,10 @@ public void moveToNextPartitionIfPossible(long endedStreamId) {
return;
}
}
if (currentLocationIndex < locations.length) {
if (currentLocationIndex.get() < locations.length) {
try {
openStreamInternal();
currentLocationIndex++;
currentLocationIndex.incrementAndGet();
} catch (Exception e) {
logger.warn("Failed to open stream and report to flink framework. ", e);
messageConsumer.accept(new TransportableError(0L, e));
Expand All @@ -179,9 +180,9 @@ public void moveToNextPartitionIfPossible(long endedStreamId) {
private void openStreamInternal() throws IOException, InterruptedException {
this.client =
clientFactory.createClientWithRetry(
locations[currentLocationIndex].getHost(),
locations[currentLocationIndex].getFetchPort());
String fileName = locations[currentLocationIndex].getFileName();
locations[currentLocationIndex.get()].getHost(),
locations[currentLocationIndex.get()].getFetchPort());
String fileName = locations[currentLocationIndex.get()].getFileName();
TransportMessage openStream =
new TransportMessage(
MessageType.OPEN_STREAM,
Expand Down

0 comments on commit 27b1689

Please sign in to comment.