Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4580: Slow preemption of new containers when re-use is enabled #374

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ boolean preemptIfNeeded() {
+ numHighestPriRequests + " pending requests at pri: "
+ highestPriRequest.getPriority());
}

int newContainersReleased = 0;
for (int i=0; i<numPendingRequestsToService; ++i) {
// This request must have been considered for matching with all existing
// containers when request was made.
Expand Down Expand Up @@ -1311,7 +1311,7 @@ boolean preemptIfNeeded() {
" with priority: " + lowestPriNewContainer.getPriority() +
" to free resource for request: " + highestPriRequest +
" . Current free resources: " + freeResources);
numPendingRequestsToService--;
newContainersReleased++;
releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
// We are returning an unused resource back the RM. The RM thinks it
// has serviced our initial request and will not re-allocate this back
Expand All @@ -1324,7 +1324,7 @@ boolean preemptIfNeeded() {
continue;
}
}

numPendingRequestsToService -= newContainersReleased;
if (numPendingRequestsToService < 1) {
return true;
}
Expand Down Expand Up @@ -1573,6 +1573,9 @@ private void releaseContainer(ContainerId containerId) {
if (delayedContainer != null) {
Resources.subtractFrom(allocatedResources,
delayedContainer.getContainer().getResource());
if (shouldReuseContainers) {
delayedContainerManager.removeDelayedContainer(delayedContainer);
}
}
if (delayedContainer != null || !shouldReuseContainers) {
amRmClient.releaseAssignedContainer(containerId);
Expand Down Expand Up @@ -2163,6 +2166,17 @@ void addDelayedContainer(Container container,
}
}

void removeDelayedContainer(HeldContainer container) {
if (container != null) {
synchronized(this) {
if (delayedContainers.remove(container)) {
LOG.info("Removed {} from delayed containers", container.getContainer().getId());
} else {
LOG.warn("Unknown container {} sent for removal. Ignoring.", container.getContainer().getId());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A valid case here is when a new container is allocated - added to delayedContainers, it is polled (removed) from queue but if there is no pending task request releaseContainer() is invoked. Please suggest if the log level should be changed in info or for both the newly added logs should be at debug.

}
}
}
}
}

synchronized void determineMinHeldContainers() {
Expand Down
101 changes: 101 additions & 0 deletions tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,107 @@ public void testTaskSchedulerRandomReuseExpireTime() throws Exception {
scheduler2.shutdown();
}

@Test(timeout = 5000)
public void testTaskSchedulerPreemptionWithLowAndHighPriorityRequests() throws Exception {
TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));

Configuration conf = new Configuration();
conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE, 50);

TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL,
false, null, null, new PreemptionMatcher(), conf);
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);

final TaskSchedulerWithDrainableContext scheduler =
new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
scheduler.initialize();
scheduler.start();
int initialRmCapacity = 4;
int lowPriorityTasks = 5;
int highPriorityTasks = 6;
Resource taskAsk = Resource.newInstance(1000, 1);

Resource totalResource = Resource.newInstance(4000, 4);
when(mockRMClient.getAvailableResources()).thenReturn(totalResource);

// Add lower priority tasks
Priority lowPriority = Priority.newInstance(74);
for (int i = 0; i < lowPriorityTasks; i++) {
Object low = new Object();
TaskAttempt ta = mock(TaskAttempt.class);
scheduler.allocateTask(ta, taskAsk, null, null, lowPriority, low, null);
}

scheduler.getProgress(); // Will update the highest priority
drainableAppCallback.drain();
// 5 containers requested for lower priority tasks
verify(mockRMClient, times(5)).addContainerRequest(any(CookieContainerRequest.class));

// Allocate requested containers
List<Container> lowPriorityContainers = new ArrayList<>();
for (int i = 0; i < initialRmCapacity; i++) {
ContainerId containerId = ContainerId.newContainerId(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 1), 1), i);
NodeId nodeId = NodeId.newInstance("host-" + i, 8041);
Container container = Container.newInstance(containerId, nodeId, "host-" + i, taskAsk, lowPriority, null);
lowPriorityContainers.add(container);
}

totalResource = Resource.newInstance(0, 0);
when(mockRMClient.getAvailableResources()).thenReturn(totalResource);

// We don't want containers to be assigned to a task by delayedContainerManager as it invokes another preemption flow
// Delayed thread first takes lock on delayedContainerManager instance to check if there are any containers
// We block the thread, ensure all delayed containers have schedule time beyond test's runtime to avoid assignment.
synchronized (scheduler.delayedContainerManager) {
scheduler.onContainersAllocated(lowPriorityContainers);
drainableAppCallback.drain();
for (HeldContainer container : scheduler.delayedContainerManager.delayedContainers) {
// Set next schedule beyond this test's time to avoid any assignment
container.setNextScheduleTime(System.currentTimeMillis() + 10000);
// No preemption if assignment attempt of new container < 3
container.incrementAssignmentAttempts();
container.incrementAssignmentAttempts();
container.incrementAssignmentAttempts();
}
}

// No releases so far
verify(mockRMClient, times(0)).releaseAssignedContainer(any());

// Add higher priority task
Priority highPriority = Priority.newInstance(71);
for (int i = 0; i < highPriorityTasks; i++) {
Object high = new Object();
TaskAttempt ta = mock(TaskAttempt.class);
scheduler.allocateTask(ta, taskAsk, null, null, highPriority, high, null);
}

drainableAppCallback.drain();
// low priority tasks + high priority tasks
verify(mockRMClient, times(11)).addContainerRequest(any(CookieContainerRequest.class));

// Trigger preemption to release containers as 50% of pending high priority requests
scheduler.getProgress();
drainableAppCallback.drain();

// 50% of 6 high priority requests = 3, 4 containers were held - hence 3 will be released
verify(mockRMClient, times(3)).releaseAssignedContainer(any());

// Trigger another preemption cycle
scheduler.getProgress();
drainableAppCallback.drain();
// 50% of 6 high priority requests = 3, but only 1 container is held - which will be released,
// incrementing total to 4
verify(mockRMClient, times(4)).releaseAssignedContainer(any());
AppFinalStatus finalStatus =
new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
scheduler.shutdown();
drainableAppCallback.drain();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test (timeout=5000)
public void testTaskSchedulerPreemption() throws Exception {
Expand Down
Loading