From 35016759a3848b27d604d872e9a79bfd89e4ab53 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Wed, 20 Dec 2023 13:10:59 -0300 Subject: [PATCH] Add Flyin propeller config (#4610) * Revert "Detect subNode phase updates to reduce evaluation frequency of ArrayNode (#4535)" This reverts commit b50ba877e4632826dd4d5bc0978a41d39c8d172a. Signed-off-by: Eduardo Apolinario * Add flyin pflags Signed-off-by: Eduardo Apolinario * Add taskTemplate as parameter to GetLogsForContainerInPod Signed-off-by: Eduardo Apolinario * Add flyin template scheme and unit tests Signed-off-by: Eduardo Apolinario * Revert unintended change. Signed-off-by: Eduardo Apolinario * Lint Signed-off-by: Eduardo Apolinario * Fix bad refactor due to lint warning Signed-off-by: Eduardo Apolinario * Remove TODOs Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- flyteplugins/go/tasks/logs/config.go | 3 + flyteplugins/go/tasks/logs/logconfig_flags.go | 2 + .../go/tasks/logs/logconfig_flags_test.go | 28 ++++ flyteplugins/go/tasks/logs/logging_utils.go | 11 +- .../go/tasks/logs/logging_utils_test.go | 54 +++++-- .../tasks/pluginmachinery/tasklog/plugin.go | 3 + .../tasks/pluginmachinery/tasklog/template.go | 27 ++++ .../pluginmachinery/tasklog/template_test.go | 132 ++++++++++++++++++ .../tasklog/templatescheme_enumer.go | 11 +- .../go/tasks/plugins/k8s/pod/plugin.go | 7 +- 10 files changed, 258 insertions(+), 20 deletions(-) diff --git a/flyteplugins/go/tasks/logs/config.go b/flyteplugins/go/tasks/logs/config.go index ca5a6012a8..b802844a4a 100644 --- a/flyteplugins/go/tasks/logs/config.go +++ b/flyteplugins/go/tasks/logs/config.go @@ -28,6 +28,9 @@ type LogConfig struct { StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"` StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"` + IsFlyinEnabled bool `json:"flyin-enabled" pflag:",Enable Log-links to flyin logs"` + FlyinTemplateURI tasklog.TemplateURI `json:"flyin-template-uri" pflag:",Template Uri to use when building flyin log links"` + Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"` } diff --git a/flyteplugins/go/tasks/logs/logconfig_flags.go b/flyteplugins/go/tasks/logs/logconfig_flags.go index 00c08a8a58..de8ba022dc 100755 --- a/flyteplugins/go/tasks/logs/logconfig_flags.go +++ b/flyteplugins/go/tasks/logs/logconfig_flags.go @@ -61,5 +61,7 @@ func (cfg LogConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "gcp-project"), DefaultConfig.GCPProjectName, "Name of the project in GCP") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "stackdriver-logresourcename"), DefaultConfig.StackdriverLogResourceName, "Name of the logresource in stackdriver") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "stackdriver-template-uri"), DefaultConfig.StackDriverTemplateURI, "Template Uri to use when building stackdriver log links") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "flyin-enabled"), DefaultConfig.IsFlyinEnabled, "Enable Log-links to flyin logs") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "flyin-template-uri"), DefaultConfig.FlyinTemplateURI, "Template Uri to use when building flyin log links") return cmdFlags } diff --git a/flyteplugins/go/tasks/logs/logconfig_flags_test.go b/flyteplugins/go/tasks/logs/logconfig_flags_test.go index 8bb775df1f..dfbee43c69 100755 --- a/flyteplugins/go/tasks/logs/logconfig_flags_test.go +++ b/flyteplugins/go/tasks/logs/logconfig_flags_test.go @@ -253,4 +253,32 @@ func TestLogConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_flyin-enabled", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("flyin-enabled", testValue) + if vBool, err := cmdFlags.GetBool("flyin-enabled"); err == nil { + testDecodeJson_LogConfig(t, fmt.Sprintf("%v", vBool), &actual.IsFlyinEnabled) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_flyin-template-uri", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("flyin-template-uri", testValue) + if vString, err := cmdFlags.GetString("flyin-template-uri"); err == nil { + testDecodeJson_LogConfig(t, fmt.Sprintf("%v", vString), &actual.FlyinTemplateURI) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index 4978109458..20f3522e27 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -14,7 +14,7 @@ import ( ) // Internal -func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) { +func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) { if logPlugin == nil { return nil, nil } @@ -51,6 +51,7 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas PodUnixFinishTime: finishTime, TaskExecutionID: taskExecID, ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme, + TaskTemplate: taskTemplate, }, ) @@ -108,6 +109,14 @@ func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) { } } + if cfg.IsFlyinEnabled { + if len(cfg.FlyinTemplateURI) > 0 { + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{cfg.FlyinTemplateURI}, MessageFormat: core.TaskLog_JSON}) + } else { + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://flyin.%s/logs/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", cfg.GCPProjectName)}, MessageFormat: core.TaskLog_JSON}) + } + } + plugins = append(plugins, cfg.Templates...) return templateLogPluginCollection{plugins: plugins}, nil } diff --git a/flyteplugins/go/tasks/logs/logging_utils_test.go b/flyteplugins/go/tasks/logs/logging_utils_test.go index 91048eff16..46eb682201 100644 --- a/flyteplugins/go/tasks/logs/logging_utils_test.go +++ b/flyteplugins/go/tasks/logs/logging_utils_test.go @@ -44,7 +44,7 @@ func dummyTaskExecID() pluginCore.TaskExecutionID { func TestGetLogsForContainerInPod_NoPlugins(t *testing.T) { logPlugin, err := InitializeLogPlugins(&LogConfig{}) assert.NoError(t, err) - l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil) + l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil, nil) assert.NoError(t, err) assert.Nil(t, l) } @@ -56,7 +56,7 @@ func TestGetLogsForContainerInPod_NoLogs(t *testing.T) { CloudwatchLogGroup: "/kubernetes/flyte-production", }) assert.NoError(t, err) - p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil) + p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil, nil) assert.NoError(t, err) assert.Nil(t, p) } @@ -87,7 +87,7 @@ func TestGetLogsForContainerInPod_BadIndex(t *testing.T) { } pod.Name = podName - p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil) + p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil, nil) assert.NoError(t, err) assert.Nil(t, p) } @@ -112,7 +112,7 @@ func TestGetLogsForContainerInPod_MissingStatus(t *testing.T) { } pod.Name = podName - p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil) + p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil, nil) assert.NoError(t, err) assert.Nil(t, p) } @@ -142,7 +142,7 @@ func TestGetLogsForContainerInPod_Cloudwatch(t *testing.T) { } pod.Name = podName - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil) assert.Nil(t, err) assert.Len(t, logs, 1) } @@ -172,7 +172,7 @@ func TestGetLogsForContainerInPod_K8s(t *testing.T) { } pod.Name = podName - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil) assert.Nil(t, err) assert.Len(t, logs, 1) } @@ -205,7 +205,7 @@ func TestGetLogsForContainerInPod_All(t *testing.T) { } pod.Name = podName - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil) assert.Nil(t, err) assert.Len(t, logs, 2) } @@ -236,7 +236,7 @@ func TestGetLogsForContainerInPod_Stackdriver(t *testing.T) { } pod.Name = podName - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil) assert.Nil(t, err) assert.Len(t, logs, 1) } @@ -252,7 +252,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) { IsStackDriverEnabled: true, StackDriverTemplateURI: "https://sd-my-log-server/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", - }, []*core.TaskLog{ + }, nil, []*core.TaskLog{ { Uri: "https://k8s-my-log-server/my-namespace/my-pod/ContainerName/ContainerID", MessageFormat: core.TaskLog_JSON, @@ -275,7 +275,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) { assertTestSucceeded(t, &LogConfig{ IsStackDriverEnabled: true, StackDriverTemplateURI: "https://sd-my-log-server/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", - }, []*core.TaskLog{ + }, nil, []*core.TaskLog{ { Uri: "https://sd-my-log-server/my-namespace/my-pod/ContainerName/ContainerID", MessageFormat: core.TaskLog_JSON, @@ -285,7 +285,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) { }) } -func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*core.TaskLog) { +func assertTestSucceeded(tb testing.TB, config *LogConfig, taskTemplate *core.TaskTemplate, expectedTaskLogs []*core.TaskLog) { logPlugin, err := InitializeLogPlugins(config) assert.NoError(tb, err) @@ -310,7 +310,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c }, } - logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil) + logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil, taskTemplate) assert.Nil(tb, err) assert.Len(tb, logs, len(expectedTaskLogs)) if diff := deep.Equal(logs, expectedTaskLogs); len(diff) > 0 { @@ -337,7 +337,7 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) { Scheme: tasklog.TemplateSchemeTaskExecution, }, }, - }, []*core.TaskLog{ + }, nil, []*core.TaskLog{ { Uri: "https://my-log-server/my-namespace/my-pod/ContainerName/ContainerID", MessageFormat: core.TaskLog_JSON, @@ -350,3 +350,31 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) { }, }) } + +func TestGetLogsForContainerInPod_Flyin(t *testing.T) { + assertTestSucceeded(t, + &LogConfig{ + IsKubernetesEnabled: true, + KubernetesTemplateURI: "https://k8s.com", + IsFlyinEnabled: true, + FlyinTemplateURI: "https://flyin.mydomain.com:{{ .port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", + }, + &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "65535", + }, + }, + []*core.TaskLog{ + { + Uri: "https://k8s.com", + MessageFormat: core.TaskLog_JSON, + Name: "Kubernetes Logs my-Suffix", + }, + { + Uri: "https://flyin.mydomain.com:65535/my-namespace/my-pod/ContainerName/ContainerID", + MessageFormat: core.TaskLog_JSON, + Name: "Flyin Logs my-Suffix", + }, + }) +} diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go index c43da02e58..da2357a6d9 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go @@ -14,6 +14,7 @@ type TemplateScheme int const ( TemplateSchemePod TemplateScheme = iota TemplateSchemeTaskExecution + TemplateSchemeFlyin ) // TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. @@ -30,6 +31,7 @@ type TemplateVarsByScheme struct { Common TemplateVars Pod TemplateVars TaskExecution TemplateVars + Flyin TemplateVars } // Input contains all available information about task's execution that a log plugin can use to construct task's @@ -48,6 +50,7 @@ type Input struct { PodUID string TaskExecutionID pluginsCore.TaskExecutionID ExtraTemplateVarsByScheme *TemplateVarsByScheme + TaskTemplate *core.TaskTemplate } // Output contains all task logs a plugin generates for a given Input. diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 750b1972df..ea5c5f373c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -35,6 +35,7 @@ type templateRegexes struct { ExecutionProject *regexp.Regexp ExecutionDomain *regexp.Regexp GeneratedName *regexp.Regexp + Port *regexp.Regexp } func initDefaultRegexes() templateRegexes { @@ -60,6 +61,7 @@ func initDefaultRegexes() templateRegexes { MustCreateRegex("executionProject"), MustCreateRegex("executionDomain"), MustCreateRegex("generatedName"), + MustCreateRegex("port"), } } @@ -85,6 +87,16 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { } switch scheme { + case TemplateSchemeFlyin: + port := input.TaskTemplate.GetConfig()["port"] + if port == "" { + port = "8080" + } + vars = append( + vars, + TemplateVar{defaultRegexes.Port, port}, + ) + fallthrough case TemplateSchemePod: // Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log // stream. Therefore, we must also strip the prefix. @@ -181,7 +193,22 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { templateVars := input.templateVarsForScheme(p.Scheme) taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs)) + + // Grab metadata from task template and check if key "link_type" is set to "vscode". + // If so, add a vscode link to the task logs. + isFlyin := false + if input.TaskTemplate != nil && input.TaskTemplate.GetConfig() != nil { + config := input.TaskTemplate.GetConfig() + if config != nil && config["link_type"] == "vscode" { + isFlyin = true + } + } for _, templateURI := range p.TemplateURIs { + // Skip Flyin logs if plugin is enabled but no metadata is defined in input's task template. + // This is to prevent Flyin logs from being generated for tasks that don't have a Flyin metadata section. + if p.DisplayName == "Flyin Logs" && !isFlyin { + continue + } taskLogs = append(taskLogs, &core.TaskLog{ Uri: replaceAll(templateURI, templateVars), Name: p.DisplayName + input.LogName, diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go index f279707a3b..ad6eef25a3 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go @@ -74,6 +74,24 @@ func Test_Input_templateVarsForScheme(t *testing.T) { LogName: "main_logs", TaskExecutionID: dummyTaskExecID(), } + flyinBase := Input{ + HostName: "my-host", + PodName: "my-pod", + PodUID: "my-pod-uid", + Namespace: "my-namespace", + ContainerName: "my-container", + ContainerID: "docker://containerID", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + TaskTemplate: &core.TaskTemplate{ + Config: map[string]string{ + "port": "1234", + }, + }, + } tests := []struct { name string @@ -202,6 +220,50 @@ func Test_Input_templateVarsForScheme(t *testing.T) { {testRegexes.Baz, "baz"}, }, }, + { + "flyin happy path", + TemplateSchemeFlyin, + flyinBase, + nil, + nil, + TemplateVars{ + {defaultRegexes.Port, "1234"}, + }, + nil, + }, + { + "flyin and pod happy path", + TemplateSchemeFlyin, + flyinBase, + nil, + TemplateVars{ + {defaultRegexes.LogName, "main_logs"}, + {defaultRegexes.Port, "1234"}, + {defaultRegexes.PodName, "my-pod"}, + {defaultRegexes.PodUID, "my-pod-uid"}, + {defaultRegexes.Namespace, "my-namespace"}, + {defaultRegexes.ContainerName, "my-container"}, + {defaultRegexes.ContainerID, "containerID"}, + {defaultRegexes.Hostname, "my-host"}, + {defaultRegexes.PodRFC3339StartTime, "1970-01-01T01:02:03+01:00"}, + {defaultRegexes.PodRFC3339FinishTime, "1970-01-01T04:25:45+01:00"}, + {defaultRegexes.PodUnixStartTime, "123"}, + {defaultRegexes.PodUnixFinishTime, "12345"}, + }, + nil, + nil, + }, + { + "pod with port not affected", + TemplateSchemePod, + podBase, + nil, + nil, + nil, + TemplateVars{ + {defaultRegexes.Port, "1234"}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -474,6 +536,76 @@ func TestTemplateLogPlugin(t *testing.T) { }, }, }, + { + "flyin", + TemplateLogPlugin{ + Scheme: TemplateSchemeFlyin, + TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + }, + args{ + input: Input{ + PodName: "my-pod-name", + TaskTemplate: &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "1234", + }, + }, + }, + }, + Output{ + TaskLogs: []*core.TaskLog{ + { + Uri: "vscode://flyin:1234/my-pod-name", + MessageFormat: core.TaskLog_JSON, + }, + }, + }, + }, + { + "flyin - default port", + TemplateLogPlugin{ + Scheme: TemplateSchemeFlyin, + TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + }, + args{ + input: Input{ + PodName: "my-pod-name", + TaskTemplate: &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + }, + }, + }, + }, + Output{ + TaskLogs: []*core.TaskLog{ + { + Uri: "vscode://flyin:8080/my-pod-name", + MessageFormat: core.TaskLog_JSON, + }, + }, + }, + }, + { + "flyin - no link_type in task template", + TemplateLogPlugin{ + Scheme: TemplateSchemeFlyin, + TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + DisplayName: "Flyin Logs", + }, + args{ + input: Input{ + PodName: "my-pod-name", + }, + }, + Output{ + TaskLogs: []*core.TaskLog{}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go index 70f15faf01..c1f4d668c0 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go @@ -7,9 +7,9 @@ import ( "fmt" ) -const _TemplateSchemeName = "PodTaskExecution" +const _TemplateSchemeName = "PodTaskExecutionFlyin" -var _TemplateSchemeIndex = [...]uint8{0, 3, 16} +var _TemplateSchemeIndex = [...]uint8{0, 3, 16, 21} func (i TemplateScheme) String() string { if i < 0 || i >= TemplateScheme(len(_TemplateSchemeIndex)-1) { @@ -18,11 +18,12 @@ func (i TemplateScheme) String() string { return _TemplateSchemeName[_TemplateSchemeIndex[i]:_TemplateSchemeIndex[i+1]] } -var _TemplateSchemeValues = []TemplateScheme{0, 1} +var _TemplateSchemeValues = []TemplateScheme{0, 1, 2} var _TemplateSchemeNameToValueMap = map[string]TemplateScheme{ - _TemplateSchemeName[0:3]: 0, - _TemplateSchemeName[3:16]: 1, + _TemplateSchemeName[0:3]: 0, + _TemplateSchemeName[3:16]: 1, + _TemplateSchemeName[16:21]: 2, } // TemplateSchemeString retrieves an enum value from the enum constants string name. diff --git a/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go b/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go index b266a6f5e8..eae0ac98b7 100644 --- a/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go +++ b/flyteplugins/go/tasks/plugins/k8s/pod/plugin.go @@ -153,6 +153,11 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin return pluginsCore.PhaseInfoUndefined, err } + taskTemplate, err := pluginContext.TaskReader().Read(ctx) + if err != nil { + return pluginsCore.PhaseInfoUndefined, err + } + pod := r.(*v1.Pod) transitionOccurredAt := flytek8s.GetLastTransitionOccurredAt(pod).Time @@ -168,7 +173,7 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID() if pod.Status.Phase != v1.PodPending && pod.Status.Phase != v1.PodUnknown { - taskLogs, err := logs.GetLogsForContainerInPod(ctx, logPlugin, taskExecID, pod, 0, logSuffix, extraLogTemplateVarsByScheme) + taskLogs, err := logs.GetLogsForContainerInPod(ctx, logPlugin, taskExecID, pod, 0, logSuffix, extraLogTemplateVarsByScheme, taskTemplate) if err != nil { return pluginsCore.PhaseInfoUndefined, err }