Skip to content

Commit

Permalink
kvclient(cdc): handle out-of-order prewrite events for pipelined DMLs (
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu committed Sep 11, 2024
1 parent 0848c61 commit 434eda4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 10 deletions.
30 changes: 20 additions & 10 deletions cdc/kv/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,23 @@ func newMatcher() *matcher {

func (m *matcher) putPrewriteRow(row *cdcpb.Event_Row) {
key := newMatchKey(row)
// tikv may send a fake prewrite event with empty value caused by txn heartbeat.
// here we need to avoid the fake prewrite event overwrite the prewrite value.
if old, exist := m.unmatchedValue[key]; exist {
// tikv may send a fake prewrite event with empty value caused by txn heartbeat.
// here we need to avoid the fake prewrite event overwrite the prewrite value.

// when the old-value is disabled, the value of the fake prewrite event is empty.
// when the old-value is enabled, the value of the fake prewrite event is also empty,
// but the old value of the fake prewrite event is not empty.
// We can distinguish fake prewrite events by whether the value is empty,
// no matter the old-value is enabled or disabled
if _, exist := m.unmatchedValue[key]; exist && len(row.GetValue()) == 0 {
return
// when the old-value is disabled, the value of the fake prewrite event is empty.
// when the old-value is enabled, the value of the fake prewrite event is also empty,
// but the old value of the fake prewrite event is not empty.
// We can distinguish fake prewrite events by whether the value is empty,
// no matter the old-value is enabled or disabled
if len(row.GetValue()) == 0 {
return
}

// For pipelined-DML transactions, the row with latest Generation will be kept.
if row.Generation < old.Generation {
return
}
}
m.unmatchedValue[key] = row
}
Expand All @@ -62,12 +69,15 @@ func (m *matcher) putPrewriteRow(row *cdcpb.Event_Row) {
func (m *matcher) matchRow(row *cdcpb.Event_Row, initialized bool) bool {
if value, exist := m.unmatchedValue[newMatchKey(row)]; exist {
// TiKV may send a fake prewrite event with empty value caused by txn heartbeat.
//
// We need to skip match if the region is not initialized,
// as prewrite events may be sent out of order.
if !initialized && len(value.GetValue()) == 0 {
return false
}
// Pipelined-DML transactions can only be matched after initialized.
if !initialized && value.Generation > 0 {
return false
}
row.Value = value.GetValue()
row.OldValue = value.GetOldValue()
delete(m.unmatchedValue, newMatchKey(row))
Expand Down
37 changes: 37 additions & 0 deletions cdc/kv/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,40 @@ func TestMatchMatchCachedRollbackRow(t *testing.T) {
matcher.matchCachedRollbackRow(true)
require.Empty(t, matcher.unmatchedValue)
}

// TestMatchPipelinedDMLs covers prewrite handling for pipelined-DML
// transactions: a commit must not be matched against a prewrite that carries a
// Generation while the region is uninitialized, and when prewrites for the
// same key arrive out of order the one with the highest Generation wins.
func TestMatchPipelinedDMLs(t *testing.T) {
	t.Parallel()
	matcher := newMatcher()

	matcher.putPrewriteRow(&cdcpb.Event_Row{
		Generation: 2,
		StartTs:    1,
		Key:        []byte("k"),
		Value:      []byte("v2"),
		OldValue:   []byte("ov2"),
	})

	// The stored prewrite has Generation > 0, so matching must be refused
	// while initialized is false.
	row := &cdcpb.Event_Row{StartTs: 1, CommitTs: 3, Key: []byte("k")}
	matched := matcher.matchRow(row, false)
	require.False(t, matched, "prewrites with generation shouldn't be matched before initialized")

	// NOTE(review): presumably this parks the commit row so that
	// matchCachedRow can retry it later; cacheCommitRow is defined elsewhere.
	matcher.cacheCommitRow(row)

	// Deliver later generations out of order: 4 first, then the stale 3.
	// The generation-3 row must not overwrite the generation-4 row.
	matcher.putPrewriteRow(&cdcpb.Event_Row{
		Generation: 4,
		StartTs:    1,
		Key:        []byte("k"),
		Value:      []byte("v4"),
		OldValue:   []byte("ov4"),
	})
	matcher.putPrewriteRow(&cdcpb.Event_Row{
		Generation: 3,
		StartTs:    1,
		Key:        []byte("k"),
		Value:      []byte("v3"),
		OldValue:   []byte("ov3"),
	})

	rows := matcher.matchCachedRow(true)
	require.Len(t, rows, 1)
	// Expected value goes first so that failure output is labelled correctly.
	require.Equal(t, []byte("v4"), rows[0].Value)
}

0 comments on commit 434eda4

Please sign in to comment.