From e6f829f454db7daf5d949455732a7b13674a42eb Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Thu, 21 Nov 2024 16:47:10 +0800 Subject: [PATCH] Add `ProcessGuarantee` to WindowConfig --- .ci/helm.sh | 14 ++++++++- .../logging-window-function/manifests.yaml | 4 +++ .../cases/logging-window-function/verify.sh | 31 +++++++++++++++++-- api/compute/v1alpha1/common.go | 19 ++++++------ ...ompute.functionmesh.io-functionmeshes.yaml | 7 +++++ ...crd-compute.functionmesh.io-functions.yaml | 7 +++++ ...ompute.functionmesh.io_functionmeshes.yaml | 7 +++++ .../compute.functionmesh.io_functions.yaml | 7 +++++ pkg/webhook/validate.go | 4 +++ 9 files changed, 88 insertions(+), 12 deletions(-) diff --git a/.ci/helm.sh b/.ci/helm.sh index a1b8003eb..685da5e50 100644 --- a/.ci/helm.sh +++ b/.ci/helm.sh @@ -337,10 +337,22 @@ function ci::verify_crypto_function() { function ci::send_test_data() { inputtopic=$1 inputmessage=$2 - kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n 100 "${inputtopic}" + count=$3 + kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n $count "${inputtopic}" return 0 } +function ci::verify_backlog() { + topic=$1 + sub=$2 + expected=$3 + BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-amdin topic stats $topic | grep msgBacklog) + if [[ "$BACKLOG" == *"\"msgBacklog\" : $expecte"* ]]; then + return 0 + fi + return 1 +} + function ci::verify_exclamation_function() { inputtopic=$1 outputtopic=$2 diff --git a/.ci/tests/integration/cases/logging-window-function/manifests.yaml b/.ci/tests/integration/cases/logging-window-function/manifests.yaml index 3c1492b9d..cfa816ae7 100644 --- a/.ci/tests/integration/cases/logging-window-function/manifests.yaml +++ b/.ci/tests/integration/cases/logging-window-function/manifests.yaml @@ -36,6 +36,10 @@ spec: windowConfig: windowLengthCount: 10 slidingIntervalCount: 5 + processingGuarantee: atleast_once + # the processingGuarantee should be manual for window function + # see: https://github.com/apache/pulsar/pull/16279/files#diff-c77c024ccb31c94a7aa80cb8e96d7e370709157bdc104a1be7867fb6c7aa0586R318-R319 + processingGuarantee: manual subscriptionPosition: earliest --- apiVersion: v1 diff --git a/.ci/tests/integration/cases/logging-window-function/verify.sh b/.ci/tests/integration/cases/logging-window-function/verify.sh index edd74f2e4..dfc44582c 100644 --- a/.ci/tests/integration/cases/logging-window-function/verify.sh +++ b/.ci/tests/integration/cases/logging-window-function/verify.sh @@ -43,17 +43,44 @@ if [ $? -ne 0 ]; then exit 1 fi -verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 2>&1) +verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 3 2>&1) if [ $? -ne 0 ]; then echo "$verify_java_result" kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true exit 1 fi +sleep 3 + +# the 3 messages will not be processed, so backlog should be 3 +verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic-partition-0" "public/default/window-function-sample" 3 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_backlog_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +# it will fire the window with first 5 messages when get the 5th message, and then fire again with 10 messages when get 10th message +verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 7 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_java_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +sleep 3 + +verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic-partition-0" "public/default/window-function-sample" 0 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_backlog_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + verify_log_result=$(kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e "-window-log" | wc -l) if [ $verify_log_result -ne 0 ]; then sub_name=$(echo $RANDOM | md5sum | head -c 20; echo;) - verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 10 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l) + verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l) if [ $verify_log_topic_result -ne 0 ]; then echo "e2e-test: ok" | yq eval - else diff --git a/api/compute/v1alpha1/common.go b/api/compute/v1alpha1/common.go index 45a3cf71a..0f5bedec2 100644 --- a/api/compute/v1alpha1/common.go +++ b/api/compute/v1alpha1/common.go @@ -533,15 +533,16 @@ type LogConfig struct { } type WindowConfig struct { - ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"` - WindowLengthCount *int32 `json:"windowLengthCount,omitempty"` - WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"` - SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"` - SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"` - LateDataTopic string `json:"lateDataTopic,omitempty"` - MaxLagMs *int64 `json:"maxLagMs,omitempty"` - WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"` - TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"` + ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"` + WindowLengthCount *int32 `json:"windowLengthCount,omitempty"` + WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"` + SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"` + SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"` + LateDataTopic string `json:"lateDataTopic,omitempty"` + MaxLagMs *int64 `json:"maxLagMs,omitempty"` + WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"` + TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"` + ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` } type VPASpec struct { diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml index 109c1a0d2..91de157db 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml @@ -3786,6 +3786,13 @@ spec: maxLagMs: format: int64 type: integer + processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once + - manual + type: string slidingIntervalCount: format: int32 type: integer diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml index 8e3568ecf..4a8e94585 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml @@ -3805,6 +3805,13 @@ spec: maxLagMs: format: int64 type: integer + processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once + - manual + type: string slidingIntervalCount: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 72502a887..7e4fa9969 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -3786,6 +3786,13 @@ spec: maxLagMs: format: int64 type: integer + processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once + - manual + type: string slidingIntervalCount: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 94bfefa84..4da38ef64 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -3783,6 +3783,13 @@ spec: maxLagMs: format: int64 type: integer + processingGuarantee: + enum: + - atleast_once + - atmost_once + - effectively_once + - manual + type: string slidingIntervalCount: format: int32 type: integer diff --git a/pkg/webhook/validate.go b/pkg/webhook/validate.go index 73cb82ba4..593624c3a 100644 --- a/pkg/webhook/validate.go +++ b/pkg/webhook/validate.go @@ -389,6 +389,10 @@ func validateWindowConfigs(windowConfig *v1alpha1.WindowConfig) *field.Error { "Watermark interval must be positive") } } + if windowConfig.ProcessingGuarantee == v1alpha1.Manual || windowConfig.ProcessingGuarantee == v1alpha1.EffectivelyOnce { + return field.Invalid(field.NewPath("spec").Child("windowConfig"), windowConfig.ProcessingGuarantee, + "Window function only supports atleast_once and atmost_once") + } } return nil }