Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flyteconsole url to FlyteWorkflow CRD #5449

Merged
merged 14 commits into from
Jun 14, 2024
Merged
1 change: 1 addition & 0 deletions flyteadmin/pkg/runtime/application_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic
MaxParallelism: 25,
K8SServiceAccount: "",
UseOffloadedWorkflowClosure: false,
ConsoleURL: "",
})

var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type ApplicationConfig struct {
Envs map[string]string `json:"envs,omitempty"`

FeatureGates FeatureGates `json:"featureGates" pflag:",Enable experimental features."`

// A URL pointing to the flyteconsole instance used to hit this flyteadmin instance.
ConsoleURL string `json:"consoleUrl,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ConsoleURL string `json:"consoleUrl,omitempty"`
ConsoleURL string `json:"consoleUrl,omitempty" pflag:",A URL pointing to the flyteconsole instance used to hit this flyteadmin instance."`

}

func (a *ApplicationConfig) GetRoleNameKey() string {
Expand Down
4 changes: 4 additions & 0 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
flyteWf.Tasks = nil
}

if e.config.ApplicationConfiguration().GetTopLevelConfig().ConsoleURL != "" {
flyteWf.ConsoleURL = e.config.ApplicationConfiguration().GetTopLevelConfig().ConsoleURL

Check warning on line 59 in flyteadmin/pkg/workflowengine/impl/k8s_executor.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/workflowengine/impl/k8s_executor.go#L59

Added line #L59 was not covered by tests
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if e.config.ApplicationConfiguration().GetTopLevelConfig().ConsoleURL != "" {
flyteWf.ConsoleURL = e.config.ApplicationConfiguration().GetTopLevelConfig().ConsoleURL
}
if consoleURL := e.config.ApplicationConfiguration().GetTopLevelConfig().ConsoleURL; len(consoleURL) > 0 {
flyteWf.ConsoleURL = consoleURL
}


executionTargetSpec := executioncluster.ExecutionTargetSpec{
Project: data.ExecutionID.Project,
Domain: data.ExecutionID.Domain,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ type TaskExecutionMetadata interface {
GetPlatformResources() *v1.ResourceRequirements
GetInterruptibleFailureThreshold() int32
GetEnvironmentVariables() map[string]string
GetConsoleURL() string
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ func (e ErrorCollection) Error() string {

// Parameters struct is used by the Templating Engine to replace the templated parameters
type Parameters struct {
TaskExecMetadata core.TaskExecutionMetadata
Inputs io.InputReader
OutputPath io.OutputFilePaths
Task core.TaskTemplatePath
TaskExecMetadata core.TaskExecutionMetadata
Inputs io.InputReader
OutputPath io.OutputFilePaths
Task core.TaskTemplatePath
IncludeConsoleURL bool
}

// Render Evaluates templates in each command with the equivalent value from passed args. Templates are case-insensitive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,12 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.
}
container.Args = modifiedArgs

container.Env, container.EnvFrom = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetEnvironmentVariables(), parameters.TaskExecMetadata.GetTaskExecutionID())
// The flyteconsole url is added based on the `IncludeConsoleURL` bit set via the task template
consoleURL := ""
if parameters.IncludeConsoleURL {
consoleURL = parameters.TaskExecMetadata.GetConsoleURL()
}
container.Env, container.EnvFrom = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetEnvironmentVariables(), parameters.TaskExecMetadata.GetTaskExecutionID(), consoleURL)

// retrieve platformResources and overrideResources to use when aggregating container resources
platformResources := parameters.TaskExecMetadata.GetPlatformResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func TestToK8sContainer(t *testing.T) {
"foo": "bar",
})
mockTaskExecMetadata.OnGetNamespace().Return("my-namespace")
mockTaskExecMetadata.OnGetConsoleURL().Return("")

tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(&mockTaskExecMetadata)
Expand Down Expand Up @@ -447,9 +448,10 @@ func TestToK8sContainer(t *testing.T) {
assert.False(t, *container.SecurityContext.AllowPrivilegeEscalation)
}

func getTemplateParametersForTest(resourceRequirements, platformResources *v1.ResourceRequirements) template.Parameters {
func getTemplateParametersForTest(resourceRequirements, platformResources *v1.ResourceRequirements, includeConsoleURL bool, consoleURL string) template.Parameters {
mockTaskExecMetadata := mocks.TaskExecutionMetadata{}
mockTaskExecutionID := mocks.TaskExecutionID{}
mockTaskExecutionID.OnGetUniqueNodeID().Return("unique_node_id")
mockTaskExecutionID.OnGetGeneratedName().Return("gen_name")
mockTaskExecutionID.OnGetID().Return(core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
Expand Down Expand Up @@ -477,6 +479,7 @@ func getTemplateParametersForTest(resourceRequirements, platformResources *v1.Re
mockTaskExecMetadata.OnGetPlatformResources().Return(platformResources)
mockTaskExecMetadata.OnGetEnvironmentVariables().Return(nil)
mockTaskExecMetadata.OnGetNamespace().Return("my-namespace")
mockTaskExecMetadata.OnGetConsoleURL().Return(consoleURL)

mockInputReader := mocks2.InputReader{}
mockInputPath := storage.DataReference("s3://input/path")
Expand All @@ -492,9 +495,10 @@ func getTemplateParametersForTest(resourceRequirements, platformResources *v1.Re
mockOutputPath.OnGetPreviousCheckpointsPrefix().Return("/prev")

return template.Parameters{
TaskExecMetadata: &mockTaskExecMetadata,
Inputs: &mockInputReader,
OutputPath: &mockOutputPath,
TaskExecMetadata: &mockTaskExecMetadata,
Inputs: &mockInputReader,
OutputPath: &mockOutputPath,
IncludeConsoleURL: includeConsoleURL,
}
}

Expand All @@ -506,7 +510,7 @@ func TestAddFlyteCustomizationsToContainer(t *testing.T) {
Limits: v1.ResourceList{
v1.ResourceEphemeralStorage: resource.MustParse("2048Mi"),
},
}, nil)
}, nil, false, "")
container := &v1.Container{
Command: []string{
"{{ .Input }}",
Expand Down Expand Up @@ -554,7 +558,7 @@ func TestAddFlyteCustomizationsToContainer_Resources(t *testing.T) {
Limits: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("20"),
},
})
}, false, "")
err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeMergeExistingResources, container)
assert.NoError(t, err)
assert.True(t, container.Resources.Requests.Cpu().Equal(resource.MustParse("1")))
Expand All @@ -577,7 +581,7 @@ func TestAddFlyteCustomizationsToContainer_Resources(t *testing.T) {
Limits: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("20"),
},
})
}, false, "")
err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeMergeExistingResources, container)
assert.NoError(t, err)
assert.True(t, container.Resources.Requests.Cpu().Equal(resource.MustParse("1")))
Expand Down Expand Up @@ -612,7 +616,7 @@ func TestAddFlyteCustomizationsToContainer_Resources(t *testing.T) {
v1.ResourceCPU: resource.MustParse("10"),
v1.ResourceMemory: resource.MustParse("20"),
},
})
}, false, "")
err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeMergeExistingResources, container)
assert.NoError(t, err)
assert.True(t, container.Resources.Requests.Cpu().Equal(resource.MustParse("10")))
Expand Down Expand Up @@ -649,7 +653,7 @@ func TestAddFlyteCustomizationsToContainer_Resources(t *testing.T) {
templateParameters := getTemplateParametersForTest(&v1.ResourceRequirements{
Requests: overrideRequests,
Limits: overrideLimits,
}, &v1.ResourceRequirements{})
}, &v1.ResourceRequirements{}, false, "")

err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeMergeExistingResources, container)
assert.NoError(t, err)
Expand Down Expand Up @@ -684,7 +688,7 @@ func TestAddFlyteCustomizationsToContainer_ValidateExistingResources(t *testing.
v1.ResourceCPU: resource.MustParse("10"),
v1.ResourceMemory: resource.MustParse("20"),
},
})
}, false, "")
err := AddFlyteCustomizationsToContainer(context.TODO(), templateParameters, ResourceCustomizationModeEnsureExistingResourcesInRange, container)
assert.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flytek8s

import (
"context"
"fmt"
"os"
"strconv"

Expand Down Expand Up @@ -32,7 +33,7 @@ func GetContextEnvVars(ownerCtx context.Context) []v1.EnvVar {
return envVars
}

func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1.EnvVar {

if id == nil || id.GetID().NodeExecutionId == nil || id.GetID().NodeExecutionId.ExecutionId == nil {
return []v1.EnvVar{}
Expand Down Expand Up @@ -69,6 +70,14 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
// },
}

if consoleURL != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if consoleURL != "" {
if len(consoleURL) > 0 {

envVars = append(envVars, v1.EnvVar{
Name: "FLYTE_EXECUTION_URL",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to const?

// TODO: should we use net/url to build this url?
Value: fmt.Sprintf("%s/projects/%s/domains/%s/executions/%s/nodeId/%s/nodes", consoleURL, nodeExecutionID.Project, nodeExecutionID.Domain, nodeExecutionID.Name, id.GetUniqueNodeID()),
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
})
}

// Task definition Level env variables.
if id.GetID().TaskId != nil {
taskID := id.GetID().TaskId
Expand Down Expand Up @@ -113,9 +122,9 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
return envVars
}

func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID) ([]v1.EnvVar, []v1.EnvFromSource) {
func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID, consoleURL string) ([]v1.EnvVar, []v1.EnvFromSource) {
envVars = append(envVars, GetContextEnvVars(ctx)...)
envVars = append(envVars, GetExecutionEnvVars(id)...)
envVars = append(envVars, GetExecutionEnvVars(id, consoleURL)...)

for k, v := range taskEnvironmentVariables {
envVars = append(envVars, v1.EnvVar{Name: k, Value: v})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"reflect"
"testing"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
v12 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -18,8 +19,35 @@ import (

func TestGetExecutionEnvVars(t *testing.T) {
mock := mockTaskExecutionIdentifier{}
envVars := GetExecutionEnvVars(mock)
assert.Len(t, envVars, 12)
tests := []struct {
name string
expectedEnvVars int
consoleURL string
expectedEnvVar *v12.EnvVar
}{
{
"no-console-url",
12,
"",
nil,
},
{
"with-console-url",
13,
"scheme://host/path",
&v12.EnvVar{
Name: "FLYTE_EXECUTION_URL",
Value: "scheme://host/path/projects/proj/domains/domain/executions/name/nodeId/unique-node-id/nodes",
},
},
}
for _, tt := range tests {
envVars := GetExecutionEnvVars(mock, tt.consoleURL)
assert.Len(t, envVars, tt.expectedEnvVars)
if tt.expectedEnvVar != nil {
assert.True(t, proto.Equal(&envVars[4], tt.expectedEnvVar))
}
}
}

func TestGetTolerationsForResources(t *testing.T) {
Expand Down Expand Up @@ -257,7 +285,7 @@ func TestDecorateEnvVars(t *testing.T) {
defer os.Setenv("value", originalEnvVal)

expected := append(defaultEnv, GetContextEnvVars(ctx)...)
expected = append(expected, GetExecutionEnvVars(mockTaskExecutionIdentifier{})...)
expected = append(expected, GetExecutionEnvVars(mockTaskExecutionIdentifier{}, "")...)

aggregated := append(expected, v12.EnvVar{Name: "k", Value: "v"})
type args struct {
Expand All @@ -270,20 +298,21 @@ func TestDecorateEnvVars(t *testing.T) {
additionEnvVar map[string]string
additionEnvVarFromEnv map[string]string
executionEnvVar map[string]string
consoleURL string
want []v12.EnvVar
}{
{"no-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, emptyEnvVar, expected},
{"with-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, additionalEnv, emptyEnvVar, emptyEnvVar, aggregated},
{"from-env", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, envVarsFromEnv, emptyEnvVar, aggregated},
{"from-execution-metadata", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, additionalEnv, aggregated},
{"no-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, emptyEnvVar, "", expected},
{"with-additional", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, additionalEnv, emptyEnvVar, emptyEnvVar, "", aggregated},
{"from-env", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, envVarsFromEnv, emptyEnvVar, "", aggregated},
{"from-execution-metadata", args{envVars: defaultEnv, id: mockTaskExecutionIdentifier{}}, emptyEnvVar, emptyEnvVar, additionalEnv, "", aggregated},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
DefaultEnvVars: tt.additionEnvVar,
DefaultEnvVarsFromEnv: tt.additionEnvVarFromEnv,
}))
if got, _ := DecorateEnvVars(ctx, tt.args.envVars, tt.executionEnvVar, tt.args.id); !reflect.DeepEqual(got, tt.want) {
if got, _ := DecorateEnvVars(ctx, tt.args.envVars, tt.executionEnvVar, tt.args.id, tt.consoleURL); !reflect.DeepEqual(got, tt.want) {
t.Errorf("DecorateEnvVars() = %v, want %v", got, tt.want)
}
})
Expand Down
22 changes: 18 additions & 4 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,19 @@
return podSpec, &objectMeta, primaryContainerName, nil
}

func shouldIncludeConsoleURL(taskTemplate *core.TaskTemplate) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func shouldIncludeConsoleURL(taskTemplate *core.TaskTemplate) bool {
func hasExternalLinkType(taskTemplate *core.TaskTemplate) bool {

if taskTemplate == nil {
return false

Check warning on line 321 in flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go#L321

Added line #L321 was not covered by tests
}
config := taskTemplate.GetConfig()
if config == nil {
return false
}
// The presence of any "link_type" is sufficient to guarantee that the console URL should be included.
_, exists := config["link_type"]
return exists

Check warning on line 329 in flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go#L328-L329

Added lines #L328 - L329 were not covered by tests
}

// ApplyFlytePodConfiguration updates the PodSpec and ObjectMeta with various Flyte configuration. This includes
// applying default k8s configuration, applying overrides (resources etc.), injecting copilot containers, and merging with the
// configuration PodTemplate (if exists).
Expand All @@ -328,10 +341,11 @@

// add flyte resource customizations to containers
templateParameters := template.Parameters{
Inputs: tCtx.InputReader(),
OutputPath: tCtx.OutputWriter(),
Task: tCtx.TaskReader(),
TaskExecMetadata: tCtx.TaskExecutionMetadata(),
Inputs: tCtx.InputReader(),
OutputPath: tCtx.OutputWriter(),
Task: tCtx.TaskReader(),
TaskExecMetadata: tCtx.TaskExecutionMetadata(),
IncludeConsoleURL: shouldIncludeConsoleURL(taskTemplate),
}

resourceRequests := make([]v1.ResourceRequirements, 0, len(podSpec.Containers))
Expand Down
Loading
Loading