diff --git a/server/storage/backend/tx_buffer.go b/server/storage/backend/tx_buffer.go index 7c2f9d63ac4a..f963351cf1e9 100644 --- a/server/storage/backend/tx_buffer.go +++ b/server/storage/backend/tx_buffer.go @@ -16,9 +16,11 @@ package backend import ( "bytes" + "fmt" "sort" "go.etcd.io/etcd/client/pkg/v3/verify" + "go.etcd.io/etcd/server/v3/storage/schema" ) const bucketBufferInitialSize = 512 @@ -83,13 +85,38 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { if !ok { delete(txw.buckets, k) txr.buckets[k] = wb - continue + rb = wb + } else { + if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 { + // assume no duplicate keys + sort.Sort(wb) + } + rb.merge(wb) } - if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 { - // assume no duplicate keys - sort.Sort(wb) + if k == schema.Meta.ID() { + // TODO: remove the duplicated items + // Meta bucket is the only case which may have duplicated keys. + } else { + verifyNoDuplicatedKeys(rb) } - rb.merge(wb) + // Alternative solution: only verify the Key bucket. Reasons: + // 1. The keys in the Key bucket are monotonically increasing + // revisions, so there will never have duplicated keys. So no + // need to perform the operation of removing duplicated keys + // from the Key bucket. The Key bucket is the most performance + // sensitive bucket, so it can also increase the performance + // (Need to run benchmark the double confirm this). + // 2. In case we add other buckets in the future, which may break + // the invariant property. Other buckets are not performance + // sensitive, so we just keep them as they are for simplicity. + // + /* + if k == schema.Key.ID() { + verifyNoDuplicatedKeys(rb) + } else { + // TODO: remove the duplicated items + } + */ } txw.reset() // increase the buffer version @@ -205,16 +232,19 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) { } sort.Stable(bb) +} - // remove duplicates, using only newest update - widx := 0 - for ridx := 1; ridx < bb.used; ridx++ { - if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) { - widx++ +func verifyNoDuplicatedKeys(bb *bucketBuffer) { + verify.Verify(func() { + keyMaps := make(map[string]struct{}) + for i := 0; i < bb.used; i++ { + data := bb.buf[i] + if _, ok := keyMaps[string(data.key)]; ok { + panic(fmt.Sprintf("found duplicated keys in the bucketBuffer: %s", string(data.key))) + } + keyMaps[string(data.key)] = struct{}{} } - bb.buf[widx] = bb.buf[ridx] - } - bb.used = widx + 1 + }) } func (bb *bucketBuffer) Len() int { return bb.used }