Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2857] Fix flaky gang scheduling e2e test #911

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/e2e/framework/helpers/yunikorn/rest_api_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func compareQueueTS(queuePathStr string, ts string) wait.ConditionFunc {
restClient := RClient{}
qInfo, err := restClient.GetQueue(DefaultPartition, queuePathStr, false)
if err != nil {
return false, err
return false, nil
}

return qInfo.Properties["timestamp"] == ts, nil
Expand Down
69 changes: 58 additions & 11 deletions test/e2e/gang_scheduling/gang_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package gangscheduling_test

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -29,6 +31,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-k8shim/pkg/cache"
Expand Down Expand Up @@ -96,9 +99,8 @@ var _ = Describe("", func() {
checkAppStatus(appID, yunikorn.States().Application.Running)

// Ensure placeholders are created
appDaoInfo, appDaoInfoErr := restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
checkPlaceholderData(appDaoInfo, groupA, 5, 0, 0)
phErr := waitForPlaceholderData(nsQueue, appID, groupA, 5, 0, 0, 30)
Ω(phErr).NotTo(HaveOccurred())

// Deploy job, now with 5 pods part of taskGroup
By("Deploy second job with 5 real taskGroup pods")
Expand All @@ -118,9 +120,8 @@ var _ = Describe("", func() {
checkAppStatus(appID, yunikorn.States().Application.Running)

// Ensure placeholders are replaced
appDaoInfo, appDaoInfoErr = restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
checkPlaceholderData(appDaoInfo, groupA, 5, 5, 0)
phErr = waitForPlaceholderData(nsQueue, appID, groupA, 5, 5, 0, 30)
Ω(phErr).NotTo(HaveOccurred())
})

// Test to verify multiple task group nodes
Expand Down Expand Up @@ -218,9 +219,10 @@ var _ = Describe("", func() {
Ω(phTermErr).NotTo(HaveOccurred())

// Ensure placeholders are replaced and allocations count is correct
phErr := waitForPlaceholderData(nsQueue, appID, groupA, 3, 3, 0, 30)
Ω(phErr).NotTo(HaveOccurred())
appDaoInfo, appDaoInfoErr := restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
checkPlaceholderData(appDaoInfo, groupA, 3, 3, 0)
Ω(len(appDaoInfo.Allocations)).To(Equal(int(6)), "Allocations count is not correct")
})

Expand Down Expand Up @@ -250,11 +252,13 @@ var _ = Describe("", func() {
checkAppStatus(appID, yunikorn.States().Application.Running)

// Ensure placeholders are timed out and allocations count is correct as app started running normal because of 'soft' gang style
phErr := waitForPlaceholderData(nsQueue, appID, groupA, 3, 0, 3, 30)
Ω(phErr).NotTo(HaveOccurred())
phErr = waitForPlaceholderData(nsQueue, appID, groupB, 1, 0, 1, 30)
Ω(phErr).NotTo(HaveOccurred())
appDaoInfo, appDaoInfoErr := restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
Ω(len(appDaoInfo.PlaceholderData)).To(Equal(2), "Placeholder count is not correct")
checkPlaceholderData(appDaoInfo, groupA, 3, 0, 3)
checkPlaceholderData(appDaoInfo, groupB, 1, 0, 1)
Ω(len(appDaoInfo.Allocations)).To(Equal(int(3)), "Allocations count is not correct")
for _, alloc := range appDaoInfo.Allocations {
Ω(alloc.Placeholder).To(Equal(false), "Allocation should be non placeholder")
Expand Down Expand Up @@ -563,10 +567,11 @@ var _ = Describe("", func() {
checkAppStatus(appID, yunikorn.States().Application.Running)

// Ensure placeholders are replaced and allocations count is correct
phErr := waitForPlaceholderData(nsQueue, appID, groupA, 3, 3, 0, 30)
Ω(phErr).NotTo(HaveOccurred())
appDaoInfo, appDaoInfoErr := restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
Ω(len(appDaoInfo.PlaceholderData)).To(Equal(1), "Placeholder count is not correct")
checkPlaceholderData(appDaoInfo, groupA, 3, 3, 0)
Ω(len(appDaoInfo.Allocations)).To(Equal(int(3)), "Allocations count is not correct")
Ω(appDaoInfo.UsedResource[hugepageKey]).To(Equal(int64(314572800)), "Used huge page resource is not correct")
})
Expand Down Expand Up @@ -725,7 +730,40 @@ func checkCompletedAppStatus(applicationID, state string) {
Ω(timeoutErr).NotTo(HaveOccurred())
}

func checkPlaceholderData(appDaoInfo *dao.ApplicationDAOInfo, tgName string, count, replaced, timeout int) {
func waitForPlaceholderData(nsQueue string, appID string, tgName string, count, replaced, timedOut, timeout int) error {
lastOk := false
var lastCount, lastReplaced, lastTimedOut int

err := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, time.Duration(timeout)*time.Second, false, func(c context.Context) (bool, error) {
appDaoInfo, err := restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
if err != nil {
return false, err
}
lastOk, lastCount, lastReplaced, lastTimedOut = getPlaceholderData(appDaoInfo, tgName)
return lastOk && lastCount == count && lastReplaced == replaced && lastTimedOut == timedOut, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid calling the API again on the caller side just to assert allocationsCount, can we either assert it here or return allocationsCount?

})
if err != nil {
errs := make([]error, 0)
errs = append(errs, err)
if !lastOk {
errs = append(errs, fmt.Errorf("can't find task group %s in app info", tgName))
}
if lastCount != count {
errs = append(errs, fmt.Errorf("placeholder count is incorrect (expected %d, got %d)", count, lastCount))
}
if lastReplaced != replaced {
errs = append(errs, fmt.Errorf("placeholder replaced is incorrect (expected %d, got %d)", replaced, lastReplaced))
}
if lastTimedOut != timedOut {
errs = append(errs, fmt.Errorf("placeholder timedout is incorrect (expected %d, got %d)", timedOut, lastTimedOut))
}
return errors.Join(errs...)
}
return nil
}

func checkPlaceholderData(appDaoInfo *dao.ApplicationDAOInfo, tgName string, count int, replaced int, timeout int) {
By(fmt.Sprintf("Verify application %s placeholder data for group %s", appDaoInfo.ApplicationID, tgName))
verified := false
for _, placeholderData := range appDaoInfo.PlaceholderData {
if tgName == placeholderData.TaskGroupName {
Expand All @@ -739,6 +777,15 @@ func checkPlaceholderData(appDaoInfo *dao.ApplicationDAOInfo, tgName string, cou
Ω(verified).To(Equal(true), fmt.Sprintf("Can't find task group %s in app info", tgName))
}

// getPlaceholderData looks up the placeholder entry of appDaoInfo whose task
// group name equals tgName. On the first match it returns true followed by
// that entry's Count, Replaced and TimedOut values converted to int. When no
// entry matches it returns false and three zeros.
func getPlaceholderData(appDaoInfo *dao.ApplicationDAOInfo, tgName string) (bool, int, int, int) {
	for _, ph := range appDaoInfo.PlaceholderData {
		// Skip entries that belong to a different task group.
		if ph.TaskGroupName != tgName {
			continue
		}
		return true, int(ph.Count), int(ph.Replaced), int(ph.TimedOut)
	}
	return false, 0, 0, 0
}

func verifyOriginatorDeletionCase(withOwnerRef bool) {
podConf := k8s.TestPodConfig{
Name: "gang-driver-pod" + common.RandSeq(5),
Expand Down