Skip to content

Commit

Permalink
multi: Respond to getcfsv2 message.
Browse files Browse the repository at this point in the history
This adds the appropriate processing to the peer and server structs to
respond to the recently introduced getcfsv2 message.  It also bumps the
peer and server max supported protocol versions to version 10
(BatchedCFiltersV2Version).

This message queries the chain for a batch of committed filters spanning
a set of sequential blocks and will be used by SPV clients to fetch
committed filters during their initial sync process.
  • Loading branch information
matheusd committed Nov 8, 2023
1 parent 6b01883 commit ebac834
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 2 deletions.
2 changes: 2 additions & 0 deletions peer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ require (
github.com/decred/slog v1.2.0
)

replace github.com/decred/dcrd/wire => ../wire

require (
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect
github.com/dchest/siphash v1.2.3 // indirect
Expand Down
19 changes: 18 additions & 1 deletion peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

const (
// MaxProtocolVersion is the max protocol version the peer supports.
MaxProtocolVersion = wire.RemoveRejectVersion
MaxProtocolVersion = wire.BatchedCFiltersV2Version

// outputBufferSize is the number of elements the output channels use.
outputBufferSize = 5000
Expand Down Expand Up @@ -128,6 +128,9 @@ type MessageListeners struct {
// OnCFilterV2 is invoked when a peer receives a cfilterv2 wire message.
OnCFilterV2 func(p *Peer, msg *wire.MsgCFilterV2)

// OnCFiltersV2 is invoked when a peer receives a cfiltersv2 wire message.
OnCFiltersV2 func(p *Peer, msg *wire.MsgCFiltersV2)

// OnCFHeaders is invoked when a peer receives a cfheaders wire
// message.
OnCFHeaders func(p *Peer, msg *wire.MsgCFHeaders)
Expand Down Expand Up @@ -162,6 +165,10 @@ type MessageListeners struct {
// message.
OnGetCFilterV2 func(p *Peer, msg *wire.MsgGetCFilterV2)

// OnGetCFiltersV2 is invoked when a peer receives a getcfsv2 wire
// message.
OnGetCFiltersV2 func(p *Peer, msg *wire.MsgGetCFiltersV2)

// OnGetCFHeaders is invoked when a peer receives a getcfheaders
// wire message.
OnGetCFHeaders func(p *Peer, msg *wire.MsgGetCFHeaders)
Expand Down Expand Up @@ -1426,6 +1433,16 @@ out:
p.cfg.Listeners.OnCFilterV2(p, msg)
}

case *wire.MsgGetCFiltersV2:
if p.cfg.Listeners.OnGetCFiltersV2 != nil {
p.cfg.Listeners.OnGetCFiltersV2(p, msg)
}

case *wire.MsgCFiltersV2:
if p.cfg.Listeners.OnCFiltersV2 != nil {
p.cfg.Listeners.OnCFiltersV2(p, msg)
}

case *wire.MsgGetInitState:
if p.cfg.Listeners.OnGetInitState != nil {
p.cfg.Listeners.OnGetInitState(p, msg)
Expand Down
14 changes: 14 additions & 0 deletions peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ func TestPeerListeners(t *testing.T) {
OnInitState: func(p *Peer, msg *wire.MsgInitState) {
ok <- msg
},
OnGetCFiltersV2: func(p *Peer, msg *wire.MsgGetCFiltersV2) {
ok <- msg
},
OnCFiltersV2: func(p *Peer, msg *wire.MsgCFiltersV2) {
ok <- msg
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
Expand Down Expand Up @@ -564,6 +570,14 @@ func TestPeerListeners(t *testing.T) {
"OnInitState",
wire.NewMsgInitState(),
},
{
"OnGetCFiltersV2",
wire.NewMsgGetCFiltersV2(&chainhash.Hash{}, &chainhash.Hash{}),
},
{
"OnCFiltersV2",
wire.NewMsgCFiltersV2([]*wire.MsgCFilterV2{}),
},
}
t.Logf("Running %d tests", len(tests))
for _, test := range tests {
Expand Down
13 changes: 12 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const (
connectionRetryInterval = time.Second * 5

// maxProtocolVersion is the max protocol version the server supports.
maxProtocolVersion = wire.RemoveRejectVersion
maxProtocolVersion = wire.BatchedCFiltersV2Version

// These fields are used to track known addresses on a per-peer basis.
//
Expand Down Expand Up @@ -1535,6 +1535,16 @@ func (sp *serverPeer) OnGetCFilterV2(_ *peer.Peer, msg *wire.MsgGetCFilterV2) {
sp.QueueMessage(filterMsg, nil)
}

// OnGetCFiltersV2 is invoked when a peer receives a getcfsv2 wire message.
func (sp *serverPeer) OnGetCFiltersV2(_ *peer.Peer, msg *wire.MsgGetCFiltersV2) {
filtersMsg, err := sp.server.chain.LocateCFiltersV2(&msg.StartHash, &msg.EndHash)
if err != nil {
return
}

sp.QueueMessage(filtersMsg, nil)
}

// OnGetCFHeaders is invoked when a peer receives a getcfheader wire message.
func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
// Disconnect and/or ban depending on the node cf services flag and
Expand Down Expand Up @@ -2318,6 +2328,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
OnGetHeaders: sp.OnGetHeaders,
OnGetCFilter: sp.OnGetCFilter,
OnGetCFilterV2: sp.OnGetCFilterV2,
OnGetCFiltersV2: sp.OnGetCFiltersV2,
OnGetCFHeaders: sp.OnGetCFHeaders,
OnGetCFTypes: sp.OnGetCFTypes,
OnGetAddr: sp.OnGetAddr,
Expand Down

0 comments on commit ebac834

Please sign in to comment.