From 8be78ea7f53ca5787b1e8f655673a20f289dc39f Mon Sep 17 00:00:00 2001 From: Fabio Graetz Date: Sun, 14 Jan 2024 18:52:03 +0000 Subject: [PATCH] Undo changes from rebase in ray.go Signed-off-by: Fabio Graetz --- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 36 ++++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index ba8eeab010..0e5684059a 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -46,8 +46,7 @@ var logTemplateRegexes = struct { tasklog.MustCreateRegex("rayJobID"), } -type rayJobResourceHandler struct { -} +type rayJobResourceHandler struct{} func (rayJobResourceHandler) GetProperties() k8s.PluginProperties { return k8s.PluginProperties{} @@ -128,7 +127,8 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC EnableIngress: &enableIngress, RayStartParams: headNodeRayStartParams, }, - WorkerGroupSpecs: []rayv1alpha1.WorkerGroupSpec{}, + WorkerGroupSpecs: []rayv1alpha1.WorkerGroupSpec{}, + EnableInTreeAutoscaling: &rayJob.RayCluster.EnableAutoscaling, } for _, spec := range rayJob.RayCluster.WorkerGroupSpec { @@ -140,16 +140,6 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC taskCtx, ) - minReplicas := spec.Replicas - maxReplicas := spec.Replicas - if spec.MinReplicas != 0 { - minReplicas = spec.MinReplicas - } - - if spec.MaxReplicas != 0 { - maxReplicas = spec.MaxReplicas - } - workerNodeRayStartParams := make(map[string]string) if spec.RayStartParams != nil { workerNodeRayStartParams = spec.RayStartParams @@ -165,6 +155,15 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC workerNodeRayStartParams[DisableUsageStatsStartParameter] = "true" } + minReplicas := spec.MinReplicas + if minReplicas > spec.Replicas { + minReplicas = spec.Replicas + } + maxReplicas := spec.MaxReplicas + if maxReplicas < spec.Replicas { + maxReplicas = spec.Replicas + } + workerNodeSpec := rayv1alpha1.WorkerGroupSpec{ GroupName: spec.GroupName, MinReplicas: &minReplicas, @@ -184,11 +183,18 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC rayClusterSpec.WorkerGroupSpecs[index].Template.Spec.ServiceAccountName = serviceAccountName } + shutdownAfterJobFinishes := cfg.ShutdownAfterJobFinishes + ttlSecondsAfterFinished := &cfg.TTLSecondsAfterFinished + if rayJob.ShutdownAfterJobFinishes { + shutdownAfterJobFinishes = true + ttlSecondsAfterFinished = &rayJob.TtlSecondsAfterFinished + } + jobSpec := rayv1alpha1.RayJobSpec{ RayClusterSpec: rayClusterSpec, Entrypoint: strings.Join(primaryContainer.Args, " "), - ShutdownAfterJobFinishes: cfg.ShutdownAfterJobFinishes, - TTLSecondsAfterFinished: &cfg.TTLSecondsAfterFinished, + ShutdownAfterJobFinishes: shutdownAfterJobFinishes, + TTLSecondsAfterFinished: ttlSecondsAfterFinished, RuntimeEnv: rayJob.RuntimeEnv, }