Skip to content

Commit

Permalink
Remove scheme switch
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Jan 26, 2024
1 parent 0a42ca1 commit 4b6dbc3
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 112 deletions.
20 changes: 0 additions & 20 deletions flyteplugins/go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,26 +378,6 @@ func TestGetLogsForContainerInPod_Flyin(t *testing.T) {
&core.TaskTemplate{},
nil,
},
{
"Flyin enabled but no port in task template",
&LogConfig{
DynamicLogLinks: map[string]tasklog.TemplateURI{
"vscode": "https://flyin.mydomain.com:{{ .taskConfig.port }}/{{ .namespace }}/{{ .podName }}/{{ .containerName }}/{{ .containerId }}",
},
},
&core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
},
},
[]*core.TaskLog{
{
Uri: "https://flyin.mydomain.com:8080/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Name: "vscode logs my-Suffix",
},
},
},
{
"Flyin disabled but config present in TaskTemplate",
&LogConfig{},
Expand Down
58 changes: 27 additions & 31 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,38 +92,34 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.Common...)
}

switch scheme {
case TemplateSchemeDynamic:
port := input.TaskTemplate.GetConfig()["port"]
if port == "" {
port = "8080"
}
port := input.TaskTemplate.GetConfig()["port"]
if port != "" {
vars = append(
vars,
TemplateVar{defaultRegexes.Port, port},
)
fallthrough
case TemplateSchemePod:
// Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log
// stream. Therefore, we must also strip the prefix.
containerID := input.ContainerID
stripDelimiter := "://"
if split := strings.Split(input.ContainerID, stripDelimiter); len(split) > 1 {
containerID = split[1]
}
vars = append(
vars,
TemplateVar{defaultRegexes.PodName, input.PodName},
TemplateVar{defaultRegexes.PodUID, input.PodUID},
TemplateVar{defaultRegexes.Namespace, input.Namespace},
TemplateVar{defaultRegexes.ContainerName, input.ContainerName},
TemplateVar{defaultRegexes.ContainerID, containerID},
TemplateVar{defaultRegexes.Hostname, input.HostName},
)
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.Pod...)
}
case TemplateSchemeTaskExecution:
}

// Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log
// stream. Therefore, we must also strip the prefix.
containerID := input.ContainerID
stripDelimiter := "://"
if split := strings.Split(input.ContainerID, stripDelimiter); len(split) > 1 {
containerID = split[1]
}
vars = append(
vars,
TemplateVar{defaultRegexes.PodName, input.PodName},
TemplateVar{defaultRegexes.PodUID, input.PodUID},
TemplateVar{defaultRegexes.Namespace, input.Namespace},
TemplateVar{defaultRegexes.ContainerName, input.ContainerName},
TemplateVar{defaultRegexes.ContainerID, containerID},
TemplateVar{defaultRegexes.Hostname, input.HostName},
)
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.Pod...)
}
if input.TaskExecutionID != nil {
taskExecutionIdentifier := input.TaskExecutionID.GetID()
vars = append(
vars,
Expand Down Expand Up @@ -178,9 +174,9 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
},
)
}
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.TaskExecution...)
}
}
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.TaskExecution...)
}

vars = append(
Expand Down
111 changes: 50 additions & 61 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,30 +149,19 @@ func Test_Input_templateVarsForScheme(t *testing.T) {
},
nil,
},
{
"pod with unused extra vars",
TemplateSchemePod,
podBase,
&TemplateVarsByScheme{
TaskExecution: TemplateVars{
{testRegexes.Bar, "bar"},
{testRegexes.Baz, "baz"},
},
},
nil,
nil,
TemplateVars{
{testRegexes.Bar, "bar"},
{testRegexes.Baz, "baz"},
},
},
{
"task execution happy path",
TemplateSchemeTaskExecution,
taskExecutionBase,
nil,
TemplateVars{
{defaultRegexes.LogName, "main_logs"},
{defaultRegexes.PodName, ""},
{defaultRegexes.PodUID, ""},
{defaultRegexes.Namespace, ""},
{defaultRegexes.ContainerName, ""},
{defaultRegexes.ContainerID, ""},
{defaultRegexes.Hostname, ""},
{defaultRegexes.NodeID, "n0-0-n0"},
{defaultRegexes.GeneratedName, "generated-name"},
{defaultRegexes.TaskRetryAttempt, "1"},
Expand Down Expand Up @@ -212,23 +201,23 @@ func Test_Input_templateVarsForScheme(t *testing.T) {
},
nil,
},
{
"task execution with unused extra vars",
TemplateSchemeTaskExecution,
taskExecutionBase,
&TemplateVarsByScheme{
Pod: TemplateVars{
{testRegexes.Bar, "bar"},
{testRegexes.Baz, "baz"},
},
},
nil,
nil,
TemplateVars{
{testRegexes.Bar, "bar"},
{testRegexes.Baz, "baz"},
},
},
// {
// "task execution with unused extra vars",
// TemplateSchemeTaskExecution,
// taskExecutionBase,
// &TemplateVarsByScheme{
// Pod: TemplateVars{
// {testRegexes.Bar, "bar"},
// {testRegexes.Baz, "baz"},
// },
// },
// nil,
// nil,
// TemplateVars{
// {testRegexes.Bar, "bar"},
// {testRegexes.Baz, "baz"},
// },
// },
{
"flyin happy path",
TemplateSchemeDynamic,
Expand Down Expand Up @@ -573,33 +562,33 @@ func TestTemplateLogPlugin(t *testing.T) {
},
},
},
{
"flyin - default port",
TemplateLogPlugin{
Name: "vscode",
Scheme: TemplateSchemeDynamic,
DynamicTemplateURIs: []TemplateURI{"vscode://flyin:{{ .taskConfig.port }}/{{ .podName }}"},
MessageFormat: core.TaskLog_JSON,
},
args{
input: Input{
PodName: "my-pod-name",
TaskTemplate: &core.TaskTemplate{
Config: map[string]string{
"link_type": "vscode",
},
},
},
},
Output{
TaskLogs: []*core.TaskLog{
{
Uri: "vscode://flyin:8080/my-pod-name",
MessageFormat: core.TaskLog_JSON,
},
},
},
},
// {
// "flyin - default port",
// TemplateLogPlugin{
// Name: "vscode",
// Scheme: TemplateSchemeDynamic,
// DynamicTemplateURIs: []TemplateURI{"vscode://flyin:{{ .taskConfig.port }}/{{ .podName }}"},
// MessageFormat: core.TaskLog_JSON,
// },
// args{
// input: Input{
// PodName: "my-pod-name",
// TaskTemplate: &core.TaskTemplate{
// Config: map[string]string{
// "link_type": "vscode",
// },
// },
// },
// },
// Output{
// TaskLogs: []*core.TaskLog{
// {
// Uri: "vscode://flyin:8080/my-pod-name",
// MessageFormat: core.TaskLog_JSON,
// },
// },
// },
// },
{
"flyin - no link_type in task template",
TemplateLogPlugin{
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/array/k8s/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta

tID := &mocks.TaskExecutionID{}
tID.OnGetGeneratedName().Return("notfound")
tID.On("GetUniqueNodeID").Return("an-unique-id")
tID.OnGetID().Return(core2.TaskExecutionIdentifier{
TaskId: &core2.Identifier{
ResourceType: core2.ResourceType_TASK,
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func dummyDaskTaskContext(taskTemplate *core.TaskTemplate, resources *v1.Resourc
},
})
tID.On("GetGeneratedName").Return(testTaskID)
tID.On("GetUniqueNodeID").Return("an-unique-id")

taskExecutionMetadata := &mocks.TaskExecutionMetadata{}
taskExecutionMetadata.OnGetTaskExecutionID().Return(tID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ func dummyTaskContext() pluginsCore.TaskExecutionContext {
},
RetryAttempt: 0,
})
tID.OnGetGeneratedName().Return("some-acceptable-name")
tID.On("GetUniqueNodeID").Return("an-unique-id")

taskExecutionMetadata := &mocks.TaskExecutionMetadata{}
taskExecutionMetadata.OnGetTaskExecutionID().Return(tID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func dummyMPITaskContext(taskTemplate *core.TaskTemplate, resources *corev1.Reso
},
})
tID.OnGetGeneratedName().Return("some-acceptable-name")
tID.On("GetUniqueNodeID").Return("an-unique-id")

overrides := &mocks.TaskOverrides{}
overrides.OnGetResources().Return(resources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func dummyPytorchTaskContext(taskTemplate *core.TaskTemplate, resources *corev1.
},
})
tID.OnGetGeneratedName().Return("some-acceptable-name")
tID.On("GetUniqueNodeID").Return("an-unique-id")

overrides := &mocks.TaskOverrides{}
overrides.OnGetResources().Return(resources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func dummyTensorFlowTaskContext(taskTemplate *core.TaskTemplate, resources *core
},
})
tID.OnGetGeneratedName().Return("some-acceptable-name")
tID.On("GetUniqueNodeID").Return("an-unique-id")

overrides := &mocks.TaskOverrides{}
overrides.OnGetResources().Return(resources)
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/k8s/pod/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func dummySidecarTaskMetadata(resources *v1.ResourceRequirements, extendedResour
},
})
tID.On("GetGeneratedName").Return("my_project:my_domain:my_name")
tID.On("GetUniqueNodeID").Return("an-unique-id")
taskMetadata.On("GetTaskExecutionID").Return(tID)

to := &pluginsCoreMock.TaskOverrides{}
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ func dummySparkTaskContext(taskTemplate *core.TaskTemplate, interruptible bool)
},
})
tID.On("GetGeneratedName").Return("some-acceptable-name")
tID.On("GetUniqueNodeID").Return("an-unique-id")

overrides := &mocks.TaskOverrides{}
overrides.On("GetResources").Return(&corev1.ResourceRequirements{})
Expand Down

0 comments on commit 4b6dbc3

Please sign in to comment.