Skip to content

Commit

Permalink
Fix incorrectly merged db_test.go
Browse files Browse the repository at this point in the history
  • Loading branch information
joelsmith-2019 committed Sep 16, 2024
1 parent d5d6d96 commit 3652be8
Showing 1 changed file with 264 additions and 104 deletions.
368 changes: 264 additions & 104 deletions node/pkg/db/db_test.go
Original file line number Diff line number Diff line change
@@ -1,130 +1,290 @@
package processor
package db

import (
"encoding/hex"
"time"

"github.com/mr-tron/base58"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"bytes"
"crypto/ecdsa"
"crypto/rand"
"fmt"
math_rand "math/rand"
"os"
"runtime"
"sync"
"sync/atomic"

ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/dgraph-io/badger/v3"
"github.com/ethereum/go-ethereum/crypto"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
// SECURITY: source_chain/target_chain are untrusted uint8 values. An attacker could cause a maximum of 255**2 label
// pairs to be created, which is acceptable.
func getVAA() vaa.VAA {
return getVAAWithSeqNum(1)
}

messagesObservedTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_message_observations_total",
Help: "Total number of messages observed",
},
[]string{"emitter_chain"})
)
func getVAAWithSeqNum(seqNum uint64) vaa.VAA {
var payload = []byte{97, 97, 97, 97, 97, 97}
var governanceEmitter = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4}

return vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: time.Unix(0, 0),
Nonce: uint32(1),
Sequence: seqNum,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: governanceEmitter,
Payload: payload,
}
}

// Testing the expected default behavior of a CreateGovernanceVAA
func TestVaaIDFromString(t *testing.T) {
vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1"
vaaID, _ := VaaIDFromString(vaaIdString)
expectAddr := vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4}

assert.Equal(t, vaa.ChainIDSolana, vaaID.EmitterChain)
assert.Equal(t, expectAddr, vaaID.EmitterAddress)
assert.Equal(t, uint64(1), vaaID.Sequence)
}

func TestVaaIDFromVAA(t *testing.T) {
testVaa := getVAA()
vaaID := VaaIDFromVAA(&testVaa)
expectAddr := vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4}

assert.Equal(t, vaa.ChainIDSolana, vaaID.EmitterChain)
assert.Equal(t, expectAddr, vaaID.EmitterAddress)
assert.Equal(t, uint64(1), vaaID.Sequence)
}

func TestBytes(t *testing.T) {
vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1"
vaaID, _ := VaaIDFromString(vaaIdString)
expected := []byte{0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x2f, 0x31, 0x2f, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x34, 0x2f, 0x31}

assert.Equal(t, expected, vaaID.Bytes())
}

func TestEmitterPrefixBytesWithChainIDAndAddress(t *testing.T) {
vaaIdString := "1/0000000000000000000000000000000000000000000000000000000000000004/1"
vaaID, _ := VaaIDFromString(vaaIdString)
expected := []byte{0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x2f, 0x31, 0x2f, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x34}

assert.Equal(t, expected, vaaID.EmitterPrefixBytes())
}

func TestEmitterPrefixBytesWithOnlyChainID(t *testing.T) {
vaaID := VAAID{EmitterChain: vaa.ChainID(26)}
assert.Equal(t, []byte("signed/26"), vaaID.EmitterPrefixBytes())
}

func TestStoreSignedVAAUnsigned(t *testing.T) {
dbPath := t.TempDir()
db := OpenDb(zap.NewNop(), &dbPath)
defer db.Close()
defer os.Remove(dbPath)

testVaa := getVAA()

// Should panic because the VAA is not signed
assert.Panics(t, func() { db.StoreSignedVAA(&testVaa) }, "The code did not panic") //nolint:errcheck
}

func TestStoreSignedVAASigned(t *testing.T) {
dbPath := t.TempDir()
db := OpenDb(zap.NewNop(), &dbPath)
defer db.Close()
defer os.Remove(dbPath)

testVaa := getVAA()

privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
testVaa.AddSignature(privKey, 0)

err2 := db.StoreSignedVAA(&testVaa)
assert.NoError(t, err2)
}

func TestStoreSignedVAABatch(t *testing.T) {
dbPath := t.TempDir()
db := OpenDb(zap.NewNop(), &dbPath)
defer db.Close()
defer os.Remove(dbPath)

// handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An
// event may be received multiple times and must be handled in an idempotent fashion.
func (p *Processor) handleMessage(k *common.MessagePublication) {
if p.gs == nil {
p.logger.Warn("dropping observation since we haven't initialized our guardian set yet",
zap.String("message_id", k.MessageIDString()),
zap.Uint32("nonce", k.Nonce),
zap.Stringer("txhash", k.TxHash),
zap.Time("timestamp", k.Timestamp),
)
return
privKey, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
require.NoError(t, err)

require.Less(t, int64(0), db.db.MaxBatchCount()) // In testing this was 104857.
require.Less(t, int64(0), db.db.MaxBatchSize()) // In testing this was 10066329.

// Make sure we exceed the max batch size.
numVAAs := uint64(db.db.MaxBatchCount() + 1)

// Build the VAA batch.
vaaBatch := make([]*vaa.VAA, 0, numVAAs)
for seqNum := uint64(0); seqNum < numVAAs; seqNum++ {
v := getVAAWithSeqNum(seqNum)
v.AddSignature(privKey, 0)
vaaBatch = append(vaaBatch, &v)
}

messagesObservedTotal.WithLabelValues(k.EmitterChain.String()).Inc()

// All nodes will create the exact same VAA and sign its digest.
// Consensus is established on this digest.

v := &VAA{
VAA: vaa.VAA{
Version: vaa.SupportedVAAVersion,
GuardianSetIndex: p.gs.Index,
Signatures: nil,
Timestamp: k.Timestamp,
Nonce: k.Nonce,
EmitterChain: k.EmitterChain,
EmitterAddress: k.EmitterAddress,
Payload: k.Payload,
Sequence: k.Sequence,
ConsistencyLevel: k.ConsistencyLevel,
},
Unreliable: k.Unreliable,
Reobservation: k.IsReobservation,
// Store the batch in the database.
err = db.StoreSignedVAABatch(vaaBatch)
require.NoError(t, err)

// Verify all the VAAs are in the database.
for _, v := range vaaBatch {
storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v))
require.NoError(t, err)

origBytes, err := v.Marshal()
require.NoError(t, err)

assert.True(t, bytes.Equal(origBytes, storedBytes))
}

// Verify that updates work as well by tweaking the VAAs and rewriting them.
for _, v := range vaaBatch {
v.Nonce += 1
}

// Generate digest of the unsigned VAA.
digest := v.SigningDigest()
hash := hex.EncodeToString(digest.Bytes())
// Store the updated batch in the database.
err = db.StoreSignedVAABatch(vaaBatch)
require.NoError(t, err)

// Sign the digest using our node's guardian key.
signature, err := crypto.Sign(digest.Bytes(), p.gk)
if err != nil {
panic(err)
// Verify all the updated VAAs are in the database.
for _, v := range vaaBatch {
storedBytes, err := db.GetSignedVAABytes(*VaaIDFromVAA(v))
require.NoError(t, err)

origBytes, err := v.Marshal()
require.NoError(t, err)

assert.True(t, bytes.Equal(origBytes, storedBytes))
}
}

func TestGetSignedVAABytes(t *testing.T) {
dbPath := t.TempDir()
db := OpenDb(zap.NewNop(), &dbPath)
defer db.Close()
defer os.Remove(dbPath)

testVaa := getVAA()

vaaID := VaaIDFromVAA(&testVaa)

privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
testVaa.AddSignature(privKey, 0)

// Store full VAA
err2 := db.StoreSignedVAA(&testVaa)
assert.NoError(t, err2)

// Retrieve it using vaaID
vaaBytes, err2 := db.GetSignedVAABytes(*vaaID)
assert.NoError(t, err2)

testVaaBytes, err3 := testVaa.Marshal()
assert.NoError(t, err3)

assert.Equal(t, testVaaBytes, vaaBytes)
}

func TestFindEmitterSequenceGap(t *testing.T) {
dbPath := t.TempDir()
db := OpenDb(zap.NewNop(), &dbPath)
defer db.Close()
defer os.Remove(dbPath)

testVaa := getVAA()

vaaID := VaaIDFromVAA(&testVaa)

privKey, _ := ecdsa.GenerateKey(crypto.S256(), rand.Reader)
testVaa.AddSignature(privKey, 0)

// Store full VAA
err2 := db.StoreSignedVAA(&testVaa)
assert.NoError(t, err2)

resp, firstSeq, lastSeq, err := db.FindEmitterSequenceGap(*vaaID)

assert.Equal(t, []uint64{0x0}, resp)
assert.Equal(t, uint64(0x0), firstSeq)
assert.Equal(t, uint64(0x1), lastSeq)
assert.NoError(t, err)
}

// BenchmarkVaaLookup benchmarks db.GetSignedVAABytes
// You need to set the environment variable WH_DBPATH to a path with a populated BadgerDB.
// You may want to play with the CONCURRENCY parameter.
func BenchmarkVaaLookup(b *testing.B) {
CONCURRENCY := runtime.NumCPU()
dbPath := os.Getenv("WH_DBPATH")
require.NotEqual(b, dbPath, "")

// open DB
optionsDB := badger.DefaultOptions(dbPath)
optionsDB.Logger = nil
badgerDb, err := badger.Open(optionsDB)
require.NoError(b, err)
db := &Database{
db: badgerDb,
}

shouldPublishImmediately := p.shouldPublishImmediately(&v.VAA)

if p.logger.Core().Enabled(zapcore.DebugLevel) {
p.logger.Debug("observed and signed confirmed message publication",
zap.String("message_id", k.MessageIDString()),
zap.Stringer("txhash", k.TxHash),
zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
zap.String("hash", hash),
zap.Uint32("nonce", k.Nonce),
zap.Time("timestamp", k.Timestamp),
zap.Uint8("consistency_level", k.ConsistencyLevel),
zap.String("signature", hex.EncodeToString(signature)),
zap.Bool("shouldPublishImmediately", shouldPublishImmediately),
zap.Bool("isReobservation", k.IsReobservation),
)
if err != nil {
b.Error("failed to open database")
}
defer db.Close()

vaaIds := make(chan *VAAID, b.N)

// Broadcast the signature.
ourObs, msg := p.broadcastSignature(v.MessageID(), k.TxHash.Bytes(), digest, signature, shouldPublishImmediately)
for i := 0; i < b.N; i++ {
randId := math_rand.Intn(250000) //nolint
randId = 250000 - (i / 18)
vaaId, err := VaaIDFromString(fmt.Sprintf("4/000000000000000000000000b6f6d86a8f9879a9c87f643768d9efc38c1da6e7/%d", randId))
assert.NoError(b, err)
vaaIds <- vaaId
}

// Indicate that we observed this one.
observationsReceivedTotal.Inc()
observationsReceivedByGuardianAddressTotal.WithLabelValues(p.ourAddr.Hex()).Inc()
b.ResetTimer()

// Get / create our state entry.
s := p.state.signatures[hash]
if s == nil {
s = &state{
firstObserved: time.Now(),
nextRetry: time.Now().Add(nextRetryDuration(0)),
signatures: map[ethCommon.Address][]byte{},
source: "loopback",
}
// actual timed code
var errCtr atomic.Int32
var wg sync.WaitGroup

p.state.signatures[hash] = s
for i := 0; i < CONCURRENCY; i++ {
wg.Add(1)
go func() {
for {
select {
case vaaId := <-vaaIds:
_, err = db.GetSignedVAABytes(*vaaId)
if err != nil {
fmt.Printf("error retrieving %s/%s/%d: %s\n", vaaId.EmitterChain, vaaId.EmitterAddress, vaaId.Sequence, err)
errCtr.Add(1)
}
default:
wg.Done()
return
}
}
}()
}

// Update our state.
s.ourObservation = v
s.txHash = k.TxHash.Bytes()
s.source = v.GetEmitterChain().String()
s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
s.signatures[p.ourAddr] = signature
s.ourObs = ourObs
s.ourMsg = msg

// Fast path for our own signature.
if !s.submitted {
start := time.Now()
p.checkForQuorum(ourObs, s, s.gs, hash)
timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
wg.Wait()

if int(errCtr.Load()) > b.N/3 {
b.Error("More than 1/3 of GetSignedVAABytes failed.")
}
}

0 comments on commit 3652be8

Please sign in to comment.