Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2189] Implement ContainerCompletion callback in DynamicScalingYarnService #4092

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,16 @@
*/
@Slf4j
public class DynamicScalingYarnService extends YarnService {
public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile";
public static final int GENERAL_OOM_EXIT_STATUS_CODE = 137;
public static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2;
public static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB

/** this holds the current count of containers already requested for each worker profile */
private final WorkforceStaffing actualWorkforceStaffing;
/** this holds the current total workforce plan as per latest received scaling directives */
private final WorkforcePlan workforcePlan;
private final Set<ContainerId> removedContainerIds;
public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile";
private final AtomicLong profileNameSuffixGenerator;

public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
Expand Down Expand Up @@ -98,6 +101,10 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
ContainerId completedContainerId = containerStatus.getContainerId();
ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);

// Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion()
// is called before onContainersAllocated(), resulting in the containerId missing from the containersMap.
// We use removedContainerIds to remember these containers and remove them from containerMap later
// when we call reviseWorkforcePlanAndRequestNewContainers method
if (completedContainerInfo == null) {
log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()",
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
completedContainerId);
Expand All @@ -106,13 +113,11 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
}

log.info("Container {} running profile {} completed with exit status {}",
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus()
);
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus());

if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
log.info("Container {} running profile {} completed with diagnostics: {}",
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics()
);
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics());
}

if (this.shutdownInProgress) {
Expand All @@ -131,7 +136,7 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
completedContainerId, completedContainerInfo.getWorkerProfileName());
requestContainersForWorkerProfile(workerProfile, 1);
break;
case(137): // General OOM exit status
case(GENERAL_OOM_EXIT_STATUS_CODE):
case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -232,7 +237,12 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont

// Request a replacement container
int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
int newContainerMemoryMbs = currContainerMemoryMbs * 2; //TODO: make it configurable or auto-tunable
int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER;
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. Not requesting a replacement container.",
MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
return;
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
}
Optional<ProfileDerivation> optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(),
new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + ""))
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ class YarnService extends AbstractIdleService {
private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
private final NMClientAsync nmClientAsync;
private final ExecutorService containerLaunchExecutor;
private final boolean containerHostAffinityEnabled;
private final String containerTimezone;
private final String proxyJvmArgs;

Expand Down Expand Up @@ -197,8 +196,6 @@ public YarnService(Config config, String applicationName, String applicationId,
this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler()));
this.nmClientAsync.init(this.yarnConfiguration);

this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);

this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY;

Expand Down Expand Up @@ -536,26 +533,6 @@ private String buildContainerCommand(Container container, String workerProfileNa
containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
}

/**
* Check the exit status of a completed container and see if the replacement container
* should try to be started on the same node. Some exit status indicates a disk or
* node failure and in such cases the replacement container should try to be started on
* a different node.
*/
private boolean shouldStickToTheSameNode(int containerExitStatus) {
switch (containerExitStatus) {
case ContainerExitStatus.DISKS_FAILED:
return false;
case ContainerExitStatus.ABORTED:
// Mostly likely this exit status is due to node failures because the
// application itself will not release containers.
return false;
default:
// Stick to the same node for other cases if host affinity is enabled.
return this.containerHostAffinityEnabled;
}
}

/**
* Handle the completion of a container.
* Just removes the containerId from {@link #containerMap}
Expand Down
Loading