Skip to content

Commit

Permalink
add collections
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Jun 21, 2024
1 parent 05dc8d2 commit 5b4c787
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 0 deletions.
130 changes: 130 additions & 0 deletions storage/pebble/collections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package pebble

import (
"fmt"

"github.com/cockroachdb/pebble"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage/pebble/operation"
)

type Collections struct {
db *pebble.DB
transactions *Transactions
}

func NewCollections(db *pebble.DB, transactions *Transactions) *Collections {
c := &Collections{
db: db,
transactions: transactions,
}
return c
}

func (c *Collections) StoreLight(collection *flow.LightCollection) error {
err := operation.InsertCollection(collection)(c.db)
if err != nil {
return fmt.Errorf("could not insert collection: %w", err)
}

return nil
}

func (c *Collections) Store(collection *flow.Collection) error {
light := collection.Light()
ttx := c.db.NewBatch()
err := operation.InsertCollection(&light)(ttx)
if err != nil {
return fmt.Errorf("could not insert collection: %w", err)
}

for _, tx := range collection.Transactions {
err = c.transactions.storeTx(tx)(ttx)
if err != nil {
return fmt.Errorf("could not insert transaction: %w", err)
}
}

return ttx.Commit(nil)
}

func (c *Collections) ByID(colID flow.Identifier) (*flow.Collection, error) {
var (
light flow.LightCollection
collection flow.Collection
)

err := operation.RetrieveCollection(colID, &light)(c.db)
if err != nil {
return nil, fmt.Errorf("could not retrieve collection: %w", err)
}

for _, txID := range light.Transactions {
tx, err := c.transactions.ByID(txID)
if err != nil {
return nil, fmt.Errorf("could not retrieve transaction: %w", err)
}

collection.Transactions = append(collection.Transactions, tx)
}

return &collection, nil
}

func (c *Collections) LightByID(colID flow.Identifier) (*flow.LightCollection, error) {
var collection flow.LightCollection

err := operation.RetrieveCollection(colID, &collection)(c.db)
if err != nil {
return nil, fmt.Errorf("could not retrieve collection: %w", err)
}

if err != nil {
return nil, err
}

return &collection, nil
}

func (c *Collections) Remove(colID flow.Identifier) error {
err := operation.RemoveCollection(colID)(c.db)
if err != nil {
return fmt.Errorf("could not remove collection: %w", err)
}
return nil
}

func (c *Collections) StoreLightAndIndexByTransaction(collection *flow.LightCollection) error {
tx := c.db.NewBatch()

err := operation.InsertCollection(collection)(tx)
if err != nil {
return fmt.Errorf("could not insert collection: %w", err)
}

for _, txID := range collection.Transactions {
err = operation.IndexCollectionByTransaction(txID, collection.ID())(tx)
if err != nil {
return fmt.Errorf("could not insert transaction ID: %w", err)
}
}

return nil
}

func (c *Collections) LightByTransactionID(txID flow.Identifier) (*flow.LightCollection, error) {
var collection flow.LightCollection
collID := &flow.Identifier{}
err := operation.RetrieveCollectionID(txID, collID)(c.db)
if err != nil {
return nil, fmt.Errorf("could not retrieve collection id: %w", err)
}

err = operation.RetrieveCollection(*collID, &collection)(c.db)
if err != nil {
return nil, fmt.Errorf("could not retrieve collection: %w", err)
}

return &collection, nil
}
87 changes: 87 additions & 0 deletions storage/pebble/collections_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package pebble_test

import (
"testing"

"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/module/metrics"
bstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/unittest"
)

func TestCollections(t *testing.T) {
unittest.RunWithPebbleDB(t, func(db *pebble.DB) {

metrics := metrics.NewNoopCollector()
transactions := bstorage.NewTransactions(metrics, db)
collections := bstorage.NewCollections(db, transactions)

// create a light collection with three transactions
expected := unittest.CollectionFixture(3).Light()

// store the light collection and the transaction index
err := collections.StoreLightAndIndexByTransaction(&expected)
require.Nil(t, err)

// retrieve the light collection by collection id
actual, err := collections.LightByID(expected.ID())
require.Nil(t, err)

// check if the light collection was indeed persisted
assert.Equal(t, &expected, actual)

expectedID := expected.ID()

// retrieve the collection light id by each of its transaction id
for _, txID := range expected.Transactions {
collLight, err := collections.LightByTransactionID(txID)
actualID := collLight.ID()
// check that the collection id can indeed be retrieved by transaction id
require.Nil(t, err)
assert.Equal(t, expectedID, actualID)
}

})
}

func TestCollections_IndexDuplicateTx(t *testing.T) {
unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
metrics := metrics.NewNoopCollector()
transactions := bstorage.NewTransactions(metrics, db)
collections := bstorage.NewCollections(db, transactions)

// create two collections which share 1 transaction
col1 := unittest.CollectionFixture(2)
col2 := unittest.CollectionFixture(1)
dupTx := col1.Transactions[0] // the duplicated transaction
col2Tx := col2.Transactions[0] // transaction that's only in col2
col2.Transactions = append(col2.Transactions, dupTx)

// insert col1
col1Light := col1.Light()
err := collections.StoreLightAndIndexByTransaction(&col1Light)
require.NoError(t, err)

// insert col2
col2Light := col2.Light()
err = collections.StoreLightAndIndexByTransaction(&col2Light)
require.NoError(t, err)

// should be able to retrieve col2 by ID
gotLightByCol2ID, err := collections.LightByID(col2.ID())
require.NoError(t, err)
assert.Equal(t, &col2Light, gotLightByCol2ID)

// should be able to retrieve col2 by the transaction which only appears in col2
_, err = collections.LightByTransactionID(col2Tx.ID())
require.NoError(t, err)

// col1 (not col2) should be indexed by the shared transaction (since col1 was inserted first)
gotLightByDupTxID, err := collections.LightByTransactionID(dupTx.ID())
require.NoError(t, err)
assert.Equal(t, &col1Light, gotLightByDupTxID)
})
}

0 comments on commit 5b4c787

Please sign in to comment.