Skip to content

Commit

Permalink
Add ProcessGuarantee to WindowConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng committed Nov 21, 2024
1 parent 3c92372 commit e6f829f
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 12 deletions.
14 changes: 13 additions & 1 deletion .ci/helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,22 @@ function ci::verify_crypto_function() {
function ci::send_test_data() {
inputtopic=$1
inputmessage=$2
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n 100 "${inputtopic}"
count=$3
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n $count "${inputtopic}"
return 0
}

function ci::verify_backlog() {
topic=$1
sub=$2
expected=$3
BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-amdin topic stats $topic | grep msgBacklog)
if [[ "$BACKLOG" == *"\"msgBacklog\" : $expecte"* ]]; then
return 0
fi
return 1
}

function ci::verify_exclamation_function() {
inputtopic=$1
outputtopic=$2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ spec:
windowConfig:
windowLengthCount: 10
slidingIntervalCount: 5
processingGuarantee: atleast_once
# the processingGuarantee should be manual for window function
# see: https://github.com/apache/pulsar/pull/16279/files#diff-c77c024ccb31c94a7aa80cb8e96d7e370709157bdc104a1be7867fb6c7aa0586R318-R319
processingGuarantee: manual
subscriptionPosition: earliest
---
apiVersion: v1
Expand Down
31 changes: 29 additions & 2 deletions .ci/tests/integration/cases/logging-window-function/verify.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,44 @@ if [ $? -ne 0 ]; then
exit 1
fi

verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 2>&1)
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 3 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_java_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

sleep 3

# the 3 messages will not be processed, so backlog should be 3
verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic-partition-0" "public/default/window-function-sample" 3 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_backlog_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

# it will fire the window with first 5 messages when get the 5th message, and then fire again with 10 messages when get 10th message
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 7 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_java_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

sleep 3

verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic-partition-0" "public/default/window-function-sample" 0 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_backlog_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

verify_log_result=$(kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e "-window-log" | wc -l)
if [ $verify_log_result -ne 0 ]; then
sub_name=$(echo $RANDOM | md5sum | head -c 20; echo;)
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 10 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
if [ $verify_log_topic_result -ne 0 ]; then
echo "e2e-test: ok" | yq eval -
else
Expand Down
19 changes: 10 additions & 9 deletions api/compute/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,16 @@ type LogConfig struct {
}

type WindowConfig struct {
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
LateDataTopic string `json:"lateDataTopic,omitempty"`
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
LateDataTopic string `json:"lateDataTopic,omitempty"`
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"`
}

type VPASpec struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3786,6 +3786,13 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- atleast_once
- atmost_once
- effectively_once
- manual
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3805,6 +3805,13 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- atleast_once
- atmost_once
- effectively_once
- manual
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
7 changes: 7 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functionmeshes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3786,6 +3786,13 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- atleast_once
- atmost_once
- effectively_once
- manual
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
7 changes: 7 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3783,6 +3783,13 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- atleast_once
- atmost_once
- effectively_once
- manual
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
4 changes: 4 additions & 0 deletions pkg/webhook/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ func validateWindowConfigs(windowConfig *v1alpha1.WindowConfig) *field.Error {
"Watermark interval must be positive")
}
}
if windowConfig.ProcessingGuarantee == v1alpha1.Manual || windowConfig.ProcessingGuarantee == v1alpha1.EffectivelyOnce {
return field.Invalid(field.NewPath("spec").Child("windowConfig"), windowConfig.ProcessingGuarantee,
"Window function only supports atleast_once and atmost_once")
}
}
return nil
}
Expand Down

0 comments on commit e6f829f

Please sign in to comment.