Skip to content

Commit

Permalink
node: don't store pythnet VAAs in the database (#1643)
Browse files Browse the repository at this point in the history
* Don't store pythnet VAAs in the database

Change-Id: Ief4357ab4c909d25dc9182490c322ef253ac23d3

* Clean up logging

Change-Id: I46efea96d6c2ba65459254ffeb21f5d65abebb01

* Rework this to require less custom code

Change-Id: Ib7f521ecff62b1bd13efcb627f88413f4141de59

* Fix copy paste error

Change-Id: I067f8364042f494ad56ed88919cd917f18423073

* Fix typo

Change-Id: I22ccb56ac330bd557b6e8438cfe9c02d7593361d
  • Loading branch information
bruce-riley authored Sep 27, 2022
1 parent e257555 commit ddd8b78
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 17 deletions.
11 changes: 10 additions & 1 deletion node/pkg/processor/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
// This occurs when we observed a message after the cluster has already reached
// consensus on it, causing us to never achieve quorum.
if ourVaa, ok := s.ourObservation.(*VAA); ok {
if _, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil {
if _, err := p.getSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil {
// If we have a stored quorum VAA, we can safely expire the state.
//
// This is a rare case, and we can safely expire the state, since we
Expand Down Expand Up @@ -225,4 +225,13 @@ func (p *Processor) handleCleanup(ctx context.Context) {
}
}
}

// Clean up old pythnet VAAs.
oldestTime := time.Now().Add(-time.Hour)
for key, pe := range p.pythnetVaas {
if pe.updateTime.Before(oldestTime) {
p.logger.Info("PYTHNET: dropping old pythnet vaa", zap.String("message_id", key), zap.Stringer("updateTime", pe.updateTime))
delete(p.pythnetVaas, key)
}
}
}
7 changes: 1 addition & 6 deletions node/pkg/processor/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,8 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
//
// Exception: if an observation is made within the settlement time (30s), we'll
// process it so other nodes won't consider it a miss.
if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&v.VAA)); err == nil {
// unmarshal vaa
var existing *vaa.VAA
if existing, err = vaa.Unmarshal(vb); err != nil {
panic("failed to unmarshal VAA from db")
}

if existing, err := p.getSignedVAA(*db.VaaIDFromVAA(&v.VAA)); err == nil {
if k.Timestamp.Sub(existing.Timestamp) > settlementTime {
p.logger.Info("ignoring observation since we already have a quorum VAA for it",
zap.Stringer("emitter_chain", k.EmitterChain),
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/processor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
// - enough signatures are present for the VAA to reach quorum

// Check if we already store this VAA
_, err = p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v))
_, err = p.getSignedVAA(*db.VaaIDFromVAA(v))
if err == nil {
p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already store",
zap.String("digest", hash),
Expand All @@ -307,7 +307,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
zap.String("bytes", hex.EncodeToString(m.Vaa)),
zap.String("message_id", v.MessageID()))

if err := p.db.StoreSignedVAA(v); err != nil {
if err := p.storeSignedVAA(v); err != nil {
p.logger.Error("failed to store signed VAA", zap.Error(err))
return
}
Expand Down
56 changes: 50 additions & 6 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package processor
import (
"context"
"crypto/ecdsa"
"fmt"
"time"

"github.com/certusone/wormhole/node/pkg/notify/discord"
Expand Down Expand Up @@ -73,6 +74,11 @@ type (
}
)

type PythNetVaaEntry struct {
v *vaa.VAA
updateTime time.Time // Used for determining when to delete entries
}

type Processor struct {
// lockC is a channel of observed emitted messages
lockC chan *common.MessagePublication
Expand Down Expand Up @@ -122,8 +128,9 @@ type Processor struct {
// cleanup triggers periodic state cleanup
cleanup *time.Ticker

notifier *discord.DiscordNotifier
governor *governor.ChainGovernor
notifier *discord.DiscordNotifier
governor *governor.ChainGovernor
pythnetVaas map[string]PythNetVaaEntry
}

func NewProcessor(
Expand Down Expand Up @@ -165,10 +172,11 @@ func NewProcessor(

notifier: notifier,

logger: supervisor.Logger(ctx),
state: &aggregationState{observationMap{}},
ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
governor: g,
logger: supervisor.Logger(ctx),
state: &aggregationState{observationMap{}},
ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
governor: g,
pythnetVaas: make(map[string]PythNetVaaEntry),
}
}

Expand Down Expand Up @@ -218,3 +226,39 @@ func (p *Processor) Run(ctx context.Context) error {
}
}
}

func (p *Processor) storeSignedVAA(v *vaa.VAA) error {
if v.EmitterChain == vaa.ChainIDPythNet {
key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
p.logger.Info("PYTHNET: storing pythnet vaa", zap.String("message_id", key))
p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
return nil
}
return p.db.StoreSignedVAA(v)
}

func (p *Processor) getSignedVAA(id db.VAAID) (*vaa.VAA, error) {
if id.EmitterChain == vaa.ChainIDPythNet {
key := fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence)
ret, exists := p.pythnetVaas[key]
if exists {
p.logger.Info("PYTHNET: found pythnet vaa", zap.String("message_id", key))
return ret.v, nil
}

p.logger.Info("PYTHNET: did not find pythnet vaa", zap.String("message_id", key))
return nil, db.ErrVAANotFound
}

vb, err := p.db.GetSignedVAABytes(id)
if err != nil {
return nil, err
}

vaa, err := vaa.Unmarshal(vb)
if err != nil {
panic("failed to unmarshal VAA from db")
}

return vaa, err
}
2 changes: 1 addition & 1 deletion node/pkg/processor/vaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
zap.String("bytes", hex.EncodeToString(vaaBytes)),
zap.String("message_id", signed.MessageID()))

if err := p.db.StoreSignedVAA(signed); err != nil {
if err := p.storeSignedVAA(signed); err != nil {
p.logger.Error("failed to store signed VAA", zap.Error(err))
}

Expand Down
9 changes: 8 additions & 1 deletion node/pkg/publicrpc/publicrpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.Get
return nil, status.Error(codes.InvalidArgument, "no message ID specified")
}

chainID := vaa.ChainID(req.MessageId.EmitterChain.Number())

// This interface is not supported for PythNet messages because those VAAs are not stored in the database.
if chainID == vaa.ChainIDPythNet {
return nil, status.Error(codes.InvalidArgument, "not supported for PythNet")
}

address, err := hex.DecodeString(req.MessageId.EmitterAddress)
if err != nil {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode address: %v", err))
Expand All @@ -80,7 +87,7 @@ func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.Get
copy(addr[:], address)

b, err := s.db.GetSignedVAABytes(db.VAAID{
EmitterChain: vaa.ChainID(req.MessageId.EmitterChain.Number()),
EmitterChain: chainID,
EmitterAddress: addr,
Sequence: req.MessageId.Sequence,
})
Expand Down

0 comments on commit ddd8b78

Please sign in to comment.