
sync2: implement multi-peer synchronization #6358

Open. Wants to merge 11 commits into base: develop.
Binary file added dev-docs/multipeer.excalidraw.gz
Binary file added dev-docs/multipeer.png
185 changes: 178 additions & 7 deletions dev-docs/sync2-set-reconciliation.md
@@ -1,7 +1,7 @@
<!-- markdown-toc start - Don't edit this section. Run M-x markdown-toc-refresh-toc -->
**Table of Contents**

- [Pairwise Set Reconciliation Protocol](#pairwise-set-reconciliation-protocol)
- [Basic concepts](#basic-concepts)
- [Simplified set reconciliation example](#simplified-set-reconciliation-example)
- [Attack mitigation](#attack-mitigation)
@@ -27,18 +27,18 @@
- [Redundant ItemChunk messages](#redundant-itemchunk-messages)
- [Range checksums](#range-checksums)
- [Bloom filters for recent sync](#bloom-filters-for-recent-sync)
- [Multi-peer Reconciliation](#multi-peer-reconciliation)
- [Deciding on the sync strategy](#deciding-on-the-sync-strategy)
- [Split sync](#split-sync)
- [Full sync](#full-sync)

<!-- markdown-toc end -->

# Pairwise Set Reconciliation Protocol

The recursive set reconciliation protocol described in this section is based on
the [Range-Based Set Reconciliation](https://arxiv.org/pdf/2212.13567.pdf) paper by Aljoscha Meyer.

## Basic concepts

The set reconciliation protocol is intended to synchronize **ordered sets**.
@@ -774,3 +774,174 @@ as we don't need the sets to be exactly the same after the recent sync; we
just want to bring them closer to each other. That being said, a
sufficient size of the Bloom filter needs to be chosen to minimize the
number of missed elements.

# Multi-peer Reconciliation

The multi-peer reconciliation approach is loosely based on the
[SREP: Out-Of-Band Sync of Transaction Pools for Large-Scale Blockchains](https://people.bu.edu/staro/2023-ICBC-Novak.pdf)
paper by Novak Boškov, Sevval Simsek, Ari Trachtenberg, and David Starobinski.

![Multi-peer set reconciliation](multipeer.png)

Due to the FPTree data structure being used, the copy operation on a
set is `O(1)`. When synchronizing the local set against a remote
peer's set, we need to make sure the set doesn't change while being
synchronized, except for recent items being added at the beginning of
the sync. Thus, for the purpose of synchronization against each remote
peer, a separate copy of the original set is made. When new items are
received during sync with a remote peer, they are passed to the
fetcher, which retrieves the actual data blobs from the peers; the
received objects are then validated and stored in the state
database. The main set is refreshed from time to time to include
recently added items, which doesn't affect the derived copies
currently in use for sync.

## Deciding on the sync strategy

When picking peers for multi-peer sync, each peer is
[probed](#minhash-based-set-difference-estimation) to determine how
many items it has in its set. Peers with a substantially lower number
of items than the local set (below a configurable threshold) are not
considered for sync, so as not to place additional load on peers that
are not fully synced yet, letting them decide on their syncing
strategy on their own. Note that sync is always bi-directional.

Synchronization against multiple peers can be done in two modes:
1. Split sync involves splitting the whole range into smaller ones,
one smaller range per peer, and limiting the sync to the
corresponding range of IDs when syncing with each peer.
2. Full sync involves syncing the full set against each peer's full
set.

The sync strategy is selected based on the similarity between the
local set and the sets of the peers chosen for sync, as well as on the
number of items in the remote peers' sets. Roughly, the decision can
be described using the following diagram:

```mermaid
stateDiagram-v2
[*] --> Wait
Wait --> ProbePeers : Timeout
ProbePeers --> Wait : No peers / <br/> all probes failed
ProbePeers --> SplitSync : Enough peers for split + <br/> this one is too different + <br/> last sync was not split
ProbePeers --> FullSync : Too few peers for split / <br/> this one is similar <br/> enough to peers
SplitSync --> Wait : Sync failed
SplitSync --> FullSync : Sync succeeded
FullSync --> Wait : Sync terminated
```

## Split sync

The split sync approach helps bring nodes that went substantially out
of sync back up to date relatively quickly, while also making sure
that too much load is not placed on any single syncing peer. It
somewhat resembles the BitTorrent approach, where a file is downloaded
from multiple peers with different pieces obtained from different
peers, even if this similarity is rather superficial, as the protocol
involved is very different. Split sync is followed by full sync
against the peers because in some cases, as with ATXs during cycle
gaps, the set might become somewhat "outdated" while the split sync
was being done. Below is a diagram describing the split sync sequence:

```mermaid
sequenceDiagram
Note over A: Check how different A is <br/> from its peers
par
A ->> B: Probe
B ->> A: Sample <br/> sim=0.95 count=10001
and
A ->> C: Probe
C ->> A: Sample <br/> sim=0.94 count=10002
and
A ->> D: Probe
D ->> A: Sample <br/> sim=0.94 count=10003
and
A ->> E: Probe
E ->> A: Sample <br/> sim=0.96 count=10001
and
A ->> F: Probe
F ->> A: Sample <br/> sim=0.89 count=9000
end
Note over A: Not enough peers close to this one <br/> Enough peers eligible for split sync <br/> Peer F's count is too low <br/> Proceeding with split sync
par
A <<->> B: Sync [0x00..., 0x40...)
and
A <<->> C: Sync [0x40..., 0x80...)
and
A <<->> D: Sync [0x80..., 0xC0...)
and
A <<->> E: Sync [0xC0..., 0x00...)
end
Note over A: Full sync follows split sync <br/> Syncing against peers that are in sync <br/> is very cheap
par
A <<->> B: Sync [0x00..., 0x00...)
and
A <<->> C: Sync [0x00..., 0x00...)
and
A <<->> D: Sync [0x00..., 0x00...)
and
A <<->> E: Sync [0x00..., 0x00...)
end
Note over A: Node A is in sync with the network
```

When some of the peers are too slow, their ranges are additionally
assigned to faster peers that have already completed their own
ranges. Synchronization against the slower peers is not interrupted,
though, until each range has been synced at least once:

```mermaid
sequenceDiagram
par
A <<->> B: Sync [0x00..., 0x40...)
and
A <<->> C: Sync [0x40..., 0x80...)
and
A <<->> D: Sync [0x80..., 0xC0...)
and
A <<->> E: Sync [0xC0..., 0x00...)
and
Note over A: Peer E being too slow
A <<->> E: Sync [0xC0..., 0x00...)
end
```

## Full sync

Full sync is used when this node's set is similar enough to its
peers' sets, or when there are not enough peers for split sync. Full
sync against each peer is more reliable than split sync against the
same peers, so after split sync completes, full sync is always
performed. The diagram below illustrates the full sync sequence.

```mermaid
sequenceDiagram
Note over A: Check how different A is <br/> from its peers
par
A ->> B: Probe
B ->> A: Sample <br/> sim=0.999 count=10001
and
A ->> C: Probe
C ->> A: Sample <br/> sim=0.999 count=10002
and
A ->> D: Probe
D ->> A: Sample <br/> sim=0.999 count=10003
and
A ->> E: Probe
E ->> A: Sample <br/> sim=0.999 count=10001
and
A ->> F: Probe
F ->> A: Sample <br/> sim=0.090 count=9000
end
Note over A: Enough peers close to this one <br/> Peer F's count is too low <br/> Proceeding with full sync
par
A <<->> B: Sync [0x00..., 0x00...)
and
A <<->> C: Sync [0x00..., 0x00...)
and
A <<->> D: Sync [0x00..., 0x00...)
and
A <<->> E: Sync [0x00..., 0x00...)
end
Note over A: Node A is in sync with the network
```
7 changes: 7 additions & 0 deletions fetch/peers/peers.go
@@ -54,6 +54,13 @@ type Peers struct {
globalLatency float64
}

// Contains reports whether the peer with the given ID is present in this Peers set.
func (p *Peers) Contains(id peer.ID) bool {
p.mu.Lock()
defer p.mu.Unlock()
_, exist := p.peers[id]
return exist
}

func (p *Peers) Add(id peer.ID) bool {
p.mu.Lock()
defer p.mu.Unlock()
23 changes: 21 additions & 2 deletions p2p/server/server.go
@@ -104,12 +104,30 @@ func WithRequestsPerInterval(n int, interval time.Duration) Opt {
}
}

// WithDecayingTag specifies the P2P decaying tag that is applied to the peer when a
// request is being served.
func WithDecayingTag(tag DecayingTagSpec) Opt {
return func(s *Server) {
s.decayingTagSpec = &tag
}
}

type peerIDKey struct{}

func withPeerID(ctx context.Context, peerID peer.ID) context.Context {
return context.WithValue(ctx, peerIDKey{}, peerID)
}

// ContextPeerID retrieves the ID of the peer being served from the context, along with
// a boolean value indicating whether the context contains a peer ID. If there's no peer
// ID associated with the context, the function returns an empty peer ID and false.
func ContextPeerID(ctx context.Context) (peer.ID, bool) {
if v := ctx.Value(peerIDKey{}); v != nil {
return v.(peer.ID), true
}
return peer.ID(""), false
}

// Handler is a handler to be defined by the application.
type Handler func(context.Context, []byte) ([]byte, error)

@@ -264,7 +282,8 @@ func (s *Server) Run(ctx context.Context) error {
eg.Wait()
return nil
}
peer := req.stream.Conn().RemotePeer()
ctx, cancel := context.WithCancel(withPeerID(ctx, peer))
eg.Go(func() error {
<-ctx.Done()
s.sem.Release(1)
@@ -275,7 +294,7 @@ func (s *Server) Run(ctx context.Context) error {
defer cancel()
conn := req.stream.Conn()
if s.decayingTag != nil {
s.decayingTag.Bump(peer, s.decayingTagSpec.Inc)
}
ok := s.queueHandler(ctx, req.stream)
duration := time.Since(req.received)
16 changes: 12 additions & 4 deletions p2p/server/server_test.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/spacemeshos/go-scale/tester"
"github.com/stretchr/testify/assert"
@@ -45,8 +46,10 @@ func TestServer(t *testing.T) {
request := []byte("test request")
testErr := errors.New("test error")

handler := func(ctx context.Context, msg []byte) ([]byte, error) {
peerID, found := ContextPeerID(ctx)
require.True(t, found)
return append(msg, []byte(peerID)...), nil
}
errhandler := func(_ context.Context, _ []byte) ([]byte, error) {
return nil, testErr
@@ -81,6 +84,9 @@
append(opts, WithRequestSizeLimit(limit))...,
)
ctx, cancel := context.WithCancel(context.Background())
noPeerID, found := ContextPeerID(ctx)
require.Equal(t, peer.ID(""), noPeerID)
require.False(t, found)
var eg errgroup.Group
eg.Go(func() error {
return srv1.Run(ctx)
@@ -109,7 +115,8 @@
srvID := mesh.Hosts()[1].ID()
response, err := client.Request(ctx, srvID, request)
require.NoError(t, err)
expResponse := append(request, []byte(mesh.Hosts()[0].ID())...)
require.Equal(t, expResponse, response)
srvConns := mesh.Hosts()[1].Network().ConnsToPeer(mesh.Hosts()[0].ID())
require.NotEmpty(t, srvConns)
require.Equal(t, n+1, srv1.NumAcceptedRequests())
@@ -129,7 +136,8 @@
srvID := mesh.Hosts()[3].ID()
response, err := client.Request(ctx, srvID, request)
require.NoError(t, err)
expResponse := append(request, []byte(mesh.Hosts()[0].ID())...)
require.Equal(t, expResponse, response)
srvConns := mesh.Hosts()[3].Network().ConnsToPeer(mesh.Hosts()[0].ID())
require.NotEmpty(t, srvConns)
require.Equal(t, n+1, srv1.NumAcceptedRequests())
32 changes: 32 additions & 0 deletions sync2/multipeer/delim.go
@@ -0,0 +1,32 @@
package multipeer

import (
"encoding/binary"

"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

// getDelimiters generates keys that can be used as range delimiters for splitting the key
// space among the specified number of peers. maxDepth specifies the maximum number of high
// non-zero bits to include in the resulting keys, which helps avoid unaligned splits that
// are more expensive for the FPTree data structure.
// keyLen specifies the key length in bytes.
// The function returns numPeers-1 keys. The ranges are used for split sync, with each
// range being assigned to a separate peer. The first range begins with the zero-valued key
// (k0), represented by KeyBytes of length keyLen consisting entirely of zeroes.
// The ranges to scan are:
// [k0,ks[0]); [ks[0],ks[1]); ... [ks[numPeers-3],ks[numPeers-2]); [ks[numPeers-2],k0).
func getDelimiters(numPeers, keyLen, maxDepth int) (ks []rangesync.KeyBytes) {
if numPeers < 2 {
return nil
}
mask := uint64(0xffffffffffffffff) << (64 - maxDepth)
inc := (uint64(0x80) << 56) / uint64(numPeers)
ks = make([]rangesync.KeyBytes, numPeers-1)
for i, v := 0, uint64(0); i < numPeers-1; i++ {
ks[i] = make(rangesync.KeyBytes, keyLen)
v += inc
binary.BigEndian.PutUint64(ks[i], (v<<1)&mask)
}
return ks
}