From 39b749229721eecfb47179329434b41996cdf587 Mon Sep 17 00:00:00 2001 From: sakno Date: Sun, 18 Dec 2022 15:03:39 +0200 Subject: [PATCH] Removed allocation of display class objects --- src/DotNext.IO/IO/FileReader.Binary.cs | 52 ++-- src/DotNext.IO/IO/FileReader.cs | 47 ++-- src/DotNext.IO/IO/FileWriter.Binary.cs | 42 +-- .../InMemoryClusterConfigurationStorage.cs | 24 +- .../PersistentClusterConfigurationStorage.cs | 22 +- .../Consensus/Raft/MemoryBasedStateMachine.cs | 254 +++++++++--------- .../Raft/PersistentState.Internal.cs | 28 +- .../Raft/PersistentState.Partition.cs | 34 +-- .../ProtocolStream.ReadOperations.cs | 16 +- .../ProtocolStream.WriteOperations.cs | 14 +- 10 files changed, 265 insertions(+), 268 deletions(-) diff --git a/src/DotNext.IO/IO/FileReader.Binary.cs b/src/DotNext.IO/IO/FileReader.Binary.cs index d03b2d176..e3a6ec329 100644 --- a/src/DotNext.IO/IO/FileReader.Binary.cs +++ b/src/DotNext.IO/IO/FileReader.Binary.cs @@ -58,17 +58,18 @@ public ValueTask ReadAsync(CancellationToken token = default) } else { - result = ReadSlowAsync(); + result = ReadSlowAsync(token); } return result; + } - async ValueTask ReadSlowAsync() - { - using var buffer = MemoryAllocator.Allocate(Unsafe.SizeOf(), exactSize: true); - await ReadBlockAsync(buffer.Memory, token).ConfigureAwait(false); - return MemoryMarshal.Read(buffer.Span); - } + private async ValueTask ReadSlowAsync(CancellationToken token) + where T : unmanaged + { + using var buffer = MemoryAllocator.Allocate(Unsafe.SizeOf(), exactSize: true); + await ReadBlockAsync(buffer.Memory, token).ConfigureAwait(false); + return MemoryMarshal.Read(buffer.Span); } private int Read7BitEncodedInt() @@ -272,7 +273,7 @@ public ValueTask ParseAsync(CancellationToken token = default) { try { - result = new(Parse()); + result = new(ParseFast()); } catch (Exception e) { @@ -281,29 +282,30 @@ public ValueTask ParseAsync(CancellationToken token = default) } else { - result = ParseSlowAsync(); + result = ParseSlowAsync(token); } return result; + } - T Parse() - { - Debug.Assert(BufferLength >= T.Size); + private T ParseFast() + where T : notnull, IBinaryFormattable + { + Debug.Assert(BufferLength >= T.Size); - var reader = new SpanReader(BufferSpan.Slice(0, T.Size)); - var result = T.Parse(ref reader); - Consume(reader.ConsumedCount); - length -= reader.ConsumedCount; - return result; - } + var reader = new SpanReader(BufferSpan.Slice(0, T.Size)); + var result = T.Parse(ref reader); + Consume(reader.ConsumedCount); + length -= reader.ConsumedCount; + return result; + } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] - async ValueTask ParseSlowAsync() - { - using var buffer = MemoryAllocator.Allocate(T.Size, exactSize: true); - await ReadBlockAsync(buffer.Memory, token).ConfigureAwait(false); - return IBinaryFormattable.Parse(buffer.Span); - } + private async ValueTask ParseSlowAsync(CancellationToken token) + where T : notnull, IBinaryFormattable + { + using var buffer = MemoryAllocator.Allocate(T.Size, exactSize: true); + await ReadBlockAsync(buffer.Memory, token).ConfigureAwait(false); + return IBinaryFormattable.Parse(buffer.Span); } /// diff --git a/src/DotNext.IO/IO/FileReader.cs b/src/DotNext.IO/IO/FileReader.cs index a4a636077..16c81ad98 100644 --- a/src/DotNext.IO/IO/FileReader.cs +++ b/src/DotNext.IO/IO/FileReader.cs @@ -143,6 +143,7 @@ public void Consume(int bytes) /// The reader has been disposed. /// Internal buffer has no free space. /// The operation has been canceled. + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] public async ValueTask ReadAsync(CancellationToken token = default) { ThrowIfDisposed(); @@ -215,34 +216,34 @@ public ValueTask ReadAsync(Memory output, CancellationToken token = d return new(0); return HasBufferedData || output.Length < buffer.Length - ? ReadBufferedAsync() - : ReadDirectAsync(); - - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] - async ValueTask ReadDirectAsync() - { - var count = await RandomAccess.ReadAsync(handle, output, fileOffset, token).ConfigureAwait(false); - fileOffset += count; - return count; - } + ? ReadBufferedAsync(output, token) + : ReadDirectAsync(output, token); + } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] - async ValueTask ReadBufferedAsync() - { - var result = 0; + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + private async ValueTask ReadDirectAsync(Memory output, CancellationToken token) + { + var count = await RandomAccess.ReadAsync(handle, output, fileOffset, token).ConfigureAwait(false); + fileOffset += count; + return count; + } - for (int writtenCount; !output.IsEmpty; output = output.Slice(writtenCount)) - { - if (!HasBufferedData && !await ReadAsync(token).ConfigureAwait(false)) - break; + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + private async ValueTask ReadBufferedAsync(Memory output, CancellationToken token) + { + var result = 0; - BufferSpan.CopyTo(output.Span, out writtenCount); - result += writtenCount; - Consume(writtenCount); - } + for (int writtenCount; !output.IsEmpty; output = output.Slice(writtenCount)) + { + if (!HasBufferedData && !await ReadAsync(token).ConfigureAwait(false)) + break; - return result; + BufferSpan.CopyTo(output.Span, out writtenCount); + result += writtenCount; + Consume(writtenCount); } + + return result; } /// diff --git a/src/DotNext.IO/IO/FileWriter.Binary.cs b/src/DotNext.IO/IO/FileWriter.Binary.cs index 31aa334b4..07aec5e70 100644 --- a/src/DotNext.IO/IO/FileWriter.Binary.cs +++ b/src/DotNext.IO/IO/FileWriter.Binary.cs @@ -50,28 +50,30 @@ public ValueTask WriteAsync(T value, CancellationToken token = default) } else if (buffer.Length >= Unsafe.SizeOf()) { - result = WriteSmallValueAsync(); + result = WriteSmallValueAsync(value, token); } else { - result = WriteLargeValueAsync(); + result = WriteLargeValueAsync(value, token); } return result; + } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - async ValueTask WriteSmallValueAsync() - { - await FlushCoreAsync(token).ConfigureAwait(false); - Write(in value); - } + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + private async ValueTask WriteSmallValueAsync(T value, CancellationToken token) + where T : unmanaged + { + await FlushCoreAsync(token).ConfigureAwait(false); + Write(in value); + } - async ValueTask WriteLargeValueAsync() - { - // rare case, T is very large and doesn't fit the buffer - using var buffer = Span.AsReadOnlyBytes(in value).Copy(); - await WriteAsync(buffer.Memory, token).ConfigureAwait(false); - } + async ValueTask WriteLargeValueAsync(T value, CancellationToken token) + where T : unmanaged + { + // rare case, T is very large and doesn't fit the buffer + using var buffer = Span.AsReadOnlyBytes(in value).Copy(); + await WriteAsync(buffer.Memory, token).ConfigureAwait(false); } private void Write7BitEncodedInt(int value) @@ -332,16 +334,16 @@ ValueTask IAsyncBinaryWriter.WriteAsync(Action> } else { - result = WriteSlowAsync(); + result = WriteSlowAsync(writer, arg, token); } return result; + } - async ValueTask WriteSlowAsync() - { - await FlushCoreAsync(token).ConfigureAwait(false); - writer(arg, this); - } + private async ValueTask WriteSlowAsync(Action> writer, TArg arg, CancellationToken token) + { + await FlushCoreAsync(token).ConfigureAwait(false); + writer(arg, this); } /// diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Membership/InMemoryClusterConfigurationStorage.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Membership/InMemoryClusterConfigurationStorage.cs index d65f8741a..fa2a00d3d 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Membership/InMemoryClusterConfigurationStorage.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Membership/InMemoryClusterConfigurationStorage.cs @@ -109,23 +109,21 @@ protected sealed override async ValueTask ProposeAsync(IClusterConfiguration con /// protected sealed override ValueTask ApplyAsync(CancellationToken token) + => proposed is null ? ValueTask.CompletedTask : ApplyProposedAsync(token); + + private async ValueTask ApplyProposedAsync(CancellationToken token) { - return proposed is null ? ValueTask.CompletedTask : ApplyProposedAsync(); + await CompareAsync(activeCache, proposedCache, token).ConfigureAwait(false); - async ValueTask ApplyProposedAsync() - { - await CompareAsync(activeCache, proposedCache, token).ConfigureAwait(false); + active?.Dispose(); + active = proposed; + activeCache = proposedCache; - active?.Dispose(); - active = proposed; - activeCache = proposedCache; + proposed = null; + proposedCache = proposedCache.Clear(); - proposed = null; - proposedCache = proposedCache.Clear(); - - Interlocked.MemoryBarrierProcessWide(); - OnActivated(); - } + Interlocked.MemoryBarrierProcessWide(); + OnActivated(); } private MemoryOwner Encode(IReadOnlyDictionary configuration) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Membership/PersistentClusterConfigurationStorage.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Membership/PersistentClusterConfigurationStorage.cs index 1ab5e4122..01958e0e5 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Membership/PersistentClusterConfigurationStorage.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Membership/PersistentClusterConfigurationStorage.cs @@ -158,21 +158,19 @@ protected sealed override async ValueTask ProposeAsync(IClusterConfiguration con /// protected sealed override ValueTask ApplyAsync(CancellationToken token = default) - { - return proposed.IsEmpty ? ValueTask.CompletedTask : ApplyProposedAsync(); + => proposed.IsEmpty ? ValueTask.CompletedTask : ApplyProposedAsync(token); - async ValueTask ApplyProposedAsync() - { - await proposed.CopyToAsync(active, token).ConfigureAwait(false); - await CompareAsync(activeCache, proposedCache, token).ConfigureAwait(false); - activeCache = proposedCache; + private async ValueTask ApplyProposedAsync(CancellationToken token) + { + await proposed.CopyToAsync(active, token).ConfigureAwait(false); + await CompareAsync(activeCache, proposedCache, token).ConfigureAwait(false); + activeCache = proposedCache; - proposed.Clear(); - proposedCache = proposedCache.Clear(); + proposed.Clear(); + proposedCache = proposedCache.Clear(); - Interlocked.MemoryBarrierProcessWide(); - OnActivated(); - } + Interlocked.MemoryBarrierProcessWide(); + OnActivated(); } /// diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs index 144fc0288..71ab9336d 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs @@ -434,168 +434,168 @@ private protected sealed override ValueTask CommitAsync(long? endIndex, Ca // otherwise - write lock which doesn't block background compaction return compaction switch { - CompactionMode.Sequential => CommitAndCompactSequentiallyAsync(), - CompactionMode.Foreground => CommitAndCompactInParallelAsync(), - CompactionMode.Incremental => CommitAndCompactIncrementallyAsync(), - _ => CommitWithoutCompactionAsync(), + CompactionMode.Sequential => CommitAndCompactSequentiallyAsync(endIndex, token), + CompactionMode.Foreground => CommitAndCompactInParallelAsync(endIndex, token), + CompactionMode.Incremental => CommitAndCompactIncrementallyAsync(endIndex, token), + _ => CommitWithoutCompactionAsync(endIndex, token), }; + } - async ValueTask CommitAndCompactSequentiallyAsync() + private async ValueTask CommitAndCompactSequentiallyAsync(long? endIndex, CancellationToken token) + { + Partition? removedHead; + long count; + await syncRoot.AcquireAsync(LockType.ExclusiveLock, token).ConfigureAwait(false); + var session = sessionManager.Take(); + try { - Partition? removedHead; - long count; - await syncRoot.AcquireAsync(LockType.ExclusiveLock, token).ConfigureAwait(false); - var session = sessionManager.Take(); - try - { - count = GetCommitIndexAndCount(in endIndex, out var commitIndex); - if (count <= 0L) - return 0L; + count = GetCommitIndexAndCount(in endIndex, out var commitIndex); + if (count <= 0L) + return 0L; - LastCommittedEntryIndex = commitIndex; - await ApplyAsync(session, token).ConfigureAwait(false); - InternalStateScope scope; - if (IsCompactionRequired(commitIndex)) - { - await ForceSequentialCompactionAsync(session, commitIndex, token).ConfigureAwait(false); - removedHead = DetachPartitions(commitIndex); - scope = InternalStateScope.IndexesAndSnapshot; - } - else - { - removedHead = null; - scope = InternalStateScope.Indexes; - } - - await PersistInternalStateAsync(scope).ConfigureAwait(false); + LastCommittedEntryIndex = commitIndex; + await ApplyAsync(session, token).ConfigureAwait(false); + InternalStateScope scope; + if (IsCompactionRequired(commitIndex)) + { + await ForceSequentialCompactionAsync(session, commitIndex, token).ConfigureAwait(false); + removedHead = DetachPartitions(commitIndex); + scope = InternalStateScope.IndexesAndSnapshot; } - finally + else { - sessionManager.Return(session); - syncRoot.Release(LockType.ExclusiveLock); + removedHead = null; + scope = InternalStateScope.Indexes; } - OnCommit(count); - DeletePartitions(removedHead); - return count; + await PersistInternalStateAsync(scope).ConfigureAwait(false); } - - async ValueTask CommitWithoutCompactionAsync() + finally { - long count; - await syncRoot.AcquireAsync(LockType.ExclusiveLock, token).ConfigureAwait(false); - var session = sessionManager.Take(); - try - { - count = GetCommitIndexAndCount(in endIndex, out var commitIndex); - if (count <= 0L) - return 0L; + sessionManager.Return(session); + syncRoot.Release(LockType.ExclusiveLock); + } - LastCommittedEntryIndex = commitIndex; - await ApplyAsync(session, token).ConfigureAwait(false); - await PersistInternalStateAsync(InternalStateScope.Indexes).ConfigureAwait(false); - } - finally - { - sessionManager.Return(session); - syncRoot.Release(LockType.ExclusiveLock); - } + OnCommit(count); + DeletePartitions(removedHead); + return count; + } - OnCommit(count); - return count; - } + private async ValueTask CommitWithoutCompactionAsync(long? endIndex, CancellationToken token) + { + long count; + await syncRoot.AcquireAsync(LockType.ExclusiveLock, token).ConfigureAwait(false); + var session = sessionManager.Take(); + try + { + count = GetCommitIndexAndCount(in endIndex, out var commitIndex); + if (count <= 0L) + return 0L; - async ValueTask CommitAndCompactInParallelAsync() + LastCommittedEntryIndex = commitIndex; + await ApplyAsync(session, token).ConfigureAwait(false); + await PersistInternalStateAsync(InternalStateScope.Indexes).ConfigureAwait(false); + } + finally { - Partition? removedHead; - long count; - await syncRoot.AcquireAsync(LockType.ExclusiveLock, token).ConfigureAwait(false); - var session = sessionManager.Take(); - try - { - count = GetCommitIndexAndCount(in endIndex, out var commitIndex); - if (count <= 0L) - return 0L; + sessionManager.Return(session); + syncRoot.Release(LockType.ExclusiveLock); + } - var compactionIndex = Math.Min(LastCommittedEntryIndex, SnapshotInfo.Index + count); - LastCommittedEntryIndex = commitIndex; + OnCommit(count); + return count; + } + + private async ValueTask CommitAndCompactInParallelAsync(long? endIndex, CancellationToken token) + { + Partition? removedHead; + long count; + await syncRoot.AcquireAsync(LockType.ExclusiveLock, token).ConfigureAwait(false); + var session = sessionManager.Take(); + try + { + count = GetCommitIndexAndCount(in endIndex, out var commitIndex); + if (count <= 0L) + return 0L; - var compactionTask = compactionIndex > 0L - ? Task.Run(() => ForceParallelCompactionAsync(compactionIndex, token)) - : Task.CompletedTask; + var compactionIndex = Math.Min(LastCommittedEntryIndex, SnapshotInfo.Index + count); + LastCommittedEntryIndex = commitIndex; - try - { - await ApplyAsync(session, token).ConfigureAwait(false); - } - finally - { - await compactionTask.ConfigureAwait(false); - removedHead = DetachPartitions(compactionIndex); - } + var compactionTask = compactionIndex > 0L + ? Task.Run(() => ForceParallelCompactionAsync(compactionIndex, token)) + : Task.CompletedTask; - await PersistInternalStateAsync(InternalStateScope.IndexesAndSnapshot).ConfigureAwait(false); + try + { + await ApplyAsync(session, token).ConfigureAwait(false); } finally { - sessionManager.Return(session); - syncRoot.Release(LockType.ExclusiveLock); + await compactionTask.ConfigureAwait(false); + removedHead = DetachPartitions(compactionIndex); } - OnCommit(count); - DeletePartitions(removedHead); - return count; + await PersistInternalStateAsync(InternalStateScope.IndexesAndSnapshot).ConfigureAwait(false); } + finally + { + sessionManager.Return(session); + syncRoot.Release(LockType.ExclusiveLock); + } + + OnCommit(count); + DeletePartitions(removedHead); + return count; + } - async ValueTask CommitAndCompactIncrementallyAsync() + private async ValueTask CommitAndCompactIncrementallyAsync(long? endIndex, CancellationToken token) + { + Partition? removedHead; + long count; + await syncRoot.AcquireAsync(LockType.ExclusiveLock, token).ConfigureAwait(false); + var session = sessionManager.Take(); + try { - Partition? removedHead; - long count; - await syncRoot.AcquireAsync(LockType.ExclusiveLock, token).ConfigureAwait(false); - var session = sessionManager.Take(); - try - { - count = GetCommitIndexAndCount(in endIndex, out var commitIndex); - if (count <= 0L) - return 0L; + count = GetCommitIndexAndCount(in endIndex, out var commitIndex); + if (count <= 0L) + return 0L; - var compactionIndex = LastAppliedEntryIndex; - LastCommittedEntryIndex = commitIndex; - var compactionTask = compactionIndex > 0L - ? Task.Run(() => ForceIncrementalCompactionAsync(compactionIndex, token)) - : Task.FromResult(false); - InternalStateScope scope; + var compactionIndex = LastAppliedEntryIndex; + LastCommittedEntryIndex = commitIndex; + var compactionTask = compactionIndex > 0L + ? Task.Run(() => ForceIncrementalCompactionAsync(compactionIndex, token)) + : Task.FromResult(false); + InternalStateScope scope; - try + try + { + await ApplyAsync(session, token).ConfigureAwait(false); + } + finally + { + if (await compactionTask.ConfigureAwait(false)) { - await ApplyAsync(session, token).ConfigureAwait(false); + removedHead = DetachPartitions(compactionIndex); + scope = InternalStateScope.IndexesAndSnapshot; } - finally + else { - if (await compactionTask.ConfigureAwait(false)) - { - removedHead = DetachPartitions(compactionIndex); - scope = InternalStateScope.IndexesAndSnapshot; - } - else - { - removedHead = null; - scope = InternalStateScope.Indexes; - } + removedHead = null; + scope = InternalStateScope.Indexes; } - - await PersistInternalStateAsync(scope).ConfigureAwait(false); - } - finally - { - sessionManager.Return(session); - syncRoot.Release(LockType.ExclusiveLock); } - OnCommit(count); - DeletePartitions(removedHead); - return count; + await PersistInternalStateAsync(scope).ConfigureAwait(false); } + finally + { + sessionManager.Return(session); + syncRoot.Release(LockType.ExclusiveLock); + } + + OnCommit(count); + DeletePartitions(removedHead); + return count; } private async ValueTask ForceSequentialCompactionAsync(int sessionId, long upperBoundIndex, CancellationToken token) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs index 0a9667ff8..16bc17d5f 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs @@ -279,27 +279,27 @@ private protected ConcurrentStorageAccess(string fileName, int fileOffset, int b internal void Invalidate() => version.IncrementAndGet(); - internal ValueTask SetWritePositionAsync(long value, CancellationToken token = default) + internal ValueTask SetWritePositionAsync(long position, CancellationToken token = default) { var result = ValueTask.CompletedTask; if (!writer.HasBufferedData) { - writer.FilePosition = value; + writer.FilePosition = position; } - else if (value != writer.FilePosition) + else if (position != writer.FilePosition) { - result = FlushAndSetPositionAsync(value, token); + result = FlushAndSetPositionAsync(position, token); } return result; + } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - async ValueTask FlushAndSetPositionAsync(long value, CancellationToken token) - { - await FlushAsync(token).ConfigureAwait(false); - writer.FilePosition = value; - } + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + private async ValueTask FlushAndSetPositionAsync(long position, CancellationToken token) + { + await FlushAsync(token).ConfigureAwait(false); + writer.FilePosition = position; } public virtual ValueTask FlushAsync(CancellationToken token = default) @@ -319,11 +319,11 @@ private protected FileReader GetSessionReader(int sessionId) { Debug.Assert(sessionId >= 0 && sessionId < readers.Length); - var result = GetReader(); + ref var result = ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(readers), sessionId); if (result is null) { - GetReader() = result = new(Handle, fileOffset, writer.MaxBufferSize, allocator, version.VolatileRead()); + result = new(Handle, fileOffset, writer.MaxBufferSize, allocator, version.VolatileRead()); } else { @@ -331,10 +331,6 @@ private protected FileReader GetSessionReader(int sessionId) } return result; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - ref VersionedFileReader? GetReader() - => ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(readers), sessionId); } protected override void Dispose(bool disposing) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs index fd5d8d4b4..7ddd4b646 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs @@ -224,27 +224,27 @@ internal ValueTask PersistCachedEntryAsync(long absoluteIndex, long offset, bool return cachedEntry.Content.IsEmpty ? ValueTask.CompletedTask : removeFromMemory - ? PersistAndDeleteAsync(cachedEntry.Content.Memory) - : PersistAsync(cachedEntry.Content.Memory); + ? PersistAndDeleteAsync(cachedEntry.Content.Memory, index, offset) + : PersistAsync(cachedEntry.Content.Memory, offset); + } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - async ValueTask PersistAsync(ReadOnlyMemory content) + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + private async ValueTask PersistAsync(ReadOnlyMemory content, long offset) + { + await SetWritePositionAsync(offset).ConfigureAwait(false); + await writer.WriteAsync(content).ConfigureAwait(false); + } + + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + private async ValueTask PersistAndDeleteAsync(ReadOnlyMemory content, int index, long offset) + { + try { - await SetWritePositionAsync(offset).ConfigureAwait(false); - await writer.WriteAsync(content).ConfigureAwait(false); + await PersistAsync(content, offset).ConfigureAwait(false); } - - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - async ValueTask PersistAndDeleteAsync(ReadOnlyMemory content) + finally { - try - { - await PersistAsync(content).ConfigureAwait(false); - } - finally - { - entryCache[index].Dispose(); - } + entryCache[index].Dispose(); } } diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TransportServices/ConnectionOriented/ProtocolStream.ReadOperations.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TransportServices/ConnectionOriented/ProtocolStream.ReadOperations.cs index 4096223e9..f3f5074c5 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TransportServices/ConnectionOriented/ProtocolStream.ReadOperations.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TransportServices/ConnectionOriented/ProtocolStream.ReadOperations.cs @@ -37,16 +37,16 @@ private ValueTask BufferizeAsync(int count, CancellationToken token) bufferStart = 0; } - return BufferizeSlowAsync(); + return BufferizeSlowAsync(count, token); + } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - async ValueTask BufferizeSlowAsync() - { - Debug.Assert(bufferEnd < this.buffer.Length); + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + private async ValueTask BufferizeSlowAsync(int count, CancellationToken token) + { + Debug.Assert(bufferEnd < this.buffer.Length); - var buffer = this.buffer.Memory.Slice(bufferEnd); - bufferEnd += await ReadFromTransportAsync(count, buffer, token).ConfigureAwait(false); - } + var buffer = this.buffer.Memory.Slice(bufferEnd); + bufferEnd += await ReadFromTransportAsync(count, buffer, token).ConfigureAwait(false); } private async ValueTask ReadAsync(int count, IntPtr decoder, CancellationToken token) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TransportServices/ConnectionOriented/ProtocolStream.WriteOperations.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TransportServices/ConnectionOriented/ProtocolStream.WriteOperations.cs index 1b4f4a87b..1320fbee9 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TransportServices/ConnectionOriented/ProtocolStream.WriteOperations.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/TransportServices/ConnectionOriented/ProtocolStream.WriteOperations.cs @@ -212,15 +212,15 @@ public sealed override void Write(byte[] buffer, int offset, int count) Write(new ReadOnlySpan(buffer, offset, count)); } - public sealed override Task FlushAsync(CancellationToken token) + private async Task FlushCoreAsync(CancellationToken token) { - return bufferEnd > 0 ? FlushCoreAsync() : Task.CompletedTask; + await WriteToTransportAsync(buffer.Memory.Slice(0, bufferEnd), token).ConfigureAwait(false); + bufferStart = bufferEnd = 0; + } - async Task FlushCoreAsync() - { - await WriteToTransportAsync(buffer.Memory.Slice(0, bufferEnd), token).ConfigureAwait(false); - bufferStart = bufferEnd = 0; - } + public sealed override Task FlushAsync(CancellationToken token) + { + return bufferEnd > 0 ? FlushCoreAsync(token) : Task.CompletedTask; } internal void WriteFinalFrame()