From f6cf07c90d11f0730e18b60a24e23b2ec4301d6a Mon Sep 17 00:00:00 2001 From: jules01 Date: Wed, 20 Sep 2023 17:45:28 +0300 Subject: [PATCH] - fixed a concurrency edge-case on the persister - added & integrated removalTrackingCache implementation --- common/errors.go | 1 + fifocache/fifocacheSharded.go | 5 + fifocache/fifocacheSharded_test.go | 21 +- go.mod | 2 +- go.sum | 4 +- immunitycache/cache.go | 5 + immunitycache/cache_test.go | 17 ++ leveldb/batch.go | 28 ++- leveldb/leveldb.go | 2 +- leveldb/leveldbSerial.go | 4 +- lrucache/export_test.go | 1 + lrucache/lrucache.go | 5 + lrucache/lrucache_test.go | 20 +- rtcache/removalTrackingCache.go | 84 +++++++ rtcache/removalTrackingCache_test.go | 81 +++++++ storageUnit/nilStorer.go | 4 +- storageUnit/storageunit.go | 51 ++++- storageUnit/storageunit_test.go | 329 ++++++++++++++++++++++----- testscommon/cacherMock.go | 136 +++++++++++ txcache/disabledCache.go | 5 + txcache/disabledCache_test.go | 3 + txcache/txCache.go | 5 + txcache/txCache_test.go | 16 ++ types/interface.go | 18 +- 24 files changed, 765 insertions(+), 82 deletions(-) create mode 100644 rtcache/removalTrackingCache.go create mode 100644 rtcache/removalTrackingCache_test.go create mode 100644 testscommon/cacherMock.go diff --git a/common/errors.go b/common/errors.go index 6342eb4f..de732482 100644 --- a/common/errors.go +++ b/common/errors.go @@ -2,6 +2,7 @@ package common import ( "errors" + "github.com/multiversx/mx-chain-core-go/core" ) diff --git a/fifocache/fifocacheSharded.go b/fifocache/fifocacheSharded.go index 74f24f91..cff8ea1a 100644 --- a/fifocache/fifocacheSharded.go +++ b/fifocache/fifocacheSharded.go @@ -140,6 +140,11 @@ func (c *FIFOShardedCache) MaxSize() int { return c.maxsize } +// GetRemovalStatus will return the unknown status because this implementation does not track removed keys +func (c *FIFOShardedCache) GetRemovalStatus(_ []byte) types.RemovalStatus { + return types.UnknownRemovalStatus +} + // Close does nothing for this cacher implementation func (c *FIFOShardedCache) Close() error { return nil diff --git a/fifocache/fifocacheSharded_test.go b/fifocache/fifocacheSharded_test.go index 36f0518a..12f2cf57 100644 --- a/fifocache/fifocacheSharded_test.go +++ b/fifocache/fifocacheSharded_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/multiversx/mx-chain-storage-go/fifocache" + "github.com/multiversx/mx-chain-storage-go/types" "github.com/stretchr/testify/assert" ) @@ -374,6 +375,22 @@ func TestFIFOShardedCache_CacherRegisterHasOrAddAddedDataHandlerNotAddedShouldNo assert.Equal(t, 1, len(c.AddedDataHandlers())) } +func TestFifoShardedCache_GetRemovalStatus(t *testing.T) { + t.Parallel() + + key := []byte("key") + value := []byte("value") + + cache, _ := fifocache.NewShardedCache(100, 2) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) + + _ = cache.Put(key, value, 0) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) + + cache.Remove(key) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) +} + func TestFifoShardedCache_ConcurrentOperation(t *testing.T) { fifoCacher, _ := fifocache.NewShardedCache(10000, 3) @@ -382,7 +399,7 @@ func TestFifoShardedCache_ConcurrentOperation(t *testing.T) { wg.Add(numOperations) for i := 0; i < numOperations; i++ { go func(idx int) { - switch idx % 16 { + switch idx % 17 { case 0: fifoCacher.AddedDataHandlers() case 1: @@ -415,6 +432,8 @@ func TestFifoShardedCache_ConcurrentOperation(t *testing.T) { fifoCacher.SizeInBytesContained() case 15: fifoCacher.UnRegisterHandler("id") + case 16: + fifoCacher.GetRemovalStatus(nil) } wg.Done() diff --git a/go.mod b/go.mod index cfbd8445..a6f37565 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/hashicorp/golang-lru v0.6.0 github.com/multiversx/concurrent-map v0.1.4 - github.com/multiversx/mx-chain-core-go v1.2.16 + github.com/multiversx/mx-chain-core-go v1.2.17-0.20230919104727-566f55d213ab github.com/multiversx/mx-chain-logger-go v1.0.13 github.com/stretchr/testify v1.7.2 github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d diff --git a/go.sum b/go.sum index d93b610d..fd1adc20 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiversx/concurrent-map v0.1.4 h1:hdnbM8VE4b0KYJaGY5yJS2aNIW9TFFsUYwbO0993uPI= github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o= -github.com/multiversx/mx-chain-core-go v1.2.16 h1:m0hUNmZQjGJxKDLQOHoM9jSaeDfVTbyd+mqiS8+NckE= -github.com/multiversx/mx-chain-core-go v1.2.16/go.mod h1:BILOGHUOIG5dNNX8cgkzCNfDaVtoYrJRYcPnpxRMH84= +github.com/multiversx/mx-chain-core-go v1.2.17-0.20230919104727-566f55d213ab h1:FO/YYB/cWG6ruHOrnQq6Dn4GEnK06I2KlrwtE2AUcKM= +github.com/multiversx/mx-chain-core-go v1.2.17-0.20230919104727-566f55d213ab/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE= github.com/multiversx/mx-chain-logger-go v1.0.13 h1:eru/TETo0MkO4ZTnXsQDKf4PBRpAXmqjT02klNT/JnY= github.com/multiversx/mx-chain-logger-go v1.0.13/go.mod h1:MZJhTAtZTJxT+yK2EHc4ZW3YOHUc1UdjCD0iahRNBZk= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/immunitycache/cache.go b/immunitycache/cache.go index 6c4521b5..68937c9e 100644 --- a/immunitycache/cache.go +++ b/immunitycache/cache.go @@ -321,6 +321,11 @@ func (ic *ImmunityCache) Diagnose(_ bool) { ) } +// GetRemovalStatus will return the unknown status because this implementation does not track removed keys +func (ic *ImmunityCache) GetRemovalStatus(_ []byte) types.RemovalStatus { + return types.UnknownRemovalStatus +} + // Close does nothing for this cacher implementation func (ic *ImmunityCache) Close() error { return nil diff --git a/immunitycache/cache_test.go b/immunitycache/cache_test.go index aedfa09c..5a2d5b0c 100644 --- a/immunitycache/cache_test.go +++ b/immunitycache/cache_test.go @@ -9,6 +9,7 @@ import ( logger "github.com/multiversx/mx-chain-logger-go" "github.com/multiversx/mx-chain-storage-go/common" + "github.com/multiversx/mx-chain-storage-go/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -336,6 +337,22 @@ func TestImmunityCache_ForgetCapacityHadBeenReachedInThePast(t *testing.T) { require.Equal(t, uint64(0), cache.numCapacityReachedOccurrences.GetUint64()) } +func TestImmunityCache_GetRemovalStatus(t *testing.T) { + t.Parallel() + + key := []byte("key") + value := []byte("value") + + cache := newCacheToTest(1, 4, 1000) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) + + _ = cache.Put(key, value, 0) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) + + cache.Remove(key) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) +} + func newCacheToTest(numChunks uint32, maxNumItems uint32, numMaxBytes uint32) *ImmunityCache { cache, err := NewImmunityCache(CacheConfig{ Name: "test", diff --git a/leveldb/batch.go b/leveldb/batch.go index 7367d732..4e530d50 100644 --- a/leveldb/batch.go +++ b/leveldb/batch.go @@ -10,19 +10,21 @@ import ( var _ types.Batcher = (*batch)(nil) type batch struct { - batch *leveldb.Batch - cachedData map[string][]byte - removedData map[string]struct{} - mutBatch sync.RWMutex + batch *leveldb.Batch + cachedData map[string][]byte + removedData map[string]struct{} + previouslyRemoved map[string]struct{} + mutBatch sync.RWMutex } // NewBatch creates a batch -func NewBatch() *batch { +func NewBatch(previouslyRemoved map[string]struct{}) *batch { return &batch{ - batch: &leveldb.Batch{}, - cachedData: make(map[string][]byte), - removedData: make(map[string]struct{}), - mutBatch: sync.RWMutex{}, + batch: &leveldb.Batch{}, + cachedData: make(map[string][]byte), + removedData: make(map[string]struct{}), + previouslyRemoved: previouslyRemoved, + mutBatch: sync.RWMutex{}, } } @@ -69,6 +71,14 @@ func (b *batch) IsRemoved(key []byte) bool { defer b.mutBatch.RUnlock() _, found := b.removedData[string(key)] + if found { + return true + } + _, found = b.cachedData[string(key)] + if found { + return false + } + _, found = b.previouslyRemoved[string(key)] return found } diff --git a/leveldb/leveldb.go b/leveldb/leveldb.go index 61448a7b..fbef2b3a 100644 --- a/leveldb/leveldb.go +++ b/leveldb/leveldb.go @@ -220,7 +220,7 @@ func (s *DB) Has(key []byte) error { // CreateBatch returns a batcher to be used for batch writing data to the database func (s *DB) createBatch() types.Batcher { - return NewBatch() + return NewBatch(make(map[string]struct{})) } // putBatch writes the Batch data into the database diff --git a/leveldb/leveldbSerial.go b/leveldb/leveldbSerial.go index fbb8c5e6..ddcb6c7d 100644 --- a/leveldb/leveldbSerial.go +++ b/leveldb/leveldbSerial.go @@ -80,7 +80,7 @@ func NewSerialDB(path string, batchDelaySeconds int, maxBatchSize int, maxOpenFi closer: closing.NewSafeChanCloser(), } - dbStore.batch = NewBatch() + dbStore.batch = NewBatch(make(map[string]struct{})) go dbStore.batchTimeoutHandle(ctx) go dbStore.processLoop(ctx) @@ -246,7 +246,7 @@ func (s *SerialDB) putBatch() error { return common.ErrInvalidBatch } s.sizeBatch = 0 - s.batch = NewBatch() + s.batch = NewBatch(dbBatch.removedData) s.mutBatch.Unlock() ch := make(chan error) diff --git a/lrucache/export_test.go b/lrucache/export_test.go index 92889ed2..d35f4463 100644 --- a/lrucache/export_test.go +++ b/lrucache/export_test.go @@ -1,5 +1,6 @@ package lrucache +// AddedDataHandlers - func (c *lruCache) AddedDataHandlers() map[string]func(key []byte, value interface{}) { return c.mapDataHandlers } diff --git a/lrucache/lrucache.go b/lrucache/lrucache.go index 4d947ff4..0a2ca4f4 100644 --- a/lrucache/lrucache.go +++ b/lrucache/lrucache.go @@ -184,6 +184,11 @@ func (c *lruCache) MaxSize() int { return c.maxsize } +// GetRemovalStatus will return the unknown status because this implementation does not track removed keys +func (c *lruCache) GetRemovalStatus(_ []byte) types.RemovalStatus { + return types.UnknownRemovalStatus +} + // Close does nothing for this cacher implementation func (c *lruCache) Close() error { return nil diff --git a/lrucache/lrucache_test.go b/lrucache/lrucache_test.go index 874fe357..9e9f478b 100644 --- a/lrucache/lrucache_test.go +++ b/lrucache/lrucache_test.go @@ -8,9 +8,9 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/core/check" - "github.com/multiversx/mx-chain-core-go/storage" "github.com/multiversx/mx-chain-storage-go/common" "github.com/multiversx/mx-chain-storage-go/lrucache" + "github.com/multiversx/mx-chain-storage-go/types" "github.com/stretchr/testify/assert" ) @@ -421,7 +421,7 @@ func TestLRUCache_CloseShouldNotErr(t *testing.T) { } type cacheWrapper struct { - c storage.Cacher + c types.Cacher } func newCacheWrapper() *cacheWrapper { @@ -459,3 +459,19 @@ func TestLruCache_LenDuringEviction(t *testing.T) { assert.Fail(t, "test failed, deadlock occurred") } } + +func TestLruCache_GetRemovalStatus(t *testing.T) { + t.Parallel() + + key := []byte("key") + value := []byte("value") + + cache, _ := lrucache.NewCache(2) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) + + _ = cache.Put(key, value, 0) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) + + cache.Remove(key) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) +} diff --git a/rtcache/removalTrackingCache.go b/rtcache/removalTrackingCache.go new file mode 100644 index 00000000..7e14f1b8 --- /dev/null +++ b/rtcache/removalTrackingCache.go @@ -0,0 +1,84 @@ +package rtcache + +import ( + "fmt" + "sync" + + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-storage-go/common" + "github.com/multiversx/mx-chain-storage-go/types" +) + +type unexportedCacher = types.Cacher + +type removalTrackingCache struct { + unexportedCacher + mutCriticalArea sync.RWMutex + removalCache types.Cacher +} + +// NewRemovalTrackingCache will create a new instance of a cache that is able to track removal events +func NewRemovalTrackingCache(mainCache types.Cacher, removalCache types.Cacher) (*removalTrackingCache, error) { + if check.IfNil(mainCache) { + return nil, fmt.Errorf("%w for the main cache", common.ErrNilCacher) + } + if check.IfNil(removalCache) { + return nil, fmt.Errorf("%w for the removal cache", common.ErrNilCacher) + } + + return &removalTrackingCache{ + unexportedCacher: mainCache, + removalCache: removalCache, + }, nil +} + +// Put adds a value into the main cache. Returns true if an eviction occurred. +// It also removes the key from the removalCache +func (cache *removalTrackingCache) Put(key []byte, value interface{}, sizeInBytes int) (evicted bool) { + cache.mutCriticalArea.Lock() + defer cache.mutCriticalArea.Unlock() + + cache.removalCache.Remove(key) + return cache.unexportedCacher.Put(key, value, sizeInBytes) +} + +// Remove removes the provided key from the main cache. +// It also stores the key in the removalCache +func (cache *removalTrackingCache) Remove(key []byte) { + cache.mutCriticalArea.Lock() + defer cache.mutCriticalArea.Unlock() + + _ = cache.removalCache.Put(key, struct{}{}, len(key)) + cache.unexportedCacher.Remove(key) +} + +// GetRemovalStatus will return the removal status by searching the key in both caches +func (cache *removalTrackingCache) GetRemovalStatus(key []byte) types.RemovalStatus { + cache.mutCriticalArea.RLock() + defer cache.mutCriticalArea.RUnlock() + + _, found := cache.removalCache.Get(key) + if found { + return types.ExplicitlyRemovedStatus + } + _, found = cache.unexportedCacher.Get(key) + if found { + return types.NotRemovedStatus + } + + return types.UnknownRemovalStatus +} + +// Clear is used to completely clear both caches. +func (cache *removalTrackingCache) Clear() { + cache.mutCriticalArea.RLock() + defer cache.mutCriticalArea.RUnlock() + + cache.removalCache.Clear() + cache.unexportedCacher.Clear() +} + +// IsInterfaceNil returns true if there is no value under the interface +func (cache *removalTrackingCache) IsInterfaceNil() bool { + return cache == nil +} diff --git a/rtcache/removalTrackingCache_test.go b/rtcache/removalTrackingCache_test.go new file mode 100644 index 00000000..7c1d686d --- /dev/null +++ b/rtcache/removalTrackingCache_test.go @@ -0,0 +1,81 @@ +package rtcache + +import ( + "testing" + + "github.com/multiversx/mx-chain-storage-go/common" + "github.com/multiversx/mx-chain-storage-go/testscommon" + "github.com/multiversx/mx-chain-storage-go/types" + "github.com/stretchr/testify/assert" +) + +func TestNewRemovalTrackingCache(t *testing.T) { + t.Parallel() + + mainCache := testscommon.NewCacherMock() + removalCache := testscommon.NewCacherMock() + + t.Run("nil main cache should error", func(t *testing.T) { + t.Parallel() + + rtCache, err := NewRemovalTrackingCache(nil, removalCache) + assert.ErrorIs(t, err, common.ErrNilCacher) + assert.Contains(t, err.Error(), "main cache") + assert.Nil(t, rtCache) + }) + t.Run("nil removal cache should error", func(t *testing.T) { + t.Parallel() + + rtCache, err := NewRemovalTrackingCache(mainCache, nil) + assert.ErrorIs(t, err, common.ErrNilCacher) + assert.Contains(t, err.Error(), "removal cache") + assert.Nil(t, rtCache) + }) + t.Run("should work", func(t *testing.T) { + t.Parallel() + + rtCache, err := NewRemovalTrackingCache(mainCache, removalCache) + assert.Nil(t, err) + assert.NotNil(t, rtCache) + }) +} + +func TestRemovalTrackingCache_IsInterfaceNil(t *testing.T) { + t.Parallel() + + var rtCache *removalTrackingCache + assert.True(t, rtCache.IsInterfaceNil()) + + rtCache, _ = NewRemovalTrackingCache(testscommon.NewCacherMock(), testscommon.NewCacherMock()) + assert.False(t, rtCache.IsInterfaceNil()) +} + +func TestRemovalTrackingCache_GetRemovalStatus(t *testing.T) { + t.Parallel() + + mainCache := testscommon.NewCacherMock() + removalCache := testscommon.NewCacherMock() + rtCache, _ := NewRemovalTrackingCache(mainCache, removalCache) + + key := []byte("key") + value := []byte("value") + + // the node does not know about the key + assert.Equal(t, types.UnknownRemovalStatus, rtCache.GetRemovalStatus(key)) + + // we add the key + rtCache.Put(key, value, 0) + assert.True(t, rtCache.Has(key)) + assert.Equal(t, types.NotRemovedStatus, rtCache.GetRemovalStatus(key)) + + // we remove the key + rtCache.Remove(key) + assert.False(t, rtCache.Has(key)) + assert.Equal(t, types.ExplicitlyRemovedStatus, rtCache.GetRemovalStatus(key)) + + // due to evictions or manual clear calls we should not know about the key + rtCache.Clear() + assert.False(t, rtCache.Has(key)) + // now the removal tracking cache does not know about the key + assert.Equal(t, types.UnknownRemovalStatus, rtCache.GetRemovalStatus(key)) +} diff --git a/storageUnit/nilStorer.go b/storageUnit/nilStorer.go index 224393f9..2330b452 100644 --- a/storageUnit/nilStorer.go +++ b/storageUnit/nilStorer.go @@ -1,7 +1,7 @@ package storageUnit import ( - storageCore "github.com/multiversx/mx-chain-core-go/storage" + "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-storage-go/common" ) @@ -20,7 +20,7 @@ func (ns *NilStorer) GetFromEpoch(_ []byte, _ uint32) ([]byte, error) { } // GetBulkFromEpoch will do nothing -func (ns *NilStorer) GetBulkFromEpoch(_ [][]byte, _ uint32) ([]storageCore.KeyValuePair, error) { +func (ns *NilStorer) GetBulkFromEpoch(_ [][]byte, _ uint32) ([]data.KeyValuePair, error) { return nil, nil } diff --git a/storageUnit/storageunit.go b/storageUnit/storageunit.go index a560f252..04b8c31a 100644 --- a/storageUnit/storageunit.go +++ b/storageUnit/storageunit.go @@ -8,11 +8,11 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/hashing/blake2b" "github.com/multiversx/mx-chain-core-go/hashing/fnv" "github.com/multiversx/mx-chain-core-go/hashing/keccak" - storageCore "github.com/multiversx/mx-chain-core-go/storage" logger "github.com/multiversx/mx-chain-logger-go" "github.com/multiversx/mx-chain-storage-go/common" "github.com/multiversx/mx-chain-storage-go/fifocache" @@ -20,6 +20,7 @@ import ( "github.com/multiversx/mx-chain-storage-go/lrucache" "github.com/multiversx/mx-chain-storage-go/memorydb" "github.com/multiversx/mx-chain-storage-go/monitoring" + "github.com/multiversx/mx-chain-storage-go/rtcache" "github.com/multiversx/mx-chain-storage-go/types" ) @@ -55,6 +56,7 @@ type ShardIDProviderType string // Shard id provider types that are currently supported const ( + // BinarySplit is the binary-split type value BinarySplit ShardIDProviderType = "BinarySplit" ) @@ -92,6 +94,12 @@ type CacheConfig struct { Shards uint32 } +// CacheCreationConfig holds the configurable elements of a cache at creation time +type CacheCreationConfig struct { + CacheConfig + RemovalTrackingCacheConfig CacheConfig +} + // String returns a readable representation of the object func (config *CacheConfig) String() string { bytes, err := json.Marshal(config) @@ -170,6 +178,11 @@ func (u *Unit) Get(key []byte) ([]byte, error) { u.lock.Lock() defer u.lock.Unlock() + removalStatus := u.cacher.GetRemovalStatus(key) + if removalStatus == types.ExplicitlyRemovedStatus { + return nil, common.ErrKeyNotFound + } + v, ok := u.cacher.Get(key) var err error @@ -200,8 +213,8 @@ func (u *Unit) GetFromEpoch(key []byte, _ uint32) ([]byte, error) { } // GetBulkFromEpoch will call the Get method for all keys as this storer doesn't handle epochs -func (u *Unit) GetBulkFromEpoch(keys [][]byte, _ uint32) ([]storageCore.KeyValuePair, error) { - results := make([]storageCore.KeyValuePair, 0, len(keys)) +func (u *Unit) GetBulkFromEpoch(keys [][]byte, _ uint32) ([]data.KeyValuePair, error) { + results := make([]data.KeyValuePair, 0, len(keys)) for _, key := range keys { value, err := u.Get(key) if err != nil { @@ -211,7 +224,10 @@ func (u *Unit) GetBulkFromEpoch(keys [][]byte, _ uint32) ([]storageCore.KeyValue ) continue } - keyValue := storageCore.KeyValuePair{Key: key, Value: value} + keyValue := data.KeyValuePair{ + Key: key, + Value: value, + } results = append(results, keyValue) } return results, nil @@ -223,6 +239,11 @@ func (u *Unit) Has(key []byte) error { u.lock.RLock() defer u.lock.RUnlock() + removalStatus := u.cacher.GetRemovalStatus(key) + if removalStatus == types.ExplicitlyRemovedStatus { + return common.ErrKeyNotFound + } + has := u.cacher.Has(key) if has { return nil @@ -290,7 +311,7 @@ func NewStorageUnit(c types.Cacher, p types.Persister) (*Unit, error) { } // NewStorageUnitFromConf creates a new storage unit from a storage unit config -func NewStorageUnitFromConf(cacheConf CacheConfig, dbConf DBConfig) (*Unit, error) { +func NewStorageUnitFromConf(cacheConf CacheCreationConfig, dbConf DBConfig) (*Unit, error) { var cache types.Cacher var db types.Persister var err error @@ -323,7 +344,25 @@ func NewStorageUnitFromConf(cacheConf CacheConfig, dbConf DBConfig) (*Unit, erro } // NewCache creates a new cache from a cache config -func NewCache(config CacheConfig) (types.Cacher, error) { +func NewCache(config CacheCreationConfig) (types.Cacher, error) { + mainCache, err := newCache(config.CacheConfig) + if err != nil { + return nil, err + } + + if config.RemovalTrackingCacheConfig.Capacity == 0 { + return mainCache, nil // we do not create removal tracking cache instances + } + + removalCache, err := newCache(config.RemovalTrackingCacheConfig) + if err != nil { + return nil, fmt.Errorf("%w when creating removal cache", err) + } + + return rtcache.NewRemovalTrackingCache(mainCache, removalCache) +} + +func newCache(config CacheConfig) (types.Cacher, error) { monitoring.MonitorNewCache(config.Name, config.SizeInBytes) cacheType := config.Type diff --git a/storageUnit/storageunit_test.go b/storageUnit/storageunit_test.go index a0c8cdf0..4e1186e3 100644 --- a/storageUnit/storageunit_test.go +++ b/storageUnit/storageunit_test.go @@ -10,6 +10,8 @@ import ( "github.com/multiversx/mx-chain-storage-go/lrucache" "github.com/multiversx/mx-chain-storage-go/memorydb" "github.com/multiversx/mx-chain-storage-go/storageUnit" + "github.com/multiversx/mx-chain-storage-go/testscommon" + "github.com/multiversx/mx-chain-storage-go/types" "github.com/stretchr/testify/assert" ) @@ -30,7 +32,24 @@ func initStorageUnit(tb testing.TB, cSize int) *storageUnit.Unit { return sUnit } +func createCacheWithRemovalTracking() types.Cacher { + cacher, _ := storageUnit.NewCache(storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: storageUnit.LRUCache, + }, + RemovalTrackingCacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: storageUnit.LRUCache, + }, + }) + + return cacher +} + func TestStorageUnitNilPersister(t *testing.T) { + t.Parallel() + cache, err1 := lrucache.NewCache(10) assert.Nil(t, err1, "no error expected but got %s", err1) @@ -41,6 +60,8 @@ func TestStorageUnitNilPersister(t *testing.T) { } func TestStorageUnitNilCacher(t *testing.T) { + t.Parallel() + mdb := memorydb.New() _, err1 := storageUnit.NewStorageUnit(nil, mdb) @@ -48,6 +69,8 @@ func TestStorageUnitNilCacher(t *testing.T) { } func TestStorageUnit(t *testing.T) { + t.Parallel() + cache, err1 := lrucache.NewCache(10) mdb := memorydb.New() @@ -58,6 +81,8 @@ func TestStorageUnit(t *testing.T) { } func TestPutNotPresent(t *testing.T) { + t.Parallel() + key, val := []byte("key0"), []byte("value0") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -70,6 +95,8 @@ func TestPutNotPresent(t *testing.T) { } func TestPutNotPresentCache(t *testing.T) { + t.Parallel() + key, val := []byte("key1"), []byte("value1") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -84,6 +111,8 @@ func TestPutNotPresentCache(t *testing.T) { } func TestPutPresentShouldOverwriteValue(t *testing.T) { + t.Parallel() + key, val := []byte("key2"), []byte("value2") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -100,14 +129,47 @@ func TestPutPresentShouldOverwriteValue(t *testing.T) { } func TestGetNotPresent(t *testing.T) { + t.Parallel() + key := []byte("key3") s := initStorageUnit(t, 10) v, err := s.Get(key) assert.NotNil(t, err, "expected to find no value, but found %s", v) + assert.Nil(t, v) +} + +func TestUnit_GetWithExplicitlyRemovedKeyShouldNotCallThePersister(t *testing.T) { + t.Parallel() + + mdb := &testscommon.PersisterStub{ + HasCalled: func(key []byte) error { + assert.Fail(t, "should have not called Has") + return nil + }, + GetCalled: func(key []byte) ([]byte, error) { + assert.Fail(t, "should have not called Has") + return nil, nil + }, + } + cache := createCacheWithRemovalTracking() + storer, err := storageUnit.NewStorageUnit(cache, mdb) + assert.Nil(t, err) + + key := []byte("key") + value := []byte("value") + + _ = storer.Put(key, value) + _ = storer.Remove(key) + + v, err := storer.Get(key) + assert.Equal(t, err, common.ErrKeyNotFound) // same as the old behaviour (key not present in the cacher & persister) + assert.Nil(t, v) // same as the old behaviour (key not present in the cacher & persister) } func TestGetNotPresentCache(t *testing.T) { + t.Parallel() + key, val := []byte("key4"), []byte("value4") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -123,6 +185,8 @@ func TestGetNotPresentCache(t *testing.T) { } func TestGetPresent(t *testing.T) { + t.Parallel() + key, val := []byte("key5"), []byte("value4") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -136,6 +200,8 @@ func TestGetPresent(t *testing.T) { } func TestHasNotPresent(t *testing.T) { + t.Parallel() + key := []byte("key6") s := initStorageUnit(t, 10) err := s.Has(key) @@ -144,7 +210,36 @@ func TestHasNotPresent(t *testing.T) { assert.Equal(t, err, common.ErrKeyNotFound) } +func TestUnit_HasWithExplicitlyRemovedKeyShouldNotCallThePersister(t *testing.T) { + t.Parallel() + + mdb := &testscommon.PersisterStub{ + HasCalled: func(key []byte) error { + assert.Fail(t, "should have not called Has") + return nil + }, + GetCalled: func(key []byte) ([]byte, error) { + assert.Fail(t, "should have not called Has") + return nil, nil + }, + } + cache := createCacheWithRemovalTracking() + storer, err := storageUnit.NewStorageUnit(cache, mdb) + assert.Nil(t, err) + + key := []byte("key") + value := []byte("value") + + _ = storer.Put(key, value) + _ = storer.Remove(key) + + err = storer.Has(key) + assert.Equal(t, err, common.ErrKeyNotFound) // same as the old behaviour (key not present in the cacher & persister) +} + func TestHasNotPresentCache(t *testing.T) { + t.Parallel() + key, val := []byte("key7"), []byte("value7") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -159,6 +254,8 @@ func TestHasNotPresentCache(t *testing.T) { } func TestHasPresent(t *testing.T) { + t.Parallel() + key, val := []byte("key8"), []byte("value8") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -171,6 +268,8 @@ func TestHasPresent(t *testing.T) { } func TestDeleteNotPresent(t *testing.T) { + t.Parallel() + key := []byte("key12") s := initStorageUnit(t, 10) err := s.Remove(key) @@ -179,6 +278,8 @@ func TestDeleteNotPresent(t *testing.T) { } func TestDeleteNotPresentCache(t *testing.T) { + t.Parallel() + key, val := []byte("key13"), []byte("value13") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -200,6 +301,8 @@ func TestDeleteNotPresentCache(t *testing.T) { } func TestDeletePresent(t *testing.T) { + t.Parallel() + key, val := []byte("key14"), []byte("value14") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -220,6 +323,8 @@ func TestDeletePresent(t *testing.T) { } func TestClearCacheNotAffectPersist(t *testing.T) { + t.Parallel() + key, val := []byte("key15"), []byte("value15") s := initStorageUnit(t, 10) err := s.Put(key, val) @@ -232,31 +337,53 @@ func TestClearCacheNotAffectPersist(t *testing.T) { } func TestDestroyUnitNoError(t *testing.T) { + t.Parallel() + s := initStorageUnit(t, 10) err := s.DestroyUnit() assert.Nil(t, err, "no error expected, but got %s", err) } func TestCreateCacheFromConfWrongType(t *testing.T) { - - cacher, err := storageUnit.NewCache(storageUnit.CacheConfig{Type: "NotLRU", Capacity: 100, Shards: 1, SizeInBytes: 0}) + t.Parallel() + + cacher, err := storageUnit.NewCache( + storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Type: "NotLRU", + Capacity: 100, + Shards: 1, + SizeInBytes: 0, + }, + }) assert.NotNil(t, err, "error expected") assert.Nil(t, cacher, "cacher expected to be nil, but got %s", cacher) } func TestCreateCacheFromConfOK(t *testing.T) { - - cacher, err := storageUnit.NewCache(storageUnit.CacheConfig{Type: storageUnit.LRUCache, Capacity: 10, Shards: 1, SizeInBytes: 0}) + t.Parallel() + + cacher, err := storageUnit.NewCache( + storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Type: storageUnit.LRUCache, + Capacity: 10, + Shards: 1, + SizeInBytes: 0, + }, + }) assert.Nil(t, err, "no error expected but got %s", err) assert.NotNil(t, cacher, "valid cacher expected but got nil") } func TestCreateDBFromConfWrongType(t *testing.T) { + t.Parallel() + arg := storageUnit.ArgDB{ DBType: "NotLvlDB", - Path: "test", + Path: t.TempDir(), BatchDelaySeconds: 10, MaxBatchSize: 10, MaxOpenFiles: 10, @@ -268,6 +395,8 @@ func TestCreateDBFromConfWrongType(t *testing.T) { } func TestCreateDBFromConfWrongFileNameLvlDB(t *testing.T) { + t.Parallel() + if testing.Short() { t.Skip("this is not a short test") } @@ -285,6 +414,8 @@ func TestCreateDBFromConfWrongFileNameLvlDB(t *testing.T) { } func TestCreateDBFromConfLvlDBOk(t *testing.T) { + t.Parallel() + arg := storageUnit.ArgDB{ DBType: storageUnit.LvlDB, Path: t.TempDir(), @@ -301,63 +432,89 @@ func TestCreateDBFromConfLvlDBOk(t *testing.T) { } func TestNewStorageUnit_FromConfWrongCacheSizeVsBatchSize(t *testing.T) { - - storer, err := storageUnit.NewStorageUnitFromConf(storageUnit.CacheConfig{ - Capacity: 10, - Type: storageUnit.LRUCache, - }, storageUnit.DBConfig{ - FilePath: "Blocks", - Type: storageUnit.LvlDB, - MaxBatchSize: 11, - BatchDelaySeconds: 1, - MaxOpenFiles: 10, - }) + t.Parallel() + + storer, err := storageUnit.NewStorageUnitFromConf( + storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: storageUnit.LRUCache, + }, + }, + storageUnit.DBConfig{ + FilePath: t.TempDir(), + Type: storageUnit.LvlDB, + MaxBatchSize: 11, + BatchDelaySeconds: 1, + MaxOpenFiles: 10, + }, + ) assert.NotNil(t, err, "error expected") assert.Nil(t, storer, "storer expected to be nil but got %s", storer) } func TestNewStorageUnit_FromConfWrongCacheConfig(t *testing.T) { - - storer, err := storageUnit.NewStorageUnitFromConf(storageUnit.CacheConfig{ - Capacity: 10, - Type: "NotLRU", - }, storageUnit.DBConfig{ - FilePath: "Blocks", - Type: storageUnit.LvlDB, - BatchDelaySeconds: 1, - MaxBatchSize: 1, - MaxOpenFiles: 10, - }) + t.Parallel() + + storer, err := storageUnit.NewStorageUnitFromConf( + storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: "NotLRU", + }, + }, + storageUnit.DBConfig{ + FilePath: t.TempDir(), + Type: storageUnit.LvlDB, + BatchDelaySeconds: 1, + MaxBatchSize: 1, + MaxOpenFiles: 10, + }, + ) assert.NotNil(t, err, "error expected") assert.Nil(t, storer, "storer expected to be nil but got %s", storer) } func TestNewStorageUnit_FromConfWrongDBConfig(t *testing.T) { - storer, err := storageUnit.NewStorageUnitFromConf(storageUnit.CacheConfig{ - Capacity: 10, - Type: storageUnit.LRUCache, - }, storageUnit.DBConfig{ - FilePath: "Blocks", - Type: "NotLvlDB", - }) + t.Parallel() + + storer, err := storageUnit.NewStorageUnitFromConf( + storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: storageUnit.LRUCache, + }, + }, + storageUnit.DBConfig{ + FilePath: t.TempDir(), + Type: "NotLvlDB", + }, + ) assert.NotNil(t, err, "error expected") assert.Nil(t, storer, "storer expected to be nil but got %s", storer) } func TestNewStorageUnit_FromConfLvlDBOk(t *testing.T) { - storer, err := storageUnit.NewStorageUnitFromConf(storageUnit.CacheConfig{ - Capacity: 10, - Type: storageUnit.LRUCache, - }, storageUnit.DBConfig{ - FilePath: "Blocks", - Type: storageUnit.LvlDB, - MaxBatchSize: 1, - BatchDelaySeconds: 1, - MaxOpenFiles: 10, - }) + t.Parallel() + + storer, err := storageUnit.NewStorageUnitFromConf( + storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: storageUnit.LRUCache, + }, + }, + storageUnit.DBConfig{ + FilePath: t.TempDir(), + Type: storageUnit.LvlDB, + MaxBatchSize: 1, + BatchDelaySeconds: 1, + MaxOpenFiles: 10, + }, + ) assert.Nil(t, err, "no error expected but got %s", err) assert.NotNil(t, storer, "valid storer expected but got nil") @@ -366,16 +523,23 @@ func TestNewStorageUnit_FromConfLvlDBOk(t *testing.T) { } func TestNewStorageUnit_ShouldWorkLvlDB(t *testing.T) { - storer, err := storageUnit.NewStorageUnitFromConf(storageUnit.CacheConfig{ - Capacity: 10, - Type: storageUnit.LRUCache, - }, storageUnit.DBConfig{ - FilePath: "Blocks", - Type: storageUnit.LvlDB, - BatchDelaySeconds: 1, - MaxBatchSize: 1, - MaxOpenFiles: 10, - }) + t.Parallel() + + storer, err := storageUnit.NewStorageUnitFromConf( + storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: storageUnit.LRUCache, + }, + }, + storageUnit.DBConfig{ + FilePath: t.TempDir(), + Type: storageUnit.LvlDB, + BatchDelaySeconds: 1, + MaxBatchSize: 1, + MaxOpenFiles: 10, + }, + ) assert.Nil(t, err, "no error expected but got %s", err) assert.NotNil(t, storer, "valid storer expected but got nil") @@ -383,6 +547,63 @@ func TestNewStorageUnit_ShouldWorkLvlDB(t *testing.T) { assert.Nil(t, err, "no error expected destroying the persister") } +func TestNewStorageUnit_WrongConfigForRemovalCache(t *testing.T) { + t.Parallel() + + storer, err := storageUnit.NewStorageUnitFromConf( + storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: storageUnit.LRUCache, + }, + RemovalTrackingCacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: "", + }, + }, + storageUnit.DBConfig{ + FilePath: t.TempDir(), + Type: storageUnit.LvlDB, + BatchDelaySeconds: 1, + MaxBatchSize: 1, + MaxOpenFiles: 10, + }, + ) + + assert.ErrorIs(t, err, common.ErrNotSupportedCacheType) + assert.Contains(t, err.Error(), "when creating removal cache") + assert.Nil(t, storer) +} + +func TestNewStorageUnit_ShouldWorkWithRemovalCache(t *testing.T) { + t.Parallel() + + storer, err := storageUnit.NewStorageUnitFromConf( + storageUnit.CacheCreationConfig{ + CacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: storageUnit.LRUCache, + }, + RemovalTrackingCacheConfig: storageUnit.CacheConfig{ + Capacity: 10, + Type: storageUnit.LRUCache, + }, + }, + storageUnit.DBConfig{ + FilePath: t.TempDir(), + Type: storageUnit.LvlDB, + BatchDelaySeconds: 1, + MaxBatchSize: 1, + MaxOpenFiles: 10, + }, + ) + + assert.Nil(t, err) + assert.NotNil(t, storer) + + _ = storer.DestroyUnit() +} + const ( valuesInDb = 100000 ) diff --git a/testscommon/cacherMock.go b/testscommon/cacherMock.go new file mode 100644 index 00000000..fe94d4d1 --- /dev/null +++ b/testscommon/cacherMock.go @@ -0,0 +1,136 @@ +package testscommon + +import ( + "sync" + + "github.com/multiversx/mx-chain-storage-go/types" +) + +type cacherMock struct { + mut sync.RWMutex + data map[string]interface{} +} + +// NewCacherMock - +func NewCacherMock() *cacherMock { + return &cacherMock{ + data: make(map[string]interface{}), + } +} + +// Clear - +func (mock *cacherMock) Clear() { + mock.mut.Lock() + defer mock.mut.Unlock() + + mock.data = make(map[string]interface{}) +} + +// Put - +func (mock *cacherMock) Put(key []byte, value interface{}, _ int) (evicted bool) { + mock.mut.Lock() + defer mock.mut.Unlock() + + mock.data[string(key)] = value + + return false +} + +// Get - +func (mock *cacherMock) Get(key []byte) (value interface{}, ok bool) { + mock.mut.RLock() + defer mock.mut.RUnlock() + + val, found := mock.data[string(key)] + + return val, found +} + +// Has - +func (mock *cacherMock) Has(key []byte) bool { + mock.mut.RLock() + defer mock.mut.RUnlock() + + _, found := mock.data[string(key)] + + return found +} + +// Peek - +func (mock *cacherMock) Peek(key []byte) (value interface{}, ok bool) { + return mock.Get(key) +} + +// HasOrAdd - +func (mock *cacherMock) HasOrAdd(key []byte, value interface{}, _ int) (has, added bool) { + mock.mut.Lock() + defer mock.mut.Unlock() + + _, found := mock.data[string(key)] + + mock.data[string(key)] = value + + return !found, found +} + +// Remove - +func (mock *cacherMock) Remove(key []byte) { + mock.mut.Lock() + defer mock.mut.Unlock() + + delete(mock.data, string(key)) +} + +// Keys - +func (mock *cacherMock) Keys() [][]byte { + mock.mut.RLock() + defer mock.mut.RUnlock() + + keys := make([][]byte, 0, len(mock.data)) + for key := range mock.data { + keys = append(keys, []byte(key)) + } + + return keys +} + +// Len - +func (mock *cacherMock) Len() int { + mock.mut.RLock() + defer mock.mut.RUnlock() + + return len(mock.data) +} + +// SizeInBytesContained - +func (mock *cacherMock) SizeInBytesContained() uint64 { + return 0 +} + +// MaxSize - +func (mock *cacherMock) MaxSize() int { + return 0 +} + +// RegisterHandler - +func (mock *cacherMock) RegisterHandler(_ func(key []byte, value interface{}), _ string) { +} + +// UnRegisterHandler - +func (mock *cacherMock) UnRegisterHandler(_ string) { +} + +// GetRemovalStatus - +func (mock *cacherMock) GetRemovalStatus(_ []byte) types.RemovalStatus { + return types.UnknownRemovalStatus +} + +// Close - +func (mock *cacherMock) Close() error { + return nil +} + +// IsInterfaceNil - +func (mock *cacherMock) IsInterfaceNil() bool { + return mock == nil +} diff --git a/txcache/disabledCache.go b/txcache/disabledCache.go index 5a5473e5..3a42ff70 100644 --- a/txcache/disabledCache.go +++ b/txcache/disabledCache.go @@ -122,6 +122,11 @@ func (cache *DisabledCache) GetTransactionsPoolForSender(_ string) []*WrappedTra return make([]*WrappedTransaction, 0) } +// GetRemovalStatus will return the unknown status because this implementation does not track removed keys +func (cache *DisabledCache) GetRemovalStatus(_ []byte) types.RemovalStatus { + return types.UnknownRemovalStatus +} + // Close does nothing func (cache *DisabledCache) Close() error { return nil diff --git a/txcache/disabledCache_test.go b/txcache/disabledCache_test.go index a19e947a..94d93c39 100644 --- a/txcache/disabledCache_test.go +++ b/txcache/disabledCache_test.go @@ -4,6 +4,7 @@ import ( "math" "testing" + "github.com/multiversx/mx-chain-storage-go/types" "github.com/stretchr/testify/require" ) @@ -60,6 +61,8 @@ func TestDisabledCache_DoesNothing(t *testing.T) { maxSize := cache.MaxSize() require.Equal(t, 0, maxSize) + require.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) + require.NotPanics(t, func() { cache.RegisterHandler(func(_ []byte, _ interface{}) {}, "") }) require.False(t, cache.IsInterfaceNil()) diff --git a/txcache/txCache.go b/txcache/txCache.go index d938b976..2e2f130a 100644 --- a/txcache/txCache.go +++ b/txcache/txCache.go @@ -318,6 +318,11 @@ func (cache *TxCache) NotifyAccountNonce(accountKey []byte, nonce uint64) { func (cache *TxCache) ImmunizeTxsAgainstEviction(_ [][]byte) { } +// GetRemovalStatus will return the unknown status because this implementation does not track removed keys +func (cache *TxCache) GetRemovalStatus(_ []byte) types.RemovalStatus { + return types.UnknownRemovalStatus +} + // Close does nothing for this cacher implementation func (cache *TxCache) Close() error { return nil diff --git a/txcache/txCache_test.go b/txcache/txCache_test.go index 3a8b41c4..cbc1cd47 100644 --- a/txcache/txCache_test.go +++ b/txcache/txCache_test.go @@ -626,6 +626,22 @@ func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *t cache.Clear() } +func TestTxCache_GetRemovalStatus(t *testing.T) { + t.Parallel() + + key := []byte("key") + value := []byte("value") + + cache := newUnconstrainedCacheToTest() + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) + + _ = cache.Put(key, value, 0) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) + + cache.Remove(key) + assert.Equal(t, types.UnknownRemovalStatus, cache.GetRemovalStatus(nil)) +} + func newUnconstrainedCacheToTest() *TxCache { txGasHandler, _ := dummyParams() cache, err := NewTxCache(ConfigSourceMe{ diff --git a/types/interface.go b/types/interface.go index 99dca5f4..66892372 100644 --- a/types/interface.go +++ b/types/interface.go @@ -3,7 +3,19 @@ package types import ( "time" - "github.com/multiversx/mx-chain-core-go/storage" + "github.com/multiversx/mx-chain-core-go/data" +) + +// RemovalStatus removal status defines the removal possibility of a key +type RemovalStatus string + +const ( + // UnknownRemovalStatus defines the unknown status + UnknownRemovalStatus RemovalStatus = "unknown" + // ExplicitlyRemovedStatus defines the explicitly removed status + ExplicitlyRemovedStatus RemovalStatus = "explicitly removed" + // NotRemovedStatus the key is not removed + NotRemovedStatus RemovalStatus = "not removed" ) // Persister provides storage of data services in a database like construct @@ -74,6 +86,8 @@ type Cacher interface { RegisterHandler(handler func(key []byte, value interface{}), id string) // UnRegisterHandler deletes the handler from the list UnRegisterHandler(id string) + // GetRemovalStatus returns the removal status for the provided key + GetRemovalStatus(key []byte) RemovalStatus // Close closes the underlying temporary db if the cacher implementation has one, // otherwise it does nothing Close() error @@ -94,7 +108,7 @@ type Storer interface { ClearCache() DestroyUnit() error GetFromEpoch(key []byte, epoch uint32) ([]byte, error) - GetBulkFromEpoch(keys [][]byte, epoch uint32) ([]storage.KeyValuePair, error) + GetBulkFromEpoch(keys [][]byte, epoch uint32) ([]data.KeyValuePair, error) GetOldestEpoch() (uint32, error) RangeKeys(handler func(key []byte, val []byte) bool) Close() error