Skip to content

Commit

Permalink
Evaluate templatizable taskConfig values at runtime
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Jan 27, 2024
1 parent 8049e31 commit 6ca8fec
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 40 deletions.
21 changes: 9 additions & 12 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type templateRegexes struct {
ExecutionProject *regexp.Regexp
ExecutionDomain *regexp.Regexp
GeneratedName *regexp.Regexp
Port *regexp.Regexp
}

func initDefaultRegexes() templateRegexes {
Expand All @@ -67,7 +66,6 @@ func initDefaultRegexes() templateRegexes {
MustCreateRegex("executionProject"),
MustCreateRegex("executionDomain"),
MustCreateRegex("generatedName"),
MustCreateDynamicLogRegex("port"),
}
}

Expand All @@ -82,7 +80,7 @@ func replaceAll(template string, vars []TemplateVar) string {
return template
}

func (input Input) templateVarsForScheme() []TemplateVar {
func (input Input) templateVars() []TemplateVar {
vars := []TemplateVar{
TemplateVar{defaultRegexes.LogName, input.LogName},
}
Expand All @@ -92,14 +90,6 @@ func (input Input) templateVarsForScheme() []TemplateVar {
vars = append(vars, input.ExtraTemplateVars...)
}

port := input.TaskTemplate.GetConfig()["port"]
if port != "" {
vars = append(
vars,
TemplateVar{defaultRegexes.Port, port},
)
}

// Container IDs are prefixed with docker://, cri-o://, etc. which is stripped by fluentd before pushing to a log
// stream. Therefore, we must also strip the prefix.
containerID := input.ContainerID
Expand Down Expand Up @@ -193,6 +183,13 @@ func (input Input) templateVarsForScheme() []TemplateVar {
},
)

// Add values from task template config as dynamic regexes (i.e. templates prefixed by .taskConfig)
for key, value := range input.TaskTemplate.GetConfig() {
if value != "" {
vars = append(vars, TemplateVar{MustCreateDynamicLogRegex(key), value})
}
}

return vars
}

Expand All @@ -212,7 +209,7 @@ func getDynamicLogLinkTypes(taskTemplate *core.TaskTemplate) []string {
}

func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
templateVars := input.templateVarsForScheme()
templateVars := input.templateVars()
taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs))
for _, templateURI := range p.TemplateURIs {
taskLogs = append(taskLogs, &core.TaskLog{
Expand Down
45 changes: 17 additions & 28 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,23 @@ func dummyTaskExecID() pluginCore.TaskExecutionID {
return tID
}

func Test_Input_templateVarsForScheme(t *testing.T) {
func Test_Input_templateVars(t *testing.T) {
testRegexes := struct {
Foo *regexp.Regexp
Bar *regexp.Regexp
Baz *regexp.Regexp
Ham *regexp.Regexp
Spam *regexp.Regexp
Foo *regexp.Regexp
Bar *regexp.Regexp
Baz *regexp.Regexp
Ham *regexp.Regexp
Spam *regexp.Regexp
LinkType *regexp.Regexp
Port *regexp.Regexp
}{
MustCreateRegex("foo"),
MustCreateRegex("bar"),
MustCreateRegex("baz"),
MustCreateRegex("ham"),
MustCreateRegex("spam"),
MustCreateDynamicLogRegex("link_type"),
MustCreateDynamicLogRegex("port"),
}
podBase := Input{
HostName: "my-host",
Expand Down Expand Up @@ -93,7 +97,8 @@ func Test_Input_templateVarsForScheme(t *testing.T) {
PodUnixFinishTime: 12345,
TaskTemplate: &core.TaskTemplate{
Config: map[string]string{
"port": "1234",
"link_type": "vscode",
"port": "1234",
},
},
}
Expand Down Expand Up @@ -188,30 +193,13 @@ func Test_Input_templateVarsForScheme(t *testing.T) {
},
nil,
},
// {
// "task execution with unused extra vars",
// TemplateSchemeTaskExecution,
// taskExecutionBase,
// &TemplateVarsByScheme{
// Pod: TemplateVars{
// {testRegexes.Bar, "bar"},
// {testRegexes.Baz, "baz"},
// },
// },
// nil,
// nil,
// TemplateVars{
// {testRegexes.Bar, "bar"},
// {testRegexes.Baz, "baz"},
// },
// },
{
"flyin happy path",
flyinBase,
nil,
nil,
[]TemplateVar{
{defaultRegexes.Port, "1234"},
{testRegexes.Port, "1234"},
},
nil,
},
Expand All @@ -221,7 +209,6 @@ func Test_Input_templateVarsForScheme(t *testing.T) {
nil,
[]TemplateVar{
{defaultRegexes.LogName, "main_logs"},
{defaultRegexes.Port, "1234"},
{defaultRegexes.PodName, "my-pod"},
{defaultRegexes.PodUID, "my-pod-uid"},
{defaultRegexes.Namespace, "my-namespace"},
Expand All @@ -232,6 +219,8 @@ func Test_Input_templateVarsForScheme(t *testing.T) {
{defaultRegexes.PodRFC3339FinishTime, "1970-01-01T04:25:45+01:00"},
{defaultRegexes.PodUnixStartTime, "123"},
{defaultRegexes.PodUnixFinishTime, "12345"},
{testRegexes.LinkType, "vscode"},
{testRegexes.Port, "1234"},
},
nil,
nil,
Expand All @@ -243,15 +232,15 @@ func Test_Input_templateVarsForScheme(t *testing.T) {
nil,
nil,
[]TemplateVar{
{defaultRegexes.Port, "1234"},
{testRegexes.Port, "1234"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
base := tt.baseVars
base.ExtraTemplateVars = tt.extraVars
got := base.templateVarsForScheme()
got := base.templateVars()
if tt.exact != nil {
assert.Equal(t, got, tt.exact)
}
Expand Down

0 comments on commit 6ca8fec

Please sign in to comment.