From 02df161799b8b398ff9bd8e8b783d4084892ec7f Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 20 Aug 2024 16:02:28 -0700 Subject: [PATCH 01/10] insert approvals with badger batch update --- storage/badger/approvals.go | 84 +++++----- storage/badger/cache_b.go | 152 ++++++++++++++++++ storage/badger/operation/approvals.go | 22 +-- storage/badger/operation/common.go | 40 +++++ .../badger/operation/reader_batch_writer.go | 130 +++++++++++++++ storage/batch.go | 76 ++++++++- 6 files changed, 453 insertions(+), 51 deletions(-) create mode 100644 storage/badger/cache_b.go create mode 100644 storage/badger/operation/reader_batch_writer.go diff --git a/storage/badger/approvals.go b/storage/badger/approvals.go index eb3cf4ae820..56aab0a7f8e 100644 --- a/storage/badger/approvals.go +++ b/storage/badger/approvals.go @@ -3,6 +3,7 @@ package badger import ( "errors" "fmt" + "sync" "github.com/dgraph-io/badger/v2" @@ -11,24 +12,24 @@ import ( "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger/operation" - "github.com/onflow/flow-go/storage/badger/transaction" ) // ResultApprovals implements persistent storage for result approvals. type ResultApprovals struct { - db *badger.DB - cache *Cache[flow.Identifier, *flow.ResultApproval] + db *badger.DB + cache *CacheB[flow.Identifier, *flow.ResultApproval] + indexing *sync.Mutex // preventing concurrent indexing of approvals } func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApprovals { - store := func(key flow.Identifier, val *flow.ResultApproval) func(*transaction.Tx) error { - return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(val))) + store := func(key flow.Identifier, val *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error { + return storage.OnlyBadgerWriter(operation.InsertResultApproval(val)) } - retrieve := func(approvalID flow.Identifier) func(tx *badger.Txn) (*flow.ResultApproval, error) { + retrieve := func(approvalID flow.Identifier) func(tx storage.Reader) (*flow.ResultApproval, error) { var approval flow.ResultApproval - return func(tx *badger.Txn) (*flow.ResultApproval, error) { + return func(tx storage.Reader) (*flow.ResultApproval, error) { err := operation.RetrieveResultApproval(approvalID, &approval)(tx) return &approval, err } @@ -36,21 +37,22 @@ func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApp res := &ResultApprovals{ db: db, - cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals, - withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100), - withStore[flow.Identifier, *flow.ResultApproval](store), - withRetrieve[flow.Identifier, *flow.ResultApproval](retrieve)), + cache: newCacheB[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals, + withLimitB[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100), + withStoreB[flow.Identifier, *flow.ResultApproval](store), + withRetrieveB[flow.Identifier, *flow.ResultApproval](retrieve)), + indexing: new(sync.Mutex), } return res } -func (r *ResultApprovals) store(approval *flow.ResultApproval) func(*transaction.Tx) error { +func (r *ResultApprovals) store(approval *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error { return r.cache.PutTx(approval.ID(), approval) } -func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*flow.ResultApproval, error) { - return func(tx *badger.Txn) (*flow.ResultApproval, error) { +func (r *ResultApprovals) byID(approvalID flow.Identifier) func(storage.Reader) (*flow.ResultApproval, error) { + return func(tx storage.Reader) (*flow.ResultApproval, error) { val, err := r.cache.Get(approvalID)(tx) if err != nil { return nil, err @@ -59,8 +61,8 @@ func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*f } } -func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(*badger.Txn) (*flow.ResultApproval, error) { - return func(tx *badger.Txn) (*flow.ResultApproval, error) { +func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(storage.Reader) (*flow.ResultApproval, error) { + return func(tx storage.Reader) (*flow.ResultApproval, error) { var approvalID flow.Identifier err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(tx) if err != nil { @@ -70,29 +72,27 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f } } -func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error { - return func(tx *badger.Txn) error { - err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx) - if err == nil { - return nil - } +func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.BadgerReaderBatchWriter) error { + return func(tx storage.BadgerReaderBatchWriter) error { + r, w := tx.ReaderWriter() + + var storedApprovalID flow.Identifier + err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + return fmt.Errorf("could not lookup result approval ID: %w", err) + } - if !errors.Is(err, storage.ErrAlreadyExists) { - return err + // no approval found, index the approval + + return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w) } - // When trying to index an approval for a result, and there is already - // an approval for the result, double check if the indexed approval is - // the same. + // an approval is already indexed, double check if it is the same // We don't allow indexing multiple approvals per chunk because the // store is only used within Verification nodes, and it is impossible // for a Verification node to compute different approvals for the same // chunk. - var storedApprovalID flow.Identifier - err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(tx) - if err != nil { - return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err) - } if storedApprovalID != approvalID { return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w", @@ -105,14 +105,22 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app // Store stores a ResultApproval func (r *ResultApprovals) Store(approval *flow.ResultApproval) error { - return operation.RetryOnConflictTx(r.db, transaction.Update, r.store(approval)) + return operation.WithBadgerReaderBatchWriter(r.db, r.store(approval)) } // Index indexes a ResultApproval by chunk (ResultID + chunk index). // operation is idempotent (repeated calls with the same value are equivalent to // just calling the method once; still the method succeeds on each call). func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error { - err := operation.RetryOnConflict(r.db.Update, r.index(resultID, chunkIndex, approvalID)) + // acquring the lock to prevent dirty reads when checking conflicted approvals + // how it works: + // the lock can only be acquired after the index operation is committed to the database, + // since the index operation is the only operation that would affect the reads operation, + // no writes can go through util the lock is released, so locking here could prevent dirty reads. + r.indexing.Lock() + defer r.indexing.Unlock() + + err := operation.WithBadgerReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID)) if err != nil { return fmt.Errorf("could not index result approval: %w", err) } @@ -121,16 +129,12 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app // ByID retrieves a ResultApproval by its ID func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) { - tx := r.db.NewTransaction(false) - defer tx.Discard() - return r.byID(approvalID)(tx) + return r.byID(approvalID)(operation.ToReader(r.db)) } // ByChunk retrieves a ResultApproval by result ID and chunk index. The // ResultApprovals store is only used within a verification node, where it is // assumed that there is never more than one approval per chunk. func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) { - tx := r.db.NewTransaction(false) - defer tx.Discard() - return r.byChunk(resultID, chunkIndex)(tx) + return r.byChunk(resultID, chunkIndex)(operation.ToReader(r.db)) } diff --git a/storage/badger/cache_b.go b/storage/badger/cache_b.go new file mode 100644 index 00000000000..4c7ed3a97d5 --- /dev/null +++ b/storage/badger/cache_b.go @@ -0,0 +1,152 @@ +package badger + +import ( + "errors" + "fmt" + + lru "github.com/hashicorp/golang-lru/v2" + + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/storage" +) + +func withLimitB[K comparable, V any](limit uint) func(*CacheB[K, V]) { + return func(c *CacheB[K, V]) { + c.limit = limit + } +} + +type storeFuncB[K comparable, V any] func(key K, val V) func(storage.BadgerReaderBatchWriter) error + +func withStoreB[K comparable, V any](store storeFuncB[K, V]) func(*CacheB[K, V]) { + return func(c *CacheB[K, V]) { + c.store = store + } +} + +func noStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error { + return func(tx storage.BadgerReaderBatchWriter) error { + return fmt.Errorf("no store function for cache put available") + } +} + +// nolint: unused +func noopStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error { + return func(tx storage.BadgerReaderBatchWriter) error { + return nil + } +} + +type retrieveFuncB[K comparable, V any] func(key K) func(storage.Reader) (V, error) + +func withRetrieveB[K comparable, V any](retrieve retrieveFuncB[K, V]) func(*CacheB[K, V]) { + return func(c *CacheB[K, V]) { + c.retrieve = retrieve + } +} + +func noRetrieveB[K comparable, V any](_ K) func(storage.Reader) (V, error) { + return func(tx storage.Reader) (V, error) { + var nullV V + return nullV, fmt.Errorf("no retrieve function for cache get available") + } +} + +type CacheB[K comparable, V any] struct { + metrics module.CacheMetrics + limit uint + store storeFuncB[K, V] + retrieve retrieveFuncB[K, V] + resource string + cache *lru.Cache[K, V] +} + +func newCacheB[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*CacheB[K, V])) *CacheB[K, V] { + c := CacheB[K, V]{ + metrics: collector, + limit: 1000, + store: noStoreB[K, V], + retrieve: noRetrieveB[K, V], + resource: resourceName, + } + for _, option := range options { + option(&c) + } + c.cache, _ = lru.New[K, V](int(c.limit)) + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + return &c +} + +// IsCached returns true if the key exists in the cache. +// It DOES NOT check whether the key exists in the underlying data store. +func (c *CacheB[K, V]) IsCached(key K) bool { + return c.cache.Contains(key) +} + +// Get will try to retrieve the resource from cache first, and then from the +// injected. During normal operations, the following error returns are expected: +// - `storage.ErrNotFound` if key is unknown. +func (c *CacheB[K, V]) Get(key K) func(storage.Reader) (V, error) { + return func(tx storage.Reader) (V, error) { + + // check if we have it in the cache + resource, cached := c.cache.Get(key) + if cached { + c.metrics.CacheHit(c.resource) + return resource, nil + } + + // get it from the database + resource, err := c.retrieve(key)(tx) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + c.metrics.CacheNotFound(c.resource) + } + var nullV V + return nullV, fmt.Errorf("could not retrieve resource: %w", err) + } + + c.metrics.CacheMiss(c.resource) + + // cache the resource and eject least recently used one if we reached limit + evicted := c.cache.Add(key, resource) + if !evicted { + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + } + + return resource, nil + } +} + +func (c *CacheB[K, V]) Remove(key K) { + c.cache.Remove(key) +} + +// Insert will add a resource directly to the cache with the given ID +func (c *CacheB[K, V]) Insert(key K, resource V) { + // cache the resource and eject least recently used one if we reached limit + evicted := c.cache.Add(key, resource) + if !evicted { + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + } +} + +// PutTx will return tx which adds a resource to the cache with the given ID. +func (c *CacheB[K, V]) PutTx(key K, resource V) func(storage.BadgerReaderBatchWriter) error { + storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution) + + return func(tx storage.BadgerReaderBatchWriter) error { + tx.AddCallback(func(err error) { + if err != nil { + c.Insert(key, resource) + } + }) + + err := storeOps(tx) // execute operations to store resource + if err != nil { + return fmt.Errorf("could not store resource: %w", err) + } + + return nil + } +} diff --git a/storage/badger/operation/approvals.go b/storage/badger/operation/approvals.go index 8a994eed2a2..af64911df7e 100644 --- a/storage/badger/operation/approvals.go +++ b/storage/badger/operation/approvals.go @@ -1,19 +1,21 @@ package operation import ( - "github.com/dgraph-io/badger/v2" - "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" ) // InsertResultApproval inserts a ResultApproval by ID. -func InsertResultApproval(approval *flow.ResultApproval) func(*badger.Txn) error { - return insert(makePrefix(codeResultApproval, approval.ID()), approval) +// The same key (`approval.ID()`) necessitates that the value (full `approval`) is +// also identical (otherwise, we would have a successful pre-image attack on our +// cryptographic hash function). Therefore, concurrent calls to this function are safe. +func InsertResultApproval(approval *flow.ResultApproval) func(storage.Writer) error { + return insertW(makePrefix(codeResultApproval, approval.ID()), approval) } // RetrieveResultApproval retrieves an approval by ID. -func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(*badger.Txn) error { - return retrieve(makePrefix(codeResultApproval, approvalID), approval) +func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(storage.Reader) error { + return retrieveR(makePrefix(codeResultApproval, approvalID), approval) } // IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID @@ -21,11 +23,11 @@ func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApp // error is returned. This operation is only used by the ResultApprovals store, // which is only used within a Verification node, where it is assumed that there // is only one approval per chunk. -func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error { - return insert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) +func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { + return insertW(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) } // LookupResultApproval finds a ResultApproval by result ID and chunk index. -func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(*badger.Txn) error { - return retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) +func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(storage.Reader) error { + return retrieveR(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) } diff --git a/storage/badger/operation/common.go b/storage/badger/operation/common.go index 1c293348231..a23a360337f 100644 --- a/storage/badger/operation/common.go +++ b/storage/badger/operation/common.go @@ -44,6 +44,22 @@ func batchWrite(key []byte, entity interface{}) func(writeBatch *badger.WriteBat } } +func insertW(key []byte, val interface{}) func(storage.Writer) error { + return func(w storage.Writer) error { + value, err := msgpack.Marshal(val) + if err != nil { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + + err = w.Set(key, value) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } + + return nil + } +} + // insert will encode the given entity using msgpack and will insert the resulting // binary data in the badger DB under the provided key. It will error if the // key already exists. @@ -266,6 +282,30 @@ func retrieve(key []byte, entity interface{}) func(*badger.Txn) error { } } +// retrieve will retrieve the binary data under the given key from the badger DB +// and decode it into the given entity. The provided entity needs to be a +// pointer to an initialized entity of the correct type. +// Error returns: +// - storage.ErrNotFound if the key does not exist in the database +// - generic error in case of unexpected failure from the database layer, or failure +// to decode an existing database value +func retrieveR(key []byte, entity interface{}) func(storage.Reader) error { + return func(r storage.Reader) error { + val, closer, err := r.Get(key) + if err != nil { + return err + } + + defer closer.Close() + + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("could not decode entity: %w", err) + } + return nil + } +} + // exists returns true if a key exists in the database. // No errors are expected during normal operation. func exists(key []byte, keyExists *bool) func(*badger.Txn) error { diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go new file mode 100644 index 00000000000..6e6c8799117 --- /dev/null +++ b/storage/badger/operation/reader_batch_writer.go @@ -0,0 +1,130 @@ +package operation + +import ( + "errors" + "fmt" + "io" + "sync" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +type ReaderBatchWriter struct { + db *badger.DB + batch *badger.WriteBatch + + addingCallback sync.Mutex // protect callbacks + callbacks []func(error) +} + +var _ storage.BadgerReaderBatchWriter = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) ReaderWriter() (storage.Reader, storage.Writer) { + // reusing the same underlying object, but expose with different interfaces + return b, b +} + +func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch { + return b.batch +} + +func (b *ReaderBatchWriter) AddCallback(callback func(error)) { + b.addingCallback.Lock() + defer b.addingCallback.Unlock() + + b.callbacks = append(b.callbacks, callback) +} + +func (b *ReaderBatchWriter) Commit() error { + err := b.batch.Flush() + + b.notifyCallbacks(err) + + return err +} + +func (b *ReaderBatchWriter) notifyCallbacks(err error) { + b.addingCallback.Lock() + defer b.addingCallback.Unlock() + + for _, callback := range b.callbacks { + callback(err) + } +} + +func WithBadgerReaderBatchWriter(db *badger.DB, fn func(storage.BadgerReaderBatchWriter) error) error { + batch := NewBadgerReaderBatchWriter(db) + + err := fn(batch) + if err != nil { + // fn might use lock to ensure concurrent safety while reading and writing data + // and the lock is usually released by a callback. + // in other words, fn might hold a lock to be released by a callback, + // we need to notify the callback for the locks to be released before + // returning the error. + batch.notifyCallbacks(err) + return err + } + + return batch.Commit() +} + +func NewBadgerReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { + return &ReaderBatchWriter{ + db: db, + batch: db.NewWriteBatch(), + } +} + +// ToReader is a helper function to convert a BadgerReaderBatchWriter to a Reader +var ToReader = NewBadgerReaderBatchWriter + +var _ storage.Reader = (*ReaderBatchWriter)(nil) + +type noopCloser struct{} + +var _ io.Closer = (*noopCloser)(nil) + +func (noopCloser) Close() error { return nil } + +func (b *ReaderBatchWriter) Get(key []byte) ([]byte, io.Closer, error) { + tx := b.db.NewTransaction(false) + defer tx.Discard() + + item, err := tx.Get(key) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil, nil, storage.ErrNotFound + } + return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) + } + + var value []byte + err = item.Value(func(val []byte) error { + value = append([]byte{}, val...) + return nil + }) + if err != nil { + return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err) + } + + return value, new(noopCloser), nil +} + +var _ storage.Writer = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) Set(key, value []byte) error { + return b.batch.Set(key, value) +} + +func (b *ReaderBatchWriter) Delete(key []byte) error { + return b.batch.Delete(key) +} + +func (b *ReaderBatchWriter) DeleteRange(start, end []byte) error { + // TODO: implement + return fmt.Errorf("not implemented") +} diff --git a/storage/batch.go b/storage/batch.go index 3147fc5c0e7..d04207055f3 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -1,11 +1,19 @@ package storage -import "github.com/dgraph-io/badger/v2" +import ( + "io" + "github.com/dgraph-io/badger/v2" +) + +// deprecated +// use Writer instead type Transaction interface { Set(key, val []byte) error } +// deprecated +// use BadgerReaderBatchWriter instead // BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra // callbacks which fire after the batch is successfully flushed. type BatchStorage interface { @@ -20,3 +28,69 @@ type BatchStorage interface { // Flush will flush the write batch and update the cache. Flush() error } + +type Reader interface { + // Get gets the value for the given key. It returns ErrNotFound if the DB + // does not contain the key. + // + // The caller should not modify the contents of the returned slice, but it is + // safe to modify the contents of the argument after Get returns. The + // returned slice will remain valid until the returned Closer is closed. On + // success, the caller MUST call closer.Close() or a memory leak will occur. + Get(key []byte) (value []byte, closer io.Closer, err error) +} + +// Writer is an interface for batch writing to a storage backend. +type Writer interface { + // Set sets the value for the given key. It overwrites any previous value + // for that key; a DB is not a multi-map. + // + // It is safe to modify the contents of the arguments after Set returns. + Set(k, v []byte) error + + // Delete deletes the value for the given key. Deletes are blind all will + // succeed even if the given key does not exist. + // + // It is safe to modify the contents of the arguments after Delete returns. + Delete(key []byte) error + + // DeleteRange deletes all of the point keys (and values) in the range + // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT + // delete overlapping range keys (eg, keys set via RangeKeySet). + // + // It is safe to modify the contents of the arguments after DeleteRange + // returns. + DeleteRange(start, end []byte) error +} + +// BadgerReaderBatchWriter is an interface for badger-specific reader and writer. +type BadgerReaderBatchWriter interface { + // ReaderWriter returns the reader and writer for the storage backend. + // The reader is used to read data from the storage backend, and + // the writer is used to write data to the storage backend with an atomic batch + // update. + // Note: + // - There is no guarantee on the consistency of the data read, + // the data read may not reflect the latest data written. + // it is the responsibility of the caller to ensure the consistency. + // - The writer cannot be used concurrently for writing. + ReaderWriter() (Reader, Writer) + + // BadgerBatch returns the underlying batch object + // Useful for implementing badger-specific operations + BadgerWriteBatch() *badger.WriteBatch + + // AddCallback adds a callback to execute after the batch has been flush + // regardless the batch update is succeeded or failed. + // The error parameter is the error returned by the batch update. + AddCallback(func(error)) +} + +// OnlyBadgerWriter is an adapter to convert a function that takes a Writer +// to a function that takes a BadgerReaderBatchWriter. +func OnlyBadgerWriter(fn func(Writer) error) func(BadgerReaderBatchWriter) error { + return func(rw BadgerReaderBatchWriter) error { + _, w := rw.ReaderWriter() + return fn(w) + } +} From 947034527a2cc1cf29ba8d709371063e4688cabd Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 22 Aug 2024 09:41:17 -0700 Subject: [PATCH 02/10] add concurrent tests --- storage/badger/approvals_test.go | 50 ++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/storage/badger/approvals_test.go b/storage/badger/approvals_test.go index 1b13a49ae59..5e433fa0c66 100644 --- a/storage/badger/approvals_test.go +++ b/storage/badger/approvals_test.go @@ -2,6 +2,7 @@ package badger_test import ( "errors" + "sync" "testing" "github.com/dgraph-io/badger/v2" @@ -79,3 +80,52 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) { require.True(t, errors.Is(err, storage.ErrDataMismatch)) }) } + +// verify that storing and indexing two conflicting approvals concurrently should fail +// one of them is succeed, the other one should fail +func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + metrics := metrics.NewNoopCollector() + store := bstorage.NewResultApprovals(metrics, db) + + approval1 := unittest.ResultApprovalFixture() + approval2 := unittest.ResultApprovalFixture() + + var wg sync.WaitGroup + wg.Add(2) + + var firstIndexErr, secondIndexErr error + + // First goroutine stores and indexes the first approval. + go func() { + defer wg.Done() + + err := store.Store(approval1) + require.NoError(t, err) + + firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID()) + }() + + // Second goroutine stores and tries to index the second approval for the same chunk. + go func() { + defer wg.Done() + + err := store.Store(approval2) + require.NoError(t, err) + + secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID()) + }() + + // Wait for both goroutines to finish + wg.Wait() + + // Check that one of the Index operations succeeded and the other failed + if firstIndexErr == nil { + require.Error(t, secondIndexErr) + require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch)) + } else { + require.NoError(t, secondIndexErr) + require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch)) + } + }) +} From 32bb86bb26df875a0ec01712c40495a914d7a3a8 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 22 Aug 2024 11:21:11 -0700 Subject: [PATCH 03/10] rename --- storage/badger/approvals.go | 4 ++-- storage/badger/operation/reader_batch_writer.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/storage/badger/approvals.go b/storage/badger/approvals.go index 56aab0a7f8e..e81e72b4fc3 100644 --- a/storage/badger/approvals.go +++ b/storage/badger/approvals.go @@ -105,7 +105,7 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app // Store stores a ResultApproval func (r *ResultApprovals) Store(approval *flow.ResultApproval) error { - return operation.WithBadgerReaderBatchWriter(r.db, r.store(approval)) + return operation.WithReaderBatchWriter(r.db, r.store(approval)) } // Index indexes a ResultApproval by chunk (ResultID + chunk index). @@ -120,7 +120,7 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app r.indexing.Lock() defer r.indexing.Unlock() - err := operation.WithBadgerReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID)) + err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID)) if err != nil { return fmt.Errorf("could not index result approval: %w", err) } diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go index 6e6c8799117..00003346504 100644 --- a/storage/badger/operation/reader_batch_writer.go +++ b/storage/badger/operation/reader_batch_writer.go @@ -55,8 +55,8 @@ func (b *ReaderBatchWriter) notifyCallbacks(err error) { } } -func WithBadgerReaderBatchWriter(db *badger.DB, fn func(storage.BadgerReaderBatchWriter) error) error { - batch := NewBadgerReaderBatchWriter(db) +func WithReaderBatchWriter(db *badger.DB, fn func(storage.BadgerReaderBatchWriter) error) error { + batch := NewReaderBatchWriter(db) err := fn(batch) if err != nil { @@ -72,7 +72,7 @@ func WithBadgerReaderBatchWriter(db *badger.DB, fn func(storage.BadgerReaderBatc return batch.Commit() } -func NewBadgerReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { +func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { return &ReaderBatchWriter{ db: db, batch: db.NewWriteBatch(), @@ -80,7 +80,7 @@ func NewBadgerReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { } // ToReader is a helper function to convert a BadgerReaderBatchWriter to a Reader -var ToReader = NewBadgerReaderBatchWriter +var ToReader = NewReaderBatchWriter var _ storage.Reader = (*ReaderBatchWriter)(nil) From 5362f46847fa1c5b65c948b468c4604be544624d Mon Sep 17 00:00:00 2001 From: Leo Zhang Date: Thu, 22 Aug 2024 17:21:23 -0700 Subject: [PATCH 04/10] Apply suggestions from code review Co-authored-by: Jordan Schalm --- storage/badger/approvals.go | 1 + storage/badger/operation/reader_batch_writer.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/storage/badger/approvals.go b/storage/badger/approvals.go index e81e72b4fc3..a31227341f4 100644 --- a/storage/badger/approvals.go +++ b/storage/badger/approvals.go @@ -72,6 +72,7 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f } } +// CAUTION: Caller must acquire `indexing` lock. func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.BadgerReaderBatchWriter) error { return func(tx storage.BadgerReaderBatchWriter) error { r, w := tx.ReaderWriter() diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go index 00003346504..a90d5b3682d 100644 --- a/storage/badger/operation/reader_batch_writer.go +++ b/storage/badger/operation/reader_batch_writer.go @@ -79,7 +79,7 @@ func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { } } -// ToReader is a helper function to convert a BadgerReaderBatchWriter to a Reader +// ToReader is a helper function to convert a *badger.DB to a Reader var ToReader = NewReaderBatchWriter var _ storage.Reader = (*ReaderBatchWriter)(nil) @@ -111,7 +111,7 @@ func (b *ReaderBatchWriter) Get(key []byte) ([]byte, io.Closer, error) { return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err) } - return value, new(noopCloser), nil + return value, noopCloser{}, nil } var _ storage.Writer = (*ReaderBatchWriter)(nil) From bfa51ecd56f987961cf15d1b84b3f7695b9da631 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 22 Aug 2024 17:23:32 -0700 Subject: [PATCH 05/10] update comments --- storage/badger/cache_b.go | 4 ++-- storage/badger/operation/common.go | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/storage/badger/cache_b.go b/storage/badger/cache_b.go index 4c7ed3a97d5..442ebbbc8d5 100644 --- a/storage/badger/cache_b.go +++ b/storage/badger/cache_b.go @@ -87,7 +87,7 @@ func (c *CacheB[K, V]) IsCached(key K) bool { // injected. During normal operations, the following error returns are expected: // - `storage.ErrNotFound` if key is unknown. func (c *CacheB[K, V]) Get(key K) func(storage.Reader) (V, error) { - return func(tx storage.Reader) (V, error) { + return func(r storage.Reader) (V, error) { // check if we have it in the cache resource, cached := c.cache.Get(key) @@ -97,7 +97,7 @@ func (c *CacheB[K, V]) Get(key K) func(storage.Reader) (V, error) { } // get it from the database - resource, err := c.retrieve(key)(tx) + resource, err := c.retrieve(key)(r) if err != nil { if errors.Is(err, storage.ErrNotFound) { c.metrics.CacheNotFound(c.resource) diff --git a/storage/badger/operation/common.go b/storage/badger/operation/common.go index a23a360337f..09abd97a62f 100644 --- a/storage/badger/operation/common.go +++ b/storage/badger/operation/common.go @@ -44,6 +44,12 @@ func batchWrite(key []byte, entity interface{}) func(writeBatch *badger.WriteBat } } +// insertW will encode the given entity using msgpack and will insert the resulting +// binary data in the badger DB under the provided key. It will error if the +// key already exists. +// Error returns: +// - generic error in case of unexpected failure from the database layer or +// encoding failure. func insertW(key []byte, val interface{}) func(storage.Writer) error { return func(w storage.Writer) error { value, err := msgpack.Marshal(val) @@ -60,6 +66,7 @@ func insertW(key []byte, val interface{}) func(storage.Writer) error { } } +// deprecated - use insertW instead // insert will encode the given entity using msgpack and will insert the resulting // binary data in the badger DB under the provided key. It will error if the // key already exists. From 3c6e7972abfc8c88825c2c6bfc8af95cecbec0d7 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 22 Aug 2024 17:27:55 -0700 Subject: [PATCH 06/10] add OnCommitSucceed --- storage/badger/cache_b.go | 10 ++++------ storage/batch.go | 9 +++++++++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/storage/badger/cache_b.go b/storage/badger/cache_b.go index 442ebbbc8d5..5e786d51146 100644 --- a/storage/badger/cache_b.go +++ b/storage/badger/cache_b.go @@ -135,14 +135,12 @@ func (c *CacheB[K, V]) Insert(key K, resource V) { func (c *CacheB[K, V]) PutTx(key K, resource V) func(storage.BadgerReaderBatchWriter) error { storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution) - return func(tx storage.BadgerReaderBatchWriter) error { - tx.AddCallback(func(err error) { - if err != nil { - c.Insert(key, resource) - } + return func(rw storage.BadgerReaderBatchWriter) error { + storage.OnCommitSucceed(rw, func() { + c.Insert(key, resource) }) - err := storeOps(tx) // execute operations to store resource + err := storeOps(rw) // execute operations to store resource if err != nil { return fmt.Errorf("could not store resource: %w", err) } diff --git a/storage/batch.go b/storage/batch.go index d04207055f3..b73dbeabd7a 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -94,3 +94,12 @@ func OnlyBadgerWriter(fn func(Writer) error) func(BadgerReaderBatchWriter) error return fn(w) } } + +// OnCommitSucceed adds a callback to execute after the batch has been successfully committed. +func OnCommitSucceed(b BadgerReaderBatchWriter, onSuccessFn func()) { + b.AddCallback(func(err error) { + if err == nil { + onSuccessFn() + } + }) +} From 0437ff8c6668ac7dec9e598af0d3d4086d3f470f Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 22 Aug 2024 17:30:37 -0700 Subject: [PATCH 07/10] remove DeleteRange --- storage/badger/operation/reader_batch_writer.go | 6 ------ storage/batch.go | 8 -------- 2 files changed, 14 deletions(-) diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go index a90d5b3682d..b6e32d3ee46 100644 --- a/storage/badger/operation/reader_batch_writer.go +++ b/storage/badger/operation/reader_batch_writer.go @@ -2,7 +2,6 @@ package operation import ( "errors" - "fmt" "io" "sync" @@ -123,8 +122,3 @@ func (b *ReaderBatchWriter) Set(key, value []byte) error { func (b *ReaderBatchWriter) Delete(key []byte) error { return b.batch.Delete(key) } - -func (b *ReaderBatchWriter) DeleteRange(start, end []byte) error { - // TODO: implement - return fmt.Errorf("not implemented") -} diff --git a/storage/batch.go b/storage/batch.go index b73dbeabd7a..5048749f783 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -53,14 +53,6 @@ type Writer interface { // // It is safe to modify the contents of the arguments after Delete returns. Delete(key []byte) error - - // DeleteRange deletes all of the point keys (and values) in the range - // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT - // delete overlapping range keys (eg, keys set via RangeKeySet). - // - // It is safe to modify the contents of the arguments after DeleteRange - // returns. - DeleteRange(start, end []byte) error } // BadgerReaderBatchWriter is an interface for badger-specific reader and writer. From f99eea7684e23b4b71cc788b8751bd1f4baf9c8a Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 22 Aug 2024 17:39:21 -0700 Subject: [PATCH 08/10] replace ReaderWriter with GlobalReader and Writer --- storage/badger/approvals.go | 8 +++----- .../badger/operation/reader_batch_writer.go | 9 ++++++--- storage/batch.go | 20 +++++++++---------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/storage/badger/approvals.go b/storage/badger/approvals.go index a31227341f4..4733c8e3e77 100644 --- a/storage/badger/approvals.go +++ b/storage/badger/approvals.go @@ -74,11 +74,9 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f // CAUTION: Caller must acquire `indexing` lock. func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.BadgerReaderBatchWriter) error { - return func(tx storage.BadgerReaderBatchWriter) error { - r, w := tx.ReaderWriter() - + return func(rw storage.BadgerReaderBatchWriter) error { var storedApprovalID flow.Identifier - err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r) + err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(rw.GlobalReader()) if err != nil { if !errors.Is(err, storage.ErrNotFound) { return fmt.Errorf("could not lookup result approval ID: %w", err) @@ -86,7 +84,7 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app // no approval found, index the approval - return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w) + return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer()) } // an approval is already indexed, double check if it is the same diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go index b6e32d3ee46..fc6fcae7c6b 100644 --- a/storage/badger/operation/reader_batch_writer.go +++ b/storage/badger/operation/reader_batch_writer.go @@ -21,9 +21,12 @@ type ReaderBatchWriter struct { var _ storage.BadgerReaderBatchWriter = (*ReaderBatchWriter)(nil) -func (b *ReaderBatchWriter) ReaderWriter() (storage.Reader, storage.Writer) { - // reusing the same underlying object, but expose with different interfaces - return b, b +func (b *ReaderBatchWriter) GlobalReader() storage.Reader { + return b +} + +func (b *ReaderBatchWriter) Writer() storage.Writer { + return b } func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch { diff --git a/storage/batch.go b/storage/batch.go index 5048749f783..2b3d687d78c 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -57,16 +57,17 @@ type Writer interface { // BadgerReaderBatchWriter is an interface for badger-specific reader and writer. type BadgerReaderBatchWriter interface { - // ReaderWriter returns the reader and writer for the storage backend. - // The reader is used to read data from the storage backend, and - // the writer is used to write data to the storage backend with an atomic batch - // update. + // GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation"). + // This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed. + // This reader may observe different values for the same key on subsequent reads. + GlobalReader() Reader + + // Writer returns a writer associated with a batch of writes. The batch is pending until it is committed. + // When we `Write` into the batch, that write operation is added to the pending batch, but not committed. + // The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are. // Note: - // - There is no guarantee on the consistency of the data read, - // the data read may not reflect the latest data written. - // it is the responsibility of the caller to ensure the consistency. // - The writer cannot be used concurrently for writing. - ReaderWriter() (Reader, Writer) + Writer() Writer // BadgerBatch returns the underlying batch object // Useful for implementing badger-specific operations @@ -82,8 +83,7 @@ type BadgerReaderBatchWriter interface { // to a function that takes a BadgerReaderBatchWriter. func OnlyBadgerWriter(fn func(Writer) error) func(BadgerReaderBatchWriter) error { return func(rw BadgerReaderBatchWriter) error { - _, w := rw.ReaderWriter() - return fn(w) + return fn(rw.Writer()) } } From 300119dde40a8df4915444f2c3da69ac87c63890 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 22 Aug 2024 17:43:41 -0700 Subject: [PATCH 09/10] refactor ToReader --- storage/badger/operation/reader_batch_writer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go index fc6fcae7c6b..d92df33d3b3 100644 --- a/storage/badger/operation/reader_batch_writer.go +++ b/storage/badger/operation/reader_batch_writer.go @@ -82,7 +82,9 @@ func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { } // ToReader is a helper function to convert a *badger.DB to a Reader -var ToReader = NewReaderBatchWriter +func ToReader(db *badger.DB) storage.Reader { + return NewReaderBatchWriter(db) +} var _ storage.Reader = (*ReaderBatchWriter)(nil) From ddd940445fecb658498c421754fe0e52a0ba019a Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 23 Aug 2024 10:21:13 -0700 Subject: [PATCH 10/10] addressing review comments --- storage/badger/approvals.go | 2 +- storage/badger/operation/approvals.go | 6 ++++-- storage/badger/operation/reader_batch_writer.go | 17 ++++++++++++++++- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/storage/badger/approvals.go b/storage/badger/approvals.go index 4733c8e3e77..3023204290c 100644 --- a/storage/badger/approvals.go +++ b/storage/badger/approvals.go @@ -84,7 +84,7 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app // no approval found, index the approval - return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer()) + return operation.UnsafeIndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer()) } // an approval is already indexed, double check if it is the same diff --git a/storage/badger/operation/approvals.go b/storage/badger/operation/approvals.go index af64911df7e..7c78095e090 100644 --- a/storage/badger/operation/approvals.go +++ b/storage/badger/operation/approvals.go @@ -18,12 +18,14 @@ func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApp return retrieveR(makePrefix(codeResultApproval, approvalID), approval) } -// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID +// UnsafeIndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID // and chunk index. If a value for this key exists, a storage.ErrAlreadyExists // error is returned. This operation is only used by the ResultApprovals store, // which is only used within a Verification node, where it is assumed that there // is only one approval per chunk. -func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { +// CAUTION: In order to prevent overwriting, use of this function must be +// synchronized with check (RetrieveResultApproval) for existance of the key. +func UnsafeIndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { return insertW(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) } diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go index d92df33d3b3..3c8a26530fd 100644 --- a/storage/badger/operation/reader_batch_writer.go +++ b/storage/badger/operation/reader_batch_writer.go @@ -16,15 +16,30 @@ type ReaderBatchWriter struct { batch *badger.WriteBatch addingCallback sync.Mutex // protect callbacks - callbacks []func(error) + + // callbacks are executed regardless of the success of the batch commit. + // if any function that is adding writes to the batch fails, the callbacks + // are also called with the error, in this case the callbacks are executed + // before the batch is submitted. This is useful for the locks in those functions + // to be released. + // callbacks must be non-blocking + callbacks []func(error) } var _ storage.BadgerReaderBatchWriter = (*ReaderBatchWriter)(nil) +// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation"). +// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed. +// This reader may observe different values for the same key on subsequent reads. func (b *ReaderBatchWriter) GlobalReader() storage.Reader { return b } +// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed. +// When we `Write` into the batch, that write operation is added to the pending batch, but not committed. +// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are. +// Note: +// - The writer cannot be used concurrently for writing. func (b *ReaderBatchWriter) Writer() storage.Writer { return b }