Skip to content

Commit

Permalink
Transfer commits
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Oct 3, 2023
2 parents d9586b0 + f12771a commit 06b0d4f
Show file tree
Hide file tree
Showing 15 changed files with 1,363 additions and 75 deletions.
5 changes: 5 additions & 0 deletions flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ require (
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/coocood/freecache v1.1.1
github.com/dask/dask-kubernetes/v2023 v2023.0.0-20230626103304-abd02cd17b26
<<<<<<< HEAD
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyteidl v0.0.0-00010101000000-000000000000
=======
github.com/flyteorg/flyteidl v1.5.20-0.20231002193413-9bb0dd7669d3
github.com/flyteorg/flytestdlib v1.0.15
>>>>>>> flyteplugins/jeev/gpu-type
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.3
github.com/hashicorp/golang-lru v0.5.4
Expand Down
13 changes: 13 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,21 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
<<<<<<< HEAD
github.com/flyteorg/stow v0.3.7 h1:Cx7j8/Ux6+toD5hp5fy++927V+yAcAttDeQAlUD/864=
github.com/flyteorg/stow v0.3.7/go.mod h1:5dfBitPM004dwaZdoVylVjxFT4GWAgI0ghAndhNUzCo=
=======
github.com/flyteorg/flyteidl v1.5.18-0.20230913190844-dc07c4922069 h1:8MJ/9HeJ+B+K8lvOuQjUXmpYQkZNLEzyMlhZdmtLtpM=
github.com/flyteorg/flyteidl v1.5.18-0.20230913190844-dc07c4922069/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.20-0.20230922194114-38e5c44a6f69 h1:AEQIufFO40WrJ3gdxsF/hfOnKvCmJvUKj5RzSk6naSY=
github.com/flyteorg/flyteidl v1.5.20-0.20230922194114-38e5c44a6f69/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.20-0.20231002193413-9bb0dd7669d3 h1:FattF/MhnzISxLGB52xEHI12ixA7toJ4ahr4LfMOzJM=
github.com/flyteorg/flyteidl v1.5.20-0.20231002193413-9bb0dd7669d3/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
github.com/flyteorg/stow v0.3.6/go.mod h1:5dfBitPM004dwaZdoVylVjxFT4GWAgI0ghAndhNUzCo=
>>>>>>> flyteplugins/jeev/gpu-type
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
// TaskOverrides interface to expose any overrides that have been set for this task (like resource overrides etc)
type TaskOverrides interface {
// GetResources returns the *v1.ResourceRequirements override for this task.
// NOTE(review): presumably nil when no resource override was set — confirm against implementations.
GetResources() *v1.ResourceRequirements
// GetResourceExtensions returns the *core.ResourceExtensions override for this task.
// The pod helper merges this value over the task template's own resource extensions
// and tolerates a nil result.
GetResourceExtensions() *core.ResourceExtensions
// GetConfig returns the *v1.ConfigMap override for this task.
// NOTE(review): presumably nil when no config override was set — confirm against implementations.
GetConfig() *v1.ConfigMap
}

Expand Down
36 changes: 36 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ var (
ImagePullBackoffGracePeriod: config2.Duration{
Duration: time.Minute * 3,
},
GpuResourceName: ResourceNvidiaGPU,
GpuDeviceNodeLabel: "k8s.amazonaws.com/accelerator",
GpuPartitionSizeNodeLabel: "k8s.amazonaws.com/gpu-partition-size",
GpuResourceName: ResourceNvidiaGPU,
DefaultPodTemplateResync: config2.Duration{
Duration: 30 * time.Second,
},
Expand Down Expand Up @@ -140,6 +142,18 @@ type K8sPluginConfig struct {
// one, and the corresponding task marked as failed
ImagePullBackoffGracePeriod config2.Duration `json:"image-pull-backoff-grace-period" pflag:"-,Time to wait for transient ImagePullBackoff errors to be resolved."`

// The node label that specifies the attached GPU device.
GpuDeviceNodeLabel string `json:"gpu-device-node-label" pflag:"-,The node label that specifies the attached GPU device."`

// The node label that specifies the attached GPU partition size.
GpuPartitionSizeNodeLabel string `json:"gpu-partition-size-node-label" pflag:"-,The node label that specifies the attached GPU partition size."`

// Override for node selector requirement added to pods intended for unpartitioned GPU nodes.
GpuUnpartitionedNodeSelectorRequirement *v1.NodeSelectorRequirement `json:"gpu-unpartitioned-node-selector-requirement" pflag:"-,Override for node selector requirement added to pods intended for unpartitioned GPU nodes."`

// Override for toleration added to pods intended for unpartitioned GPU nodes.
GpuUnpartitionedToleration *v1.Toleration `json:"gpu-unpartitioned-toleration" pflag:"-,Override for toleration added to pods intended for unpartitioned GPU nodes."`

// The name of the GPU resource to use when the task resource requests GPUs.
GpuResourceName v1.ResourceName `json:"gpu-resource-name" pflag:"-,The name of the GPU resource to use when the task resource requests GPUs."`

Expand Down
192 changes: 192 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,20 @@ import (

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

<<<<<<< HEAD
pluginserrors "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/template"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
=======
pluginserrors "github.com/flyteorg/flyteplugins/go/tasks/errors"
pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils"
"github.com/golang/protobuf/proto"
>>>>>>> flyteplugins/jeev/gpu-type

"github.com/flyteorg/flyte/flytestdlib/logger"

Expand All @@ -32,6 +41,53 @@ const defaultContainerTemplateName = "default"
const primaryContainerTemplateName = "primary"
const PrimaryContainerKey = "primary_container_name"

// GpuPartitionSizeNotSet is the toleration value that ApplyGPUNodeSelectors pairs with the
// configured GPU partition-size node label when a task requests an unpartitioned GPU and
// no toleration override is configured.
const GpuPartitionSizeNotSet = "NotSet"

// AddRequiredNodeSelectorRequirements adds the provided v1.NodeSelectorRequirement
// objects to the required node affinity of an existing v1.Affinity object.
//
// If there are no existing required node selector terms, a single new term holding
// the requirements is created. Otherwise the requirements are appended to every
// existing term: node selector terms are OR'd while the match expressions within a
// single term are AND'd during scheduling, so appending to each term is what keeps
// the new requirements mandatory no matter which term ends up matching.
// See: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity
//
// Calling it without any requirements leaves base untouched. base must be non-nil.
func AddRequiredNodeSelectorRequirements(base *v1.Affinity, new ...v1.NodeSelectorRequirement) {
	// An empty node selector term matches no nodes, so creating one for an empty
	// requirement list would silently make the pod unschedulable.
	if len(new) == 0 {
		return
	}

	if base.NodeAffinity == nil {
		base.NodeAffinity = &v1.NodeAffinity{}
	}
	if base.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		base.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{}
	}
	required := base.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution

	if len(required.NodeSelectorTerms) == 0 {
		// Copy the variadic slice so that later appends to this term can never write
		// into a backing array owned by the caller.
		exprs := append([]v1.NodeSelectorRequirement(nil), new...)
		required.NodeSelectorTerms = []v1.NodeSelectorTerm{{MatchExpressions: exprs}}
		return
	}

	for i := range required.NodeSelectorTerms {
		term := &required.NodeSelectorTerms[i]
		term.MatchExpressions = append(term.MatchExpressions, new...)
	}
}

// AddPreferredNodeSelectorRequirements registers the given v1.NodeSelectorRequirement
// objects as one additional preferred scheduling term, carrying the supplied weight,
// on the node affinity of base. The node affinity is created when base has none.
// See: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity-weight
// for how weights are used during scheduling.
func AddPreferredNodeSelectorRequirements(base *v1.Affinity, weight int32, new ...v1.NodeSelectorRequirement) {
	if base.NodeAffinity == nil {
		base.NodeAffinity = &v1.NodeAffinity{}
	}
	nodeAffinity := base.NodeAffinity

	// All requirements go into a single term, so they share one weight.
	preferred := v1.PreferredSchedulingTerm{
		Weight:     weight,
		Preference: v1.NodeSelectorTerm{MatchExpressions: new},
	}
	nodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(
		nodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
		preferred,
	)
}

// ApplyInterruptibleNodeSelectorRequirement configures the node selector requirement of the node-affinity using the configuration specified.
func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.Affinity) {
// Determine node selector terms to add to node affinity
Expand All @@ -48,6 +104,7 @@ func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.
nodeSelectorRequirement = *config.GetK8sPluginConfig().NonInterruptibleNodeSelectorRequirement
}

<<<<<<< HEAD
if affinity.NodeAffinity == nil {
affinity.NodeAffinity = &v1.NodeAffinity{}
}
Expand All @@ -64,6 +121,9 @@ func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.
affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{{MatchExpressions: []v1.NodeSelectorRequirement{nodeSelectorRequirement}}}
}

=======
AddRequiredNodeSelectorRequirements(affinity, nodeSelectorRequirement)
>>>>>>> flyteplugins/jeev/gpu-type
}

// ApplyInterruptibleNodeAffinity configures the node-affinity for the pod using the configuration specified.
Expand All @@ -75,6 +135,121 @@ func ApplyInterruptibleNodeAffinity(interruptible bool, podSpec *v1.PodSpec) {
ApplyInterruptibleNodeSelectorRequirement(interruptible, podSpec.Affinity)
}

// applyResourceExtensionsOverrides returns a new *core.ResourceExtensions holding base
// with overrides layered on top. base is never modified: the result starts out as a
// clone of base, or as an empty message when base is nil. Fields are merged one by one
// rather than with a generic nested merge, because a nested merge is not always the
// intended behavior.
func applyResourceExtensionsOverrides(base, overrides *core.ResourceExtensions) *core.ResourceExtensions {
	var merged *core.ResourceExtensions
	if base != nil {
		merged = proto.Clone(base).(*core.ResourceExtensions)
	} else {
		merged = &core.ResourceExtensions{}
	}

	// Nothing to layer on top.
	if overrides == nil {
		return merged
	}

	// GPU accelerator: an override replaces the base value wholesale. The result
	// shares the accelerator message with overrides.
	if accelerator := overrides.GetGpuAccelerator(); accelerator != nil {
		merged.GpuAccelerator = accelerator
	}

	return merged
}

// ApplyGPUNodeSelectors adds the node selector requirements and tolerations implied by
// gpuAccelerator to podSpec. It does nothing unless at least one container in podSpec
// has a limit set for the configured GPU resource name.
//
// When a device is set, the pod is required to land on nodes whose GPU device label
// equals that device and tolerates the matching NoSchedule taint. When a partition
// size preference is set, the pod is additionally constrained either to nodes with the
// requested partition size or, for unpartitioned GPUs, to nodes without the partition
// size label (unless overridden through the plugin configuration).
func ApplyGPUNodeSelectors(podSpec *v1.PodSpec, gpuAccelerator *core.GPUAccelerator) {
	pluginCfg := config.GetK8sPluginConfig()

	// Bail out when no container asks for GPUs.
	gpuRequested := false
	for i := range podSpec.Containers {
		if _, found := podSpec.Containers[i].Resources.Limits[pluginCfg.GpuResourceName]; found {
			gpuRequested = true
			break
		}
	}
	if !gpuRequested {
		return
	}

	if podSpec.Affinity == nil {
		podSpec.Affinity = &v1.Affinity{}
	}

	// GPU device: required node selector plus matching toleration.
	if device := gpuAccelerator.GetDevice(); len(device) > 0 {
		AddRequiredNodeSelectorRequirements(podSpec.Affinity, v1.NodeSelectorRequirement{
			Key:      pluginCfg.GpuDeviceNodeLabel,
			Operator: v1.NodeSelectorOpIn,
			Values:   []string{device},
		})
		podSpec.Tolerations = append(podSpec.Tolerations, v1.Toleration{
			Key:      pluginCfg.GpuDeviceNodeLabel,
			Value:    device,
			Operator: v1.TolerationOpEqual,
			Effect:   v1.TaintEffectNoSchedule,
		})
	}

	// No partition size preference: the device handling above is all there is to do.
	partitionSizeValue := gpuAccelerator.GetPartitionSizeValue()
	if partitionSizeValue == nil {
		return
	}

	var (
		partitionSizeNsr *v1.NodeSelectorRequirement
		partitionSizeTol *v1.Toleration
	)
	switch partitionSizeValue.(type) {
	case *core.GPUAccelerator_Unpartitioned:
		// An explicit "unpartitioned = false" expresses no preference at all.
		if gpuAccelerator.GetUnpartitioned() {
			// Configured overrides win; otherwise fall back to "label must not exist"
			// and a toleration for the "NotSet" partition size.
			partitionSizeNsr = pluginCfg.GpuUnpartitionedNodeSelectorRequirement
			if partitionSizeNsr == nil {
				partitionSizeNsr = &v1.NodeSelectorRequirement{
					Key:      pluginCfg.GpuPartitionSizeNodeLabel,
					Operator: v1.NodeSelectorOpDoesNotExist,
				}
			}
			partitionSizeTol = pluginCfg.GpuUnpartitionedToleration
			if partitionSizeTol == nil {
				partitionSizeTol = &v1.Toleration{
					Key:      pluginCfg.GpuPartitionSizeNodeLabel,
					Value:    GpuPartitionSizeNotSet,
					Operator: v1.TolerationOpEqual,
					Effect:   v1.TaintEffectNoSchedule,
				}
			}
		}
	case *core.GPUAccelerator_PartitionSize:
		partitionSizeNsr = &v1.NodeSelectorRequirement{
			Key:      pluginCfg.GpuPartitionSizeNodeLabel,
			Operator: v1.NodeSelectorOpIn,
			Values:   []string{gpuAccelerator.GetPartitionSize()},
		}
		partitionSizeTol = &v1.Toleration{
			Key:      pluginCfg.GpuPartitionSizeNodeLabel,
			Value:    gpuAccelerator.GetPartitionSize(),
			Operator: v1.TolerationOpEqual,
			Effect:   v1.TaintEffectNoSchedule,
		}
	}

	if partitionSizeNsr != nil {
		AddRequiredNodeSelectorRequirements(podSpec.Affinity, *partitionSizeNsr)
	}
	if partitionSizeTol != nil {
		podSpec.Tolerations = append(podSpec.Tolerations, *partitionSizeTol)
	}
}

// UpdatePod updates the base pod spec used to execute tasks. This is configured with plugins and task metadata-specific options
func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata,
resourceRequirements []v1.ResourceRequirements, podSpec *v1.PodSpec) {
Expand All @@ -91,6 +266,7 @@ func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata,
podSpec.SchedulerName = config.GetK8sPluginConfig().SchedulerName
}
podSpec.NodeSelector = utils.UnionMaps(config.GetK8sPluginConfig().DefaultNodeSelector, podSpec.NodeSelector)

if taskExecutionMetadata.IsInterruptible() {
podSpec.NodeSelector = utils.UnionMaps(podSpec.NodeSelector, config.GetK8sPluginConfig().InterruptibleNodeSelector)
}
Expand Down Expand Up @@ -250,6 +426,22 @@ func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecut
return nil, nil, err
}

// handling for resource extensions
// Merge overrides with base resource extensions
var resourceExtensions *core.ResourceExtensions
if taskTemplate.GetContainer() != nil && taskTemplate.GetContainer().GetResources() != nil {
resourceExtensions = taskTemplate.GetContainer().GetResources().GetExtensions()
}
resourceExtensions = applyResourceExtensionsOverrides(
resourceExtensions,
tCtx.TaskExecutionMetadata().GetOverrides().GetResourceExtensions(),
)

// GPU accelerator
if resourceExtensions.GetGpuAccelerator() != nil {
ApplyGPUNodeSelectors(podSpec, resourceExtensions.GetGpuAccelerator())
}

return podSpec, objectMeta, nil
}

Expand Down
Loading

0 comments on commit 06b0d4f

Please sign in to comment.