Skip to content

Commit

Permalink
Removed redundant linkage with lifecycle token
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Oct 28, 2023
1 parent 513af89 commit dfc963b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,12 @@ protected async Task<bool> AddMemberAsync<TAddress>(TMember member, int rounds,
// proposes a new member
if (await configurationStorage.AddMemberAsync(addressProvider(member), token).ConfigureAwait(false))
{
while (!await ReplicateAsync(new EmptyLogEntry(Term), token).ConfigureAwait(false)) ;
await ReplicateAsync(token).ConfigureAwait(false);

// ensure that the newly added member has been committed
await configurationStorage.WaitForApplyAsync(token).ConfigureAwait(false);
return true;
}

return false;
}
catch (OperationCanceledException e)
{
Expand All @@ -312,6 +310,8 @@ protected async Task<bool> AddMemberAsync<TAddress>(TMember member, int rounds,
tokenSource?.Dispose();
membershipState.Value = false;
}

return false;
}

private ValueTask<Result<bool>> CatchUpAsync(TMember member, long commitIndex, long term, long precedingIndex, long precedingTerm, long currentIndex, CancellationToken token)
Expand Down Expand Up @@ -354,7 +354,7 @@ protected async Task<bool> RemoveMemberAsync<TAddress>(ClusterMemberId id, IClus
// remove the existing member
if (await configurationStorage.RemoveMemberAsync(addressProvider(member), token).ConfigureAwait(false))
{
while (!await ReplicateAsync(new EmptyLogEntry(Term), token).ConfigureAwait(false));
await ReplicateAsync(token).ConfigureAwait(false);

// ensure that the removed member has been committed
await configurationStorage.WaitForApplyAsync(token).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,19 @@ public async ValueTask<bool> ReplicateAsync<TEntry>(TEntry entry, CancellationTo
return auditTrail.Term == entry.Term;
}

private async ValueTask ReplicateAsync(CancellationToken token)
{
EmptyLogEntry entry;
do
{
entry = new(Term);
var index = await auditTrail.AppendAsync(entry, token).ConfigureAwait(false);
await ForceReplicationAsync(token).ConfigureAwait(false);
await auditTrail.WaitForCommitAsync(index, token).ConfigureAwait(false);
}
while (Term != entry.Term);
}

private TMember? TryGetPeer(EndPoint peer)
{
foreach (var member in members.Values)
Expand Down

0 comments on commit dfc963b

Please sign in to comment.