From a0427a76f59d277379376c146efade306afd0b05 Mon Sep 17 00:00:00 2001 From: sakno Date: Sun, 25 Feb 2024 12:00:21 +0200 Subject: [PATCH] Release 5.0.3 --- CHANGELOG.md | 22 +++ README.md | 26 ++-- src/DotNext.IO/DotNext.IO.csproj | 2 +- .../DotNext.Metaprogramming.csproj | 2 +- .../DotNext.Threading.csproj | 2 +- src/DotNext.Unsafe/DotNext.Unsafe.csproj | 2 +- src/DotNext/DotNext.csproj | 2 +- src/DotNext/Runtime/GCLatencyModeScope.cs | 8 +- .../DotNext.AspNetCore.Cluster.csproj | 2 +- .../DotNext.Net.Cluster.csproj | 2 +- .../Cluster/Consensus/Raft/CandidateState.cs | 130 ++++++++++-------- .../Consensus/Raft/ConsensusOnlyState.cs | 14 +- .../Cluster/Consensus/Raft/FollowerState.cs | 46 +++---- .../Consensus/Raft/IPersistentState.cs | 12 +- .../Consensus/Raft/LeaderState.Replication.cs | 2 +- .../Net/Cluster/Consensus/Raft/LeaderState.cs | 64 ++++----- .../Cluster/Consensus/Raft/PersistentState.cs | 12 +- .../Net/Cluster/Consensus/Raft/RaftCluster.cs | 22 +-- 18 files changed, 206 insertions(+), 166 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2726a126b3..6dc45d71c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,28 @@ Release Notes ==== +# 02-25-2024 +DotNext 5.0.3 +* Fixed behavior to no-op when `GCLatencyModeScope` is initialized to default + +DotNext.Metaprogramming 5.0.3 +* Updated dependencies + +DotNext.Unsafe 5.0.3 +* Updated dependencies + +DotNext.Threading 5.0.3 +* Updated dependencies + +DotNext.IO 5.0.3 +* Updated dependencies + +DotNext.Net.Cluster 5.0.3 +* Attempt to fix [221](https://github.com/dotnet/dotNext/issues/221) + +DotNext.AspNetCore.Cluster 5.0.3 +* Attempt to fix [221](https://github.com/dotnet/dotNext/issues/221) + # 02-17-2024 DotNext 5.0.2 * Fixed XML docs diff --git a/README.md b/README.md index e11bc6c5d2..0a564aeee6 100644 --- a/README.md +++ b/README.md @@ -44,28 +44,28 @@ All these things are implemented in 100% managed code on top of existing .NET AP * [NuGet Packages](https://www.nuget.org/profiles/rvsakno) # What's new -Release Date: 02-17-2024 +Release Date: 02-25-2024 -DotNext 5.0.2 -* Fixed XML docs +DotNext 5.0.3 +* Fixed behavior to no-op when `GCLatencyModeScope` is initialized to default -DotNext.Metaprogramming 5.0.2 -* Fixed [223](https://github.com/dotnet/dotNext/issues/223) +DotNext.Metaprogramming 5.0.3 +* Updated dependencies -DotNext.Unsafe 5.0.2 +DotNext.Unsafe 5.0.3 * Updated dependencies -DotNext.Threading 5.0.2 -* Added correct validation for maximum possible timeout for all `WaitAsync` methods +DotNext.Threading 5.0.3 +* Updated dependencies -DotNext.IO 5.0.2 +DotNext.IO 5.0.3 * Updated dependencies -DotNext.Net.Cluster 5.0.2 -* Prevent indexing of WAL files on Windows +DotNext.Net.Cluster 5.0.3 +* Attempt to fix [221](https://github.com/dotnet/dotNext/issues/221) -DotNext.AspNetCore.Cluster 5.0.2 -* Updated dependencies +DotNext.AspNetCore.Cluster 5.0.3 +* Attempt to fix [221](https://github.com/dotnet/dotNext/issues/221) Changelog for previous versions located [here](./CHANGELOG.md). diff --git a/src/DotNext.IO/DotNext.IO.csproj b/src/DotNext.IO/DotNext.IO.csproj index 3eebc20742..eb282ab740 100644 --- a/src/DotNext.IO/DotNext.IO.csproj +++ b/src/DotNext.IO/DotNext.IO.csproj @@ -11,7 +11,7 @@ .NET Foundation and Contributors .NEXT Family of Libraries - 5.0.2 + 5.0.3 DotNext.IO MIT diff --git a/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj b/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj index e9e1ad8e06..befc25c6fc 100644 --- a/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj +++ b/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj @@ -8,7 +8,7 @@ true false nullablePublicOnly - 5.0.2 + 5.0.3 .NET Foundation .NEXT Family of Libraries diff --git a/src/DotNext.Threading/DotNext.Threading.csproj b/src/DotNext.Threading/DotNext.Threading.csproj index 8794d3a676..6122db3c5a 100644 --- a/src/DotNext.Threading/DotNext.Threading.csproj +++ b/src/DotNext.Threading/DotNext.Threading.csproj @@ -7,7 +7,7 @@ true true nullablePublicOnly - 5.0.2 + 5.0.3 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/DotNext.Unsafe/DotNext.Unsafe.csproj b/src/DotNext.Unsafe/DotNext.Unsafe.csproj index 291a3ed086..a185cb01b3 100644 --- a/src/DotNext.Unsafe/DotNext.Unsafe.csproj +++ b/src/DotNext.Unsafe/DotNext.Unsafe.csproj @@ -7,7 +7,7 @@ enable true true - 5.0.2 + 5.0.3 nullablePublicOnly .NET Foundation and Contributors diff --git a/src/DotNext/DotNext.csproj b/src/DotNext/DotNext.csproj index 7218a2535c..d20523fe36 100644 --- a/src/DotNext/DotNext.csproj +++ b/src/DotNext/DotNext.csproj @@ -11,7 +11,7 @@ .NET Foundation and Contributors .NEXT Family of Libraries - 5.0.2 + 5.0.3 DotNext MIT diff --git a/src/DotNext/Runtime/GCLatencyModeScope.cs b/src/DotNext/Runtime/GCLatencyModeScope.cs index b4671040a0..3e01b1cb73 100644 --- a/src/DotNext/Runtime/GCLatencyModeScope.cs +++ b/src/DotNext/Runtime/GCLatencyModeScope.cs @@ -9,7 +9,7 @@ namespace DotNext.Runtime; [StructLayout(LayoutKind.Auto)] public readonly struct GCLatencyModeScope : IDisposable { - private readonly GCLatencyMode currentMode; + private readonly GCLatencyMode? currentMode; /// /// Initializes a new scope that affects GC intrusion level. @@ -24,7 +24,11 @@ public GCLatencyModeScope(GCLatencyMode mode) /// /// Cancels previously defined GC latency. /// - public void Dispose() => GCSettings.LatencyMode = currentMode; + public void Dispose() + { + if (currentMode.HasValue) + GCSettings.LatencyMode = currentMode.GetValueOrDefault(); + } /// /// Creates a scope with GC intrusion level. diff --git a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj index 9af211a485..ed24bdd057 100644 --- a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj +++ b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj @@ -8,7 +8,7 @@ true true nullablePublicOnly - 5.0.2 + 5.0.3 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj index f2c8e0e8f9..c4d7eb6fe0 100644 --- a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj +++ b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj @@ -8,7 +8,7 @@ enable true nullablePublicOnly - 5.0.2 + 5.0.3 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CandidateState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CandidateState.cs index 345957dd0a..c40a3080a5 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CandidateState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CandidateState.cs @@ -7,63 +7,66 @@ namespace DotNext.Net.Cluster.Consensus.Raft; using Runtime.CompilerServices; using Threading.Tasks; -internal sealed class CandidateState(IRaftStateMachine stateMachine, long term) : RaftState(stateMachine) +internal sealed class CandidateState : RaftState where TMember : class, IRaftClusterMember { - private enum VotingResult : byte + private readonly CancellationTokenSource votingCancellation; + private readonly CancellationToken votingCancellationToken; // cached to prevent ObjectDisposedException + internal readonly long Term; + private Task? votingTask; + + public CandidateState(IRaftStateMachine stateMachine, long term) + : base(stateMachine) { - Rejected = 0, - Granted, - Canceled, - NotAvailable, + Term = term; + votingCancellation = new(); + votingCancellationToken = votingCancellation.Token; } - private readonly CancellationTokenSource votingCancellation = new(); - internal readonly long Term = term; - private Task? votingTask; - + [AsyncMethodBuilder(typeof(SpawningAsyncTaskMethodBuilder))] private async Task VoteAsync(TimeSpan timeout, IAuditTrail auditTrail) { // Perf: reuse index and related term once for all members var lastIndex = auditTrail.LastEntryIndex; - var lastTerm = await auditTrail.GetTermAsync(lastIndex, votingCancellation.Token).ConfigureAwait(false); + var lastTerm = await auditTrail.GetTermAsync(lastIndex, votingCancellationToken).ConfigureAwait(false); // start voting in parallel - var voters = StartVoting(Members, Term, lastIndex, lastTerm, votingCancellation.Token); + var voters = StartVoting(Members, Term, lastIndex, lastTerm, votingCancellationToken); votingCancellation.CancelAfter(timeout); // finish voting await EndVoting(voters.GetConsumer(), votingCancellation.Token).ConfigureAwait(false); - static TaskCompletionPipe> StartVoting(IReadOnlyCollection members, long currentTerm, long lastIndex, long lastTerm, CancellationToken token) + static TaskCompletionPipe> StartVoting(IReadOnlyCollection members, long currentTerm, long lastIndex, long lastTerm, CancellationToken token) { - var voters = new TaskCompletionPipe>(); + var voters = new TaskCompletionPipe>(); // start voting in parallel - foreach (var member in members) - voters.Add(VoteAsync(member, currentTerm, lastIndex, lastTerm, token)); + using (var enumerator = members.GetEnumerator()) + { + while (enumerator.MoveNext() && !token.IsCancellationRequested) + { + voters.Add(VoteAsync(enumerator.Current, currentTerm, lastIndex, lastTerm, token)); + } + } voters.Complete(); return voters; } [AsyncMethodBuilder(typeof(SpawningAsyncTaskMethodBuilder<>))] - static async Task<(TMember, long, VotingResult)> VoteAsync(TMember voter, long currentTerm, long lastIndex, long lastTerm, CancellationToken token) + static async Task<(TMember, long, bool?)> VoteAsync(TMember voter, long currentTerm, long lastIndex, long lastTerm, CancellationToken token) { - VotingResult result; + bool? result; try { var response = await voter.VoteAsync(currentTerm, lastIndex, lastTerm, token).ConfigureAwait(false); currentTerm = response.Term; - result = response.Value ? VotingResult.Granted : VotingResult.Rejected; - } - catch (OperationCanceledException) - { - result = VotingResult.Canceled; + result = response.Value; } catch (MemberUnavailableException) { - result = VotingResult.NotAvailable; + result = null; currentTerm = -1L; } @@ -71,50 +74,68 @@ private async Task VoteAsync(TimeSpan timeout, IAuditTrail auditT } } - private async Task EndVoting(IAsyncEnumerable<(TMember, long, VotingResult)> voters, CancellationToken token) + private async Task EndVoting(TaskCompletionPipe.Consumer<(TMember, long, bool?)> voters, CancellationToken token) { var votes = 0; var localMember = default(TMember); - await foreach (var (member, term, result) in voters.ConfigureAwait(false)) - { - if (IsDisposingOrDisposed) - return; - // current node is outdated - if (term > Term) + var enumerator = voters.GetAsyncEnumerator(token); + try + { + while (await enumerator.MoveNextAsync().ConfigureAwait(false)) { - MoveToFollowerState(randomizeTimeout: false, term); - return; - } + var (member, term, result) = enumerator.Current; - switch (result) - { - case VotingResult.Canceled: // candidate timeout happened - MoveToFollowerState(randomizeTimeout: false); + if (IsDisposingOrDisposed) return; - case VotingResult.Granted: - Logger.VoteGranted(member.EndPoint); - votes += 1; - break; - case VotingResult.Rejected: - Logger.VoteRejected(member.EndPoint); - votes -= 1; - break; - case VotingResult.NotAvailable: - Logger.MemberUnavailable(member.EndPoint); - votes -= 1; - break; - } - if (!member.IsRemote) - localMember = member; + // current node is outdated + if (term > Term) + { + MoveToFollowerState(randomizeTimeout: false, term); + return; + } + + switch (result) + { + case true: + Logger.VoteGranted(member.EndPoint); + votes += 1; + break; + case false: + Logger.VoteRejected(member.EndPoint); + votes -= 1; + break; + default: + Logger.MemberUnavailable(member.EndPoint); + votes -= 1; + break; + } + + if (!member.IsRemote) + localMember = member; + } + } + catch (OperationCanceledException) + { + // candidate timeout happened + MoveToFollowerState(randomizeTimeout: false); + return; + } + finally + { + await enumerator.DisposeAsync().ConfigureAwait(false); } Logger.VotingCompleted(votes, Term); if (token.IsCancellationRequested || votes <= 0 || localMember is null) + { MoveToFollowerState(randomizeTimeout: true); // no clear consensus + } else + { MoveToLeaderState(localMember); // becomes a leader + } } /// @@ -133,7 +154,7 @@ protected override async ValueTask DisposeAsyncCore() { try { - votingCancellation.Cancel(); + votingCancellation.Cancel(throwOnFirstException: false); await (votingTask ?? Task.CompletedTask).ConfigureAwait(false); } catch (Exception e) @@ -151,6 +172,7 @@ protected override void Dispose(bool disposing) if (disposing) { votingCancellation.Dispose(); + votingTask = null; } base.Dispose(disposing); diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusOnlyState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusOnlyState.cs index 1b04677e8d..ddf275720f 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusOnlyState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusOnlyState.cs @@ -287,10 +287,12 @@ public long LastEntryIndex } /// - ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member) + ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member, CancellationToken token) { lastVote = BoxedClusterMemberId.Box(member); - return new(Interlocked.Increment(ref term)); + return token.IsCancellationRequested + ? ValueTask.FromCanceled(token) + : ValueTask.FromResult(Interlocked.Increment(ref term)); } /// @@ -348,20 +350,20 @@ public async ValueTask ReadAsync(ILogEntryConsumer - ValueTask IPersistentState.UpdateTermAsync(long value, bool resetLastVote) + ValueTask IPersistentState.UpdateTermAsync(long value, bool resetLastVote, CancellationToken token) { Term = value; if (resetLastVote) lastVote = null; - return new(); + return token.IsCancellationRequested ? ValueTask.FromCanceled(token) : ValueTask.CompletedTask; } /// - ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId id) + ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId id, CancellationToken token) { lastVote = BoxedClusterMemberId.Box(id); - return new(); + return token.IsCancellationRequested ? ValueTask.FromCanceled(token) : ValueTask.CompletedTask; } /// diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs index 32a38568c0..3065fc6523 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs @@ -6,15 +6,25 @@ namespace DotNext.Net.Cluster.Consensus.Raft; using Threading; -internal sealed class FollowerState(IRaftStateMachine stateMachine) : RaftState(stateMachine) +internal sealed class FollowerState : RaftState where TMember : class, IRaftClusterMember { - private readonly AsyncAutoResetEvent refreshEvent = new(initialState: false) { MeasurementTags = stateMachine.MeasurementTags }; - private readonly AsyncManualResetEvent suppressionEvent = new(initialState: true) { MeasurementTags = stateMachine.MeasurementTags }; - private readonly CancellationTokenSource trackerCancellation = new(); + private readonly AsyncAutoResetEvent refreshEvent; + private readonly AsyncManualResetEvent suppressionEvent; + private readonly CancellationTokenSource trackerCancellation; + private readonly CancellationToken trackerCancellationToken; // cached to prevent ObjectDisposedException private Task? tracker; private volatile bool timedOut; + public FollowerState(IRaftStateMachine stateMachine) + : base(stateMachine) + { + refreshEvent = new(initialState: false) { MeasurementTags = stateMachine.MeasurementTags }; + suppressionEvent = new(initialState: true) { MeasurementTags = stateMachine.MeasurementTags }; + trackerCancellation = new(); + trackerCancellationToken = trackerCancellation.Token; + } + private void SuspendTracking() { suppressionEvent.Reset(); @@ -23,18 +33,14 @@ private void SuspendTracking() private void ResumeTracking() => suppressionEvent.Set(); - private async Task Track(TimeSpan timeout, CancellationToken token) + private async Task Track(TimeSpan timeout) { - Debug.Assert(token != trackerCancellation.Token); - - using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, trackerCancellation.Token); - // spin loop to wait for the timeout - while (await refreshEvent.WaitAsync(timeout, tokenSource.Token).ConfigureAwait(false)) + while (await refreshEvent.WaitAsync(timeout, trackerCancellationToken).ConfigureAwait(false)) { // Transition can be suppressed. If so, resume the loop and reset the timer. // If the event is in signaled state then the returned task is completed synchronously. - await suppressionEvent.WaitAsync(tokenSource.Token).ConfigureAwait(false); + await suppressionEvent.WaitAsync(trackerCancellationToken).ConfigureAwait(false); } timedOut = true; @@ -50,19 +56,11 @@ private async Task Track(TimeSpan timeout, CancellationToken token) MoveToCandidateState(); } - internal void StartServing(TimeSpan timeout, CancellationToken token) + internal void StartServing(TimeSpan timeout) { - if (token.IsCancellationRequested) - { - trackerCancellation.Cancel(false); - tracker = null; - } - else - { - refreshEvent.Reset(); - timedOut = false; - tracker = Track(timeout, token); - } + refreshEvent.Reset(); + timedOut = false; + tracker = Track(timeout); FollowerState.TransitionRateMeter.Add(1, in MeasurementTags); } @@ -82,7 +80,7 @@ protected override async ValueTask DisposeAsyncCore() { try { - trackerCancellation.Cancel(false); + trackerCancellation.Cancel(throwOnFirstException: false); await (tracker ?? Task.CompletedTask).ConfigureAwait(false); } catch (Exception e) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IPersistentState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IPersistentState.cs index e44dc9cbbf..3399a8bc88 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IPersistentState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IPersistentState.cs @@ -26,8 +26,10 @@ public interface IPersistentState : IO.Log.IAuditTrail /// Increments value and persists the item that was voted for on in the last vote. /// /// The member which identifier should be stored inside of persistence storage. May be . + /// The token that can be used to cancel the operation. /// The updated value. - ValueTask IncrementTermAsync(ClusterMemberId member); + /// The operation has been canceled. + ValueTask IncrementTermAsync(ClusterMemberId member, CancellationToken token = default); /// /// Persists the last actual Term. @@ -37,15 +39,19 @@ public interface IPersistentState : IO.Log.IAuditTrail /// to reset information about the last vote; /// to keep information about the last vote unchanged. /// + /// The token that can be used to cancel the operation. /// The task representing asynchronous execution of the operation. - ValueTask UpdateTermAsync(long term, bool resetLastVote); + /// The operation has been canceled. + ValueTask UpdateTermAsync(long term, bool resetLastVote, CancellationToken token = default); /// /// Persists the item that was voted for on in the last vote. /// /// The member which identifier should be stored inside of persistence storage. + /// The token that can be used to cancel the operation. /// The task representing state of the asynchronous execution. - ValueTask UpdateVotedForAsync(ClusterMemberId member); + /// The operation has been canceled. + ValueTask UpdateVotedForAsync(ClusterMemberId member, CancellationToken token = default); internal static bool IsVotedFor(BoxedClusterMemberId? lastVote, in ClusterMemberId expected) => lastVote is null || lastVote.Value == expected; diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Replication.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Replication.cs index 8753c3d789..b23d2301be 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Replication.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Replication.cs @@ -98,7 +98,7 @@ internal void Initialize( PrecedingTerm = precedingTerm; } - public virtual void Reset() + public void Reset() { replicationAwaiter = default; configuration = this; diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs index 071061d698..9b567b9184 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs @@ -11,14 +11,14 @@ namespace DotNext.Net.Cluster.Consensus.Raft; using Membership; using Runtime.CompilerServices; using Threading.Tasks; -using static Threading.LinkedTokenSourceFactory; +using GCLatencyModeScope = Runtime.GCLatencyModeScope; internal sealed partial class LeaderState : RaftState where TMember : class, IRaftClusterMember { private readonly long currentTerm; private readonly CancellationTokenSource timerCancellation; - internal readonly CancellationToken LeadershipToken; // cached to avoid ObjectDisposedException + internal readonly CancellationToken LeadershipToken; // cached to prevent ObjectDisposedException private readonly Task> localMemberResponse; private Task? heartbeatTask; @@ -38,7 +38,7 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa replicatorFactory = localReplicatorFactory = CreateDefaultReplicator; } - private (long, long, int) ForkHeartbeats(TaskCompletionPipe>> responsePipe, IAuditTrail auditTrail, IClusterConfigurationStorage configurationStorage, IEnumerator members, CancellationToken token) + private (long, long, int) ForkHeartbeats(TaskCompletionPipe>> responsePipe, IAuditTrail auditTrail, IClusterConfigurationStorage configurationStorage, IEnumerator members) { Replicator replicator; Task> response; @@ -48,7 +48,7 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa var majority = 0; // send heartbeat in parallel - for (IClusterConfiguration? activeConfig = configurationStorage.ActiveConfiguration, proposedConfig = configurationStorage.ProposedConfiguration; members.MoveNext(); responsePipe.Add(response, replicator), majority++) + for (IClusterConfiguration? activeConfig = configurationStorage.ActiveConfiguration, proposedConfig = configurationStorage.ProposedConfiguration; members.MoveNext() && !LeadershipToken.IsCancellationRequested; responsePipe.Add(response, replicator), majority++) { var member = members.Current; if (member.IsRemote) @@ -58,7 +58,7 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa // fork replication procedure replicator = context.GetOrCreate(member, replicatorFactory); replicator.Initialize(activeConfig, proposedConfig, commitIndex, currentTerm, precedingIndex); - response = SpawnReplicationAsync(replicator, auditTrail, currentIndex, token); + response = SpawnReplicationAsync(replicator, auditTrail, currentIndex, LeadershipToken); } else { @@ -80,7 +80,9 @@ private MemberResponse ProcessMemberResponse(Task> response, Replic { result = response.GetAwaiter().GetResult(); detector?.ReportHeartbeat(); - return currentTerm >= result.Term ? MemberResponse.Successful : MemberResponse.HigherTermDetected; + return currentTerm >= result.Term + ? MemberResponse.Successful + : MemberResponse.HigherTermDetected; } catch (MemberUnavailableException) { @@ -120,45 +122,33 @@ private void CheckMemberHealthStatus(IFailureDetector? detector, TMember member) } [AsyncMethodBuilder(typeof(SpawningAsyncTaskMethodBuilder))] - private async Task DoHeartbeats(TimeSpan period, IAuditTrail auditTrail, IClusterConfigurationStorage configurationStorage, IReadOnlyCollection members, CancellationToken token) + private async Task DoHeartbeats(TimeSpan period, IAuditTrail auditTrail, IClusterConfigurationStorage configurationStorage, IReadOnlyCollection members) { - var cancellationSource = token.LinkTo(LeadershipToken); - // cached enumerator allows to avoid memory allocation on every GetEnumerator call inside of the loop var enumerator = members.GetEnumerator(); try { var forced = false; - for (var responsePipe = new TaskCompletionPipe>>(); !token.IsCancellationRequested; responsePipe.Reset(), ReuseEnumerator(ref members, ref enumerator)) + for (var responsePipe = new TaskCompletionPipe>>(); !LeadershipToken.IsCancellationRequested; responsePipe.Reset(), ReuseEnumerator(ref members, ref enumerator)) { var startTime = new Timestamp(); // do not resume suspended callers that came after the barrier, resume them in the next iteration replicationQueue.SwitchValve(); - GCLatencyMode latencyMode; - if (forced) - { - // in case of forced (initiated programmatically, not by timeout) replication - // do not change GC latency. Otherwise, in case of high load GC is not able to collect garbage - Unsafe.SkipInit(out latencyMode); - } - else - { - // we want to minimize GC intrusion during replication process - // (however, it is still allowed in case of system-wide memory pressure, e.g. due to container limits) - latencyMode = GCSettings.LatencyMode; - GCSettings.LatencyMode = GCLatencyMode.SustainedLowLatency; - } - + // in case of forced (initiated programmatically, not by timeout) replication + // do not change GC latency. Otherwise, in case of high load GC is not able to collect garbage + var latencyScope = forced + ? default + : GCLatencyModeScope.SustainedLowLatency; try { // Perf: the code in this block is inlined instead of moved to separated method because // we want to prevent allocation of state machine on every call int quorum = 0, commitQuorum = 0, majority; - (long currentIndex, long commitIndex, majority) = ForkHeartbeats(responsePipe, auditTrail, configurationStorage, enumerator, token); + (long currentIndex, long commitIndex, majority) = ForkHeartbeats(responsePipe, auditTrail, configurationStorage, enumerator); - while (await responsePipe.WaitToReadAsync(token).ConfigureAwait(false)) + while (await responsePipe.WaitToReadAsync(LeadershipToken).ConfigureAwait(false)) { while (responsePipe.TryRead(out var response, out var replicator)) { @@ -179,13 +169,13 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail audi { RenewLease(startTime.Elapsed); UpdateLeaderStickiness(); - await configurationStorage.ApplyAsync(token).ConfigureAwait(false); + await configurationStorage.ApplyAsync(LeadershipToken).ConfigureAwait(false); } if (result.Value && ++commitQuorum == majority) { // majority of nodes accept entries with at least one entry from the current term - var count = await auditTrail.CommitAsync(currentIndex, token).ConfigureAwait(false); // commit all entries starting from the first uncommitted index to the end + var count = await auditTrail.CommitAsync(currentIndex, LeadershipToken).ConfigureAwait(false); // commit all entries starting from the first uncommitted index to the end Logger.CommitSuccessful(currentIndex, count); } } @@ -205,10 +195,7 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail audi finally { var broadcastTime = startTime.ElapsedMilliseconds; - - if (forced) - GCSettings.LatencyMode = latencyMode; - + latencyScope.Dispose(); LeaderState.BroadcastTimeMeter.Record(broadcastTime, MeasurementTags); } @@ -216,12 +203,11 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail audi replicationQueue.Drain(); // wait for heartbeat timeout or forced replication - forced = await WaitForReplicationAsync(startTime, period, token).ConfigureAwait(false); + forced = await WaitForReplicationAsync(startTime, period, LeadershipToken).ConfigureAwait(false); } } finally { - cancellationSource?.Dispose(); enumerator.Dispose(); } } @@ -247,8 +233,7 @@ private void ReuseEnumerator(ref IReadOnlyCollection currentList, ref I /// Time period of Heartbeats. /// Transaction log. /// Cluster configuration storage. - /// The toke that can be used to cancel the operation. - internal void StartLeading(TimeSpan period, IAuditTrail transactionLog, IClusterConfigurationStorage configurationStorage, CancellationToken token) + internal void StartLeading(TimeSpan period, IAuditTrail transactionLog, IClusterConfigurationStorage configurationStorage) { var members = Members; context = new(members.Count); @@ -262,7 +247,7 @@ internal void StartLeading(TimeSpan period, IAuditTrail transacti member.State = state; } - heartbeatTask = DoHeartbeats(period, transactionLog, configurationStorage, members, token); + heartbeatTask = DoHeartbeats(period, transactionLog, configurationStorage, members); LeaderState.TransitionRateMeter.Add(1, in MeasurementTags); } @@ -270,7 +255,7 @@ protected override async ValueTask DisposeAsyncCore() { try { - timerCancellation.Cancel(false); + timerCancellation.Cancel(throwOnFirstException: false); replicationEvent.CancelSuspendedCallers(LeadershipToken); await (heartbeatTask ?? Task.CompletedTask).ConfigureAwait(false); // may throw OperationCanceledException } @@ -298,6 +283,7 @@ protected override void Dispose(bool disposing) replicationEvent.Dispose(); context.Dispose(); + heartbeatTask = null; } base.Dispose(disposing); diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs index 3051d78891..4f308074b2 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs @@ -869,10 +869,10 @@ private protected long GetCommitIndexAndCount(ref long commitIndex) public long Term => state.Term; /// - async ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member) + async ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member, CancellationToken token) { long result; - await syncRoot.AcquireAsync(LockType.WriteLock).ConfigureAwait(false); + await syncRoot.AcquireAsync(LockType.WriteLock, token).ConfigureAwait(false); try { result = state.IncrementTerm(member); @@ -887,9 +887,9 @@ async ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member } /// - async ValueTask IPersistentState.UpdateTermAsync(long term, bool resetLastVote) + async ValueTask IPersistentState.UpdateTermAsync(long term, bool resetLastVote, CancellationToken token) { - await syncRoot.AcquireAsync(LockType.WriteLock).ConfigureAwait(false); + await syncRoot.AcquireAsync(LockType.WriteLock, token).ConfigureAwait(false); try { state.UpdateTerm(term, resetLastVote); @@ -902,9 +902,9 @@ async ValueTask IPersistentState.UpdateTermAsync(long term, bool resetLastVote) } /// - async ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId id) + async ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId id, CancellationToken token) { - await syncRoot.AcquireAsync(LockType.WriteLock).ConfigureAwait(false); + await syncRoot.AcquireAsync(LockType.WriteLock, token).ConfigureAwait(false); try { state.UpdateVotedFor(id); diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs index df5982784e..bc1dd52149 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs @@ -296,7 +296,7 @@ async ValueTask UnfreezeCoreAsync() { var newState = new FollowerState(this); await UpdateStateAsync(newState).ConfigureAwait(false); - newState.StartServing(ElectionTimeout, LifecycleToken); + newState.StartServing(ElectionTimeout); readinessProbe.TrySetResult(); } } @@ -341,7 +341,7 @@ public virtual async Task StartAsync(CancellationToken token) /// /// Starts Follower timer. /// - protected void StartFollowing() => (state as FollowerState)?.StartServing(ElectionTimeout, LifecycleToken); + protected void StartFollowing() => (state as FollowerState)?.StartServing(ElectionTimeout); /// public async ValueTask RevertToNormalModeAsync(CancellationToken token = default) @@ -362,7 +362,7 @@ public async ValueTask RevertToNormalModeAsync(CancellationToken token = d { var newState = new FollowerState(this); await UpdateStateAsync(newState).ConfigureAwait(false); - newState.StartServing(ElectionTimeout, LifecycleToken); + newState.StartServing(ElectionTimeout); return true; } } @@ -485,7 +485,7 @@ private ValueTask StepDown(long newTerm) async ValueTask UpdateTermAndStepDownAsync(long newTerm) { - await auditTrail.UpdateTermAsync(newTerm, true).ConfigureAwait(false); + await auditTrail.UpdateTermAsync(newTerm, true, LifecycleToken).ConfigureAwait(false); await StepDown().ConfigureAwait(false); } } @@ -502,7 +502,7 @@ private async ValueTask StepDown() case LeaderState or CandidateState: var newState = new FollowerState(this); await UpdateStateAsync(newState).ConfigureAwait(false); - newState.StartServing(ElectionTimeout, LifecycleToken); + newState.StartServing(ElectionTimeout); break; } @@ -802,7 +802,7 @@ protected async ValueTask> VoteAsync(ClusterMemberId sender, long s if (auditTrail.IsVotedFor(sender) && await auditTrail.IsUpToDateAsync(lastLogIndex, lastLogTerm, token).ConfigureAwait(false)) { - await auditTrail.UpdateVotedForAsync(sender).ConfigureAwait(false); + await auditTrail.UpdateVotedForAsync(sender, token).ConfigureAwait(false); result = result with { Value = true }; } } @@ -838,7 +838,7 @@ protected async ValueTask ResignAsync(CancellationToken token) var newState = new FollowerState(this); await UpdateStateAsync(newState).ConfigureAwait(false); Leader = null; - newState.StartServing(ElectionTimeout, LifecycleToken); + newState.StartServing(ElectionTimeout); return true; } } @@ -1008,7 +1008,7 @@ async void IRaftStateMachine.MoveToCandidateState(IRaftStateMachine.IWe if (readyForTransition && TryGetLocalMember()?.Id is { } localMemberId) { - var newState = new CandidateState(this, await auditTrail.IncrementTermAsync(localMemberId).ConfigureAwait(false)); + var newState = new CandidateState(this, await auditTrail.IncrementTermAsync(localMemberId, LifecycleToken).ConfigureAwait(false)); await UpdateStateAsync(newState).ConfigureAwait(false); // vote for self @@ -1018,7 +1018,7 @@ async void IRaftStateMachine.MoveToCandidateState(IRaftStateMachine.IWe else { // resume follower state - followerState.StartServing(ElectionTimeout, LifecycleToken); + followerState.StartServing(ElectionTimeout); Logger.DowngradedToFollowerState(Term); } } @@ -1047,7 +1047,7 @@ async void IRaftStateMachine.MoveToCandidateState(IRaftStateMachine.IWe Task IsReadyForTransitionAsync(long currentTerm) => state is FollowerState { IsExpired: true, IsRefreshRequested: false } followerState && callerState.IsValid(followerState) ? PreVoteAsync(currentTerm) - : Task.FromResult(false); + : Task.FromResult(false); } /// @@ -1073,7 +1073,7 @@ async void IRaftStateMachine.MoveToLeaderState(IRaftStateMachine.IWeakC Leader = newLeader; await auditTrail.AppendNoOpEntry(LifecycleToken).ConfigureAwait(false); - newState.StartLeading(HeartbeatTimeout, auditTrail, ConfigurationStorage, LifecycleToken); + newState.StartLeading(HeartbeatTimeout, auditTrail, ConfigurationStorage); Logger.TransitionToLeaderStateCompleted(currentTerm); }