sync2: implement multi-peer synchronization #6358
base: develop
f472048
to
71c9f56
Compare
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@           Coverage Diff            @@
##           develop   #6358    +/-   ##
=========================================
+ Coverage     79.7%   79.8%   +0.1%
=========================================
  Files          328     335      +7
  Lines        42977   43656    +679
=========================================
+ Hits         34256   34860    +604
- Misses        6782    6831     +49
- Partials      1939    1965     +26
2e8e59c
to
c22a7c4
Compare
b8ea626
to
82b5a72
Compare
Given that, after the recent item sync is done (if it's needed at all), the range set reconciliation algorithm no longer depends on newly received items being added to the set, we can save memory by not adding the received items during reconciliation. During real sync, the received items are sent to the respective handlers and, after the corresponding data are fetched and validated, they are added to the database, without the need to add them to the cloned OrderedSets which are used to sync against particular peers.
6f4b0ca
to
500ab74
Compare
Given that no review comments were added yet, I've rebased the PR on top of #6404, squashing commits one more time
This adds multi-peer synchronization support. When the local set differs too much from the remote sets, "torrent-style" "split sync" is attempted which splits the set into subranges and syncs each sub-range against a separate peer. Otherwise, the full sync is done, syncing the whole set against each of the synchronization peers. Full sync is also done after each split sync run. The local set can be considered synchronized after the specified number of full syncs has happened. The approach is loosely based on [SREP: Out-Of-Band Sync of Transaction Pools for Large-Scale Blockchains](https://people.bu.edu/staro/2023-ICBC-Novak.pdf) paper by Novak Boškov, Sevval Simsek, Ari Trachtenberg, and David Starobinski.
4c57c22
to
ef30f47
Compare
sync2/multipeer/delim.go
Outdated
	"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

func getDelimiters(numPeers, keyLen, maxDepth int) (h []rangesync.KeyBytes) {
not sure I understand the context of the word `delimiters` here. It isn't really clear what this function does
Added a comment describing the purpose of this function
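For readers following the thread, one way to picture "delimiters" is as the boundary keys that cut the key space into one sub-range per peer for split sync. The sketch below is only an illustration under that assumption: `splitDelimiters` is a hypothetical helper, it ignores the `maxDepth` parameter of the real `getDelimiters`, and it is not the code from this PR.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// splitDelimiters returns numPeers-1 boundary keys that cut the key space
// into numPeers nearly equal sub-ranges, based on the first 8 bytes of the key.
// Hypothetical helper for illustration; not the PR's getDelimiters.
func splitDelimiters(numPeers, keyLen int) [][]byte {
	if numPeers < 2 || keyLen < 8 {
		return nil
	}
	// Width of one sub-range in the 64-bit prefix space.
	step := ^uint64(0)/uint64(numPeers) + 1
	delims := make([][]byte, 0, numPeers-1)
	for i := 1; i < numPeers; i++ {
		k := make([]byte, keyLen) // remaining bytes stay zero
		binary.BigEndian.PutUint64(k, step*uint64(i))
		delims = append(delims, k)
	}
	return delims
}

func main() {
	for _, d := range splitDelimiters(4, 32) {
		fmt.Printf("%x\n", d[:8])
	}
}
```

With four peers this yields three boundaries, so each peer can be asked to sync one quarter of the key space.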
sync2/multipeer/interface.go
Outdated
// It extends rangesync.OrderedSet with methods which are needed for multi-peer
// reconciliation.
type OrderedSet interface {
	rangesync.OrderedSet
do we really need to have these multiple interfaces that share the same name in a different package? why can't we just have one `OrderedSet` interface? also the inheritance syntax is confusing in this context. if this is strictly needed and we can't do without it, consider adding to the interface naming something that suggests that it has to do with syncing (iiuc from the comment).
I should probably do it indeed, this kind of splitting for this interface was motivated by the need to split the whole syncv2 thing into multiple PRs, but at this point it's already not needed that much
Merged `multipeer.OrderedSet` interface into `rangesync.OrderedSet`
sync2/multipeer/interface.go
Outdated
	Has(rangesync.KeyBytes) (bool, error)
	// Release releases the resources associated with the set.
	// Calling Release on a set that is already released is a no-op.
	Release() error
consider using `io.Closer` here
On one hand this seems to make sense, but `OrderedSet` is not really an I/O primitive, so I'm somewhat in doubt here.
sync2/multipeer/interface.go
Outdated
}

// Syncer is a synchronization interface for a single peer.
type Syncer interface {
nit: `PeerSyncer`?
Makes sense, will rename
Renamed
	return sr
}

func newSyncQueue(numPeers, keyLen, maxDepth int) syncQueue {
I wonder why not to return a pointer here? also all the methods have pointer semantics.
`syncQueue` is just a slice, not a struct, so the pointer is only needed for the methods that modify it, as there's not much copying involved
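The point about slice-based types can be shown with a small sketch. `rangeQueue` below is a hypothetical stand-in, not the PR's `syncQueue`: returning it by value copies only the slice header, and a pointer receiver is required only where the length or backing array changes.

```go
package main

import "fmt"

// rangeQueue is a hypothetical slice-based queue used only for illustration.
type rangeQueue []int

// newRangeQueue returns the queue by value; only the slice header is copied.
func newRangeQueue(capacity int) rangeQueue {
	return make(rangeQueue, 0, capacity)
}

// Len only reads the slice, so a value receiver is sufficient.
func (q rangeQueue) Len() int { return len(q) }

// Push changes the slice length, so it needs a pointer receiver.
func (q *rangeQueue) Push(v int) { *q = append(*q, v) }

// Pop removes and returns the first element, if any.
func (q *rangeQueue) Pop() (int, bool) {
	if len(*q) == 0 {
		return 0, false
	}
	v := (*q)[0]
	*q = (*q)[1:]
	return v, true
}

func main() {
	q := newRangeQueue(4)
	q.Push(10)
	q.Push(20)
	v, ok := q.Pop()
	fmt.Println(v, ok, q.Len())
}
```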
for sl.syncs.Len() != 0 {
	el := sl.syncs.Back()
	if t.After(el.Value.(time.Time)) {
		sl.syncs.Remove(el)
can this not cause a memory leak? i.e. if a series of doubly linked list items get cut off from the rest, it means they might just continue living in memory because they reference each other (won't show up in the gc mark-and-sweep runs).
Go's doubly linked list implementation unlinks the list element properly: https://github.com/golang/go/blob/go1.23.2/src/container/list/list.go#L108-L115
sync2/p2p.go
Outdated
	"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

type Dispatcher = rangesync.Dispatcher
why is the type alias needed?
My idea was for `sync2` package to serve as a facade that hides all the implementation details of sync itself beneath it
That's fine, I'm still not sure I understand the type-aliasing narrative though. Not sure how the two are related. Unless you're decorating the original type with more functionality I don't see why this should be necessary. It just adds more indirection to an already quite large package. If you need to leak stuff out of the package, maybe better to do it through interfaces instead of type aliasing (that's just my opinion though).
Ok it was a remnant from an older iteration where `Dispatcher` did not use the constructor, etc.
Given that `rangesync.OrderedSet` etc. is needed anyway, I removed this type alias.
sync2/p2p.go
Outdated
s.reconciler = multipeer.NewMultiPeerReconciler(
	s.syncBase, peers, keyLen, maxDepth,
	multipeer.WithLogger(logger),
	multipeer.WithSyncPeerCount(cfg.SyncPeerCount),
seems like a good candidate for a `Config` type. many function calls that can be avoided
Switched to config type (also did that for `rangesync` package)
	return
}
s.running.Store(true)
s.start.Do(func() {
not sure why `Once` is needed? can it be that it will be called from multiple places?
The idea was for the `Start` method to be idempotent
I don't quite understand the reasoning - `Start` can be called any amount of times - but only the first time it actually does something? `Start` - `Stop` - `Start` causes `running` to be `true` but the component is actually in the stopped state?
The idea is that there's no harm in invoking `Start` multiple times on a `P2PHashSync`, but after you `Stop()` it, you throw it away (it is non-restartable)
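The "idempotent `Start`, non-restartable after `Stop`" contract can be sketched with `sync.Once` from the standard library. The `worker` type below is hypothetical and much simpler than `P2PHashSync`; it only demonstrates that repeated `Start` calls launch the background goroutine once and that a `Start` after `Stop` does nothing.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker is a hypothetical component with an idempotent Start and a
// non-restartable Stop.
type worker struct {
	start  sync.Once
	cancel context.CancelFunc
	done   chan struct{}
	starts int // number of times the background goroutine was launched
}

func newWorker() *worker {
	return &worker{done: make(chan struct{})}
}

// Start launches the background goroutine on the first call only.
func (w *worker) Start() {
	w.start.Do(func() {
		w.starts++
		var ctx context.Context
		ctx, w.cancel = context.WithCancel(context.Background())
		go func() {
			defer close(w.done)
			<-ctx.Done() // placeholder for the real work loop
		}()
	})
}

// Stop cancels the goroutine and waits for it. It also consumes the Once, so
// a later Start is a no-op.
func (w *worker) Stop() {
	w.start.Do(func() { close(w.done) }) // never started: nothing to wait for
	if w.cancel != nil {
		w.cancel()
	}
	<-w.done
}

func main() {
	w := newWorker()
	w.Start()
	w.Start() // no effect
	w.Stop()
	w.Start() // no effect: the worker is not restartable
	fmt.Println("starts:", w.starts)
}
```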
	s.eg.Go(func() error {
		defer s.running.Store(false)
		var ctx context.Context
		ctx, s.cancel = context.WithCancel(context.Background())
		return s.reconciler.Run(ctx)
	})
`Start` just serves as a wrapper around the `Run` method here. Would it make sense, instead of having the `Start`, `Stop` and `s.running` methods/fields, to just have a `Run` method that passes along the context to `s.reconciler`? This would also get rid of the need for `s.cancel`.
`P2PHashSync` has active sync enable/disable flag as its config option. That's part of `P2PHashSync` logic.
When `cfg.EnableActiveSync` is `false`, `Start` / `Stop` are noops. In this case, `P2PHashSync` only serves requests received from a p2p `Server` via a `Dispatcher`.
sync2/multipeer/multipeer_test.go
Outdated
var ctx context.Context
for i := 0; i < numSyncs; i++ {
	pl := mt.expectProbe(6, rangesync.ProbeResult{
		FP:    "foo",
		Count: 100,
		Sim:   0.99, // high enough for full sync
	})
	mt.expectFullSync(pl, 6, 0)
	mt.syncBase.EXPECT().Wait()
	if i == 0 {
		//nolint:fatcontext
		ctx = mt.start()
	} else {
		// first full sync happens immediately
		mt.clock.Advance(time.Minute)
	}
	mt.clock.BlockUntilContext(ctx, 1)
	mt.satisfy()
}
If the first loop behaves differently would it make sense to indicate it as such more clearly? This should also get rid of the linter warning.
pl := mt.expectProbe(6, rangesync.ProbeResult{
	FP:    "foo",
	Count: 100,
	Sim:   0.99, // high enough for full sync
})
mt.expectFullSync(pl, 6, 0)
mt.syncBase.EXPECT().Wait()
mt.clock.Advance(time.Minute) // first sync happens immediately
mt.clock.BlockUntilContext(context.Background(), 1)
mt.satisfy()
for i := 1; i < numSyncs; i++ {
	pl := mt.expectProbe(6, rangesync.ProbeResult{
		FP:    "foo",
		Count: 100,
		Sim:   0.99, // high enough for full sync
	})
	mt.expectFullSync(pl, 6, 0)
	mt.syncBase.EXPECT().Wait()
	ctx := mt.start()
	mt.clock.BlockUntilContext(ctx, 1)
	mt.satisfy()
}
It's the other way around, we need to do `mt.start()` in the initial iteration and advance the clocks in the following ones. But otherwise that's probably the right idea, so I updated the code, except that I wrapped expectations in the nested func `expect` to highlight the fact that the expectations are the same right after startup and when it's time to do more syncs
sync2/p2p.go
Outdated
peer, found := server.ContextPeerID(ctx)
if !found {
	panic("BUG: no peer ID found in the handler")
}
Would it make sense to pass the `peer` explicitly instead of putting it into the context and then panicking when it isn't there? Afaik this is the only place in our codebase at the moment where we put values in the context at all. A missing value should not lead to a panic or this feels like a misuse of `context.WithValue` to me 🤔
That entailed quite a few changes in unrelated code as most of the existing `p2p.Server` use cases don't care about the peer ID, but I still updated the `p2p.Server` and got rid of that context key
Motivation
`syncv2` must ensure that the network is in sync by performing sync against multiple peers from time to time, including when starting a fresh or stale node.
When a lot of data needs to be transferred during sync, it is better to spread the load across multiple peers to avoid costly `ax/1`-like requests.
Description
#6404 needs to be merged before this one.
This adds multi-peer synchronization support.
When the local set differs too much from the remote sets, making pairwise sync degrade to transferring the whole set, a "torrent-style" split sync is attempted, which splits the set into subranges and syncs each subrange against a separate peer. Otherwise, a full sync is done, syncing the whole set against each of the synchronization peers.
Full sync is also done after each split sync run.
The local set can be considered synchronized after the specified number of full syncs has happened.
The approach is loosely based on the paper SREP: Out-Of-Band Sync of Transaction Pools for Large-Scale Blockchains by Novak Boškov, Sevval Simsek, Ari Trachtenberg, and David Starobinski.
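The control flow described above can be summarized in a short sketch. All names here (`probe`, `needSplitSync`, `syncTracker`, `runRound`) are hypothetical, and the rule "split when no probed peer reaches a minimum similarity" is an assumption made for illustration; the PR's actual criteria may differ.

```go
package main

import "fmt"

// probe is a hypothetical, simplified probe result: the estimated similarity
// between the local set and a peer's set, in the range [0, 1].
type probe struct {
	Sim float64
}

// needSplitSync reports whether split sync should be attempted.
// Assumed rule: split when none of the probed peers is at least minSim similar.
func needSplitSync(probes []probe, minSim float64) bool {
	for _, p := range probes {
		if p.Sim >= minSim {
			return false
		}
	}
	return len(probes) > 0
}

// syncTracker counts completed full syncs.
type syncTracker struct {
	required  int
	fullSyncs int
}

// synced reports whether enough full syncs have happened.
func (t *syncTracker) synced() bool { return t.fullSyncs >= t.required }

// runRound describes one round: an optional split sync, always followed by a
// full sync. It returns the steps taken instead of doing real network I/O.
func runRound(t *syncTracker, probes []probe, minSim float64) []string {
	var steps []string
	if needSplitSync(probes, minSim) {
		steps = append(steps, "split")
	}
	steps = append(steps, "full")
	t.fullSyncs++
	return steps
}

func main() {
	tr := &syncTracker{required: 2}
	fmt.Println(runRound(tr, []probe{{Sim: 0.1}, {Sim: 0.2}}, 0.9), tr.synced())
	fmt.Println(runRound(tr, []probe{{Sim: 0.99}}, 0.9), tr.synced())
}
```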