From 1618195b88ce8b6f7514bf417149b69b4f41a593 Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Sat, 5 Oct 2024 17:20:38 -0700 Subject: [PATCH] Add basic SASL and TLS support for Kafka cloud events Signed-off-by: Jason Parraga --- charts/flyte-core/README.md | 1 + .../flyte-core/templates/admin/configmap.yaml | 2 +- .../templates/admin/deployment.yaml | 4 ++ charts/flyte-core/values.yaml | 3 + .../manifests/complete-agent.yaml | 4 +- .../sandbox-bundled/manifests/complete.yaml | 4 +- docker/sandbox-bundled/manifests/dev.yaml | 4 +- flyteadmin/.golangci.yml | 2 + flyteadmin/pkg/async/cloudevent/factory.go | 7 +-- .../interfaces/application_configuration.go | 60 +++++++++++++++++++ 10 files changed, 78 insertions(+), 13 deletions(-) diff --git a/charts/flyte-core/README.md b/charts/flyte-core/README.md index 3fa8d2dc2ad..836e89d78a8 100644 --- a/charts/flyte-core/README.md +++ b/charts/flyte-core/README.md @@ -59,6 +59,7 @@ helm install gateway bitnami/contour -n flyte | cloud_events.enable | bool | `false` | | | cloud_events.eventsPublisher.eventTypes[0] | string | `"all"` | | | cloud_events.eventsPublisher.topicName | string | `"arn:aws:sns:us-east-2:123456:123-my-topic"` | | +| cloud_events.secretName | string | `""` | The name of the secret to use to alternatively load in cloud events configuration via a secret. Useful when the configuration contains secrets. | | cloud_events.type | string | `"aws"` | | | cluster_resource_manager | object | `{"config":{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}},"enabled":true,"nodeSelector":{},"podAnnotations":{},"podEnv":{},"podLabels":{},"prometheus":{"enabled":false,"path":"/metrics","port":10254},"resources":{},"service_account_name":"flyteadmin","standaloneDeployment":false,"templates":[{"key":"aa_namespace","value":"apiVersion: v1\nkind: Namespace\nmetadata:\n name: {{ namespace }}\nspec:\n finalizers:\n - kubernetes\n"},{"key":"ab_project_resource_quota","value":"apiVersion: v1\nkind: ResourceQuota\nmetadata:\n name: project-quota\n namespace: {{ namespace }}\nspec:\n hard:\n limits.cpu: {{ projectQuotaCpu }}\n limits.memory: {{ projectQuotaMemory }}\n"}]}` | Configuration for the Cluster resource manager component. This is an optional component, that enables automatic cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain | | cluster_resource_manager.config | object | `{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}}` | Configmap for ClusterResource parameters | diff --git a/charts/flyte-core/templates/admin/configmap.yaml b/charts/flyte-core/templates/admin/configmap.yaml index 04e5cac6b38..9d1f9a40c82 100644 --- a/charts/flyte-core/templates/admin/configmap.yaml +++ b/charts/flyte-core/templates/admin/configmap.yaml @@ -79,7 +79,7 @@ data: externalEvents: {{ tpl (toYaml .) $ | nindent 6 }} {{- end }} {{- end }} -{{- if .Values.cloud_events.enable }} +{{- if and .Values.cloud_events.enable (not .Values.cloud_events.secretName) }} {{- with .Values.cloud_events }} cloud_events.yaml: | cloudEvents: {{ tpl (toYaml .) $ | nindent 6 }} diff --git a/charts/flyte-core/templates/admin/deployment.yaml b/charts/flyte-core/templates/admin/deployment.yaml index 23ea9966dfc..d185e490538 100755 --- a/charts/flyte-core/templates/admin/deployment.yaml +++ b/charts/flyte-core/templates/admin/deployment.yaml @@ -196,6 +196,10 @@ spec: name: flyte-admin-base-config - configMap: name: flyte-admin-clusters-config + {{- if .Values.cloud_events.secretName }} + - secret: + name: {{ .Values.cloud_events.secretName }} + {{- end }} name: clusters-config-volume {{- if .Values.cluster_resource_manager.enabled }} - configMap: diff --git a/charts/flyte-core/values.yaml b/charts/flyte-core/values.yaml index 33ef5746908..986d6958275 100755 --- a/charts/flyte-core/values.yaml +++ b/charts/flyte-core/values.yaml @@ -943,6 +943,9 @@ external_events: # Cloud events are used to send events (unprocessed, as Admin see them) in cloud event format to # an SNS topic (or gcp equivalent) cloud_events: + # -- The name of the secret to use to alternatively load in cloud events configuration via a secret. Useful when the + # configuration contains secrets. + secretName: "" enable: false type: aws aws: diff --git a/docker/sandbox-bundled/manifests/complete-agent.yaml b/docker/sandbox-bundled/manifests/complete-agent.yaml index 4b0ef1cd982..73f19dfd44e 100644 --- a/docker/sandbox-bundled/manifests/complete-agent.yaml +++ b/docker/sandbox-bundled/manifests/complete-agent.yaml @@ -816,7 +816,7 @@ type: Opaque --- apiVersion: v1 data: - haSharedSecret: cmRzbzQ4N3RQaWhuMk00OA== + haSharedSecret: ak5wVTFQVjRHMm5ZanVNUQ== proxyPassword: "" proxyUsername: "" kind: Secret @@ -1413,7 +1413,7 @@ spec: metadata: annotations: checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81 - checksum/secret: 51528951e92c2bf712bbde990941593aae1fcf72144a1fe944c312ddad86e161 + checksum/secret: db4b259a37cc362add2a4fd4c52954eabc67e69e2c399a292605415c70da4a2b labels: app: docker-registry release: flyte-sandbox diff --git a/docker/sandbox-bundled/manifests/complete.yaml b/docker/sandbox-bundled/manifests/complete.yaml index a44f3dad5dd..ce5fe830ddc 100644 --- a/docker/sandbox-bundled/manifests/complete.yaml +++ b/docker/sandbox-bundled/manifests/complete.yaml @@ -798,7 +798,7 @@ type: Opaque --- apiVersion: v1 data: - haSharedSecret: T1I2Q2tTcmREVG15MldGUQ== + haSharedSecret: RGZkSmNtV3k4dDZYd0pHVw== proxyPassword: "" proxyUsername: "" kind: Secret @@ -1362,7 +1362,7 @@ spec: metadata: annotations: checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81 - checksum/secret: d723e395edc0fd2f221b9088efffe0d1f4dfabdef9892065fdabe12233362cf5 + checksum/secret: d0a1f670be47a94b928141eae8a50733a775d074aee3a78db555b0728c90718e labels: app: docker-registry release: flyte-sandbox diff --git a/docker/sandbox-bundled/manifests/dev.yaml b/docker/sandbox-bundled/manifests/dev.yaml index d87f3a1642c..419dc50d3a9 100644 --- a/docker/sandbox-bundled/manifests/dev.yaml +++ b/docker/sandbox-bundled/manifests/dev.yaml @@ -499,7 +499,7 @@ metadata: --- apiVersion: v1 data: - haSharedSecret: ZnltNHNiZ01NRFNkb1RlMA== + haSharedSecret: amliZ0l4QXczU0ZjUUloWQ== proxyPassword: "" proxyUsername: "" kind: Secret @@ -934,7 +934,7 @@ spec: metadata: annotations: checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81 - checksum/secret: eeab364c20a0e8ad5a1526ccd7ddbd1d5a442087e7267c4d761279102b81be21 + checksum/secret: 4c93d218f3a1654f7eb3a238a4b1f57fbfda80cd8e5c4aaf5a9286a93f4a94f2 labels: app: docker-registry release: flyte-sandbox diff --git a/flyteadmin/.golangci.yml b/flyteadmin/.golangci.yml index 4dbb031812b..cd180b89d1c 100644 --- a/flyteadmin/.golangci.yml +++ b/flyteadmin/.golangci.yml @@ -39,3 +39,5 @@ issues: exclude-rules: - path: pkg/workflowengine/impl/prepare_execution.go text: "copies lock" + - path: pkg/runtime/interfaces/application_configuration.go + text: "G402: TLS InsecureSkipVerify may be true." diff --git a/flyteadmin/pkg/async/cloudevent/factory.go b/flyteadmin/pkg/async/cloudevent/factory.go index 65cd48de938..51c38ffea44 100644 --- a/flyteadmin/pkg/async/cloudevent/factory.go +++ b/flyteadmin/pkg/async/cloudevent/factory.go @@ -73,12 +73,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi case cloudEventImplementations.Kafka: saramaConfig := sarama.NewConfig() - var err error - saramaConfig.Version, err = sarama.ParseKafkaVersion(cloudEventsConfig.KafkaConfig.Version) - if err != nil { - logger.Fatalf(ctx, "failed to parse kafka version, %v", err) - panic(err) - } + cloudEventsConfig.KafkaConfig.UpdateSaramaConfig(ctx, saramaConfig) kafkaSender, err := kafka_sarama.NewSender(cloudEventsConfig.KafkaConfig.Brokers, saramaConfig, cloudEventsConfig.EventsPublisherConfig.TopicName) if err != nil { panic(err) diff --git a/flyteadmin/pkg/runtime/interfaces/application_configuration.go b/flyteadmin/pkg/runtime/interfaces/application_configuration.go index 15ed271412a..86bb9d30f61 100644 --- a/flyteadmin/pkg/runtime/interfaces/application_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/application_configuration.go @@ -1,6 +1,10 @@ package interfaces import ( + "context" + "crypto/tls" + + "github.com/Shopify/sarama" "github.com/golang/protobuf/ptypes/wrappers" "golang.org/x/time/rate" @@ -8,6 +12,7 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/config" "github.com/flyteorg/flyte/flytestdlib/database" + "github.com/flyteorg/flyte/flytestdlib/logger" ) // DbConfig is used to for initiating the database connection with the store that holds registered @@ -231,11 +236,66 @@ type GCPConfig struct { ProjectID string `json:"projectId"` } +type SASLConfig struct { + Enabled bool `json:"enabled"` + User string `json:"user"` + Password string `json:"password"` + Handshake bool `json:"handshake"` + Mechanism sarama.SASLMechanism `json:"mechanism"` +} + +type TLSConfig struct { + Enabled bool `json:"enabled"` + InsecureSkipVerify bool `json:"insecureSkipVerify"` + CertPath string `json:"certPath"` + KeyPath string `json:"keyPath"` +} + type KafkaConfig struct { // The version of Kafka, e.g. 2.1.0, 0.8.2.0 Version string `json:"version"` // kafka broker addresses Brokers []string `json:"brokers"` + // sasl config + SASLConfig SASLConfig `json:"sasl_config"` + // tls config + TLSConfig TLSConfig `json:"tls_config"` +} + +func (k KafkaConfig) UpdateSaramaConfig(ctx context.Context, s *sarama.Config) { + var err error + s.Version, err = sarama.ParseKafkaVersion(k.Version) + if err != nil { + logger.Fatalf(ctx, "failed to parse kafka version, %v", err) + panic(err) + } + + if k.SASLConfig.Enabled { + s.Net.SASL.Enable = true + s.Net.SASL.User = k.SASLConfig.User + s.Net.SASL.Password = k.SASLConfig.Password + s.Net.SASL.Handshake = k.SASLConfig.Handshake + + if k.SASLConfig.Mechanism == "" { + k.SASLConfig.Mechanism = sarama.SASLTypePlaintext + } + s.Net.SASL.Mechanism = k.SASLConfig.Mechanism + } + + if k.TLSConfig.Enabled { + s.Net.TLS.Enable = true + s.Net.TLS.Config = &tls.Config{ + InsecureSkipVerify: k.TLSConfig.InsecureSkipVerify, + } + if k.TLSConfig.KeyPath != "" && k.TLSConfig.CertPath != "" { + cert, err := tls.LoadX509KeyPair(k.TLSConfig.CertPath, k.TLSConfig.KeyPath) + if err != nil { + logger.Fatalf(ctx, "failed to load kafka client keypair: %v", err) + panic(err) + } + s.Net.TLS.Config.Certificates = []tls.Certificate{cert} + } + } } // This section holds configuration for the event scheduler used to schedule workflow executions.