From 17bf715246a961163a1f94fad9ed39a98fa759f4 Mon Sep 17 00:00:00 2001 From: okumin Date: Tue, 28 Nov 2023 00:26:52 +0900 Subject: [PATCH] TEZ-4521: Partition stats should be always uncompressed size --- .../vertexmanager/ShuffleVertexManagerBase.java | 2 ++ .../library/common/sort/impl/ExternalSorter.java | 1 + .../library/common/sort/impl/PipelinedSorter.java | 6 +++--- .../common/sort/impl/dflt/DefaultSorter.java | 6 +++--- .../TestShuffleVertexManagerUtils.java | 15 +++------------ 5 files changed, 12 insertions(+), 18 deletions(-) diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java index c6264fb2f6..1d55c71944 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java @@ -130,7 +130,9 @@ static class SourceVertexInfo { final BitSet finishedTaskSet; int numTasks; int numVMEventsReceived; + // The total uncompressed size long outputSize; + // The uncompressed size of each partition. The size might not be precise int[] statsInMB; EdgeManagerPluginDescriptor newDescriptor; diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index 758c069799..232d964307 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -131,6 +131,7 @@ public void progress() { protected final boolean cleanup; protected OutputStatisticsReporter statsReporter; + // uncompressed size for each partition protected final long[] partitionStats; protected final boolean finalMergeEnabled; protected final boolean sendEmptyPartitionDetails; diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 08786c9b2c..067dcca0c6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -622,7 +622,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException { new TezIndexRecord(segmentStart, rawLength, partLength); spillRec.putIndex(rec, i); if (!isFinalMergeEnabled() && reportPartitionStats()) { - partitionStats[i] += partLength; + partitionStats[i] += rawLength; } } @@ -747,7 +747,7 @@ public void flush() throws IOException { TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, localFs); if (reportPartitionStats()) { for (int i = 0; i < spillRecord.size(); i++) { - partitionStats[i] += spillRecord.getIndex(i).getPartLength(); + partitionStats[i] += spillRecord.getIndex(i).getRawLength(); } } numShuffleChunks.setValue(numSpills); @@ -832,7 +832,7 @@ public void flush() throws IOException { new TezIndexRecord(segmentStart, rawLength, partLength); spillRec.putIndex(rec, parts); if (reportPartitionStats()) { - partitionStats[parts] += partLength; + partitionStats[parts] += rawLength; } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 7c678749b2..6354c7cc41 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -956,7 +956,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun new TezIndexRecord(segmentStart, rawLength, partLength); spillRec.putIndex(rec, i); if (!isFinalMergeEnabled() && reportPartitionStats() && writer != null) { - partitionStats[i] += partLength; + partitionStats[i] += rawLength; } writer = null; } finally { @@ -1244,7 +1244,7 @@ private void mergeParts() throws IOException, InterruptedException { } if (spillRecord != null && reportPartitionStats()) { for(int i=0; i < spillRecord.size(); i++) { - partitionStats[i] += spillRecord.getIndex(i).getPartLength(); + partitionStats[i] += spillRecord.getIndex(i).getRawLength(); } } numShuffleChunks.setValue(numSpills); @@ -1388,7 +1388,7 @@ private void mergeParts() throws IOException, InterruptedException { new TezIndexRecord(segmentStart, rawLength, partLength); spillRec.putIndex(rec, parts); if (reportPartitionStats()) { - partitionStats[parts] += partLength; + partitionStats[parts] += rawLength; } } numShuffleChunks.setValue(1); //final merge has happened diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java index 44adc462bc..5d1509754b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java @@ -20,6 +20,7 @@ import com.google.protobuf.ByteString; +import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.common.ReflectionUtils; @@ -125,10 +126,10 @@ VertexManagerEvent getVertexManagerEvent(long[] partitionSizes, long uncompressedTotalSize, String vertexName, boolean reportDetailedStats) throws IOException { ByteBuffer payload; - long totalSize = 0; + final long totalSize; // Use partition sizes to compute the total size. if (partitionSizes != null) { - totalSize = estimatedUncompressedSum(partitionSizes); + totalSize = Arrays.stream(partitionSizes).sum(); } else { totalSize = uncompressedTotalSize; } @@ -169,16 +170,6 @@ VertexManagerEvent getVertexManagerEvent(long[] partitionSizes, return vmEvent; } - // Assume 3 : 1 compression ratio to estimate the total size - // of all partitions. - long estimatedUncompressedSum(long[] partitionStats) { - long sum = 0; - for (long partition : partitionStats) { - sum += partition; - } - return sum * 3; - } - public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) { VertexIdentifier mockVertex = mock(VertexIdentifier.class); when(mockVertex.getName()).thenReturn(vName);