diff --git a/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/elasticsearch.yaml b/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/elasticsearch.yaml new file mode 100644 index 000000000..e3f82e75d --- /dev/null +++ b/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/elasticsearch.yaml @@ -0,0 +1,18 @@ +apiVersion: elasticsearch.k8s.elastic.co/v1 +kind: Elasticsearch +metadata: + name: quickstart +spec: + version: 7.9.2 + http: + tls: + selfSignedCertificate: + disabled: true + nodeSets: + - name: default + count: 1 + config: + node.master: true + node.data: true + node.ingest: true + node.store.allow_mmap: false diff --git a/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/manifests.yaml b/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/manifests.yaml new file mode 100644 index 000000000..9c6d2e63b --- /dev/null +++ b/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/manifests.yaml @@ -0,0 +1,44 @@ +apiVersion: compute.functionmesh.io/v1alpha1 +kind: Sink +metadata: + name: sink-sample-inferring-type +spec: + className: org.apache.pulsar.io.elasticsearch.ElasticSearchSink + replicas: 1 + maxReplicas: 1 + input: + topics: + - persistent://public/default/input-sink-topic + sinkConfig: + elasticSearchUrl: "http://quickstart-es-http.default.svc.cluster.local:9200" + username: "elastic" + password: "QqB1OtT6m79vfP7H9H0q2a82" + pulsar: + pulsarConfig: "test-sink" + tlsConfig: + enabled: false + allowInsecure: false + hostnameVerification: true + certSecretName: sn-platform-tls-broker + certSecretKey: "" + resources: + limits: + cpu: "0.2" + memory: 1.1G + requests: + cpu: "0.1" + memory: 1G + image: streamnative/pulsar-io-elastic-search:2.9.3.16 + java: + jar: connectors/pulsar-io-elastic-search-2.9.3.16.nar + jarLocation: "" # use pulsar provided connectors + clusterName: test-pulsar + autoAck: true +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-sink +data: + webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 diff --git a/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/verify.sh b/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/verify.sh new file mode 100644 index 000000000..98f283940 --- /dev/null +++ b/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/verify.sh @@ -0,0 +1,100 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +E2E_DIR=$(dirname "$0") +BASE_DIR=$(cd "${E2E_DIR}"/../../../../..;pwd) +PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"} +PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"} +E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"} + +source "${BASE_DIR}"/.ci/helm.sh + +if [ ! "$KUBECONFIG" ]; then + export KUBECONFIG=${E2E_KUBECONFIG} +fi + +manifests_file="${BASE_DIR}"/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/manifests.yaml +es_file="${BASE_DIR}"/.ci/tests/integration/cases/elasticsearch-sink-inferring-type/elasticsearch.yaml + +function install_elasticsearch_cluster() { + # install eck operator + kubectl create -f https://download.elastic.co/downloads/eck/2.3.0/crds.yaml + kubectl apply -f https://download.elastic.co/downloads/eck/2.3.0/operator.yaml + num=0 + while [[ ${num} -lt 1 ]]; do + sleep 5 + kubectl get pods -n elastic-system + num=$(kubectl get pods -n elastic-system -l control-plane=elastic-operator | wc -l) + done + kubectl wait -n elastic-system -l control-plane=elastic-operator --for=condition=Ready pod --timeout=5m && true + + # install es cluster + kubectl apply -f "${es_file}" + num=0 + while [[ ${num} -lt 1 ]]; do + sleep 5 + kubectl get pods + num=$(kubectl get pods -l elasticsearch.k8s.elastic.co/cluster-name=quickstart | wc -l) + done + kubectl wait -l elasticsearch.k8s.elastic.co/cluster-name=quickstart --for=condition=Ready pod --timeout=5m && true +} + +function uninstall_elasticsearch_cluster() { + kubectl delete elasticsearches.elasticsearch.k8s.elastic.co quickstart + while true; do + kubectl get elasticsearches.elasticsearch.k8s.elastic.co quickstart > /dev/null 2>&1 + if [ $? -eq 1 ]; then + break + fi + sleep 5 + done + + kubectl delete -f https://download.elastic.co/downloads/eck/2.3.0/operator.yaml + kubectl delete -f https://download.elastic.co/downloads/eck/2.3.0/crds.yaml +} + +# setup es cluster +setup_es_result=$(install_elasticsearch_cluster 2>&1) +if [ $? -ne 0 ]; then + echo "$setup_es_result" + uninstall_elasticsearch_cluster > /dev/null 2>&1 +fi + +password=$(kubectl get secret quickstart-es-elastic-user -o go-template='{{.data.elastic | base64decode}}') +sed -i.bak "s/password: \(.*\)/password: ${password}/g" "${manifests_file}" +kubectl apply -f "${manifests_file}" > /dev/null 2>&1 + +verify_fm_result=$(ci::verify_function_mesh sink-sample-inferring-type 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_fm_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_sink_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_sink 2>&1) +if [ $? -eq 0 ]; then + echo "e2e-test: ok" | yq eval - +else + echo "$verify_sink_result" +fi +kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true +uninstall_elasticsearch_cluster > /dev/null 2>&1 || true diff --git a/.ci/tests/integration/cases/java-function-inferring-type/manifests.yaml b/.ci/tests/integration/cases/java-function-inferring-type/manifests.yaml new file mode 100644 index 000000000..a29157fda --- /dev/null +++ b/.ci/tests/integration/cases/java-function-inferring-type/manifests.yaml @@ -0,0 +1,75 @@ +apiVersion: compute.functionmesh.io/v1alpha1 +kind: Function +metadata: + name: function-sample-inferring-type + namespace: default +spec: + image: streamnative/pulsar-functions-java-sample:2.9.3.16 + className: org.apache.pulsar.functions.api.examples.ExclamationFunction + forwardSourceMessageProperty: true + maxPendingAsyncRequests: 1000 + replicas: 1 + maxReplicas: 5 + logTopic: persistent://public/default/logging-function-logs + input: + topics: + - persistent://public/default/input-java-topic + output: + topic: persistent://public/default/output-java-topic + resources: + requests: + cpu: 50m + memory: 1G + limits: + memory: 1.1G + # each secret will be loaded ad an env variable from the `path` secret with the `key` in that secret in the name of `name` + secretsMap: + "name": + path: "test-secret" + key: "username" + "pwd": + path: "test-secret" + key: "password" + pulsar: + pulsarConfig: "test-pulsar" + tlsConfig: + enabled: false + allowInsecure: false + hostnameVerification: true + certSecretName: sn-platform-tls-broker + certSecretKey: "" + #authConfig: "test-auth" + java: + jar: /pulsar/examples/api-examples.jar + # to be delete & use admission hook + clusterName: test + autoAck: true +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-pulsar +data: + webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 +#--- +#apiVersion: v1 +#kind: ConfigMap +#metadata: +# name: test-auth +#data: +# clientAuthenticationPlugin: "abc" +# clientAuthenticationParameters: "xyz" +# tlsTrustCertsFilePath: "uvw" +# useTls: "true" +# tlsAllowInsecureConnection: "false" +# tlsHostnameVerificationEnable: "true" +--- +apiVersion: v1 +data: + username: YWRtaW4= + password: MWYyZDFlMmU2N2Rm +kind: Secret +metadata: + name: test-secret +type: Opaque diff --git a/.ci/tests/integration/cases/java-function-inferring-type/verify.sh b/.ci/tests/integration/cases/java-function-inferring-type/verify.sh new file mode 100644 index 000000000..e8eb5bf7a --- /dev/null +++ b/.ci/tests/integration/cases/java-function-inferring-type/verify.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +E2E_DIR=$(dirname "$0") +BASE_DIR=$(cd "${E2E_DIR}"/../../../../..;pwd) +PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"} +PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"} +E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"} + +source "${BASE_DIR}"/.ci/helm.sh + +if [ ! "$KUBECONFIG" ]; then + export KUBECONFIG=${E2E_KUBECONFIG} +fi + +manifests_file="${BASE_DIR}"/.ci/tests/integration/cases/java-function-inferring-type/manifests.yaml + +kubectl apply -f "${manifests_file}" > /dev/null 2>&1 + +verify_fm_result=$(ci::verify_function_mesh function-sample-inferring-type 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_fm_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_java_function 2>&1) +if [ $? -eq 0 ]; then + echo "e2e-test: ok" | yq eval - +else + echo "$verify_java_result" +fi +kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true diff --git a/.ci/tests/integration/cases/mongodb-source-inferring-type/manifests.yaml b/.ci/tests/integration/cases/mongodb-source-inferring-type/manifests.yaml new file mode 100644 index 000000000..20a89683b --- /dev/null +++ b/.ci/tests/integration/cases/mongodb-source-inferring-type/manifests.yaml @@ -0,0 +1,53 @@ +apiVersion: compute.functionmesh.io/v1alpha1 +kind: Source +metadata: + name: source-sample-inferring-type +spec: + className: org.apache.pulsar.io.debezium.mongodb.DebeziumMongoDbSource + replicas: 1 + maxReplicas: 1 + output: + producerConf: + maxPendingMessages: 1000 + maxPendingMessagesAcrossPartitions: 50000 + useThreadLocalProducers: true + topic: persistent://public/default/output-source-topic + forwardSourceMessageProperty: true + resources: + limits: + cpu: "0.2" + memory: 1.1G + requests: + cpu: "0.1" + memory: 1G + sourceConfig: + mongodb.hosts: rs0/mongo-dbz-0.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-1.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-2.mongo.default.svc.cluster.local:27017 + mongodb.name: dbserver1 + mongodb.user: debezium + mongodb.password: dbz + mongodb.task.id: "1" + database.whitelist: inventory + pulsar.service.url: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 + pulsar: + pulsarConfig: "test-source" + tlsConfig: + enabled: false + allowInsecure: false + hostnameVerification: true + certSecretName: sn-platform-tls-broker + certSecretKey: "" + image: streamnative/pulsar-io-debezium-mongodb:2.9.3.16 + java: + jar: connectors/pulsar-io-debezium-mongodb-2.9.3.16.nar + jarLocation: "" # use pulsar provided connectors + # use package name: + # jarLocation: function://public/default/nul-test-java-source@v1 + clusterName: test-pulsar +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-source +data: + webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 diff --git a/.ci/tests/integration/cases/mongodb-source-inferring-type/mongodb-dbz.yaml b/.ci/tests/integration/cases/mongodb-source-inferring-type/mongodb-dbz.yaml new file mode 100644 index 000000000..60c64d6e4 --- /dev/null +++ b/.ci/tests/integration/cases/mongodb-source-inferring-type/mongodb-dbz.yaml @@ -0,0 +1,82 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: mongodb-rbac +subjects: + - kind: ServiceAccount + # Reference to upper's `metadata.name` + name: default + # Reference to upper's `metadata.namespace` + namespace: default +roleRef: + kind: ClusterRole + name: cluster-admin + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: v1 +kind: Service +metadata: + name: mongo + labels: + name: mongo +spec: + ports: + - port: 27017 + clusterIP: None + selector: + role: mongo +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: mongo-dbz +spec: + selector: + matchLabels: + role: mongo + serviceName: "mongo" + replicas: 3 + template: + metadata: + labels: + role: mongo + spec: + terminationGracePeriodSeconds: 10 + containers: + - name: mongo + image: debezium/example-mongodb:0.10 + env: + - name: MONGODB_USER + value: "debezium" + - name: MONGODB_PASSWORD + value: "dbz" + command: + - mongod + - "--replSet" + - rs0 + - "--bind_ip" # bind mongo to all ip address to allow others to access + - "0.0.0.0" + ports: + - containerPort: 27017 + volumeMounts: + - name: mongo-persistent-storage + mountPath: /data/db + - name: mongo-sidecar + image: cvallance/mongo-k8s-sidecar + env: + - name: MONGO_SIDECAR_POD_LABELS + value: "role=mongo" + - name: KUBE_NAMESPACE + value: default + - name: KUBERNETES_MONGO_SERVICE_NAME + value: "mongo" + volumeClaimTemplates: + - metadata: + name: mongo-persistent-storage + spec: + storageClassName: "standard" + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 2Gi +# Need to manually run the ./usr/local/bin/init-inventory.sh script to setup users and database(except the config for rs) diff --git a/.ci/tests/integration/cases/mongodb-source-inferring-type/verify.sh b/.ci/tests/integration/cases/mongodb-source-inferring-type/verify.sh new file mode 100644 index 000000000..a0e84fc08 --- /dev/null +++ b/.ci/tests/integration/cases/mongodb-source-inferring-type/verify.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +E2E_DIR=$(dirname "$0") +BASE_DIR=$(cd "${E2E_DIR}"/../../../../..;pwd) +PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"} +PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"} +E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"} + +source "${BASE_DIR}"/.ci/helm.sh + +if [ ! "$KUBECONFIG" ]; then + export KUBECONFIG=${E2E_KUBECONFIG} +fi + +manifests_file="${BASE_DIR}"/.ci/tests/integration/cases/mongodb-source-inferring-type/manifests.yaml +mongodb_file="${BASE_DIR}"/.ci/tests/integration/cases/mongodb-source-inferring-type/mongodb-dbz.yaml + +function install_mongodb_server() { + # change default storageclass + default_sc=$(kubectl get storageclasses.storage.k8s.io | grep default | awk '{ print $1 }') + sed -i.bak "s/storageClassName: \(.*\)/storageClassName: ${default_sc}/g" "${mongodb_file}" + + # install mongodb server + kubectl apply -f "${mongodb_file}" + num=0 + while [[ ${num} -lt 3 ]]; do + sleep 5 + kubectl get pods + num=$(kubectl get pods -l role=mongo | wc -l) + done + kubectl wait -l role=mongo --for=condition=Ready pod --timeout=5m && true + + # initialize the data + kubectl exec mongo-dbz-0 -c mongo -- bash ./usr/local/bin/init-inventory.sh +} + +function uninstall_mongodb_server() { + # uninstall mongodb server + kubectl delete -f "${mongodb_file}" +} + +# setup mongodb server +setup_mongodb_result=$(install_mongodb_server 2>&1) +if [ $? -ne 0 ]; then + echo "$setup_mongodb_result" + uninstall_mongodb_server > /dev/null 2>&1 +fi + +kubectl apply -f "${manifests_file}" > /dev/null 2>&1 + +verify_fm_result=$(ci::verify_function_mesh source-sample-inferring-type 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_fm_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_source_result=$(ci::verify_source 2>&1) +if [ $? -eq 0 ]; then + echo "e2e-test: ok" | yq eval - +else + echo "$verify_source_result" +fi +kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true +uninstall_mongodb_server > /dev/null 2>&1 || true diff --git a/.ci/tests/integration/e2e.yaml b/.ci/tests/integration/e2e.yaml index dee384fbc..0923ae0ca 100644 --- a/.ci/tests/integration/e2e.yaml +++ b/.ci/tests/integration/e2e.yaml @@ -160,3 +160,9 @@ verify: expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration/cases/health-check/verify.sh expected: expected.data.yaml + - query: timeout 5m bash .ci/tests/integration/cases/java-function-inferring-type/verify.sh + expected: expected.data.yaml + - query: bash .ci/tests/integration/cases/mongodb-source-inferring-type/verify.sh + expected: expected.data.yaml + - query: bash .ci/tests/integration/cases/elasticsearch-sink-inferring-type/verify.sh + expected: expected.data.yaml diff --git a/pkg/config/image_capabilities.go b/pkg/config/image_capabilities.go new file mode 100644 index 000000000..eeb50e6bc --- /dev/null +++ b/pkg/config/image_capabilities.go @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package config defines config related tools +package config + +import ( + "regexp" +) + +type ImageCapabilities struct { + // +optional + InferTypeClassName *ImageCapability `json:"inferTypeClassName,omitempty"` +} + +type ImageCapability struct { + // +optional + ImagePatterns []string `json:"imagePatterns,omitempty"` +} + +func DefaultImageCapabilities() ImageCapabilities { + return ImageCapabilities{ + InferTypeClassName: &ImageCapability{ + ImagePatterns: []string{ + // all 2.9 versions starting from 2.9.3.16 + `streamnative/.*?:2\.9\.(3\.(16|[2-9][0-9])|[4-9]\..*)`, + // all 2.10 versions starting from 2.10.2.1 + `streamnative/.*?:2\.10\.[2-9]\..*`, + // all 2.x versions starting from 2.11 + `streamnative/.*?:2\.1[1-9]\..*`, + // all versions starting from 3.0 + `streamnative/.*?:([3-9]|[1-9][0-9]+)\..*`, + }, + }, + } +} + +func (c *ImageCapability) MatchImage(image string) bool { + if c == nil { + return false + } + for _, pattern := range c.ImagePatterns { + if m, err := regexp.Match(pattern, []byte(image)); err != nil { + continue + } else if m { + return true + } + } + return false +} diff --git a/pkg/config/image_capabilities_test.go b/pkg/config/image_capabilities_test.go new file mode 100644 index 000000000..e6923b3fb --- /dev/null +++ b/pkg/config/image_capabilities_test.go @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package config defines config related tools +package config + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestMatchImage(t *testing.T) { + imageCapabilities := DefaultImageCapabilities() + + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.8.1")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.8.2")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.8.3.19")) + + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.1")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.2")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.3.13.1")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.3.13")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.3.14")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.3.15")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.3.16")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.3.16.2")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.3.16.2.test")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.3.20")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.3.30")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.4.30")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.4.0")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.4.1")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.9.4.190")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("custom/pulsar:2.9.4.190")) + + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.10.1")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.10.1.19")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.10.2.1")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.10.3.1")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.10.4.2")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.10.4.20")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("custom/pulsar:2.10.4.20")) + + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.11.0.1")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:2.11.1")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("xxxxx/pulsar:2.11.1")) + + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:3.1.1")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:4.1.0")) + assert.Equal(t, true, imageCapabilities.InferTypeClassName.MatchImage("streamnative/pulsar:5.2.1")) + assert.Equal(t, false, imageCapabilities.InferTypeClassName.MatchImage("yyyyyy/pulsar:5.2.1")) +} diff --git a/pkg/webhook/function_webhook.go b/pkg/webhook/function_webhook.go index ddd8c38a3..53b357d91 100644 --- a/pkg/webhook/function_webhook.go +++ b/pkg/webhook/function_webhook.go @@ -21,8 +21,10 @@ package webhook import ( "context" "fmt" + "github.com/streamnative/function-mesh/controllers/spec" "github.com/streamnative/function-mesh/api/compute/v1alpha1" + "github.com/streamnative/function-mesh/pkg/config" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/validation/field" @@ -130,11 +132,18 @@ func (webhook *FunctionWebhook) Default(ctx context.Context, obj runtime.Object) paddingResourceLimit(&r.Spec.Resources) } - if r.Spec.Input.TypeClassName == "" { + imageCapabilities := config.DefaultImageCapabilities() + image := r.Spec.Image + if image == "" { + image = spec.DefaultRunnerImage + } + canInferTypeClassName := r.Spec.Java != nil && imageCapabilities.InferTypeClassName.MatchImage(image) + + if r.Spec.Input.TypeClassName == "" && !canInferTypeClassName { r.Spec.Input.TypeClassName = "[B" } - if r.Spec.Output.TypeClassName == "" { + if r.Spec.Output.TypeClassName == "" && !canInferTypeClassName { r.Spec.Output.TypeClassName = "[B" } diff --git a/pkg/webhook/sink_webhook.go b/pkg/webhook/sink_webhook.go index d49a6e6e1..c64c61125 100644 --- a/pkg/webhook/sink_webhook.go +++ b/pkg/webhook/sink_webhook.go @@ -21,6 +21,8 @@ package webhook import ( "context" "fmt" + "github.com/streamnative/function-mesh/controllers/spec" + "github.com/streamnative/function-mesh/pkg/config" "github.com/streamnative/function-mesh/api/compute/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -121,7 +123,12 @@ func (webhook *SinkWebhook) Default(ctx context.Context, obj runtime.Object) err paddingResourceLimit(&r.Spec.Resources) } - if r.Spec.Input.TypeClassName == "" { + imageCapabilities := config.DefaultImageCapabilities() + image := r.Spec.Image + if image == "" { + image = spec.DefaultRunnerImage + } + if r.Spec.Input.TypeClassName == "" && !imageCapabilities.InferTypeClassName.MatchImage(image) { r.Spec.Input.TypeClassName = "[B" } return nil diff --git a/pkg/webhook/source_webhook.go b/pkg/webhook/source_webhook.go index 0f5f6adc3..76f57157d 100644 --- a/pkg/webhook/source_webhook.go +++ b/pkg/webhook/source_webhook.go @@ -21,6 +21,8 @@ package webhook import ( "context" "fmt" + "github.com/streamnative/function-mesh/controllers/spec" + "github.com/streamnative/function-mesh/pkg/config" "github.com/streamnative/function-mesh/api/compute/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -131,7 +133,12 @@ func (webhook *SourceWebhook) Default(ctx context.Context, obj runtime.Object) e paddingResourceLimit(&r.Spec.Resources) } - if r.Spec.Output.TypeClassName == "" { + imageCapabilities := config.DefaultImageCapabilities() + image := r.Spec.Image + if image == "" { + image = spec.DefaultRunnerImage + } + if r.Spec.Output.TypeClassName == "" && !imageCapabilities.InferTypeClassName.MatchImage(image) { r.Spec.Output.TypeClassName = "[B" } return nil