[Badger] Refactor approvals to use badger batch updates #6466

Draft: wants to merge 9 commits into base: leo/db-ops

Changes from all commits
4 changes: 2 additions & 2 deletions cmd/bootstrap/utils/md5.go
@@ -1,9 +1,9 @@
package utils

// The google storage API only provides md5 and crc32 hence overriding the linter flag for md5
// #nosec
import (
"crypto/md5" //nolint:gosec
// #nosec
"crypto/md5"
"io"
"os"
)
4 changes: 3 additions & 1 deletion cmd/verification_builder.go
@@ -38,6 +38,8 @@ import (
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
)

type VerificationConfig struct {
@@ -201,7 +203,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
vmCtx := fvm.NewContext(fvmOptions...)

chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Logger)
approvalStorage := badger.NewResultApprovals(node.Metrics.Cache, node.DB)
approvalStorage := store.NewResultApprovals(node.Metrics.Cache, badgerimpl.ToDB(node.DB))
Member Author commented:
Switching from badger to pebble in the future would just be changing badgerimpl to pebbleimpl here.
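A minimal sketch of the swap this comment describes, assuming a pebbleimpl package that exposes a ToDB helper mirroring badgerimpl.ToDB and a pebble handle on the node (both hypothetical; neither appears in this diff):

    // current wiring (badger-backed):
    approvalStorage := store.NewResultApprovals(node.Metrics.Cache, badgerimpl.ToDB(node.DB))

    // hypothetical future wiring (pebble-backed); only the adapter changes:
    // approvalStorage := store.NewResultApprovals(node.Metrics.Cache, pebbleimpl.ToDB(node.PebbleDB))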

verifierEng, err = verifier.New(
node.Logger,
collector,
4 changes: 3 additions & 1 deletion engine/testutil/nodes.go
@@ -105,7 +105,9 @@ import (
"github.com/onflow/flow-go/state/protocol/events/gadgets"
"github.com/onflow/flow-go/state/protocol/util"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
storagepebble "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/unittest"
)

@@ -1021,7 +1023,7 @@ func VerificationNode(t testing.TB,

chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Log)

approvalStorage := storage.NewResultApprovals(node.Metrics, node.PublicDB)
approvalStorage := store.NewResultApprovals(node.Metrics, badgerimpl.ToDB(node.PublicDB))

node.VerifierEngine, err = verifier.New(node.Log,
collector,
31 changes: 0 additions & 31 deletions storage/badger/operation/approvals.go

This file was deleted.

8 changes: 7 additions & 1 deletion storage/batch.go
@@ -1,11 +1,17 @@
package storage

import "github.com/dgraph-io/badger/v2"
import (
"github.com/dgraph-io/badger/v2"
)

// Deprecated: use Writer instead.
type Transaction interface {
Set(key, val []byte) error
}

// Deprecated: use ReaderBatchWriter instead.
// BatchStorage serves as an abstraction over batch storage, adding the ability to add extra
// callbacks which fire after the batch is successfully flushed.
type BatchStorage interface {
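For context, a minimal sketch of the replacement write path, built only from calls added elsewhere in this PR (operation.InsertResultApproval, storage.OnlyWriter, storage.DB.WithReaderBatchWriter): instead of setting badger keys on a Transaction, a write is expressed as a func(storage.Writer) error and executed as one batch.

    // storeApproval is a sketch: the operation is a closure over storage.Writer,
    // adapted by storage.OnlyWriter and run inside a single batch write.
    func storeApproval(db storage.DB, approval *flow.ResultApproval) error {
        return db.WithReaderBatchWriter(storage.OnlyWriter(operation.InsertResultApproval(approval)))
    }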
34 changes: 34 additions & 0 deletions storage/operation/approvals.go
@@ -0,0 +1,34 @@
package operation

import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

// InsertResultApproval inserts a ResultApproval by ID.
// The same key (`approval.ID()`) necessitates that the value (full `approval`) is
// also identical (otherwise, we would have a successful pre-image attack on our
// cryptographic hash function). Therefore, concurrent calls to this function are safe.
func InsertResultApproval(approval *flow.ResultApproval) func(storage.Writer) error {
return Upsert(makePrefix(codeResultApproval, approval.ID()), approval)
}

// RetrieveResultApproval retrieves an approval by ID.
func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(storage.Reader) error {
return Retrieve(makePrefix(codeResultApproval, approvalID), approval)
}

// UnsafeIndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists
// error is returned. This operation is only used by the ResultApprovals store,
// which is only used within a Verification node, where it is assumed that there
// is only one approval per chunk.
// CAUTION: Use of this function must be synchronized by storage.ResultApprovals.
func UnsafeIndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error {
return Upsert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
}

// LookupResultApproval finds a ResultApproval by result ID and chunk index.
func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(storage.Reader) error {
return Retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
}
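A usage sketch for these operations, assuming the storage.ReaderBatchWriter passed to the batch callback exposes a Writer() accessor (that interface is defined outside this diff, so treat the accessor as an assumption):

    // storeAndIndex writes an approval and its chunk index in a single batch.
    // Callers must synchronize the index write, per the CAUTION on UnsafeIndexResultApproval.
    func storeAndIndex(db storage.DB, resultID flow.Identifier, chunkIndex uint64, approval *flow.ResultApproval) error {
        return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
            w := rw.Writer() // assumed accessor for the batch's storage.Writer
            if err := operation.InsertResultApproval(approval)(w); err != nil {
                return err
            }
            return operation.UnsafeIndexResultApproval(resultID, chunkIndex, approval.ID())(w)
        })
    }

    // lookupApproval resolves the approval ID indexed for (resultID, chunkIndex).
    func lookupApproval(db storage.DB, resultID flow.Identifier, chunkIndex uint64) (flow.Identifier, error) {
        var approvalID flow.Identifier
        err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(db.Reader())
        return approvalID, err
    }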
44 changes: 44 additions & 0 deletions storage/operation/approvals_test.go
@@ -0,0 +1,44 @@
package operation_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation"
"github.com/onflow/flow-go/storage/operation/dbtest"
"github.com/onflow/flow-go/utils/unittest"
)

func BenchmarkRetrieveApprovals(b *testing.B) {
dbtest.BenchWithDB(b, func(b *testing.B, db storage.DB) {
b.Run("RetrieveApprovals", func(b *testing.B) {
approval := unittest.ResultApprovalFixture()
require.NoError(b, db.WithReaderBatchWriter(storage.OnlyWriter(operation.InsertResultApproval(approval))))

b.ResetTimer()

approvalID := approval.ID()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var stored flow.ResultApproval
require.NoError(b, operation.RetrieveResultApproval(approvalID, &stored)(db.Reader()))
}
})

})
})
}

func BenchmarkInsertApproval(b *testing.B) {
dbtest.BenchWithDB(b, func(b *testing.B, db storage.DB) {
b.Run("InsertApprovals", func(b *testing.B) {
for i := 0; i < b.N; i++ {
approval := unittest.ResultApprovalFixture()
require.NoError(b, db.WithReaderBatchWriter(storage.OnlyWriter(operation.InsertResultApproval(approval))))
}
})
})
}
23 changes: 23 additions & 0 deletions storage/operation/badgerimpl/dbstore.go
@@ -0,0 +1,23 @@
package badgerimpl

import (
"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
)

func ToDB(db *badger.DB) storage.DB {
return &dbStore{db: db}
}

type dbStore struct {
db *badger.DB
}

func (b *dbStore) Reader() storage.Reader {
return dbReader{db: b.db}
}

func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
return WithReaderBatchWriter(b.db, fn)
}
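This adapter is what the builder and test changes earlier in the PR rely on: an existing *badger.DB is wrapped once and then passed wherever the generic storage.DB is expected, for example (mirroring the verification builder change above):

    approvalStorage := store.NewResultApprovals(node.Metrics.Cache, badgerimpl.ToDB(node.DB))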
65 changes: 65 additions & 0 deletions storage/operation/badgerimpl/iterator.go
@@ -0,0 +1,65 @@
package badgerimpl

import (
"bytes"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
)

type badgerIterator struct {
iter *badger.Iterator
lowerBound []byte
upperBound []byte
}

var _ storage.Iterator = (*badgerIterator)(nil)

func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage.IteratorOption) *badgerIterator {
options := badger.DefaultIteratorOptions
if ops.IterateKeyOnly {
options.PrefetchValues = false
}

tx := db.NewTransaction(false)
iter := tx.NewIterator(options)

lowerBound, upperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix)

return &badgerIterator{
iter: iter,
lowerBound: lowerBound,
upperBound: upperBound,
}
}

func (i *badgerIterator) SeekGE() {
i.iter.Seek(i.lowerBound)
}

func (i *badgerIterator) Valid() bool {
// if it's beyond the upper bound, it's invalid
if !i.iter.Valid() {
return false
}
key := i.iter.Item().Key()
// "< 0" means the upperBound is exclusive
valid := bytes.Compare(key, i.upperBound) < 0
return valid
}

func (i *badgerIterator) Next() {
i.iter.Next()
}

func (i *badgerIterator) IterItem() storage.IterItem {
return i.iter.Item()
}

var _ storage.IterItem = (*badger.Item)(nil)

func (i *badgerIterator) Close() error {
i.iter.Close()
return nil
}
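A sketch of the iteration contract these methods implement: SeekGE positions the iterator at the lower bound, Valid enforces the exclusive upper bound, and Close releases the underlying badger iterator. Here reader is a storage.Reader, startPrefix/endPrefix are caller-chosen prefixes, and process is a placeholder; Key() and Value() on IterItem are assumed to behave like badger.Item, per the interface assertion above.

    it, err := reader.NewIter(startPrefix, endPrefix, storage.IteratorOption{IterateKeyOnly: false})
    if err != nil {
        return err
    }
    defer it.Close()

    for it.SeekGE(); it.Valid(); it.Next() {
        item := it.IterItem()
        err := item.Value(func(val []byte) error {
            // val is only valid inside this closure; copy it if it must outlive the callback
            return process(item.Key(), val)
        })
        if err != nil {
            return err
        }
    }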
54 changes: 54 additions & 0 deletions storage/operation/badgerimpl/reader.go
@@ -0,0 +1,54 @@
package badgerimpl

import (
"errors"
"io"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
)

type dbReader struct {
db *badger.DB
}

type noopCloser struct{}

var _ io.Closer = (*noopCloser)(nil)

func (noopCloser) Close() error { return nil }

func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
tx := b.db.NewTransaction(false)
defer tx.Discard()

item, err := tx.Get(key)
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return nil, nil, storage.ErrNotFound
}
return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err)
}

var value []byte
err = item.Value(func(val []byte) error {
value = append([]byte{}, val...)
return nil
})
if err != nil {
return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err)
}

return value, noopCloser{}, nil
}

func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil
}

// ToReader is a helper function to convert a *badger.DB to a Reader
func ToReader(db *badger.DB) storage.Reader {
return dbReader{db}
}
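A usage sketch for the reader (key is a placeholder): Get returns a copy of the value together with an io.Closer, so callers always close it even though this badger-backed implementation returns a no-op closer; a missing key surfaces as storage.ErrNotFound rather than a badger-specific error.

    value, closer, err := reader.Get(key)
    if err != nil {
        if errors.Is(err, storage.ErrNotFound) {
            // key does not exist
        }
        return err
    }
    defer closer.Close()
    // value was copied out of the badger transaction, so it stays valid after Close()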