Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4521: Partition stats should be always uncompressed size #317

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ static class SourceVertexInfo {
final BitSet finishedTaskSet;
int numTasks;
int numVMEventsReceived;
// The total uncompressed size
long outputSize;
// The uncompressed size of each partition. The size might not be precise
int[] statsInMB;
EdgeManagerPluginDescriptor newDescriptor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void progress() {
protected final boolean cleanup;

protected OutputStatisticsReporter statsReporter;
// uncompressed size for each partition
protected final long[] partitionStats;
protected final boolean finalMergeEnabled;
protected final boolean sendEmptyPartitionDetails;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException {
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
if (!isFinalMergeEnabled() && reportPartitionStats()) {
partitionStats[i] += partLength;
partitionStats[i] += rawLength;
}
}

Expand Down Expand Up @@ -747,7 +747,7 @@ public void flush() throws IOException {
TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, localFs);
if (reportPartitionStats()) {
for (int i = 0; i < spillRecord.size(); i++) {
partitionStats[i] += spillRecord.getIndex(i).getPartLength();
partitionStats[i] += spillRecord.getIndex(i).getRawLength();
}
}
numShuffleChunks.setValue(numSpills);
Expand Down Expand Up @@ -832,7 +832,7 @@ public void flush() throws IOException {
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, parts);
if (reportPartitionStats()) {
partitionStats[parts] += partLength;
partitionStats[parts] += rawLength;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
if (!isFinalMergeEnabled() && reportPartitionStats() && writer != null) {
partitionStats[i] += partLength;
partitionStats[i] += rawLength;
}
writer = null;
} finally {
Expand Down Expand Up @@ -1244,7 +1244,7 @@ private void mergeParts() throws IOException, InterruptedException {
}
if (spillRecord != null && reportPartitionStats()) {
for(int i=0; i < spillRecord.size(); i++) {
partitionStats[i] += spillRecord.getIndex(i).getPartLength();
partitionStats[i] += spillRecord.getIndex(i).getRawLength();
}
}
numShuffleChunks.setValue(numSpills);
Expand Down Expand Up @@ -1388,7 +1388,7 @@ private void mergeParts() throws IOException, InterruptedException {
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, parts);
if (reportPartitionStats()) {
partitionStats[parts] += partLength;
partitionStats[parts] += rawLength;
}
}
numShuffleChunks.setValue(1); //final merge has happened
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@


import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.common.ReflectionUtils;
Expand Down Expand Up @@ -125,10 +126,10 @@ VertexManagerEvent getVertexManagerEvent(long[] partitionSizes,
long uncompressedTotalSize, String vertexName, boolean reportDetailedStats)
throws IOException {
ByteBuffer payload;
long totalSize = 0;
final long totalSize;
// Use partition sizes to compute the total size.
if (partitionSizes != null) {
totalSize = estimatedUncompressedSum(partitionSizes);
totalSize = Arrays.stream(partitionSizes).sum();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does totalSize change with this patch? If it doesn't, why not? If it does, can we validate it with this unit test, or in any way that makes sense to you, @okumin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This patch doesn't change the total size. That's because the total size is stored in a different Protobuf field from the partition stats. The design is valid since users have the option not to collect partition stats at all (tez.runtime.report.partition.stats=none).
TEZ-4521 removes the possibility that partition stats contain the compressed size. That's why I revised this file: to prevent future readers from being confused.

} else {
totalSize = uncompressedTotalSize;
}
Expand Down Expand Up @@ -169,16 +170,6 @@ VertexManagerEvent getVertexManagerEvent(long[] partitionSizes,
return vmEvent;
}

/**
 * Estimates the total size of all partitions, assuming a 3 : 1 compression
 * ratio: the per-partition values are added up and the sum is tripled.
 *
 * @param partitionStats per-partition sizes to add up
 * @return the sum of all entries multiplied by 3
 */
long estimatedUncompressedSum(long[] partitionStats) {
  long total = 0;
  for (int idx = 0; idx < partitionStats.length; idx++) {
    total += partitionStats[idx];
  }
  // Scale by the assumed 3 : 1 compression ratio.
  return total * 3;
}

public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
VertexIdentifier mockVertex = mock(VertexIdentifier.class);
when(mockVertex.getName()).thenReturn(vName);
Expand Down
Loading