diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 0d63f558449b..527088d4b04e 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -28,6 +28,23 @@ import ( type BucketID int +const ( + BucketIdKey BucketID = 1 + BucketIdMeta BucketID = 2 + BucketIdLease BucketID = 3 + BucketIdAlarm BucketID = 4 + BucketIdCluster BucketID = 5 + + BucketIdMembers BucketID = 10 + BucketIdMembersRemoved BucketID = 11 + + BucketIdAuth BucketID = 20 + BucketIdAuthUsers BucketID = 21 + BucketIdAuthRoles BucketID = 22 + + BucketIdTest BucketID = 100 +) + type Bucket interface { // ID returns a unique identifier of a bucket. // The id must NOT be persisted and can be used as lightweight identificator diff --git a/server/storage/backend/tx_buffer.go b/server/storage/backend/tx_buffer.go index 7c2f9d63ac4a..faf5015ec929 100644 --- a/server/storage/backend/tx_buffer.go +++ b/server/storage/backend/tx_buffer.go @@ -16,6 +16,7 @@ package backend import ( "bytes" + "fmt" "sort" "go.etcd.io/etcd/client/pkg/v3/verify" @@ -83,13 +84,43 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { if !ok { delete(txw.buckets, k) txr.buckets[k] = wb - continue + rb = wb + } else { + if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 { + // assume no duplicate keys + sort.Sort(wb) + } + rb.merge(wb) } - if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 { - // assume no duplicate keys - sort.Sort(wb) + // Only verify the Key bucket. Reasons: + // 1. The keys in the Key bucket are monotonically increasing + // revisions, so there will never have duplicated keys. So no + // need to perform the operation of removing duplicated keys + // from the Key bucket. The Key bucket is the most performance + // sensitive bucket, so it can also increase the performance + // (Need to run benchmark the double confirm this). + // 2. Currently, Meta bucket is the only case which might have + // duplicated keys. 
If we add other buckets in the future, + // they may break the invariant property. Other buckets are + // also not performance sensitive, so we just keep them as they + // are for simplicity. + // + if k == BucketIdKey { + verifyNoDuplicatedKeys(rb) + } else { + if rb.used <= 1 { + continue + } + sort.Stable(rb) + widx := 0 + for ridx := 1; ridx < rb.used; ridx++ { + if !bytes.Equal(rb.buf[ridx].key, rb.buf[widx].key) { + widx++ + } + rb.buf[widx] = rb.buf[ridx] + } + rb.used = widx + 1 } - rb.merge(wb) } txw.reset() // increase the buffer version @@ -205,16 +236,19 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) { } sort.Stable(bb) +} - // remove duplicates, using only newest update - widx := 0 - for ridx := 1; ridx < bb.used; ridx++ { - if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) { - widx++ +func verifyNoDuplicatedKeys(bb *bucketBuffer) { + verify.Verify(func() { + keyMaps := make(map[string]struct{}) + for i := 0; i < bb.used; i++ { + data := bb.buf[i] + if _, ok := keyMaps[string(data.key)]; ok { + panic(fmt.Sprintf("found duplicated keys in the bucketBuffer: %s", string(data.key))) + } + keyMaps[string(data.key)] = struct{}{} } - bb.buf[widx] = bb.buf[ridx] - } - bb.used = widx + 1 + }) } func (bb *bucketBuffer) Len() int { return bb.used } diff --git a/server/storage/schema/bucket.go b/server/storage/schema/bucket.go index 5472af3c3b47..5c8d4712aa6d 100644 --- a/server/storage/schema/bucket.go +++ b/server/storage/schema/bucket.go @@ -40,20 +40,20 @@ var ( ) var ( - Key = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true}) - Meta = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false}) - Lease = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false}) - Alarm = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false}) - Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false}) + Key = backend.Bucket(bucket{id: 
backend.BucketIdKey, name: keyBucketName, safeRangeBucket: true}) + Meta = backend.Bucket(bucket{id: backend.BucketIdMeta, name: metaBucketName, safeRangeBucket: false}) + Lease = backend.Bucket(bucket{id: backend.BucketIdLease, name: leaseBucketName, safeRangeBucket: false}) + Alarm = backend.Bucket(bucket{id: backend.BucketIdAlarm, name: alarmBucketName, safeRangeBucket: false}) + Cluster = backend.Bucket(bucket{id: backend.BucketIdCluster, name: clusterBucketName, safeRangeBucket: false}) - Members = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false}) - MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false}) + Members = backend.Bucket(bucket{id: backend.BucketIdMembers, name: membersBucketName, safeRangeBucket: false}) + MembersRemoved = backend.Bucket(bucket{id: backend.BucketIdMembersRemoved, name: membersRemovedBucketName, safeRangeBucket: false}) - Auth = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false}) - AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false}) - AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false}) + Auth = backend.Bucket(bucket{id: backend.BucketIdAuth, name: authBucketName, safeRangeBucket: false}) + AuthUsers = backend.Bucket(bucket{id: backend.BucketIdAuthUsers, name: authUsersBucketName, safeRangeBucket: false}) + AuthRoles = backend.Bucket(bucket{id: backend.BucketIdAuthRoles, name: authRolesBucketName, safeRangeBucket: false}) - Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false}) + Test = backend.Bucket(bucket{id: backend.BucketIdTest, name: testBucketName, safeRangeBucket: false}) ) type bucket struct {