diff --git a/pkg/appmgmt/general/general_test.go b/pkg/appmgmt/general/general_test.go index 504c2a6f0..4cdfad97f 100644 --- a/pkg/appmgmt/general/general_test.go +++ b/pkg/appmgmt/general/general_test.go @@ -591,9 +591,9 @@ func TestListApplication(t *testing.T) { // Application 4 { description: "running in default queue and namespace01, without label and annotation", - applicationID: appName[3], + applicationID: "yunikorn-namespace01-autogen", input: podCase[4].InjectPod(), - expectedOutput: false, + expectedOutput: true, }, // Application 5 { @@ -615,7 +615,7 @@ func TestListApplication(t *testing.T) { am := NewManager(mockedAPIProvider, NewPodEventHandler(amProtocol, true)) pods, err := am.ListPods() assert.NilError(t, err) - assert.Equal(t, len(pods), 3) + assert.Equal(t, len(pods), 4) for _, pod := range pods { name := utils.GetApplicationIDFromPod(pod) expected := expectOutput[name] diff --git a/pkg/appmgmt/general/metadata_test.go b/pkg/appmgmt/general/metadata_test.go index 39ea41a6f..70c35a708 100644 --- a/pkg/appmgmt/general/metadata_test.go +++ b/pkg/appmgmt/general/metadata_test.go @@ -27,6 +27,8 @@ import ( apis "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/apache/yunikorn-k8shim/pkg/common/constants" + "github.com/apache/yunikorn-k8shim/pkg/common/utils" + "github.com/apache/yunikorn-k8shim/pkg/conf" ) func TestGetTaskMetadata(t *testing.T) { @@ -80,10 +82,18 @@ func TestGetTaskMetadata(t *testing.T) { } task, ok = getTaskMetadata(&pod) - assert.Equal(t, ok, false) + assert.Equal(t, ok, true) + assert.Equal(t, task.ApplicationID, "yunikorn-default-autogen") + assert.Equal(t, task.TaskID, "UID-POD-00001") + assert.Equal(t, task.TaskGroupName, "") } func TestGetAppMetadata(t *testing.T) { //nolint:funlen + defer utils.SetPluginMode(false) + defer func() { conf.GetSchedulerConf().GenerateUniqueAppIds = false }() + utils.SetPluginMode(false) + conf.GetSchedulerConf().GenerateUniqueAppIds = false + pod := v1.Pod{ TypeMeta: apis.TypeMeta{ Kind: "Pod", @@ -136,10 +146,10 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen Namespace: "app-namespace-01", UID: "UID-POD-00001", Labels: map[string]string{ - "applicationId": "app00002", - "queue": "root.b", + "applicationId": "app00002", + "queue": "root.b", constants.DomainYuniKorn + "user": "testuser", - "disableStateAware": "true", + "disableStateAware": "true", }, Annotations: map[string]string{ constants.AnnotationSchedulingPolicyParam: "gangSchedulingStyle=Hard", @@ -200,8 +210,8 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen Namespace: "app-namespace-01", UID: "UID-POD-00001", Labels: map[string]string{ - "applicationId": "app00002", - "queue": "root.b", + "applicationId": "app00002", + "queue": "root.b", constants.DomainYuniKorn + "user": "testuser", }, Annotations: map[string]string{ @@ -238,26 +248,19 @@ func TestGetAppMetadata(t *testing.T) { //nolint:funlen }, } + utils.SetPluginMode(false) app, ok = getAppMetadata(&pod, false) - assert.Equal(t, ok, false) - pod = v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: "pod00002", - Namespace: "app-namespace-01", - UID: "UID-POD-00001", - }, - Spec: v1.PodSpec{ - SchedulerName: constants.SchedulerName, - }, - Status: v1.PodStatus{ - Phase: v1.PodPending, - }, - } + conf.GetSchedulerConf().GenerateUniqueAppIds = true + assert.Equal(t, ok, true) + assert.Equal(t, app.ApplicationID, "yunikorn-app-namespace-01-autogen") + + utils.SetPluginMode(false) + conf.GetSchedulerConf().GenerateUniqueAppIds = true + app, ok = getAppMetadata(&pod, false) + assert.Equal(t, ok, true) + assert.Equal(t, app.ApplicationID, "app-namespace-01-UID-POD-00001") + utils.SetPluginMode(true) app, ok = getAppMetadata(&pod, false) assert.Equal(t, ok, false) } diff --git a/pkg/cache/context.go b/pkg/cache/context.go index f452c6abc..cfb2596df 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -133,10 +133,6 @@ func (ctx *Context) IsPluginMode() bool { return ctx.pluginMode } -func (ctx *Context) SetPluginMode(pluginMode bool) { - ctx.pluginMode = pluginMode -} - func (ctx *Context) addNode(obj interface{}) { ctx.lock.Lock() defer ctx.lock.Unlock() diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index a97aced19..866da69d7 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -1473,8 +1473,10 @@ func TestAddApplicationsWithTags(t *testing.T) { } func TestPendingPodAllocations(t *testing.T) { + utils.SetPluginMode(true) + defer utils.SetPluginMode(false) + context := initContextForTest() - context.SetPluginMode(true) node1 := v1.Node{ ObjectMeta: apis.ObjectMeta{ diff --git a/pkg/cache/task.go b/pkg/cache/task.go index 90bc493b4..322861257 100644 --- a/pkg/cache/task.go +++ b/pkg/cache/task.go @@ -55,7 +55,6 @@ type Task struct { taskGroupName string placeholder bool terminationType string - pluginMode bool originator bool schedulingState interfaces.TaskSchedulingState sm *fsm.FSM @@ -88,10 +87,6 @@ func NewFromTaskMeta(tid string, app *Application, ctx *Context, metadata interf func createTaskInternal(tid string, app *Application, resource *si.Resource, pod *v1.Pod, placeholder bool, taskGroupName string, ctx *Context, originator bool) *Task { - var pluginMode bool - if ctx != nil { - pluginMode = ctx.IsPluginMode() - } task := &Task{ taskID: tid, alias: fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), @@ -103,7 +98,6 @@ func createTaskInternal(tid string, app *Application, resource *si.Resource, createTime: pod.GetCreationTimestamp().Time, placeholder: placeholder, taskGroupName: taskGroupName, - pluginMode: pluginMode, originator: originator, context: ctx, sm: newTaskState(), @@ -344,7 +338,7 @@ func (task *Task) postTaskAllocated() { defer task.lock.Unlock() // plugin mode means we delegate this work to the default scheduler - if task.pluginMode { + if utils.IsPluginMode() { log.Log(log.ShimCacheTask).Debug("allocating pod", zap.String("podName", task.pod.Name), zap.String("podUID", string(task.pod.UID))) @@ -423,7 +417,7 @@ func (task *Task) beforeTaskAllocated(eventSrc string, allocUUID string, nodeID } func (task *Task) postTaskBound() { - if task.pluginMode { + if utils.IsPluginMode() { // When the pod is actively scheduled by YuniKorn, it can be moved to the default-scheduler's // UnschedulablePods structure. If the pod does not change, the pod will stay in the UnschedulablePods // structure for podMaxInUnschedulablePodsDuration (default 5 minutes). Here we update the pod diff --git a/pkg/common/utils/utils.go b/pkg/common/utils/utils.go index dc627ab85..962c03d1c 100644 --- a/pkg/common/utils/utils.go +++ b/pkg/common/utils/utils.go @@ -43,6 +43,16 @@ import ( const userInfoKey = siCommon.DomainYuniKorn + "user.info" +var pluginMode bool + +func SetPluginMode(value bool) { + pluginMode = value +} + +func IsPluginMode() bool { + return pluginMode +} + func Convert2Pod(obj interface{}) (*v1.Pod, error) { pod, ok := obj.(*v1.Pod) if !ok { @@ -116,35 +126,48 @@ func GenerateApplicationID(namespace string, generateUniqueAppIds bool, podUID s return fmt.Sprintf("%.63s", generatedID) } -// GetApplicationIDFromPod returns the applicationID (if present) from a Pod or an empty string if not present. -// If an applicationID is present, the Pod is managed by YuniKorn. Otherwise, it is managed by an external scheduler. +// GetApplicationIDFromPod returns the Application for a Pod. If a Pod is marked as schedulable by YuniKorn but is +// missing an ApplicationID, one will be generated here (if YuniKorn is running in standard mode) or an empty string +// will be returned (if YuniKorn is running in plugin mode). +// If an Application ID is returned, the Pod is managed by YuniKorn. Otherwise, it is managed by an external scheduler. func GetApplicationIDFromPod(pod *v1.Pod) string { // SchedulerName needs to match if strings.Compare(pod.Spec.SchedulerName, constants.SchedulerName) != 0 { return "" } - // if pod was tagged with ignore-application, return - if value := GetPodAnnotationValue(pod, constants.AnnotationIgnoreApplication); value != "" { - ignore, err := strconv.ParseBool(value) - if err != nil { - log.Log(log.ShimUtils).Warn("Failed to parse annotation "+constants.AnnotationIgnoreApplication, zap.Error(err)) - } else if ignore { - return "" + + // If pod was tagged with ignore-application and plugin mode is active, return + if pluginMode { + if value := GetPodAnnotationValue(pod, constants.AnnotationIgnoreApplication); value != "" { + ignore, err := strconv.ParseBool(value) + if err != nil { + log.Log(log.ShimUtils).Warn("Failed to parse annotation "+constants.AnnotationIgnoreApplication, zap.Error(err)) + } else if ignore { + return "" + } } } - // application ID can be defined in annotations + + // Application ID can be defined in annotation if value := GetPodAnnotationValue(pod, constants.AnnotationApplicationID); value != "" { return value } + // Application ID can be defined in label if value := GetPodLabelValue(pod, constants.LabelApplicationID); value != "" { return value } - // application ID can be defined in labels + // Spark can also define application ID if value := GetPodLabelValue(pod, constants.SparkLabelAppID); value != "" { return value } - // no application ID found, this is not a YuniKorn-managed Pod - return "" + + // If plugin mode, interpret missing Application ID as a non-YuniKorn pod + if pluginMode { + return "" + } + + // Standard deployment mode, so we need a valid Application ID to proceed. Generate one now. + return GenerateApplicationID(pod.Namespace, conf.GetSchedulerConf().GenerateUniqueAppIds, string(pod.UID)) } // compare the existing pod condition with the given one, return true if the pod condition remains not changed. diff --git a/pkg/common/utils/utils_test.go b/pkg/common/utils/utils_test.go index 712d36b15..7f1332ef8 100644 --- a/pkg/common/utils/utils_test.go +++ b/pkg/common/utils/utils_test.go @@ -498,33 +498,52 @@ func TestPodUnderCondition(t *testing.T) { } func TestGetApplicationIDFromPod(t *testing.T) { + defer SetPluginMode(false) + defer func() { conf.GetSchedulerConf().GenerateUniqueAppIds = false }() + appIDInLabel := "labelAppID" appIDInAnnotation := "annotationAppID" appIDInSelector := "selectorAppID" sparkIDInAnnotation := "sparkAnnotationAppID" testCases := []struct { - name string - pod *v1.Pod - expectedAppID string + name string + pod *v1.Pod + expectedAppID string + expectedAppIDPluginMode string + generateUniqueAppIds bool }{ {"AppID defined in label", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.LabelApplicationID: appIDInLabel}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, appIDInLabel}, + }, appIDInLabel, appIDInLabel, false}, + {"No AppID defined", &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "testns", + UID: "podUid", + }, + Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, + }, "yunikorn-testns-autogen", "", false}, + {"No AppID defined but generateUnique", &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "testns", + UID: "podUid", + }, + Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, + }, "testns-podUid", "", true}, {"Non-yunikorn schedulerName", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.LabelApplicationID: appIDInLabel}, }, Spec: v1.PodSpec{SchedulerName: "default"}, - }, ""}, + }, "", "", false}, {"AppID defined in annotation", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{constants.AnnotationApplicationID: appIDInAnnotation}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, appIDInAnnotation}, + }, appIDInAnnotation, appIDInAnnotation, false}, {"AppID defined but ignore-application set", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ @@ -533,7 +552,7 @@ func TestGetApplicationIDFromPod(t *testing.T) { }, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, ""}, + }, appIDInAnnotation, "", false}, {"AppID defined and ignore-application invalid", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ @@ -542,48 +561,53 @@ func TestGetApplicationIDFromPod(t *testing.T) { }, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, appIDInAnnotation}, + }, appIDInAnnotation, appIDInAnnotation, false}, {"AppID defined in label and annotation", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{constants.AnnotationApplicationID: appIDInAnnotation}, Labels: map[string]string{constants.LabelApplicationID: appIDInLabel}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, appIDInAnnotation}, + }, appIDInAnnotation, appIDInAnnotation, false}, {"Spark AppID defined in spark app selector", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.SparkLabelAppID: appIDInSelector}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, appIDInSelector}, + }, appIDInSelector, appIDInSelector, false}, {"Spark AppID defined in spark app selector and annotation", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.SparkLabelAppID: appIDInSelector}, Annotations: map[string]string{constants.AnnotationApplicationID: sparkIDInAnnotation}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, sparkIDInAnnotation}, + }, sparkIDInAnnotation, sparkIDInAnnotation, false}, {"Spark AppID defined in spark app selector and annotation", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.SparkLabelAppID: appIDInSelector, constants.LabelApplicationID: appIDInLabel}, Annotations: map[string]string{constants.AnnotationApplicationID: sparkIDInAnnotation}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, sparkIDInAnnotation}, - {"No AppID defined", &v1.Pod{}, ""}, + }, sparkIDInAnnotation, sparkIDInAnnotation, false}, + {"No AppID defined", &v1.Pod{}, "", "", false}, {"Spark AppID defined in spark app selector and label", &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{constants.SparkLabelAppID: appIDInSelector, constants.LabelApplicationID: appIDInLabel}, }, Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - }, appIDInLabel}, + }, appIDInLabel, appIDInLabel, false}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + conf.GetSchedulerConf().GenerateUniqueAppIds = tc.generateUniqueAppIds + SetPluginMode(false) appID := GetApplicationIDFromPod(tc.pod) - assert.Equal(t, appID, tc.expectedAppID) + assert.Equal(t, appID, tc.expectedAppID, "Wrong appID (standard mode)") + SetPluginMode(true) + appID2 := GetApplicationIDFromPod(tc.pod) + assert.Equal(t, appID2, tc.expectedAppIDPluginMode, "Wrong appID (plugin mode)") }) } } diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go index f156ed165..21fc3b8d9 100644 --- a/pkg/conf/schedulerconf.go +++ b/pkg/conf/schedulerconf.go @@ -52,9 +52,10 @@ const ( EnvNamespace = "NAMESPACE" // prefixes - PrefixService = "service." - PrefixLog = "log." - PrefixKubernetes = "kubernetes." + PrefixService = "service." + PrefixLog = "log." + PrefixKubernetes = "kubernetes." + PrefixAdmissionController = "admissionController." // service CMSvcClusterID = PrefixService + "clusterId" @@ -73,19 +74,24 @@ const ( CMKubeQPS = PrefixKubernetes + "qps" CMKubeBurst = PrefixKubernetes + "burst" + // admissioncontroller + PrefixAMFiltering = PrefixAdmissionController + "filtering." + AMFilteringGenerateUniqueAppIds = PrefixAMFiltering + "generateUniqueAppId" + // defaults - DefaultNamespace = "default" - DefaultClusterID = "mycluster" - DefaultPolicyGroup = "queues" - DefaultSchedulingInterval = time.Second - DefaultVolumeBindTimeout = 10 * time.Second - DefaultEventChannelCapacity = 1024 * 1024 - DefaultDispatchTimeout = 300 * time.Second - DefaultOperatorPlugins = "general" - DefaultDisableGangScheduling = false - DefaultEnableConfigHotRefresh = true - DefaultKubeQPS = 1000 - DefaultKubeBurst = 1000 + DefaultNamespace = "default" + DefaultClusterID = "mycluster" + DefaultPolicyGroup = "queues" + DefaultSchedulingInterval = time.Second + DefaultVolumeBindTimeout = 10 * time.Second + DefaultEventChannelCapacity = 1024 * 1024 + DefaultDispatchTimeout = 300 * time.Second + DefaultOperatorPlugins = "general" + DefaultDisableGangScheduling = false + DefaultEnableConfigHotRefresh = true + DefaultKubeQPS = 1000 + DefaultKubeBurst = 1000 + DefaultAMFilteringGenerateUniqueAppIds = false ) var ( @@ -124,6 +130,7 @@ type SchedulerConf struct { PlaceHolderImage string `json:"placeHolderImage"` InstanceTypeNodeLabelKey string `json:"instanceTypeNodeLabelKey"` Namespace string `json:"namespace"` + GenerateUniqueAppIds bool `json:"generateUniqueAppIds"` sync.RWMutex } @@ -151,6 +158,7 @@ func (conf *SchedulerConf) Clone() *SchedulerConf { PlaceHolderImage: conf.PlaceHolderImage, InstanceTypeNodeLabelKey: conf.InstanceTypeNodeLabelKey, Namespace: conf.Namespace, + GenerateUniqueAppIds: conf.GenerateUniqueAppIds, } } @@ -208,6 +216,7 @@ func handleNonReloadableConfig(old *SchedulerConf, new *SchedulerConf) { checkNonReloadableBool(CMSvcDisableGangScheduling, &old.DisableGangScheduling, &new.DisableGangScheduling) checkNonReloadableString(CMSvcPlaceholderImage, &old.PlaceHolderImage, &new.PlaceHolderImage) checkNonReloadableString(CMSvcNodeInstanceTypeNodeLabelKey, &old.InstanceTypeNodeLabelKey, &new.InstanceTypeNodeLabelKey) + checkNonReloadableBool(AMFilteringGenerateUniqueAppIds, &old.GenerateUniqueAppIds, &new.GenerateUniqueAppIds) } const warningNonReloadable = "ignoring non-reloadable configuration change (restart required to update)" @@ -337,6 +346,7 @@ func CreateDefaultConfig() *SchedulerConf { UserLabelKey: constants.DefaultUserLabel, PlaceHolderImage: constants.PlaceholderContainerImage, InstanceTypeNodeLabelKey: constants.DefaultNodeInstanceTypeNodeLabelKey, + GenerateUniqueAppIds: DefaultAMFilteringGenerateUniqueAppIds, } } @@ -367,6 +377,9 @@ func parseConfig(config map[string]string, prev *SchedulerConf) (*SchedulerConf, parser.intVar(&conf.KubeQPS, CMKubeQPS) parser.intVar(&conf.KubeBurst, CMKubeBurst) + // admission controller + parser.boolVar(&conf.GenerateUniqueAppIds, AMFilteringGenerateUniqueAppIds) + if len(parser.errors) > 0 { return nil, parser.errors } diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go index 9916eaaeb..c7c418709 100644 --- a/pkg/shim/scheduler.go +++ b/pkg/shim/scheduler.go @@ -73,7 +73,7 @@ func NewShimScheduler(scheduler api.SchedulerAPI, configs *conf.SchedulerConf, b func NewShimSchedulerForPlugin(scheduler api.SchedulerAPI, informerFactory informers.SharedInformerFactory, configs *conf.SchedulerConf, bootstrapConfigMaps []*v1.ConfigMap) *KubernetesShim { apiFactory := client.NewAPIFactory(scheduler, informerFactory, configs, false) context := cache.NewContextWithBootstrapConfigMaps(apiFactory, bootstrapConfigMaps) - context.SetPluginMode(true) + utils.SetPluginMode(true) rmCallback := callback.NewAsyncRMCallback(context) appManager := appmgmt.NewAMService(context, apiFactory) return newShimSchedulerInternal(context, apiFactory, appManager, rmCallback)