Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove-Get concurrency fix #27

Draft
wants to merge 8 commits into
base: rc/v1.7.next1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"errors"

"github.com/multiversx/mx-chain-core-go/core"
)

Expand Down
5 changes: 5 additions & 0 deletions fifocache/fifocacheSharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ func (c *FIFOShardedCache) MaxSize() int {
return c.maxsize
}

// GetRemovalStatus will return the unknown status because this implementation does not track removed keys
func (c *FIFOShardedCache) GetRemovalStatus(_ []byte) types.RemovalStatus {
return types.UnknownRemovalStatus
}

// Close does nothing for this cacher implementation
func (c *FIFOShardedCache) Close() error {
return nil
Expand Down
21 changes: 20 additions & 1 deletion fifocache/fifocacheSharded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/multiversx/mx-chain-storage-go/fifocache"
"github.com/multiversx/mx-chain-storage-go/types"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -374,6 +375,22 @@ func TestFIFOShardedCache_CacherRegisterHasOrAddAddedDataHandlerNotAddedShouldNo
assert.Equal(t, 1, len(c.AddedDataHandlers()))
}

func TestFifoShardedCache_GetRemovalStatus(t *testing.T) {
t.Parallel()

key := []byte("key")
value := []byte("value")

cache, _ := fifocache.NewShardedCache(100, 2)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

_ = cache.Put(key, value, 0)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

cache.Remove(key)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))
}

func TestFifoShardedCache_ConcurrentOperation(t *testing.T) {
fifoCacher, _ := fifocache.NewShardedCache(10000, 3)

Expand All @@ -382,7 +399,7 @@ func TestFifoShardedCache_ConcurrentOperation(t *testing.T) {
wg.Add(numOperations)
for i := 0; i < numOperations; i++ {
go func(idx int) {
switch idx % 16 {
switch idx % 17 {
case 0:
fifoCacher.AddedDataHandlers()
case 1:
Expand Down Expand Up @@ -415,6 +432,8 @@ func TestFifoShardedCache_ConcurrentOperation(t *testing.T) {
fifoCacher.SizeInBytesContained()
case 15:
fifoCacher.UnRegisterHandler("id")
case 16:
fifoCacher.GetRemovalStatus(nil)
}

wg.Done()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ go 1.20
require (
github.com/hashicorp/golang-lru v0.6.0
github.com/multiversx/concurrent-map v0.1.4
github.com/multiversx/mx-chain-core-go v1.2.16
github.com/multiversx/mx-chain-logger-go v1.0.13
github.com/multiversx/mx-chain-core-go v1.2.17-0.20231025072115-f3530bc1cab1
github.com/multiversx/mx-chain-logger-go v1.0.14-0.20231025072404-d54164fbae96
github.com/stretchr/testify v1.7.2
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d
)
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiversx/concurrent-map v0.1.4 h1:hdnbM8VE4b0KYJaGY5yJS2aNIW9TFFsUYwbO0993uPI=
github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o=
github.com/multiversx/mx-chain-core-go v1.2.16 h1:m0hUNmZQjGJxKDLQOHoM9jSaeDfVTbyd+mqiS8+NckE=
github.com/multiversx/mx-chain-core-go v1.2.16/go.mod h1:BILOGHUOIG5dNNX8cgkzCNfDaVtoYrJRYcPnpxRMH84=
github.com/multiversx/mx-chain-logger-go v1.0.13 h1:eru/TETo0MkO4ZTnXsQDKf4PBRpAXmqjT02klNT/JnY=
github.com/multiversx/mx-chain-logger-go v1.0.13/go.mod h1:MZJhTAtZTJxT+yK2EHc4ZW3YOHUc1UdjCD0iahRNBZk=
github.com/multiversx/mx-chain-core-go v1.2.17-0.20231025072115-f3530bc1cab1 h1:JyPl8MWDT2eECtNraEtDLKBCEgXgYxJY5vYjTDAJNms=
github.com/multiversx/mx-chain-core-go v1.2.17-0.20231025072115-f3530bc1cab1/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE=
github.com/multiversx/mx-chain-logger-go v1.0.14-0.20231025072404-d54164fbae96 h1:6rG9/MtlWapWkN30HfXPzX2oJc1CjLnbW1NGne2pwLo=
github.com/multiversx/mx-chain-logger-go v1.0.14-0.20231025072404-d54164fbae96/go.mod h1:996BTzWMAf0RxqE+Sqv7kB4OEnV9J6pJQ/ShKmlHxQM=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
5 changes: 5 additions & 0 deletions immunitycache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ func (ic *ImmunityCache) Diagnose(_ bool) {
)
}

// GetRemovalStatus will return the unknown status because this implementation does not track removed keys
func (ic *ImmunityCache) GetRemovalStatus(_ []byte) types.RemovalStatus {
return types.UnknownRemovalStatus
}

// Close does nothing for this cacher implementation
func (ic *ImmunityCache) Close() error {
return nil
Expand Down
17 changes: 17 additions & 0 deletions immunitycache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

logger "github.com/multiversx/mx-chain-logger-go"
"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -336,6 +337,22 @@ func TestImmunityCache_ForgetCapacityHadBeenReachedInThePast(t *testing.T) {
require.Equal(t, uint64(0), cache.numCapacityReachedOccurrences.GetUint64())
}

func TestImmunityCache_GetRemovalStatus(t *testing.T) {
t.Parallel()

key := []byte("key")
value := []byte("value")

cache := newCacheToTest(1, 4, 1000)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

_ = cache.Put(key, value, 0)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

cache.Remove(key)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))
}

func newCacheToTest(numChunks uint32, maxNumItems uint32, numMaxBytes uint32) *ImmunityCache {
cache, err := NewImmunityCache(CacheConfig{
Name: "test",
Expand Down
28 changes: 19 additions & 9 deletions leveldb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ import (
var _ types.Batcher = (*batch)(nil)

type batch struct {
batch *leveldb.Batch
cachedData map[string][]byte
removedData map[string]struct{}
mutBatch sync.RWMutex
batch *leveldb.Batch
cachedData map[string][]byte
removedData map[string]struct{}
previouslyRemoved map[string]struct{}
mutBatch sync.RWMutex
}

// NewBatch creates a batch
func NewBatch() *batch {
func NewBatch(previouslyRemoved map[string]struct{}) *batch {
return &batch{
batch: &leveldb.Batch{},
cachedData: make(map[string][]byte),
removedData: make(map[string]struct{}),
mutBatch: sync.RWMutex{},
batch: &leveldb.Batch{},
cachedData: make(map[string][]byte),
removedData: make(map[string]struct{}),
previouslyRemoved: previouslyRemoved,
mutBatch: sync.RWMutex{},
}
}

Expand Down Expand Up @@ -69,6 +71,14 @@ func (b *batch) IsRemoved(key []byte) bool {
defer b.mutBatch.RUnlock()

_, found := b.removedData[string(key)]
if found {
return true
}
_, found = b.cachedData[string(key)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be a situation when both, cachedData[key} and previouslyRemoved[key], return true?

Copy link
Contributor Author

@iulianpascalau iulianpascalau Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The situation is the following:
the key existed and a call to Delete was made. The batch was written, and the data was deleted from the persister
and the new batch was created, the previously removed data was passed to the new batch. Then, immediately we call Put with the same key. Now, both caches contain the same key.
All is fine because in this case, we want the IsRemoved function to return false. Added a test with this exact scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if found {
return false
}
_, found = b.previouslyRemoved[string(key)]

return found
}
Expand Down
6 changes: 6 additions & 0 deletions leveldb/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package leveldb

// PutBatch will call the unexported putBatch function
func (s *SerialDB) PutBatch() {
_ = s.putBatch()
}
2 changes: 1 addition & 1 deletion leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (s *DB) Has(key []byte) error {

// CreateBatch returns a batcher to be used for batch writing data to the database
func (s *DB) createBatch() types.Batcher {
return NewBatch()
return NewBatch(make(map[string]struct{}))
}

// putBatch writes the Batch data into the database
Expand Down
4 changes: 2 additions & 2 deletions leveldb/leveldbSerial.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewSerialDB(path string, batchDelaySeconds int, maxBatchSize int, maxOpenFi
closer: closing.NewSafeChanCloser(),
}

dbStore.batch = NewBatch()
dbStore.batch = NewBatch(make(map[string]struct{}))

go dbStore.batchTimeoutHandle(ctx)
go dbStore.processLoop(ctx)
Expand Down Expand Up @@ -246,7 +246,7 @@ func (s *SerialDB) putBatch() error {
return common.ErrInvalidBatch
}
s.sizeBatch = 0
s.batch = NewBatch()
s.batch = NewBatch(dbBatch.removedData)
s.mutBatch.Unlock()

ch := make(chan error)
Expand Down
85 changes: 85 additions & 0 deletions leveldb/leveldbSerial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math/big"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -358,3 +359,87 @@ func TestSerialDB_ConcurrentOperations(t *testing.T) {

wg.Wait()
}

func TestSerialDB_PutRemoveGet(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
}

ldb := createSerialLevelDb(t, 100000, 1000000, 10)

numKeys := 10000
for i := 0; i < numKeys; i++ {
_ = ldb.Put([]byte(fmt.Sprintf("key %d", i)), []byte("val"))
}

time.Sleep(time.Second * 2)

numErr := uint32(0)

for i := 0; i < numKeys; i++ {
key := []byte(fmt.Sprintf("key %d", i))

recoveredVal, _ := ldb.Get(key)
assert.NotEmpty(t, recoveredVal)

wg := &sync.WaitGroup{}
wg.Add(2)

// emulate the following scenario:
// the sequence Remove(key) -> Get(key) is done while the putBatch is called. So the actual edgecase is
// go routine 1: Remove(key) -----------------> Get(key)
// go routine 2: putBatch()

go func() {
time.Sleep(time.Millisecond * 1)
ldb.PutBatch()
wg.Done()
}()
go func() {
_ = ldb.Remove(key)

time.Sleep(time.Millisecond * 1)

recoveredVal2, _ := ldb.Get(key)
if len(recoveredVal2) > 0 {
// the key-value was not removed
atomic.AddUint32(&numErr, 1)
}

wg.Done()
}()

wg.Wait()

require.Zero(t, atomic.LoadUint32(&numErr), "iteration %d out of %d", i, numKeys)
}

_ = ldb.Close()
}

func TestSerialDB_PutRemovePutHas(t *testing.T) {
ldb := createSerialLevelDb(t, 100000, 1000000, 10)

key := []byte("key")
value := []byte("value")

_ = ldb.Put(key, value)

// manually put the <key, value> pair in storage
ldb.PutBatch()
time.Sleep(time.Second)
assert.Nil(t, ldb.Has(key)) // key was found

// we now remove the key
_ = ldb.Remove(key)

// manually delete the key from the storage
ldb.PutBatch()
time.Sleep(time.Second)
assert.NotNil(t, ldb.Has(key)) // missing key

_ = ldb.Put(key, value) // put the key again
assert.Nil(t, ldb.Has(key)) // key was found

_ = ldb.Close()
}
1 change: 1 addition & 0 deletions lrucache/export_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package lrucache

// AddedDataHandlers -
func (c *lruCache) AddedDataHandlers() map[string]func(key []byte, value interface{}) {
return c.mapDataHandlers
}
6 changes: 6 additions & 0 deletions lrucache/lrucache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
var _ types.Cacher = (*lruCache)(nil)

var log = logger.GetOrCreate("storage/lrucache")
var _ types.Cacher = (*lruCache)(nil)

// LRUCache implements a Least Recently Used eviction cache
type lruCache struct {
Expand Down Expand Up @@ -184,6 +185,11 @@ func (c *lruCache) MaxSize() int {
return c.maxsize
}

// GetRemovalStatus will return the unknown status because this implementation does not track removed keys
func (c *lruCache) GetRemovalStatus(_ []byte) types.RemovalStatus {
return types.UnknownRemovalStatus
}

// Close does nothing for this cacher implementation
func (c *lruCache) Close() error {
return nil
Expand Down
20 changes: 18 additions & 2 deletions lrucache/lrucache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"time"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/storage"
"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/lrucache"
"github.com/multiversx/mx-chain-storage-go/types"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -421,7 +421,7 @@ func TestLRUCache_CloseShouldNotErr(t *testing.T) {
}

type cacheWrapper struct {
c storage.Cacher
c types.Cacher
}

func newCacheWrapper() *cacheWrapper {
Expand Down Expand Up @@ -459,3 +459,19 @@ func TestLruCache_LenDuringEviction(t *testing.T) {
assert.Fail(t, "test failed, deadlock occurred")
}
}

func TestLruCache_GetRemovalStatus(t *testing.T) {
t.Parallel()

key := []byte("key")
value := []byte("value")

cache, _ := lrucache.NewCache(2)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

_ = cache.Put(key, value, 0)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

cache.Remove(key)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))
}
Loading