Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Flyin propeller config #4610

Merged
merged 11 commits into from
Dec 20, 2023
3 changes: 3 additions & 0 deletions flyteplugins/go/tasks/logs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type LogConfig struct {
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

IsFlyinEnabled bool `json:"flyin-enabled" pflag:",Enable Log-links to flyin logs"`
FlyinTemplateURI tasklog.TemplateURI `json:"flyin-template-uri" pflag:",Template Uri to use when building flyin log links"`

Comment on lines +31 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this specific to flyin? This should be a map of string & tasklog.TemplateLogPlugin... task templates should be able to enable any number of them

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We sync'd offline and the idea is to make this more extensible by providing a way for tasks to dynamically set log links. I'll be sending a follow up PR shortly.

Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"`
}

Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go/tasks/logs/logconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions flyteplugins/go/tasks/logs/logconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) {
if logPlugin == nil {
return nil, nil
}
Expand Down Expand Up @@ -51,6 +51,7 @@
PodUnixFinishTime: finishTime,
TaskExecutionID: taskExecID,
ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme,
TaskTemplate: taskTemplate,
},
)

Expand Down Expand Up @@ -108,6 +109,14 @@
}
}

if cfg.IsFlyinEnabled {
if len(cfg.FlyinTemplateURI) > 0 {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{cfg.FlyinTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Flyin Logs", Scheme: tasklog.TemplateSchemeFlyin, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://flyin.%s/logs/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}", cfg.GCPProjectName)}, MessageFormat: core.TaskLog_JSON})
}

Check warning on line 117 in flyteplugins/go/tasks/logs/logging_utils.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/logs/logging_utils.go#L116-L117

Added lines #L116 - L117 were not covered by tests
}

plugins = append(plugins, cfg.Templates...)
return templateLogPluginCollection{plugins: plugins}, nil
}
54 changes: 41 additions & 13 deletions flyteplugins/go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func dummyTaskExecID() pluginCore.TaskExecutionID {
func TestGetLogsForContainerInPod_NoPlugins(t *testing.T) {
logPlugin, err := InitializeLogPlugins(&LogConfig{})
assert.NoError(t, err)
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil)
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil, nil)
assert.NoError(t, err)
assert.Nil(t, l)
}
Expand All @@ -56,7 +56,7 @@ func TestGetLogsForContainerInPod_NoLogs(t *testing.T) {
CloudwatchLogGroup: "/kubernetes/flyte-production",
})
assert.NoError(t, err)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil, nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestGetLogsForContainerInPod_BadIndex(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil, nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand All @@ -112,7 +112,7 @@ func TestGetLogsForContainerInPod_MissingStatus(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil, nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestGetLogsForContainerInPod_Cloudwatch(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestGetLogsForContainerInPod_K8s(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestGetLogsForContainerInPod_All(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil)
assert.Nil(t, err)
assert.Len(t, logs, 2)
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestGetLogsForContainerInPod_Stackdriver(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil, nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand All @@ -252,7 +252,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) {

IsStackDriverEnabled: true,
StackDriverTemplateURI: "https://sd-my-log-server/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
}, []*core.TaskLog{
}, nil, []*core.TaskLog{
{
Uri: "https://k8s-my-log-server/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Expand All @@ -275,7 +275,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) {
assertTestSucceeded(t, &LogConfig{
IsStackDriverEnabled: true,
StackDriverTemplateURI: "https://sd-my-log-server/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
}, []*core.TaskLog{
}, nil, []*core.TaskLog{
{
Uri: "https://sd-my-log-server/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Expand All @@ -285,7 +285,7 @@ func TestGetLogsForContainerInPod_LegacyTemplate(t *testing.T) {
})
}

func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*core.TaskLog) {
func assertTestSucceeded(tb testing.TB, config *LogConfig, taskTemplate *core.TaskTemplate, expectedTaskLogs []*core.TaskLog) {
logPlugin, err := InitializeLogPlugins(config)
assert.NoError(tb, err)

Expand All @@ -310,7 +310,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c
},
}

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil, taskTemplate)
assert.Nil(tb, err)
assert.Len(tb, logs, len(expectedTaskLogs))
if diff := deep.Equal(logs, expectedTaskLogs); len(diff) > 0 {
Expand All @@ -337,7 +337,7 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
Scheme: tasklog.TemplateSchemeTaskExecution,
},
},
}, []*core.TaskLog{
}, nil, []*core.TaskLog{
{
Uri: "https://my-log-server/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Expand All @@ -350,3 +350,31 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
},
})
}

func TestGetLogsForContainerInPod_Flyin(t *testing.T) {
assertTestSucceeded(t,
&LogConfig{
IsKubernetesEnabled: true,
KubernetesTemplateURI: "https://k8s.com",
IsFlyinEnabled: true,
FlyinTemplateURI: "https://flyin.mydomain.com:{{ .port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
},
&core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
"port": "65535",
},
},
[]*core.TaskLog{
{
Uri: "https://k8s.com",
MessageFormat: core.TaskLog_JSON,
Name: "Kubernetes Logs my-Suffix",
},
{
Uri: "https://flyin.mydomain.com:65535/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Name: "Flyin Logs my-Suffix",
},
})
}
3 changes: 3 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type TemplateScheme int
const (
TemplateSchemePod TemplateScheme = iota
TemplateSchemeTaskExecution
TemplateSchemeFlyin
)

// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
Expand All @@ -30,6 +31,7 @@ type TemplateVarsByScheme struct {
Common TemplateVars
Pod TemplateVars
TaskExecution TemplateVars
Flyin TemplateVars
}

// Input contains all available information about task's execution that a log plugin can use to construct task's
Expand All @@ -48,6 +50,7 @@ type Input struct {
PodUID string
TaskExecutionID pluginsCore.TaskExecutionID
ExtraTemplateVarsByScheme *TemplateVarsByScheme
TaskTemplate *core.TaskTemplate
}

// Output contains all task logs a plugin generates for a given Input.
Expand Down
27 changes: 27 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type templateRegexes struct {
ExecutionProject *regexp.Regexp
ExecutionDomain *regexp.Regexp
GeneratedName *regexp.Regexp
Port *regexp.Regexp
}

func initDefaultRegexes() templateRegexes {
Expand All @@ -60,6 +61,7 @@ func initDefaultRegexes() templateRegexes {
MustCreateRegex("executionProject"),
MustCreateRegex("executionDomain"),
MustCreateRegex("generatedName"),
MustCreateRegex("port"),
}
}

Expand All @@ -85,6 +87,16 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
}

switch scheme {
case TemplateSchemeFlyin:
port := input.TaskTemplate.GetConfig()["port"]
if port == "" {
port = "8080"
}
vars = append(
vars,
TemplateVar{defaultRegexes.Port, port},
)
fallthrough
case TemplateSchemePod:
// Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log
// stream. Therefore, we must also strip the prefix.
Expand Down Expand Up @@ -181,7 +193,22 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
templateVars := input.templateVarsForScheme(p.Scheme)
taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs))

// Grab metadata from task template and check if key "link_type" is set to "vscode".
// If so, add a vscode link to the task logs.
isFlyin := false
if input.TaskTemplate != nil && input.TaskTemplate.GetConfig() != nil {
config := input.TaskTemplate.GetConfig()
if config != nil && config["link_type"] == "vscode" {
isFlyin = true
}
}
for _, templateURI := range p.TemplateURIs {
// Skip Flyin logs if plugin is enabled but no metadata is defined in input's task template.
// This is to prevent Flyin logs from being generated for tasks that don't have a Flyin metadata section.
if p.DisplayName == "Flyin Logs" && !isFlyin {
continue
}
taskLogs = append(taskLogs, &core.TaskLog{
Uri: replaceAll(templateURI, templateVars),
Name: p.DisplayName + input.LogName,
Expand Down
Loading
Loading