Skip to content

Commit

Permalink
TEZ-3821: Ability to fail fast tasks that write too much to local dis…
Browse files Browse the repository at this point in the history
…k. (#314) (Ayush Saxena reviewed by Rajesh Balamohan)
  • Loading branch information
ayushtkn authored Oct 27, 2023
1 parent 4bc87e2 commit 51d6f53
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2304,4 +2304,12 @@ static Set<String> getPropertySet() {
public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";

/**
* Limits the amount of data that can be written to LocalFileSystem by a Task.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty(type = "long")
public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes";
public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.runtime;

import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
Expand All @@ -26,6 +27,8 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
Expand All @@ -35,6 +38,11 @@
import org.apache.tez.runtime.metrics.TaskCounterUpdater;

import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT;

public abstract class RuntimeTask {

Expand All @@ -54,6 +62,9 @@ public abstract class RuntimeTask {
private final TaskStatistics statistics;
private final AtomicBoolean progressNotified = new AtomicBoolean(false);

private final long lfsBytesWriteLimit;
private static final Logger LOG = LoggerFactory.getLogger(RuntimeTask.class);

protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
this.taskSpec = taskSpec;
Expand All @@ -71,6 +82,8 @@ protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
} else {
this.counterUpdater = null;
}
this.lfsBytesWriteLimit =
tezConf.getLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT);
}

protected enum State {
Expand Down Expand Up @@ -182,4 +195,36 @@ protected void setTaskDone() {
protected final boolean isUpdatingSystemCounters() {
return counterUpdater != null;
}

/**
* Check whether the task has exceeded any configured limits.
*
* @throws LocalWriteLimitException in case the limit is exceeded.
*/
public void checkTaskLimits() throws LocalWriteLimitException {
// check the limit for writing to local file system
if (lfsBytesWriteLimit >= 0) {
Long lfsBytesWritten = null;
try {
LocalFileSystem localFS = FileSystem.getLocal(tezConf);
lfsBytesWritten = FileSystem.getGlobalStorageStatistics().get(localFS.getScheme()).getLong("bytesWritten");
} catch (IOException e) {
LOG.warn("Could not get LocalFileSystem bytesWritten counter");
}
if (lfsBytesWritten != null && lfsBytesWritten > lfsBytesWriteLimit) {
throw new LocalWriteLimitException(
"Too much write to local file system." + " current value is " + lfsBytesWritten + " the limit is "
+ lfsBytesWriteLimit);
}
}
}

/**
* Exception thrown when the task exceeds some configured limits.
*/
public static class LocalWriteLimitException extends IOException {
public LocalWriteLimitException(String str) {
super(str);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.RuntimeTask.LocalWriteLimitException;
import org.apache.tez.runtime.api.*;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
Expand Down Expand Up @@ -141,6 +142,8 @@ static class HeartbeatCallable implements Callable<Boolean> {
private static final float LOG_COUNTER_BACKOFF = 1.3f;
private static final int HEAP_MEMORY_USAGE_UPDATE_INTERVAL = 5000; // 5 seconds

private static final int LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL = 10000; // 10 seconds

private final RuntimeTask task;
private final EventMetaData updateEventMetadata;

Expand All @@ -165,6 +168,9 @@ static class HeartbeatCallable implements Callable<Boolean> {
private long usedMemory = 0;
private long heapMemoryUsageUpdatedTime = System.currentTimeMillis() - HEAP_MEMORY_USAGE_UPDATE_INTERVAL;

private long localFileSystemBytesWrittenCheckInterval =
System.currentTimeMillis() - LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL;

/*
* Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
* log counters.
Expand Down Expand Up @@ -262,6 +268,17 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
sendCounters = true;
prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
}
try {
long now = System.currentTimeMillis();
if (now - localFileSystemBytesWrittenCheckInterval > LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL) {
task.checkTaskLimits();
localFileSystemBytesWrittenCheckInterval = now;
}
} catch (LocalWriteLimitException lwle) {
LOG.error("Local FileSystem write limit exceeded", lwle);
askedToDie.set(true);
return new ResponseWrapper(true, 1);
}
updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata);
events.add(updateEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.runtime.task;

import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand All @@ -27,7 +28,11 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -36,12 +41,21 @@

import com.google.common.collect.Lists;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.RuntimeTask.LocalWriteLimitException;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
Expand All @@ -55,6 +69,9 @@
@SuppressWarnings("rawtypes")
public class TestTaskReporter {

private static final File TEST_DIR =
new File(System.getProperty("test.build.data"), TestTaskReporter.class.getName()).getAbsoluteFile();

@Test(timeout = 10000)
public void testContinuousHeartbeatsOnMaxEvents() throws Exception {

Expand Down Expand Up @@ -218,6 +235,38 @@ public void testStatusUpdateAfterInitializationAndCounterFlag() {

}

@Test
public void testLocalFileSystemBytesWrittenLimit() throws IOException {
TaskSpec mockSpec = mock(TaskSpec.class);
when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
TezConfiguration tezConf = new TezConfiguration();
LogicalIOProcessorRuntimeTask lio1 =
new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null,
Runtime.getRuntime().maxMemory(), true, null, null);

LocalFileSystem localFS = FileSystem.getLocal(tezConf);
FileSystem.clearStatistics();
Path tmpPath =
new Path(TEST_DIR + "/testLocalFileSystemBytesWrittenLimit" + new Random(System.currentTimeMillis()).nextInt());
try (FSDataOutputStream out = localFS.create(tmpPath, true)) {
out.write(new byte[1024]);
}
// Check limits with default shouldn't throw exception.
lio1.checkTaskLimits();

tezConf.setLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, 10);
lio1 = new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null,
Runtime.getRuntime().maxMemory(), true, null, null);

try {
lio1.checkTaskLimits();
Assert.fail("Expected to throw LocalWriteLimitException");
} catch (LocalWriteLimitException localWriteLimitException) {
Assert.assertTrue(localWriteLimitException.getMessage().contains("Too much write to local file system"));
}
}

private List<TezEvent> createEvents(int numEvents) {
List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
for (int i = 0; i < numEvents; i++) {
Expand Down

0 comments on commit 51d6f53

Please sign in to comment.