Add verification that there are no duplicated keys in the bucket buffer

Signed-off-by: Benjamin Wang <[email protected]>
ahrtr committed Jan 19, 2024
1 parent e3c009e commit bc647ae
Showing 3 changed files with 77 additions and 26 deletions.
17 changes: 17 additions & 0 deletions server/storage/backend/batch_tx.go
@@ -28,6 +28,23 @@ import (
 
 type BucketID int
 
+const (
+	BucketIdKey     BucketID = 1
+	BucketIdMeta    BucketID = 2
+	BucketIdLease   BucketID = 3
+	BucketIdAlarm   BucketID = 4
+	BucketIdCluster BucketID = 5
+
+	BucketIdMembers        BucketID = 10
+	BucketIdMembersRemoved BucketID = 11
+
+	BucketIdAuth      BucketID = 20
+	BucketIdAuthUsers BucketID = 21
+	BucketIdAuthRoles BucketID = 22
+
+	BucketIdTest BucketID = 100
+)
+
 type Bucket interface {
 	// ID returns a unique identifier of a bucket.
 	// The id must NOT be persisted and can be used as a lightweight identifier
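Editor's note: with the IDs now centralized in one const block, keeping them collision-free becomes trivially checkable. Below is a standalone sketch of such a guard, assuming the constant values shown in the diff above; the test-style program itself is hypothetical and not part of this commit.

package main

import "fmt"

type BucketID int

const (
	BucketIdKey     BucketID = 1
	BucketIdMeta    BucketID = 2
	BucketIdLease   BucketID = 3
	BucketIdAlarm   BucketID = 4
	BucketIdCluster BucketID = 5

	BucketIdMembers        BucketID = 10
	BucketIdMembersRemoved BucketID = 11

	BucketIdAuth      BucketID = 20
	BucketIdAuthUsers BucketID = 21
	BucketIdAuthRoles BucketID = 22

	BucketIdTest BucketID = 100
)

func main() {
	ids := []BucketID{
		BucketIdKey, BucketIdMeta, BucketIdLease, BucketIdAlarm, BucketIdCluster,
		BucketIdMembers, BucketIdMembersRemoved,
		BucketIdAuth, BucketIdAuthUsers, BucketIdAuthRoles,
		BucketIdTest,
	}
	// A duplicated value would mean two buckets share an ID.
	seen := make(map[BucketID]struct{})
	for _, id := range ids {
		if _, dup := seen[id]; dup {
			panic(fmt.Sprintf("duplicated bucket ID: %d", id))
		}
		seen[id] = struct{}{}
	}
	fmt.Println("all bucket IDs are unique")
}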
64 changes: 49 additions & 15 deletions server/storage/backend/tx_buffer.go
@@ -16,6 +16,7 @@ package backend

 import (
 	"bytes"
+	"fmt"
 	"sort"
 
 	"go.etcd.io/etcd/client/pkg/v3/verify"
@@ -79,17 +80,47 @@ func (txw *txWriteBuffer) reset() {

 func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
 	for k, wb := range txw.buckets {
-		rb, ok := txr.buckets[k]
-		if !ok {
+		rb, ok1 := txr.buckets[k]
+		if !ok1 {
 			delete(txw.buckets, k)
 			txr.buckets[k] = wb
-			continue
+			rb = wb
+		} else {
+			if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
+				// assume no duplicate keys
+				sort.Sort(wb)
+			}
+			rb.merge(wb)
 		}
-		if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
-			// assume no duplicate keys
-			sort.Sort(wb)
+		// Only verify the Key bucket. Reasons:
+		// 1. The keys in the Key bucket are monotonically increasing
+		// revisions, so there will never be duplicated keys. There is no
+		// need to remove duplicated keys from the Key bucket, and skipping
+		// that pass should also improve performance, since the Key bucket
+		// is the most performance-sensitive bucket. (A benchmark is still
+		// needed to double-confirm this.)
+		// 2. Currently, the Meta bucket is the only bucket which might
+		// contain duplicated keys, although buckets added in the future
+		// may break this invariant. The other buckets are not performance
+		// sensitive, so we just keep them as they are for simplicity.
+		if k == BucketIdKey {
+			verifyNoDuplicatedKeys(rb, ok1)
+		} else {
+			if rb.used <= 1 {
+				continue
+			}
+			sort.Stable(rb)
+			widx := 0
+			for ridx := 1; ridx < rb.used; ridx++ {
+				if !bytes.Equal(rb.buf[ridx].key, rb.buf[widx].key) {
+					widx++
+				}
+				rb.buf[widx] = rb.buf[ridx]
+			}
+			rb.used = widx + 1
 		}
-		rb.merge(wb)
 	}
 	txw.reset()
 	// increase the buffer version
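Editor's note: for non-Key buckets, writeback now performs the dedup pass that previously lived in merge (removed in the next hunk). The pattern is: stable-sort by key, then sweep once and keep only the newest entry per key; stability guarantees that among equal keys the most recent write comes last and therefore wins. Below is a standalone sketch of that pattern, with hypothetical kv and dedupNewest names.

package main

import (
	"bytes"
	"fmt"
	"sort"
)

type kv struct{ key, val []byte }

// dedupNewest stable-sorts by key, then compacts the slice in place so
// each key survives exactly once, holding its most recent value.
func dedupNewest(buf []kv) []kv {
	if len(buf) <= 1 {
		return buf
	}
	sort.SliceStable(buf, func(i, j int) bool {
		return bytes.Compare(buf[i].key, buf[j].key) < 0
	})
	widx := 0
	for ridx := 1; ridx < len(buf); ridx++ {
		if !bytes.Equal(buf[ridx].key, buf[widx].key) {
			widx++
		}
		// Overwrite unconditionally: for a run of duplicates this keeps
		// sliding the newer update into the surviving slot.
		buf[widx] = buf[ridx]
	}
	return buf[:widx+1]
}

func main() {
	got := dedupNewest([]kv{
		{[]byte("a"), []byte("v1")},
		{[]byte("b"), []byte("v1")},
		{[]byte("a"), []byte("v2")}, // newer write to "a"
	})
	for _, e := range got {
		fmt.Printf("%s=%s\n", e.key, e.val) // prints a=v2, then b=v1
	}
}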
@@ -205,16 +236,19 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
 	}
 
 	sort.Stable(bb)
+}
 
-	// remove duplicates, using only newest update
-	widx := 0
-	for ridx := 1; ridx < bb.used; ridx++ {
-		if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
-			widx++
+func verifyNoDuplicatedKeys(bb *bucketBuffer, merged bool) {
+	verify.Verify(func() {
+		keyMaps := make(map[string]struct{})
+		for i := 0; i < bb.used; i++ {
+			data := bb.buf[i]
+			if _, ok := keyMaps[string(data.key)]; ok {
+				panic(fmt.Sprintf("found duplicated keys in the bucketBuffer, bb.used: %d, merged: %t, key: %s", bb.used, merged, string(data.key)))
+			}
+			keyMaps[string(data.key)] = struct{}{}
 		}
-		bb.buf[widx] = bb.buf[ridx]
-	}
-	bb.used = widx + 1
+	})
 }
 
 func (bb *bucketBuffer) Len() int { return bb.used }
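Editor's note: verify.Verify makes this an opt-in assertion rather than a production cost. Below is a minimal sketch of that gating, assuming (as in etcd's client/pkg verify package) that it is controlled by the ETCD_VERIFY environment variable; the accepted values shown are illustrative assumptions, not a documented contract.

package main

import (
	"fmt"
	"os"
)

// Verify mimics the gating: the callback only runs when verification
// has been switched on via the environment.
func Verify(f func()) {
	switch os.Getenv("ETCD_VERIFY") {
	case "all", "assert": // assumed values, for illustration only
		f()
	}
}

func main() {
	os.Setenv("ETCD_VERIFY", "all")
	Verify(func() { fmt.Println("expensive invariant check ran") })
}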
22 changes: 11 additions & 11 deletions server/storage/schema/bucket.go
@@ -40,20 +40,20 @@
 )
 
 var (
-	Key     = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
-	Meta    = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
-	Lease   = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
-	Alarm   = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
-	Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
+	Key     = backend.Bucket(bucket{id: backend.BucketIdKey, name: keyBucketName, safeRangeBucket: true})
+	Meta    = backend.Bucket(bucket{id: backend.BucketIdMeta, name: metaBucketName, safeRangeBucket: false})
+	Lease   = backend.Bucket(bucket{id: backend.BucketIdLease, name: leaseBucketName, safeRangeBucket: false})
+	Alarm   = backend.Bucket(bucket{id: backend.BucketIdAlarm, name: alarmBucketName, safeRangeBucket: false})
+	Cluster = backend.Bucket(bucket{id: backend.BucketIdCluster, name: clusterBucketName, safeRangeBucket: false})
 
-	Members        = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
-	MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})
+	Members        = backend.Bucket(bucket{id: backend.BucketIdMembers, name: membersBucketName, safeRangeBucket: false})
+	MembersRemoved = backend.Bucket(bucket{id: backend.BucketIdMembersRemoved, name: membersRemovedBucketName, safeRangeBucket: false})
 
-	Auth      = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
-	AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
-	AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})
+	Auth      = backend.Bucket(bucket{id: backend.BucketIdAuth, name: authBucketName, safeRangeBucket: false})
+	AuthUsers = backend.Bucket(bucket{id: backend.BucketIdAuthUsers, name: authUsersBucketName, safeRangeBucket: false})
+	AuthRoles = backend.Bucket(bucket{id: backend.BucketIdAuthRoles, name: authRolesBucketName, safeRangeBucket: false})
 
-	Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
+	Test = backend.Bucket(bucket{id: backend.BucketIdTest, name: testBucketName, safeRangeBucket: false})
 )

type bucket struct {
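Editor's note: under the new pattern, declaring a bucket means minting a named ID in the backend package and referencing that constant from the schema, so the two files cannot drift apart. Below is a standalone sketch with a hypothetical "example" bucket; BucketIdExample and its value are invented for illustration.

package main

import "fmt"

type BucketID int

const BucketIdExample BucketID = 30 // hypothetical ID, unused by etcd

type bucket struct {
	id              BucketID
	name            []byte
	safeRangeBucket bool
}

var Example = bucket{id: BucketIdExample, name: []byte("example"), safeRangeBucket: false}

func main() {
	fmt.Printf("bucket %q has id %d\n", Example.name, Example.id)
}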
