From e3fc6a3672dc3da7ce38a00887bb9f9a68c3b993 Mon Sep 17 00:00:00 2001 From: Iaroslav Ciupin Date: Thu, 11 Jul 2024 21:47:53 +0300 Subject: [PATCH] Fix pool validation (#369) * Apparently, its possible to store clusterassignment in Resource API with empty cluster pool string *TODO: Summarize tests added, integration tests run, or other steps you took to validate this change. Include (or link to) relevant test output or screenshots.* * no concerns Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [x] To be upstreamed to OSS *TODO: Link Linear issue(s) using [magic words](https://linear.app/docs/github#magic-words). `fixes` will move to merged status, while `ref` will only link the PR.* * [ ] Added tests * [ ] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation --- .../pkg/manager/impl/execution_manager.go | 11 ++++---- .../manager/impl/execution_manager_test.go | 26 +++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index aab62c12f98..226a9c1e4e7 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -409,16 +409,17 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, req *admin. return nil, err } - if req.GetSpec().GetClusterAssignment() == nil { + reqAssignment := req.GetSpec().GetClusterAssignment() + reqPool := reqAssignment.GetClusterPoolName() + storedPool := storedAssignment.GetClusterPoolName() + if reqPool == "" { return storedAssignment, nil } - if storedAssignment == nil { - return req.GetSpec().GetClusterAssignment(), nil + if storedPool == "" { + return reqAssignment, nil } - reqPool := req.Spec.ClusterAssignment.GetClusterPoolName() - storedPool := storedAssignment.GetClusterPoolName() if reqPool != storedPool { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "execution with project %q and domain %q cannot run on cluster pool %q, because its configured to run on pool %q", req.Project, req.Domain, reqPool, storedPool) } diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index 1c8c2b9f60c..5646d192ae2 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -5519,6 +5519,32 @@ func TestGetClusterAssignment(t *testing.T) { assert.NoError(t, err) assert.True(t, proto.Equal(ca, &reqClusterAssignment)) }) + t.Run("empty value in DB, takes value from request", func(t *testing.T) { + clusterPoolAsstProvider := &runtimeIFaceMocks.ClusterPoolAssignmentConfiguration{} + clusterPoolAsstProvider.OnGetClusterPoolAssignments().Return(runtimeInterfaces.ClusterPoolAssignments{ + workflowIdentifier.GetDomain(): runtimeInterfaces.ClusterPoolAssignment{ + Pool: "", + }, + }) + mockConfig := getMockExecutionsConfigProvider() + mockConfig.(*runtimeMocks.MockConfigurationProvider).AddClusterPoolAssignmentConfiguration(clusterPoolAsstProvider) + + executionManager := ExecutionManager{ + resourceManager: &managerMocks.MockResourceManager{}, + config: mockConfig, + } + + reqClusterAssignment := admin.ClusterAssignment{ClusterPoolName: "gpu"} + ca, err := executionManager.getClusterAssignment(context.TODO(), &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + ClusterAssignment: &reqClusterAssignment, + }, + }) + assert.NoError(t, err) + assert.True(t, proto.Equal(ca, &reqClusterAssignment)) + }) t.Run("value from request doesn't match value from config", func(t *testing.T) { reqClusterAssignment := admin.ClusterAssignment{ClusterPoolName: "swimming-pool"} _, err := executionManager.getClusterAssignment(context.TODO(), &admin.ExecutionCreateRequest{