diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 226a9c1e4e7..27acf152ec8 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -404,7 +404,7 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi } func (m *ExecutionManager) getClusterAssignment(ctx context.Context, req *admin.ExecutionCreateRequest) (*admin.ClusterAssignment, error) { - storedAssignment, err := m.fetchClusterAssignment(ctx, req.Org, req.Project, req.Domain) + storedAssignment, err := m.fetchClusterAssignment(ctx, req.Project, req.Domain) if err != nil { return nil, err } @@ -427,7 +427,7 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, req *admin. return storedAssignment, nil } -func (m *ExecutionManager) fetchClusterAssignment(ctx context.Context, org, project, domain string) (*admin.ClusterAssignment, error) { +func (m *ExecutionManager) fetchClusterAssignment(ctx context.Context, project, domain string) (*admin.ClusterAssignment, error) { resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{ Project: project, Domain: domain, diff --git a/flyteadmin/tests/execution_test.go b/flyteadmin/tests/execution_test.go index 784f87ae0f3..a3d226562b4 100644 --- a/flyteadmin/tests/execution_test.go +++ b/flyteadmin/tests/execution_test.go @@ -12,8 +12,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/wrapperspb" "github.com/flyteorg/flyte/flyteadmin/pkg/repositories" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" @@ -331,226 +329,3 @@ func TestListWorkflowExecutions_Pagination(t *testing.T) { assert.Equal(t, len(resp.Executions), 1) assert.Empty(t, resp.Token) } - -func TestGetWorkflowExecutionCounts(t *testing.T) { - truncateAllTablesForTestingOnly() - populateWorkflowExecutionsForTestingOnly() - - ctx := context.Background() - client, conn := GetTestAdminServiceClient() - defer conn.Close() - - executionCountsResp, err := client.GetExecutionCounts(ctx, &admin.ExecutionCountsGetRequest{ - Project: "project1", - Domain: "domain1", - Filters: "gte(execution_created_at,2000-01-01T00:00:00Z)", - }) - assert.Nil(t, err) - assert.Equal(t, 3, len(executionCountsResp.ExecutionCounts)) - otherPhase := false - for _, item := range executionCountsResp.ExecutionCounts { - if item.Phase == core.WorkflowExecution_FAILED { - assert.Equal(t, int64(1), item.Count) - } else if item.Phase == core.WorkflowExecution_SUCCEEDED { - assert.Equal(t, int64(1), item.Count) - } else if item.Phase == core.WorkflowExecution_RUNNING { - assert.Equal(t, int64(2), item.Count) - } else { - otherPhase = true - } - } - assert.False(t, otherPhase) -} - -func TestGetWorkflowExecutionCounts_Filters(t *testing.T) { - truncateAllTablesForTestingOnly() - populateWorkflowExecutionsForTestingOnly() - - ctx := context.Background() - client, conn := GetTestAdminServiceClient() - defer conn.Close() - - executionCountsResp, err := client.GetExecutionCounts(ctx, &admin.ExecutionCountsGetRequest{ - Project: "project1", - Domain: "domain1", - Filters: "gte(execution_created_at,2000-01-01T00:00:00Z)+eq(launch_plan_id, 1)", - }) - assert.Nil(t, err) - assert.Equal(t, 3, len(executionCountsResp.ExecutionCounts)) - otherPhase := false - for _, item := range executionCountsResp.ExecutionCounts { - if item.Phase == core.WorkflowExecution_SUCCEEDED { - assert.Equal(t, int64(1), item.Count) - } else if item.Phase == core.WorkflowExecution_RUNNING { - assert.Equal(t, int64(1), item.Count) - } else if item.Phase == core.WorkflowExecution_FAILED { - assert.Equal(t, int64(1), item.Count) - } else { - otherPhase = true - } - } - assert.False(t, otherPhase) -} - -func TestGetWorkflowExecutionCounts_PhaseFilter(t *testing.T) { - truncateAllTablesForTestingOnly() - populateWorkflowExecutionsForTestingOnly() - - ctx := context.Background() - client, conn := GetTestAdminServiceClient() - defer conn.Close() - - executionCountsResp, err := client.GetExecutionCounts(ctx, &admin.ExecutionCountsGetRequest{ - Project: "project1", - Domain: "domain1", - Filters: "gte(execution_created_at,2000-01-01T00:00:00Z)+eq(phase,RUNNING)", - }) - assert.Nil(t, err) - assert.Equal(t, 1, len(executionCountsResp.ExecutionCounts)) - assert.Equal(t, core.WorkflowExecution_RUNNING, executionCountsResp.ExecutionCounts[0].Phase) - assert.Equal(t, int64(2), executionCountsResp.ExecutionCounts[0].Count) -} - -func TestGetRunningExecutionsCount(t *testing.T) { - truncateAllTablesForTestingOnly() - populateWorkflowExecutionsForTestingOnly() - - ctx := context.Background() - client, conn := GetTestAdminServiceClient() - defer conn.Close() - - runningExecutionsCountResp, err := client.GetRunningExecutionsCount(ctx, &admin.RunningExecutionsCountGetRequest{ - Project: "project1", - Domain: "domain1", - }) - assert.Nil(t, err) - assert.Equal(t, int64(2), runningExecutionsCountResp.Count) -} - -func TestResolvedSpec(t *testing.T) { - truncateAllTablesForTestingOnly() - - ctx := context.Background() - client, conn := GetTestAdminServiceClient() - defer conn.Close() - - insertTasksForTests(t, client) - createWorkflowReq := getWorkflowCreateRequest() - _, err := client.CreateWorkflow(ctx, &createWorkflowReq) - assert.Nil(t, err) - createLaunchPlanReq := getLaunchPlanCreateRequest(createWorkflowReq.Id) - _, err = client.CreateLaunchPlan(ctx, &createLaunchPlanReq) - - spec := &admin.ExecutionSpec{ - LaunchPlan: &launchPlanIdentifier, - Labels: &admin.Labels{Values: map[string]string{ - "foo": "bar", - }}, - OverwriteCache: true, - MaxParallelism: 10, - ClusterAssignment: &admin.ClusterAssignment{ - ClusterPoolName: "cluster", - }, - Metadata: &admin.ExecutionMetadata{}, - Annotations: &admin.Annotations{Values: map[string]string{"foo": "bar"}}, - SecurityContext: &core.SecurityContext{RunAs: &core.Identity{IamRole: "iamrole"}}, - TaskResourceAttributes: &admin.TaskResourceAttributes{ - Defaults: &admin.TaskResourceSpec{ - Cpu: "1", - Gpu: "0", - Memory: "1Gi", - EphemeralStorage: "0", - }, - Limits: &admin.TaskResourceSpec{ - Cpu: "2", - Gpu: "0", - Memory: "2Gi", - EphemeralStorage: "0", - }, - }, - } - _, err = client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ - Project: launchPlanIdentifier.Project, - Domain: launchPlanIdentifier.Domain, - Name: launchPlanIdentifier.Name, - Spec: spec, - }) - require.NoError(t, err) - - resp, err := client.GetExecution(ctx, &admin.WorkflowExecutionGetRequest{ - Id: &core.WorkflowExecutionIdentifier{ - Project: launchPlanIdentifier.Project, - Domain: launchPlanIdentifier.Domain, - Name: launchPlanIdentifier.Name, - }, - }) - assert.NoError(t, err) - spec.Interruptible = &wrapperspb.BoolValue{ - Value: false, - } - assert.True(t, proto.Equal(spec, resp.Closure.ResolvedSpec)) -} - -func TestSingleTaskResolvedSpec(t *testing.T) { - truncateAllTablesForTestingOnly() - - ctx := context.Background() - client, conn := GetTestAdminServiceClient() - defer conn.Close() - - insertTasksForTests(t, client) - createWorkflowReq := getWorkflowCreateRequest() - _, err := client.CreateWorkflow(ctx, &createWorkflowReq) - assert.Nil(t, err) - createLaunchPlanReq := getLaunchPlanCreateRequest(createWorkflowReq.Id) - _, err = client.CreateLaunchPlan(ctx, &createLaunchPlanReq) - - spec := &admin.ExecutionSpec{ - LaunchPlan: &launchPlanIdentifier, - Labels: &admin.Labels{Values: map[string]string{ - "foo": "bar", - }}, - OverwriteCache: true, - MaxParallelism: 10, - ClusterAssignment: &admin.ClusterAssignment{ - ClusterPoolName: "cluster", - }, - Metadata: &admin.ExecutionMetadata{}, - Annotations: &admin.Annotations{Values: map[string]string{"foo": "bar"}}, - SecurityContext: &core.SecurityContext{RunAs: &core.Identity{IamRole: "iamrole"}}, - TaskResourceAttributes: &admin.TaskResourceAttributes{ - Defaults: &admin.TaskResourceSpec{ - Cpu: "1", - Gpu: "0", - Memory: "1Gi", - EphemeralStorage: "0", - }, - Limits: &admin.TaskResourceSpec{ - Cpu: "2", - Gpu: "0", - Memory: "2Gi", - EphemeralStorage: "0", - }, - }, - } - _, err = client.CreateExecution(ctx, &admin.ExecutionCreateRequest{ - Project: launchPlanIdentifier.Project, - Domain: launchPlanIdentifier.Domain, - Name: launchPlanIdentifier.Name, - Spec: spec, - }) - assert.Nil(t, err) - - resp, err := client.GetExecution(ctx, &admin.WorkflowExecutionGetRequest{ - Id: &core.WorkflowExecutionIdentifier{ - Project: launchPlanIdentifier.Project, - Domain: launchPlanIdentifier.Domain, - Name: launchPlanIdentifier.Name, - }, - }) - assert.Nil(t, err) - spec.Interruptible = &wrapperspb.BoolValue{ - Value: false, - } - assert.True(t, proto.Equal(spec, resp.Closure.ResolvedSpec)) -}