From 7c76353ab7f8be9226ef725085d92348770c2ab5 Mon Sep 17 00:00:00 2001 From: Alex Yakunin Date: Sat, 2 Dec 2023 05:37:56 -0800 Subject: [PATCH] chore: minor improvements in RpcPeerStateMonitor & UIActionFailureTracker. --- .../Extensions/RpcPeerStateMonitor.cs | 16 +++---- src/Stl.Fusion/UI/UIActionFailureTracker.cs | 45 +++++++++++-------- src/Stl.Rpc/RpcPeer.cs | 3 +- src/Stl/Async/CancellationTokenExt.cs | 19 ++++++++ 4 files changed, 54 insertions(+), 29 deletions(-) diff --git a/src/Stl.Fusion/Extensions/RpcPeerStateMonitor.cs b/src/Stl.Fusion/Extensions/RpcPeerStateMonitor.cs index 861d44358..6acfaec04 100644 --- a/src/Stl.Fusion/Extensions/RpcPeerStateMonitor.cs +++ b/src/Stl.Fusion/Extensions/RpcPeerStateMonitor.cs @@ -31,28 +31,26 @@ protected override async Task OnRun(CancellationToken cancellationToken) var peerCancellationToken = peerCts.Token; try { // This delay gives some time for peer to connect - var connectionState = peer.ConnectionState; while (true) { - connectionState = connectionState.Last; - var nextConnectionStateTask = connectionState.WhenNext(peerCancellationToken); + peerCancellationToken.ThrowIfCancellationRequested(); + var connectionState = peer.ConnectionState; var isConnected = connectionState.Value.IsConnected(); + var nextConnectionStateTask = connectionState.WhenNext(peerCancellationToken); if (isConnected) { _state.Value = new RpcPeerState(true); - connectionState = await nextConnectionStateTask.ConfigureAwait(false); + await nextConnectionStateTask.ConfigureAwait(false); } else { _state.Value = new RpcPeerState(false, connectionState.Value.Error); // Disconnected -> update ReconnectsAt value until the nextConnectionStateTask completes - using var reconnectAtCts = new CancellationTokenSource(); - // ReSharper disable once AccessToDisposedClosure - _ = nextConnectionStateTask.ContinueWith(_ => reconnectAtCts.Cancel(), TaskScheduler.Default); + var stateChangedToken = CancellationTokenExt.FromTask(nextConnectionStateTask, CancellationToken.None); try { - var reconnectAtChanges = peer.ReconnectsAt.Changes(reconnectAtCts.Token); + var reconnectAtChanges = peer.ReconnectsAt.Changes(stateChangedToken); await foreach (var reconnectsAt in reconnectAtChanges.ConfigureAwait(false)) _state.Value = _state.Value with { ReconnectsAt = reconnectsAt }; } - catch (OperationCanceledException) when (reconnectAtCts.IsCancellationRequested) { + catch (Exception e) when (e.IsCancellationOf(stateChangedToken)) { // Intended } } diff --git a/src/Stl.Fusion/UI/UIActionFailureTracker.cs b/src/Stl.Fusion/UI/UIActionFailureTracker.cs index 4f5360155..e33570f33 100644 --- a/src/Stl.Fusion/UI/UIActionFailureTracker.cs +++ b/src/Stl.Fusion/UI/UIActionFailureTracker.cs @@ -1,15 +1,19 @@ namespace Stl.Fusion.UI; -public class UIActionFailureTracker : MutableList, IHasServices +public class UIActionFailureTracker : MutableList { public record Options { public TimeSpan MaxDuplicateRecency { get; init; } = TimeSpan.FromSeconds(1); } - public IServiceProvider Services { get; } + private ILogger? _log; + + protected IServiceProvider Services { get; } + protected ILogger Log => _log ??= Services.LogFor(GetType()); + public Options Settings { get; } - public Task WhenTracking { get; protected init; } = null!; + public Task? WhenRunning { get; protected set; } public UIActionFailureTracker(Options settings, IServiceProvider services) : this(settings, services, true) @@ -20,36 +24,41 @@ protected UIActionFailureTracker(Options settings, IServiceProvider services, bo Settings = settings; Services = services; if (mustStart) - // ReSharper disable once VirtualMemberCallInConstructor -#pragma warning disable CA2214 - WhenTracking = TrackFailures(); -#pragma warning restore CA2214 + Start(); } public override string ToString() => $"{GetType().GetName()}({Count} item(s))"; - protected virtual async Task TrackFailures() + public void Start() + => WhenRunning ??= Run(); + + // Protected methods + + protected virtual async Task Run() { var uiActionTracker = Services.GetRequiredService(); var cancellationToken = uiActionTracker.StopToken; - var lastResultEvent = uiActionTracker.LastResult; while (true) { - lastResultEvent = await lastResultEvent.WhenNext(cancellationToken).ConfigureAwait(false); - if (lastResultEvent == null) - return; + try { + lastResultEvent = await lastResultEvent.WhenNext(cancellationToken).ConfigureAwait(false); + TryAddFailure(lastResultEvent.Value); + } + catch (Exception e) { + if (cancellationToken.IsCancellationRequested) + return; - var result = lastResultEvent.Value; - if (result != null) - TryAddFailure(result); + Log.LogError(e, "Run() method failed, will retry"); + // We don't want it to consume 100% CPU in case of a weird failure, so... + await Task.Delay(100, cancellationToken).ConfigureAwait(false); + } } - // ReSharper disable once FunctionNeverReturns } - protected virtual bool TryAddFailure(IUIActionResult result) + protected virtual bool TryAddFailure(IUIActionResult? result) { - if (!result.HasError) + if (result is not { HasError: true }) return false; if (Settings.MaxDuplicateRecency <= TimeSpan.Zero) { diff --git a/src/Stl.Rpc/RpcPeer.cs b/src/Stl.Rpc/RpcPeer.cs index d9db4c439..850da1d45 100644 --- a/src/Stl.Rpc/RpcPeer.cs +++ b/src/Stl.Rpc/RpcPeer.cs @@ -230,8 +230,7 @@ protected override Task OnStop() try { if (_connectionState.IsFinal) error = _connectionState.Value.Error - ?? Stl.Internal.Errors.InternalError( - "ConnectionState.IsFinal == true, but ConnectionState.Value.Error == null."); + ?? Stl.Internal.Errors.InternalError("The exception wasn't provided on peer termination."); else { error = Errors.ConnectionUnrecoverable(_connectionState.Value.Error); SetConnectionState(_connectionState.Value.NextDisconnected(error)); diff --git a/src/Stl/Async/CancellationTokenExt.cs b/src/Stl/Async/CancellationTokenExt.cs index 4ac80f2c7..5e646b1ab 100644 --- a/src/Stl/Async/CancellationTokenExt.cs +++ b/src/Stl/Async/CancellationTokenExt.cs @@ -10,6 +10,25 @@ public static CancellationTokenSource LinkWith(this CancellationToken token1, Ca public static CancellationTokenSource CreateLinkedTokenSource(this CancellationToken cancellationToken) => CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + // FromTask + + public static CancellationToken FromTask(Task task, CancellationToken cancellationToken = default) + { + if (cancellationToken.CanBeCanceled) { + var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var result = cts.Token; + result.Register(static state => (state as CancellationTokenSource).CancelAndDisposeSilently(), cts); + _ = task.ContinueWith(_ => cts.Cancel(), TaskScheduler.Default); + return result; + } + else { + var cts = new CancellationTokenSource(); + var result = cts.Token; + _ = task.ContinueWith(_ => cts.CancelAndDisposeSilently(), TaskScheduler.Default); + return result; + } + } + // ToTask public static Disposable, CancellationTokenRegistration)> ToTask(