
Commit

add universal database operations
zhangchiqing committed Sep 14, 2024
1 parent 2a443de commit 1f6ad37
Showing 17 changed files with 1,491 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cmd/bootstrap/utils/md5.go
@@ -1,8 +1,8 @@
package utils

// The google storage API only provides md5 and crc32 hence overriding the linter flag for md5
// #nosec
import (
// #nosec
"crypto/md5"
"io"
"os"
8 changes: 7 additions & 1 deletion storage/batch.go
@@ -1,11 +1,17 @@
package storage

import "github.com/dgraph-io/badger/v2"
import (
"github.com/dgraph-io/badger/v2"
)

// Deprecated: use Writer instead.
type Transaction interface {
Set(key, val []byte) error
}

// Deprecated: use ReaderBatchWriter instead.
// BatchStorage serves as an abstraction over batch storage, adding the ability to register
// extra callbacks which fire after the batch is successfully flushed.
type BatchStorage interface {
65 changes: 65 additions & 0 deletions storage/operation/badgerimpl/iterator.go
@@ -0,0 +1,65 @@
package badgerimpl

import (
"bytes"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
)

type badgerIterator struct {
iter *badger.Iterator
lowerBound []byte
upperBound []byte
tx *badger.Txn
}

var _ storage.Iterator = (*badgerIterator)(nil)

func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage.IteratorOption) *badgerIterator {
options := badger.DefaultIteratorOptions
if ops.IterateKeyOnly {
options.PrefetchValues = false
}

tx := db.NewTransaction(false)
iter := tx.NewIterator(options)

lowerBound, upperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix)

return &badgerIterator{
iter: iter,
lowerBound: lowerBound,
upperBound: upperBound,
tx: tx,
}
}

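// SeekGE positions the iterator at the first key that is greater than or
// equal to the iterator's lower bound.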
func (i *badgerIterator) SeekGE() {
i.iter.Seek(i.lowerBound)
}

// Valid reports whether the iterator currently points at a key within the bounds.
func (i *badgerIterator) Valid() bool {
// the underlying badger iterator is exhausted
if !i.iter.Valid() {
return false
}
// a key is only valid while it is strictly below the upper bound;
// "< 0" means the upperBound is exclusive
key := i.iter.Item().Key()
valid := bytes.Compare(key, i.upperBound) < 0
return valid
}

func (i *badgerIterator) Next() {
i.iter.Next()
}

func (i *badgerIterator) IterItem() storage.IterItem {
return i.iter.Item()
}

var _ storage.IterItem = (*badger.Item)(nil)

// Close closes the iterator and discards the underlying read transaction.
// It must be called once iteration is done, so that both resources are released.
func (i *badgerIterator) Close() error {
i.iter.Close()
i.tx.Discard()
return nil
}
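For orientation, a typical read loop over this iterator might look like the sketch below. It is not part of the commit: it assumes storage.Reader exposes NewIter (as dbReader in reader.go does) and that storage.IterItem exposes Key() like badger.Item, which satisfies the interface above; the commit's operation.IterateKeysInPrefixRange, used in writer.go, wraps a loop of this shape.

// iterateKeys is a hypothetical helper illustrating the SeekGE/Valid/Next contract.
func iterateKeys(r storage.Reader, startPrefix, endPrefix []byte, visit func(key []byte) error) error {
	it, err := r.NewIter(startPrefix, endPrefix, storage.IteratorOption{IterateKeyOnly: true})
	if err != nil {
		return err
	}
	defer it.Close()

	for it.SeekGE(); it.Valid(); it.Next() {
		// the returned key is only guaranteed valid until Next is called, so copy it
		key := append([]byte(nil), it.IterItem().Key()...)
		if err := visit(key); err != nil {
			return err
		}
	}
	return nil
}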
54 changes: 54 additions & 0 deletions storage/operation/badgerimpl/reader.go
@@ -0,0 +1,54 @@
package badgerimpl

import (
"errors"
"io"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
)

type dbReader struct {
db *badger.DB
}

type noopCloser struct{}

var _ io.Closer = (*noopCloser)(nil)

func (noopCloser) Close() error { return nil }

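// Get returns the value for the given key along with an io.Closer, which the caller
// must close once the value is no longer needed. For this badger-backed reader the
// closer is a no-op, because the value is copied out of the transaction before returning.
// It returns storage.ErrNotFound if the key does not exist; all other errors are exceptions.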
func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
tx := b.db.NewTransaction(false)
defer tx.Discard()

item, err := tx.Get(key)
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return nil, nil, storage.ErrNotFound
}
return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err)
}

var value []byte
err = item.Value(func(val []byte) error {
value = append([]byte{}, val...)
return nil
})
if err != nil {
return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err)
}

return value, noopCloser{}, nil
}

func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil
}

// ToReader is a helper function to convert a *badger.DB to a Reader
func ToReader(db *badger.DB) storage.Reader {
return dbReader{db}
}
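As a usage note, a minimal read through this reader could look like the following sketch (not part of the commit; readValue is a hypothetical helper relying only on the Get contract above):

// readValue is a hypothetical sketch showing how callers are expected to use Get.
func readValue(db *badger.DB, key []byte) ([]byte, bool, error) {
	reader := ToReader(db)

	value, closer, err := reader.Get(key)
	if err != nil {
		if errors.Is(err, storage.ErrNotFound) {
			return nil, false, nil // key does not exist
		}
		return nil, false, err // exception
	}
	defer closer.Close()

	// for this implementation the value is already copied out of the transaction,
	// so it stays usable after the closer is closed
	return value, true, nil
}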
93 changes: 93 additions & 0 deletions storage/operation/badgerimpl/writer.go
@@ -0,0 +1,93 @@
package badgerimpl

import (
"fmt"

"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation"
op "github.com/onflow/flow-go/storage/operation"
)

type ReaderBatchWriter struct {
globalReader storage.Reader
batch *badger.WriteBatch

callbacks operation.Callbacks
}

var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)

func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
return b.globalReader
}

func (b *ReaderBatchWriter) Writer() storage.Writer {
return b
}

func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch {
return b.batch
}

func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
b.callbacks.AddCallback(callback)
}

func (b *ReaderBatchWriter) Commit() error {
err := b.batch.Flush()

b.callbacks.NotifyCallbacks(err)

return err
}

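// WithReaderBatchWriter creates a ReaderBatchWriter for the given database, passes it
// to fn, and commits the batch if fn returns nil. If fn returns an error, the batch is
// not committed, but all registered callbacks are still notified with that error.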
func WithReaderBatchWriter(db *badger.DB, fn func(storage.ReaderBatchWriter) error) error {
batch := NewReaderBatchWriter(db)

err := fn(batch)
if err != nil {
// fn might use a lock to ensure concurrent safety while reading and writing data,
// and that lock is usually released by a callback.
// In other words, fn might be holding a lock that only a callback will release,
// so we must notify the callbacks before returning the error; otherwise the
// lock would never be released.
batch.callbacks.NotifyCallbacks(err)
return err
}

return batch.Commit()
}

func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter {
return &ReaderBatchWriter{
globalReader: ToReader(db),
batch: db.NewWriteBatch(),
}
}

var _ storage.Writer = (*ReaderBatchWriter)(nil)

func (b *ReaderBatchWriter) Set(key, value []byte) error {
return b.batch.Set(key, value)
}

func (b *ReaderBatchWriter) Delete(key []byte) error {
return b.batch.Delete(key)
}

func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error {
err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error {
err := b.batch.Delete(key)
if err != nil {
return fmt.Errorf("could not add key to delete batch (%v): %w", key, err)
}
return nil
})(globalReader)

if err != nil {
return fmt.Errorf("could not find keys by range to be deleted: %w", err)
}
return nil
}
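To illustrate the callback/lock interplay described in WithReaderBatchWriter, here is a minimal sketch. It is not part of the commit and assumes the storage.ReaderBatchWriter interface exposes AddCallback and Writer as the concrete type above does, plus a "sync" import:

// storeWithLock is a hypothetical example: the mutex is held while the batch is
// built and released by a callback, which fires on both successful commit and
// error, so the lock cannot leak.
func storeWithLock(db *badger.DB, mu *sync.Mutex, key, value []byte) error {
	return WithReaderBatchWriter(db, func(rw storage.ReaderBatchWriter) error {
		mu.Lock()
		rw.AddCallback(func(error) { mu.Unlock() })

		// note: rw.GlobalReader() would see only the committed state,
		// not the writes pending in this batch
		return rw.Writer().Set(key, value)
	})
}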
24 changes: 24 additions & 0 deletions storage/operation/callbacks.go
@@ -0,0 +1,24 @@
package operation

import "sync"

type Callbacks struct {
sync.Mutex // protect callbacks
callbacks []func(error)
}

func (b *Callbacks) AddCallback(callback func(error)) {
b.Lock()
defer b.Unlock()

b.callbacks = append(b.callbacks, callback)
}

func (b *Callbacks) NotifyCallbacks(err error) {
b.Lock()
defer b.Unlock()

for _, callback := range b.callbacks {
callback(err)
}
}
34 changes: 34 additions & 0 deletions storage/operation/codec.go
@@ -0,0 +1,34 @@
package operation

import (
"encoding/binary"
"fmt"

"github.com/onflow/flow-go/model/flow"
)

// EncodeKeyPart encodes a value to be used as part of a storage key.
func EncodeKeyPart(v interface{}) []byte {
switch i := v.(type) {
case uint8:
return []byte{i}
case uint32:
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, i)
return b
case uint64:
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
return b
case string:
return []byte(i)
case flow.Role:
return []byte{byte(i)}
case flow.Identifier:
return i[:]
case flow.ChainID:
return []byte(i)
default:
panic(fmt.Sprintf("unsupported type to convert (%T)", v))
}
}
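For context, the fixed-width big-endian encodings above keep numeric key parts in lexicographic order, which is what prefix and range iteration relies on. A composition helper could look like the following sketch (makeKey and prefixCode are hypothetical, not part of this commit):

// makeKey is a hypothetical sketch: a one-byte prefix code followed by the
// encoded key parts, e.g. makeKey(prefixCode, blockID, height).
func makeKey(prefix byte, parts ...interface{}) []byte {
	key := []byte{prefix}
	for _, part := range parts {
		key = append(key, EncodeKeyPart(part)...)
	}
	return key
}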
60 changes: 60 additions & 0 deletions storage/operation/dbtest/helper.go
@@ -0,0 +1,60 @@
package dbtest

import (
"testing"

"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
"github.com/onflow/flow-go/utils/unittest"
)

// WithWriter runs the given writing function against a fresh batch writer and
// commits the batch if the function succeeds.
type WithWriter func(func(storage.Writer) error) error

// RunWithStorages runs the given test function against both the badger-backed and
// the pebble-backed implementations of the storage interfaces.
func RunWithStorages(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) {
t.Run("BadgerStorage", func(t *testing.T) {
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
withWriter := func(writing func(storage.Writer) error) error {
writer := badgerimpl.NewReaderBatchWriter(db)
err := writing(writer)
if err != nil {
return err
}

err = writer.Commit()
if err != nil {
return err
}
return nil
}

reader := badgerimpl.ToReader(db)
fn(t, reader, withWriter)
})
})

t.Run("PebbleStorage", func(t *testing.T) {
unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
withWriter := func(writing func(storage.Writer) error) error {
writer := pebbleimpl.NewReaderBatchWriter(db)
err := writing(writer)
if err != nil {
return err
}

err = writer.Commit()
if err != nil {
return err
}
return nil
}

reader := pebbleimpl.ToReader(db)
fn(t, reader, withWriter)
})
})
}
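A round-trip test built on this helper might look like the following sketch (it would live in a separate test package; TestSetGet and the literals are hypothetical, and the testify require package is assumed):

func TestSetGet(t *testing.T) {
	dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) {
		key := []byte{0x01, 0x02}
		val := []byte("hello")

		// the key is absent before any write
		_, _, err := r.Get(key)
		require.ErrorIs(t, err, storage.ErrNotFound)

		// write through the batch writer supplied by the helper
		require.NoError(t, withWriter(func(w storage.Writer) error {
			return w.Set(key, val)
		}))

		// the committed write is now visible to the reader
		value, closer, err := r.Get(key)
		require.NoError(t, err)
		defer closer.Close()
		require.Equal(t, val, value)
	})
}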