Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
wk989898 committed Sep 23, 2024
1 parent a812081 commit b578156
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions tests/integration_tests/debezium/src/test_cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func fetchNextCDCRecord(reader *kafka.Reader, kind Kind, timeout time.Duration)
return nil, fmt.Errorf("Failed to read CDC record of %s: %w", kind, err)
}

if err = reader.CommitMessages(ctx, m); err != nil {
return nil, fmt.Errorf("Failed to commit CDC record of %s: %w", kind, err)
}

if len(m.Value) == 0 {
continue
}
Expand Down Expand Up @@ -209,7 +213,6 @@ func fetchAllCDCRecords(reader *kafka.Reader, kind Kind) []map[string]any {

obj, err := fetchNextCDCRecord(reader, kind, waitTimeout)
if err != nil {
fmt.Println("Received error", err)
logger.Error(
"Received error when fetching CDC record",
zap.Error(err),
Expand All @@ -218,7 +221,7 @@ func fetchAllCDCRecords(reader *kafka.Reader, kind Kind) []map[string]any {
}
if obj == nil {
// No more records
fmt.Println("No more records")
fmt.Println("No more records", kind, obj, err)
break
}

Expand Down

0 comments on commit b578156

Please sign in to comment.