From 4b6dbc3dacef50daad18ce71538815a8fdb48bfb Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Fri, 26 Jan 2024 15:52:25 -0800 Subject: [PATCH] Remove scheme switch Signed-off-by: Eduardo Apolinario --- .../go/tasks/logs/logging_utils_test.go | 20 ---- .../tasks/pluginmachinery/tasklog/template.go | 58 +++++---- .../pluginmachinery/tasklog/template_test.go | 111 ++++++++---------- .../plugins/array/k8s/management_test.go | 1 + .../go/tasks/plugins/k8s/dask/dask_test.go | 1 + .../common/common_operator_test.go | 2 + .../plugins/k8s/kfoperators/mpi/mpi_test.go | 1 + .../k8s/kfoperators/pytorch/pytorch_test.go | 1 + .../kfoperators/tensorflow/tensorflow_test.go | 1 + .../go/tasks/plugins/k8s/pod/sidecar_test.go | 1 + .../go/tasks/plugins/k8s/spark/spark_test.go | 1 + 11 files changed, 86 insertions(+), 112 deletions(-) diff --git a/flyteplugins/go/tasks/logs/logging_utils_test.go b/flyteplugins/go/tasks/logs/logging_utils_test.go index 02251c3836..a0475a6169 100644 --- a/flyteplugins/go/tasks/logs/logging_utils_test.go +++ b/flyteplugins/go/tasks/logs/logging_utils_test.go @@ -378,26 +378,6 @@ func TestGetLogsForContainerInPod_Flyin(t *testing.T) { &core.TaskTemplate{}, nil, }, - { - "Flyin enabled but no port in task template", - &LogConfig{ - DynamicLogLinks: map[string]tasklog.TemplateURI{ - "vscode": "https://flyin.mydomain.com:{{ .taskConfig.port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", - }, - }, - &core.TaskTemplate{ - Config: map[string]string{ - "link_type": "vscode", - }, - }, - []*core.TaskLog{ - { - Uri: "https://flyin.mydomain.com:8080/my-namespace/my-pod/ContainerName/ContainerID", - MessageFormat: core.TaskLog_JSON, - Name: "vscode logs my-Suffix", - }, - }, - }, { "Flyin disabled but config present in TaskTemplate", &LogConfig{}, diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 8faa933dae..2bdb3f40fb 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -92,38 +92,34 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { vars = append(vars, input.ExtraTemplateVarsByScheme.Common...) } - switch scheme { - case TemplateSchemeDynamic: - port := input.TaskTemplate.GetConfig()["port"] - if port == "" { - port = "8080" - } + port := input.TaskTemplate.GetConfig()["port"] + if port != "" { vars = append( vars, TemplateVar{defaultRegexes.Port, port}, ) - fallthrough - case TemplateSchemePod: - // Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log - // stream. Therefore, we must also strip the prefix. - containerID := input.ContainerID - stripDelimiter := "://" - if split := strings.Split(input.ContainerID, stripDelimiter); len(split) > 1 { - containerID = split[1] - } - vars = append( - vars, - TemplateVar{defaultRegexes.PodName, input.PodName}, - TemplateVar{defaultRegexes.PodUID, input.PodUID}, - TemplateVar{defaultRegexes.Namespace, input.Namespace}, - TemplateVar{defaultRegexes.ContainerName, input.ContainerName}, - TemplateVar{defaultRegexes.ContainerID, containerID}, - TemplateVar{defaultRegexes.Hostname, input.HostName}, - ) - if gotExtraTemplateVars { - vars = append(vars, input.ExtraTemplateVarsByScheme.Pod...) - } - case TemplateSchemeTaskExecution: + } + + // Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log + // stream. Therefore, we must also strip the prefix. + containerID := input.ContainerID + stripDelimiter := "://" + if split := strings.Split(input.ContainerID, stripDelimiter); len(split) > 1 { + containerID = split[1] + } + vars = append( + vars, + TemplateVar{defaultRegexes.PodName, input.PodName}, + TemplateVar{defaultRegexes.PodUID, input.PodUID}, + TemplateVar{defaultRegexes.Namespace, input.Namespace}, + TemplateVar{defaultRegexes.ContainerName, input.ContainerName}, + TemplateVar{defaultRegexes.ContainerID, containerID}, + TemplateVar{defaultRegexes.Hostname, input.HostName}, + ) + if gotExtraTemplateVars { + vars = append(vars, input.ExtraTemplateVarsByScheme.Pod...) + } + if input.TaskExecutionID != nil { taskExecutionIdentifier := input.TaskExecutionID.GetID() vars = append( vars, @@ -178,9 +174,9 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { }, ) } - if gotExtraTemplateVars { - vars = append(vars, input.ExtraTemplateVarsByScheme.TaskExecution...) - } + } + if gotExtraTemplateVars { + vars = append(vars, input.ExtraTemplateVarsByScheme.TaskExecution...) } vars = append( diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go index fffae5a036..f62e65250c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go @@ -149,23 +149,6 @@ func Test_Input_templateVarsForScheme(t *testing.T) { }, nil, }, - { - "pod with unused extra vars", - TemplateSchemePod, - podBase, - &TemplateVarsByScheme{ - TaskExecution: TemplateVars{ - {testRegexes.Bar, "bar"}, - {testRegexes.Baz, "baz"}, - }, - }, - nil, - nil, - TemplateVars{ - {testRegexes.Bar, "bar"}, - {testRegexes.Baz, "baz"}, - }, - }, { "task execution happy path", TemplateSchemeTaskExecution, @@ -173,6 +156,12 @@ func Test_Input_templateVarsForScheme(t *testing.T) { nil, TemplateVars{ {defaultRegexes.LogName, "main_logs"}, + {defaultRegexes.PodName, ""}, + {defaultRegexes.PodUID, ""}, + {defaultRegexes.Namespace, ""}, + {defaultRegexes.ContainerName, ""}, + {defaultRegexes.ContainerID, ""}, + {defaultRegexes.Hostname, ""}, {defaultRegexes.NodeID, "n0-0-n0"}, {defaultRegexes.GeneratedName, "generated-name"}, {defaultRegexes.TaskRetryAttempt, "1"}, @@ -212,23 +201,23 @@ func Test_Input_templateVarsForScheme(t *testing.T) { }, nil, }, - { - "task execution with unused extra vars", - TemplateSchemeTaskExecution, - taskExecutionBase, - &TemplateVarsByScheme{ - Pod: TemplateVars{ - {testRegexes.Bar, "bar"}, - {testRegexes.Baz, "baz"}, - }, - }, - nil, - nil, - TemplateVars{ - {testRegexes.Bar, "bar"}, - {testRegexes.Baz, "baz"}, - }, - }, + // { + // "task execution with unused extra vars", + // TemplateSchemeTaskExecution, + // taskExecutionBase, + // &TemplateVarsByScheme{ + // Pod: TemplateVars{ + // {testRegexes.Bar, "bar"}, + // {testRegexes.Baz, "baz"}, + // }, + // }, + // nil, + // nil, + // TemplateVars{ + // {testRegexes.Bar, "bar"}, + // {testRegexes.Baz, "baz"}, + // }, + // }, { "flyin happy path", TemplateSchemeDynamic, @@ -573,33 +562,33 @@ func TestTemplateLogPlugin(t *testing.T) { }, }, }, - { - "flyin - default port", - TemplateLogPlugin{ - Name: "vscode", - Scheme: TemplateSchemeDynamic, - DynamicTemplateURIs: []TemplateURI{"vscode://flyin:{{ .taskConfig.port }}/{{ .podName }}"}, - MessageFormat: core.TaskLog_JSON, - }, - args{ - input: Input{ - PodName: "my-pod-name", - TaskTemplate: &core.TaskTemplate{ - Config: map[string]string{ - "link_type": "vscode", - }, - }, - }, - }, - Output{ - TaskLogs: []*core.TaskLog{ - { - Uri: "vscode://flyin:8080/my-pod-name", - MessageFormat: core.TaskLog_JSON, - }, - }, - }, - }, + // { + // "flyin - default port", + // TemplateLogPlugin{ + // Name: "vscode", + // Scheme: TemplateSchemeDynamic, + // DynamicTemplateURIs: []TemplateURI{"vscode://flyin:{{ .taskConfig.port }}/{{ .podName }}"}, + // MessageFormat: core.TaskLog_JSON, + // }, + // args{ + // input: Input{ + // PodName: "my-pod-name", + // TaskTemplate: &core.TaskTemplate{ + // Config: map[string]string{ + // "link_type": "vscode", + // }, + // }, + // }, + // }, + // Output{ + // TaskLogs: []*core.TaskLog{ + // { + // Uri: "vscode://flyin:8080/my-pod-name", + // MessageFormat: core.TaskLog_JSON, + // }, + // }, + // }, + // }, { "flyin - no link_type in task template", TemplateLogPlugin{ diff --git a/flyteplugins/go/tasks/plugins/array/k8s/management_test.go b/flyteplugins/go/tasks/plugins/array/k8s/management_test.go index ab26028b09..9404bdfb72 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/management_test.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/management_test.go @@ -67,6 +67,7 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta tID := &mocks.TaskExecutionID{} tID.OnGetGeneratedName().Return("notfound") + tID.On("GetUniqueNodeID").Return("an-unique-id") tID.OnGetID().Return(core2.TaskExecutionIdentifier{ TaskId: &core2.Identifier{ ResourceType: core2.ResourceType_TASK, diff --git a/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go b/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go index 576740af93..e7d43a2256 100644 --- a/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go @@ -179,6 +179,7 @@ func dummyDaskTaskContext(taskTemplate *core.TaskTemplate, resources *v1.Resourc }, }) tID.On("GetGeneratedName").Return(testTaskID) + tID.On("GetUniqueNodeID").Return("an-unique-id") taskExecutionMetadata := &mocks.TaskExecutionMetadata{} taskExecutionMetadata.OnGetTaskExecutionID().Return(tID) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go index 64538f2d61..d0e154835c 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go @@ -318,6 +318,8 @@ func dummyTaskContext() pluginsCore.TaskExecutionContext { }, RetryAttempt: 0, }) + tID.OnGetGeneratedName().Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") taskExecutionMetadata := &mocks.TaskExecutionMetadata{} taskExecutionMetadata.OnGetTaskExecutionID().Return(tID) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go index d009c7c887..29fe47a446 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go @@ -148,6 +148,7 @@ func dummyMPITaskContext(taskTemplate *core.TaskTemplate, resources *corev1.Reso }, }) tID.OnGetGeneratedName().Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") overrides := &mocks.TaskOverrides{} overrides.OnGetResources().Return(resources) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go index e0606b1020..0700644578 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go @@ -154,6 +154,7 @@ func dummyPytorchTaskContext(taskTemplate *core.TaskTemplate, resources *corev1. }, }) tID.OnGetGeneratedName().Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") overrides := &mocks.TaskOverrides{} overrides.OnGetResources().Return(resources) diff --git a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go index c3252183fc..2402bd4c5a 100644 --- a/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go @@ -149,6 +149,7 @@ func dummyTensorFlowTaskContext(taskTemplate *core.TaskTemplate, resources *core }, }) tID.OnGetGeneratedName().Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") overrides := &mocks.TaskOverrides{} overrides.OnGetResources().Return(resources) diff --git a/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go b/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go index 0c728780d9..4977695a1a 100644 --- a/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go @@ -84,6 +84,7 @@ func dummySidecarTaskMetadata(resources *v1.ResourceRequirements, extendedResour }, }) tID.On("GetGeneratedName").Return("my_project:my_domain:my_name") + tID.On("GetUniqueNodeID").Return("an-unique-id") taskMetadata.On("GetTaskExecutionID").Return(tID) to := &pluginsCoreMock.TaskOverrides{} diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go index b07ab0ef33..264f9514e9 100644 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go @@ -379,6 +379,7 @@ func dummySparkTaskContext(taskTemplate *core.TaskTemplate, interruptible bool) }, }) tID.On("GetGeneratedName").Return("some-acceptable-name") + tID.On("GetUniqueNodeID").Return("an-unique-id") overrides := &mocks.TaskOverrides{} overrides.On("GetResources").Return(&corev1.ResourceRequirements{})