diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2726a126b..6dc45d71c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,28 @@
Release Notes
====
+# 02-25-2024
+DotNext 5.0.3
+* Fixed behavior to no-op when `GCLatencyModeScope` is initialized to default
+
+DotNext.Metaprogramming 5.0.3
+* Updated dependencies
+
+DotNext.Unsafe 5.0.3
+* Updated dependencies
+
+DotNext.Threading 5.0.3
+* Updated dependencies
+
+DotNext.IO 5.0.3
+* Updated dependencies
+
+DotNext.Net.Cluster 5.0.3
+* Attempt to fix [221](https://github.com/dotnet/dotNext/issues/221)
+
+DotNext.AspNetCore.Cluster 5.0.3
+* Attempt to fix [221](https://github.com/dotnet/dotNext/issues/221)
+
# 02-17-2024
DotNext 5.0.2
* Fixed XML docs
diff --git a/README.md b/README.md
index e11bc6c5d..0a564aeee 100644
--- a/README.md
+++ b/README.md
@@ -44,28 +44,28 @@ All these things are implemented in 100% managed code on top of existing .NET AP
* [NuGet Packages](https://www.nuget.org/profiles/rvsakno)
# What's new
-Release Date: 02-17-2024
+Release Date: 02-25-2024
-DotNext 5.0.2
-* Fixed XML docs
+DotNext 5.0.3
+* Fixed behavior to no-op when `GCLatencyModeScope` is initialized to default
-DotNext.Metaprogramming 5.0.2
-* Fixed [223](https://github.com/dotnet/dotNext/issues/223)
+DotNext.Metaprogramming 5.0.3
+* Updated dependencies
-DotNext.Unsafe 5.0.2
+DotNext.Unsafe 5.0.3
* Updated dependencies
-DotNext.Threading 5.0.2
-* Added correct validation for maximum possible timeout for all `WaitAsync` methods
+DotNext.Threading 5.0.3
+* Updated dependencies
-DotNext.IO 5.0.2
+DotNext.IO 5.0.3
* Updated dependencies
-DotNext.Net.Cluster 5.0.2
-* Prevent indexing of WAL files on Windows
+DotNext.Net.Cluster 5.0.3
+* Attempt to fix [221](https://github.com/dotnet/dotNext/issues/221)
-DotNext.AspNetCore.Cluster 5.0.2
-* Updated dependencies
+DotNext.AspNetCore.Cluster 5.0.3
+* Attempt to fix [221](https://github.com/dotnet/dotNext/issues/221)
Changelog for previous versions located [here](./CHANGELOG.md).
diff --git a/src/DotNext.IO/DotNext.IO.csproj b/src/DotNext.IO/DotNext.IO.csproj
index 3eebc2074..eb282ab74 100644
--- a/src/DotNext.IO/DotNext.IO.csproj
+++ b/src/DotNext.IO/DotNext.IO.csproj
@@ -11,7 +11,7 @@
.NET Foundation and Contributors
.NEXT Family of Libraries
- 5.0.2
+ 5.0.3
DotNext.IO
MIT
diff --git a/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj b/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj
index e9e1ad8e0..befc25c6f 100644
--- a/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj
+++ b/src/DotNext.Metaprogramming/DotNext.Metaprogramming.csproj
@@ -8,7 +8,7 @@
true
false
nullablePublicOnly
- 5.0.2
+ 5.0.3
.NET Foundation
.NEXT Family of Libraries
diff --git a/src/DotNext.Threading/DotNext.Threading.csproj b/src/DotNext.Threading/DotNext.Threading.csproj
index 8794d3a67..6122db3c5 100644
--- a/src/DotNext.Threading/DotNext.Threading.csproj
+++ b/src/DotNext.Threading/DotNext.Threading.csproj
@@ -7,7 +7,7 @@
true
true
nullablePublicOnly
- 5.0.2
+ 5.0.3
.NET Foundation and Contributors
.NEXT Family of Libraries
diff --git a/src/DotNext.Unsafe/DotNext.Unsafe.csproj b/src/DotNext.Unsafe/DotNext.Unsafe.csproj
index 291a3ed08..a185cb01b 100644
--- a/src/DotNext.Unsafe/DotNext.Unsafe.csproj
+++ b/src/DotNext.Unsafe/DotNext.Unsafe.csproj
@@ -7,7 +7,7 @@
enable
true
true
- 5.0.2
+ 5.0.3
nullablePublicOnly
.NET Foundation and Contributors
diff --git a/src/DotNext/DotNext.csproj b/src/DotNext/DotNext.csproj
index 7218a2535..d20523fe3 100644
--- a/src/DotNext/DotNext.csproj
+++ b/src/DotNext/DotNext.csproj
@@ -11,7 +11,7 @@
.NET Foundation and Contributors
.NEXT Family of Libraries
- 5.0.2
+ 5.0.3
DotNext
MIT
diff --git a/src/DotNext/Runtime/GCLatencyModeScope.cs b/src/DotNext/Runtime/GCLatencyModeScope.cs
index b4671040a..3e01b1cb7 100644
--- a/src/DotNext/Runtime/GCLatencyModeScope.cs
+++ b/src/DotNext/Runtime/GCLatencyModeScope.cs
@@ -9,7 +9,7 @@ namespace DotNext.Runtime;
[StructLayout(LayoutKind.Auto)]
public readonly struct GCLatencyModeScope : IDisposable
{
- private readonly GCLatencyMode currentMode;
+ private readonly GCLatencyMode? currentMode;
///
/// Initializes a new scope that affects GC intrusion level.
@@ -24,7 +24,11 @@ public GCLatencyModeScope(GCLatencyMode mode)
///
/// Cancels previously defined GC latency.
///
- public void Dispose() => GCSettings.LatencyMode = currentMode;
+ public void Dispose()
+ {
+ if (currentMode.HasValue)
+ GCSettings.LatencyMode = currentMode.GetValueOrDefault();
+ }
///
/// Creates a scope with GC intrusion level.
diff --git a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj
index 9af211a48..ed24bdd05 100644
--- a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj
+++ b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj
@@ -8,7 +8,7 @@
true
true
nullablePublicOnly
- 5.0.2
+ 5.0.3
.NET Foundation and Contributors
.NEXT Family of Libraries
diff --git a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj
index f2c8e0e8f..c4d7eb6fe 100644
--- a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj
+++ b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj
@@ -8,7 +8,7 @@
enable
true
nullablePublicOnly
- 5.0.2
+ 5.0.3
.NET Foundation and Contributors
.NEXT Family of Libraries
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CandidateState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CandidateState.cs
index 345957dd0..c40a3080a 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CandidateState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CandidateState.cs
@@ -7,63 +7,66 @@ namespace DotNext.Net.Cluster.Consensus.Raft;
using Runtime.CompilerServices;
using Threading.Tasks;
-internal sealed class CandidateState(IRaftStateMachine stateMachine, long term) : RaftState(stateMachine)
+internal sealed class CandidateState : RaftState
where TMember : class, IRaftClusterMember
{
- private enum VotingResult : byte
+ private readonly CancellationTokenSource votingCancellation;
+ private readonly CancellationToken votingCancellationToken; // cached to prevent ObjectDisposedException
+ internal readonly long Term;
+ private Task? votingTask;
+
+ public CandidateState(IRaftStateMachine stateMachine, long term)
+ : base(stateMachine)
{
- Rejected = 0,
- Granted,
- Canceled,
- NotAvailable,
+ Term = term;
+ votingCancellation = new();
+ votingCancellationToken = votingCancellation.Token;
}
- private readonly CancellationTokenSource votingCancellation = new();
- internal readonly long Term = term;
- private Task? votingTask;
-
+ [AsyncMethodBuilder(typeof(SpawningAsyncTaskMethodBuilder))]
private async Task VoteAsync(TimeSpan timeout, IAuditTrail auditTrail)
{
// Perf: reuse index and related term once for all members
var lastIndex = auditTrail.LastEntryIndex;
- var lastTerm = await auditTrail.GetTermAsync(lastIndex, votingCancellation.Token).ConfigureAwait(false);
+ var lastTerm = await auditTrail.GetTermAsync(lastIndex, votingCancellationToken).ConfigureAwait(false);
// start voting in parallel
- var voters = StartVoting(Members, Term, lastIndex, lastTerm, votingCancellation.Token);
+ var voters = StartVoting(Members, Term, lastIndex, lastTerm, votingCancellationToken);
votingCancellation.CancelAfter(timeout);
// finish voting
await EndVoting(voters.GetConsumer(), votingCancellation.Token).ConfigureAwait(false);
- static TaskCompletionPipe> StartVoting(IReadOnlyCollection members, long currentTerm, long lastIndex, long lastTerm, CancellationToken token)
+ static TaskCompletionPipe> StartVoting(IReadOnlyCollection members, long currentTerm, long lastIndex, long lastTerm, CancellationToken token)
{
- var voters = new TaskCompletionPipe>();
+ var voters = new TaskCompletionPipe>();
// start voting in parallel
- foreach (var member in members)
- voters.Add(VoteAsync(member, currentTerm, lastIndex, lastTerm, token));
+ using (var enumerator = members.GetEnumerator())
+ {
+ while (enumerator.MoveNext() && !token.IsCancellationRequested)
+ {
+ voters.Add(VoteAsync(enumerator.Current, currentTerm, lastIndex, lastTerm, token));
+ }
+ }
voters.Complete();
return voters;
}
[AsyncMethodBuilder(typeof(SpawningAsyncTaskMethodBuilder<>))]
- static async Task<(TMember, long, VotingResult)> VoteAsync(TMember voter, long currentTerm, long lastIndex, long lastTerm, CancellationToken token)
+ static async Task<(TMember, long, bool?)> VoteAsync(TMember voter, long currentTerm, long lastIndex, long lastTerm, CancellationToken token)
{
- VotingResult result;
+ bool? result;
try
{
var response = await voter.VoteAsync(currentTerm, lastIndex, lastTerm, token).ConfigureAwait(false);
currentTerm = response.Term;
- result = response.Value ? VotingResult.Granted : VotingResult.Rejected;
- }
- catch (OperationCanceledException)
- {
- result = VotingResult.Canceled;
+ result = response.Value;
}
catch (MemberUnavailableException)
{
- result = VotingResult.NotAvailable;
+ result = null;
currentTerm = -1L;
}
@@ -71,50 +74,68 @@ private async Task VoteAsync(TimeSpan timeout, IAuditTrail auditT
}
}
- private async Task EndVoting(IAsyncEnumerable<(TMember, long, VotingResult)> voters, CancellationToken token)
+ private async Task EndVoting(TaskCompletionPipe.Consumer<(TMember, long, bool?)> voters, CancellationToken token)
{
var votes = 0;
var localMember = default(TMember);
- await foreach (var (member, term, result) in voters.ConfigureAwait(false))
- {
- if (IsDisposingOrDisposed)
- return;
- // current node is outdated
- if (term > Term)
+ var enumerator = voters.GetAsyncEnumerator(token);
+ try
+ {
+ while (await enumerator.MoveNextAsync().ConfigureAwait(false))
{
- MoveToFollowerState(randomizeTimeout: false, term);
- return;
- }
+ var (member, term, result) = enumerator.Current;
- switch (result)
- {
- case VotingResult.Canceled: // candidate timeout happened
- MoveToFollowerState(randomizeTimeout: false);
+ if (IsDisposingOrDisposed)
return;
- case VotingResult.Granted:
- Logger.VoteGranted(member.EndPoint);
- votes += 1;
- break;
- case VotingResult.Rejected:
- Logger.VoteRejected(member.EndPoint);
- votes -= 1;
- break;
- case VotingResult.NotAvailable:
- Logger.MemberUnavailable(member.EndPoint);
- votes -= 1;
- break;
- }
- if (!member.IsRemote)
- localMember = member;
+ // current node is outdated
+ if (term > Term)
+ {
+ MoveToFollowerState(randomizeTimeout: false, term);
+ return;
+ }
+
+ switch (result)
+ {
+ case true:
+ Logger.VoteGranted(member.EndPoint);
+ votes += 1;
+ break;
+ case false:
+ Logger.VoteRejected(member.EndPoint);
+ votes -= 1;
+ break;
+ default:
+ Logger.MemberUnavailable(member.EndPoint);
+ votes -= 1;
+ break;
+ }
+
+ if (!member.IsRemote)
+ localMember = member;
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // candidate timeout happened
+ MoveToFollowerState(randomizeTimeout: false);
+ return;
+ }
+ finally
+ {
+ await enumerator.DisposeAsync().ConfigureAwait(false);
}
Logger.VotingCompleted(votes, Term);
if (token.IsCancellationRequested || votes <= 0 || localMember is null)
+ {
MoveToFollowerState(randomizeTimeout: true); // no clear consensus
+ }
else
+ {
MoveToLeaderState(localMember); // becomes a leader
+ }
}
///
@@ -133,7 +154,7 @@ protected override async ValueTask DisposeAsyncCore()
{
try
{
- votingCancellation.Cancel();
+ votingCancellation.Cancel(throwOnFirstException: false);
await (votingTask ?? Task.CompletedTask).ConfigureAwait(false);
}
catch (Exception e)
@@ -151,6 +172,7 @@ protected override void Dispose(bool disposing)
if (disposing)
{
votingCancellation.Dispose();
+ votingTask = null;
}
base.Dispose(disposing);
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusOnlyState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusOnlyState.cs
index 1b04677e8..ddf275720 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusOnlyState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/ConsensusOnlyState.cs
@@ -287,10 +287,12 @@ public long LastEntryIndex
}
///
- ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member)
+ ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member, CancellationToken token)
{
lastVote = BoxedClusterMemberId.Box(member);
- return new(Interlocked.Increment(ref term));
+ return token.IsCancellationRequested
+ ? ValueTask.FromCanceled(token)
+ : ValueTask.FromResult(Interlocked.Increment(ref term));
}
///
@@ -348,20 +350,20 @@ public async ValueTask ReadAsync(ILogEntryConsumer
- ValueTask IPersistentState.UpdateTermAsync(long value, bool resetLastVote)
+ ValueTask IPersistentState.UpdateTermAsync(long value, bool resetLastVote, CancellationToken token)
{
Term = value;
if (resetLastVote)
lastVote = null;
- return new();
+ return token.IsCancellationRequested ? ValueTask.FromCanceled(token) : ValueTask.CompletedTask;
}
///
- ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId id)
+ ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId id, CancellationToken token)
{
lastVote = BoxedClusterMemberId.Box(id);
- return new();
+ return token.IsCancellationRequested ? ValueTask.FromCanceled(token) : ValueTask.CompletedTask;
}
///
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs
index 32a38568c..3065fc652 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/FollowerState.cs
@@ -6,15 +6,25 @@ namespace DotNext.Net.Cluster.Consensus.Raft;
using Threading;
-internal sealed class FollowerState(IRaftStateMachine stateMachine) : RaftState(stateMachine)
+internal sealed class FollowerState : RaftState
where TMember : class, IRaftClusterMember
{
- private readonly AsyncAutoResetEvent refreshEvent = new(initialState: false) { MeasurementTags = stateMachine.MeasurementTags };
- private readonly AsyncManualResetEvent suppressionEvent = new(initialState: true) { MeasurementTags = stateMachine.MeasurementTags };
- private readonly CancellationTokenSource trackerCancellation = new();
+ private readonly AsyncAutoResetEvent refreshEvent;
+ private readonly AsyncManualResetEvent suppressionEvent;
+ private readonly CancellationTokenSource trackerCancellation;
+ private readonly CancellationToken trackerCancellationToken; // cached to prevent ObjectDisposedException
private Task? tracker;
private volatile bool timedOut;
+ public FollowerState(IRaftStateMachine stateMachine)
+ : base(stateMachine)
+ {
+ refreshEvent = new(initialState: false) { MeasurementTags = stateMachine.MeasurementTags };
+ suppressionEvent = new(initialState: true) { MeasurementTags = stateMachine.MeasurementTags };
+ trackerCancellation = new();
+ trackerCancellationToken = trackerCancellation.Token;
+ }
+
private void SuspendTracking()
{
suppressionEvent.Reset();
@@ -23,18 +33,14 @@ private void SuspendTracking()
private void ResumeTracking() => suppressionEvent.Set();
- private async Task Track(TimeSpan timeout, CancellationToken token)
+ private async Task Track(TimeSpan timeout)
{
- Debug.Assert(token != trackerCancellation.Token);
-
- using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, trackerCancellation.Token);
-
// spin loop to wait for the timeout
- while (await refreshEvent.WaitAsync(timeout, tokenSource.Token).ConfigureAwait(false))
+ while (await refreshEvent.WaitAsync(timeout, trackerCancellationToken).ConfigureAwait(false))
{
// Transition can be suppressed. If so, resume the loop and reset the timer.
// If the event is in signaled state then the returned task is completed synchronously.
- await suppressionEvent.WaitAsync(tokenSource.Token).ConfigureAwait(false);
+ await suppressionEvent.WaitAsync(trackerCancellationToken).ConfigureAwait(false);
}
timedOut = true;
@@ -50,19 +56,11 @@ private async Task Track(TimeSpan timeout, CancellationToken token)
MoveToCandidateState();
}
- internal void StartServing(TimeSpan timeout, CancellationToken token)
+ internal void StartServing(TimeSpan timeout)
{
- if (token.IsCancellationRequested)
- {
- trackerCancellation.Cancel(false);
- tracker = null;
- }
- else
- {
- refreshEvent.Reset();
- timedOut = false;
- tracker = Track(timeout, token);
- }
+ refreshEvent.Reset();
+ timedOut = false;
+ tracker = Track(timeout);
FollowerState.TransitionRateMeter.Add(1, in MeasurementTags);
}
@@ -82,7 +80,7 @@ protected override async ValueTask DisposeAsyncCore()
{
try
{
- trackerCancellation.Cancel(false);
+ trackerCancellation.Cancel(throwOnFirstException: false);
await (tracker ?? Task.CompletedTask).ConfigureAwait(false);
}
catch (Exception e)
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IPersistentState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IPersistentState.cs
index e44dc9cbb..3399a8bc8 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IPersistentState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IPersistentState.cs
@@ -26,8 +26,10 @@ public interface IPersistentState : IO.Log.IAuditTrail
/// Increments value and persists the item that was voted for on in the last vote.
///
/// The member which identifier should be stored inside of persistence storage. May be .
+ /// The token that can be used to cancel the operation.
/// The updated value.
- ValueTask IncrementTermAsync(ClusterMemberId member);
+ /// The operation has been canceled.
+ ValueTask IncrementTermAsync(ClusterMemberId member, CancellationToken token = default);
///
/// Persists the last actual Term.
@@ -37,15 +39,19 @@ public interface IPersistentState : IO.Log.IAuditTrail
/// to reset information about the last vote;
/// to keep information about the last vote unchanged.
///
+ /// The token that can be used to cancel the operation.
/// The task representing asynchronous execution of the operation.
- ValueTask UpdateTermAsync(long term, bool resetLastVote);
+ /// The operation has been canceled.
+ ValueTask UpdateTermAsync(long term, bool resetLastVote, CancellationToken token = default);
///
/// Persists the item that was voted for on in the last vote.
///
/// The member which identifier should be stored inside of persistence storage.
+ /// The token that can be used to cancel the operation.
/// The task representing state of the asynchronous execution.
- ValueTask UpdateVotedForAsync(ClusterMemberId member);
+ /// The operation has been canceled.
+ ValueTask UpdateVotedForAsync(ClusterMemberId member, CancellationToken token = default);
internal static bool IsVotedFor(BoxedClusterMemberId? lastVote, in ClusterMemberId expected)
=> lastVote is null || lastVote.Value == expected;
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Replication.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Replication.cs
index 8753c3d78..b23d2301b 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Replication.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Replication.cs
@@ -98,7 +98,7 @@ internal void Initialize(
PrecedingTerm = precedingTerm;
}
- public virtual void Reset()
+ public void Reset()
{
replicationAwaiter = default;
configuration = this;
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs
index 071061d69..9b567b918 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs
@@ -11,14 +11,14 @@ namespace DotNext.Net.Cluster.Consensus.Raft;
using Membership;
using Runtime.CompilerServices;
using Threading.Tasks;
-using static Threading.LinkedTokenSourceFactory;
+using GCLatencyModeScope = Runtime.GCLatencyModeScope;
internal sealed partial class LeaderState : RaftState
where TMember : class, IRaftClusterMember
{
private readonly long currentTerm;
private readonly CancellationTokenSource timerCancellation;
- internal readonly CancellationToken LeadershipToken; // cached to avoid ObjectDisposedException
+ internal readonly CancellationToken LeadershipToken; // cached to prevent ObjectDisposedException
private readonly Task> localMemberResponse;
private Task? heartbeatTask;
@@ -38,7 +38,7 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa
replicatorFactory = localReplicatorFactory = CreateDefaultReplicator;
}
- private (long, long, int) ForkHeartbeats(TaskCompletionPipe>> responsePipe, IAuditTrail auditTrail, IClusterConfigurationStorage configurationStorage, IEnumerator members, CancellationToken token)
+ private (long, long, int) ForkHeartbeats(TaskCompletionPipe>> responsePipe, IAuditTrail auditTrail, IClusterConfigurationStorage configurationStorage, IEnumerator members)
{
Replicator replicator;
Task> response;
@@ -48,7 +48,7 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa
var majority = 0;
// send heartbeat in parallel
- for (IClusterConfiguration? activeConfig = configurationStorage.ActiveConfiguration, proposedConfig = configurationStorage.ProposedConfiguration; members.MoveNext(); responsePipe.Add(response, replicator), majority++)
+ for (IClusterConfiguration? activeConfig = configurationStorage.ActiveConfiguration, proposedConfig = configurationStorage.ProposedConfiguration; members.MoveNext() && !LeadershipToken.IsCancellationRequested; responsePipe.Add(response, replicator), majority++)
{
var member = members.Current;
if (member.IsRemote)
@@ -58,7 +58,7 @@ internal LeaderState(IRaftStateMachine stateMachine, long term, TimeSpa
// fork replication procedure
replicator = context.GetOrCreate(member, replicatorFactory);
replicator.Initialize(activeConfig, proposedConfig, commitIndex, currentTerm, precedingIndex);
- response = SpawnReplicationAsync(replicator, auditTrail, currentIndex, token);
+ response = SpawnReplicationAsync(replicator, auditTrail, currentIndex, LeadershipToken);
}
else
{
@@ -80,7 +80,9 @@ private MemberResponse ProcessMemberResponse(Task> response, Replic
{
result = response.GetAwaiter().GetResult();
detector?.ReportHeartbeat();
- return currentTerm >= result.Term ? MemberResponse.Successful : MemberResponse.HigherTermDetected;
+ return currentTerm >= result.Term
+ ? MemberResponse.Successful
+ : MemberResponse.HigherTermDetected;
}
catch (MemberUnavailableException)
{
@@ -120,45 +122,33 @@ private void CheckMemberHealthStatus(IFailureDetector? detector, TMember member)
}
[AsyncMethodBuilder(typeof(SpawningAsyncTaskMethodBuilder))]
- private async Task DoHeartbeats(TimeSpan period, IAuditTrail auditTrail, IClusterConfigurationStorage configurationStorage, IReadOnlyCollection members, CancellationToken token)
+ private async Task DoHeartbeats(TimeSpan period, IAuditTrail auditTrail, IClusterConfigurationStorage configurationStorage, IReadOnlyCollection members)
{
- var cancellationSource = token.LinkTo(LeadershipToken);
-
// cached enumerator allows to avoid memory allocation on every GetEnumerator call inside of the loop
var enumerator = members.GetEnumerator();
try
{
var forced = false;
- for (var responsePipe = new TaskCompletionPipe>>(); !token.IsCancellationRequested; responsePipe.Reset(), ReuseEnumerator(ref members, ref enumerator))
+ for (var responsePipe = new TaskCompletionPipe>>(); !LeadershipToken.IsCancellationRequested; responsePipe.Reset(), ReuseEnumerator(ref members, ref enumerator))
{
var startTime = new Timestamp();
// do not resume suspended callers that came after the barrier, resume them in the next iteration
replicationQueue.SwitchValve();
- GCLatencyMode latencyMode;
- if (forced)
- {
- // in case of forced (initiated programmatically, not by timeout) replication
- // do not change GC latency. Otherwise, in case of high load GC is not able to collect garbage
- Unsafe.SkipInit(out latencyMode);
- }
- else
- {
- // we want to minimize GC intrusion during replication process
- // (however, it is still allowed in case of system-wide memory pressure, e.g. due to container limits)
- latencyMode = GCSettings.LatencyMode;
- GCSettings.LatencyMode = GCLatencyMode.SustainedLowLatency;
- }
-
+ // in case of forced (initiated programmatically, not by timeout) replication
+ // do not change GC latency. Otherwise, in case of high load GC is not able to collect garbage
+ var latencyScope = forced
+ ? default
+ : GCLatencyModeScope.SustainedLowLatency;
try
{
// Perf: the code in this block is inlined instead of moved to separated method because
// we want to prevent allocation of state machine on every call
int quorum = 0, commitQuorum = 0, majority;
- (long currentIndex, long commitIndex, majority) = ForkHeartbeats(responsePipe, auditTrail, configurationStorage, enumerator, token);
+ (long currentIndex, long commitIndex, majority) = ForkHeartbeats(responsePipe, auditTrail, configurationStorage, enumerator);
- while (await responsePipe.WaitToReadAsync(token).ConfigureAwait(false))
+ while (await responsePipe.WaitToReadAsync(LeadershipToken).ConfigureAwait(false))
{
while (responsePipe.TryRead(out var response, out var replicator))
{
@@ -179,13 +169,13 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail audi
{
RenewLease(startTime.Elapsed);
UpdateLeaderStickiness();
- await configurationStorage.ApplyAsync(token).ConfigureAwait(false);
+ await configurationStorage.ApplyAsync(LeadershipToken).ConfigureAwait(false);
}
if (result.Value && ++commitQuorum == majority)
{
// majority of nodes accept entries with at least one entry from the current term
- var count = await auditTrail.CommitAsync(currentIndex, token).ConfigureAwait(false); // commit all entries starting from the first uncommitted index to the end
+ var count = await auditTrail.CommitAsync(currentIndex, LeadershipToken).ConfigureAwait(false); // commit all entries starting from the first uncommitted index to the end
Logger.CommitSuccessful(currentIndex, count);
}
}
@@ -205,10 +195,7 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail audi
finally
{
var broadcastTime = startTime.ElapsedMilliseconds;
-
- if (forced)
- GCSettings.LatencyMode = latencyMode;
-
+ latencyScope.Dispose();
LeaderState.BroadcastTimeMeter.Record(broadcastTime, MeasurementTags);
}
@@ -216,12 +203,11 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail audi
replicationQueue.Drain();
// wait for heartbeat timeout or forced replication
- forced = await WaitForReplicationAsync(startTime, period, token).ConfigureAwait(false);
+ forced = await WaitForReplicationAsync(startTime, period, LeadershipToken).ConfigureAwait(false);
}
}
finally
{
- cancellationSource?.Dispose();
enumerator.Dispose();
}
}
@@ -247,8 +233,7 @@ private void ReuseEnumerator(ref IReadOnlyCollection currentList, ref I
/// Time period of Heartbeats.
/// Transaction log.
/// Cluster configuration storage.
- /// The toke that can be used to cancel the operation.
- internal void StartLeading(TimeSpan period, IAuditTrail transactionLog, IClusterConfigurationStorage configurationStorage, CancellationToken token)
+ internal void StartLeading(TimeSpan period, IAuditTrail transactionLog, IClusterConfigurationStorage configurationStorage)
{
var members = Members;
context = new(members.Count);
@@ -262,7 +247,7 @@ internal void StartLeading(TimeSpan period, IAuditTrail transacti
member.State = state;
}
- heartbeatTask = DoHeartbeats(period, transactionLog, configurationStorage, members, token);
+ heartbeatTask = DoHeartbeats(period, transactionLog, configurationStorage, members);
LeaderState.TransitionRateMeter.Add(1, in MeasurementTags);
}
@@ -270,7 +255,7 @@ protected override async ValueTask DisposeAsyncCore()
{
try
{
- timerCancellation.Cancel(false);
+ timerCancellation.Cancel(throwOnFirstException: false);
replicationEvent.CancelSuspendedCallers(LeadershipToken);
await (heartbeatTask ?? Task.CompletedTask).ConfigureAwait(false); // may throw OperationCanceledException
}
@@ -298,6 +283,7 @@ protected override void Dispose(bool disposing)
replicationEvent.Dispose();
context.Dispose();
+ heartbeatTask = null;
}
base.Dispose(disposing);
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs
index 3051d7889..4f308074b 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs
@@ -869,10 +869,10 @@ private protected long GetCommitIndexAndCount(ref long commitIndex)
public long Term => state.Term;
///
- async ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member)
+ async ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member, CancellationToken token)
{
long result;
- await syncRoot.AcquireAsync(LockType.WriteLock).ConfigureAwait(false);
+ await syncRoot.AcquireAsync(LockType.WriteLock, token).ConfigureAwait(false);
try
{
result = state.IncrementTerm(member);
@@ -887,9 +887,9 @@ async ValueTask IPersistentState.IncrementTermAsync(ClusterMemberId member
}
///
- async ValueTask IPersistentState.UpdateTermAsync(long term, bool resetLastVote)
+ async ValueTask IPersistentState.UpdateTermAsync(long term, bool resetLastVote, CancellationToken token)
{
- await syncRoot.AcquireAsync(LockType.WriteLock).ConfigureAwait(false);
+ await syncRoot.AcquireAsync(LockType.WriteLock, token).ConfigureAwait(false);
try
{
state.UpdateTerm(term, resetLastVote);
@@ -902,9 +902,9 @@ async ValueTask IPersistentState.UpdateTermAsync(long term, bool resetLastVote)
}
///
- async ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId id)
+ async ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId id, CancellationToken token)
{
- await syncRoot.AcquireAsync(LockType.WriteLock).ConfigureAwait(false);
+ await syncRoot.AcquireAsync(LockType.WriteLock, token).ConfigureAwait(false);
try
{
state.UpdateVotedFor(id);
diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs
index df5982784..bc1dd5214 100644
--- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs
+++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs
@@ -296,7 +296,7 @@ async ValueTask UnfreezeCoreAsync()
{
var newState = new FollowerState(this);
await UpdateStateAsync(newState).ConfigureAwait(false);
- newState.StartServing(ElectionTimeout, LifecycleToken);
+ newState.StartServing(ElectionTimeout);
readinessProbe.TrySetResult();
}
}
@@ -341,7 +341,7 @@ public virtual async Task StartAsync(CancellationToken token)
///
/// Starts Follower timer.
///
- protected void StartFollowing() => (state as FollowerState)?.StartServing(ElectionTimeout, LifecycleToken);
+ protected void StartFollowing() => (state as FollowerState)?.StartServing(ElectionTimeout);
///
public async ValueTask RevertToNormalModeAsync(CancellationToken token = default)
@@ -362,7 +362,7 @@ public async ValueTask RevertToNormalModeAsync(CancellationToken token = d
{
var newState = new FollowerState(this);
await UpdateStateAsync(newState).ConfigureAwait(false);
- newState.StartServing(ElectionTimeout, LifecycleToken);
+ newState.StartServing(ElectionTimeout);
return true;
}
}
@@ -485,7 +485,7 @@ private ValueTask StepDown(long newTerm)
async ValueTask UpdateTermAndStepDownAsync(long newTerm)
{
- await auditTrail.UpdateTermAsync(newTerm, true).ConfigureAwait(false);
+ await auditTrail.UpdateTermAsync(newTerm, true, LifecycleToken).ConfigureAwait(false);
await StepDown().ConfigureAwait(false);
}
}
@@ -502,7 +502,7 @@ private async ValueTask StepDown()
case LeaderState or CandidateState:
var newState = new FollowerState(this);
await UpdateStateAsync(newState).ConfigureAwait(false);
- newState.StartServing(ElectionTimeout, LifecycleToken);
+ newState.StartServing(ElectionTimeout);
break;
}
@@ -802,7 +802,7 @@ protected async ValueTask> VoteAsync(ClusterMemberId sender, long s
if (auditTrail.IsVotedFor(sender) && await auditTrail.IsUpToDateAsync(lastLogIndex, lastLogTerm, token).ConfigureAwait(false))
{
- await auditTrail.UpdateVotedForAsync(sender).ConfigureAwait(false);
+ await auditTrail.UpdateVotedForAsync(sender, token).ConfigureAwait(false);
result = result with { Value = true };
}
}
@@ -838,7 +838,7 @@ protected async ValueTask ResignAsync(CancellationToken token)
var newState = new FollowerState(this);
await UpdateStateAsync(newState).ConfigureAwait(false);
Leader = null;
- newState.StartServing(ElectionTimeout, LifecycleToken);
+ newState.StartServing(ElectionTimeout);
return true;
}
}
@@ -1008,7 +1008,7 @@ async void IRaftStateMachine.MoveToCandidateState(IRaftStateMachine.IWe
if (readyForTransition && TryGetLocalMember()?.Id is { } localMemberId)
{
- var newState = new CandidateState(this, await auditTrail.IncrementTermAsync(localMemberId).ConfigureAwait(false));
+ var newState = new CandidateState(this, await auditTrail.IncrementTermAsync(localMemberId, LifecycleToken).ConfigureAwait(false));
await UpdateStateAsync(newState).ConfigureAwait(false);
// vote for self
@@ -1018,7 +1018,7 @@ async void IRaftStateMachine.MoveToCandidateState(IRaftStateMachine.IWe
else
{
// resume follower state
- followerState.StartServing(ElectionTimeout, LifecycleToken);
+ followerState.StartServing(ElectionTimeout);
Logger.DowngradedToFollowerState(Term);
}
}
@@ -1047,7 +1047,7 @@ async void IRaftStateMachine.MoveToCandidateState(IRaftStateMachine.IWe
Task IsReadyForTransitionAsync(long currentTerm)
=> state is FollowerState { IsExpired: true, IsRefreshRequested: false } followerState && callerState.IsValid(followerState)
? PreVoteAsync(currentTerm)
- : Task.FromResult(false);
+ : Task.FromResult(false);
}
///
@@ -1073,7 +1073,7 @@ async void IRaftStateMachine.MoveToLeaderState(IRaftStateMachine.IWeakC
Leader = newLeader;
await auditTrail.AppendNoOpEntry(LifecycleToken).ConfigureAwait(false);
- newState.StartLeading(HeartbeatTimeout, auditTrail, ConfigurationStorage, LifecycleToken);
+ newState.StartLeading(HeartbeatTimeout, auditTrail, ConfigurationStorage);
Logger.TransitionToLeaderStateCompleted(currentTerm);
}