From 1a3a1138717fb416d230d99d4e7b51971fb886f4 Mon Sep 17 00:00:00 2001 From: "zhongqiang.czq" Date: Mon, 28 Aug 2023 21:02:34 +0800 Subject: [PATCH] fix code sytle --- .../plugin/flink/readclient/CelebornBufferStream.java | 7 ++++++- .../flink/readclient/FlinkShuffleClientImpl.java | 6 +++++- .../apache/celeborn/tests/flink/WordCountHelper.java | 10 +++++----- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java index ded2bd84f61..bc72af8a634 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java @@ -147,7 +147,12 @@ public void close() { } public void moveToNextPartitionIfPossible(long endedStreamId) { - logger.debug("this:{}, moveToNextPartitionIfPossible endedStreamId: {}, currentLocationIndex: {}, currentSteamId:{}", this, endedStreamId, currentLocationIndex, streamId); + logger.debug( + "this:{}, moveToNextPartitionIfPossible endedStreamId: {}, currentLocationIndex: {}, currentSteamId:{}", + this, + endedStreamId, + currentLocationIndex, + streamId); if (currentLocationIndex > 0) { if (endedStreamId == streamId) { logger.debug("Get end streamId {}", endedStreamId); diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java index 3962e9cd52d..99271689422 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java @@ -151,7 +151,11 @@ public CelebornBufferStream readBufferedPartition( PartitionLocation[] partitionLocations = fileGroups.partitionGroups.get(partitionId).toArray(new PartitionLocation[0]); Arrays.sort(partitionLocations, Comparator.comparingInt(PartitionLocation::getEpoch)); - logger.debug("readBufferedPartition shuffleKey:{} partitionid:{} partitionLocation:{}", shuffleKey, partitionId, partitionLocations); + logger.debug( + "readBufferedPartition shuffleKey:{} partitionid:{} partitionLocation:{}", + shuffleKey, + partitionId, + partitionLocations); return CelebornBufferStream.create( this, flinkTransportClientFactory, diff --git a/tests/flink-it/src/test/java/org/apache/celeborn/tests/flink/WordCountHelper.java b/tests/flink-it/src/test/java/org/apache/celeborn/tests/flink/WordCountHelper.java index aa1d72c25f8..1018f452ad1 100644 --- a/tests/flink-it/src/test/java/org/apache/celeborn/tests/flink/WordCountHelper.java +++ b/tests/flink-it/src/test/java/org/apache/celeborn/tests/flink/WordCountHelper.java @@ -97,11 +97,11 @@ public VerifySink(long wordCount) { @Override public void invoke(Long value, Context context) { -// try { -// Thread.sleep(100 *1000); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } + // try { + // Thread.sleep(100 *1000); + // } catch (InterruptedException e) { + // e.printStackTrace(); + // } assertEquals(wordCount, value); } }