From 72e743882d79b50208da0c4488063ab4518ce266 Mon Sep 17 00:00:00 2001 From: Jeev B Date: Mon, 13 Nov 2023 13:34:32 -0800 Subject: [PATCH] Add more context for ray log template links (#4416) Signed-off-by: Jeev B --- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 32 ++++- .../go/tasks/plugins/k8s/ray/ray_test.go | 114 +++++++++++++++++- 2 files changed, 141 insertions(+), 5 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index 0bc4f1183b..50a9c76094 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -3,6 +3,7 @@ package ray import ( "context" "fmt" + "regexp" "strconv" "strings" "time" @@ -37,6 +38,14 @@ const ( DisableUsageStatsStartParameter = "disable-usage-stats" ) +var logTemplateRegexes = struct { + RayClusterName *regexp.Regexp + RayJobID *regexp.Regexp +}{ + tasklog.MustCreateRegex("rayClusterName"), + tasklog.MustCreateRegex("rayJobID"), +} + type rayJobResourceHandler struct { } @@ -442,8 +451,27 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID() input := tasklog.Input{ - Namespace: rayJob.Namespace, - TaskExecutionID: taskExecID, + Namespace: rayJob.Namespace, + TaskExecutionID: taskExecID, + ExtraTemplateVarsByScheme: &tasklog.TemplateVarsByScheme{}, + } + if rayJob.Status.JobId != "" { + input.ExtraTemplateVarsByScheme.Common = append( + input.ExtraTemplateVarsByScheme.Common, + tasklog.TemplateVar{ + Regex: logTemplateRegexes.RayJobID, + Value: rayJob.Status.JobId, + }, + ) + } + if rayJob.Status.RayClusterName != "" { + input.ExtraTemplateVarsByScheme.Common = append( + input.ExtraTemplateVarsByScheme.Common, + tasklog.TemplateVar{ + Regex: logTemplateRegexes.RayClusterName, + Value: rayJob.Status.RayClusterName, + }, + ) } // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go index ccb518fa03..64700957c9 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go @@ -608,13 +608,21 @@ func newPluginContext() k8s.PluginContext { taskExecID := &mocks.TaskExecutionID{} taskExecID.OnGetID().Return(core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Name: "my-task-name", + Project: "my-task-project", + Domain: "my-task-domain", + Version: "1", + }, NodeExecutionId: &core.NodeExecutionIdentifier{ ExecutionId: &core.WorkflowExecutionIdentifier{ - Name: "my_name", - Project: "my_project", - Domain: "my_domain", + Name: "my-execution-name", + Project: "my-execution-project", + Domain: "my-execution-domain", }, }, + RetryAttempt: 1, }) taskExecID.OnGetUniqueNodeID().Return("unique-node") taskExecID.OnGetGeneratedName().Return("generated-name") @@ -678,6 +686,106 @@ func TestGetTaskPhase(t *testing.T) { } } +func TestGetEventInfo_LogTemplates(t *testing.T) { + pluginCtx := newPluginContext() + testCases := []struct { + name string + rayJob rayv1alpha1.RayJob + logPlugin tasklog.TemplateLogPlugin + expectedTaskLogs []*core.TaskLog + }{ + { + name: "namespace", + rayJob: rayv1alpha1.RayJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-namespace", + }, + }, + logPlugin: tasklog.TemplateLogPlugin{ + DisplayName: "namespace", + TemplateURIs: []tasklog.TemplateURI{"http://test/{{ .namespace }}"}, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "namespace", + Uri: "http://test/test-namespace", + }, + }, + }, + { + name: "task execution ID", + rayJob: rayv1alpha1.RayJob{}, + logPlugin: tasklog.TemplateLogPlugin{ + DisplayName: "taskExecID", + TemplateURIs: []tasklog.TemplateURI{ + "http://test/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}", + }, + Scheme: tasklog.TemplateSchemeTaskExecution, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "taskExecID", + Uri: "http://test/projects/my-execution-project/domains/my-execution-domain/executions/my-execution-name/nodeId/unique-node/taskId/my-task-name/attempt/1", + }, + }, + }, + { + name: "ray cluster name", + rayJob: rayv1alpha1.RayJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-namespace", + }, + Status: rayv1alpha1.RayJobStatus{ + RayClusterName: "ray-cluster", + }, + }, + logPlugin: tasklog.TemplateLogPlugin{ + DisplayName: "ray cluster name", + TemplateURIs: []tasklog.TemplateURI{"http://test/{{ .namespace }}/{{ .rayClusterName }}"}, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "ray cluster name", + Uri: "http://test/test-namespace/ray-cluster", + }, + }, + }, + { + name: "ray job ID", + rayJob: rayv1alpha1.RayJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-namespace", + }, + Status: rayv1alpha1.RayJobStatus{ + JobId: "ray-job-1", + }, + }, + logPlugin: tasklog.TemplateLogPlugin{ + DisplayName: "ray job ID", + TemplateURIs: []tasklog.TemplateURI{"http://test/{{ .namespace }}/{{ .rayJobID }}"}, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "ray job ID", + Uri: "http://test/test-namespace/ray-job-1", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ti, err := getEventInfoForRayJob( + logs.LogConfig{Templates: []tasklog.TemplateLogPlugin{tc.logPlugin}}, + pluginCtx, + &tc.rayJob, + ) + assert.NoError(t, err) + assert.Equal(t, tc.expectedTaskLogs, ti.Logs) + }) + } +} + func TestGetEventInfo_DashboardURL(t *testing.T) { pluginCtx := newPluginContext() testCases := []struct {