Full Replication #274
Conversation
func (e *Epoch) maybeSendNotarizationOrFinalization(to NodeID, round uint64) {
	r, ok := e.rounds[round]

	if !ok {
If we have an empty notarization and a notarization for the same round, which one should we prioritize sending?
epoch.go
Outdated
e.Logger.Debug("Received invalid quorum round", zap.Uint64("seq", data.GetSequence()), zap.Stringer("from", from))
// TODO: because empty notarizations can be for long periods, we may want to allow messages that are finalized to
// be stored even if they are > maxRoundWindow rounds ahead. For now we allow only the nextSeqToCommit but we might want to
// allow a few more seqs ahead.
A small bug I caught with our old replication logic.
// make sure the latest round is well formed
if err := latestRound.IsWellFormed(); err != nil {
	e.Logger.Debug("Received invalid latest round", zap.Error(err))
	return err
Garbage collection is now managed by the replicationState.
I made this test in the past, maybe it can be useful.
advances rounds/sequences?
Force-pushed from 0f3cf5b to d443b2d
Force-pushed from d443b2d to 00bbc72
Signed-off-by: Sam Liokumovich <[email protected]>
Made a pass, will make another pass later.
testutil/node.go
Outdated
// TimeoutOnRound advances time until the node times out of the given round.
func (t *TestNode) TimeoutOnRound(round uint64) {
	startTime := t.E.StartTime
I think it makes sense to have a time as part of the TestNode struct, otherwise you can only use this method properly once. When this method returns, the start time of the node is out of sync with the E.StartTime and you will start from the old start time the next time you invoke this method. Instead I think we should store the current time in the TestNode, and when you update the time you also save it in the TestNode. This way, the next time you invoke this method, you will fetch the updated start time.
You added a very helpful currentTime variable as part of testNode, so I'll use that 🎉
testutil/node.go
Outdated
	return
}

time.Sleep(100 * time.Millisecond)
why do we sleep here?
Just to give a little throttle for the for loop and to allow messages to be processed by the node. Reduced it to 50ms because I think 100 was too much.
// it limits the amount of outstanding requests to be at most [maxRoundWindow] ahead of [base] which is
// either nextSeqToCommit or currentRound depending on if we are replicating sequences or rounds.
func (r *replicator) maybeSendMoreReplicationRequests(observed *signerRoundOrSeq, base uint64) {
	val := observed.value()
I don't think we should have any variable called "val"; it's only slightly more generic than "variable".
// maybeSendMoreReplicationRequests checks if we need to send more replication requests given an observed round or sequence.
// it limits the amount of outstanding requests to be at most [maxRoundWindow] ahead of [base] which is
// either nextSeqToCommit or currentRound depending on if we are replicating sequences or rounds.
func (r *replicator) maybeSendMoreReplicationRequests(observed *signerRoundOrSeq, base uint64) {
Why call this base and not nextSeqToCommit?
The naming is a bit weird and I'm very open to suggestions, specifically regarding this and .value(). The reason it's not nextSeqToCommit is that it can be either the nextSeqToCommit or the current round.
potentiallyHighestRoundOrSequence?
}

startSeq := math.Max(float64(base), float64(r.highestRequested))
// we limit the number of outstanding requests to be at most maxRoundWindow ahead of nextSeqToCommit
"we limit the number of outstanding requests to be at most maxRoundWindow ahead of nextSeqToCommit"
Why? Doesn't that mean we will only replicate up to the max round window?
Yes, but we only replicate up to maxRoundWindow ahead at a single time. When the state advances, we check if we need to send more. This way, if we are behind by a gazillion blocks, our memory doesn't get overwhelmed.
I get the memory concern, I am just wondering if it makes sense to use the same config parameter for both the standard execution flow and the replication one.
I guess we can put them the same for now, maybe revisit later.
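The windowing policy discussed in this thread might look roughly like the following sketch. nextRequestRange is a hypothetical helper, not the real replicator code; it just shows how requests get clamped to at most maxRoundWindow ahead of base, so a node that is far behind requests in bounded batches.

```go
package main

import "fmt"

// nextRequestRange sketches the window-limiting idea from the thread above.
// base is nextSeqToCommit (or currentRound), highestRequested is the highest
// value already requested, observed is the highest value seen signed by a
// quorum. Requests never extend past base+maxRoundWindow, bounding memory
// use no matter how far behind we are.
func nextRequestRange(base, highestRequested, observed, maxRoundWindow uint64) (start, end uint64, ok bool) {
	start = base
	if highestRequested > start {
		start = highestRequested // don't re-request what's already outstanding
	}
	end = base + maxRoundWindow // cap the window
	if observed < end {
		end = observed // nothing beyond what we've actually seen
	}
	if start >= end {
		return 0, 0, false // window already fully requested
	}
	return start, end, true
}

func main() {
	// Behind by far more than the window: only request up to base+window.
	s, e, ok := nextRequestRange(100, 100, 1000000, 10)
	fmt.Println(s, e, ok) // 100 110 true
}
```

When the state advances (base moves forward), calling the helper again yields the next slice of the backlog.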
// sendReplicationRequests sends requests for missing sequences for the
// range of sequences [start, end] <- inclusive. It does so by splitting the
// range of sequences equally among the nodes that have signed [highestSequenceObserved].
func (r *replicator) sendReplicationRequests(start uint64, end uint64) {
This was just moved over from replication.go, right?
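The even split described in the doc comment above could be sketched like this. splitRange and seqRange are hypothetical names, and spreading the remainder over the first nodes is an assumption made for illustration.

```go
package main

import "fmt"

// seqRange is an inclusive range of sequence numbers.
type seqRange struct{ start, end uint64 }

// splitRange divides the inclusive range [start, end] roughly equally among
// numNodes requesters, one contiguous sub-range per node, spreading any
// remainder over the first nodes.
func splitRange(start, end uint64, numNodes int) []seqRange {
	total := end - start + 1
	per := total / uint64(numNodes)
	rem := total % uint64(numNodes)
	out := make([]seqRange, 0, numNodes)
	cur := start
	for i := 0; i < numNodes; i++ {
		n := per
		if uint64(i) < rem { // first `rem` nodes take one extra sequence
			n++
		}
		if n == 0 {
			break // more nodes than sequences; the rest get nothing
		}
		out = append(out, seqRange{cur, cur + n - 1})
		cur += n
	}
	return out
}

func main() {
	fmt.Println(splitRange(0, 9, 3)) // [{0 3} {4 6} {7 9}]
}
```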
replicator.go
Outdated
// sendRequestToNode requests the sequences [start, end] from nodes[index].
// In case the nodes[index] does not respond, we create a timeout that will
// re-send the request.
func (r *replicator) sendRequestToNode(start uint64, end uint64, nodes []NodeID, index int) {
When we call resendReplicationRequests, we call sendRequestToNode with seqs.Start, seqs.End, where we call AddTask() on the index. Isn't this a repetitive re-addition of the sequences to the map? Why do we need to do that again and again?
It would be a no-op though, since the task is already in the scheduler. I can add a flag to sendRequestToNode such as shouldAddTimeout and only add the timeout if set.
msg.VerifiedReplicationResponse.Data = newData
c.replicationResponses <- struct{}{}
select {
case c.replicationResponses <- struct{}{}:
This is not ideal, but it was causing a flake because now we send extra replication responses: one if we receive a notarization, and another if we receive finalizations.
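The change in the diff above swaps a blocking channel send for a select. A sketch of the idea, assuming the channel is buffered and the select has a default case (the diff only shows the send case, so the default is an assumption): a duplicate signal is dropped instead of blocking the sender.

```go
package main

import "fmt"

// signal attempts a non-blocking notification on a buffered channel.
// If a signal is already pending, the extra one is dropped rather than
// blocking the caller, which avoids the flake described above when both a
// notarization and a finalization produce a response signal.
func signal(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true // signal delivered
	default:
		return false // a signal is already pending; drop this one
	}
}

func main() {
	ch := make(chan struct{}, 1)
	fmt.Println(signal(ch), signal(ch)) // true false
}
```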
Will continue to review later, here are some more comments.
return nil
}

// for future rounds
What about future rounds?
val := observed.value()

// we've observed something we've already requested
if r.highestRequested >= val && r.highestObserved != nil {
I am really concerned that we're going to compare apples to oranges if we compare a highest observed sequence to a round, and vice versa. r.highestObserved can be either a round or a sequence. Can this not cause an incorrect return?
r.highestObserved = observed
}

startSeq := math.Max(float64(base), float64(r.highestRequested))
I don't understand - if highestRequested is a round, how can we treat it as a sequence? Can we perhaps maintain separate highestObservedX for rounds and sequences? I think it would make things safer.
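The reviewer's suggestion of tracking rounds and sequences separately could look roughly like this; all names here are hypothetical. Keeping two fields means a round value is never compared against a sequence value.

```go
package main

import "fmt"

// observedState sketches the suggested split: the highest observed round
// and the highest observed sequence are tracked independently, so they can
// never be confused with one another.
type observedState struct {
	highestRound uint64
	highestSeq   uint64
}

// observeRound records a round, keeping only the maximum seen.
func (o *observedState) observeRound(r uint64) {
	if r > o.highestRound {
		o.highestRound = r
	}
}

// observeSeq records a sequence, keeping only the maximum seen.
func (o *observedState) observeSeq(s uint64) {
	if s > o.highestSeq {
		o.highestSeq = s
	}
}

func main() {
	var o observedState
	o.observeRound(7)
	o.observeSeq(3)
	o.observeRound(5) // lower round is ignored
	fmt.Println(o.highestRound, o.highestSeq) // 7 3
}
```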
func (e *Epoch) processLatestRoundReceived(latestRound *QuorumRound, from NodeID) error {
// processQuorumRound processes a quorum round received from another node.
// It verifies the quorum round and stores it in the replication state if valid.
func (e *Epoch) processQuorumRound(latestRound *QuorumRound, from NodeID) error {
Not related to this PR, but thought I should mention it.
If we replicate a notarized block
Conversely, we can replicate an empty notarization while everyone else but the sender node has a regular notarization for the same round. Then we might get stuck, won't we?
Summary

This PR finalizes the replication scheme by clearly distinguishing between replicating rounds and replicating sequences. It also refactors related components for clarity and reliability. Overall, the replication logic is much more straightforward and simple, which will hopefully lead to fewer bugs (I found a few small ones while making the PR).

Replication Overview

In Simplex, when a node receives a `notarization`, `emptyNotarization`, or `finalization` for a round or sequence ahead of its current state, it indicates the node is behind. The process of catching up (retrieving missing rounds or sequences) is called Replication.

The `replicationState` struct manages this process. Replication is triggered when:

- a `notarization` or `emptyNotarization` for a higher round is received, or
- a `finalization` for a future sequence arrives.

Nodes also send their latest `notarization` and `finalization` to potentially trigger replication for the lagging node.

The `replicationState` tracks missing rounds and sequences, resends requests as needed, and removes completed items. When a `QuorumRound` (notarization, empty notarization, or finalization) is received, it's added to the relevant state. Finalizations always supersede earlier quorum rounds, allowing us to prune older states from both the rounds and sequence trackers.

`processReplicationState` advances rounds or sequences by checking for available quorum rounds. If the next sequence is complete, it's committed; otherwise, the function checks for the next round. Empty notarizations can sometimes be inferred to advance rounds via `maybeAdvanceRoundFromEmptyNotarizations`.

Key Changes

- Introduced the `replicator` struct, which:
  - stores `QuorumRounds` keyed by rounds or sequences, and
  - limits outstanding requests to at most `maxRoundWindow` ahead of the current position.
- Introduced the `TimeoutManager`: tasks are added to the `taskMap`, and the `TimeoutHandler` periodically sends them to the `TaskRunner` every `runInterval`.
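The TimeoutManager flow summarized above could be sketched roughly like this. The tick-based timing and all names besides taskMap are simplified assumptions, not the actual implementation: pending tasks live in a map, and each interval the handler collects the tasks whose deadline has passed and removes them.

```go
package main

import "fmt"

// task is a hypothetical pending timeout: an ID plus the tick at which it fires.
type task struct {
	id       string
	deadline int64
}

// timeoutManager sketches the taskMap idea: pending tasks keyed by ID,
// drained periodically by a tick (standing in for runInterval).
type timeoutManager struct {
	taskMap map[string]task
	now     int64
}

func (m *timeoutManager) addTask(t task) { m.taskMap[t.id] = t }

// tick advances time by one interval and returns the tasks that fired,
// removing them from the map (completed tasks are not re-run).
func (m *timeoutManager) tick() []task {
	m.now++
	var fired []task
	for id, t := range m.taskMap {
		if t.deadline <= m.now {
			fired = append(fired, t)
			delete(m.taskMap, id)
		}
	}
	return fired
}

func main() {
	m := &timeoutManager{taskMap: map[string]task{}}
	m.addTask(task{id: "resend-request", deadline: 2})
	fmt.Println(len(m.tick())) // 0: not due yet
	fmt.Println(len(m.tick())) // 1: fires at tick 2
}
```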