Skip to content

Commit

Permalink
feat(replica-spec): Update corresponding golang files based on protob…
Browse files Browse the repository at this point in the history
…uf changes

Resolves: flyteorg#4408
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed May 21, 2024
1 parent ce99342 commit 946ca70
Show file tree
Hide file tree
Showing 5 changed files with 1,006 additions and 576 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,37 @@ type kfDistributedReplicaSpec interface {
GetImage() string
GetResources() *core.Resources
GetRestartPolicy() kfplugins.RestartPolicy
GetCommon() *kfplugins.CommonReplicaSpec
}

type allowsCommandOverride interface {
GetCommand() []string
}

func ToReplicaSpecWithOverrides(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, rs kfDistributedReplicaSpec, primaryContainerName string, isMaster bool) (*commonOp.ReplicaSpec, error) {
var replicas int32
var image string
var resources *core.Resources
var restartPolicy kfplugins.RestartPolicy

// replicas, image, resources, restartPolicy are deprecated since the common replica spec is introduced.
// Therefore, if the common replica spec is set, use that to get the common fields
common := rs.GetCommon()
if common != nil {
replicas = common.GetReplicas()
image = common.GetImage()
resources = common.GetResources()
restartPolicy = common.GetRestartPolicy()
} else {
replicas = rs.GetReplicas()
image = rs.GetImage()
resources = rs.GetResources()
restartPolicy = rs.GetRestartPolicy()
}

taskCtxOptions := []flytek8s.PluginTaskExecutionContextOption{}
if rs != nil && rs.GetResources() != nil {
resources, err := flytek8s.ToK8sResourceRequirements(rs.GetResources())
if resources != nil {
resources, err := flytek8s.ToK8sResourceRequirements(resources)
if err != nil {
return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "invalid TaskSpecification on Resources [%v], Err: [%v]", resources, err.Error())
}
Expand All @@ -321,26 +342,23 @@ func ToReplicaSpecWithOverrides(ctx context.Context, taskCtx pluginsCore.TaskExe
replicaSpec.Replicas = &replicas
}

if rs != nil {
var command []string
if v, ok := rs.(allowsCommandOverride); ok {
command = v.GetCommand()
}
if err := OverrideContainerSpec(
&replicaSpec.Template.Spec,
primaryContainerName,
rs.GetImage(),
command,
); err != nil {
return nil, err
}
var command []string
if v, ok := rs.(allowsCommandOverride); ok {
command = v.GetCommand()
}
if err := OverrideContainerSpec(
&replicaSpec.Template.Spec,
primaryContainerName,
image,
command,
); err != nil {
return nil, err
}

replicaSpec.RestartPolicy = ParseRestartPolicy(rs.GetRestartPolicy())
replicaSpec.RestartPolicy = ParseRestartPolicy(restartPolicy)

if !isMaster {
replicas := rs.GetReplicas()
replicaSpec.Replicas = &replicas
}
if !isMaster {
replicaSpec.Replicas = &replicas
}

return replicaSpec, nil
Expand Down
Loading

0 comments on commit 946ca70

Please sign in to comment.