Skip to content

Commit

Permalink
Set max parallelism in flytewf
Browse files Browse the repository at this point in the history
  • Loading branch information
RRap0so committed Jun 28, 2024
1 parent bba8c11 commit 2ae45e8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 0 deletions.
4 changes: 4 additions & 0 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
flyteWf.Tasks = nil
}

if data.ExecutionParameters.ExecutionConfig.MaxParallelism > 0 {
flyteWf.ExecutionConfig.MaxParallelism = uint32(data.ExecutionParameters.ExecutionConfig.MaxParallelism)
}

executionTargetSpec := executioncluster.ExecutionTargetSpec{
Project: data.ExecutionID.Project,
Domain: data.ExecutionID.Domain,
Expand Down
68 changes: 68 additions & 0 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,74 @@ func TestExecute(t *testing.T) {
assert.Equal(t, resp.Cluster, clusterID)
}

func TestExecute_MaxParallelism(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.createCallback = func(flyteWorkflow *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, flyteWf, flyteWorkflow)
assert.Empty(t, opts)
return nil, nil
}
fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
assert.Equal(t, namespace, ns)
return &fakeFlyteWorkflow
}

mockApplicationConfig := runtimeMocks.MockApplicationProvider{}
mockApplicationConfig.SetTopLevelConfig(runtimeInterfaces.ApplicationConfig{
UseOffloadedWorkflowClosure: false,
})
mockRuntime := runtimeMocks.NewMockConfigurationProvider(&mockApplicationConfig, nil, nil, nil, nil, nil)

mockBuilder := mocks.FlyteWorkflowBuilder{}
workflowClosure := core.CompiledWorkflowClosure{
Primary: &core.CompiledWorkflow{
Template: &core.WorkflowTemplate{
Id: &core.Identifier{
Project: "p",
Domain: "d",
Name: "n",
Version: "version",
},
},
},
}
mockBuilder.OnBuildMatch(mock.MatchedBy(func(wfClosure *core.CompiledWorkflowClosure) bool {
return proto.Equal(wfClosure, &workflowClosure)
}), mock.MatchedBy(func(inputs *core.LiteralMap) bool {
return proto.Equal(inputs, testInputs)
}), mock.MatchedBy(func(executionID *core.WorkflowExecutionIdentifier) bool {
return proto.Equal(executionID, execID)
}), namespace).Return(flyteWf, nil)
executor := K8sWorkflowExecutor{
config: mockRuntime,
workflowBuilder: &mockBuilder,
executionCluster: getFakeExecutionCluster(),
}

resp, err := executor.Execute(context.TODO(), interfaces.ExecutionData{
Namespace: namespace,
ExecutionID: execID,
ReferenceWorkflowName: "ref_workflow_name",
ReferenceLaunchPlanName: "ref_lp_name",
WorkflowClosure: &workflowClosure,
ExecutionParameters: interfaces.ExecutionParameters{
Inputs: testInputs,
ExecutionConfig: &admin.WorkflowExecutionConfig{
SecurityContext: &core.SecurityContext{
RunAs: &core.Identity{
IamRole: testRoleSc,
K8SServiceAccount: testK8sServiceAccountSc,
},
},
MaxParallelism: 10,
},
},
})
assert.NoError(t, err)
assert.Equal(t, resp.Cluster, clusterID)
assert.Equal(t, flyteWf.ExecutionConfig.MaxParallelism, uint32(10))
}

func TestExecute_AlreadyExists(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.createCallback = func(flyteWorkflow *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
Expand Down

0 comments on commit 2ae45e8

Please sign in to comment.