Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
Signed-off-by: Future Outlier <[email protected]>
  • Loading branch information
Future Outlier committed Jan 2, 2024
1 parent 81ce71e commit bc92ed1
Show file tree
Hide file tree
Showing 14 changed files with 21 additions and 113 deletions.
17 changes: 0 additions & 17 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,23 +253,6 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
})
}

// Only assign storage when it is either requested or limited in the task definition, or a platform
// default exists.
if !taskResourceRequirements.Defaults.Storage.IsZero() ||
!taskResourceRequirements.Limits.Storage.IsZero() ||
!platformTaskResources.Defaults.Storage.IsZero() {
storageResource := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.Storage, taskResourceRequirements.Limits.Storage,
platformTaskResources.Defaults.Storage, platformTaskResources.Limits.Storage)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_STORAGE,
Value: storageResource.Request.String(),
})
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{
Name: core.Resources_STORAGE,
Value: storageResource.Limit.String(),
})
}

// Only assign gpu when it is either requested or limited in the task definition, or a platform default exists.
if !taskResourceRequirements.Defaults.GPU.IsZero() ||
!taskResourceRequirements.Limits.GPU.IsZero() ||
Expand Down
9 changes: 0 additions & 9 deletions flyteplugins/go/tasks/config_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,14 @@ func TestLoadConfig(t *testing.T) {
}, k8sConfig.DefaultEnvVars)
assert.NotNil(t, k8sConfig.ResourceTolerations)
assert.Contains(t, k8sConfig.ResourceTolerations, v1.ResourceName("nvidia.com/gpu"))
assert.Contains(t, k8sConfig.ResourceTolerations, v1.ResourceStorage)
tolGPU := v1.Toleration{
Key: "flyte/gpu",
Value: "dedicated",
Operator: v1.TolerationOpEqual,
Effect: v1.TaintEffectNoSchedule,
}

tolStorage := v1.Toleration{
Key: "storage",
Value: "special",
Operator: v1.TolerationOpEqual,
Effect: v1.TaintEffectPreferNoSchedule,
}

assert.Equal(t, []v1.Toleration{tolGPU}, k8sConfig.ResourceTolerations[v1.ResourceName("nvidia.com/gpu")])
assert.Equal(t, []v1.Toleration{tolStorage}, k8sConfig.ResourceTolerations[v1.ResourceStorage])
expectedCPU := resource.MustParse("1000m")
assert.True(t, expectedCPU.Equal(k8sConfig.DefaultCPURequest))
expectedMemory := resource.MustParse("1024Mi")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements

// TODO: Make configurable. 1/15/2019 Flyte Cluster doesn't support setting storage requests/limits.
// https://github.com/kubernetes/enhancements/issues/362
delete(resources.Requests, v1.ResourceStorage)
delete(resources.Limits, v1.ResourceStorage)

gpuResourceName := config.GetK8sPluginConfig().GpuResourceName
shouldAdjustGPU := false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,11 @@ func TestApplyResourceOverrides_RemoveStorage(t *testing.T) {
requestedResourceQuantity := resource.MustParse("1")
overrides := ApplyResourceOverrides(v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceCPU: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Limits: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Expand Down Expand Up @@ -261,7 +259,6 @@ func TestMergeResources_EmptyIn(t *testing.T) {
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Limits: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Expand All @@ -280,7 +277,6 @@ func TestMergeResources_EmptyOut(t *testing.T) {
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Limits: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (

var resourceRequirements = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
}

Expand Down
44 changes: 13 additions & 31 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,12 +693,10 @@ func TestApplyGPUNodeSelectors(t *testing.T) {
func updatePod(t *testing.T) {
taskExecutionMetadata := dummyTaskExecutionMetadata(&v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
}, nil)

Expand Down Expand Up @@ -809,13 +807,11 @@ func toK8sPodInterruptible(t *testing.T) {

x := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
ResourceNvidiaGPU: resource.MustParse("1"),
v1.ResourceCPU: resource.MustParse("1024m"),
ResourceNvidiaGPU: resource.MustParse("1"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
}, nil)

Expand Down Expand Up @@ -853,17 +849,9 @@ func TestToK8sPod(t *testing.T) {
Effect: v1.TaintEffectNoSchedule,
}

tolStorage := v1.Toleration{
Key: "storage",
Value: "dedicated",
Operator: v1.TolerationOpExists,
Effect: v1.TaintEffectNoSchedule,
}

assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
ResourceTolerations: map[v1.ResourceName][]v1.Toleration{
v1.ResourceStorage: {tolStorage},
ResourceNvidiaGPU: {tolGPU},
ResourceNvidiaGPU: {tolGPU},
},
DefaultCPURequest: resource.MustParse("1024m"),
DefaultMemoryRequest: resource.MustParse("1024Mi"),
Expand All @@ -876,13 +864,11 @@ func TestToK8sPod(t *testing.T) {
t.Run("WithGPU", func(t *testing.T) {
x := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
ResourceNvidiaGPU: resource.MustParse("1"),
v1.ResourceCPU: resource.MustParse("1024m"),
ResourceNvidiaGPU: resource.MustParse("1"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
}, nil)

Expand All @@ -894,12 +880,10 @@ func TestToK8sPod(t *testing.T) {
t.Run("NoGPU", func(t *testing.T) {
x := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
}, nil)

Expand All @@ -912,12 +896,10 @@ func TestToK8sPod(t *testing.T) {
t.Run("Default toleration, selector, scheduler", func(t *testing.T) {
x := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
}, nil)

Expand Down
4 changes: 0 additions & 4 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ func ToK8sResourceList(resources []*core.Resources_ResourceEntry) (v1.ResourceLi
if !v.IsZero() {
k8sResources[v1.ResourceMemory] = v
}
case core.Resources_STORAGE:
if !v.IsZero() {
k8sResources[v1.ResourceStorage] = v
}
case core.Resources_GPU:
if !v.IsZero() {
k8sResources[ResourceNvidiaGPU] = v
Expand Down
2 changes: 0 additions & 2 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestToK8sResourceList(t *testing.T) {
{Name: core.Resources_CPU, Value: "250m"},
{Name: core.Resources_GPU, Value: "1"},
{Name: core.Resources_MEMORY, Value: "1024Mi"},
{Name: core.Resources_STORAGE, Value: "1024Mi"},
{Name: core.Resources_EPHEMERAL_STORAGE, Value: "1024Mi"},
})

Expand All @@ -43,7 +42,6 @@ func TestToK8sResourceList(t *testing.T) {
assert.Equal(t, resource.MustParse("250m"), r[v1.ResourceCPU])
assert.Equal(t, resource.MustParse("1"), r[ResourceNvidiaGPU])
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceMemory])
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceStorage])
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceEphemeralStorage])
}
{
Expand Down
3 changes: 1 addition & 2 deletions flyteplugins/go/tasks/plugins/hive/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ func GetSingleHiveQueryTaskTemplate() idlCore.TaskTemplate {

var resourceRequirements = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
}

Expand Down
7 changes: 1 addition & 6 deletions flyteplugins/go/tasks/plugins/k8s/pod/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (

var containerResourceRequirements = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
}

Expand Down Expand Up @@ -206,10 +205,6 @@ func TestContainerTaskExecutor_BuildResource(t *testing.T) {
assert.NotEmpty(t, j.Spec.Containers)
assert.Equal(t, containerResourceRequirements.Limits[v1.ResourceCPU], j.Spec.Containers[0].Resources.Limits[v1.ResourceCPU])

// TODO: Once configurable, test when setting storage is supported on the cluster vs not.
storageRes := j.Spec.Containers[0].Resources.Limits[v1.ResourceStorage]
assert.Equal(t, int64(0), (&storageRes).Value())

assert.Equal(t, command, j.Spec.Containers[0].Command)
assert.Equal(t, []string{"test-data-reference"}, j.Spec.Containers[0].Args)

Expand Down
30 changes: 4 additions & 26 deletions flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,9 @@ func TestBuildSidecarResource_TaskType2(t *testing.T) {
Effect: v1.TaintEffectNoSchedule,
}

tolStorage := v1.Toleration{
Key: "storage",
Value: "dedicated",
Operator: v1.TolerationOpExists,
Effect: v1.TaintEffectNoSchedule,
}
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
ResourceTolerations: map[v1.ResourceName][]v1.Toleration{
v1.ResourceStorage: {tolStorage},
ResourceNvidiaGPU: {tolGPU},
ResourceNvidiaGPU: {tolGPU},
},
DefaultCPURequest: resource.MustParse("1024m"),
DefaultMemoryRequest: resource.MustParse("1024Mi"),
Expand Down Expand Up @@ -340,16 +333,9 @@ func TestBuildSidecarResource_TaskType1(t *testing.T) {
Effect: v1.TaintEffectNoSchedule,
}

tolStorage := v1.Toleration{
Key: "storage",
Value: "dedicated",
Operator: v1.TolerationOpExists,
Effect: v1.TaintEffectNoSchedule,
}
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
ResourceTolerations: map[v1.ResourceName][]v1.Toleration{
v1.ResourceStorage: {tolStorage},
ResourceNvidiaGPU: {tolGPU},
ResourceNvidiaGPU: {tolGPU},
},
DefaultCPURequest: resource.MustParse("1024m"),
DefaultMemoryRequest: resource.MustParse("1024Mi"),
Expand Down Expand Up @@ -413,8 +399,7 @@ func TestBuildSideResource_TaskType1_InvalidSpec(t *testing.T) {

assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
ResourceTolerations: map[v1.ResourceName][]v1.Toleration{
v1.ResourceStorage: {},
ResourceNvidiaGPU: {},
ResourceNvidiaGPU: {},
},
DefaultCPURequest: resource.MustParse("1024m"),
DefaultMemoryRequest: resource.MustParse("1024Mi"),
Expand Down Expand Up @@ -457,16 +442,9 @@ func TestBuildSidecarResource(t *testing.T) {
Effect: v1.TaintEffectNoSchedule,
}

tolStorage := v1.Toleration{
Key: "storage",
Value: "dedicated",
Operator: v1.TolerationOpExists,
Effect: v1.TaintEffectNoSchedule,
}
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
ResourceTolerations: map[v1.ResourceName][]v1.Toleration{
v1.ResourceStorage: {tolStorage},
ResourceNvidiaGPU: {tolGPU},
ResourceNvidiaGPU: {tolGPU},
},
DefaultCPURequest: resource.MustParse("1024m"),
DefaultMemoryRequest: resource.MustParse("1024Mi"),
Expand Down
3 changes: 1 addition & 2 deletions flyteplugins/go/tasks/plugins/presto/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ func GetPrestoQueryTaskTemplate() idlCore.TaskTemplate {

var resourceRequirements = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
},
}

Expand Down
2 changes: 0 additions & 2 deletions flytepropeller/pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,12 @@ func convertTaskResourcesToRequirements(taskResources v1alpha1.TaskResources) *v
v1.ResourceCPU: taskResources.Requests.CPU,
v1.ResourceMemory: taskResources.Requests.Memory,
v1.ResourceEphemeralStorage: taskResources.Requests.EphemeralStorage,
v1.ResourceStorage: taskResources.Requests.Storage,
utils.ResourceNvidiaGPU: taskResources.Requests.GPU,
},
Limits: v1.ResourceList{
v1.ResourceCPU: taskResources.Limits.CPU,
v1.ResourceMemory: taskResources.Limits.Memory,
v1.ResourceEphemeralStorage: taskResources.Limits.EphemeralStorage,
v1.ResourceStorage: taskResources.Limits.Storage,
utils.ResourceNvidiaGPU: taskResources.Limits.GPU,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,12 @@ func TestConvertTaskResourcesToRequirements(t *testing.T) {
CPU: resource.MustParse("1"),
Memory: resource.MustParse("2"),
EphemeralStorage: resource.MustParse("3"),
Storage: resource.MustParse("4"),
GPU: resource.MustParse("5"),
},
Limits: v1alpha1.TaskResourceSpec{
CPU: resource.MustParse("10"),
Memory: resource.MustParse("20"),
EphemeralStorage: resource.MustParse("30"),
Storage: resource.MustParse("40"),
GPU: resource.MustParse("50"),
},
})
Expand All @@ -397,14 +395,12 @@ func TestConvertTaskResourcesToRequirements(t *testing.T) {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2"),
corev1.ResourceEphemeralStorage: resource.MustParse("3"),
corev1.ResourceStorage: resource.MustParse("4"),
utils.ResourceNvidiaGPU: resource.MustParse("5"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("10"),
corev1.ResourceMemory: resource.MustParse("20"),
corev1.ResourceEphemeralStorage: resource.MustParse("30"),
corev1.ResourceStorage: resource.MustParse("40"),
utils.ResourceNvidiaGPU: resource.MustParse("50"),
},
}, resourceRequirements)
Expand Down

0 comments on commit bc92ed1

Please sign in to comment.