Skip to content

Commit

Permalink
chore: minor improvements in RpcPeerStateMonitor & UIActionFailureTra…
Browse files Browse the repository at this point in the history
…cker.
  • Loading branch information
alexyakunin committed Dec 2, 2023
1 parent 5f86d49 commit 7c76353
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 29 deletions.
16 changes: 7 additions & 9 deletions src/Stl.Fusion/Extensions/RpcPeerStateMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,26 @@ protected override async Task OnRun(CancellationToken cancellationToken)
var peerCancellationToken = peerCts.Token;
try {
// This delay gives some time for peer to connect
var connectionState = peer.ConnectionState;
while (true) {
connectionState = connectionState.Last;
var nextConnectionStateTask = connectionState.WhenNext(peerCancellationToken);
peerCancellationToken.ThrowIfCancellationRequested();
var connectionState = peer.ConnectionState;
var isConnected = connectionState.Value.IsConnected();
var nextConnectionStateTask = connectionState.WhenNext(peerCancellationToken);

if (isConnected) {
_state.Value = new RpcPeerState(true);
connectionState = await nextConnectionStateTask.ConfigureAwait(false);
await nextConnectionStateTask.ConfigureAwait(false);
}
else {
_state.Value = new RpcPeerState(false, connectionState.Value.Error);
// Disconnected -> update ReconnectsAt value until the nextConnectionStateTask completes
using var reconnectAtCts = new CancellationTokenSource();
// ReSharper disable once AccessToDisposedClosure
_ = nextConnectionStateTask.ContinueWith(_ => reconnectAtCts.Cancel(), TaskScheduler.Default);
var stateChangedToken = CancellationTokenExt.FromTask(nextConnectionStateTask, CancellationToken.None);
try {
var reconnectAtChanges = peer.ReconnectsAt.Changes(reconnectAtCts.Token);
var reconnectAtChanges = peer.ReconnectsAt.Changes(stateChangedToken);
await foreach (var reconnectsAt in reconnectAtChanges.ConfigureAwait(false))
_state.Value = _state.Value with { ReconnectsAt = reconnectsAt };
}
catch (OperationCanceledException) when (reconnectAtCts.IsCancellationRequested) {
catch (Exception e) when (e.IsCancellationOf(stateChangedToken)) {
// Intended
}
}
Expand Down
45 changes: 27 additions & 18 deletions src/Stl.Fusion/UI/UIActionFailureTracker.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
namespace Stl.Fusion.UI;

public class UIActionFailureTracker : MutableList<IUIActionResult>, IHasServices
public class UIActionFailureTracker : MutableList<IUIActionResult>
{
public record Options
{
public TimeSpan MaxDuplicateRecency { get; init; } = TimeSpan.FromSeconds(1);
}

public IServiceProvider Services { get; }
private ILogger? _log;

protected IServiceProvider Services { get; }
protected ILogger Log => _log ??= Services.LogFor(GetType());

public Options Settings { get; }
public Task WhenTracking { get; protected init; } = null!;
public Task? WhenRunning { get; protected set; }

public UIActionFailureTracker(Options settings, IServiceProvider services)
: this(settings, services, true)
Expand All @@ -20,36 +24,41 @@ protected UIActionFailureTracker(Options settings, IServiceProvider services, bo
Settings = settings;
Services = services;
if (mustStart)
// ReSharper disable once VirtualMemberCallInConstructor
#pragma warning disable CA2214
WhenTracking = TrackFailures();
#pragma warning restore CA2214
Start();
}

public override string ToString()
=> $"{GetType().GetName()}({Count} item(s))";

protected virtual async Task TrackFailures()
public void Start()
=> WhenRunning ??= Run();

// Protected methods

protected virtual async Task Run()
{
var uiActionTracker = Services.GetRequiredService<UIActionTracker>();
var cancellationToken = uiActionTracker.StopToken;

var lastResultEvent = uiActionTracker.LastResult;
while (true) {
lastResultEvent = await lastResultEvent.WhenNext(cancellationToken).ConfigureAwait(false);
if (lastResultEvent == null)
return;
try {
lastResultEvent = await lastResultEvent.WhenNext(cancellationToken).ConfigureAwait(false);
TryAddFailure(lastResultEvent.Value);
}
catch (Exception e) {
if (cancellationToken.IsCancellationRequested)
return;

var result = lastResultEvent.Value;
if (result != null)
TryAddFailure(result);
Log.LogError(e, "Run() method failed, will retry");
// We don't want it to consume 100% CPU in case of a weird failure, so...
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
}
// ReSharper disable once FunctionNeverReturns
}

protected virtual bool TryAddFailure(IUIActionResult result)
protected virtual bool TryAddFailure(IUIActionResult? result)
{
if (!result.HasError)
if (result is not { HasError: true })
return false;

if (Settings.MaxDuplicateRecency <= TimeSpan.Zero) {
Expand Down
3 changes: 1 addition & 2 deletions src/Stl.Rpc/RpcPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ protected override Task OnStop()
try {
if (_connectionState.IsFinal)
error = _connectionState.Value.Error
?? Stl.Internal.Errors.InternalError(
"ConnectionState.IsFinal == true, but ConnectionState.Value.Error == null.");
?? Stl.Internal.Errors.InternalError("The exception wasn't provided on peer termination.");
else {
error = Errors.ConnectionUnrecoverable(_connectionState.Value.Error);
SetConnectionState(_connectionState.Value.NextDisconnected(error));
Expand Down
19 changes: 19 additions & 0 deletions src/Stl/Async/CancellationTokenExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,25 @@ public static CancellationTokenSource LinkWith(this CancellationToken token1, Ca
public static CancellationTokenSource CreateLinkedTokenSource(this CancellationToken cancellationToken)
=> CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

// FromTask

public static CancellationToken FromTask(Task task, CancellationToken cancellationToken = default)
{
if (cancellationToken.CanBeCanceled) {
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var result = cts.Token;
result.Register(static state => (state as CancellationTokenSource).CancelAndDisposeSilently(), cts);
_ = task.ContinueWith(_ => cts.Cancel(), TaskScheduler.Default);
return result;
}
else {
var cts = new CancellationTokenSource();
var result = cts.Token;
_ = task.ContinueWith(_ => cts.CancelAndDisposeSilently(), TaskScheduler.Default);
return result;
}
}

// ToTask

public static Disposable<Task, (TaskCompletionSource<Unit>, CancellationTokenRegistration)> ToTask(
Expand Down

0 comments on commit 7c76353

Please sign in to comment.