From 28df4f248eb861b831d5e2d1abf7a69bb0ccd4a7 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Mon, 18 Dec 2023 12:20:56 -0300 Subject: [PATCH] Add flyin template scheme and unit tests Signed-off-by: Eduardo Apolinario --- .../tasks/pluginmachinery/tasklog/plugin.go | 3 + .../tasks/pluginmachinery/tasklog/template.go | 29 ++++ .../pluginmachinery/tasklog/template_test.go | 132 ++++++++++++++++++ .../tasklog/templatescheme_enumer.go | 11 +- 4 files changed, 170 insertions(+), 5 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go index c43da02e58..da2357a6d9 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go @@ -14,6 +14,7 @@ type TemplateScheme int const ( TemplateSchemePod TemplateScheme = iota TemplateSchemeTaskExecution + TemplateSchemeFlyin ) // TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. @@ -30,6 +31,7 @@ type TemplateVarsByScheme struct { Common TemplateVars Pod TemplateVars TaskExecution TemplateVars + Flyin TemplateVars } // Input contains all available information about task's execution that a log plugin can use to construct task's @@ -48,6 +50,7 @@ type Input struct { PodUID string TaskExecutionID pluginsCore.TaskExecutionID ExtraTemplateVarsByScheme *TemplateVarsByScheme + TaskTemplate *core.TaskTemplate } // Output contains all task logs a plugin generates for a given Input. diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 750b1972df..829b634084 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -35,6 +35,7 @@ type templateRegexes struct { ExecutionProject *regexp.Regexp ExecutionDomain *regexp.Regexp GeneratedName *regexp.Regexp + Port *regexp.Regexp } func initDefaultRegexes() templateRegexes { @@ -60,6 +61,7 @@ func initDefaultRegexes() templateRegexes { MustCreateRegex("executionProject"), MustCreateRegex("executionDomain"), MustCreateRegex("generatedName"), + MustCreateRegex("port"), } } @@ -85,6 +87,18 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { } switch scheme { + case TemplateSchemeFlyin: + // TODO: Confirm that having a default port is okay. + port := input.TaskTemplate.GetConfig()["port"] + if port == "" { + port = "8081" + } + vars = append( + vars, + // Replace the port with the port from the task template. + TemplateVar{defaultRegexes.Port, port}, + ) + fallthrough case TemplateSchemePod: // Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log // stream. Therefore, we must also strip the prefix. @@ -181,7 +195,22 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { templateVars := input.templateVarsForScheme(p.Scheme) taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs)) + + // Grab metadata from task template and check if key "link_type" is set to "vscode". + // If so, add a vscode link to the task logs. + isFlyin := false + if input.TaskTemplate != nil && input.TaskTemplate.GetConfig() != nil { + config := input.TaskTemplate.GetConfig() + if config != nil && config["link_type"] == "vscode" { + isFlyin = true + } + } for _, templateURI := range p.TemplateURIs { + // Skip Flyin logs if plugin is enabled but no metadata is defined in input's task template. + // This is to prevent Flyin logs from being generated for tasks that don't have a Flyin metadata section. + if p.DisplayName == "Flyin Logs" && isFlyin == false { + continue + } taskLogs = append(taskLogs, &core.TaskLog{ Uri: replaceAll(templateURI, templateVars), Name: p.DisplayName + input.LogName, diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go index f279707a3b..09637fb811 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go @@ -74,6 +74,24 @@ func Test_Input_templateVarsForScheme(t *testing.T) { LogName: "main_logs", TaskExecutionID: dummyTaskExecID(), } + flyinBase := Input{ + HostName: "my-host", + PodName: "my-pod", + PodUID: "my-pod-uid", + Namespace: "my-namespace", + ContainerName: "my-container", + ContainerID: "docker://containerID", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + TaskTemplate: &core.TaskTemplate{ + Config: map[string]string{ + "port": "1234", + }, + }, + } tests := []struct { name string @@ -202,6 +220,50 @@ func Test_Input_templateVarsForScheme(t *testing.T) { {testRegexes.Baz, "baz"}, }, }, + { + "flyin happy path", + TemplateSchemeFlyin, + flyinBase, + nil, + nil, + TemplateVars{ + {defaultRegexes.Port, "1234"}, + }, + nil, + }, + { + "flyin and pod happy path", + TemplateSchemeFlyin, + flyinBase, + nil, + TemplateVars{ + {defaultRegexes.LogName, "main_logs"}, + {defaultRegexes.Port, "1234"}, + {defaultRegexes.PodName, "my-pod"}, + {defaultRegexes.PodUID, "my-pod-uid"}, + {defaultRegexes.Namespace, "my-namespace"}, + {defaultRegexes.ContainerName, "my-container"}, + {defaultRegexes.ContainerID, "containerID"}, + {defaultRegexes.Hostname, "my-host"}, + {defaultRegexes.PodRFC3339StartTime, "1970-01-01T01:02:03+01:00"}, + {defaultRegexes.PodRFC3339FinishTime, "1970-01-01T04:25:45+01:00"}, + {defaultRegexes.PodUnixStartTime, "123"}, + {defaultRegexes.PodUnixFinishTime, "12345"}, + }, + nil, + nil, + }, + { + "pod with port not affected", + TemplateSchemePod, + podBase, + nil, + nil, + nil, + TemplateVars{ + {defaultRegexes.Port, "1234"}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -474,6 +536,76 @@ func TestTemplateLogPlugin(t *testing.T) { }, }, }, + { + "flyin", + TemplateLogPlugin{ + Scheme: TemplateSchemeFlyin, + TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + }, + args{ + input: Input{ + PodName: "my-pod-name", + TaskTemplate: &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + "port": "1234", + }, + }, + }, + }, + Output{ + TaskLogs: []*core.TaskLog{ + { + Uri: "vscode://flyin:1234/my-pod-name", + MessageFormat: core.TaskLog_JSON, + }, + }, + }, + }, + { + "flyin - default port", + TemplateLogPlugin{ + Scheme: TemplateSchemeFlyin, + TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + }, + args{ + input: Input{ + PodName: "my-pod-name", + TaskTemplate: &core.TaskTemplate{ + Config: map[string]string{ + "link_type": "vscode", + }, + }, + }, + }, + Output{ + TaskLogs: []*core.TaskLog{ + { + Uri: "vscode://flyin:8081/my-pod-name", + MessageFormat: core.TaskLog_JSON, + }, + }, + }, + }, + { + "flyin - no link_type in task template", + TemplateLogPlugin{ + Scheme: TemplateSchemeFlyin, + TemplateURIs: []TemplateURI{"vscode://flyin:{{ .port }}/{{ .podName }}"}, + MessageFormat: core.TaskLog_JSON, + DisplayName: "Flyin Logs", + }, + args{ + input: Input{ + PodName: "my-pod-name", + }, + }, + Output{ + TaskLogs: []*core.TaskLog{}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go index 70f15faf01..c1f4d668c0 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/templatescheme_enumer.go @@ -7,9 +7,9 @@ import ( "fmt" ) -const _TemplateSchemeName = "PodTaskExecution" +const _TemplateSchemeName = "PodTaskExecutionFlyin" -var _TemplateSchemeIndex = [...]uint8{0, 3, 16} +var _TemplateSchemeIndex = [...]uint8{0, 3, 16, 21} func (i TemplateScheme) String() string { if i < 0 || i >= TemplateScheme(len(_TemplateSchemeIndex)-1) { @@ -18,11 +18,12 @@ func (i TemplateScheme) String() string { return _TemplateSchemeName[_TemplateSchemeIndex[i]:_TemplateSchemeIndex[i+1]] } -var _TemplateSchemeValues = []TemplateScheme{0, 1} +var _TemplateSchemeValues = []TemplateScheme{0, 1, 2} var _TemplateSchemeNameToValueMap = map[string]TemplateScheme{ - _TemplateSchemeName[0:3]: 0, - _TemplateSchemeName[3:16]: 1, + _TemplateSchemeName[0:3]: 0, + _TemplateSchemeName[3:16]: 1, + _TemplateSchemeName[16:21]: 2, } // TemplateSchemeString retrieves an enum value from the enum constants string name.