From 09cb3b1d869a999281affb2ff5fe4358937bba75 Mon Sep 17 00:00:00 2001 From: Jeev B Date: Fri, 10 Nov 2023 11:14:40 -0800 Subject: [PATCH] Refactor task logs framework (#4396) * Refactor task logs framework Signed-off-by: Jeev B * Return templateLogPluginCollection instead of nil even if no plugins are specified Signed-off-by: Jeev B --------- Signed-off-by: Jeev B --- flyteplugins/go/tasks/logs/config.go | 25 +- flyteplugins/go/tasks/logs/logging_utils.go | 53 ++-- .../go/tasks/logs/logging_utils_test.go | 2 +- .../tasks/pluginmachinery/tasklog/plugin.go | 10 + .../tasks/pluginmachinery/tasklog/template.go | 57 +--- .../pluginmachinery/tasklog/template_test.go | 243 ++++++------------ 6 files changed, 128 insertions(+), 262 deletions(-) diff --git a/flyteplugins/go/tasks/logs/config.go b/flyteplugins/go/tasks/logs/config.go index 69ef17ed89..ca5a6012a8 100644 --- a/flyteplugins/go/tasks/logs/config.go +++ b/flyteplugins/go/tasks/logs/config.go @@ -1,45 +1,34 @@ package logs import ( - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/config" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog" ) //go:generate pflags LogConfig --default-var=DefaultConfig -// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. -type TemplateURI = string - // LogConfig encapsulates plugins' log configs type LogConfig struct { IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"` // Deprecated: Please use CloudwatchTemplateURI CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."` // Deprecated: Please use CloudwatchTemplateURI - CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."` - CloudwatchTemplateURI TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"` + CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."` + CloudwatchTemplateURI tasklog.TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"` IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"` // Deprecated: Please use KubernetesTemplateURI - KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"` - KubernetesTemplateURI TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"` + KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"` + KubernetesTemplateURI tasklog.TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"` IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"` // Deprecated: Please use StackDriverTemplateURI GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"` // Deprecated: Please use StackDriverTemplateURI - StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"` - StackDriverTemplateURI TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"` - - Templates []TemplateLogPluginConfig `json:"templates" pflag:"-,"` -} + StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"` + StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"` -type TemplateLogPluginConfig struct { - DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."` - TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."` - MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."` - Scheme tasklog.TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."` + Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"` } var ( diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index 6af1889e9f..4978109458 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -13,11 +13,6 @@ import ( "github.com/flyteorg/flyte/flytestdlib/logger" ) -type logPlugin struct { - Name string - Plugin tasklog.Plugin -} - // Internal func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) { if logPlugin == nil { @@ -66,67 +61,53 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas return logs.TaskLogs, nil } -type taskLogPluginWrapper struct { - logPlugins []logPlugin +type templateLogPluginCollection struct { + plugins []tasklog.TemplateLogPlugin } -func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklog.Output, err error) { - logs := make([]*core.TaskLog, 0, len(t.logPlugins)) - suffix := input.LogName +func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.Output, error) { + var taskLogs []*core.TaskLog - for _, plugin := range t.logPlugins { - input.LogName = plugin.Name + suffix - o, err := plugin.Plugin.GetTaskLogs(input) + for _, plugin := range t.plugins { + o, err := plugin.GetTaskLogs(input) if err != nil { return tasklog.Output{}, err } - - logs = append(logs, o.TaskLogs...) + taskLogs = append(taskLogs, o.TaskLogs...) } - return tasklog.Output{ - TaskLogs: logs, - }, nil + return tasklog.Output{TaskLogs: taskLogs}, nil } // InitializeLogPlugins initializes log plugin based on config. func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) { // Use a list to maintain order. - logPlugins := make([]logPlugin, 0, 2) + var plugins []tasklog.TemplateLogPlugin if cfg.IsKubernetesEnabled { if len(cfg.KubernetesTemplateURI) > 0 { - logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.KubernetesTemplateURI}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON}) } } if cfg.IsCloudwatchEnabled { if len(cfg.CloudwatchTemplateURI) > 0 { - logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.CloudwatchTemplateURI}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON}) } } if cfg.IsStackDriverEnabled { if len(cfg.StackDriverTemplateURI) > 0 { - logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.StackDriverTemplateURI}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON}) } } - if len(cfg.Templates) > 0 { - for _, cfg := range cfg.Templates { - logPlugins = append(logPlugins, logPlugin{Name: cfg.DisplayName, Plugin: tasklog.NewTemplateLogPlugin(cfg.Scheme, cfg.TemplateURIs, cfg.MessageFormat)}) - } - } - - if len(logPlugins) == 0 { - return nil, nil - } - - return taskLogPluginWrapper{logPlugins: logPlugins}, nil + plugins = append(plugins, cfg.Templates...) + return templateLogPluginCollection{plugins: plugins}, nil } diff --git a/flyteplugins/go/tasks/logs/logging_utils_test.go b/flyteplugins/go/tasks/logs/logging_utils_test.go index 066fdd96c8..91048eff16 100644 --- a/flyteplugins/go/tasks/logs/logging_utils_test.go +++ b/flyteplugins/go/tasks/logs/logging_utils_test.go @@ -320,7 +320,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c func TestGetLogsForContainerInPod_Templates(t *testing.T) { assertTestSucceeded(t, &LogConfig{ - Templates: []TemplateLogPluginConfig{ + Templates: []tasklog.TemplateLogPlugin{ { DisplayName: "StackDriver", TemplateURIs: []string{ diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go index b812221f6d..c43da02e58 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go @@ -16,6 +16,9 @@ const ( TemplateSchemeTaskExecution ) +// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. +type TemplateURI = string + type TemplateVar struct { Regex *regexp.Regexp Value string @@ -57,3 +60,10 @@ type Plugin interface { // Generates a TaskLog object given necessary computation information GetTaskLogs(i Input) (logs Output, err error) } + +type TemplateLogPlugin struct { + DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."` + TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."` + MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."` + Scheme TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."` +} diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 77c49d2695..750b1972df 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -34,6 +34,7 @@ type templateRegexes struct { ExecutionName *regexp.Regexp ExecutionProject *regexp.Regexp ExecutionDomain *regexp.Regexp + GeneratedName *regexp.Regexp } func initDefaultRegexes() templateRegexes { @@ -58,6 +59,7 @@ func initDefaultRegexes() templateRegexes { MustCreateRegex("executionName"), MustCreateRegex("executionProject"), MustCreateRegex("executionDomain"), + MustCreateRegex("generatedName"), } } @@ -121,6 +123,10 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { defaultRegexes.NodeID, input.TaskExecutionID.GetUniqueNodeID(), }, + TemplateVar{ + defaultRegexes.GeneratedName, + input.TaskExecutionID.GetGeneratedName(), + }, TemplateVar{ defaultRegexes.TaskRetryAttempt, strconv.FormatUint(uint64(taskExecutionIdentifier.RetryAttempt), 10), @@ -172,55 +178,16 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { return vars } -// A simple log plugin that supports templates in urls to build the final log link. -// See `defaultRegexes` for supported templates. -type TemplateLogPlugin struct { - scheme TemplateScheme - templateUris []string - messageFormat core.TaskLog_MessageFormat -} - -func (s TemplateLogPlugin) GetTaskLog(podName, podUID, namespace, containerName, containerID, logName string, podRFC3339StartTime string, podRFC3339FinishTime string, podUnixStartTime, podUnixFinishTime int64) (core.TaskLog, error) { - o, err := s.GetTaskLogs(Input{ - LogName: logName, - Namespace: namespace, - PodName: podName, - PodUID: podUID, - ContainerName: containerName, - ContainerID: containerID, - PodRFC3339StartTime: podRFC3339StartTime, - PodRFC3339FinishTime: podRFC3339FinishTime, - PodUnixStartTime: podUnixStartTime, - PodUnixFinishTime: podUnixFinishTime, - }) - - if err != nil || len(o.TaskLogs) == 0 { - return core.TaskLog{}, err - } - - return *o.TaskLogs[0], nil -} - -func (s TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { - templateVars := input.templateVarsForScheme(s.scheme) - taskLogs := make([]*core.TaskLog, 0, len(s.templateUris)) - for _, templateURI := range s.templateUris { +func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { + templateVars := input.templateVarsForScheme(p.Scheme) + taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs)) + for _, templateURI := range p.TemplateURIs { taskLogs = append(taskLogs, &core.TaskLog{ Uri: replaceAll(templateURI, templateVars), - Name: input.LogName, - MessageFormat: s.messageFormat, + Name: p.DisplayName + input.LogName, + MessageFormat: p.MessageFormat, }) } return Output{TaskLogs: taskLogs}, nil } - -// NewTemplateLogPlugin creates a template-based log plugin with the provided template Uri and message format. -// See `defaultRegexes` for supported templates. -func NewTemplateLogPlugin(scheme TemplateScheme, templateUris []string, messageFormat core.TaskLog_MessageFormat) TemplateLogPlugin { - return TemplateLogPlugin{ - scheme: scheme, - templateUris: templateUris, - messageFormat: messageFormat, - } -} diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go index 320ece05a4..f279707a3b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go @@ -5,7 +5,6 @@ import ( "regexp" "testing" - "github.com/go-test/deep" "github.com/stretchr/testify/assert" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -13,26 +12,6 @@ import ( coreMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks" ) -func TestTemplateLog(t *testing.T) { - p := NewTemplateLogPlugin(TemplateSchemePod, []string{"https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.{{.podName}}_{{.podUID}}_{{.namespace}}_{{.containerName}}-{{.containerId}}.log"}, core.TaskLog_JSON) - tl, err := p.GetTaskLog( - "f-uuid-driver", - "pod-uid", - "flyteexamples-production", - "spark-kubernetes-driver", - "cri-o://abc", - "main_logs", - "2015-03-14T17:08:14+01:00", - "2021-06-15T20:47:57+02:00", - 1426349294, - 1623782877, - ) - assert.NoError(t, err) - assert.Equal(t, tl.GetName(), "main_logs") - assert.Equal(t, tl.GetMessageFormat(), core.TaskLog_JSON) - assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.f-uuid-driver_pod-uid_flyteexamples-production_spark-kubernetes-driver-abc.log", tl.Uri) -} - // Latest Run: Benchmark_mustInitTemplateRegexes-16 45960 26914 ns/op func Benchmark_initDefaultRegexes(b *testing.B) { for i := 0; i < b.N; i++ { @@ -172,6 +151,7 @@ func Test_Input_templateVarsForScheme(t *testing.T) { TemplateVars{ {defaultRegexes.LogName, "main_logs"}, {defaultRegexes.NodeID, "n0-0-n0"}, + {defaultRegexes.GeneratedName, "generated-name"}, {defaultRegexes.TaskRetryAttempt, "1"}, {defaultRegexes.TaskID, "my-task-name"}, {defaultRegexes.TaskVersion, "1"}, @@ -245,147 +225,99 @@ func Test_Input_templateVarsForScheme(t *testing.T) { } } -func Test_templateLogPlugin_Regression(t *testing.T) { - type fields struct { - templateURI string - messageFormat core.TaskLog_MessageFormat - } +func TestTemplateLogPlugin(t *testing.T) { type args struct { - podName string - podUID string - namespace string - containerName string - containerID string - logName string - podRFC3339StartTime string - podRFC3339FinishTime string - podUnixStartTime int64 - podUnixFinishTime int64 + input Input } tests := []struct { - name string - fields fields - args args - want core.TaskLog - wantErr bool + name string + plugin TemplateLogPlugin + args args + want Output }{ { "cloudwatch", - fields{ - templateURI: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.{{.podName}}_{{.namespace}}_{{.containerName}}-{{.containerId}}.log", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.{{.podName}}_{{.namespace}}_{{.containerName}}-{{.containerId}}.log"}, + MessageFormat: core.TaskLog_JSON, }, args{ - podName: "f-uuid-driver", - podUID: "pod-uid", - namespace: "flyteexamples-production", - containerName: "spark-kubernetes-driver", - containerID: "cri-o://abc", - logName: "main_logs", - podRFC3339StartTime: "1970-01-01T01:02:03+01:00", - podRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - podUnixStartTime: 123, - podUnixFinishTime: 12345, - }, - core.TaskLog{ + input: Input{ + PodName: "f-uuid-driver", + PodUID: "pod-uid", + Namespace: "flyteexamples-production", + ContainerName: "spark-kubernetes-driver", + ContainerID: "cri-o://abc", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + }, + }, + Output{TaskLogs: []*core.TaskLog{{ Uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.f-uuid-driver_flyteexamples-production_spark-kubernetes-driver-abc.log", MessageFormat: core.TaskLog_JSON, Name: "main_logs", - }, - false, + }}}, }, { "stackdriver", - fields{ - templateURI: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}"}, + MessageFormat: core.TaskLog_JSON, }, args{ - podName: "podName", - podUID: "pod-uid", - namespace: "flyteexamples-production", - containerName: "spark-kubernetes-driver", - containerID: "cri-o://abc", - logName: "main_logs", - podRFC3339StartTime: "1970-01-01T01:02:03+01:00", - podRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - podUnixStartTime: 123, - podUnixFinishTime: 12345, - }, - core.TaskLog{ + input: Input{ + PodName: "podName", + PodUID: "pod-uid", + Namespace: "flyteexamples-production", + ContainerName: "spark-kubernetes-driver", + ContainerID: "cri-o://abc", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + }, + }, + Output{TaskLogs: []*core.TaskLog{{ Uri: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3DpodName", MessageFormat: core.TaskLog_JSON, Name: "main_logs", - }, - false, + }}}, }, { "kubernetes", - fields{ - templateURI: "https://dashboard.k8s.net/#!/log/{{.namespace}}/{{.podName}}/pod?namespace={{.namespace}}", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://dashboard.k8s.net/#!/log/{{.namespace}}/{{.podName}}/pod?namespace={{.namespace}}"}, + MessageFormat: core.TaskLog_JSON, }, args{ - podName: "flyteexamples-development-task-name", - podUID: "pod-uid", - namespace: "flyteexamples-development", - containerName: "ignore", - containerID: "ignore", - logName: "main_logs", - podRFC3339StartTime: "1970-01-01T01:02:03+01:00", - podRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - podUnixStartTime: 123, - podUnixFinishTime: 12345, - }, - core.TaskLog{ + input: Input{ + PodName: "flyteexamples-development-task-name", + PodUID: "pod-uid", + Namespace: "flyteexamples-development", + ContainerName: "ignore", + ContainerID: "ignore", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + }, + }, + Output{TaskLogs: []*core.TaskLog{{ Uri: "https://dashboard.k8s.net/#!/log/flyteexamples-development/flyteexamples-development-task-name/pod?namespace=flyteexamples-development", MessageFormat: core.TaskLog_JSON, Name: "main_logs", - }, - false, + }}}, }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := TemplateLogPlugin{ - templateUris: []string{tt.fields.templateURI}, - messageFormat: tt.fields.messageFormat, - } - - got, err := s.GetTaskLog(tt.args.podName, tt.args.podUID, tt.args.namespace, tt.args.containerName, tt.args.containerID, tt.args.logName, tt.args.podRFC3339FinishTime, tt.args.podRFC3339FinishTime, tt.args.podUnixStartTime, tt.args.podUnixFinishTime) - if (err != nil) != tt.wantErr { - t.Errorf("GetTaskLog() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if diff := deep.Equal(got, tt.want); diff != nil { - t.Errorf("GetTaskLog() got = %v, want %v, diff: %v", got, tt.want, diff) - } - }) - } -} - -func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { - type fields struct { - scheme TemplateScheme - templateURI string - messageFormat core.TaskLog_MessageFormat - } - type args struct { - input Input - } - tests := []struct { - name string - fields fields - args args - want Output - wantErr bool - }{ { "splunk", - fields{ - templateURI: "https://prd-p-ighar.splunkcloud.com/en-US/app/search/search?q=search%20container_name%3D%22{{ .containerName }}%22", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://prd-p-ighar.splunkcloud.com/en-US/app/search/search?q=search%20container_name%3D%22{{ .containerName }}%22"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -410,13 +342,12 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "ddog", - fields{ - templateURI: "https://app.datadoghq.com/logs?event&from_ts={{ .podUnixStartTime }}&live=true&query=pod_name%3A{{ .podName }}&to_ts={{ .podUnixFinishTime }}", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://app.datadoghq.com/logs?event&from_ts={{ .podUnixStartTime }}&live=true&query=pod_name%3A{{ .podName }}&to_ts={{ .podUnixFinishTime }}"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -441,13 +372,12 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "stackdriver-with-rfc3339-timestamp", - fields{ - templateURI: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}%20%22{{.podRFC3339StartTime}}%22", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}%20%22{{.podRFC3339StartTime}}%22"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -472,14 +402,13 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "task-with-task-execution-identifier", - fields{ - scheme: TemplateSchemeTaskExecution, - templateURI: "https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + Scheme: TemplateSchemeTaskExecution, + TemplateURIs: []TemplateURI{"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -505,14 +434,13 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "mapped-task-with-task-execution-identifier", - fields{ - scheme: TemplateSchemeTaskExecution, - templateURI: "https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .subtaskParentRetryAttempt }}/mappedIndex/{{ .subtaskExecutionIndex }}/mappedAttempt/{{ .subtaskRetryAttempt }}/view/logs", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + Scheme: TemplateSchemeTaskExecution, + TemplateURIs: []TemplateURI{"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .subtaskParentRetryAttempt }}/mappedIndex/{{ .subtaskExecutionIndex }}/mappedAttempt/{{ .subtaskRetryAttempt }}/view/logs"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -545,23 +473,14 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := TemplateLogPlugin{ - scheme: tt.fields.scheme, - templateUris: []string{tt.fields.templateURI}, - messageFormat: tt.fields.messageFormat, - } - got, err := s.GetTaskLogs(tt.args.input) - if (err != nil) != tt.wantErr { - t.Errorf("NewTaskLog() error = %v, wantErr %v", err, tt.wantErr) - return - } + got, err := tt.plugin.GetTaskLogs(tt.args.input) + assert.NoError(t, err) if !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewTaskLog() got = %v, want %v", got, tt.want) + t.Errorf("GetTaskLogs() got = %v, want %v", got, tt.want) } }) }