Skip to content

Commit

Permalink
wait for the ingestion processor to start consuming from Kafka
Browse files Browse the repository at this point in the history
Issue: ZENKO-4286
  • Loading branch information
Kerkesni committed Jan 27, 2025
1 parent 4e55e03 commit 0da65a3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
32 changes: 32 additions & 0 deletions .github/scripts/end2end/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,35 @@ wait_for_all_pods_behind_services() {
done
done
}

# wait for consumer group to be in a stable state (no rebance + at least one consumer connected)
wait_for_consumer_group() {
# Getting the name of the first kafka pod
kafka_pod=$(sudo kubectl --kubeconfig=/etc/kubernetes/admin.conf -n zenko get pods -l brokerId=0,kafka_cr=end2end-base-queue,app=kafka -o jsonpath='{.items[0].metadata.name}')
consumer_group=$1
timeout_s=$2
interval_s=${3:-5}
sudo kubectl --kubeconfig=/etc/kubernetes/admin.conf -n zenko exec -it $kafka_pod -- bash -c '
export KAFKA_OPTS=
consumer_group=$1
timeout_s=$2
start_time=$(date +%s)
while true; do
# The state becomes "Stable" when no rebalance is happening and at least one consumer is connected
state=$(kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group $consumer_group --state | awk '"'"'NF>1 && $(NF-1) != "STATE" {print (NF>1?$(NF-1):"None")} {next}'"'"')
if [ "$state" == "Stable" ]; then
echo "Consumer group $consumer_group is now consuming."
exit 0
fi
# Check if we have reached the timeout
current_time=$(date +%s)
elapsed_time=$((current_time - start_time))
if [ "$elapsed_time" -ge "$timeout_s" ]; then
echo "Error: Timed out waiting for consumer group $consumer_group to start consuming."
exit 1
fi
# Sleep for 1 second before checking again
sleep 1
done
' -- "$consumer_group" "$timeout_s" "$interval_s"
}
3 changes: 3 additions & 0 deletions .github/scripts/end2end/configure-e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,6 @@ sleep 10

kubectl wait --for condition=DeploymentFailure=false --timeout 25m -n ${NAMESPACE} zenko/${ZENKO_NAME}
kubectl wait --for condition=DeploymentInProgress=false --timeout 25m -n ${NAMESPACE} zenko/${ZENKO_NAME}

# wait for ingestion processor to start consuming from Kafka
wait_for_consumer_group $UUID.backbeat-ingestion-group 300

0 comments on commit 0da65a3

Please sign in to comment.