diff --git a/.ci/clusters/global_env.yaml b/.ci/clusters/global_env.yaml new file mode 100644 index 000000000..d8d796c5c --- /dev/null +++ b/.ci/clusters/global_env.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: mesh-global-env +data: + global1: globalvalue1 + shared1: fromglobal diff --git a/.ci/helm.sh b/.ci/helm.sh index 042eaba47..3e7be0e4d 100644 --- a/.ci/helm.sh +++ b/.ci/helm.sh @@ -596,3 +596,16 @@ function ci::verify_log_topic_with_auth() { fi return 1 } + +function ci::verify_env() { + pod="$1-function-0" + key=$2 + expect=$3 + result=$(kubectl exec -n ${NAMESPACE} ${pod} -- env | grep "${key}") + echo "$result" + echo "$expect" + if [[ "$result" = "$expect" ]]; then + return 0 + fi + return 1 +} \ No newline at end of file diff --git a/.ci/tests/integration/cases/global-and-namespaced-env/env.yaml b/.ci/tests/integration/cases/global-and-namespaced-env/env.yaml new file mode 100644 index 000000000..02fc101f4 --- /dev/null +++ b/.ci/tests/integration/cases/global-and-namespaced-env/env.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: mesh-namespaced-env + namespace: default +data: + namespaced1: namespacedvalue1 + shared1: fromnamespace diff --git a/.ci/tests/integration/cases/global-and-namespaced-env/manifests.yaml b/.ci/tests/integration/cases/global-and-namespaced-env/manifests.yaml new file mode 100644 index 000000000..dfec15e55 --- /dev/null +++ b/.ci/tests/integration/cases/global-and-namespaced-env/manifests.yaml @@ -0,0 +1,77 @@ +apiVersion: compute.functionmesh.io/v1alpha1 +kind: Function +metadata: + name: function-sample-env + namespace: default +spec: + image: streamnative/pulsar-functions-java-sample:2.9.2.23 + className: org.apache.pulsar.functions.api.examples.ExclamationFunction + forwardSourceMessageProperty: true + maxPendingAsyncRequests: 1000 + replicas: 1 + maxReplicas: 5 + logTopic: persistent://public/default/logging-function-logs + input: + topics: + - persistent://public/default/input-java-topic + typeClassName: java.lang.String + output: + topic: persistent://public/default/output-java-topic + typeClassName: java.lang.String + resources: + requests: + cpu: 50m + memory: 1G + limits: + memory: 1.1G + # each secret will be loaded ad an env variable from the `path` secret with the `key` in that secret in the name of `name` + secretsMap: + "name": + path: "test-secret" + key: "username" + "pwd": + path: "test-secret" + key: "password" + pulsar: + pulsarConfig: "test-pulsar" + tlsConfig: + enabled: false + allowInsecure: false + hostnameVerification: true + certSecretName: sn-platform-tls-broker + certSecretKey: "" + #authConfig: "test-auth" + java: + jar: /pulsar/examples/api-examples.jar + # to be delete & use admission hook + clusterName: test + autoAck: true +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-pulsar +data: + webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 +#--- +#apiVersion: v1 +#kind: ConfigMap +#metadata: +# name: test-auth +#data: +# clientAuthenticationPlugin: "abc" +# clientAuthenticationParameters: "xyz" +# tlsTrustCertsFilePath: "uvw" +# useTls: "true" +# tlsAllowInsecureConnection: "false" +# tlsHostnameVerificationEnable: "true" +--- +apiVersion: v1 +data: + username: YWRtaW4= + password: MWYyZDFlMmU2N2Rm +kind: Secret +metadata: + name: test-secret +type: Opaque diff --git a/.ci/tests/integration/cases/global-and-namespaced-env/verify.sh b/.ci/tests/integration/cases/global-and-namespaced-env/verify.sh new file mode 100644 index 000000000..5de2bc370 --- /dev/null +++ b/.ci/tests/integration/cases/global-and-namespaced-env/verify.sh @@ -0,0 +1,109 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +E2E_DIR=$(dirname "$0") +BASE_DIR=$(cd "${E2E_DIR}"/../../../../..;pwd) +PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"} +PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"} +E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"} + +source "${BASE_DIR}"/.ci/helm.sh + +if [ ! "$KUBECONFIG" ]; then + export KUBECONFIG=${E2E_KUBECONFIG} +fi + +manifests_file="${BASE_DIR}"/.ci/tests/integration/cases/global-and-namespaced-env/manifests.yaml +env_file="${BASE_DIR}"/.ci/tests/integration/cases/global-and-namespaced-env/env.yaml + +kubectl apply -f "${env_file}" > /dev/null 2>&1 +kubectl apply -f "${manifests_file}" > /dev/null 2>&1 + +verify_fm_result=$(ci::verify_function_mesh function-sample-env 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_fm_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_env_result=$(ci::verify_env "function-sample-env" global1 global1=globalvalue1 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_env_result" + kubectl delete -f "${env_file}" > /dev/null 2>&1 + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_env_result=$(ci::verify_env "function-sample-env" namespaced1 namespaced1=namespacedvalue1 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_env_result" + kubectl delete -f "${env_file}" > /dev/null 2>&1 + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +# if global and namespaced env has same key, the value from namespace should be used +verify_env_result=$(ci::verify_env "function-sample-env" shared1 shared1=fromnamespace 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_env_result" + kubectl delete -f "${env_file}" > /dev/null 2>&1 + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + +# delete the namespaced env, the function can start successfully but without namespaced env injected +kubectl delete -f "${env_file}" > /dev/null 2>&1 +kubectl apply -f "${manifests_file}" > /dev/null 2>&1 + +verify_fm_result=$(ci::verify_function_mesh function-sample-env 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_fm_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_env_result=$(ci::verify_env "function-sample-env" global1 global1=globalvalue1 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_env_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_env_result=$(ci::verify_env "function-sample-env" shared1 shared1=fromglobal 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_env_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_env_result=$(ci::verify_env "function-sample-env" namespaced1 "" 2>&1) +if [ $? -eq 0 ]; then + echo "e2e-test: ok" | yq eval - +else + echo "$verify_env_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true diff --git a/.ci/tests/integration/e2e.yaml b/.ci/tests/integration/e2e.yaml index bf11a8df8..a6c8d773b 100644 --- a/.ci/tests/integration/e2e.yaml +++ b/.ci/tests/integration/e2e.yaml @@ -89,6 +89,11 @@ setup: bash .ci/upload_function.sh pypip bash .ci/upload_function.sh go + - name: apply global env config map + command: | + kubectl create ns ${FUNCTION_MESH_NAMESPACE} + kubectl create -n ${FUNCTION_MESH_NAMESPACE} -f .ci/clusters/global_env.yaml + - name: install function-mesh operator command: | make generate @@ -96,7 +101,7 @@ setup: image="function-mesh-operator:latest" IMG=${image} make docker-build-skip-test kind load docker-image ${image} - helm install ${FUNCTION_MESH_RELEASE_NAME} -n ${FUNCTION_MESH_NAMESPACE} --set operatorImage=${image} --create-namespace charts/function-mesh-operator + helm install ${FUNCTION_MESH_RELEASE_NAME} -n ${FUNCTION_MESH_NAMESPACE} --set operatorImage=${image} --set controllerManager.globalConfigMap=mesh-global-env --set controllerManager.globalConfigMapNamespace=${FUNCTION_MESH_NAMESPACE} --set controllerManager.namespacedConfigMap=mesh-namespaced-env --create-namespace charts/function-mesh-operator wait: - namespace: function-mesh resource: pod @@ -168,3 +173,5 @@ verify: expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration/cases/python-log-format-json/verify.sh expected: expected.data.yaml + - query: timeout 5m bash .ci/tests/integration/cases/global-and-namespaced-env/verify.sh + expected: expected.data.yaml diff --git a/charts/function-mesh-operator/templates/controller-manager-deployment.yaml b/charts/function-mesh-operator/templates/controller-manager-deployment.yaml index d367dd24f..6fb5ed551 100644 --- a/charts/function-mesh-operator/templates/controller-manager-deployment.yaml +++ b/charts/function-mesh-operator/templates/controller-manager-deployment.yaml @@ -61,6 +61,9 @@ spec: - --pprof-addr=:{{ .Values.controllerManager.pprof.port }} - --config-file={{ .Values.controllerManager.configFile }} - --enable-init-containers={{ .Values.controllerManager.enableInitContainers }} + - --global-config-map={{ .Values.controllerManager.globalConfigMap }} + - --global-config-map-namespace={{ .Values.controllerManager.globalConfigMapNamespace }} + - --namespaced-config-map={{ .Values.controllerManager.namespacedConfigMap }} env: - name: NAMESPACE valueFrom: diff --git a/controllers/function.go b/controllers/function.go index 0baeb716e..7c926d051 100644 --- a/controllers/function.go +++ b/controllers/function.go @@ -69,7 +69,7 @@ func (r *FunctionReconciler) ObserveFunctionStatefulSet(ctx context.Context, fun } function.Status.Selector = selector.String() - if r.checkIfStatefulSetNeedUpdate(statefulSet, function) { + if r.checkIfStatefulSetNeedUpdate(ctx, statefulSet, function) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update function.Status.Conditions[v1alpha1.StatefulSet] = condition @@ -93,7 +93,7 @@ func (r *FunctionReconciler) ApplyFunctionStatefulSet(ctx context.Context, funct if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - desiredStatefulSet := spec.MakeFunctionStatefulSet(function) + desiredStatefulSet := spec.MakeFunctionStatefulSet(ctx, r, function) keepStatefulSetUnchangeableFields(ctx, r, r.Log, desiredStatefulSet) desiredStatefulSetSpec := desiredStatefulSet.Spec if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error { @@ -403,9 +403,9 @@ func (r *FunctionReconciler) ApplyFunctionCleanUpJob(ctx context.Context, functi return nil } -func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, +func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, statefulSet *appsv1.StatefulSet, function *v1alpha1.Function) bool { - return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeFunctionStatefulSet(function).Spec) + return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeFunctionStatefulSet(ctx, r, function).Spec) } func (r *FunctionReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, diff --git a/controllers/sink.go b/controllers/sink.go index 874e9abde..254e877bd 100644 --- a/controllers/sink.go +++ b/controllers/sink.go @@ -69,7 +69,7 @@ func (r *SinkReconciler) ObserveSinkStatefulSet(ctx context.Context, sink *v1alp } sink.Status.Selector = selector.String() - if r.checkIfStatefulSetNeedUpdate(statefulSet, sink) { + if r.checkIfStatefulSetNeedUpdate(ctx, statefulSet, sink) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update sink.Status.Conditions[v1alpha1.StatefulSet] = condition @@ -92,7 +92,7 @@ func (r *SinkReconciler) ApplySinkStatefulSet(ctx context.Context, sink *v1alpha if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - desiredStatefulSet := spec.MakeSinkStatefulSet(sink) + desiredStatefulSet := spec.MakeSinkStatefulSet(ctx, r, sink) keepStatefulSetUnchangeableFields(ctx, r, r.Log, desiredStatefulSet) desiredStatefulSetSpec := desiredStatefulSet.Spec if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error { @@ -400,8 +400,8 @@ func (r *SinkReconciler) ApplySinkCleanUpJob(ctx context.Context, sink *v1alpha1 return nil } -func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, sink *v1alpha1.Sink) bool { - return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSinkStatefulSet(sink).Spec) +func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, statefulSet *appsv1.StatefulSet, sink *v1alpha1.Sink) bool { + return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSinkStatefulSet(ctx, r, sink).Spec) } func (r *SinkReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, sink *v1alpha1.Sink) bool { diff --git a/controllers/sink_controller_test.go b/controllers/sink_controller_test.go index 38373fd2b..e2ec15695 100644 --- a/controllers/sink_controller_test.go +++ b/controllers/sink_controller_test.go @@ -41,7 +41,7 @@ var _ = Describe("Sink Controller", func() { if sink.Status.Conditions == nil { sink.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } - statefulSet := spec.MakeSinkStatefulSet(sink) + statefulSet := spec.MakeSinkStatefulSet(context.Background(), k8sClient, sink) It("Should create pulsar configmap successfully", func() { Expect(k8sClient.Create(context.Background(), pulsarConfig)).Should(Succeed()) diff --git a/controllers/source.go b/controllers/source.go index eddcba3dc..9fa5f060d 100644 --- a/controllers/source.go +++ b/controllers/source.go @@ -69,7 +69,7 @@ func (r *SourceReconciler) ObserveSourceStatefulSet(ctx context.Context, source } source.Status.Selector = selector.String() - if r.checkIfStatefulSetNeedUpdate(statefulSet, source) { + if r.checkIfStatefulSetNeedUpdate(ctx, statefulSet, source) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update source.Status.Conditions[v1alpha1.StatefulSet] = condition @@ -93,7 +93,7 @@ func (r *SourceReconciler) ApplySourceStatefulSet(ctx context.Context, source *v if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - desiredStatefulSet := spec.MakeSourceStatefulSet(source) + desiredStatefulSet := spec.MakeSourceStatefulSet(ctx, r, source) keepStatefulSetUnchangeableFields(ctx, r, r.Log, desiredStatefulSet) desiredStatefulSetSpec := desiredStatefulSet.Spec if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error { @@ -402,8 +402,8 @@ func (r *SourceReconciler) ApplySourceCleanUpJob(ctx context.Context, source *v1 return nil } -func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, source *v1alpha1.Source) bool { - return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSourceStatefulSet(source).Spec) +func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, statefulSet *appsv1.StatefulSet, source *v1alpha1.Source) bool { + return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSourceStatefulSet(ctx, r, source).Spec) } func (r *SourceReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, source *v1alpha1.Source) bool { diff --git a/controllers/source_controller_test.go b/controllers/source_controller_test.go index e2b20f1e5..cf86e6ee5 100644 --- a/controllers/source_controller_test.go +++ b/controllers/source_controller_test.go @@ -40,7 +40,7 @@ var _ = Describe("Source Controller", func() { if source.Status.Conditions == nil { source.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } - statefulSet := spec.MakeSourceStatefulSet(source) + statefulSet := spec.MakeSourceStatefulSet(context.Background(), k8sClient, source) It("Should create pulsar configmap successfully", func() { Expect(k8sClient.Create(context.Background(), pulsarConfig)).Should(Succeed()) diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 697df7a3f..2f7b1fbf8 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -39,7 +39,9 @@ import ( autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -203,7 +205,7 @@ func MakeHeadlessServiceName(serviceName string) string { return fmt.Sprintf("%s-headless", serviceName) } -func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string, +func MakeStatefulSet(ctx context.Context, r client.Reader, objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string, container *corev1.Container, filebeatContainer *corev1.Container, volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, pulsar v1alpha1.PulsarMessaging, javaRuntime *v1alpha1.JavaRuntime, pythonRuntime *v1alpha1.PythonRuntime, @@ -265,7 +267,7 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI Name: DownloaderVolume, }) } - return &appsv1.StatefulSet{ + desiredStatefulSet := &appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", APIVersion: "apps/v1", @@ -275,6 +277,8 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI MakeHeadlessServiceName(objectMeta.Name), downloaderContainer, volumeClaimTemplates, persistentVolumeClaimRetentionPolicy), } + MergeGlobalAndNamespacedEnv(ctx, r, objectMeta.Namespace, desiredStatefulSet) + return desiredStatefulSet } func MakeStatefulSetSpec(replicas *int32, container *corev1.Container, filebeatContainer *corev1.Container, @@ -2276,3 +2280,47 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En }, } } + +func MergeGlobalAndNamespacedEnv(ctx context.Context, r client.Reader, namespace string, statefulSet *appsv1.StatefulSet) { + envData := make(map[string]string) + if utils.GlobalConfigMap != "" { + globalCM := &corev1.ConfigMap{} + err := r.Get(ctx, types.NamespacedName{Namespace: utils.GlobalConfigMapNamespace, Name: utils.GlobalConfigMap}, globalCM) + if err != nil && !k8serrors.IsNotFound(err) { + return + } + if globalCM.Data != nil { + for key, val := range globalCM.Data { + envData[key] = val + } + } + } + if utils.NamespacedConfigMap != "" { + namespacedCM := &corev1.ConfigMap{} + err := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: utils.NamespacedConfigMap}, namespacedCM) + if err != nil && !k8serrors.IsNotFound(err) { + return + } + if namespacedCM.Data != nil { + for key, val := range namespacedCM.Data { + envData[key] = val + } + } + } + + if len(envData) == 0 { + return + } + + globalEnvs := make([]corev1.EnvVar, 0, len(envData)) + for key, val := range envData { + globalEnvs = append(globalEnvs, corev1.EnvVar{ + Name: key, + Value: val, + }) + } + + for i := range statefulSet.Spec.Template.Spec.Containers { + statefulSet.Spec.Template.Spec.Containers[i].Env = append(statefulSet.Spec.Template.Spec.Containers[i].Env, globalEnvs...) + } +} diff --git a/controllers/spec/function.go b/controllers/spec/function.go index 0392746b6..18b48f727 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -18,6 +18,7 @@ package spec import ( + "context" "regexp" "github.com/streamnative/function-mesh/utils" @@ -27,6 +28,7 @@ import ( v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" "github.com/streamnative/function-mesh/api/compute/v1alpha1" @@ -57,9 +59,9 @@ func MakeFunctionService(function *v1alpha1.Function) *corev1.Service { return MakeService(objectMeta, labels) } -func MakeFunctionStatefulSet(function *v1alpha1.Function) *appsv1.StatefulSet { +func MakeFunctionStatefulSet(ctx context.Context, r client.Reader, function *v1alpha1.Function) *appsv1.StatefulSet { objectMeta := MakeFunctionObjectMeta(function) - return MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, + return MakeStatefulSet(ctx, r, objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, makeFunctionContainer(function), makeFilebeatContainer(function.Spec.VolumeMounts, function.Spec.Pod.Env, function.Spec.Name, function.Spec.LogTopic, function.Spec.LogTopicAgent, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig, function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.TLSSecret, diff --git a/controllers/spec/sink.go b/controllers/spec/sink.go index d88c27f89..e84aa7482 100644 --- a/controllers/spec/sink.go +++ b/controllers/spec/sink.go @@ -18,6 +18,7 @@ package spec import ( + "context" "regexp" "github.com/streamnative/function-mesh/utils" @@ -27,6 +28,7 @@ import ( v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/streamnative/function-mesh/api/compute/v1alpha1" ) @@ -53,9 +55,9 @@ func MakeSinkService(sink *v1alpha1.Sink) *corev1.Service { return MakeService(objectMeta, labels) } -func MakeSinkStatefulSet(sink *v1alpha1.Sink) *appsv1.StatefulSet { +func MakeSinkStatefulSet(ctx context.Context, r client.Reader, sink *v1alpha1.Sink) *appsv1.StatefulSet { objectMeta := MakeSinkObjectMeta(sink) - return MakeStatefulSet(objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink), + return MakeStatefulSet(ctx, r, objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink), makeFilebeatContainer(sink.Spec.VolumeMounts, sink.Spec.Pod.Env, sink.Spec.Name, sink.Spec.LogTopic, sink.Spec.LogTopicAgent, sink.Spec.Pulsar.TLSConfig, sink.Spec.Pulsar.AuthConfig, sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.TLSSecret, sink.Spec.Pulsar.AuthSecret, sink.Spec.FilebeatImage), diff --git a/controllers/spec/source.go b/controllers/spec/source.go index 3fb249426..b551d4d2a 100644 --- a/controllers/spec/source.go +++ b/controllers/spec/source.go @@ -18,6 +18,7 @@ package spec import ( + "context" "fmt" "regexp" @@ -28,6 +29,7 @@ import ( v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/streamnative/function-mesh/api/compute/v1alpha1" ) @@ -54,9 +56,9 @@ func MakeSourceService(source *v1alpha1.Source) *corev1.Service { return MakeService(objectMeta, labels) } -func MakeSourceStatefulSet(source *v1alpha1.Source) *appsv1.StatefulSet { +func MakeSourceStatefulSet(ctx context.Context, r client.Reader, source *v1alpha1.Source) *appsv1.StatefulSet { objectMeta := MakeSourceObjectMeta(source) - return MakeStatefulSet(objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source), + return MakeStatefulSet(ctx, r, objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source), makeFilebeatContainer(source.Spec.VolumeMounts, source.Spec.Pod.Env, source.Spec.Name, source.Spec.LogTopic, source.Spec.LogTopicAgent, source.Spec.Pulsar.TLSConfig, source.Spec.Pulsar.AuthConfig, source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.TLSSecret, source.Spec.Pulsar.AuthSecret, source.Spec.FilebeatImage), diff --git a/main.go b/main.go index a58e12ebe..6aebd1796 100644 --- a/main.go +++ b/main.go @@ -72,6 +72,9 @@ func main() { var configFile string var watchedNamespace string var enableInitContainers bool + var globalConfigMap string + var globalConfigMapNamespace string + var namespacedConfigMap string flag.StringVar(&metricsAddr, "metrics-addr", lookupEnvOrString("METRICS_ADDR", ":8080"), "The address the metric endpoint binds to.") flag.StringVar(&leaderElectionID, "leader-election-id", @@ -97,10 +100,19 @@ func main() { "The address the pprof binds to.") flag.BoolVar(&enableInitContainers, "enable-init-containers", lookupEnvOrBool("ENABLE_INIT_CONTAINERS", false), "Whether to use an init container to download package") + flag.StringVar(&globalConfigMap, "global-config-map", lookupEnvOrString("GLOBAL_CONFIG_MAP", ""), + "A ConfigMap used to inject envs to all functions/sinks/sources") + flag.StringVar(&globalConfigMapNamespace, "global-config-map-namespace", lookupEnvOrString("GLOBAL_CONFIG_MAP_NAMESPACE", "default"), + "The namespace of the global ConfigMap. Defaults to 'default'") + flag.StringVar(&namespacedConfigMap, "namespaced-config-map", lookupEnvOrString("NAMESPACED_CONFIG_MAP", ""), + "A ConfigMap used to inject envs to functions/sinks/sources in a namespace") flag.Parse() ctrl.SetLogger(zap.New(zap.UseDevMode(true))) utils.EnableInitContainers = enableInitContainers + utils.GlobalConfigMap = globalConfigMap + utils.GlobalConfigMapNamespace = globalConfigMapNamespace + utils.NamespacedConfigMap = namespacedConfigMap // enable pprof if enablePprof { diff --git a/utils/configs.go b/utils/configs.go index c5c2cbc46..e5ce9b68c 100644 --- a/utils/configs.go +++ b/utils/configs.go @@ -19,3 +19,6 @@ package utils var EnableInitContainers = false +var GlobalConfigMap = "" +var GlobalConfigMapNamespace = "default" +var NamespacedConfigMap = ""