Skip to content

Commit

Permalink
Remove scheme from log links
Browse files Browse the repository at this point in the history
  • Loading branch information
eapolinario committed Jan 27, 2024
1 parent 4b6dbc3 commit 8049e31
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 229 deletions.
41 changes: 20 additions & 21 deletions flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) {
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVars []tasklog.TemplateVar, taskTemplate *core.TaskTemplate) ([]*core.TaskLog, error) {
if logPlugin == nil {
return nil, nil
}
Expand All @@ -39,19 +39,19 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas

logs, err := logPlugin.GetTaskLogs(
tasklog.Input{
PodName: pod.Name,
PodUID: string(pod.GetUID()),
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
PodRFC3339StartTime: time.Unix(startTime, 0).Format(time.RFC3339),
PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
TaskExecutionID: taskExecID,
ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme,
TaskTemplate: taskTemplate,
PodName: pod.Name,
PodUID: string(pod.GetUID()),
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
PodRFC3339StartTime: time.Unix(startTime, 0).Format(time.RFC3339),
PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
TaskExecutionID: taskExecID,
ExtraTemplateVars: extraLogTemplateVars,
TaskTemplate: taskTemplate,
},
)

Expand Down Expand Up @@ -89,25 +89,25 @@ func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {

if cfg.IsKubernetesEnabled {
if len(cfg.KubernetesTemplateURI) > 0 {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON})
}
}

if cfg.IsCloudwatchEnabled {
if len(cfg.CloudwatchTemplateURI) > 0 {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON})
}
}

if cfg.IsStackDriverEnabled {
if len(cfg.StackDriverTemplateURI) > 0 {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON})
} else {
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON})
plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON})
}
}

Expand All @@ -117,7 +117,6 @@ func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {
tasklog.TemplateLogPlugin{
Name: logLinkType,
DisplayName: fmt.Sprintf("%s logs", logLinkType),
Scheme: tasklog.TemplateSchemeDynamic,
DynamicTemplateURIs: []tasklog.TemplateURI{
dynamicLogLink,
},
Expand Down
1 change: 0 additions & 1 deletion flyteplugins/go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs",
},
MessageFormat: core.TaskLog_JSON,
Scheme: tasklog.TemplateSchemeTaskExecution,
},
},
}, nil, []*core.TaskLog{
Expand Down
48 changes: 15 additions & 33 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,6 @@ import (
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
)

//go:generate enumer --type=TemplateScheme --trimprefix=TemplateScheme -json -yaml

type TemplateScheme int

const (
TemplateSchemePod TemplateScheme = iota
TemplateSchemeTaskExecution
TemplateSchemeDynamic
)

// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
type TemplateURI = string

Expand All @@ -25,31 +15,23 @@ type TemplateVar struct {
Value string
}

type TemplateVars []TemplateVar

type TemplateVarsByScheme struct {
Common TemplateVars
Pod TemplateVars
TaskExecution TemplateVars
}

// Input contains all available information about task's execution that a log plugin can use to construct task's
// log links.
type Input struct {
HostName string
PodName string
Namespace string
ContainerName string
ContainerID string
LogName string
PodRFC3339StartTime string
PodRFC3339FinishTime string
PodUnixStartTime int64
PodUnixFinishTime int64
PodUID string
TaskExecutionID pluginsCore.TaskExecutionID
ExtraTemplateVarsByScheme *TemplateVarsByScheme
TaskTemplate *core.TaskTemplate
HostName string
PodName string
Namespace string
ContainerName string
ContainerID string
LogName string
PodRFC3339StartTime string
PodRFC3339FinishTime string
PodUnixStartTime int64
PodUnixFinishTime int64
PodUID string
TaskExecutionID pluginsCore.TaskExecutionID
ExtraTemplateVars []TemplateVar
TaskTemplate *core.TaskTemplate
}

// Output contains all task logs a plugin generates for a given Input.
Expand All @@ -64,10 +46,10 @@ type Plugin interface {
}

type TemplateLogPlugin struct {
// TODO: these don't need pflags anymore?
Name string `json:"name" pflag:",Name of the plugin."`
DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."`
TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."`
DynamicTemplateURIs []TemplateURI `json:"dynamictemplateUris" pflag:",URI Templates for generating dynamic task log links."`
MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."`
Scheme TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."`
}
18 changes: 9 additions & 9 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func initDefaultRegexes() templateRegexes {

var defaultRegexes = initDefaultRegexes()

func replaceAll(template string, vars TemplateVars) string {
func replaceAll(template string, vars []TemplateVar) string {
for _, v := range vars {
if len(v.Value) > 0 {
template = v.Regex.ReplaceAllLiteralString(template, v.Value)
Expand All @@ -82,14 +82,14 @@ func replaceAll(template string, vars TemplateVars) string {
return template
}

func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
vars := TemplateVars{
{defaultRegexes.LogName, input.LogName},
func (input Input) templateVarsForScheme() []TemplateVar {
vars := []TemplateVar{
TemplateVar{defaultRegexes.LogName, input.LogName},
}

gotExtraTemplateVars := input.ExtraTemplateVarsByScheme != nil
gotExtraTemplateVars := input.ExtraTemplateVars != nil
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.Common...)
vars = append(vars, input.ExtraTemplateVars...)
}

port := input.TaskTemplate.GetConfig()["port"]
Expand Down Expand Up @@ -117,7 +117,7 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
TemplateVar{defaultRegexes.Hostname, input.HostName},
)
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.Pod...)
vars = append(vars, input.ExtraTemplateVars...)
}
if input.TaskExecutionID != nil {
taskExecutionIdentifier := input.TaskExecutionID.GetID()
Expand Down Expand Up @@ -176,7 +176,7 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
}
}
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.TaskExecution...)
vars = append(vars, input.ExtraTemplateVars...)
}

vars = append(
Expand Down Expand Up @@ -212,7 +212,7 @@ func getDynamicLogLinkTypes(taskTemplate *core.TaskTemplate) []string {
}

func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
templateVars := input.templateVarsForScheme(p.Scheme)
templateVars := input.templateVarsForScheme()
taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs))
for _, templateURI := range p.TemplateURIs {
taskLogs = append(taskLogs, &core.TaskLog{
Expand Down
Loading

0 comments on commit 8049e31

Please sign in to comment.