Skip to content

Commit

Permalink
[YUNIKORN-2083] Shim: Handle missing applicationID cleanly in standar…
Browse files Browse the repository at this point in the history
…d mode

If a Pod with schedulerName: yunikorn is encountered with missing applicationID
metadata, generate an applicationID using the same algorithm as the admission
controller would have. This allows these Pods to be scheduled successfully.
  • Loading branch information
craigcondit committed Oct 27, 2023
1 parent fc54037 commit 75c5578
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 94 deletions.
6 changes: 3 additions & 3 deletions pkg/appmgmt/general/general_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,9 @@ func TestListApplication(t *testing.T) {
// Application 4
{
description: "running in default queue and namespace01, without label and annotation",
applicationID: appName[3],
applicationID: "yunikorn-namespace01-autogen",
input: podCase[4].InjectPod(),
expectedOutput: false,
expectedOutput: true,
},
// Application 5
{
Expand All @@ -615,7 +615,7 @@ func TestListApplication(t *testing.T) {
am := NewManager(mockedAPIProvider, NewPodEventHandler(amProtocol, true))
pods, err := am.ListPods()
assert.NilError(t, err)
assert.Equal(t, len(pods), 3)
assert.Equal(t, len(pods), 4)
for _, pod := range pods {
name := utils.GetApplicationIDFromPod(pod)
expected := expectOutput[name]
Expand Down
51 changes: 27 additions & 24 deletions pkg/appmgmt/general/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
apis "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
)

func TestGetTaskMetadata(t *testing.T) {
Expand Down Expand Up @@ -80,10 +82,18 @@ func TestGetTaskMetadata(t *testing.T) {
}

task, ok = getTaskMetadata(&pod)
assert.Equal(t, ok, false)
assert.Equal(t, ok, true)
assert.Equal(t, task.ApplicationID, "yunikorn-default-autogen")
assert.Equal(t, task.TaskID, "UID-POD-00001")
assert.Equal(t, task.TaskGroupName, "")
}

func TestGetAppMetadata(t *testing.T) { //nolint:funlen
defer utils.SetPluginMode(false)
defer func() { conf.GetSchedulerConf().GenerateUniqueAppIds = false }()
utils.SetPluginMode(false)
conf.GetSchedulerConf().GenerateUniqueAppIds = false

pod := v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
Expand Down Expand Up @@ -136,10 +146,10 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen
Namespace: "app-namespace-01",
UID: "UID-POD-00001",
Labels: map[string]string{
"applicationId": "app00002",
"queue": "root.b",
"applicationId": "app00002",
"queue": "root.b",
constants.DomainYuniKorn + "user": "testuser",
"disableStateAware": "true",
"disableStateAware": "true",
},
Annotations: map[string]string{
constants.AnnotationSchedulingPolicyParam: "gangSchedulingStyle=Hard",
Expand Down Expand Up @@ -200,8 +210,8 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen
Namespace: "app-namespace-01",
UID: "UID-POD-00001",
Labels: map[string]string{
"applicationId": "app00002",
"queue": "root.b",
"applicationId": "app00002",
"queue": "root.b",
constants.DomainYuniKorn + "user": "testuser",
},
Annotations: map[string]string{
Expand Down Expand Up @@ -238,26 +248,19 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen
},
}

utils.SetPluginMode(false)
app, ok = getAppMetadata(&pod, false)
assert.Equal(t, ok, false)
pod = v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod00002",
Namespace: "app-namespace-01",
UID: "UID-POD-00001",
},
Spec: v1.PodSpec{
SchedulerName: constants.SchedulerName,
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}
conf.GetSchedulerConf().GenerateUniqueAppIds = true
assert.Equal(t, ok, true)
assert.Equal(t, app.ApplicationID, "yunikorn-app-namespace-01-autogen")

utils.SetPluginMode(false)
conf.GetSchedulerConf().GenerateUniqueAppIds = true
app, ok = getAppMetadata(&pod, false)
assert.Equal(t, ok, true)
assert.Equal(t, app.ApplicationID, "app-namespace-01-UID-POD-00001")

utils.SetPluginMode(true)
app, ok = getAppMetadata(&pod, false)
assert.Equal(t, ok, false)
}
4 changes: 0 additions & 4 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ func (ctx *Context) IsPluginMode() bool {
return ctx.pluginMode
}

func (ctx *Context) SetPluginMode(pluginMode bool) {
ctx.pluginMode = pluginMode
}

func (ctx *Context) addNode(obj interface{}) {
node, err := convertToNode(obj)
if err != nil {
Expand Down
26 changes: 19 additions & 7 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ func TestRemoveApplicationInternal(t *testing.T) {
}

func TestFilterPods(t *testing.T) {
defer utils.SetPluginMode(false)

context := initContextForTest()
pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Expand Down Expand Up @@ -370,10 +372,18 @@ func TestFilterPods(t *testing.T) {
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
assert.Check(t, !context.filterPods(nil), "nil object was allowed")
assert.Check(t, !context.filterPods(pod1), "yunikorn-managed pod with no app id was allowed")
assert.Check(t, !context.filterPods(pod2), "non-yunikorn-managed pod was allowed")
assert.Check(t, context.filterPods(pod3), "yunikorn-managed pod was filtered")

utils.SetPluginMode(false)
assert.Check(t, !context.filterPods(nil), "standard: nil object was allowed")
assert.Check(t, context.filterPods(pod1), "standard: yunikorn-managed pod with no app id was not allowed")
assert.Check(t, !context.filterPods(pod2), "standard: non-yunikorn-managed pod was allowed")
assert.Check(t, context.filterPods(pod3), "standard: yunikorn-managed pod was filtered")

utils.SetPluginMode(true)
assert.Check(t, !context.filterPods(nil), "plugin: nil object was allowed")
assert.Check(t, !context.filterPods(pod1), "plugin: yunikorn-managed pod with no app id was allowed")
assert.Check(t, !context.filterPods(pod2), "plugin: non-yunikorn-managed pod was allowed")
assert.Check(t, context.filterPods(pod3), "plugin: yunikorn-managed pod was filtered")
}

func TestAddPodToCache(t *testing.T) {
Expand Down Expand Up @@ -1174,9 +1184,9 @@ func TestAddApplicationsWithTags(t *testing.T) {
ObjectMeta: apis.ObjectMeta{
Name: "test2",
Annotations: map[string]string{
constants.NamespaceQuota: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
constants.NamespaceQuota: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
constants.DomainYuniKorn + "parentqueue": "root.test",
constants.NamespaceGuaranteed: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
constants.NamespaceGuaranteed: "{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
},
},
}
Expand Down Expand Up @@ -1301,8 +1311,10 @@ func TestAddApplicationsWithTags(t *testing.T) {
}

func TestPendingPodAllocations(t *testing.T) {
utils.SetPluginMode(true)
defer utils.SetPluginMode(false)

context := initContextForTest()
context.SetPluginMode(true)

node1 := v1.Node{
ObjectMeta: apis.ObjectMeta{
Expand Down
20 changes: 16 additions & 4 deletions pkg/cache/node_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/yunikorn-k8shim/pkg/common/utils"

siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand Down Expand Up @@ -345,6 +347,8 @@ func TestDeleteTerminatedPod(t *testing.T) {
}

func TestNodeCoordinatorFilterPods(t *testing.T) {
defer utils.SetPluginMode(false)

mockedSchedulerAPI := newMockSchedulerAPI()
nodes := newSchedulerNodes(mockedSchedulerAPI, NewTestSchedulerCache())
host1 := NodeForTest(Host1, "10G", "10")
Expand Down Expand Up @@ -385,8 +389,16 @@ func TestNodeCoordinatorFilterPods(t *testing.T) {
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
assert.Check(t, !coordinator.filterPods(nil), "nil object was allowed")
assert.Check(t, coordinator.filterPods(pod1), "yunikorn-managed pod with no app id was filtered")
assert.Check(t, coordinator.filterPods(pod2), "non-yunikorn-managed pod was filtered")
assert.Check(t, !coordinator.filterPods(pod3), "yunikorn-managed pod was allowed")

utils.SetPluginMode(false)
assert.Check(t, !coordinator.filterPods(nil), "standard: nil object was allowed")
assert.Check(t, !coordinator.filterPods(pod1), "standard: yunikorn-managed pod with no app id was allowed")
assert.Check(t, coordinator.filterPods(pod2), "standard: non-yunikorn-managed pod was filtered")
assert.Check(t, !coordinator.filterPods(pod3), "standard: yunikorn-managed pod was allowed")

utils.SetPluginMode(true)
assert.Check(t, !coordinator.filterPods(nil), "plugin: nil object was allowed")
assert.Check(t, coordinator.filterPods(pod1), "plugin: yunikorn-managed pod with no app id was filtered")
assert.Check(t, coordinator.filterPods(pod2), "plugin: non-yunikorn-managed pod was filtered")
assert.Check(t, !coordinator.filterPods(pod3), "plugin: yunikorn-managed pod was allowed")
}
10 changes: 2 additions & 8 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type Task struct {
taskGroupName string
placeholder bool
terminationType string
pluginMode bool
originator bool
schedulingState interfaces.TaskSchedulingState
sm *fsm.FSM
Expand Down Expand Up @@ -88,10 +87,6 @@ func NewFromTaskMeta(tid string, app *Application, ctx *Context, metadata interf

func createTaskInternal(tid string, app *Application, resource *si.Resource,
pod *v1.Pod, placeholder bool, taskGroupName string, ctx *Context, originator bool) *Task {
var pluginMode bool
if ctx != nil {
pluginMode = ctx.IsPluginMode()
}
task := &Task{
taskID: tid,
alias: fmt.Sprintf("%s/%s", pod.Namespace, pod.Name),
Expand All @@ -103,7 +98,6 @@ func createTaskInternal(tid string, app *Application, resource *si.Resource,
createTime: pod.GetCreationTimestamp().Time,
placeholder: placeholder,
taskGroupName: taskGroupName,
pluginMode: pluginMode,
originator: originator,
context: ctx,
sm: newTaskState(),
Expand Down Expand Up @@ -344,7 +338,7 @@ func (task *Task) postTaskAllocated() {
defer task.lock.Unlock()

// plugin mode means we delegate this work to the default scheduler
if task.pluginMode {
if utils.IsPluginMode() {
log.Log(log.ShimCacheTask).Debug("allocating pod",
zap.String("podName", task.pod.Name),
zap.String("podUID", string(task.pod.UID)))
Expand Down Expand Up @@ -423,7 +417,7 @@ func (task *Task) beforeTaskAllocated(eventSrc string, allocUUID string, nodeID
}

func (task *Task) postTaskBound() {
if task.pluginMode {
if utils.IsPluginMode() {
// When the pod is actively scheduled by YuniKorn, it can be moved to the default-scheduler's
// UnschedulablePods structure. If the pod does not change, the pod will stay in the UnschedulablePods
// structure for podMaxInUnschedulablePodsDuration (default 5 minutes). Here we update the pod
Expand Down
49 changes: 36 additions & 13 deletions pkg/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ import (

const userInfoKey = siCommon.DomainYuniKorn + "user.info"

var pluginMode bool

func SetPluginMode(value bool) {
pluginMode = value
}

func IsPluginMode() bool {
return pluginMode

Check warning on line 53 in pkg/common/utils/utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/utils/utils.go#L52-L53

Added lines #L52 - L53 were not covered by tests
}

func Convert2Pod(obj interface{}) (*v1.Pod, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
Expand Down Expand Up @@ -116,35 +126,48 @@ func GenerateApplicationID(namespace string, generateUniqueAppIds bool, podUID s
return fmt.Sprintf("%.63s", generatedID)
}

// GetApplicationIDFromPod returns the applicationID (if present) from a Pod or an empty string if not present.
// If an applicationID is present, the Pod is managed by YuniKorn. Otherwise, it is managed by an external scheduler.
// GetApplicationIDFromPod returns the Application for a Pod. If a Pod is marked as schedulable by YuniKorn but is
// missing an ApplicationID, one will be generated here (if YuniKorn is running in standard mode) or an empty string
// will be returned (if YuniKorn is running in plugin mode).
// If an Application ID is returned, the Pod is managed by YuniKorn. Otherwise, it is managed by an external scheduler.
func GetApplicationIDFromPod(pod *v1.Pod) string {
// SchedulerName needs to match
if strings.Compare(pod.Spec.SchedulerName, constants.SchedulerName) != 0 {
return ""
}
// if pod was tagged with ignore-application, return
if value := GetPodAnnotationValue(pod, constants.AnnotationIgnoreApplication); value != "" {
ignore, err := strconv.ParseBool(value)
if err != nil {
log.Log(log.ShimUtils).Warn("Failed to parse annotation "+constants.AnnotationIgnoreApplication, zap.Error(err))
} else if ignore {
return ""

// If pod was tagged with ignore-application and plugin mode is active, return
if pluginMode {
if value := GetPodAnnotationValue(pod, constants.AnnotationIgnoreApplication); value != "" {
ignore, err := strconv.ParseBool(value)
if err != nil {
log.Log(log.ShimUtils).Warn("Failed to parse annotation "+constants.AnnotationIgnoreApplication, zap.Error(err))
} else if ignore {
return ""
}
}
}
// application ID can be defined in annotations

// Application ID can be defined in annotation
if value := GetPodAnnotationValue(pod, constants.AnnotationApplicationID); value != "" {
return value
}
// Application ID can be defined in label
if value := GetPodLabelValue(pod, constants.LabelApplicationID); value != "" {
return value
}
// application ID can be defined in labels
// Spark can also define application ID
if value := GetPodLabelValue(pod, constants.SparkLabelAppID); value != "" {
return value
}
// no application ID found, this is not a YuniKorn-managed Pod
return ""

// If plugin mode, interpret missing Application ID as a non-YuniKorn pod
if pluginMode {
return ""
}

// Standard deployment mode, so we need a valid Application ID to proceed. Generate one now.
return GenerateApplicationID(pod.Namespace, conf.GetSchedulerConf().GenerateUniqueAppIds, string(pod.UID))
}

// compare the existing pod condition with the given one, return true if the pod condition remains not changed.
Expand Down
Loading

0 comments on commit 75c5578

Please sign in to comment.