Commit 4d33975
fix sigstop
Signed-off-by: Ping Yu <[email protected]>
pingyu committed Jan 3, 2024
1 parent 6362da4 commit 4d33975
Showing 28 changed files with 107 additions and 28 deletions.
56 changes: 56 additions & 0 deletions cdc/tests/integration_tests/_utils/run_kafka_consumer
@@ -0,0 +1,56 @@
#!/bin/bash
# Usage:
#   run_kafka_consumer --workdir ... --upstream-uri ... [--downstream-uri ...] [--log-suffix ...]

set -e

pwd=$(pwd)

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source "$CUR"/../_utils/test_prepare

workdir=
upstream_uri=
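# Default to sinking into the downstream TiKV cluster; callers can override this
# (as sigstop's run_kill_downstream does below).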
downstream_uri="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}"
log_suffix=

while [[ $# -gt 0 ]]; do
	key="$1"

	case $key in
	--workdir)
		workdir=$2
		shift
		shift
		;;
	--upstream-uri)
		upstream_uri=$2
		shift
		shift
		;;
	--downstream-uri)
		downstream_uri=$2
		shift
		shift
		;;
	--log-suffix)
		log_suffix=$2
		shift
		shift
		;;
	*)
		echo "run_kafka_consumer: Unknown argument: $key"
		exit 1
		;;
	esac
done

echo "[$(date)] <<<<<< START kafka consumer in $TEST_NAME case >>>>>>"
cd "$workdir"
cdc_kafka_consumer \
	--log-file "$workdir/cdc_kafka_consumer$log_suffix.log" \
	--log-level info \
	--upstream-uri "$upstream_uri" \
	--downstream-uri "$downstream_uri" \
	>> "$workdir/cdc_kafka_consumer_stdout$log_suffix.log" 2>&1 &
cd "$pwd"
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/autorandom/run.sh
@@ -26,7 +26,7 @@ function run() {

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/availability/run.sh
@@ -34,7 +34,7 @@ function prepare() {
--start-ts=$start_ts --sink-uri="$SINK_URI" \
--disable-version-check
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi
}

@@ -30,7 +30,7 @@ function run() {
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8600" --pd $UP_PD
changefeed_id=$(tikv-cdc cli changefeed create --pd=$UP_PD --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

# wait until the task is dispatched
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/changefeed_auto_stop/run.sh
@@ -50,7 +50,7 @@ function run() {

changefeedid=$(tikv-cdc cli changefeed create --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

# make sure the first capture does the job first.
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/changefeed_error/run.sh
@@ -121,7 +121,7 @@ function run() {
changefeedid="changefeed-error"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

ensure $MAX_RETRIES check_changefeed_mark_failed_regex $UP_PD ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*"
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/changefeed_fast_fail/run.sh
@@ -52,7 +52,7 @@ function run() {
changefeedid="changefeed-fast-fail"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "ErrGCTTLExceeded"
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/changefeed_finish/run.sh
@@ -48,7 +48,7 @@ function run() {
target_ts=$(($start_ts + 90 * 10 ** 3 * 2 ** 18))
changefeed_id=$(tikv-cdc cli changefeed create --sink-uri="$SINK_URI" --start-ts=$start_ts --target-ts=$target_ts 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/changefeed_pause_resume/run.sh
@@ -25,7 +25,7 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
changefeed_id=$(tikv-cdc cli changefeed create --pd=$UP_PD --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

for i in $(seq 1 10); do
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/changefeed_reconstruct/run.sh
@@ -37,7 +37,7 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
changefeed_id=$(tikv-cdc cli changefeed create --pd=$UP_PD --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/cli/run.sh
@@ -55,7 +55,7 @@ function run() {
uuid="custom-changefeed-name"
run_cdc_cli changefeed create --start-ts=$start_ts --sort-engine=memory --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

# Make sure changefeed is created.
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/disk_full/run.sh
@@ -33,7 +33,7 @@ EOF

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id=$CF_ID
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/flow_control/run.sh
@@ -43,7 +43,7 @@ EOF

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

# Wait until cdc pulls the data from tikv and stores it in the sorter
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/gc_safepoint/run.sh
@@ -95,7 +95,7 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

pd_cluster_id=$(curl -s $pd_addr/pd/api/v1/cluster | grep -oE "id\":\s[0-9]+" | grep -oE "[0-9]+")
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/http_api/run.sh
@@ -65,7 +65,7 @@ function run() {

python3 $CUR/util/test_case.py create_changefeed $TLS_DIR "$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi
# wait for the changefeed to be created
sleep 2
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/kill_owner/run.sh
@@ -44,7 +44,7 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
tikv-cdc cli changefeed create --pd=$UP_PD --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

export GO_FAILPOINTS='github.com/tikv/migration/cdc/cdc/capture/ownerFlushIntervalInject=return(10)'
@@ -30,7 +30,7 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/kv_filter/run.sh
@@ -31,7 +31,7 @@ function run() {
--changefeed-id="$uuid" \
--config $CUR/conf/changefeed.toml
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/multi_capture/run.sh
@@ -43,7 +43,7 @@ function run() {
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --format="raw" --start-key="$SplitKey1" --end-key="$SplitKey2"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --format="raw" --start-key="$SplitKey2" --end-key="$End_Key"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/processor_err_chan/run.sh
@@ -49,7 +49,7 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

retry_time=10
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/processor_panic/run.sh
@@ -29,7 +29,7 @@ function run() {
# Make sure the task is assigned to the first cdc
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix 2 --addr 127.0.0.1:8601
@@ -27,7 +27,7 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi
ensure 10 "tikv-cdc cli processor list|jq '.|length'|grep -E '^1$'"

2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/processor_stop_delay/run.sh
@@ -28,7 +28,7 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
23 changes: 23 additions & 0 deletions cdc/tests/integration_tests/run_kafka_in_docker.sh
@@ -0,0 +1,23 @@
#!/bin/bash
# Usage:
# ./tests/integration_tests/run_kafka_in_docker.sh [--case <test_name>]

set -euo pipefail

CASE="*"

while [[ $# -gt 0 ]]; do
	key="$1"

	case $key in
	--case)
		CASE=$2
		shift
		shift
		;;
	*)
		# Fail fast on unknown flags; without a default branch nothing
		# would shift and the loop would spin forever.
		echo "run_kafka_in_docker: Unknown argument: $key"
		exit 1
		;;
	esac
done

COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 \
CASE="$CASE" \
docker-compose -f ./deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml up --build
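
For example, to bring up the Kafka compose environment and run only the sigstop case (paths are relative to the cdc directory, as in the usage comment above):

	./tests/integration_tests/run_kafka_in_docker.sh --case sigstop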
4 changes: 2 additions & 2 deletions cdc/tests/integration_tests/sigstop/run.sh
@@ -28,7 +28,7 @@ function run_kill_upstream() {

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 10000 &
@@ -87,7 +87,7 @@ function run_kill_downstream() {

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --pd $DOWN_PD
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI" --downstream-uri "tikv://${UP_PD_HOST_1}:${UP_PD_PORT_1}"
fi

rawkv_op $DOWN_PD put 10000 &
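Note: run_kill_downstream replicates in the reverse direction (the changefeed is created against the downstream PD), so the consumer's --downstream-uri is overridden here to point back at the upstream cluster rather than relying on the helper's default downstream address.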
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/sink_hang/run.sh
@@ -43,7 +43,7 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
4 changes: 2 additions & 2 deletions cdc/tests/integration_tests/sorter/run.sh
@@ -32,7 +32,7 @@ function run() {
esac
run_cdc_cli changefeed create -c $CF_NAME --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-engine="unified"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
@@ -60,7 +60,7 @@ function run() {

run_cdc_cli changefeed create -c $CF_NAME --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-engine="memory"
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/stop_downstream/run.sh
@@ -27,7 +27,7 @@ function run() {

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id=$CF_ID
if [ "$SINK_TYPE" == "kafka" ]; then
-run_kafka_consumer $WORK_DIR "$SINK_URI"
+run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
