From f308956244b02bdf247c35542fcd399e82b648b6 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Thu, 2 Nov 2023 12:39:06 -0300 Subject: [PATCH] multi: Respond to getcfsv2 message. This adds the appropriate processing to the peer and server structs to respond to the recently introduced getcfsv2 message. It also bumps the peer and server max supported protocol versions to version 10 (BatchedCFiltersV2Version). This message queries the chain for a batch of committed filters spanning a set of sequential blocks and will be used by SPV clients to fetch committed filters during their initial sync process. --- peer/go.mod | 2 ++ peer/peer.go | 19 ++++++++++++++++++- peer/peer_test.go | 14 ++++++++++++++ server.go | 13 ++++++++++++- 4 files changed, 46 insertions(+), 2 deletions(-) diff --git a/peer/go.mod b/peer/go.mod index cb4b8ff8c8..614778a12c 100644 --- a/peer/go.mod +++ b/peer/go.mod @@ -12,6 +12,8 @@ require ( github.com/decred/slog v1.2.0 ) +replace github.com/decred/dcrd/wire => ../wire + require ( github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect github.com/dchest/siphash v1.2.3 // indirect diff --git a/peer/peer.go b/peer/peer.go index 438ac9de7b..8070f442d8 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -28,7 +28,7 @@ import ( const ( // MaxProtocolVersion is the max protocol version the peer supports. - MaxProtocolVersion = wire.RemoveRejectVersion + MaxProtocolVersion = wire.BatchedCFiltersV2Version // outputBufferSize is the number of elements the output channels use. outputBufferSize = 5000 @@ -129,6 +129,9 @@ type MessageListeners struct { // OnCFilterV2 is invoked when a peer receives a cfilterv2 wire message. OnCFilterV2 func(p *Peer, msg *wire.MsgCFilterV2) + // OnCFiltersV2 is invoked when a peer receives a cfiltersv2 wire message. + OnCFiltersV2 func(p *Peer, msg *wire.MsgCFiltersV2) + // OnCFHeaders is invoked when a peer receives a cfheaders wire // message. OnCFHeaders func(p *Peer, msg *wire.MsgCFHeaders) @@ -163,6 +166,10 @@ type MessageListeners struct { // message. OnGetCFilterV2 func(p *Peer, msg *wire.MsgGetCFilterV2) + // OnGetCFiltersV2 is invoked when a peer receives a getcfsv2 wire + // message. + OnGetCFiltersV2 func(p *Peer, msg *wire.MsgGetCFsV2) + // OnGetCFHeaders is invoked when a peer receives a getcfheaders // wire message. OnGetCFHeaders func(p *Peer, msg *wire.MsgGetCFHeaders) @@ -1423,6 +1430,16 @@ out: p.cfg.Listeners.OnCFilterV2(p, msg) } + case *wire.MsgGetCFsV2: + if p.cfg.Listeners.OnGetCFiltersV2 != nil { + p.cfg.Listeners.OnGetCFiltersV2(p, msg) + } + + case *wire.MsgCFiltersV2: + if p.cfg.Listeners.OnCFiltersV2 != nil { + p.cfg.Listeners.OnCFiltersV2(p, msg) + } + case *wire.MsgGetInitState: if p.cfg.Listeners.OnGetInitState != nil { p.cfg.Listeners.OnGetInitState(p, msg) diff --git a/peer/peer_test.go b/peer/peer_test.go index 1a2befb310..aa74fe6b1c 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -411,6 +411,12 @@ func TestPeerListeners(t *testing.T) { OnInitState: func(p *Peer, msg *wire.MsgInitState) { ok <- msg }, + OnGetCFiltersV2: func(p *Peer, msg *wire.MsgGetCFsV2) { + ok <- msg + }, + OnCFiltersV2: func(p *Peer, msg *wire.MsgCFiltersV2) { + ok <- msg + }, }, UserAgentName: "peer", UserAgentVersion: "1.0", @@ -565,6 +571,14 @@ func TestPeerListeners(t *testing.T) { "OnInitState", wire.NewMsgInitState(), }, + { + "OnGetCFiltersV2", + wire.NewMsgGetCFsV2(&chainhash.Hash{}, &chainhash.Hash{}), + }, + { + "OnCFiltersV2", + wire.NewMsgCFiltersV2([]wire.MsgCFilterV2{}), + }, } t.Logf("Running %d tests", len(tests)) for _, test := range tests { diff --git a/server.go b/server.go index ecfbd918d3..3a536419c9 100644 --- a/server.go +++ b/server.go @@ -74,7 +74,7 @@ const ( connectionRetryInterval = time.Second * 5 // maxProtocolVersion is the max protocol version the server supports. - maxProtocolVersion = wire.RemoveRejectVersion + maxProtocolVersion = wire.BatchedCFiltersV2Version // These fields are used to track known addresses on a per-peer basis. // @@ -1522,6 +1522,16 @@ func (sp *serverPeer) OnGetCFilterV2(_ *peer.Peer, msg *wire.MsgGetCFilterV2) { sp.QueueMessage(filterMsg, nil) } +// OnGetCFiltersV2 is invoked when a peer receives a getcfsv2 wire message. +func (sp *serverPeer) OnGetCFiltersV2(_ *peer.Peer, msg *wire.MsgGetCFsV2) { + filtersMsg, err := sp.server.chain.LocateCFiltersV2(&msg.StartHash, &msg.EndHash) + if err != nil { + return + } + + sp.QueueMessage(filtersMsg, nil) +} + // OnGetCFHeaders is invoked when a peer receives a getcfheader wire message. func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) { // Disconnect and/or ban depending on the node cf services flag and @@ -2308,6 +2318,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config { OnGetHeaders: sp.OnGetHeaders, OnGetCFilter: sp.OnGetCFilter, OnGetCFilterV2: sp.OnGetCFilterV2, + OnGetCFiltersV2: sp.OnGetCFiltersV2, OnGetCFHeaders: sp.OnGetCFHeaders, OnGetCFTypes: sp.OnGetCFTypes, OnGetAddr: sp.OnGetAddr,