From f61f437527e4218a6841f6e816cccf2d59942284 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20G=C3=B3mez?= Date: Thu, 21 Dec 2023 15:47:06 +0100 Subject: [PATCH] Add env var support (#324) Co-authored-by: Filipe Regadas --- pkg/flink/handler.go | 22 ++++++++++++---------- pkg/flink/types.go | 9 +++++++++ 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 897bbf2..0f943eb 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -40,11 +40,12 @@ import ( ) type FlinkTaskContext struct { - ClusterName ClusterName - Namespace string - Annotations map[string]string - Labels map[string]string - Job flinkIdl.FlinkJob + ClusterName ClusterName + Namespace string + Annotations map[string]string + Labels map[string]string + EnvironmentVariables map[string]string + Job flinkIdl.FlinkJob } type FlinkTaskExecContext interface { @@ -92,11 +93,12 @@ func NewFlinkTaskContext(ctx context.Context, taskCtx FlinkTaskExecContext) (*Fl } return &FlinkTaskContext{ - ClusterName: cn, - Namespace: taskMetadata.GetNamespace(), - Annotations: GetDefaultAnnotations(taskMetadata), - Labels: GetDefaultLabels(taskMetadata), - Job: job, + ClusterName: cn, + Namespace: taskMetadata.GetNamespace(), + Annotations: GetDefaultAnnotations(taskMetadata), + Labels: GetDefaultLabels(taskMetadata), + EnvironmentVariables: GetDefaultEnvironmentVariables(taskMetadata), + Job: job, }, nil } diff --git a/pkg/flink/types.go b/pkg/flink/types.go index d33628f..769c663 100644 --- a/pkg/flink/types.go +++ b/pkg/flink/types.go @@ -63,6 +63,15 @@ func GetDefaultLabels(taskCtx pluginsCore.TaskExecutionMetadata) Labels { ) } +type EnvVars map[string]string + +func GetDefaultEnvironmentVariables(taskCtx pluginsCore.TaskExecutionMetadata) EnvVars { + return utils.UnionMaps( + config.GetK8sPluginConfig().DefaultEnvVars, + utils.CopyMap(taskCtx.GetEnvironmentVariables()), + ) +} + type ClusterName string func (cn ClusterName) Validate() error {