
[GOBBLIN-2189] Implement ContainerCompletion callback in DynamicScalingYarnService #4092

Open · wants to merge 6 commits into master

Conversation

@Blazer-007 (Member) commented Jan 14, 2025

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

  • GOBBLIN-2189

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Implemented handleContainerCompletion(...), which launches a replacement container based on the completed container's exit status (see the sketch after this list)
    Handled scaling down of containers based on a negative delta
    Refactored YarnService to remove the use of Helix-related tags and names
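
    A rough sketch of the exit-status handling described above (illustration only, not this PR's exact code; ContainerStatus and ContainerExitStatus are the standard YARN types, while requestReplacementContainer(...) is a hypothetical stand-in for whatever helper actually issues the new container request):

    @Override
    protected void handleContainerCompletion(ContainerStatus containerStatus) {
      ContainerId completedContainerId = containerStatus.getContainerId();
      switch (containerStatus.getExitStatus()) {
        case ContainerExitStatus.SUCCESS:
        case ContainerExitStatus.KILLED_BY_APPMASTER:
          break; // expected completion (e.g. a container we released during scale-down); nothing to replace
        case ContainerExitStatus.KILLED_EXCEEDED_PMEM:
          // container ran out of physical memory; ask for a replacement with more memory (hypothetical helper)
          requestReplacementContainer(completedContainerId, true);
          break;
        case ContainerExitStatus.ABORTED:
        case ContainerExitStatus.PREEMPTED:
        default:
          // container was lost for external reasons; replace it with the same resources (hypothetical helper)
          requestReplacementContainer(completedContainerId, false);
      }
    }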

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    Updated one existing test
    Manually triggered DummyDynamicScalingYarnServiceManager to check whether scale-down is happening or not; the stripped logs from that run are:
2025-01-21 09:09:59 PST [,,] INFO  [DynamicScalingYarnService STARTING] org.apache.gobblin.temporal.yarn.YarnService  - Requesting initial containers
2025-01-21 09:09:59 PST [,,] INFO  [DynamicScalingYarnService STARTING] org.apache.gobblin.temporal.yarn.DynamicScalingYarnService  - Requesting 2 new containers for profile <<BASELINE>> having currently 0 containers
2025-01-21 09:09:59 PST [,,] INFO  [DynamicScalingYarnService STARTING] org.apache.gobblin.temporal.yarn.YarnService  - Requesting 2 containers with resource = <memory:8192, vCores:2> and allocation request id = Optional.of(0)
...
2025-01-21 09:10:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.DynamicScalingYarnService  - Requesting 2 new containers for profile secondProfile having currently 0 containers
2025-01-21 09:10:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.YarnService  - Requesting 2 containers with resource = <memory:2048, vCores:2> and allocation request id = Optional.of(1)
2025-01-21 09:10:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.DynamicScalingYarnService  - Requesting 3 new containers for profile firstProfile having currently 0 containers
2025-01-21 09:10:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.YarnService  - Requesting 3 containers with resource = <memory:2048, vCores:2> and allocation request id = Optional.of(2)
...
2025-01-21 09:11:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.DynamicScalingYarnService  - Requesting 1 new containers for profile secondProfile having currently 2 containers
2025-01-21 09:11:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.YarnService  - Requesting 1 containers with resource = <memory:2048, vCores:2> and allocation request id = Optional.of(3)
2025-01-21 09:11:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.DynamicScalingYarnService  - Requesting 2 new containers for profile firstProfile having currently 3 containers
2025-01-21 09:11:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.YarnService  - Requesting 2 containers with resource = <memory:2048, vCores:2> and allocation request id = Optional.of(4)
...
2025-01-21 09:13:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.DynamicScalingYarnService  - Releasing 3 containers for profile secondProfile having currently 3 containers
2025-01-21 09:13:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.DynamicScalingYarnService  - Releasing 5 containers for profile firstProfile having currently 5 containers
...
2025-01-21 09:14:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.DynamicScalingYarnService  - Requesting 5 new containers for profile secondProfile having currently 0 containers
2025-01-21 09:14:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.YarnService  - Requesting 5 containers with resource = <memory:2048, vCores:2> and allocation request id = Optional.of(5)
2025-01-21 09:14:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.DynamicScalingYarnService  - Requesting 5 new containers for profile firstProfile having currently 0 containers
2025-01-21 09:14:17 PST [,,] INFO  [DynamicScalingExecutor] org.apache.gobblin.temporal.yarn.YarnService  - Requesting 5 containers with resource = <memory:2048, vCores:2> and allocation request id = Optional.of(6)

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@Blazer-007 Blazer-007 changed the title [GOBBLIN-XXX] Implement ContainerCompletion callback in DynamicScalingYarnService [GOBBLIN-2189] Implement ContainerCompletion callback in DynamicScalingYarnService Jan 15, 2025
@Blazer-007 Blazer-007 marked this pull request as ready for review January 15, 2025 03:55

codecov-commenter commented Jan 19, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 38.75%. Comparing base (a0cef28) to head (7a80eb7).
Report is 2 commits behind head on master.

❗ There is a different number of reports uploaded between BASE (a0cef28) and HEAD (7a80eb7).

HEAD has 2 fewer uploads than BASE: 3 uploads on BASE (a0cef28) vs. 1 upload on HEAD (7a80eb7).
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #4092       +/-   ##
=============================================
- Coverage     49.03%   38.75%   -10.29%     
+ Complexity    10008     1599     -8409     
=============================================
  Files          1895      388     -1507     
  Lines         73612    16016    -57596     
  Branches       8188     1588     -6600     
=============================================
- Hits          36097     6207    -29890     
+ Misses        34280     9311    -24969     
+ Partials       3235      498     -2737     


* </p>
*/
@Override
protected void handleContainerCompletion(ContainerStatus containerStatus) {
Contributor:

this method is not synchronized, but it removes entries from containerMap and modifies removedContainerIds, whereas reviseWorkforcePlanAndRequestNewContainers is synchronized and also modifies both containerMap and removedContainerIds. If a call to handleContainerCompletion interleaves with a call to reviseWorkforcePlanAndRequestNewContainers, race conditions and inconsistent state can occur

Member Author:

In handleContainerCompletion, the containerId being removed will not be the same one being removed inside reviseWorkforcePlanAndRequestNewContainers, and for removedContainerIds one path adds while the other removes. Even if the calls interleave, they are working with different containerIds, so the chance of inconsistent state is very low, given that both are thread-safe data structures as well.
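
That said, if the interleaving ever does become a concern, one option would be to have handleContainerCompletion take the same monitor as the already synchronized reviseWorkforcePlanAndRequestNewContainers. A rough sketch under that assumption (the body below is only indicative of the check-and-remove pattern discussed here, not this PR's exact logic):

@Override
protected void handleContainerCompletion(ContainerStatus containerStatus) {
  synchronized (this) { // same monitor as the synchronized reviseWorkforcePlanAndRequestNewContainers
    ContainerId completedContainerId = containerStatus.getContainerId();
    this.containerMap.remove(completedContainerId);
    if (this.removedContainerIds.remove(completedContainerId)) {
      return; // a container we released ourselves during scale-down; no replacement needed
    }
    // ... otherwise decide whether to request a replacement container ...
  }
}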

* </p>
*/
@Override
protected void handleContainerCompletion(ContainerStatus containerStatus) {
Contributor:

Please add tests for these methods

TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager(
mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource());
testDynamicScalingYarnServiceManager.startUp();
-Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times
+Thread.sleep(7000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times
Contributor:

any reason to update this to 7 seconds?

also a typo in the comment: it still says 5 seconds sleep

Member Author:

GetScalingDirectivesRunnable.run() currently runs every second; on each run it fetches scalingDirectives from the ScalingDirectiveSource impl, which in our case is DummyScalingDirectiveSource and returns scaling directives only 5 times. The 7-second sleep verifies that it does not return directives more than 5 times even though the runnable keeps running.

testDynamicScalingYarnServiceManager.shutDown();
-Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+Mockito.verify(mockDynamicScalingYarnService, Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
Contributor:

what exactly are we testing here?

Member Author:

When scaling directives are returned by the ScalingDirectiveSource impl, reviseWorkforcePlanAndRequestNewContainers is called that many times. Here DummyScalingDirectiveSource returns scaling directives 5 times, so exactly 5 invocations of reviseWorkforcePlanAndRequestNewContainers must happen.
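
As a side note (not part of this PR), the same expectation could be expressed without a fixed sleep by using Mockito's delay-based verification, which waits the full window and then asserts the exact invocation count:

// illustrative alternative to the fixed Thread.sleep, reusing the test objects above
testDynamicScalingYarnServiceManager.startUp();
Mockito.verify(mockDynamicScalingYarnService, Mockito.after(7000).times(5))
    .reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
testDynamicScalingYarnServiceManager.shutDown();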

Comment on lines +247 to +252
if (currContainerMemoryMbs < MAX_REPLACEMENT_CONTAINER_MEMORY_MBS && newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
newContainerMemoryMbs = MAX_REPLACEMENT_CONTAINER_MEMORY_MBS;
} else if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. Not requesting a replacement container.",
MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
return;
Contributor:

this can be simplified to:

int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * 2, MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
    log.warn("Replacement container memory exceeds max limit {}, not requesting a replacement container.", MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
    return;
}

Member Author:

This will not work: if currContainerMemoryMbs is already at MAX_REPLACEMENT_CONTAINER_MEMORY_MBS, it will keep requesting containers of MAX_REPLACEMENT_CONTAINER_MEMORY_MBS, which will not help in any way.

Contributor:

yeah, a typo - the check should be on the current container, not the new one. Can be updated to:

    if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
        log.warn("Container {} already had max allowed memory {} MBs. Not requesting a replacement container.", completedContainerId, currContainerMemoryMbs);
        return;
    }

    int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * 2, MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);


public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception {
super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);

this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
this.removedContainerIds = ConcurrentHashMap.newKeySet();
Contributor @khandelwal-prateek commented Feb 7, 2025:

update to use ConcurrentLinkedQueue, as ConcurrentHashMap.newKeySet() works better for membership checks but not for ordered or frequent removals.

private final Queue<ContainerId> removedContainerIds;
removedContainerIds = new ConcurrentLinkedQueue<>();
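
For illustration, usage of the suggested queue would then look roughly like this (the call sites below are hypothetical; the types come from java.util.concurrent and org.apache.hadoop.yarn.api.records):

private final Queue<ContainerId> removedContainerIds = new ConcurrentLinkedQueue<>();

// scale-down path (hypothetical call site): remember the ids of containers we are about to release
this.removedContainerIds.offer(containerId);

// consumer side (hypothetical call site): FIFO removal; poll() returns null when the queue is empty
ContainerId releasedId = this.removedContainerIds.poll();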
