Skip to content

Commit

Permalink
fix: RpcPeerStateMonitor logic
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Nov 27, 2023
1 parent 31f48e0 commit 7c8e15f
Showing 1 changed file with 4 additions and 6 deletions.
10 changes: 4 additions & 6 deletions src/Stl.Fusion/Extensions/RpcPeerStateMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,21 @@ protected override async Task OnRun(CancellationToken cancellationToken)
connectionState = connectionState.Last;
var nextConnectionStateTask = connectionState.WhenNext(peerCancellationToken);
var isConnected = connectionState.Value.IsConnected();
var nextState = new RpcPeerState(isConnected, connectionState.Value.Error);

if (isConnected) {
_state.Value = nextState;
_state.Value = new RpcPeerState(true);
connectionState = await nextConnectionStateTask.ConfigureAwait(false);
}
else {
_state.Value = new RpcPeerState(false, connectionState.Value.Error);
// Disconnected -> update ReconnectsAt value until the nextConnectionStateTask completes
using var reconnectAtCts = new CancellationTokenSource();
// ReSharper disable once AccessToDisposedClosure
_ = nextConnectionStateTask.ContinueWith(_ => reconnectAtCts.Cancel(), TaskScheduler.Default);
try {
var reconnectAtChanges = peer.ReconnectsAt.Changes(reconnectAtCts.Token);
await foreach (var reconnectsAt in reconnectAtChanges.ConfigureAwait(false)) {
nextState = nextState with { ReconnectsAt = reconnectsAt };
_state.Value = nextState;
}
await foreach (var reconnectsAt in reconnectAtChanges.ConfigureAwait(false))
_state.Value = _state.Value with { ReconnectsAt = reconnectsAt };
}
catch (OperationCanceledException) when (reconnectAtCts.IsCancellationRequested) {
// Intended
Expand Down

0 comments on commit 7c8e15f

Please sign in to comment.