Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iaroslav-ciupin committed Sep 27, 2024
1 parent e3fc6a3 commit 3f85987
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 227 deletions.
4 changes: 2 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi
}

func (m *ExecutionManager) getClusterAssignment(ctx context.Context, req *admin.ExecutionCreateRequest) (*admin.ClusterAssignment, error) {
storedAssignment, err := m.fetchClusterAssignment(ctx, req.Org, req.Project, req.Domain)
storedAssignment, err := m.fetchClusterAssignment(ctx, req.Project, req.Domain)
if err != nil {
return nil, err

Check warning on line 409 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L409

Added line #L409 was not covered by tests
}
Expand All @@ -427,7 +427,7 @@ func (m *ExecutionManager) getClusterAssignment(ctx context.Context, req *admin.
return storedAssignment, nil
}

func (m *ExecutionManager) fetchClusterAssignment(ctx context.Context, org, project, domain string) (*admin.ClusterAssignment, error) {
func (m *ExecutionManager) fetchClusterAssignment(ctx context.Context, project, domain string) (*admin.ClusterAssignment, error) {
resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: project,
Domain: domain,
Expand Down
225 changes: 0 additions & 225 deletions flyteadmin/tests/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/flyteorg/flyte/flyteadmin/pkg/repositories"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -331,226 +329,3 @@ func TestListWorkflowExecutions_Pagination(t *testing.T) {
assert.Equal(t, len(resp.Executions), 1)
assert.Empty(t, resp.Token)
}

func TestGetWorkflowExecutionCounts(t *testing.T) {
truncateAllTablesForTestingOnly()
populateWorkflowExecutionsForTestingOnly()

ctx := context.Background()
client, conn := GetTestAdminServiceClient()
defer conn.Close()

executionCountsResp, err := client.GetExecutionCounts(ctx, &admin.ExecutionCountsGetRequest{
Project: "project1",
Domain: "domain1",
Filters: "gte(execution_created_at,2000-01-01T00:00:00Z)",
})
assert.Nil(t, err)
assert.Equal(t, 3, len(executionCountsResp.ExecutionCounts))
otherPhase := false
for _, item := range executionCountsResp.ExecutionCounts {
if item.Phase == core.WorkflowExecution_FAILED {
assert.Equal(t, int64(1), item.Count)
} else if item.Phase == core.WorkflowExecution_SUCCEEDED {
assert.Equal(t, int64(1), item.Count)
} else if item.Phase == core.WorkflowExecution_RUNNING {
assert.Equal(t, int64(2), item.Count)
} else {
otherPhase = true
}
}
assert.False(t, otherPhase)
}

func TestGetWorkflowExecutionCounts_Filters(t *testing.T) {
truncateAllTablesForTestingOnly()
populateWorkflowExecutionsForTestingOnly()

ctx := context.Background()
client, conn := GetTestAdminServiceClient()
defer conn.Close()

executionCountsResp, err := client.GetExecutionCounts(ctx, &admin.ExecutionCountsGetRequest{
Project: "project1",
Domain: "domain1",
Filters: "gte(execution_created_at,2000-01-01T00:00:00Z)+eq(launch_plan_id, 1)",
})
assert.Nil(t, err)
assert.Equal(t, 3, len(executionCountsResp.ExecutionCounts))
otherPhase := false
for _, item := range executionCountsResp.ExecutionCounts {
if item.Phase == core.WorkflowExecution_SUCCEEDED {
assert.Equal(t, int64(1), item.Count)
} else if item.Phase == core.WorkflowExecution_RUNNING {
assert.Equal(t, int64(1), item.Count)
} else if item.Phase == core.WorkflowExecution_FAILED {
assert.Equal(t, int64(1), item.Count)
} else {
otherPhase = true
}
}
assert.False(t, otherPhase)
}

func TestGetWorkflowExecutionCounts_PhaseFilter(t *testing.T) {
truncateAllTablesForTestingOnly()
populateWorkflowExecutionsForTestingOnly()

ctx := context.Background()
client, conn := GetTestAdminServiceClient()
defer conn.Close()

executionCountsResp, err := client.GetExecutionCounts(ctx, &admin.ExecutionCountsGetRequest{
Project: "project1",
Domain: "domain1",
Filters: "gte(execution_created_at,2000-01-01T00:00:00Z)+eq(phase,RUNNING)",
})
assert.Nil(t, err)
assert.Equal(t, 1, len(executionCountsResp.ExecutionCounts))
assert.Equal(t, core.WorkflowExecution_RUNNING, executionCountsResp.ExecutionCounts[0].Phase)
assert.Equal(t, int64(2), executionCountsResp.ExecutionCounts[0].Count)
}

func TestGetRunningExecutionsCount(t *testing.T) {
truncateAllTablesForTestingOnly()
populateWorkflowExecutionsForTestingOnly()

ctx := context.Background()
client, conn := GetTestAdminServiceClient()
defer conn.Close()

runningExecutionsCountResp, err := client.GetRunningExecutionsCount(ctx, &admin.RunningExecutionsCountGetRequest{
Project: "project1",
Domain: "domain1",
})
assert.Nil(t, err)
assert.Equal(t, int64(2), runningExecutionsCountResp.Count)
}

func TestResolvedSpec(t *testing.T) {
truncateAllTablesForTestingOnly()

ctx := context.Background()
client, conn := GetTestAdminServiceClient()
defer conn.Close()

insertTasksForTests(t, client)
createWorkflowReq := getWorkflowCreateRequest()
_, err := client.CreateWorkflow(ctx, &createWorkflowReq)
assert.Nil(t, err)
createLaunchPlanReq := getLaunchPlanCreateRequest(createWorkflowReq.Id)
_, err = client.CreateLaunchPlan(ctx, &createLaunchPlanReq)

spec := &admin.ExecutionSpec{
LaunchPlan: &launchPlanIdentifier,
Labels: &admin.Labels{Values: map[string]string{
"foo": "bar",
}},
OverwriteCache: true,
MaxParallelism: 10,
ClusterAssignment: &admin.ClusterAssignment{
ClusterPoolName: "cluster",
},
Metadata: &admin.ExecutionMetadata{},
Annotations: &admin.Annotations{Values: map[string]string{"foo": "bar"}},
SecurityContext: &core.SecurityContext{RunAs: &core.Identity{IamRole: "iamrole"}},
TaskResourceAttributes: &admin.TaskResourceAttributes{
Defaults: &admin.TaskResourceSpec{
Cpu: "1",
Gpu: "0",
Memory: "1Gi",
EphemeralStorage: "0",
},
Limits: &admin.TaskResourceSpec{
Cpu: "2",
Gpu: "0",
Memory: "2Gi",
EphemeralStorage: "0",
},
},
}
_, err = client.CreateExecution(ctx, &admin.ExecutionCreateRequest{
Project: launchPlanIdentifier.Project,
Domain: launchPlanIdentifier.Domain,
Name: launchPlanIdentifier.Name,
Spec: spec,
})
require.NoError(t, err)

resp, err := client.GetExecution(ctx, &admin.WorkflowExecutionGetRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: launchPlanIdentifier.Project,
Domain: launchPlanIdentifier.Domain,
Name: launchPlanIdentifier.Name,
},
})
assert.NoError(t, err)
spec.Interruptible = &wrapperspb.BoolValue{
Value: false,
}
assert.True(t, proto.Equal(spec, resp.Closure.ResolvedSpec))
}

func TestSingleTaskResolvedSpec(t *testing.T) {
truncateAllTablesForTestingOnly()

ctx := context.Background()
client, conn := GetTestAdminServiceClient()
defer conn.Close()

insertTasksForTests(t, client)
createWorkflowReq := getWorkflowCreateRequest()
_, err := client.CreateWorkflow(ctx, &createWorkflowReq)
assert.Nil(t, err)
createLaunchPlanReq := getLaunchPlanCreateRequest(createWorkflowReq.Id)
_, err = client.CreateLaunchPlan(ctx, &createLaunchPlanReq)

spec := &admin.ExecutionSpec{
LaunchPlan: &launchPlanIdentifier,
Labels: &admin.Labels{Values: map[string]string{
"foo": "bar",
}},
OverwriteCache: true,
MaxParallelism: 10,
ClusterAssignment: &admin.ClusterAssignment{
ClusterPoolName: "cluster",
},
Metadata: &admin.ExecutionMetadata{},
Annotations: &admin.Annotations{Values: map[string]string{"foo": "bar"}},
SecurityContext: &core.SecurityContext{RunAs: &core.Identity{IamRole: "iamrole"}},
TaskResourceAttributes: &admin.TaskResourceAttributes{
Defaults: &admin.TaskResourceSpec{
Cpu: "1",
Gpu: "0",
Memory: "1Gi",
EphemeralStorage: "0",
},
Limits: &admin.TaskResourceSpec{
Cpu: "2",
Gpu: "0",
Memory: "2Gi",
EphemeralStorage: "0",
},
},
}
_, err = client.CreateExecution(ctx, &admin.ExecutionCreateRequest{
Project: launchPlanIdentifier.Project,
Domain: launchPlanIdentifier.Domain,
Name: launchPlanIdentifier.Name,
Spec: spec,
})
assert.Nil(t, err)

resp, err := client.GetExecution(ctx, &admin.WorkflowExecutionGetRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: launchPlanIdentifier.Project,
Domain: launchPlanIdentifier.Domain,
Name: launchPlanIdentifier.Name,
},
})
assert.Nil(t, err)
spec.Interruptible = &wrapperspb.BoolValue{
Value: false,
}
assert.True(t, proto.Equal(spec, resp.Closure.ResolvedSpec))
}

0 comments on commit 3f85987

Please sign in to comment.