Skip to content

Commit

Permalink
Queue/ApplicationId annotations prior to Labels(except for canonical …
Browse files Browse the repository at this point in the history
…label). Change to not reject tasks before 1.7.0
  • Loading branch information
chenyulin0719 committed Jul 5, 2024
1 parent 5713cf2 commit ead6a62
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 174 deletions.
25 changes: 15 additions & 10 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,10 @@ func (app *Application) Schedule() bool {
func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) {
for _, task := range app.GetNewTasks() {
if taskScheduleCondition(task) {
// for each new task, we do a sanity check before moving the state to Pending_Schedule
// if the task is not ready for scheduling, we keep it in New state
// if the task pod is bounded and have conflicting metadata, we move the task to Rejected state
// for each new task, we do a sanity check before moving the state to "Pending"
// if sanity check fails due to PVC check, it means the task is not ready for scheduling, we keep it in "New" state
// if sanity check fails due to an unbound pod having conflicting metadata, we move the task to "Rejected" state
// Before version 1.7.0, the sanity check still passes even if an unbound pod has conflicting metadata.
err, rejectTask := task.sanityCheckBeforeScheduling()

if err == nil {
Expand All @@ -410,19 +411,23 @@ func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool)
log.Log(log.ShimCacheApplication).Warn("init task failed", zap.Error(err))
}
} else {
if !rejectTask {
if rejectTask {

Check warning on line 414 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L414

Added line #L414 was not covered by tests
// Before version 1.7.0, task.sanityCheckBeforeScheduling() would never return an error with a true reject flag when the pod has conflicting metadata.
// After version 1.7.0, task.sanityCheckBeforeScheduling() would fail and return a true reject flag if the pod has conflicting metadata.
log.Log(log.ShimCacheApplication).Error("Bug: sanity check shouldn't fail before version 1.7.0.")

Check warning on line 417 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L417

Added line #L417 was not covered by tests

// task transits to Rejected state
if handleErr := task.handle(
NewRejectTaskEvent(task.applicationID, task.taskID, err.Error())); handleErr != nil {
log.Log(log.ShimCacheApplication).Warn("reject task failed", zap.Error(err))

Check warning on line 422 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L420-L422

Added lines #L420 - L422 were not covered by tests
}
} else {

Check warning on line 424 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L424

Added line #L424 was not covered by tests
// no state transition
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
zap.Error(err))

Check warning on line 430 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L426-L430

Added lines #L426 - L430 were not covered by tests
} else {
// task transits to Rejected state
if handleErr := task.handle(
NewRejectTaskEvent(task.applicationID, task.taskID, err.Error())); handleErr != nil {
log.Log(log.ShimCacheApplication).Warn("reject task failed", zap.Error(err))
}
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ func (task *Task) postTaskBound() {
func (task *Task) postTaskRejected(reason string) {
// if task is rejected because of conflicting metadata, we should fail the pod with reason
if strings.Contains(reason, constants.TaskPodInconsistMetadataFailure) {
// Before version 1.7.0, this path would never be reached.
// After version 1.7.0, task pod should fail if pod has conflicting metadata.
task.failTaskPodWithReasonAndMsg(constants.TaskRejectedFailure, reason)

Check warning on line 482 in pkg/cache/task.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/task.go#L482

Added line #L482 was not covered by tests
}

Expand Down Expand Up @@ -573,16 +575,22 @@ func (task *Task) sanityCheckBeforeScheduling() (error, bool) {
// only check pod labels and annotations consistency if pod is not already bound
// reject the task if pod metadata is conflicting
if !utils.PodAlreadyBound(task.pod) {
if err := task.checkPodMetadata(); err != nil {
rejectTask = true
return err, rejectTask
if err := task.checkTaskPodWithoutConflictMetadata(); err != nil {

Check warning on line 578 in pkg/cache/task.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/task.go#L578

Added line #L578 was not covered by tests
// Before version 1.7.0, return nil error and log a warning if pod metadata is conflicting
// After version 1.7.0, err should be returned if pod metadata is conflicting
// After version 1.7.0, rejectTask should be true if pod metadata is conflicting
log.Log(log.ShimCacheTask).Warn("Task pod has conflicting metadata, the unbound task pod will be rejected after version 1.7.0",
zap.String("appID", task.applicationID),
zap.String("podName", task.pod.Name),
zap.String("error", err.Error()))
return nil, rejectTask

Check warning on line 586 in pkg/cache/task.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/task.go#L582-L586

Added lines #L582 - L586 were not covered by tests
}
}

return nil, rejectTask
}

func (task *Task) checkPodMetadata() error {
func (task *Task) checkTaskPodWithoutConflictMetadata() error {
// check application ID
appIdLabelKeys := []string{
constants.CanonicalLabelApplicationID,
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) {
}

// nolint: funlen
func TestCheckPodMetadata(t *testing.T) {
func TestCheckTaskPodWithoutConflictMetadata(t *testing.T) {
const (
appID = "app01"
app2ID = "app02"
Expand Down Expand Up @@ -823,7 +823,7 @@ func TestCheckPodMetadata(t *testing.T) {
},
}
task := NewTask("task01", app, nil, pod)
err := task.checkPodMetadata()
err := task.checkTaskPodWithoutConflictMetadata()
if err != nil {
assert.Equal(t, tc.expected.Error(), err.Error())
} else {
Expand Down
42 changes: 23 additions & 19 deletions pkg/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ func GetQueueNameFromPod(pod *v1.Pod) string {
// Queue name can be defined in multiple places
// The queue name is determined by the following order
// 1. Label: constants.CanonicalLabelQueueName
// 2. Label: constants.LabelQueueName
// 3. Annotation: constants.AnnotationQueueName
// 2. Annotation: constants.AnnotationQueueName
// 3. Label: constants.LabelQueueName
// 4. Default: constants.ApplicationDefaultQueue
queueName := constants.ApplicationDefaultQueue
if canonicalLabelQueueName := GetPodLabelValue(pod, constants.CanonicalLabelQueueName); canonicalLabelQueueName != "" {
queueName = canonicalLabelQueueName
} else if labelQueueName := GetPodLabelValue(pod, constants.LabelQueueName); labelQueueName != "" {
queueName = labelQueueName
} else if annotationQueueName := GetPodAnnotationValue(pod, constants.AnnotationQueueName); annotationQueueName != "" {
queueName = annotationQueueName
} else if labelQueueName := GetPodLabelValue(pod, constants.LabelQueueName); labelQueueName != "" {
queueName = labelQueueName
}

return queueName
Expand Down Expand Up @@ -166,25 +166,29 @@ func GetApplicationIDFromPod(pod *v1.Pod) string {
// Application ID can be defined in multiple places
// The application ID is determined by the following order.
// 1. Label: constants.CanonicalLabelApplicationID
// 2. Label: constants.LabelApplicationID
// 3. Label: constants.SparkLabelAppID
// 4. Annotation: constants.AnnotationApplicationID
labelKeys := []string{
constants.CanonicalLabelApplicationID,
constants.LabelApplicationID,
constants.SparkLabelAppID,
}
appID := ""
for _, label := range labelKeys {
appID = GetPodLabelValue(pod, label)
if appID != "" {
break
}
}
// 2. Annotation: constants.AnnotationApplicationID
// 3. Label: constants.LabelApplicationID
// 4. Label: constants.SparkLabelAppID

appID := GetPodLabelValue(pod, constants.CanonicalLabelApplicationID)

if appID == "" {
appID = GetPodAnnotationValue(pod, constants.AnnotationApplicationID)
}

if appID == "" {
labelKeys := []string{
constants.LabelApplicationID,
constants.SparkLabelAppID,
}
for _, label := range labelKeys {
appID = GetPodLabelValue(pod, label)
if appID != "" {
break
}
}
}

// If plugin mode, interpret missing Application ID as a non-YuniKorn pod
if pluginMode && appID == "" {
return ""
Expand Down
8 changes: 4 additions & 4 deletions pkg/common/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func TestGetApplicationIDFromPod(t *testing.T) {
Labels: map[string]string{constants.LabelApplicationID: appIDInLabel},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, appIDInLabel, appIDInLabel, false},
}, appIDInAnnotation, appIDInAnnotation, false},

{"Spark AppID defined in spark app selector", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -631,14 +631,14 @@ func TestGetApplicationIDFromPod(t *testing.T) {
Annotations: map[string]string{constants.AnnotationApplicationID: sparkIDInAnnotation},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, appIDInSelector, appIDInSelector, false},
}, sparkIDInAnnotation, sparkIDInAnnotation, false},
{"Spark AppID defined in spark app selector and annotation", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{constants.SparkLabelAppID: appIDInSelector, constants.LabelApplicationID: appIDInLabel},
Annotations: map[string]string{constants.AnnotationApplicationID: sparkIDInAnnotation},
},
Spec: v1.PodSpec{SchedulerName: constants.SchedulerName},
}, appIDInLabel, appIDInLabel, false},
}, sparkIDInAnnotation, sparkIDInAnnotation, false},
{"No AppID defined", &v1.Pod{}, "", "", false},
{"Spark AppID defined in spark app selector and label", &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -988,7 +988,7 @@ func TestGetQueueNameFromPod(t *testing.T) {
Annotations: map[string]string{constants.AnnotationQueueName: queueInAnnotation},
},
},
expectedQueue: queueInLabel,
expectedQueue: queueInAnnotation,
},
{
name: "Without queue label and annotation",
Expand Down
68 changes: 0 additions & 68 deletions test/e2e/basic_scheduling/basic_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
v1 "k8s.io/api/core/v1"

"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
tests "github.com/apache/yunikorn-k8shim/test/e2e"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
Expand Down Expand Up @@ -120,73 +119,6 @@ var _ = ginkgo.Describe("", func() {
Ω(resMap["vcore"]).To(gomega.Equal(core))
})

ginkgo.It("Verify_Pod_With_Conflicting_AppId", func() {
	ginkgo.By("Validate task pod with conflicting appId will be rejected.")
	const podName = "pod-with-conflicting-app-id"
	// The labels and the annotation deliberately carry different application IDs.
	labelAppID := "appId-A-" + common.RandSeq(10)
	annotationAppID := "appId-B-" + common.RandSeq(10)

	conflictingPodConfig := k8s.TestPodConfig{
		Name:      podName,
		Namespace: dev,
		Labels: map[string]string{
			constants.CanonicalLabelApplicationID: labelAppID,
			constants.LabelApplicationID:          labelAppID,
		},
		Annotations: &k8s.PodAnnotation{
			Other: map[string]string{
				constants.AnnotationApplicationID: annotationAppID,
			},
		},
	}

	// Build and submit the pod; both steps must succeed.
	pod, err := k8s.InitTestPod(conflictingPodConfig)
	Ω(err).NotTo(HaveOccurred())
	_, err = kClient.CreatePod(pod, dev)
	Ω(err).NotTo(HaveOccurred())

	// The pod must reach the failed state within 30 seconds.
	err = kClient.WaitForPodFailed(dev, podName, 30*time.Second)
	Ω(err).NotTo(HaveOccurred())

	// Verify the failure reason and message recorded on the pod status.
	failureReason, failureMessage, lookupErr := kClient.GetPodFailureReasonAndMessage(podName, dev)
	Ω(lookupErr).NotTo(HaveOccurred())
	Ω(failureReason).To(gomega.Equal("TaskRejected"))
	Ω(failureMessage).To(gomega.ContainSubstring("PodInconsistentMetadata"))
})

ginkgo.It("Verify_Pod_With_Conflicting_QueueName", func() {
	ginkgo.By("Validate task pod with conflicting queue name will be rejected.")
	const (
		podName = "pod-with-conflicting-queue"
		// The labels and the annotation deliberately name different queues.
		labelQueue      = "root.aaa"
		annotationQueue = "root.bbb"
	)
	appID := "appId-" + common.RandSeq(10)

	conflictingPodConfig := k8s.TestPodConfig{
		Name:      podName,
		Namespace: dev,
		Labels: map[string]string{
			constants.CanonicalLabelApplicationID: appID,
			constants.LabelApplicationID:          appID,
			constants.CanonicalLabelQueueName:     labelQueue,
			constants.LabelQueueName:              labelQueue,
		},
		Annotations: &k8s.PodAnnotation{
			Other: map[string]string{
				constants.AnnotationQueueName: annotationQueue,
			},
		},
	}

	// Build and submit the pod; both steps must succeed.
	pod, err := k8s.InitTestPod(conflictingPodConfig)
	Ω(err).NotTo(HaveOccurred())
	_, err = kClient.CreatePod(pod, dev)
	Ω(err).NotTo(HaveOccurred())

	// The pod must reach the failed state within 30 seconds.
	err = kClient.WaitForPodFailed(dev, podName, 30*time.Second)
	Ω(err).NotTo(HaveOccurred())

	// Verify the failure reason and message recorded on the pod status.
	failureReason, failureMessage, lookupErr := kClient.GetPodFailureReasonAndMessage(podName, dev)
	Ω(lookupErr).NotTo(HaveOccurred())
	Ω(failureReason).To(gomega.Equal("TaskRejected"))
	Ω(failureMessage).To(gomega.ContainSubstring("PodInconsistentMetadata"))
})

ginkgo.AfterEach(func() {
tests.DumpClusterInfoIfSpecFailed(suiteName, []string{dev})
// call the healthCheck api to check scheduler health
Expand Down
12 changes: 0 additions & 12 deletions test/e2e/framework/helpers/k8s/k8s_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,6 @@ func (k *KubeCtl) GetPod(name, namespace string) (*v1.Pod, error) {
return k.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}

// GetPodFailureReasonAndMessage fetches the pod with the given name from the given
// namespace and returns the reason and message recorded in its status.
// It returns an error if the pod cannot be retrieved or if the pod is not in
// the Failed phase; in both cases the returned strings are empty.
func (k *KubeCtl) GetPodFailureReasonAndMessage(name, namespace string) (string, string, error) {
	pod, err := k.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return "", "", err
	}
	// Only report reason and message once the pod has actually failed.
	if pod.Status.Phase != v1.PodFailed {
		return "", "", fmt.Errorf("pod %s is not in failed state", name)
	}
	return pod.Status.Reason, pod.Status.Message, nil
}

func (k *KubeCtl) GetSchedulerPod() (string, error) {
podNameList, err := k.GetPodNamesFromNS(configmanager.YuniKornTestConfig.YkNamespace)
if err != nil {
Expand Down
55 changes: 0 additions & 55 deletions test/e2e/recovery_and_restart/recovery_and_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/apache/yunikorn-k8shim/pkg/common/constants"
tests "github.com/apache/yunikorn-k8shim/test/e2e"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
Expand Down Expand Up @@ -363,60 +362,6 @@ var _ = ginkgo.Describe("", func() {
Ω(err).NotTo(gomega.HaveOccurred())
})

ginkgo.It("Verify_Pod_Restart_After_Add_Conflict_Metadata", func() {
	// A task whose pod carries conflicting metadata will be rejected.
	// However, if the pod is already bound, the task can still be registered to YK.
	kClient = k8s.KubeCtl{}
	Ω(kClient.SetClient()).To(gomega.BeNil())
	defer yunikorn.RestorePortForwarding(&kClient)

	ginkgo.By("Submitting a normal sleep pod with consistent metadata")
	const (
		podName   = "normal-sleep-pod"
		queueName = "root.abc"
	)
	appID := "appId-" + common.RandSeq(10)
	consistentPodConfig := k8s.TestPodConfig{
		Name:      podName,
		Namespace: dev,
		Labels: map[string]string{
			constants.CanonicalLabelApplicationID: appID,
			constants.LabelApplicationID:          appID,
			constants.CanonicalLabelQueueName:     queueName,
			constants.LabelQueueName:              queueName,
		},
	}
	pod, err := k8s.InitTestPod(consistentPodConfig)
	Ω(err).NotTo(gomega.HaveOccurred())
	_, err = kClient.CreatePod(pod, dev)
	Ω(err).NotTo(gomega.HaveOccurred())
	err = kClient.WaitForPodRunning(dev, podName, 30*time.Second)
	Ω(err).NotTo(gomega.HaveOccurred())

	ginkgo.By("Add conflict queue name to the pod annotation")
	// Re-read the pod so the annotation update is applied to the current object.
	pod, err = kClient.GetPod(podName, dev)
	Ω(err).NotTo(gomega.HaveOccurred())

	_, err = kClient.UpdatePodWithAnnotation(pod, dev, constants.AnnotationQueueName, "other-queue")
	Ω(err).NotTo(gomega.HaveOccurred())

	ginkgo.By("Restart the scheduler pod")
	yunikorn.RestartYunikorn(&kClient)

	ginkgo.By("Port-forward scheduler pod after restart")
	yunikorn.RestorePortForwarding(&kClient)

	ginkgo.By("Check the bounded pod is still in running state")
	err = kClient.WaitForPodRunning(dev, podName, 30*time.Second)
	gomega.Ω(err).NotTo(gomega.HaveOccurred())

	ginkgo.By("Check the task pod is still registered to YK")
	restClient = yunikorn.RClient{}
	appQueue := "root." + dev
	err = restClient.WaitForAppStateTransition("default", appQueue, appID, "Running", 30)
	gomega.Ω(err).NotTo(gomega.HaveOccurred())
	// The application must still hold exactly one allocation.
	appInfo, err := restClient.GetAppInfo("default", appQueue, appID)
	gomega.Ω(err).NotTo(gomega.HaveOccurred())
	gomega.Ω(len(appInfo.Allocations)).To(gomega.Equal(1))
})

ginkgo.AfterEach(func() {
tests.DumpClusterInfoIfSpecFailed(suiteName, []string{dev})
})
Expand Down

0 comments on commit ead6a62

Please sign in to comment.