Skip to content

Commit

Permalink
[close tikv#381] Add integration tests for Kafka sink (tikv#382)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Ping Yu <[email protected]>

* add go:generate

Signed-off-by: Ping Yu <[email protected]>

* add tools/Makefile

Signed-off-by: Ping Yu <[email protected]>

* fix build error

Signed-off-by: Ping Yu <[email protected]>

* built

Signed-off-by: Ping Yu <[email protected]>

* fix

Signed-off-by: Ping Yu <[email protected]>

* fix ut

Signed-off-by: Ping Yu <[email protected]>

* clean up

Signed-off-by: Ping Yu <[email protected]>

* fix ut

Signed-off-by: Ping Yu <[email protected]>

* add ut

Signed-off-by: Ping Yu <[email protected]>

* fix check

Signed-off-by: Ping Yu <[email protected]>

* first works

Signed-off-by: Ping Yu <[email protected]>

* kafka sink in run.sh

Signed-off-by: Ping Yu <[email protected]>

* fix failpoint

Signed-off-by: Ping Yu <[email protected]>

* fix fmt

Signed-off-by: Ping Yu <[email protected]>

* fix sigstop

Signed-off-by: Ping Yu <[email protected]>

* fix check

Signed-off-by: Ping Yu <[email protected]>

* skip not support case

Signed-off-by: Ping Yu <[email protected]>

* default to kafka

Signed-off-by: Ping Yu <[email protected]>

* update readme.md in tests

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

---------

Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored Jan 8, 2024
1 parent 99b7b74 commit aeae68f
Show file tree
Hide file tree
Showing 47 changed files with 965 additions and 43 deletions.
14 changes: 10 additions & 4 deletions cdc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ LDFLAGS += -X "$(CDC_PKG)/pkg/version.GoVersion=$(GOVERSION)"

SCVERSION := stable

TEST_ON_BRANCH ?= release-6.5

include tools/Makefile

default: build buildsucc
Expand Down Expand Up @@ -208,7 +210,7 @@ rawkv_data:
cdc_state_checker:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_state_checker ./tests/utils/cdc_state_checker/

integration_test_build: check_failpoint_ctl rawkv_data cdc_state_checker
integration_test_build: check_failpoint_ctl rawkv_data cdc_state_checker kafka_consumer
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/tikv/migration/... \
Expand All @@ -219,8 +221,12 @@ integration_test_build: check_failpoint_ctl rawkv_data cdc_state_checker
$(FAILPOINT_DISABLE)

integration_test: prepare_test_binaries check_third_party_binary integration_test_build
tests/integration_tests/run.sh "$(CASE)"
@bash <(curl -s https://codecov.io/bash) -F cdc -s /tmp/tikv_cdc_test -f *.out -t $(TIKV_MIGRATION_CODECOV_TOKEN)
tests/integration_tests/run.sh tikv "$(CASE)"
@bash <(curl -s https://codecov.io/bash) -F cdc -s /tmp/tikv_cdc_test -f *.out -t "$(TIKV_MIGRATION_CODECOV_TOKEN)"

integration_test_kafka: prepare_test_binaries check_third_party_binary integration_test_build
tests/integration_tests/run.sh kafka "$(CASE)"
@bash <(curl -s https://codecov.io/bash) -F cdc -s /tmp/tikv_cdc_test -f *.out -t "$(TIKV_MIGRATION_CODECOV_TOKEN)"

# To make try on integration tests easier.
# Comment out the succeed cases and retry from the failed one.
Expand All @@ -242,7 +248,7 @@ integration_test_by_group: prepare_test_binaries check_third_party_binary integr
tests/integration_tests/run_group.sh others

prepare_test_binaries:
cd scripts && ./download-integration-test-binaries.sh master && cd ..
cd scripts && ./download-integration-test-binaries.sh "$(TEST_ON_BRANCH)" && cd ..
touch prepare_test_binaries

check_third_party_binary:
Expand Down
6 changes: 3 additions & 3 deletions cdc/cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,11 +505,11 @@ func (b *JSONEventBatchDecoder) NextChangedEvent() (*model.RawKVEntry, error) {
valueLen := binary.BigEndian.Uint64(b.valueBytes[:8])
value := b.valueBytes[8 : valueLen+8]
b.valueBytes = b.valueBytes[valueLen+8:]
mvMsg := new(mqMessageValue)
if err := mvMsg.Decode(value); err != nil {
kvMsg := new(mqMessageValue)
if err := kvMsg.Decode(value); err != nil {
return nil, errors.Trace(err)
}
kvEvent := mqMessageToKvEvent(b.nextKey, mvMsg)
kvEvent := mqMessageToKvEvent(b.nextKey, kvMsg)
b.nextKey = nil
return kvEvent, nil
}
Expand Down
6 changes: 6 additions & 0 deletions cdc/cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/cdc/sink/codec"
Expand Down Expand Up @@ -243,6 +244,11 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
return 0, nil
}

failpoint.Inject("SinkFlushEventPanic", func() {
time.Sleep(time.Second)
log.Fatal("kafka sink injected error")
})

for _, msg := range messages {
err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition)
if err != nil {
Expand Down
Loading

0 comments on commit aeae68f

Please sign in to comment.