Skip to content

Commit

Permalink
- fixed a concurrency edge-case on the persister
Browse files Browse the repository at this point in the history
- added & integrated removalTrackingCache implementation
  • Loading branch information
iulianpascalau committed Sep 20, 2023
1 parent 5fca300 commit f6cf07c
Show file tree
Hide file tree
Showing 24 changed files with 765 additions and 82 deletions.
1 change: 1 addition & 0 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"errors"

"github.com/multiversx/mx-chain-core-go/core"
)

Expand Down
5 changes: 5 additions & 0 deletions fifocache/fifocacheSharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ func (c *FIFOShardedCache) MaxSize() int {
return c.maxsize
}

// GetRemovalStatus will return the unknown status because this implementation does not track removed keys
func (c *FIFOShardedCache) GetRemovalStatus(_ []byte) types.RemovalStatus {
return types.UnknownRemovalStatus
}

// Close does nothing for this cacher implementation
func (c *FIFOShardedCache) Close() error {
return nil
Expand Down
21 changes: 20 additions & 1 deletion fifocache/fifocacheSharded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/multiversx/mx-chain-storage-go/fifocache"
"github.com/multiversx/mx-chain-storage-go/types"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -374,6 +375,22 @@ func TestFIFOShardedCache_CacherRegisterHasOrAddAddedDataHandlerNotAddedShouldNo
assert.Equal(t, 1, len(c.AddedDataHandlers()))
}

func TestFifoShardedCache_GetRemovalStatus(t *testing.T) {
t.Parallel()

key := []byte("key")
value := []byte("value")

cache, _ := fifocache.NewShardedCache(100, 2)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

_ = cache.Put(key, value, 0)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

cache.Remove(key)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))
}

func TestFifoShardedCache_ConcurrentOperation(t *testing.T) {
fifoCacher, _ := fifocache.NewShardedCache(10000, 3)

Expand All @@ -382,7 +399,7 @@ func TestFifoShardedCache_ConcurrentOperation(t *testing.T) {
wg.Add(numOperations)
for i := 0; i < numOperations; i++ {
go func(idx int) {
switch idx % 16 {
switch idx % 17 {
case 0:
fifoCacher.AddedDataHandlers()
case 1:
Expand Down Expand Up @@ -415,6 +432,8 @@ func TestFifoShardedCache_ConcurrentOperation(t *testing.T) {
fifoCacher.SizeInBytesContained()
case 15:
fifoCacher.UnRegisterHandler("id")
case 16:
fifoCacher.GetRemovalStatus(nil)
}

wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/hashicorp/golang-lru v0.6.0
github.com/multiversx/concurrent-map v0.1.4
github.com/multiversx/mx-chain-core-go v1.2.16
github.com/multiversx/mx-chain-core-go v1.2.17-0.20230919104727-566f55d213ab
github.com/multiversx/mx-chain-logger-go v1.0.13
github.com/stretchr/testify v1.7.2
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiversx/concurrent-map v0.1.4 h1:hdnbM8VE4b0KYJaGY5yJS2aNIW9TFFsUYwbO0993uPI=
github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o=
github.com/multiversx/mx-chain-core-go v1.2.16 h1:m0hUNmZQjGJxKDLQOHoM9jSaeDfVTbyd+mqiS8+NckE=
github.com/multiversx/mx-chain-core-go v1.2.16/go.mod h1:BILOGHUOIG5dNNX8cgkzCNfDaVtoYrJRYcPnpxRMH84=
github.com/multiversx/mx-chain-core-go v1.2.17-0.20230919104727-566f55d213ab h1:FO/YYB/cWG6ruHOrnQq6Dn4GEnK06I2KlrwtE2AUcKM=
github.com/multiversx/mx-chain-core-go v1.2.17-0.20230919104727-566f55d213ab/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE=
github.com/multiversx/mx-chain-logger-go v1.0.13 h1:eru/TETo0MkO4ZTnXsQDKf4PBRpAXmqjT02klNT/JnY=
github.com/multiversx/mx-chain-logger-go v1.0.13/go.mod h1:MZJhTAtZTJxT+yK2EHc4ZW3YOHUc1UdjCD0iahRNBZk=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
Expand Down
5 changes: 5 additions & 0 deletions immunitycache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ func (ic *ImmunityCache) Diagnose(_ bool) {
)
}

// GetRemovalStatus will return the unknown status because this implementation does not track removed keys
func (ic *ImmunityCache) GetRemovalStatus(_ []byte) types.RemovalStatus {
return types.UnknownRemovalStatus
}

// Close does nothing for this cacher implementation
func (ic *ImmunityCache) Close() error {
return nil
Expand Down
17 changes: 17 additions & 0 deletions immunitycache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

logger "github.com/multiversx/mx-chain-logger-go"
"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -336,6 +337,22 @@ func TestImmunityCache_ForgetCapacityHadBeenReachedInThePast(t *testing.T) {
require.Equal(t, uint64(0), cache.numCapacityReachedOccurrences.GetUint64())
}

func TestImmunityCache_GetRemovalStatus(t *testing.T) {
t.Parallel()

key := []byte("key")
value := []byte("value")

cache := newCacheToTest(1, 4, 1000)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

_ = cache.Put(key, value, 0)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

cache.Remove(key)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))
}

func newCacheToTest(numChunks uint32, maxNumItems uint32, numMaxBytes uint32) *ImmunityCache {
cache, err := NewImmunityCache(CacheConfig{
Name: "test",
Expand Down
28 changes: 19 additions & 9 deletions leveldb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ import (
var _ types.Batcher = (*batch)(nil)

type batch struct {
batch *leveldb.Batch
cachedData map[string][]byte
removedData map[string]struct{}
mutBatch sync.RWMutex
batch *leveldb.Batch
cachedData map[string][]byte
removedData map[string]struct{}
previouslyRemoved map[string]struct{}
mutBatch sync.RWMutex
}

// NewBatch creates a batch
func NewBatch() *batch {
func NewBatch(previouslyRemoved map[string]struct{}) *batch {
return &batch{
batch: &leveldb.Batch{},
cachedData: make(map[string][]byte),
removedData: make(map[string]struct{}),
mutBatch: sync.RWMutex{},
batch: &leveldb.Batch{},
cachedData: make(map[string][]byte),
removedData: make(map[string]struct{}),
previouslyRemoved: previouslyRemoved,
mutBatch: sync.RWMutex{},
}
}

Expand Down Expand Up @@ -69,6 +71,14 @@ func (b *batch) IsRemoved(key []byte) bool {
defer b.mutBatch.RUnlock()

_, found := b.removedData[string(key)]
if found {
return true
}
_, found = b.cachedData[string(key)]
if found {
return false
}
_, found = b.previouslyRemoved[string(key)]

return found
}
Expand Down
2 changes: 1 addition & 1 deletion leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (s *DB) Has(key []byte) error {

// CreateBatch returns a batcher to be used for batch writing data to the database
func (s *DB) createBatch() types.Batcher {
return NewBatch()
return NewBatch(make(map[string]struct{}))
}

// putBatch writes the Batch data into the database
Expand Down
4 changes: 2 additions & 2 deletions leveldb/leveldbSerial.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewSerialDB(path string, batchDelaySeconds int, maxBatchSize int, maxOpenFi
closer: closing.NewSafeChanCloser(),
}

dbStore.batch = NewBatch()
dbStore.batch = NewBatch(make(map[string]struct{}))

go dbStore.batchTimeoutHandle(ctx)
go dbStore.processLoop(ctx)
Expand Down Expand Up @@ -246,7 +246,7 @@ func (s *SerialDB) putBatch() error {
return common.ErrInvalidBatch
}
s.sizeBatch = 0
s.batch = NewBatch()
s.batch = NewBatch(dbBatch.removedData)
s.mutBatch.Unlock()

ch := make(chan error)
Expand Down
1 change: 1 addition & 0 deletions lrucache/export_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package lrucache

// AddedDataHandlers -
func (c *lruCache) AddedDataHandlers() map[string]func(key []byte, value interface{}) {
return c.mapDataHandlers
}
5 changes: 5 additions & 0 deletions lrucache/lrucache.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ func (c *lruCache) MaxSize() int {
return c.maxsize
}

// GetRemovalStatus will return the unknown status because this implementation does not track removed keys
func (c *lruCache) GetRemovalStatus(_ []byte) types.RemovalStatus {
return types.UnknownRemovalStatus
}

// Close does nothing for this cacher implementation
func (c *lruCache) Close() error {
return nil
Expand Down
20 changes: 18 additions & 2 deletions lrucache/lrucache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"time"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/storage"
"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/lrucache"
"github.com/multiversx/mx-chain-storage-go/types"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -421,7 +421,7 @@ func TestLRUCache_CloseShouldNotErr(t *testing.T) {
}

type cacheWrapper struct {
c storage.Cacher
c types.Cacher
}

func newCacheWrapper() *cacheWrapper {
Expand Down Expand Up @@ -459,3 +459,19 @@ func TestLruCache_LenDuringEviction(t *testing.T) {
assert.Fail(t, "test failed, deadlock occurred")
}
}

func TestLruCache_GetRemovalStatus(t *testing.T) {
t.Parallel()

key := []byte("key")
value := []byte("value")

cache, _ := lrucache.NewCache(2)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

_ = cache.Put(key, value, 0)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))

cache.Remove(key)
assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil))
}
84 changes: 84 additions & 0 deletions rtcache/removalTrackingCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package rtcache

import (
"fmt"
"sync"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/types"
)

type unexportedCacher = types.Cacher

type removalTrackingCache struct {
unexportedCacher
mutCriticalArea sync.RWMutex
removalCache types.Cacher
}

// NewRemovalTrackingCache will create a new instance of a cache that is able to track removal events
func NewRemovalTrackingCache(mainCache types.Cacher, removalCache types.Cacher) (*removalTrackingCache, error) {
if check.IfNil(mainCache) {
return nil, fmt.Errorf("%w for the main cache", common.ErrNilCacher)
}
if check.IfNil(removalCache) {
return nil, fmt.Errorf("%w for the removal cache", common.ErrNilCacher)
}

return &removalTrackingCache{
unexportedCacher: mainCache,
removalCache: removalCache,
}, nil
}

// Put adds a value into the main cache. Returns true if an eviction occurred.
// It also removes the key from the removalCache
func (cache *removalTrackingCache) Put(key []byte, value interface{}, sizeInBytes int) (evicted bool) {
cache.mutCriticalArea.Lock()
defer cache.mutCriticalArea.Unlock()

cache.removalCache.Remove(key)
return cache.unexportedCacher.Put(key, value, sizeInBytes)
}

// Remove removes the provided key from the main cache.
// It also stores the key in the removalCache
func (cache *removalTrackingCache) Remove(key []byte) {
cache.mutCriticalArea.Lock()
defer cache.mutCriticalArea.Unlock()

_ = cache.removalCache.Put(key, struct{}{}, len(key))
cache.unexportedCacher.Remove(key)
}

// GetRemovalStatus will return the removal status by searching the key in both caches
func (cache *removalTrackingCache) GetRemovalStatus(key []byte) types.RemovalStatus {
cache.mutCriticalArea.RLock()
defer cache.mutCriticalArea.RUnlock()

_, found := cache.removalCache.Get(key)
if found {
return types.ExplicitlyRemovedStatus
}
_, found = cache.unexportedCacher.Get(key)
if found {
return types.NotRemovedStatus
}

return types.UnknownRemovalStatus
}

// Clear is used to completely clear both caches.
func (cache *removalTrackingCache) Clear() {
cache.mutCriticalArea.RLock()
defer cache.mutCriticalArea.RUnlock()

cache.removalCache.Clear()
cache.unexportedCacher.Clear()
}

// IsInterfaceNil returns true if there is no value under the interface
func (cache *removalTrackingCache) IsInterfaceNil() bool {
return cache == nil
}
Loading

0 comments on commit f6cf07c

Please sign in to comment.