diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index e406f6969d..843ca0e61f 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -73,7 +73,7 @@ func (p *Processor) handleCleanup(ctx context.Context) { // This occurs when we observed a message after the cluster has already reached // consensus on it, causing us to never achieve quorum. if ourVaa, ok := s.ourObservation.(*VAA); ok { - if _, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil { + if _, err := p.getSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)); err == nil { // If we have a stored quorum VAA, we can safely expire the state. // // This is a rare case, and we can safely expire the state, since we @@ -225,4 +225,13 @@ func (p *Processor) handleCleanup(ctx context.Context) { } } } + + // Clean up old pythnet VAAs. + oldestTime := time.Now().Add(-time.Hour) + for key, pe := range p.pythnetVaas { + if pe.updateTime.Before(oldestTime) { + p.logger.Info("PYTHNET: dropping old pythnet vaa", zap.String("message_id", key), zap.Stringer("updateTime", pe.updateTime)) + delete(p.pythnetVaas, key) + } + } } diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go index 681d193472..8b7ba0c162 100644 --- a/node/pkg/processor/message.go +++ b/node/pkg/processor/message.go @@ -101,13 +101,8 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat // // Exception: if an observation is made within the settlement time (30s), we'll // process it so other nodes won't consider it a miss. - if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(&v.VAA)); err == nil { - // unmarshal vaa - var existing *vaa.VAA - if existing, err = vaa.Unmarshal(vb); err != nil { - panic("failed to unmarshal VAA from db") - } + if existing, err := p.getSignedVAA(*db.VaaIDFromVAA(&v.VAA)); err == nil { if k.Timestamp.Sub(existing.Timestamp) > settlementTime { p.logger.Info("ignoring observation since we already have a quorum VAA for it", zap.Stringer("emitter_chain", k.EmitterChain), diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index 05761dc8ab..f2b26b83ed 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -286,7 +286,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos // - enough signatures are present for the VAA to reach quorum // Check if we already store this VAA - _, err = p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v)) + _, err = p.getSignedVAA(*db.VaaIDFromVAA(v)) if err == nil { p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already store", zap.String("digest", hash), @@ -307,7 +307,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos zap.String("bytes", hex.EncodeToString(m.Vaa)), zap.String("message_id", v.MessageID())) - if err := p.db.StoreSignedVAA(v); err != nil { + if err := p.storeSignedVAA(v); err != nil { p.logger.Error("failed to store signed VAA", zap.Error(err)) return } diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index a558405e3e..04282cf56b 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -3,6 +3,7 @@ package processor import ( "context" "crypto/ecdsa" + "fmt" "time" "github.com/certusone/wormhole/node/pkg/notify/discord" @@ -73,6 +74,11 @@ type ( } ) +type PythNetVaaEntry struct { + v *vaa.VAA + updateTime time.Time // Used for determining when to delete entries +} + type Processor struct { // lockC is a channel of observed emitted messages lockC chan *common.MessagePublication @@ -122,8 +128,9 @@ type Processor struct { // cleanup triggers periodic state cleanup cleanup *time.Ticker - notifier *discord.DiscordNotifier - governor *governor.ChainGovernor + notifier *discord.DiscordNotifier + governor *governor.ChainGovernor + pythnetVaas map[string]PythNetVaaEntry } func NewProcessor( @@ -165,10 +172,11 @@ func NewProcessor( notifier: notifier, - logger: supervisor.Logger(ctx), - state: &aggregationState{observationMap{}}, - ourAddr: crypto.PubkeyToAddress(gk.PublicKey), - governor: g, + logger: supervisor.Logger(ctx), + state: &aggregationState{observationMap{}}, + ourAddr: crypto.PubkeyToAddress(gk.PublicKey), + governor: g, + pythnetVaas: make(map[string]PythNetVaaEntry), } } @@ -218,3 +226,39 @@ func (p *Processor) Run(ctx context.Context) error { } } } + +func (p *Processor) storeSignedVAA(v *vaa.VAA) error { + if v.EmitterChain == vaa.ChainIDPythNet { + key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence) + p.logger.Info("PYTHNET: storing pythnet vaa", zap.String("message_id", key)) + p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()} + return nil + } + return p.db.StoreSignedVAA(v) +} + +func (p *Processor) getSignedVAA(id db.VAAID) (*vaa.VAA, error) { + if id.EmitterChain == vaa.ChainIDPythNet { + key := fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence) + ret, exists := p.pythnetVaas[key] + if exists { + p.logger.Info("PYTHNET: found pythnet vaa", zap.String("message_id", key)) + return ret.v, nil + } + + p.logger.Info("PYTHNET: did not find pythnet vaa", zap.String("message_id", key)) + return nil, db.ErrVAANotFound + } + + vb, err := p.db.GetSignedVAABytes(id) + if err != nil { + return nil, err + } + + vaa, err := vaa.Unmarshal(vb) + if err != nil { + panic("failed to unmarshal VAA from db") + } + + return vaa, err +} diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go index a378a576b9..771ab6fd70 100644 --- a/node/pkg/processor/vaa.go +++ b/node/pkg/processor/vaa.go @@ -38,7 +38,7 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) { zap.String("bytes", hex.EncodeToString(vaaBytes)), zap.String("message_id", signed.MessageID())) - if err := p.db.StoreSignedVAA(signed); err != nil { + if err := p.storeSignedVAA(signed); err != nil { p.logger.Error("failed to store signed VAA", zap.Error(err)) } diff --git a/node/pkg/publicrpc/publicrpcserver.go b/node/pkg/publicrpc/publicrpcserver.go index 18738649c4..d835196d97 100644 --- a/node/pkg/publicrpc/publicrpcserver.go +++ b/node/pkg/publicrpc/publicrpcserver.go @@ -68,6 +68,13 @@ func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.Get return nil, status.Error(codes.InvalidArgument, "no message ID specified") } + chainID := vaa.ChainID(req.MessageId.EmitterChain.Number()) + + // This interface is not supported for PythNet messages because those VAAs are not stored in the database. + if chainID == vaa.ChainIDPythNet { + return nil, status.Error(codes.InvalidArgument, "not supported for PythNet") + } + address, err := hex.DecodeString(req.MessageId.EmitterAddress) if err != nil { return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode address: %v", err)) @@ -80,7 +87,7 @@ func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.Get copy(addr[:], address) b, err := s.db.GetSignedVAABytes(db.VAAID{ - EmitterChain: vaa.ChainID(req.MessageId.EmitterChain.Number()), + EmitterChain: chainID, EmitterAddress: addr, Sequence: req.MessageId.Sequence, })