Skip to content

Commit

Permalink
TEZ-4518: Added capability to limit number of spill files being gener…
Browse files Browse the repository at this point in the history
…ated
  • Loading branch information
mudit1289 committed Oct 14, 2023
1 parent 5bba1ff commit 418ba98
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ private TezRuntimeConfiguration() {}
public static final int TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT =
1024 * 1024;

@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT = TEZ_RUNTIME_PREFIX +
"sort.spill.files.count.limit";
public static final int TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT = -1;


// TODO Use the default value
@ConfigurationProperty(type = "integer")
Expand Down Expand Up @@ -616,6 +621,7 @@ private TezRuntimeConfiguration() {}
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SORT_SPILL_PERCENT);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_IO_SORT_MB);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
TEZ_RUNTIME_KEYS.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.google.common.collect.Lists;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -69,6 +68,7 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT;
import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -110,6 +110,8 @@ public class PipelinedSorter extends ExternalSorter {
private final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();

private final int spillFilesCountLimit;

private final boolean pipelinedShuffle;

private long currentAllocatableMemory;
Expand All @@ -130,6 +132,8 @@ public class PipelinedSorter extends ExternalSorter {
*/
private final List<Event> finalEvents;

private static final int SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;

// TODO Set additional countesr - total bytes written, spills etc.

public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
Expand Down Expand Up @@ -171,6 +175,14 @@ public PipelinedSorter(OutputContext outputContext, Configuration conf, int numO
pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;
auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);

spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT,
TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT);
Preconditions.checkArgument(spillFilesCountLimit == SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
|| spillFilesCountLimit > 0,
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT
+ " should be greater than 0 or unbounded");

//sanity checks
final long sortmb = this.availableMemoryMb;

Expand Down Expand Up @@ -542,7 +554,7 @@ private void spillSingleRecord(final Object key, final Object value,
spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
//No final merge. Set the number of files offered via shuffle-handler
Expand Down Expand Up @@ -633,7 +645,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException {
spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
//No final merge. Set the number of files offered via shuffle-handler
Expand Down Expand Up @@ -1503,4 +1515,18 @@ public TezRawKeyValueIterator filter(int partition) {
}

}

/**
* Increments numSpills local counter by taking into consideration
* the max limit on spill files being generated by the job.
* If limit is reached, this function throws an IOException
*/
private void incrementNumSpills() throws IOException {
++numSpills;
if(spillFilesCountLimit != SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE && numSpills > spillFilesCountLimit) {
throw new IOException("Too many spill files got created, control it with " +
"tez.runtime.sort.spill.files.count.limit, current value: " + spillFilesCountLimit +
", current spill count: " + numSpills);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import org.apache.tez.common.Preconditions;

import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT;
import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -123,12 +124,14 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();
private final int indexCacheMemoryLimit;
private final int spillFilesCountLimit;
private int totalIndexCacheMemory;

private long totalKeys = 0;
private long sameKey = 0;

public static final int MAX_IO_SORT_MB = 1800;
private static final int SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;


public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
Expand All @@ -148,6 +151,13 @@ public DefaultSorter(OutputContext outputContext, Configuration conf, int numOut
indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT);

spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT,
TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT);
Preconditions.checkArgument(spillFilesCountLimit == SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
|| spillFilesCountLimit > 0,
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT
+ " should be greater than 0 or unbounded");

boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
Expand Down Expand Up @@ -978,7 +988,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun
}
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Finished spill " + numSpills
+ " at " + filename.toString());
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
numShuffleChunks.setValue(numSpills);
} else if (numSpills > 1) {
Expand Down Expand Up @@ -1057,7 +1067,7 @@ private void spillSingleRecord(final Object key, final Object value,
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
numShuffleChunks.setValue(numSpills);
} else if (numSpills > 1) {
Expand Down Expand Up @@ -1312,7 +1322,7 @@ private void mergeParts() throws IOException, InterruptedException {
} finally {
finalOut.close();
}
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
List<Event> events = Lists.newLinkedList();
maybeSendEventForSpill(events, true, sr, 0, true);
Expand Down Expand Up @@ -1399,4 +1409,18 @@ private void mergeParts() throws IOException, InterruptedException {
}
}
}

/**
* Increments numSpills local counter by taking into consideration
* the max limit on spill files being generated by the job.
* If limit is reached, this function throws an IOException
*/
private void incrementNumSpills() throws IOException {
++numSpills;
if(spillFilesCountLimit != SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE && numSpills > spillFilesCountLimit) {
throw new IOException("Too many spill files got created, control it with " +
"tez.runtime.sort.spill.files.count.limit, current value: " + spillFilesCountLimit +
", current spill count: " + numSpills);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -116,6 +121,9 @@ public void setup() throws IOException {
this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
}

@Rule
public ExpectedException exception = ExpectedException.none();

public static Configuration getConf() {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
Expand Down Expand Up @@ -858,6 +866,75 @@ public void testWithLargeKeyValueWithMinBlockSize() throws IOException {
basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20);
}

@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testSpillFilesCountLimitInvalidValue() throws IOException, NoSuchMethodException,
NoSuchFieldException, IllegalAccessException {
this.numOutputs = 10;

//128 MB. Do not pre-allocate.
// Get 32 MB buffer first and the another buffer with 96 on filling up
// the 32 MB buffer.
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
conf.setBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
conf.setInt(TezRuntimeConfiguration
.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, -2);

exception.expect(IllegalArgumentException.class);
exception.expectMessage("tez.runtime.sort.spill.files.count.limit should be greater than 0 or unbounded");

PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
numOutputs, (128L << 20));

closeSorter(sorter);
}

@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException,
NoSuchFieldException, IllegalAccessException {
this.numOutputs = 10;

//128 MB. Do not pre-allocate.
// Get 32 MB buffer first and the another buffer with 96 on filling up
// the 32 MB buffer.
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
conf.setBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
conf.setInt(TezRuntimeConfiguration
.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, 2);

PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
numOutputs, (128L << 20));

Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills");
numSpillsField.setAccessible(true);
numSpillsField.set(sorter, 2);

Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills");
method.setAccessible(true);
boolean gotExceptionWithMessage = false;
try {
method.invoke(sorter);
} catch(InvocationTargetException e) {
Throwable targetException = e.getTargetException();
if (targetException != null) {
String errorMessage = targetException.getMessage();
if (errorMessage != null) {
if(errorMessage.equals("Too many spill files got created, control it with " +
"tez.runtime.sort.spill.files.count.limit, current value: 2, current spill count: 3")) {
gotExceptionWithMessage = true;
}
}
}
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}

Assert.assertTrue(gotExceptionWithMessage);
}

private void verifyOutputPermissions(String spillId) throws IOException {
String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId
+ "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import static org.mockito.internal.verification.VerificationModeFactory.times;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
Expand Down Expand Up @@ -81,7 +84,9 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -129,6 +134,9 @@ public void reset() throws IOException {
localFs.mkdirs(workingDir);
}

@Rule
public ExpectedException exception = ExpectedException.none();

@Test(timeout = 5000)
public void testSortSpillPercent() throws Exception {
OutputContext context = createTezOutputContext();
Expand Down Expand Up @@ -577,6 +585,71 @@ public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException {
verifyCounters(sorter, context);
}

@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testSpillFilesCountLimitInvalidValue() throws IOException, NoSuchMethodException,
NoSuchFieldException, IllegalAccessException {
OutputContext context = createTezOutputContext();

conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, -2);
MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
context.getTotalMemoryAvailableToTask()), handler);

exception.expect(IllegalArgumentException.class);
exception.expectMessage("tez.runtime.sort.spill.files.count.limit should be greater than 0 or unbounded");

SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned());
sorterWrapper.getSorter();
sorterWrapper.close();
}

@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException,
NoSuchFieldException, IllegalAccessException {
OutputContext context = createTezOutputContext();

conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, 2);
MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
context.getTotalMemoryAvailableToTask()), handler);
SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned());
DefaultSorter sorter = sorterWrapper.getSorter();

Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills");
numSpillsField.setAccessible(true);
numSpillsField.set(sorter, 2);

Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills");
method.setAccessible(true);
boolean gotExceptionWithMessage = false;
try {
method.invoke(sorter);
} catch(InvocationTargetException e) {
Throwable targetException = e.getTargetException();
if (targetException != null) {
String errorMessage = targetException.getMessage();
if (errorMessage != null) {
if(errorMessage.equals("Too many spill files got created, control it with " +
"tez.runtime.sort.spill.files.count.limit, current value: 2, current spill count: 3")) {
gotExceptionWithMessage = true;
}
}
}
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}

Assert.assertTrue(gotExceptionWithMessage);
}

private void verifyOutputPermissions(String spillId) throws IOException {
String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId
+ "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
Expand Down

0 comments on commit 418ba98

Please sign in to comment.