diff --git a/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go b/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go
index b3115a7a20..bef5c35a05 100644
--- a/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go
+++ b/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go
@@ -10,6 +10,7 @@ import (
 // TaskOverrides interface to expose any overrides that have been set for this task (like resource overrides etc)
 type TaskOverrides interface {
 	GetResources() *v1.ResourceRequirements
+	GetResourceExtensions() *core.ResourceExtensions
 	GetConfig() *v1.ConfigMap
 }
diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go
b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go index d40f1461d7..e82322c1cd 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go @@ -3,7 +3,9 @@ package mocks import ( + flyteidlcore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" mock "github.com/stretchr/testify/mock" + v1 "k8s.io/api/core/v1" ) @@ -46,6 +48,40 @@ func (_m *TaskOverrides) GetConfig() *v1.ConfigMap { return r0 } +type TaskOverrides_GetResourceExtensions struct { + *mock.Call +} + +func (_m TaskOverrides_GetResourceExtensions) Return(_a0 *flyteidlcore.ResourceExtensions) *TaskOverrides_GetResourceExtensions { + return &TaskOverrides_GetResourceExtensions{Call: _m.Call.Return(_a0)} +} + +func (_m *TaskOverrides) OnGetResourceExtensions() *TaskOverrides_GetResourceExtensions { + c_call := _m.On("GetResourceExtensions") + return &TaskOverrides_GetResourceExtensions{Call: c_call} +} + +func (_m *TaskOverrides) OnGetResourceExtensionsMatch(matchers ...interface{}) *TaskOverrides_GetResourceExtensions { + c_call := _m.On("GetResourceExtensions", matchers...) + return &TaskOverrides_GetResourceExtensions{Call: c_call} +} + +// GetResourceExtensions provides a mock function with given fields: +func (_m *TaskOverrides) GetResourceExtensions() *flyteidlcore.ResourceExtensions { + ret := _m.Called() + + var r0 *flyteidlcore.ResourceExtensions + if rf, ok := ret.Get(0).(func() *flyteidlcore.ResourceExtensions); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*flyteidlcore.ResourceExtensions) + } + } + + return r0 +} + type TaskOverrides_GetResources struct { *mock.Call } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go index 67547f8256..abfd7b46ce 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -53,7 +53,9 @@ var ( ImagePullBackoffGracePeriod: config2.Duration{ Duration: time.Minute * 3, }, - GpuResourceName: ResourceNvidiaGPU, + GpuDeviceNodeLabel: "k8s.amazonaws.com/accelerator", + GpuPartitionSizeNodeLabel: "k8s.amazonaws.com/gpu-partition-size", + GpuResourceName: ResourceNvidiaGPU, DefaultPodTemplateResync: config2.Duration{ Duration: 30 * time.Second, }, @@ -140,6 +142,18 @@ type K8sPluginConfig struct { // one, and the corresponding task marked as failed ImagePullBackoffGracePeriod config2.Duration `json:"image-pull-backoff-grace-period" pflag:"-,Time to wait for transient ImagePullBackoff errors to be resolved."` + // The node label that specifies the attached GPU device. + GpuDeviceNodeLabel string `json:"gpu-device-node-label" pflag:"-,The node label that specifies the attached GPU device."` + + // The node label that specifies the attached GPU partition size. + GpuPartitionSizeNodeLabel string `json:"gpu-partition-size-node-label" pflag:"-,The node label that specifies the attached GPU partition size."` + + // Override for node selector requirement added to pods intended for unpartitioned GPU nodes. + GpuUnpartitionedNodeSelectorRequirement *v1.NodeSelectorRequirement `json:"gpu-unpartitioned-node-selector-requirement" pflag:"-,Override for node selector requirement added to pods intended for unpartitioned GPU nodes."` + + // Override for toleration added to pods intended for unpartitioned GPU nodes. 
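+	// When left unset, ApplyGPUNodeSelectors in pod_helper.go falls back to a
+	// toleration keyed on GpuPartitionSizeNodeLabel with the sentinel value "NotSet".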
+	GpuUnpartitionedToleration *v1.Toleration `json:"gpu-unpartitioned-toleration" pflag:"-,Override for toleration added to pods intended for unpartitioned GPU nodes."`
+
 	// The name of the GPU resource to use when the task resource requests GPUs.
 	GpuResourceName v1.ResourceName `json:"gpu-resource-name" pflag:"-,The name of the GPU resource to use when the task resource requests GPUs."`
diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
index 268d33e02c..cbde8ba60b 100644
--- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
+++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
@@ -9,11 +9,12 @@ import (
 
 	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
 
 	pluginserrors "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
 	pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/template"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
+	"github.com/golang/protobuf/proto"
 
 	"github.com/flyteorg/flyte/flytestdlib/logger"
 
@@ -32,6 +41,53 @@ const defaultContainerTemplateName = "default"
 const primaryContainerTemplateName = "primary"
 const PrimaryContainerKey = "primary_container_name"
 
+const GpuPartitionSizeNotSet = "NotSet"
+
+// AddRequiredNodeSelectorRequirements adds the provided v1.NodeSelectorRequirement
+// objects to an existing v1.Affinity object. If there are no existing required
+// node selectors, the new v1.NodeSelectorRequirement will be added as-is.
+// However, if there are existing required node selectors, we iterate over all existing
+// node selector terms and append the node selector requirement. Note that multiple node
+// selector terms are OR'd, and match expressions within a single node selector term
+// are AND'd during scheduling.
+// See: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity
+func AddRequiredNodeSelectorRequirements(base *v1.Affinity, new ...v1.NodeSelectorRequirement) {
+	if base.NodeAffinity == nil {
+		base.NodeAffinity = &v1.NodeAffinity{}
+	}
+	if base.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
+		base.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{}
+	}
+	if len(base.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) > 0 {
+		nodeSelectorTerms := base.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
+		for i := range nodeSelectorTerms {
+			nst := &nodeSelectorTerms[i]
+			nst.MatchExpressions = append(nst.MatchExpressions, new...)
+		}
+	} else {
+		base.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{v1.NodeSelectorTerm{MatchExpressions: new}}
+	}
+}
+
+// AddPreferredNodeSelectorRequirements appends the provided v1.NodeSelectorRequirement
+// objects to an existing v1.Affinity object's list of preferred scheduling terms.
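+// Each call appends a single new preferred scheduling term with the given weight;
+// existing terms are left untouched. A minimal sketch of a caller (hypothetical
+// label and values, and assuming the affinity object is already non-nil):
+//
+//	AddPreferredNodeSelectorRequirements(podSpec.Affinity, 10, v1.NodeSelectorRequirement{
+//		Key:      "k8s.amazonaws.com/accelerator", // illustrative label only
+//		Operator: v1.NodeSelectorOpIn,
+//		Values:   []string{"nvidia-tesla-t4"},
+//	})
+//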
+// See: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity-weight
+// for how weights are used during scheduling.
+func AddPreferredNodeSelectorRequirements(base *v1.Affinity, weight int32, new ...v1.NodeSelectorRequirement) {
+	if base.NodeAffinity == nil {
+		base.NodeAffinity = &v1.NodeAffinity{}
+	}
+	base.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(
+		base.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
+		v1.PreferredSchedulingTerm{
+			Weight: weight,
+			Preference: v1.NodeSelectorTerm{
+				MatchExpressions: new,
+			},
+		},
+	)
+}
+
 // ApplyInterruptibleNodeSelectorRequirement configures the node selector requirement of the node-affinity using the configuration specified.
 func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.Affinity) {
 	// Determine node selector terms to add to node affinity
@@ -48,6 +104,7 @@ func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.
 		nodeSelectorRequirement = *config.GetK8sPluginConfig().NonInterruptibleNodeSelectorRequirement
 	}
 
-	if affinity.NodeAffinity == nil {
-		affinity.NodeAffinity = &v1.NodeAffinity{}
-	}
@@ -64,6 +121,9 @@ func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.
-	affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{{MatchExpressions: []v1.NodeSelectorRequirement{nodeSelectorRequirement}}}
-	}
+	AddRequiredNodeSelectorRequirements(affinity, nodeSelectorRequirement)
 }
 
 // ApplyInterruptibleNodeAffinity configures the node-affinity for the pod using the configuration specified.
@@ -75,6 +135,121 @@ func ApplyInterruptibleNodeAffinity(interruptible bool, podSpec *v1.PodSpec) {
 	ApplyInterruptibleNodeSelectorRequirement(interruptible, podSpec.Affinity)
 }
 
+// applyResourceExtensionsOverrides merges overrides into a copy of the base
+// *core.ResourceExtensions. A blanket nested proto merge is not always the
+// intended behavior, so each field is handled separately here.
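+//
+// For example (illustrative values), a GPU accelerator present in overrides
+// replaces the base accelerator wholesale instead of being merged field by field:
+//
+//	base := &core.ResourceExtensions{GpuAccelerator: &core.GPUAccelerator{Device: "nvidia-tesla-t4"}}
+//	override := &core.ResourceExtensions{GpuAccelerator: &core.GPUAccelerator{Device: "nvidia-tesla-a100"}}
+//	merged := applyResourceExtensionsOverrides(base, override)
+//	// merged.GetGpuAccelerator().GetDevice() == "nvidia-tesla-a100"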
+func applyResourceExtensionsOverrides(base, overrides *core.ResourceExtensions) *core.ResourceExtensions {
+	// Handle case where base might be nil
+	var new *core.ResourceExtensions
+	if base == nil {
+		new = &core.ResourceExtensions{}
+	} else {
+		new = proto.Clone(base).(*core.ResourceExtensions)
+	}
+
+	// No overrides found
+	if overrides == nil {
+		return new
+	}
+
+	// GPU Accelerator
+	if overrides.GetGpuAccelerator() != nil {
+		new.GpuAccelerator = overrides.GetGpuAccelerator()
+	}
+
+	return new
+}
+
+// ApplyGPUNodeSelectors adds the node selector requirements and tolerations
+// needed to schedule the pod on nodes that carry the requested GPU accelerator
+// (and, if specified, GPU partition size). It is a no-op if no container in the
+// pod spec requests the configured GPU resource.
+func ApplyGPUNodeSelectors(podSpec *v1.PodSpec, gpuAccelerator *core.GPUAccelerator) {
+	// Short circuit if pod spec does not contain any containers that use GPUs
+	requiresGPUs := false
+	for _, cnt := range podSpec.Containers {
+		if _, ok := cnt.Resources.Limits[config.GetK8sPluginConfig().GpuResourceName]; ok {
+			requiresGPUs = true
+			break
+		}
+	}
+	if !requiresGPUs {
+		return
+	}
+
+	if podSpec.Affinity == nil {
+		podSpec.Affinity = &v1.Affinity{}
+	}
+
+	// Apply changes for GPU device
+	device := gpuAccelerator.GetDevice()
+	if len(device) > 0 {
+		// Add node selector requirement for GPU device
+		deviceNsr := v1.NodeSelectorRequirement{
+			Key:      config.GetK8sPluginConfig().GpuDeviceNodeLabel,
+			Operator: v1.NodeSelectorOpIn,
+			Values:   []string{device},
+		}
+		AddRequiredNodeSelectorRequirements(podSpec.Affinity, deviceNsr)
+		// Add toleration for GPU device
+		deviceTol := v1.Toleration{
+			Key:      config.GetK8sPluginConfig().GpuDeviceNodeLabel,
+			Value:    device,
+			Operator: v1.TolerationOpEqual,
+			Effect:   v1.TaintEffectNoSchedule,
+		}
+		podSpec.Tolerations = append(podSpec.Tolerations, deviceTol)
+	}
+
+	// Short circuit if a partition size preference is not specified
+	partitionSizeValue := gpuAccelerator.GetPartitionSizeValue()
+	if partitionSizeValue == nil {
+		return
+	}
+
+	// Apply changes for GPU partition size, if applicable
+	var partitionSizeNsr *v1.NodeSelectorRequirement
+	var partitionSizeTol *v1.Toleration
+	switch partitionSizeValue.(type) {
+	case *core.GPUAccelerator_Unpartitioned:
+		if !gpuAccelerator.GetUnpartitioned() {
+			break
+		}
+		if config.GetK8sPluginConfig().GpuUnpartitionedNodeSelectorRequirement != nil {
+			partitionSizeNsr = config.GetK8sPluginConfig().GpuUnpartitionedNodeSelectorRequirement
+		} else {
+			partitionSizeNsr = &v1.NodeSelectorRequirement{
+				Key:      config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+				Operator: v1.NodeSelectorOpDoesNotExist,
+			}
+		}
+		if config.GetK8sPluginConfig().GpuUnpartitionedToleration != nil {
+			partitionSizeTol = config.GetK8sPluginConfig().GpuUnpartitionedToleration
+		} else {
+			partitionSizeTol = &v1.Toleration{
+				Key:      config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+				Value:    GpuPartitionSizeNotSet,
+				Operator: v1.TolerationOpEqual,
+				Effect:   v1.TaintEffectNoSchedule,
+			}
+		}
+	case *core.GPUAccelerator_PartitionSize:
+		partitionSizeNsr = &v1.NodeSelectorRequirement{
+			Key:      config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+			Operator: v1.NodeSelectorOpIn,
+			Values:   []string{gpuAccelerator.GetPartitionSize()},
+		}
+		partitionSizeTol = &v1.Toleration{
+			Key:      config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+			Value:    gpuAccelerator.GetPartitionSize(),
+			Operator: v1.TolerationOpEqual,
+			Effect:   v1.TaintEffectNoSchedule,
+		}
+	}
+	if partitionSizeNsr != nil {
+		AddRequiredNodeSelectorRequirements(podSpec.Affinity, *partitionSizeNsr)
+	}
+	if partitionSizeTol != nil {
+		podSpec.Tolerations = append(podSpec.Tolerations, *partitionSizeTol)
+	}
+}
+
 // UpdatePod updates the base pod spec used to execute
tasks. This is configured with plugins and task metadata-specific options
 func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata,
 	resourceRequirements []v1.ResourceRequirements, podSpec *v1.PodSpec) {
@@ -91,6 +266,7 @@ func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata,
 		podSpec.SchedulerName = config.GetK8sPluginConfig().SchedulerName
 	}
 	podSpec.NodeSelector = utils.UnionMaps(config.GetK8sPluginConfig().DefaultNodeSelector, podSpec.NodeSelector)
+
 	if taskExecutionMetadata.IsInterruptible() {
 		podSpec.NodeSelector = utils.UnionMaps(podSpec.NodeSelector, config.GetK8sPluginConfig().InterruptibleNodeSelector)
 	}
@@ -250,6 +426,22 @@ func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecut
 		return nil, nil, err
 	}
 
+	// Merge any task-level resource extension overrides into the base resource
+	// extensions from the container spec
+	var resourceExtensions *core.ResourceExtensions
+	if taskTemplate.GetContainer() != nil && taskTemplate.GetContainer().GetResources() != nil {
+		resourceExtensions = taskTemplate.GetContainer().GetResources().GetExtensions()
+	}
+	resourceExtensions = applyResourceExtensionsOverrides(
+		resourceExtensions,
+		tCtx.TaskExecutionMetadata().GetOverrides().GetResourceExtensions(),
+	)
+
+	// GPU accelerator
+	if resourceExtensions.GetGpuAccelerator() != nil {
+		ApplyGPUNodeSelectors(podSpec, resourceExtensions.GetGpuAccelerator())
+	}
+
 	return podSpec, objectMeta, nil
 }
diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
index 50c9d0eb49..237bb23682 100644
--- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
+++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
@@ -29,7 +29,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements) pluginsCore.TaskExecutionMetadata {
+func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements, resourceExtensions *core.ResourceExtensions) pluginsCore.TaskExecutionMetadata {
 	taskExecutionMetadata := &pluginsCoreMock.TaskExecutionMetadata{}
 	taskExecutionMetadata.On("GetNamespace").Return("test-namespace")
 	taskExecutionMetadata.On("GetAnnotations").Return(map[string]string{"annotation-1": "val1"})
@@ -54,6 +54,7 @@ func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements) pluginsCore.
to := &pluginsCoreMock.TaskOverrides{} to.On("GetResources").Return(resources) + to.On("GetResourceExtensions").Return(resourceExtensions) taskExecutionMetadata.On("GetOverrides").Return(to) taskExecutionMetadata.On("IsInterruptible").Return(true) taskExecutionMetadata.OnGetPlatformResources().Return(&v1.ResourceRequirements{}) @@ -84,7 +85,7 @@ func dummyInputReader() io.InputReader { return inputReader } -func dummyExecContext(r *v1.ResourceRequirements) pluginsCore.TaskExecutionContext { +func dummyExecContext(r *v1.ResourceRequirements, rm *core.ResourceExtensions) pluginsCore.TaskExecutionContext { ow := &pluginsIOMock.OutputWriter{} ow.OnGetOutputPrefixPath().Return("") ow.OnGetRawOutputPrefix().Return("") @@ -92,7 +93,7 @@ func dummyExecContext(r *v1.ResourceRequirements) pluginsCore.TaskExecutionConte ow.OnGetPreviousCheckpointsPrefix().Return("/prev") tCtx := &pluginsCoreMock.TaskExecutionContext{} - tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(r)) + tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(r, rm)) tCtx.OnInputReader().Return(dummyInputReader()) tCtx.OnTaskReader().Return(dummyTaskReader()) tCtx.OnOutputWriter().Return(ow) @@ -112,6 +113,226 @@ func TestPodSetup(t *testing.T) { t.Run("ToK8sPodInterruptible", toK8sPodInterruptible) } +func TestAddRequiredNodeSelectorRequirements(t *testing.T) { + t.Run("with empty node affinity", func(t *testing.T) { + affinity := v1.Affinity{} + nst := v1.NodeSelectorRequirement{ + Key: "new", + Operator: v1.NodeSelectorOpIn, + Values: []string{"new"}, + } + AddRequiredNodeSelectorRequirements(&affinity, nst) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "new", + Operator: v1.NodeSelectorOpIn, + Values: []string{"new"}, + }, + }, + }, + }, + affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + }) + + t.Run("with existing node affinity", func(t *testing.T) { + affinity := v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "required", + Operator: v1.NodeSelectorOpIn, + Values: []string{"required"}, + }, + }, + }, + }, + }, + PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + v1.PreferredSchedulingTerm{ + Weight: 1, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "preferred", + Operator: v1.NodeSelectorOpIn, + Values: []string{"preferred"}, + }, + }, + }, + }, + }, + }, + } + nst := v1.NodeSelectorRequirement{ + Key: "new", + Operator: v1.NodeSelectorOpIn, + Values: []string{"new"}, + } + AddRequiredNodeSelectorRequirements(&affinity, nst) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "required", + Operator: v1.NodeSelectorOpIn, + Values: []string{"required"}, + }, + v1.NodeSelectorRequirement{ + Key: "new", + Operator: v1.NodeSelectorOpIn, + Values: []string{"new"}, + }, + }, + }, + }, + affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + assert.EqualValues( + t, + []v1.PreferredSchedulingTerm{ + v1.PreferredSchedulingTerm{ + Weight: 1, + Preference: v1.NodeSelectorTerm{ + 
MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "preferred", + Operator: v1.NodeSelectorOpIn, + Values: []string{"preferred"}, + }, + }, + }, + }, + }, + affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution, + ) + }) +} + +func TestAddPreferredNodeSelectorRequirements(t *testing.T) { + t.Run("with empty node affinity", func(t *testing.T) { + affinity := v1.Affinity{} + nst := v1.NodeSelectorRequirement{ + Key: "new", + Operator: v1.NodeSelectorOpIn, + Values: []string{"new"}, + } + AddPreferredNodeSelectorRequirements(&affinity, 10, nst) + assert.EqualValues( + t, + []v1.PreferredSchedulingTerm{ + v1.PreferredSchedulingTerm{ + Weight: 10, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "new", + Operator: v1.NodeSelectorOpIn, + Values: []string{"new"}, + }, + }, + }, + }, + }, + affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution, + ) + }) + + t.Run("with existing node affinity", func(t *testing.T) { + affinity := v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "required", + Operator: v1.NodeSelectorOpIn, + Values: []string{"required"}, + }, + }, + }, + }, + }, + PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + v1.PreferredSchedulingTerm{ + Weight: 1, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "preferred", + Operator: v1.NodeSelectorOpIn, + Values: []string{"preferred"}, + }, + }, + }, + }, + }, + }, + } + nst := v1.NodeSelectorRequirement{ + Key: "new", + Operator: v1.NodeSelectorOpIn, + Values: []string{"new"}, + } + AddPreferredNodeSelectorRequirements(&affinity, 10, nst) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "required", + Operator: v1.NodeSelectorOpIn, + Values: []string{"required"}, + }, + }, + }, + }, + affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + assert.EqualValues( + t, + []v1.PreferredSchedulingTerm{ + v1.PreferredSchedulingTerm{ + Weight: 1, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "preferred", + Operator: v1.NodeSelectorOpIn, + Values: []string{"preferred"}, + }, + }, + }, + }, + v1.PreferredSchedulingTerm{ + Weight: 10, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "new", + Operator: v1.NodeSelectorOpIn, + Values: []string{"new"}, + }, + }, + }, + }, + }, + affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution, + ) + }) +} + func TestApplyInterruptibleNodeAffinity(t *testing.T) { t.Run("WithInterruptibleNodeSelectorRequirement", func(t *testing.T) { podSpec := v1.PodSpec{} @@ -196,6 +417,288 @@ func TestApplyInterruptibleNodeAffinity(t *testing.T) { }) } +func TestApplyResourceExtensionsOverrides(t *testing.T) { + t4 := &core.ResourceExtensions{ + GpuAccelerator: &core.GPUAccelerator{ + Device: "nvidia-tesla-t4", + }, + } + partitionedA100 := &core.ResourceExtensions{ + GpuAccelerator: &core.GPUAccelerator{ + Device: "nvidia-tesla-a100", + PartitionSizeValue: 
&core.GPUAccelerator_PartitionSize{ + PartitionSize: "1g.5gb", + }, + }, + } + unpartitionedA100 := &core.ResourceExtensions{ + GpuAccelerator: &core.GPUAccelerator{ + Device: "nvidia-tesla-a100", + PartitionSizeValue: &core.GPUAccelerator_Unpartitioned{ + Unpartitioned: true, + }, + }, + } + + t.Run("base and overrides are nil", func(t *testing.T) { + final := applyResourceExtensionsOverrides(nil, nil) + assert.NotNil(t, final) + }) + + t.Run("base is nil", func(t *testing.T) { + final := applyResourceExtensionsOverrides(nil, t4) + assert.EqualValues( + t, + t4.GetGpuAccelerator(), + final.GetGpuAccelerator(), + ) + }) + + t.Run("overrides is nil", func(t *testing.T) { + final := applyResourceExtensionsOverrides(t4, nil) + assert.EqualValues( + t, + t4.GetGpuAccelerator(), + final.GetGpuAccelerator(), + ) + }) + + t.Run("merging", func(t *testing.T) { + final := applyResourceExtensionsOverrides(partitionedA100, unpartitionedA100) + assert.EqualValues( + t, + unpartitionedA100.GetGpuAccelerator(), + final.GetGpuAccelerator(), + ) + }) +} + +func TestApplyGPUNodeSelectors(t *testing.T) { + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ + GpuResourceName: "nvidia.com/gpu", + GpuDeviceNodeLabel: "gpu-device", + GpuPartitionSizeNodeLabel: "gpu-partition-size", + })) + + basePodSpec := &v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("1"), + }, + }, + }, + }, + } + + t.Run("without gpu resource", func(t *testing.T) { + podSpec := &v1.PodSpec{} + ApplyGPUNodeSelectors( + podSpec, + &core.GPUAccelerator{Device: "nvidia-tesla-a100"}, + ) + assert.Nil(t, podSpec.Affinity) + }) + + t.Run("with gpu device spec only", func(t *testing.T) { + podSpec := basePodSpec.DeepCopy() + ApplyGPUNodeSelectors( + podSpec, + &core.GPUAccelerator{Device: "nvidia-tesla-a100"}, + ) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "gpu-device", + Operator: v1.NodeSelectorOpIn, + Values: []string{"nvidia-tesla-a100"}, + }, + }, + }, + }, + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + assert.EqualValues( + t, + []v1.Toleration{ + { + Key: "gpu-device", + Value: "nvidia-tesla-a100", + Operator: v1.TolerationOpEqual, + Effect: v1.TaintEffectNoSchedule, + }, + }, + podSpec.Tolerations, + ) + }) + + t.Run("with gpu device and partition size spec", func(t *testing.T) { + podSpec := basePodSpec.DeepCopy() + ApplyGPUNodeSelectors( + podSpec, + &core.GPUAccelerator{ + Device: "nvidia-tesla-a100", + PartitionSizeValue: &core.GPUAccelerator_PartitionSize{ + PartitionSize: "1g.5gb", + }, + }, + ) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "gpu-device", + Operator: v1.NodeSelectorOpIn, + Values: []string{"nvidia-tesla-a100"}, + }, + v1.NodeSelectorRequirement{ + Key: "gpu-partition-size", + Operator: v1.NodeSelectorOpIn, + Values: []string{"1g.5gb"}, + }, + }, + }, + }, + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + assert.EqualValues( + t, + []v1.Toleration{ + { + Key: "gpu-device", + Value: "nvidia-tesla-a100", + Operator: v1.TolerationOpEqual, + Effect: v1.TaintEffectNoSchedule, + }, + { + Key: "gpu-partition-size", + Value: "1g.5gb", + Operator: v1.TolerationOpEqual, 
+ Effect: v1.TaintEffectNoSchedule, + }, + }, + podSpec.Tolerations, + ) + }) + + t.Run("with unpartitioned gpu device spec", func(t *testing.T) { + podSpec := basePodSpec.DeepCopy() + ApplyGPUNodeSelectors( + podSpec, + &core.GPUAccelerator{ + Device: "nvidia-tesla-a100", + PartitionSizeValue: &core.GPUAccelerator_Unpartitioned{ + Unpartitioned: true, + }, + }, + ) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "gpu-device", + Operator: v1.NodeSelectorOpIn, + Values: []string{"nvidia-tesla-a100"}, + }, + v1.NodeSelectorRequirement{ + Key: "gpu-partition-size", + Operator: v1.NodeSelectorOpDoesNotExist, + }, + }, + }, + }, + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + assert.EqualValues( + t, + []v1.Toleration{ + { + Key: "gpu-device", + Value: "nvidia-tesla-a100", + Operator: v1.TolerationOpEqual, + Effect: v1.TaintEffectNoSchedule, + }, + { + Key: "gpu-partition-size", + Value: GpuPartitionSizeNotSet, + Operator: v1.TolerationOpEqual, + Effect: v1.TaintEffectNoSchedule, + }, + }, + podSpec.Tolerations, + ) + }) + + t.Run("with unpartitioned gpu device spec with custom node selector and toleration", func(t *testing.T) { + gpuUnpartitionedNodeSelectorRequirement := v1.NodeSelectorRequirement{ + Key: "gpu-unpartitioned", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + } + gpuUnpartitionedToleration := v1.Toleration{ + Key: "gpu-unpartitioned", + Value: "true", + Operator: v1.TolerationOpEqual, + Effect: v1.TaintEffectNoSchedule, + } + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ + GpuResourceName: "nvidia.com/gpu", + GpuDeviceNodeLabel: "gpu-device", + GpuPartitionSizeNodeLabel: "gpu-partition-size", + GpuUnpartitionedNodeSelectorRequirement: &gpuUnpartitionedNodeSelectorRequirement, + GpuUnpartitionedToleration: &gpuUnpartitionedToleration, + })) + + podSpec := basePodSpec.DeepCopy() + ApplyGPUNodeSelectors( + podSpec, + &core.GPUAccelerator{ + Device: "nvidia-tesla-a100", + PartitionSizeValue: &core.GPUAccelerator_Unpartitioned{ + Unpartitioned: true, + }, + }, + ) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: "gpu-device", + Operator: v1.NodeSelectorOpIn, + Values: []string{"nvidia-tesla-a100"}, + }, + gpuUnpartitionedNodeSelectorRequirement, + }, + }, + }, + podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + assert.EqualValues( + t, + []v1.Toleration{ + { + Key: "gpu-device", + Value: "nvidia-tesla-a100", + Operator: v1.TolerationOpEqual, + Effect: v1.TaintEffectNoSchedule, + }, + gpuUnpartitionedToleration, + }, + podSpec.Tolerations, + ) + }) +} + func updatePod(t *testing.T) { taskExecutionMetadata := dummyTaskExecutionMetadata(&v1.ResourceRequirements{ Limits: v1.ResourceList{ @@ -206,7 +709,7 @@ func updatePod(t *testing.T) { v1.ResourceCPU: resource.MustParse("1024m"), v1.ResourceStorage: resource.MustParse("100M"), }, - }) + }, &core.ResourceExtensions{}) pod := &v1.Pod{ Spec: v1.PodSpec{ @@ -259,7 +762,7 @@ func updatePod(t *testing.T) { } func TestUpdatePodWithDefaultAffinityAndInterruptibleNodeSelectorRequirement(t *testing.T) { - taskExecutionMetadata := dummyTaskExecutionMetadata(&v1.ResourceRequirements{}) + taskExecutionMetadata := dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, 
&core.ResourceExtensions{}) assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ DefaultAffinity: &v1.Affinity{ NodeAffinity: &v1.NodeAffinity{ @@ -323,7 +826,7 @@ func toK8sPodInterruptible(t *testing.T) { v1.ResourceCPU: resource.MustParse("1024m"), v1.ResourceStorage: resource.MustParse("100M"), }, - }) + }, &core.ResourceExtensions{}) p, _, _, err := ToK8sPodSpec(ctx, x) assert.NoError(t, err) @@ -390,7 +893,7 @@ func TestToK8sPod(t *testing.T) { v1.ResourceCPU: resource.MustParse("1024m"), v1.ResourceStorage: resource.MustParse("100M"), }, - }) + }, &core.ResourceExtensions{}) p, _, _, err := ToK8sPodSpec(ctx, x) assert.NoError(t, err) @@ -407,7 +910,7 @@ func TestToK8sPod(t *testing.T) { v1.ResourceCPU: resource.MustParse("1024m"), v1.ResourceStorage: resource.MustParse("100M"), }, - }) + }, &core.ResourceExtensions{}) p, _, _, err := ToK8sPodSpec(ctx, x) assert.NoError(t, err) @@ -425,7 +928,7 @@ func TestToK8sPod(t *testing.T) { v1.ResourceCPU: resource.MustParse("1024m"), v1.ResourceStorage: resource.MustParse("100M"), }, - }) + }, &core.ResourceExtensions{}) assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ DefaultNodeSelector: map[string]string{ @@ -452,7 +955,7 @@ func TestToK8sPod(t *testing.T) { }, })) - x := dummyExecContext(&v1.ResourceRequirements{}) + x := dummyExecContext(&v1.ResourceRequirements{}, &core.ResourceExtensions{}) p, _, _, err := ToK8sPodSpec(ctx, x) assert.NoError(t, err) assert.NotNil(t, p.SecurityContext) @@ -464,7 +967,7 @@ func TestToK8sPod(t *testing.T) { assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ EnableHostNetworkingPod: &enabled, })) - x := dummyExecContext(&v1.ResourceRequirements{}) + x := dummyExecContext(&v1.ResourceRequirements{}, &core.ResourceExtensions{}) p, _, _, err := ToK8sPodSpec(ctx, x) assert.NoError(t, err) assert.True(t, p.HostNetwork) @@ -475,7 +978,7 @@ func TestToK8sPod(t *testing.T) { assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ EnableHostNetworkingPod: &enabled, })) - x := dummyExecContext(&v1.ResourceRequirements{}) + x := dummyExecContext(&v1.ResourceRequirements{}, &core.ResourceExtensions{}) p, _, _, err := ToK8sPodSpec(ctx, x) assert.NoError(t, err) assert.False(t, p.HostNetwork) @@ -483,7 +986,7 @@ func TestToK8sPod(t *testing.T) { t.Run("skipSettingHostNetwork", func(t *testing.T) { assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{})) - x := dummyExecContext(&v1.ResourceRequirements{}) + x := dummyExecContext(&v1.ResourceRequirements{}, &core.ResourceExtensions{}) p, _, _, err := ToK8sPodSpec(ctx, x) assert.NoError(t, err) assert.False(t, p.HostNetwork) @@ -517,7 +1020,7 @@ func TestToK8sPod(t *testing.T) { }, })) - x := dummyExecContext(&v1.ResourceRequirements{}) + x := dummyExecContext(&v1.ResourceRequirements{}, &core.ResourceExtensions{}) p, _, _, err := ToK8sPodSpec(ctx, x) assert.NoError(t, err) assert.NotNil(t, p.DNSConfig) @@ -538,7 +1041,7 @@ func TestToK8sPod(t *testing.T) { "foo": "bar", }, })) - x := dummyExecContext(&v1.ResourceRequirements{}) + x := dummyExecContext(&v1.ResourceRequirements{}, &core.ResourceExtensions{}) p, _, _, err := ToK8sPodSpec(ctx, x) assert.NoError(t, err) for _, c := range p.Containers { @@ -553,6 +1056,139 @@ func TestToK8sPod(t *testing.T) { }) } +func TestToK8sPodResourceExtensions(t *testing.T) { + ctx := context.TODO() + + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ + GpuDeviceNodeLabel: "gpu-node-label", + GpuPartitionSizeNodeLabel: 
"gpu-partition-size", + GpuResourceName: ResourceNvidiaGPU, + })) + + task := &core.TaskTemplate{ + Type: "test", + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Resources: &core.Resources{ + Limits: []*core.Resources_ResourceEntry{ + &core.Resources_ResourceEntry{ + Name: core.Resources_GPU, + Value: "1", + }, + }, + Extensions: &core.ResourceExtensions{ + GpuAccelerator: &core.GPUAccelerator{ + Device: "nvidia-tesla-t4", + }, + }, + }, + }, + }, + } + + taskReader := &pluginsCoreMock.TaskReader{} + taskReader.On("Read", mock.Anything).Return(task, nil) + + ow := &pluginsIOMock.OutputWriter{} + ow.OnGetOutputPrefixPath().Return("") + ow.OnGetRawOutputPrefix().Return("") + ow.OnGetCheckpointPrefix().Return("") + ow.OnGetPreviousCheckpointsPrefix().Return("") + + execContextWithResourceExtensions := func(rm *core.ResourceExtensions) pluginsCore.TaskExecutionContext { + tCtx := &pluginsCoreMock.TaskExecutionContext{} + tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, rm)) + tCtx.OnInputReader().Return(dummyInputReader()) + tCtx.OnTaskReader().Return(taskReader) + tCtx.OnOutputWriter().Return(ow) + return tCtx + } + + t.Run("no override", func(t *testing.T) { + tCtx := execContextWithResourceExtensions(&core.ResourceExtensions{}) + p, _, _, err := ToK8sPodSpec(ctx, tCtx) + assert.NoError(t, err) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: config.GetK8sPluginConfig().GpuDeviceNodeLabel, + Operator: v1.NodeSelectorOpIn, + Values: []string{"nvidia-tesla-t4"}, + }, + }, + }, + }, + p.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + assert.EqualValues( + t, + []v1.Toleration{ + { + Key: config.GetK8sPluginConfig().GpuDeviceNodeLabel, + Value: "nvidia-tesla-t4", + Operator: v1.TolerationOpEqual, + Effect: v1.TaintEffectNoSchedule, + }, + }, + p.Tolerations, + ) + }) + + t.Run("with override", func(t *testing.T) { + tCtx := execContextWithResourceExtensions(&core.ResourceExtensions{ + GpuAccelerator: &core.GPUAccelerator{ + Device: "nvidia-tesla-a100", + PartitionSizeValue: &core.GPUAccelerator_PartitionSize{ + PartitionSize: "1g.5gb", + }, + }, + }) + p, _, _, err := ToK8sPodSpec(ctx, tCtx) + assert.NoError(t, err) + assert.EqualValues( + t, + []v1.NodeSelectorTerm{ + v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + v1.NodeSelectorRequirement{ + Key: config.GetK8sPluginConfig().GpuDeviceNodeLabel, + Operator: v1.NodeSelectorOpIn, + Values: []string{"nvidia-tesla-a100"}, + }, + v1.NodeSelectorRequirement{ + Key: config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel, + Operator: v1.NodeSelectorOpIn, + Values: []string{"1g.5gb"}, + }, + }, + }, + }, + p.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + ) + assert.EqualValues( + t, + []v1.Toleration{ + { + Key: config.GetK8sPluginConfig().GpuDeviceNodeLabel, + Value: "nvidia-tesla-a100", + Operator: v1.TolerationOpEqual, + Effect: v1.TaintEffectNoSchedule, + }, + { + Key: config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel, + Value: "1g.5gb", + Operator: v1.TolerationOpEqual, + Effect: v1.TaintEffectNoSchedule, + }, + }, + p.Tolerations, + ) + }) +} + func TestDemystifyPending(t *testing.T) { assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ CreateContainerErrorGracePeriod: config1.Duration{ @@ -1089,7 +1725,7 @@ func TestGetPodTemplate(t 
*testing.T) { taskReader.On("Read", mock.Anything).Return(task, nil) tCtx := &pluginsCoreMock.TaskExecutionContext{} - tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{})) + tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, &core.ResourceExtensions{})) tCtx.OnTaskReader().Return(taskReader) // initialize PodTemplateStore @@ -1115,7 +1751,7 @@ func TestGetPodTemplate(t *testing.T) { taskReader.On("Read", mock.Anything).Return(task, nil) tCtx := &pluginsCoreMock.TaskExecutionContext{} - tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{})) + tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, &core.ResourceExtensions{})) tCtx.OnTaskReader().Return(taskReader) // initialize PodTemplateStore @@ -1142,7 +1778,7 @@ func TestGetPodTemplate(t *testing.T) { taskReader.On("Read", mock.Anything).Return(task, nil) tCtx := &pluginsCoreMock.TaskExecutionContext{} - tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{})) + tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, &core.ResourceExtensions{})) tCtx.OnTaskReader().Return(taskReader) // initialize PodTemplateStore @@ -1170,7 +1806,7 @@ func TestGetPodTemplate(t *testing.T) { taskReader.On("Read", mock.Anything).Return(task, nil) tCtx := &pluginsCoreMock.TaskExecutionContext{} - tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{})) + tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, &core.ResourceExtensions{})) tCtx.OnTaskReader().Return(taskReader) // initialize PodTemplateStore @@ -1212,7 +1848,7 @@ func TestMergeWithBasePodTemplate(t *testing.T) { taskReader.On("Read", mock.Anything).Return(task, nil) tCtx := &pluginsCoreMock.TaskExecutionContext{} - tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{})) + tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, &core.ResourceExtensions{})) tCtx.OnTaskReader().Return(taskReader) resultPodSpec, resultObjectMeta, err := MergeWithBasePodTemplate(context.TODO(), tCtx, &podSpec, &objectMeta, "foo") @@ -1266,7 +1902,7 @@ func TestMergeWithBasePodTemplate(t *testing.T) { taskReader.On("Read", mock.Anything).Return(task, nil) tCtx := &pluginsCoreMock.TaskExecutionContext{} - tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{})) + tCtx.OnTaskExecutionMetadata().Return(dummyTaskExecutionMetadata(&v1.ResourceRequirements{}, &core.ResourceExtensions{})) tCtx.OnTaskReader().Return(taskReader) resultPodSpec, resultObjectMeta, err := MergeWithBasePodTemplate(context.TODO(), tCtx, &podSpec, &objectMeta, "foo") diff --git a/flyteplugins/go/tasks/plugins/array/k8s/management_test.go b/flyteplugins/go/tasks/plugins/array/k8s/management_test.go index 0a69186c3c..1ca0281f3e 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/management_test.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/management_test.go @@ -97,6 +97,7 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta v1.ResourceCPU: resource.MustParse("10"), }, }) + overrides.OnGetResourceExtensions().Return(&core2.ResourceExtensions{}) tMeta := &mocks.TaskExecutionMetadata{} tMeta.OnGetTaskExecutionID().Return(tID) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go 
b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go
index 4c788b6bc1..9713ffea46 100644
--- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go
+++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins"
 	kfplugins "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins/kubeflow"
 
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
 	pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
+	flytek8sConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
@@ -17,6 +18,16 @@ import (
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/kfoperators/common"
 
 	mpiOp "github.com/kubeflow/common/pkg/apis/common/v1"
 	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
@@ -55,12 +66,14 @@ var (
 
 	resourceRequirements = &corev1.ResourceRequirements{
 		Limits: corev1.ResourceList{
-			corev1.ResourceCPU:    resource.MustParse("1000m"),
-			corev1.ResourceMemory: resource.MustParse("1Gi"),
+			corev1.ResourceCPU:               resource.MustParse("1000m"),
+			corev1.ResourceMemory:            resource.MustParse("1Gi"),
+			flytek8sConfig.ResourceNvidiaGPU: resource.MustParse("1"),
 		},
 		Requests: corev1.ResourceList{
-			corev1.ResourceCPU:    resource.MustParse("100m"),
-			corev1.ResourceMemory: resource.MustParse("512Mi"),
+			corev1.ResourceCPU:               resource.MustParse("100m"),
+			corev1.ResourceMemory:            resource.MustParse("512Mi"),
+			flytek8sConfig.ResourceNvidiaGPU: resource.MustParse("1"),
 		},
 	}
 
@@ -151,8 +164,16 @@ func dummyMPITaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskExecut
 	})
 	tID.OnGetGeneratedName().Return("some-acceptable-name")
 
-	resources := &mocks.TaskOverrides{}
-	resources.OnGetResources().Return(resourceRequirements)
+	overrides := &mocks.TaskOverrides{}
+	overrides.OnGetResources().Return(resourceRequirements)
+	overrides.OnGetResourceExtensions().Return(&core.ResourceExtensions{
+		GpuAccelerator: &core.GPUAccelerator{
+			Device: "nvidia-tesla-a100",
+			PartitionSizeValue: &core.GPUAccelerator_PartitionSize{
+				PartitionSize: "1g.5gb",
+			},
+		},
+	})
 
 	taskExecutionMetadata := &mocks.TaskExecutionMetadata{}
 	taskExecutionMetadata.OnGetTaskExecutionID().Return(tID)
@@ -164,7 +185,7 @@ func dummyMPITaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskExecut
 		Name: "blah",
 	})
 	taskExecutionMetadata.OnIsInterruptible().Return(true)
-	taskExecutionMetadata.OnGetOverrides().Return(resources)
+	taskExecutionMetadata.OnGetOverrides().Return(overrides)
 	taskExecutionMetadata.OnGetK8sServiceAccount().Return(serviceAccount)
 	taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{})
 	taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil)
@@
-324,10 +345,49 @@ func TestBuildResourceMPI(t *testing.T) {
 	}
 
 	for _, replicaSpec := range mpiJob.Spec.MPIReplicaSpecs {
-		for _, container := range replicaSpec.Template.Spec.Containers {
+		podSpec := replicaSpec.Template.Spec
+		for _, container := range podSpec.Containers {
 			assert.Equal(t, resourceRequirements.Requests, container.Resources.Requests)
 			assert.Equal(t, resourceRequirements.Limits, container.Resources.Limits)
 		}
+		assert.EqualValues(
+			t,
+			[]corev1.NodeSelectorTerm{
+				corev1.NodeSelectorTerm{
+					MatchExpressions: []corev1.NodeSelectorRequirement{
+						corev1.NodeSelectorRequirement{
+							Key:      flytek8sConfig.GetK8sPluginConfig().GpuDeviceNodeLabel,
+							Operator: corev1.NodeSelectorOpIn,
+							Values:   []string{"nvidia-tesla-a100"},
+						},
+						corev1.NodeSelectorRequirement{
+							Key:      flytek8sConfig.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+							Operator: corev1.NodeSelectorOpIn,
+							Values:   []string{"1g.5gb"},
+						},
+					},
+				},
+			},
+			podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
+		)
+		assert.EqualValues(
+			t,
+			[]corev1.Toleration{
+				{
+					Key:      flytek8sConfig.GetK8sPluginConfig().GpuDeviceNodeLabel,
+					Value:    "nvidia-tesla-a100",
+					Operator: corev1.TolerationOpEqual,
+					Effect:   corev1.TaintEffectNoSchedule,
+				},
+				{
+					Key:      flytek8sConfig.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+					Value:    "1g.5gb",
+					Operator: corev1.TolerationOpEqual,
+					Effect:   corev1.TaintEffectNoSchedule,
+				},
+			},
+			podSpec.Tolerations,
+		)
 	}
 }
diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go
index 07297e48eb..ebff3b29af 100644
--- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go
+++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go
@@ -8,9 +8,16 @@ import (
 
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/kfoperators/common"
 
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
+	flytek8sConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
 	commonOp "github.com/kubeflow/common/pkg/apis/common/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -160,8 +167,8 @@ func dummyPytorchTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskEx
 	})
 	tID.OnGetGeneratedName().Return("some-acceptable-name")
 
-	resources := &mocks.TaskOverrides{}
-	resources.OnGetResources().Return(&corev1.ResourceRequirements{
+	overrides := &mocks.TaskOverrides{}
+	overrides.OnGetResources().Return(&corev1.ResourceRequirements{
 		Limits: corev1.ResourceList{
 			corev1.ResourceCPU:    resource.MustParse("1000m"),
 			corev1.ResourceMemory: resource.MustParse("1Gi"),
@@ -173,6 +180,14 @@ func dummyPytorchTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskEx
 			flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
 		},
 	})
+	overrides.OnGetResourceExtensions().Return(&core.ResourceExtensions{
+		GpuAccelerator: &core.GPUAccelerator{
+			Device: "nvidia-tesla-a100",
+			PartitionSizeValue: &core.GPUAccelerator_PartitionSize{
+				PartitionSize: "1g.5gb",
+			},
+		},
+	})
 
 	taskExecutionMetadata := &mocks.TaskExecutionMetadata{}
 	taskExecutionMetadata.OnGetTaskExecutionID().Return(tID)
@@ -184,7 +199,7 @@ func dummyPytorchTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskEx
 		Name: "blah",
 	})
 	taskExecutionMetadata.OnIsInterruptible().Return(true)
-	taskExecutionMetadata.OnGetOverrides().Return(resources)
+	taskExecutionMetadata.OnGetOverrides().Return(overrides)
 	taskExecutionMetadata.OnGetK8sServiceAccount().Return(serviceAccount)
 	taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{})
 	taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil)
@@ -337,14 +352,16 @@ func TestBuildResourcePytorchElastic(t *testing.T) {
 	assert.Equal(t, 1, len(pytorchJob.Spec.PyTorchReplicaSpecs))
 	assert.Contains(t, pytorchJob.Spec.PyTorchReplicaSpecs, kubeflowv1.PyTorchJobReplicaTypeWorker)
 
-	var hasContainerWithDefaultPytorchName = false
-
-	for _, container := range pytorchJob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers {
-		if container.Name == kubeflowv1.PytorchJobDefaultContainerName {
-			hasContainerWithDefaultPytorchName = true
+	for _, replicaSpec := range pytorchJob.Spec.PyTorchReplicaSpecs {
+		var hasContainerWithDefaultPytorchName = false
+		podSpec := replicaSpec.Template.Spec
+		for _, container := range podSpec.Containers {
+			if container.Name == kubeflowv1.PytorchJobDefaultContainerName {
+				hasContainerWithDefaultPytorchName = true
+			}
 		}
-	}
 
-	assert.True(t, hasContainerWithDefaultPytorchName)
+		assert.EqualValues(
+			t,
+			[]corev1.NodeSelectorTerm{
+				corev1.NodeSelectorTerm{
+					MatchExpressions: []corev1.NodeSelectorRequirement{
+						corev1.NodeSelectorRequirement{
+							Key:      flytek8sConfig.GetK8sPluginConfig().GpuDeviceNodeLabel,
+							Operator: corev1.NodeSelectorOpIn,
+							Values:   []string{"nvidia-tesla-a100"},
+						},
+						corev1.NodeSelectorRequirement{
+							Key:      flytek8sConfig.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+							Operator: corev1.NodeSelectorOpIn,
+							Values:   []string{"1g.5gb"},
+						},
+					},
+				},
+			},
+			podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
+		)
+		assert.EqualValues(
+			t,
+			[]corev1.Toleration{
+				{
+					Key:      flytek8sConfig.GetK8sPluginConfig().GpuDeviceNodeLabel,
+					Value:    "nvidia-tesla-a100",
+					Operator: corev1.TolerationOpEqual,
+					Effect:   corev1.TaintEffectNoSchedule,
+				},
+				{
+					Key:      flytek8sConfig.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+					Value:    "1g.5gb",
+					Operator: corev1.TolerationOpEqual,
+					Effect:   corev1.TaintEffectNoSchedule,
+				},
+			},
+			podSpec.Tolerations,
+		)
+
+		assert.True(t, hasContainerWithDefaultPytorchName)
+	}
 
 	// verify TaskExecutionMetadata labels and annotations are copied to the PyTorchJob
@@ -357,6 +374,48 @@ func TestBuildResourcePytorchElastic(t *testing.T) {
 	for _, replicaSpec := range pytorchJob.Spec.PyTorchReplicaSpecs {
 		assert.Equal(t, v, replicaSpec.Template.ObjectMeta.Labels[k])
 	}
 }
 
@@ -389,8 +448,8 @@ func TestBuildResourcePytorch(t *testing.T) {
 
 	for _, replicaSpec := range pytorchJob.Spec.PyTorchReplicaSpecs {
 		var hasContainerWithDefaultPytorchName = false
-
-		for _, container := range replicaSpec.Template.Spec.Containers {
+		podSpec := replicaSpec.Template.Spec
+		for _, container := range podSpec.Containers {
 			if container.Name == kubeflowv1.PytorchJobDefaultContainerName {
 				hasContainerWithDefaultPytorchName = true
 			}
@@ -399,6 +458,45 @@ func TestBuildResourcePytorch(t *testing.T) {
 			assert.Equal(t, resourceRequirements.Limits, container.Resources.Limits, fmt.Sprintf(" container.Resources.Limits
[%+v]", container.Resources.Limits.Cpu().String()))
 		}
 
+		assert.EqualValues(
+			t,
+			[]corev1.NodeSelectorTerm{
+				corev1.NodeSelectorTerm{
+					MatchExpressions: []corev1.NodeSelectorRequirement{
+						corev1.NodeSelectorRequirement{
+							Key:      flytek8sConfig.GetK8sPluginConfig().GpuDeviceNodeLabel,
+							Operator: corev1.NodeSelectorOpIn,
+							Values:   []string{"nvidia-tesla-a100"},
+						},
+						corev1.NodeSelectorRequirement{
+							Key:      flytek8sConfig.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+							Operator: corev1.NodeSelectorOpIn,
+							Values:   []string{"1g.5gb"},
+						},
+					},
+				},
+			},
+			podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
+		)
+		assert.EqualValues(
+			t,
+			[]corev1.Toleration{
+				{
+					Key:      flytek8sConfig.GetK8sPluginConfig().GpuDeviceNodeLabel,
+					Value:    "nvidia-tesla-a100",
+					Operator: corev1.TolerationOpEqual,
+					Effect:   corev1.TaintEffectNoSchedule,
+				},
+				{
+					Key:      flytek8sConfig.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+					Value:    "1g.5gb",
+					Operator: corev1.TolerationOpEqual,
+					Effect:   corev1.TaintEffectNoSchedule,
+				},
+			},
+			podSpec.Tolerations,
+		)
+		assert.True(t, hasContainerWithDefaultPytorchName)
 	}
 }
diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go
index dc21710410..4ab1d1df7e 100644
--- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go
+++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go
@@ -8,9 +8,16 @@ import (
 
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/kfoperators/common"
 
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
+	flytek8sConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
 	commonOp "github.com/kubeflow/common/pkg/apis/common/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -153,8 +160,16 @@ func dummyTensorFlowTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.Tas
 	})
 	tID.OnGetGeneratedName().Return("some-acceptable-name")
 
-	resources := &mocks.TaskOverrides{}
-	resources.OnGetResources().Return(resourceRequirements)
+	overrides := &mocks.TaskOverrides{}
+	overrides.OnGetResources().Return(resourceRequirements)
+	overrides.OnGetResourceExtensions().Return(&core.ResourceExtensions{
+		GpuAccelerator: &core.GPUAccelerator{
+			Device: "nvidia-tesla-a100",
+			PartitionSizeValue: &core.GPUAccelerator_PartitionSize{
+				PartitionSize: "1g.5gb",
+			},
+		},
+	})
 
 	taskExecutionMetadata := &mocks.TaskExecutionMetadata{}
 	taskExecutionMetadata.OnGetTaskExecutionID().Return(tID)
@@ -166,7 +181,7 @@ func dummyTensorFlowTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.Tas
 		Name: "blah",
 	})
 	taskExecutionMetadata.OnIsInterruptible().Return(true)
-	taskExecutionMetadata.OnGetOverrides().Return(resources)
+	taskExecutionMetadata.OnGetOverrides().Return(overrides)
 	taskExecutionMetadata.OnGetK8sServiceAccount().Return(serviceAccount)
 	taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{})
 	taskExecutionMetadata.OnGetEnvironmentVariables().Return(nil)
@@ -327,8 +342,8 @@ func
diff --git a/flyteplugins/go/tasks/plugins/k8s/pod/container_test.go b/flyteplugins/go/tasks/plugins/k8s/pod/container_test.go
index ddc75fa858..c15e7143bf 100644
--- a/flyteplugins/go/tasks/plugins/k8s/pod/container_test.go
+++ b/flyteplugins/go/tasks/plugins/k8s/pod/container_test.go
@@ -7,11 +7,12 @@ import (
 	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

 	pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
 	pluginsCoreMock "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
+	flytek8sConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
 	pluginsIOMock "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io/mocks"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"

 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -25,8 +34,9 @@ import (
 var containerResourceRequirements = &v1.ResourceRequirements{
 	Limits: v1.ResourceList{
-		v1.ResourceCPU:     resource.MustParse("1024m"),
-		v1.ResourceStorage: resource.MustParse("100M"),
+		v1.ResourceCPU:             resource.MustParse("1024m"),
+		v1.ResourceStorage:         resource.MustParse("100M"),
+		flytek8s.ResourceNvidiaGPU: resource.MustParse("1"),
 	},
 }
@@ -64,6 +74,14 @@ func dummyContainerTaskMetadata(resources *v1.ResourceRequirements) pluginsCore.TaskExecutionMetadata {
 	to := &pluginsCoreMock.TaskOverrides{}
 	to.On("GetResources").Return(resources)
+	to.OnGetResourceExtensions().Return(&core.ResourceExtensions{
+		GpuAccelerator: &core.GPUAccelerator{
+			Device: "nvidia-tesla-a100",
+			PartitionSizeValue: &core.GPUAccelerator_PartitionSize{
+				PartitionSize: "1g.5gb",
+			},
+		},
+	})
 	taskMetadata.On("GetOverrides").Return(to)
 	taskMetadata.On("IsInterruptible").Return(true)
 	taskMetadata.On("GetEnvironmentVariables").Return(nil)
@@ -143,6 +161,45 @@ func TestContainerTaskExecutor_BuildResource(t *testing.T) {
 	assert.Equal(t, []string{"test-data-reference"}, j.Spec.Containers[0].Args)
 	assert.Equal(t, "service-account", j.Spec.ServiceAccountName)
+
+	assert.EqualValues(
+		t,
+		[]v1.NodeSelectorTerm{
+			v1.NodeSelectorTerm{
+				MatchExpressions: []v1.NodeSelectorRequirement{
+					v1.NodeSelectorRequirement{
+						Key:      flytek8sConfig.GetK8sPluginConfig().GpuDeviceNodeLabel,
+						Operator: v1.NodeSelectorOpIn,
+						Values:   []string{"nvidia-tesla-a100"},
+					},
+					v1.NodeSelectorRequirement{
+						Key:      flytek8sConfig.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+						Operator: v1.NodeSelectorOpIn,
+						Values:   []string{"1g.5gb"},
+					},
+				},
+			},
+		},
+		j.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
+	)
+	assert.EqualValues(
+		t,
+		[]v1.Toleration{
+			{
+				Key:      flytek8sConfig.GetK8sPluginConfig().GpuDeviceNodeLabel,
+				Value:    "nvidia-tesla-a100",
+				Operator: v1.TolerationOpEqual,
+				Effect:   v1.TaintEffectNoSchedule,
+			},
+			{
+				Key:      flytek8sConfig.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+				Value:    "1g.5gb",
+				Operator: v1.TolerationOpEqual,
+				Effect:   v1.TaintEffectNoSchedule,
+			},
+		},
+		j.Spec.Tolerations,
+	)
 }

 func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
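[Editor's note] The container fixture now requests flytek8s.ResourceNvidiaGPU in its limits, which is presumably what makes the pod builder attach the accelerator affinity and tolerations at all. A sketch of that gating logic follows; this is an assumption about the plugin code, which this diff only exercises through tests, and requestsGPU is not a real function in the repository.

package flytek8s

import (
	v1 "k8s.io/api/core/v1"
)

// requestsGPU is a sketch, not the plugin's actual implementation: scheduling
// decorations only make sense when the container asks for the configured GPU
// resource in its limits or requests.
func requestsGPU(resources v1.ResourceRequirements, gpuResourceName v1.ResourceName) bool {
	if q, ok := resources.Limits[gpuResourceName]; ok && !q.IsZero() {
		return true
	}
	q, ok := resources.Requests[gpuResourceName]
	return ok && !q.IsZero()
}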
diff --git a/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go b/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go
index 850cc76383..90edf6c1c5 100644
--- a/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go
+++ b/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go
@@ -92,6 +92,14 @@ func dummySidecarTaskMetadata(resources *v1.ResourceRequirements) pluginsCore.Ta
 	to := &pluginsCoreMock.TaskOverrides{}
 	to.On("GetResources").Return(resources)
+	to.OnGetResourceExtensions().Return(&core.ResourceExtensions{
+		GpuAccelerator: &core.GPUAccelerator{
+			Device: "nvidia-tesla-a100",
+			PartitionSizeValue: &core.GPUAccelerator_PartitionSize{
+				PartitionSize: "1g.5gb",
+			},
+		},
+	})
 	taskMetadata.On("GetOverrides").Return(to)
 	taskMetadata.On("GetEnvironmentVariables").Return(nil)
@@ -136,14 +144,14 @@ func getPodSpec() v1.PodSpec {
 			Args: []string{"pyflyte-execute", "--task-module", "tests.flytekit.unit.sdk.tasks.test_sidecar_tasks", "--task-name", "simple_sidecar_task", "--inputs", "{{.input}}", "--output-prefix", "{{.outputPrefix}}"},
 			Resources: v1.ResourceRequirements{
 				Limits: v1.ResourceList{
-					"cpu":    resource.MustParse("2"),
-					"memory": resource.MustParse("200Mi"),
-					"gpu":    resource.MustParse("1"),
+					"cpu":             resource.MustParse("2"),
+					"memory":          resource.MustParse("200Mi"),
+					ResourceNvidiaGPU: resource.MustParse("1"),
 				},
 				Requests: v1.ResourceList{
-					"cpu":    resource.MustParse("1"),
-					"memory": resource.MustParse("100Mi"),
-					"gpu":    resource.MustParse("1"),
+					"cpu":             resource.MustParse("1"),
+					"memory":          resource.MustParse("100Mi"),
+					ResourceNvidiaGPU: resource.MustParse("1"),
 				},
 			},
 			VolumeMounts: []v1.VolumeMount{
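[Editor's note] The checkTolerations rewrite in the next hunk switches from index-based equality to membership checks, since the plugin may interleave the GPU tolerations with user-specified ones in any order. The pattern generalizes to a small helper like this sketch; assertTolerationSet is a hypothetical name, not code from this change.

package pod

import (
	"testing"

	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"
)

// assertTolerationSet (hypothetical) checks that got contains exactly the
// expected tolerations, regardless of the order the plugin appended them in.
func assertTolerationSet(t *testing.T, got []v1.Toleration, want ...v1.Toleration) {
	assert.Len(t, got, len(want))
	for _, tol := range want {
		assert.Contains(t, got, tol)
	}
}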
@@ -180,18 +188,50 @@ func checkTolerations(t *testing.T, res client.Object, gpuTol v1.Toleration) {
 	// Assert user-specified tolerations don't get overridden
-	assert.Len(t, res.(*v1.Pod).Spec.Tolerations, 2)
-	for _, tol := range res.(*v1.Pod).Spec.Tolerations {
-		if tol.Key == "my toleration key" {
-			assert.Equal(t, tol.Value, "my toleration value")
-		} else if tol.Key == gpuTol.Key {
-			assert.Equal(t, tol, gpuTol)
-		} else {
-			t.Fatalf("unexpected toleration [%+v]", tol)
-		}
+	assert.Len(t, res.(*v1.Pod).Spec.Tolerations, 4)
+	for _, tol := range []v1.Toleration{
+		v1.Toleration{Key: "my toleration key", Value: "my toleration value"},
+		gpuTol,
+		{
+			Key:      config.GetK8sPluginConfig().GpuDeviceNodeLabel,
+			Value:    "nvidia-tesla-a100",
+			Operator: v1.TolerationOpEqual,
+			Effect:   v1.TaintEffectNoSchedule,
+		},
+		{
+			Key:      config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+			Value:    "1g.5gb",
+			Operator: v1.TolerationOpEqual,
+			Effect:   v1.TaintEffectNoSchedule,
+		},
+	} {
+		assert.Contains(t, res.(*v1.Pod).Spec.Tolerations, tol)
 	}
 }

+func checkNodeAffinity(t *testing.T, res client.Object) {
+	assert.EqualValues(
+		t,
+		[]v1.NodeSelectorTerm{
+			v1.NodeSelectorTerm{
+				MatchExpressions: []v1.NodeSelectorRequirement{
+					v1.NodeSelectorRequirement{
+						Key:      config.GetK8sPluginConfig().GpuDeviceNodeLabel,
+						Operator: v1.NodeSelectorOpIn,
+						Values:   []string{"nvidia-tesla-a100"},
+					},
+					v1.NodeSelectorRequirement{
+						Key:      config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+						Operator: v1.NodeSelectorOpIn,
+						Values:   []string{"1g.5gb"},
+					},
+				},
+			},
+		},
+		res.(*v1.Pod).Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
+	)
+}
+
 func TestBuildSidecarResource_TaskType2(t *testing.T) {
 	podSpec := getPodSpec()
@@ -267,6 +307,7 @@ func TestBuildSidecarResource_TaskType2(t *testing.T) {
 	assert.Equal(t, "volume mount", res.(*v1.Pod).Spec.Containers[0].VolumeMounts[0].Name)

 	checkTolerations(t, res, tolGPU)
+	checkNodeAffinity(t, res)

 	// Assert resource requirements are correctly set
 	expectedCPURequest := resource.MustParse("1")
@@ -356,6 +397,7 @@ func TestBuildSidecarResource_TaskType1(t *testing.T) {
 		},
 		DefaultCPURequest:    resource.MustParse("1024m"),
 		DefaultMemoryRequest: resource.MustParse("1024Mi"),
+		GpuResourceName:      ResourceNvidiaGPU,
 	}))
 	taskCtx := getDummySidecarTaskContext(&task, sidecarResourceRequirements)
 	res, err := DefaultPodPlugin.BuildResource(context.TODO(), taskCtx)
@@ -373,6 +415,8 @@ func TestBuildSidecarResource_TaskType1(t *testing.T) {
 	assert.Equal(t, "volume mount", res.(*v1.Pod).Spec.Containers[0].VolumeMounts[0].Name)

 	checkTolerations(t, res, tolGPU)
+	checkNodeAffinity(t, res)
+
 	// Assert resource requirements are correctly set
 	expectedCPURequest := resource.MustParse("1")
 	assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value())
@@ -473,6 +517,7 @@ func TestBuildSidecarResource(t *testing.T) {
 		},
 		DefaultCPURequest:    resource.MustParse("1024m"),
 		DefaultMemoryRequest: resource.MustParse("1024Mi"),
+		GpuResourceName:      ResourceNvidiaGPU,
 	}))
 	taskCtx := getDummySidecarTaskContext(&task, sidecarResourceRequirements)
 	res, err := DefaultPodPlugin.BuildResource(context.TODO(), taskCtx)
@@ -496,6 +541,7 @@ func TestBuildSidecarResource(t *testing.T) {
 	assert.Equal(t, "service-account", res.(*v1.Pod).Spec.ServiceAccountName)

 	checkTolerations(t, res, tolGPU)
+	checkNodeAffinity(t, res)

 	// Assert resource requirements are correctly set
 	expectedCPURequest := resource.MustParse("2048m")
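[Editor's note] Tests that assert on the generated label keys have to pin the GPU-related K8sPluginConfig fields first, as the sidecar tests do with GpuResourceName and the ray test below does with the node labels. A minimal sketch of that per-test setup, with placeholder label values rather than the shipped defaults; pinGPUPluginConfig is a hypothetical helper.

package pod

import (
	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
)

// pinGPUPluginConfig (hypothetical) makes the expected label keys
// deterministic before asserting on generated pod specs.
func pinGPUPluginConfig() error {
	return config.SetK8sPluginConfig(&config.K8sPluginConfig{
		GpuDeviceNodeLabel:        "example.com/gpu-device",
		GpuPartitionSizeNodeLabel: "example.com/gpu-partition-size",
		GpuResourceName:           config.ResourceNvidiaGPU,
	})
}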
diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go
index 5e7b82d55c..cba599f19e 100644
--- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go
+++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go
@@ -128,8 +128,16 @@ func dummyRayTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskExecut
 	})
 	tID.OnGetGeneratedName().Return("some-acceptable-name")

-	resources := &mocks.TaskOverrides{}
-	resources.OnGetResources().Return(resourceRequirements)
+	overrides := &mocks.TaskOverrides{}
+	overrides.OnGetResources().Return(resourceRequirements)
+	overrides.OnGetResourceExtensions().Return(&core.ResourceExtensions{
+		GpuAccelerator: &core.GPUAccelerator{
+			Device: "nvidia-tesla-a100",
+			PartitionSizeValue: &core.GPUAccelerator_PartitionSize{
+				PartitionSize: "1g.5gb",
+			},
+		},
+	})

 	taskExecutionMetadata := &mocks.TaskExecutionMetadata{}
 	taskExecutionMetadata.OnGetTaskExecutionID().Return(tID)
@@ -141,7 +149,7 @@ func dummyRayTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskExecut
 		Name: "blah",
 	})
 	taskExecutionMetadata.OnIsInterruptible().Return(true)
-	taskExecutionMetadata.OnGetOverrides().Return(resources)
+	taskExecutionMetadata.OnGetOverrides().Return(overrides)
 	taskExecutionMetadata.OnGetK8sServiceAccount().Return(serviceAccount)
 	taskExecutionMetadata.OnGetPlatformResources().Return(&corev1.ResourceRequirements{})
 	taskExecutionMetadata.OnGetSecurityContext().Return(core.SecurityContext{
@@ -152,16 +160,73 @@ func dummyRayTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskExecut
 	return taskCtx
 }

+func checkTolerations(t *testing.T, podSpec corev1.PodSpec) {
+	// Assert user-specified tolerations don't get overridden
+	assert.Len(t, podSpec.Tolerations, 3)
+	for _, tol := range []corev1.Toleration{
+		{
+			Key:      "storage",
+			Value:    "dedicated",
+			Operator: corev1.TolerationOpExists,
+			Effect:   corev1.TaintEffectNoSchedule,
+		},
+		{
+			Key:      config.GetK8sPluginConfig().GpuDeviceNodeLabel,
+			Value:    "nvidia-tesla-a100",
+			Operator: corev1.TolerationOpEqual,
+			Effect:   corev1.TaintEffectNoSchedule,
+		},
+		{
+			Key:      config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+			Value:    "1g.5gb",
+			Operator: corev1.TolerationOpEqual,
+			Effect:   corev1.TaintEffectNoSchedule,
+		},
+	} {
+		assert.Contains(t, podSpec.Tolerations, tol)
+	}
+}
+
+func checkNodeAffinity(t *testing.T, podSpec corev1.PodSpec) {
+	assert.EqualValues(
+		t,
+		[]corev1.NodeSelectorTerm{
+			corev1.NodeSelectorTerm{
+				MatchExpressions: []corev1.NodeSelectorRequirement{
+					corev1.NodeSelectorRequirement{
+						Key:      config.GetK8sPluginConfig().GpuDeviceNodeLabel,
+						Operator: corev1.NodeSelectorOpIn,
+						Values:   []string{"nvidia-tesla-a100"},
+					},
+					corev1.NodeSelectorRequirement{
+						Key:      config.GetK8sPluginConfig().GpuPartitionSizeNodeLabel,
+						Operator: corev1.NodeSelectorOpIn,
+						Values:   []string{"1g.5gb"},
+					},
+				},
+			},
+		},
+		podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
+	)
+}
+
 func TestBuildResourceRay(t *testing.T) {
 	rayJobResourceHandler := rayJobResourceHandler{}
 	taskTemplate := dummyRayTaskTemplate("ray-id", dummyRayCustomObj())
-	toleration := []corev1.Toleration{{
-		Key:      "storage",
-		Value:    "dedicated",
-		Operator: corev1.TolerationOpExists,
-		Effect:   corev1.TaintEffectNoSchedule,
-	}}
-	err := config.SetK8sPluginConfig(&config.K8sPluginConfig{DefaultTolerations: toleration})
+	toleration := []corev1.Toleration{
+		{
+			Key:      "storage",
+			Value:    "dedicated",
+			Operator: corev1.TolerationOpExists,
+			Effect:   corev1.TaintEffectNoSchedule,
+		},
+	}
+	err := config.SetK8sPluginConfig(&config.K8sPluginConfig{
+		DefaultTolerations:        toleration,
+		GpuDeviceNodeLabel:        "gpu-node-label",
+		GpuPartitionSizeNodeLabel: "gpu-partition-size",
+		GpuResourceName:           config.ResourceNvidiaGPU,
+	})
 	assert.Nil(t, err)

 	RayResource, err := rayJobResourceHandler.BuildResource(context.TODO(), dummyRayTaskContext(taskTemplate))
@@ -180,7 +245,8 @@ func TestBuildResourceRay(t *testing.T) {
 		"node-ip-address": "$MY_POD_IP", "num-cpus": "1"})
 	assert.Equal(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Template.Annotations, map[string]string{"annotation-1": "val1"})
 	assert.Equal(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Template.Labels, map[string]string{"label-1": "val1"})
-	assert.Equal(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Tolerations, toleration)
+	checkTolerations(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec)
+	checkNodeAffinity(t, ray.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec)

 	workerReplica := int32(3)
 	assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas, &workerReplica)
@@ -191,7 +257,8 @@ func TestBuildResourceRay(t *testing.T) {
 	assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].RayStartParams, map[string]string{"disable-usage-stats": "true", "node-ip-address": "$MY_POD_IP"})
 	assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Annotations, map[string]string{"annotation-1": "val1"})
 	assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Labels, map[string]string{"label-1": "val1"})
-	assert.Equal(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Tolerations, toleration)
+	checkTolerations(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec)
+	checkNodeAffinity(t, ray.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec)
 }

 func TestDefaultStartParameters(t *testing.T) {
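[Editor's note] The fixtures in this diff always set a partition size. For the unpartitioned case the oneof is simply left unset; presumably only the device selector and toleration are then expected, though this diff does not show that plugin behavior directly, so treat that as an assumption. A sketch with a hypothetical helper name:

package ray

import (
	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)

// unpartitionedA100 (hypothetical) builds a GPU accelerator override with no
// partition size; the PartitionSizeValue oneof is just left nil.
func unpartitionedA100() *core.ResourceExtensions {
	return &core.ResourceExtensions{
		GpuAccelerator: &core.GPUAccelerator{
			Device: "nvidia-tesla-a100",
		},
	}
}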
diff --git a/flyteplugins/tests/end_to_end.go b/flyteplugins/tests/end_to_end.go
index a8f708e2f1..db9c264ccc 100644
--- a/flyteplugins/tests/end_to_end.go
+++ b/flyteplugins/tests/end_to_end.go
@@ -21,9 +21,10 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"

+	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog"
 	catalogMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog/mocks"
 	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/workqueue"

 	v1 "k8s.io/api/core/v1"

@@ -156,6 +163,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
 		Requests: map[v1.ResourceName]resource.Quantity{},
 		Limits:   map[v1.ResourceName]resource.Quantity{},
 	})
+	overrides.OnGetResourceExtensions().Return(&core.ResourceExtensions{})

 	tMeta := &coreMocks.TaskExecutionMetadata{}
 	tMeta.OnGetTaskExecutionID().Return(tID)
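[Editor's note] One knock-on effect worth noting: because the pod builders now read GetResourceExtensions() from task overrides, any TaskOverrides mock that omits the stub will typically fail the test on an unexpected call. The end-to-end harness opts out of GPU decorations with an empty message, as sketched here with a hypothetical helper:

package tests

import (
	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

	coreMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
)

// stubEmptyResourceExtensions (hypothetical) gives a mock the minimal
// GetResourceExtensions expectation; an empty ResourceExtensions means no GPU
// accelerator override is in play.
func stubEmptyResourceExtensions(overrides *coreMocks.TaskOverrides) {
	overrides.OnGetResourceExtensions().Return(&core.ResourceExtensions{})
}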