Skip to content

Commit

Permalink
Undo changes from rebase in ray.go
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Graetz <[email protected]>
  • Loading branch information
fg91 committed Jan 14, 2024
1 parent f0946f6 commit 8be78ea
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ var logTemplateRegexes = struct {
tasklog.MustCreateRegex("rayJobID"),
}

type rayJobResourceHandler struct {
}
type rayJobResourceHandler struct{}

func (rayJobResourceHandler) GetProperties() k8s.PluginProperties {
return k8s.PluginProperties{}
Expand Down Expand Up @@ -128,7 +127,8 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
EnableIngress: &enableIngress,
RayStartParams: headNodeRayStartParams,
},
WorkerGroupSpecs: []rayv1alpha1.WorkerGroupSpec{},
WorkerGroupSpecs: []rayv1alpha1.WorkerGroupSpec{},
EnableInTreeAutoscaling: &rayJob.RayCluster.EnableAutoscaling,
}

for _, spec := range rayJob.RayCluster.WorkerGroupSpec {
Expand All @@ -140,16 +140,6 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
taskCtx,
)

minReplicas := spec.Replicas
maxReplicas := spec.Replicas
if spec.MinReplicas != 0 {
minReplicas = spec.MinReplicas
}

if spec.MaxReplicas != 0 {
maxReplicas = spec.MaxReplicas
}

workerNodeRayStartParams := make(map[string]string)
if spec.RayStartParams != nil {
workerNodeRayStartParams = spec.RayStartParams
Expand All @@ -165,6 +155,15 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
workerNodeRayStartParams[DisableUsageStatsStartParameter] = "true"
}

minReplicas := spec.MinReplicas
if minReplicas > spec.Replicas {
minReplicas = spec.Replicas
}
maxReplicas := spec.MaxReplicas
if maxReplicas < spec.Replicas {
maxReplicas = spec.Replicas
}

workerNodeSpec := rayv1alpha1.WorkerGroupSpec{
GroupName: spec.GroupName,
MinReplicas: &minReplicas,
Expand All @@ -184,11 +183,18 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
rayClusterSpec.WorkerGroupSpecs[index].Template.Spec.ServiceAccountName = serviceAccountName
}

shutdownAfterJobFinishes := cfg.ShutdownAfterJobFinishes
ttlSecondsAfterFinished := &cfg.TTLSecondsAfterFinished
if rayJob.ShutdownAfterJobFinishes {
shutdownAfterJobFinishes = true
ttlSecondsAfterFinished = &rayJob.TtlSecondsAfterFinished
}

jobSpec := rayv1alpha1.RayJobSpec{
RayClusterSpec: rayClusterSpec,
Entrypoint: strings.Join(primaryContainer.Args, " "),
ShutdownAfterJobFinishes: cfg.ShutdownAfterJobFinishes,
TTLSecondsAfterFinished: &cfg.TTLSecondsAfterFinished,
ShutdownAfterJobFinishes: shutdownAfterJobFinishes,
TTLSecondsAfterFinished: ttlSecondsAfterFinished,
RuntimeEnv: rayJob.RuntimeEnv,
}

Expand Down

0 comments on commit 8be78ea

Please sign in to comment.