diff --git a/tests/integration_tests/debezium/src/test_cases.go b/tests/integration_tests/debezium/src/test_cases.go index 7a309e0d775..bdccd475b65 100644 --- a/tests/integration_tests/debezium/src/test_cases.go +++ b/tests/integration_tests/debezium/src/test_cases.go @@ -160,6 +160,10 @@ func fetchNextCDCRecord(reader *kafka.Reader, kind Kind, timeout time.Duration) return nil, fmt.Errorf("Failed to read CDC record of %s: %w", kind, err) } + if err = reader.CommitMessages(ctx, m); err != nil { + return nil, fmt.Errorf("Failed to commit CDC record of %s: %w", kind, err) + } + if len(m.Value) == 0 { continue } @@ -209,7 +213,6 @@ func fetchAllCDCRecords(reader *kafka.Reader, kind Kind) []map[string]any { obj, err := fetchNextCDCRecord(reader, kind, waitTimeout) if err != nil { - fmt.Println("Received error", err) logger.Error( "Received error when fetching CDC record", zap.Error(err), @@ -218,7 +221,7 @@ func fetchAllCDCRecords(reader *kafka.Reader, kind Kind) []map[string]any { } if obj == nil { // No more records - fmt.Println("No more records") + fmt.Println("No more records", kind, obj, err) break }