Skip to content

Commit

Permalink
feat(replica-spec): Update corresponding golang files based on protob…
Browse files Browse the repository at this point in the history
…uf changes

Resolves: #4408
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed May 25, 2024
1 parent 8d5ab85 commit b10a69e
Show file tree
Hide file tree
Showing 5 changed files with 1,006 additions and 576 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,37 @@ type kfDistributedReplicaSpec interface {
GetImage() string
GetResources() *core.Resources
GetRestartPolicy() kfplugins.RestartPolicy
GetCommon() *kfplugins.CommonReplicaSpec
}

type allowsCommandOverride interface {
GetCommand() []string
}

func ToReplicaSpecWithOverrides(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, rs kfDistributedReplicaSpec, primaryContainerName string, isMaster bool) (*commonOp.ReplicaSpec, error) {
var replicas int32
var image string
var resources *core.Resources
var restartPolicy kfplugins.RestartPolicy

Check warning on line 308 in flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go#L305-L308

Added lines #L305 - L308 were not covered by tests

// replicas, image, resources, restartPolicy are deprecated since the common replica spec is introduced.
// Therefore, if the common replica spec is set, use that to get the common fields
common := rs.GetCommon()
if common != nil {
replicas = common.GetReplicas()
image = common.GetImage()
resources = common.GetResources()
restartPolicy = common.GetRestartPolicy()
} else {
replicas = rs.GetReplicas()
image = rs.GetImage()
resources = rs.GetResources()
restartPolicy = rs.GetRestartPolicy()

Check warning on line 322 in flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go#L312-L322

Added lines #L312 - L322 were not covered by tests
}

taskCtxOptions := []flytek8s.PluginTaskExecutionContextOption{}
if rs != nil && rs.GetResources() != nil {
resources, err := flytek8s.ToK8sResourceRequirements(rs.GetResources())
if resources != nil {
resources, err := flytek8s.ToK8sResourceRequirements(resources)

Check warning on line 327 in flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go#L326-L327

Added lines #L326 - L327 were not covered by tests
if err != nil {
return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "invalid TaskSpecification on Resources [%v], Err: [%v]", resources, err.Error())
}
Expand All @@ -321,26 +342,23 @@ func ToReplicaSpecWithOverrides(ctx context.Context, taskCtx pluginsCore.TaskExe
replicaSpec.Replicas = &replicas
}

if rs != nil {
var command []string
if v, ok := rs.(allowsCommandOverride); ok {
command = v.GetCommand()
}
if err := OverrideContainerSpec(
&replicaSpec.Template.Spec,
primaryContainerName,
rs.GetImage(),
command,
); err != nil {
return nil, err
}
var command []string
if v, ok := rs.(allowsCommandOverride); ok {
command = v.GetCommand()

Check warning on line 347 in flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go#L345-L347

Added lines #L345 - L347 were not covered by tests
}
if err := OverrideContainerSpec(
&replicaSpec.Template.Spec,
primaryContainerName,
image,
command,
); err != nil {
return nil, err

Check warning on line 355 in flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go#L349-L355

Added lines #L349 - L355 were not covered by tests
}

replicaSpec.RestartPolicy = ParseRestartPolicy(rs.GetRestartPolicy())
replicaSpec.RestartPolicy = ParseRestartPolicy(restartPolicy)

Check warning on line 358 in flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go#L358

Added line #L358 was not covered by tests

if !isMaster {
replicas := rs.GetReplicas()
replicaSpec.Replicas = &replicas
}
if !isMaster {
replicaSpec.Replicas = &replicas

Check warning on line 361 in flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator.go#L360-L361

Added lines #L360 - L361 were not covered by tests
}

return replicaSpec, nil
Expand Down
Loading

0 comments on commit b10a69e

Please sign in to comment.