Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spv: Add batched cfilter fetching #2314

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down
101 changes: 99 additions & 2 deletions p2p/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var uaVersion = version.String()
const minPver = wire.RemoveRejectVersion

// Pver is the maximum protocol version implemented by the LocalPeer.
const Pver = wire.MixVersion
const Pver = wire.BatchedCFiltersV2Version

// stallTimeout is the amount of time allowed before a request to receive data
// that is known to exist at the RemotePeer times out with no matching reply.
Expand Down Expand Up @@ -94,7 +94,8 @@ type RemotePeer struct {
requestedBlocksMu sync.Mutex
requestedBlocks map[chainhash.Hash]*blockRequest

requestedCFiltersV2 sync.Map // k=chainhash.Hash v=chan<- *wire.MsgCFilterV2
requestedCFiltersV2 sync.Map // k=chainhash.Hash v=chan<- *wire.MsgCFilterV2
requestedManyCFiltersV2 sync.Map // k=chainhash.Hash v=chan<- *wire.MsgCFiltersV2

requestedTxs map[chainhash.Hash]chan<- *wire.MsgTx
requestedTxsMu sync.Mutex
Expand Down Expand Up @@ -747,6 +748,8 @@ func (rp *RemotePeer) readMessages(ctx context.Context) error {
rp.receivedBlock(ctx, m)
case *wire.MsgCFilterV2:
rp.receivedCFilterV2(ctx, m)
case *wire.MsgCFiltersV2:
rp.receivedManyCFilterV2(ctx, m)
case *wire.MsgNotFound:
rp.receivedNotFound(ctx, m)
case *wire.MsgTx:
Expand Down Expand Up @@ -962,6 +965,42 @@ func (rp *RemotePeer) receivedBlock(ctx context.Context, msg *wire.MsgBlock) {
rp.requestedBlocksMu.Unlock()
}

func (rp *RemotePeer) addRequestedManyCFilterV2(hash *chainhash.Hash, c chan<- *wire.MsgCFiltersV2) (newRequest bool) {
_, loaded := rp.requestedManyCFiltersV2.LoadOrStore(*hash, c)
return !loaded
}

func (rp *RemotePeer) deleteRequestedManyCFilterV2(hash *chainhash.Hash) {
rp.requestedManyCFiltersV2.Delete(*hash)
}

func (rp *RemotePeer) receivedManyCFilterV2(ctx context.Context, msg *wire.MsgCFiltersV2) {
const opf = "remotepeer(%v).receivedCFilterV2(%v)"
if len(msg.CFilters) == 0 {
op := errors.Opf(opf, rp.raddr, chainhash.Hash{})
err := errors.E(op, errors.Protocol, "received empty cfiltersv2 message")
rp.Disconnect(err)
return

}

var k any = msg.CFilters[0].BlockHash
v, ok := rp.requestedManyCFiltersV2.Load(k)
if !ok {
op := errors.Opf(opf, rp.raddr, k)
err := errors.E(op, errors.Protocol, "received unrequested many cfilter")
rp.Disconnect(err)
return
}

rp.requestedManyCFiltersV2.Delete(k)
c := v.(chan<- *wire.MsgCFiltersV2)
select {
case <-ctx.Done():
case c <- msg:
}
}

func (rp *RemotePeer) addRequestedCFilterV2(hash *chainhash.Hash, c chan<- *wire.MsgCFilterV2) (newRequest bool) {
_, loaded := rp.requestedCFiltersV2.LoadOrStore(*hash, c)
return !loaded
Expand Down Expand Up @@ -1743,6 +1782,64 @@ func (rp *RemotePeer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.H
return filters, nil
}

// BatchedCFiltersV2 fetches all cfilters between the passed start and end
// blocks. The first block MUST be an ancestor of the final block.
func (rp *RemotePeer) BatchedCFiltersV2(ctx context.Context, startHash, endHash *chainhash.Hash) ([]filterProof, error) {
const opf = "remotepeer(%v).CFilterV2(%v)"

if rp.pver < wire.CFilterV2Version {
op := errors.Opf(opf, rp.raddr, startHash)
err := errors.Errorf("protocol version %v is too low to fetch cfiltersv2 from this peer", rp.pver)
return nil, errors.E(op, errors.Protocol, err)
}

m := wire.NewMsgGetCFsV2(startHash, endHash)
c := make(chan *wire.MsgCFiltersV2, 1)
if !rp.addRequestedManyCFilterV2(startHash, c) {
op := errors.Opf(opf, rp.raddr, startHash)
return nil, errors.E(op, errors.Invalid, "cfilterv2 is already being requested from this peer for this block")
}
stalled := time.NewTimer(stallTimeout)
out := rp.out
for {
select {
case <-ctx.Done():
rp.deleteRequestedManyCFilterV2(startHash)
go func() {
<-stalled.C
}()
return nil, ctx.Err()
case <-stalled.C:
rp.deleteRequestedManyCFilterV2(startHash)
op := errors.Opf(opf, rp.raddr, startHash)
err := errors.E(op, errors.IO, "peer appears stalled")
rp.Disconnect(err)
return nil, err
case <-rp.errc:
stalled.Stop()
return nil, rp.err
case out <- &msgAck{m, nil}:
out = nil
case m := <-c:
stalled.Stop()
res := make([]filterProof, len(m.CFilters))
for i := 0; i < len(m.CFilters); i++ {
var f *gcs.FilterV2
var err error
f, err = gcs.FromBytesV2(blockcf.B, blockcf.M, m.CFilters[i].Data)
if err != nil {
op := errors.Opf(opf, rp.raddr, startHash)
return nil, errors.E(op, err)
}
res[i].Filter = f
res[i].Proof = m.CFilters[i].ProofHashes
res[i].ProofIndex = m.CFilters[i].ProofIndex
}
return res, nil
}
}
}

// SendHeaders sends the remote peer a sendheaders message. This informs the
// peer to announce new blocks by immediately sending them in a headers message
// rather than sending an inv message containing the block hash.
Expand Down
21 changes: 17 additions & 4 deletions spv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockN
cnet := s.wallet.ChainParams().Net

// Split fetching into batches of a max size.
const cfilterBatchSize = 100
const cfilterBatchSize = wire.MaxCFiltersV2PerBatch
if len(nodes) > cfilterBatchSize {
g, ctx := errgroup.WithContext(ctx)
for len(nodes) > cfilterBatchSize {
Expand Down Expand Up @@ -175,9 +175,22 @@ nextTry:

startTime := time.Now()

// TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.

filters, err := rp.CFiltersV2(ctx, nodeHashes)
// If the node supports batched fetch, use it. Otherwise,
// fetch one by one.
var filters []wallet.FilterProof
if rp.Pver() >= wire.BatchedCFiltersV2Version {
filters, err = rp.BatchedCFiltersV2(ctx, nodes[0].Hash, nodes[len(nodes)-1].Hash)
if err == nil && len(filters) != len(nodes) {
errMsg := fmt.Errorf("peer returned unexpected "+
"number of filters (got %d, want %d)",
len(filters), len(nodes))
err = errors.E(errors.Protocol, errMsg)
rp.Disconnect(err)
continue nextTry
}
} else {
filters, err = rp.CFiltersV2(ctx, nodeHashes)
}
if err != nil {
log.Tracef("Unable to fetch cfilter batch for "+
"from %v: %v", rp, err)
Expand Down
Loading